Reading from Kafka with Flink and writing to CSS
Environment:
Flink (MRS 3.0.1) + Kafka (MRS 3.0.5) + CSS 7.9.3 (Elasticsearch 7.x)
Run in the background (submit with the flink client CLI):

flink run -m yarn-cluster -yjm 2048 -ytm 2048 -ys 2 \
  -c com.huawei.bigdata.flink.examples.ReadFromKafka \
  ./FlinkKafkaJavaExample.jar \
  -topic test_wwk \
  -bootstrap.servers 192.168.2.52:9092,192.168.2.66:9092,192.168.2.40:9092,192.168.2.131:9092,192.168.0.19:9092,192.168.0.241:9092 \
  -es.servers 192.168.0.40:9200,192.168.0.251:9200,192.168.0.153:9200 \
  -index my-index
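The job below picks up `-topic`, `-es.servers`, `-index`, and `-bootstrap.servers` through `ParameterTool.fromArgs(args)`, and `paraTool.getProperties()` hands every `-key value` pair straight to the Kafka consumer, which is why `bootstrap.servers` needs no extra wiring. A simplified sketch of that parsing (the `ArgsToProps` class is my own illustration, not Flink's actual `ParameterTool` implementation, which also accepts `--key` and flag-style arguments):

```java
import java.util.Properties;

// Simplified illustration of how "-key value" command-line pairs become
// Properties that can be passed to FlinkKafkaConsumer. Hypothetical helper;
// the real parsing is done by org.apache.flink.api.java.utils.ParameterTool.
public class ArgsToProps {
    public static Properties parse(String[] args) {
        Properties props = new Properties();
        for (int i = 0; i + 1 < args.length; i += 2) {
            // Strip the leading '-' so "-bootstrap.servers" becomes the
            // Kafka property key "bootstrap.servers"
            String key = args[i].startsWith("-") ? args[i].substring(1) : args[i];
            props.setProperty(key, args[i + 1]);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties p = parse(new String[] {
            "-topic", "test_wwk",
            "-bootstrap.servers", "192.168.2.52:9092"
        });
        System.out.println(p.getProperty("bootstrap.servers")); // prints 192.168.2.52:9092
    }
}
```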
Run in the foreground:
Code sample:
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReadFromKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        final ParameterTool paraTool = ParameterTool.fromArgs(args);

        // Read from Kafka and append an ingest timestamp to every record
        DataStream<String> messageStream = env
            .addSource(new FlinkKafkaConsumer<>(
                paraTool.get("topic"), new SimpleStringSchema(), paraTool.getProperties()))
            .rebalance()
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String s) throws Exception {
                    return s + "," + System.currentTimeMillis();
                }
            });

        // Collect the ES endpoints ("ip:port" pairs) into a list of HttpHost
        List<HttpHost> transportAddresses = new ArrayList<>();
        for (String ipPort : paraTool.get("es.servers").split(",")) {
            String[] ip = ipPort.split(":");
            transportAddresses.add(new HttpHost(ip[0], Integer.parseInt(ip[1]), "http"));
        }

        // Build the Elasticsearch sink: each record becomes one JSON document
        final String indexName = paraTool.get("index");
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(
            transportAddresses, new ElasticsearchSinkFunction<String>() {
                public IndexRequest createIndexRequest(String element) {
                    // Put each field to be written to ES into the map
                    String[] strs = element.split(",");
                    Map<String, String> json = new HashMap<>();
                    json.put("data", strs[0]);
                    json.put("time", strs[1]);
                    return Requests.indexRequest()
                        .index(indexName)
                        .source(json);
                }

                @Override
                public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                    indexer.add(createIndexRequest(element));
                }
            });

        // Flush every single record to ES (demo setting; raise it for throughput)
        esSinkBuilder.setBulkFlushMaxActions(1);
        messageStream.addSink(esSinkBuilder.build());
        env.execute("ReadFromKafka");
    }
}
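The per-record logic above can be exercised without a cluster. Below is a plain-Java sketch (no Flink or Elasticsearch dependencies; the class and method names are my own) of the two transformations: the map step appends an ingest timestamp, and the sink step splits the record into the "data"/"time" fields of the document:

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the record handling in the job above (helper names
// are hypothetical, for illustration only).
public class RecordTransformSketch {

    // Mirrors the MapFunction: "hello" -> "hello,<timestamp>"
    static String appendTimestamp(String s, long nowMillis) {
        return s + "," + nowMillis;
    }

    // Mirrors createIndexRequest's field extraction
    static Map<String, String> toDocument(String element) {
        String[] parts = element.split(",");
        Map<String, String> json = new HashMap<>();
        json.put("data", parts[0]);
        json.put("time", parts[1]);
        return json;
    }

    public static void main(String[] args) {
        String record = appendTimestamp("hello", 1700000000000L);
        Map<String, String> doc = toDocument(record);
        System.out.println(doc); // the fields that would be written to the index
    }
}
```

Note a limitation this makes visible: because the job joins and re-splits on ",", a Kafka message that itself contains a comma would have everything after the first comma dropped from the "data" field.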
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.huawei.bigdata.flink.examples</groupId>
    <artifactId>FlinkKafkaJavaExample</artifactId>
    <version>1.0</version>

    <properties>
        <flink.version>1.10.0-hw-ei-302002</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-shaded-hadoop-2</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-shaded-hadoop-2</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-shaded-hadoop-2</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
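The Kafka and elasticsearch7 connectors are not bundled with the Flink distribution, so the jar submitted with `flink run` generally has to package them. A typical maven-shade-plugin stanza for that, placed inside `<project>` alongside `<dependencies>` (this is my addition, not part of the original pom; adjust the plugin version to your environment):

```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```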
Verification:
Kafka input:
ES output: