apache beam KafkaIO的使用(流处理IO)
这里我以下面这个为例,弄1个demo
这里我以下面这个为例,弄1个demo
![image.png](http://image.huawei.com/tiny-lts/v1/images/f02ee2683767ede57975_310x621.png@900-0-90-f.png)
第一步建立option和pipeline
```java
PipelineOptions kafkaOption = PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(PipelineOptions.class);
Pipeline pipeline = Pipeline.create(kafkaOption);
```
## 如何消费beamKafaTest主题里的数据
首先要建立1个kafkaRead对象
```java
KafkaIO.Read<String, String> kafkaRead =
KafkaIO.<String, String>read()
.withBootstrapServers("189.211.150.241:21005,189.211.150.246:21005,189.211.151.18:21005,189.211.151.36:21005")
.withTopic("beamKafkaTest")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class);
```
boostrapServer的ip和端口视各自集群上的kafka配置而定
接着用apply进行组装,组装时要加上withoutMetaData作为最终的建立操作
```java
PCollection<KV<String, String>> pKv = pipeline.apply(kafkaRead.withoutMetadata());
```
## 设置窗口
数据集里的KV<String,String>指的就是消费的kafkaRecord里的key和value,这里直接帮我们转成KV了
* 我只需要里面的value整数值,因此需要把Key-Value转成Value。
* 同时我们希望以每5秒作为1个窗口将结果进行收集,因此需要使用windows类进行组装
* 同时要把5秒内的结果做个总和作为输出,要使用Sum这个类,并且要加上withoutDefaults,否则无法进行流处理。
```java
PCollection<Integer> pIntSum = pKv.apply(MapElements.via(new SimpleFunction<KV<String, String>, Integer>() {
@Override
public Integer apply(KV<String, String> input) {
return Integer.valueOf(input.getValue());
}
}))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))))
.apply(Sum.integersGlobally().withoutDefaults());
```
## 生产消息到beamPrint主题
生产消息就比较简单了, 注意要先map成KV对象,才能使用KafkaIO.write
```java
pIntSum.apply(MapElements.<Integer, KV<Integer, String>>via(new SimpleFunction<Integer, KV<Integer, String>>() {
// v转kv
@Override
public KV<Integer, String> apply(Integer input) {
return KV.of((input!=null?input:0), "answer=" + (input!=null?input:0));
}
}))
.apply(KafkaIO.<Integer, String>write()
.withBootstrapServers("189.211.150.241:21005,189.211.150.246:21005,189.211.151.18:21005,189.211.151.36:21005")
.withTopic("beamPrint")
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(StringSerializer.class));
```
## 测试
启动beamDemo程序,然后用Kafka客户端脚本打开生产者和消费者。
5秒内在beamKafkaTest中输入1、2、3
然后可在beamPrint中看到输出:
- 点赞
- 收藏
- 关注作者
评论(0)