Reading from Kafka with Flink and writing to CSS

Posted by 王伟康 on 2021/04/27 09:38:26
[Abstract] Reading data from Kafka with Flink and writing it to CSS (Elasticsearch): runtime environment, submit command, sample code, and verification.



Runtime environment:

Flink (MRS 3.0.1) + Kafka (MRS 3.0.5) + CSS 7.9.3


Background execution (CLI):

The job is submitted to YARN in per-job mode (-m yarn-cluster) with 2048 MB for the JobManager (-yjm) and each TaskManager (-ytm), and 2 slots per TaskManager (-ys). The trailing -topic, -bootstrap.servers, -es.servers and -index arguments are consumed by the job itself through ParameterTool.

flink run -m yarn-cluster -yjm 2048 -ytm 2048 -ys 2 \
  -c com.huawei.bigdata.flink.examples.ReadFromKafka \
  ./FlinkKafkaJavaExample.jar \
  -topic test_wwk \
  -bootstrap.servers 192.168.2.52:9092,192.168.2.66:9092,192.168.2.40:9092,192.168.2.131:9092,192.168.0.19:9092,192.168.0.241:9092 \
  -es.servers 192.168.0.40:9200,192.168.0.251:9200,192.168.0.153:9200 \
  -index my-index
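If the topic does not exist yet, it can be created with the Kafka client tools first; a minimal sketch (partition and replication counts are illustrative, and a secured MRS cluster needs the usual authentication options on top):

kafka-topics.sh --create --topic test_wwk --partitions 3 --replication-factor 2 \
  --bootstrap-server 192.168.2.52:9092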

 

Foreground execution:

Code sample:

// Imports needed by the snippet below (Flink 1.10 / Elasticsearch 7 connector)
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
ParameterTool paraTool = ParameterTool.fromArgs(args);

// Read from Kafka and append the ingestion timestamp to each record
DataStream<String> messageStream = env
        .addSource(new FlinkKafkaConsumer<>(
                paraTool.get("topic"),
                new SimpleStringSchema(),
                paraTool.getProperties()))
        .rebalance()
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                return s + "," + System.currentTimeMillis();
            }
        });

// Parse the ES endpoints ("host:port,host:port,...") into a list of HttpHost
List<HttpHost> transportAddresses = new ArrayList<>();
for (String ipport : paraTool.get("es.servers").split(",")) {
    String[] ip = ipport.split(":");
    System.out.println(ip[0] + "    " + ip[1]);
    transportAddresses.add(new HttpHost(ip[0], Integer.parseInt(ip[1]), "http"));
}

// Build the ElasticsearchSink: split each record and index it as a Map
ElasticsearchSink.Builder<String> esSinkBuilder =
        new ElasticsearchSink.Builder<String>(transportAddresses, new ElasticsearchSinkFunction<String>() {
            public IndexRequest createIndexRequest(String element) {
                Map<String, String> json = new HashMap<>();
                String[] strs = element.split(",");
                System.out.println(element);
                // Add every field that should be written to ES to the map
                json.put("data", strs[0]);
                json.put("time", strs[1]);
                return Requests.indexRequest()
                        .index(paraTool.get("index"))
                        .source(json);
            }

            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        });

// Flush every single record so results show up in ES immediately (demo setting)
esSinkBuilder.setBulkFlushMaxActions(1);
messageStream.addSink(esSinkBuilder.build());
env.execute();
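With setBulkFlushMaxActions(1) the sink issues a bulk request per record, which makes the demo easy to verify but limits throughput. The same builder in the Flink 1.10 Elasticsearch connector also exposes size- and time-based flush thresholds plus retry backoff; a sketch with illustrative values, continuing from esSinkBuilder above:

// Flush after 1000 actions, 5 MB of buffered data, or 5 s, whichever comes first
esSinkBuilder.setBulkFlushMaxActions(1000);
esSinkBuilder.setBulkFlushMaxSizeMb(5);
esSinkBuilder.setBulkFlushInterval(5000L);
// Retry failed bulk requests with exponential backoff
// (FlushBackoffType comes from org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase)
esSinkBuilder.setBulkFlushBackoff(true);
esSinkBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
esSinkBuilder.setBulkFlushBackoffRetries(3);
esSinkBuilder.setBulkFlushBackoffDelay(1000L);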

 

 

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.huawei.bigdata.flink.examples</groupId>
    <artifactId>FlinkKafkaJavaExample</artifactId>
    <version>1.0</version>
    <properties>
        <flink.version>1.10.0-hw-ei-302002</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-shaded-hadoop-2</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-shaded-hadoop-2</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-shaded-hadoop-2</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
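FlinkKafkaJavaExample.jar from the submit command is the artifact produced by this POM; a plain Maven build is enough, assuming the Huawei repository that hosts the -hw-ei- Flink artifacts is configured in settings.xml:

mvn clean package -DskipTests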

Verification:

Kafka input:
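For example, a few test records can be typed into the console producer shipped with the Kafka client (a sketch; a Kerberos-secured MRS cluster needs additional authentication options):

kafka-console-producer.sh --broker-list 192.168.2.52:9092 --topic test_wwk

Each line typed here becomes the data field, and the job appends the ingestion timestamp as the time field.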

ES output:
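The indexed documents can then be queried on any CSS node (assuming the cluster accepts plain HTTP, matching the "http" scheme used for HttpHost in the code):

curl -XGET 'http://192.168.0.40:9200/my-index/_search?pretty'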
