spark streaming 整合flume ,kafka 打造通用流处理
一、编写LoggerGenerator.java文件用于生成日志信息
添加依赖
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>1.6.0</version>
</dependency>
编写 日志产生类 LoggerGenerator.java
public class LoggerGenerator {
private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());
public static void main(String[] args) throws InterruptedException {
int index = 0;
while (true) {
Thread.sleep(1000);
logger.info("current value is " + index++);
}
}
}
编写 log4j.properties 配置文件
log4j.rootLogger=INFO,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
#日志输出到flume
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=192.168.42.85
log4j.appender.flume.Port=41414
log4j.appender.flume.UnsafeMode=true
二、使用flume 采集 log4j产生的日志
cd $FLUME_HOME/conf ,编写 streaming.conf
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414
agent1.channels.logger-channel.type=memory
agent1.sinks.log-sink.type=logger
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel
启动 flume ,运行 日志产生的java 文件,可以看到 控制台上面有 采集到的log4j 日志输出
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
--name agent1 -Dflume.root.logger=INFO,console
三、将flume 收集到的数据输出到kafka
启动 zookeeper
./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
jps
查看 topic
./kafka-topics.sh --list --zookeeper hadoop000:2181
创建一个 streamingtopic 主题
./kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic "streamingtopic"
cd $FLUME_HOME/conf ,编写 streaming2.conf
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414
agent1.channels.logger-channel.type=memory
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic= streamingtopic
agent1.sinks.kafka-sink.brokerList = hadoop000:9092
agent1.sinks.kafka-sink.requiredAcks= 1
agent1.sinks.kafka-sink.batchSize = 20
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel
启动 flume
flume-ng agent --name agent1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming2.conf \
-Dflume.root.logger= INFO,console
启动 kafka
./kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic streamingtopic
编写 KafkaReceiverWordCount.java 文件
package com.dzx.scala.dzxbootscala.spark
import kafka.serializer.{StringDecoder, StringEncoder}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @author DuanZhaoXu
* @ClassName:
* @Description:
* @date 2019年01月03日 16:29:19
*/
class KafkaReceiverWordCount {
//args : hadoop000:2181 test streamingtopic 1
def main(args: Array[String]): Unit = {
if(args.length!=4){
System.err.print("必须为四个参数")
System.exit(1)
}
val Array(zkQuorum,group,topics,numThreads) =args
val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
//todo spark streaming 对接kafka
val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
val messages = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
// todo 测试为什么要取第二个
messages.map(_._2).count.print()
ssc.start()
ssc.awaitTermination()
}
}
运行LoggerGenerator.java ,确保 flume ,kafka 都启动好了之后,运行 KafkaReceiverWordCount .java 文件 , flume 每采集到 20条 日志, KafkaReceiverWordCount.java 文件的控制台就会 输出 一个 数字20.
现在是在本地 用 idea 运行 日志产生类,然后使用flume ,kafka ,spark streaming 进行 处理操作的,在生产上面我们需要打成一个jar 包
- 将 日志产生类 打成 一个jar 包
- 启动 flume 和 kafka、
- spark streaming 也是需要打成 jar包的,然后使用spark-submit的 方式进行 提交到环境上 执行
使用哪种方式,可以根据实际情况进行选择模式 : local/yarn/ standlone /mesos
在生产上 整个 流处理的流程 是一样的,不同的是我们的业务逻辑的复杂性。
文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_31905135/article/details/86158082
- 点赞
- 收藏
- 关注作者
评论(0)