Big Data: Getting Started with Kafka

Posted by 小米粒-biubiubiu on 2020/12/03 00:50:21

1. ZooKeeper Download and Installation

  • Download URL for the CDH build of ZooKeeper:

http://archive.cloudera.com/cdh5/cdh/5/zookeeper-3.4.5-cdh5.7.0.tar.gz

Extract it: tar -zxvf zookeeper-3.4.5-cdh5.7.0.tar.gz

  • Configure environment variables:

  
  export ZK_HOME=/home/hadoop/app/zookeeper-3.4.5-cdh5.7.0
  export PATH=${ZK_HOME}/bin:$PATH
  • Modify the configuration file:

cd /home/hadoop/app/zookeeper-3.4.5-cdh5.7.0/conf

cp zoo_sample.cfg zoo.cfg

vim zoo.cfg


  
  # Change the data directory; the default /tmp/zookeeper is a temporary folder whose contents are wiped on reboot
  dataDir=/home/hadoop/app/tmp/zookeeper

Start ZooKeeper:

zkServer.sh start

Running zkServer.sh with no arguments prints its usage:
JMX enabled by default
Using config: /home/hadoop/app/zookeeper-3.4.5-cdh5.7.0/bin/../conf/zoo.cfg
Usage: /home/hadoop/app/zookeeper-3.4.5-cdh5.7.0/bin/zkServer.sh {start|start-foreground|stop|restart|status|upgrade|print-cmd}
[root@hadoop000 /home/hadoop/app/zookeeper-3.4.5-cdh5.7.0/conf]#

Run jps; if QuorumPeerMain is listed, ZooKeeper started successfully.
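You can also ask the server for its state (status is one of the subcommands in the usage output above); on a single-node install it should report Mode: standalone:

zkServer.sh status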

2. Kafka Download, Installation, and Single-Node Single-Broker Startup

1) Download from: http://kafka.apache.org/downloads

 https://archive.apache.org/dist/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

Extract Kafka: tar -zxvf kafka_2.11-0.9.0.0.tgz

Configure the Kafka environment variables (run source /etc/profile afterwards so they take effect):

vim  /etc/profile


  
  export KAFKA_HOME=/home/hadoop/app/kafka_2.11-0.9.0.0
  export PATH=${KAFKA_HOME}/bin:$PATH

2) Modify the configuration file

vim $KAFKA_HOME/config/server.properties


  
  ############################# Server Basics #############################
  # Unique id for this broker
  # The id of the broker. This must be set to a unique integer for each broker.
  broker.id=0
  ############################# Socket Server Settings #############################
  listeners=PLAINTEXT://:9092
  # The port the socket server listens on
  #port=9092
  # Hostname the broker will bind to. If not set, the server will bind to all interfaces
  host.name=hadoop000
  ############################# Log Basics #############################
  # Change the log directory (the default is under /tmp)
  # A comma separated list of directories under which to store log files
  log.dirs=/home/hadoop/app/tmp/kafka-logs
  ############################# Zookeeper #############################
  # Zookeeper connection string (see zookeeper docs for details).
  # This is a comma separated host:port pairs, each corresponding to a zk
  # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
  # You can also append an optional chroot string to the urls to specify the
  # root directory for all kafka znodes.
  zookeeper.connect=hadoop000:2181
  # Timeout in ms for connecting to zookeeper
  zookeeper.connection.timeout.ms=6000

3) Start Kafka

Running kafka-server-start.sh with no arguments prints its usage:

USAGE: /home/hadoop/app/kafka_2.11-0.9.0.0/bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*

The start command (add -daemon to run it in the background):

kafka-server-start.sh $KAFKA_HOME/config/server.properties

Verify the process with:

jps -m
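Because the broker registers itself in ZooKeeper, you can also confirm the registration from the ZooKeeper CLI (the znode path below is Kafka's standard layout):

zkCli.sh -server hadoop000:2181
# inside the zkCli shell:
ls /brokers/ids        # should print [0], the broker.id configured above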

 

3. Basic Kafka Operations

1) Create a topic

kafka-topics.sh  --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic hello_topic
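To double-check that the topic was created, list all topics registered in ZooKeeper:

kafka-topics.sh --list --zookeeper hadoop000:2181
# expected output: hello_topic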
 

2) Send messages to the topic with the console producer, then type a few test lines:

kafka-console-producer.sh --broker-list hadoop000:9092 --topic hello_topic
 

hello

hadoop

spark

kafka

flume

3) Receive messages with the console consumer

kafka-console-consumer.sh  --zookeeper hadoop000:2181 --topic hello_topic --from-beginning
 

Using --from-beginning: with this flag, the consumer replays all messages already in the topic; without it, the consumer only sees messages sent after it starts.
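For contrast, the same consumer without the flag only prints messages produced after it starts:

kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic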

4) Describe all topics

kafka-topics.sh --describe --zookeeper hadoop000:2181
 


Topic:hello_topic       PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: hello_topic      Partition: 0    Leader: 0       Replicas: 0     Isr: 0

5) Describe a specific topic

kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic  hello_topic
 


Topic:hello_topic       PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: hello_topic      Partition: 0    Leader: 0       Replicas: 0     Isr: 0

Here Leader, Replicas, and Isr are broker ids: the leader serves all reads and writes for the partition, Replicas lists the brokers holding a copy, and Isr is the subset of replicas currently in sync with the leader.

 

4. Single-Node Multi-Broker Deployment and Usage

cp  server.properties server-1.properties

cp server.properties  server-2.properties

cp server.properties  server-3.properties

In each copy, change broker.id, the listener port, and the log directory:

server-1.properties


  
  # The id of the broker. This must be set to a unique integer for each broker.
  broker.id=1
  ############################# Socket Server Settings #############################
  listeners=PLAINTEXT://:9093
  ############################# Log Basics #############################
  # A comma separated list of directories under which to store log files
  log.dirs=/home/hadoop/app/tmp/kafka-logs-1

server-2.properties


  
  # The id of the broker. This must be set to a unique integer for each broker.
  broker.id=2
  ############################# Socket Server Settings #############################
  listeners=PLAINTEXT://:9094
  ############################# Log Basics #############################
  # A comma separated list of directories under which to store log files
  log.dirs=/home/hadoop/app/tmp/kafka-logs-2

server-3.properties


  
  # The id of the broker. This must be set to a unique integer for each broker.
  broker.id=3
  ############################# Socket Server Settings #############################
  listeners=PLAINTEXT://:9095
  ############################# Log Basics #############################
  # A comma separated list of directories under which to store log files
  log.dirs=/home/hadoop/app/tmp/kafka-logs-3

Start the three Kafka brokers (-daemon runs each one in the background):

kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties

kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties

kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties

 

Create a topic with replication factor 3:

kafka-topics.sh  --create --zookeeper hadoop000:2181 --replication-factor 3 --partitions 1 --topic my_replication_topic
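Describing the new topic shows how its single partition is placed across the three brokers (which broker ends up as leader may differ on your machine):

kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic my_replication_topic
# expect Replicas to list all three broker ids and Isr to match it while all brokers are up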
 

 

Send messages:

kafka-console-producer.sh --broker-list hadoop000:9093,hadoop000:9094,hadoop000:9095 --topic my_replication_topic
 

 

Receive messages:

kafka-console-consumer.sh  --zookeeper hadoop000:2181 --topic my_replication_topic 
 

This demonstrates Kafka's fault tolerance: with replication factor 3, messages can still be produced and consumed after one broker goes down.
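A quick way to try this yourself (the grep pattern is illustrative, and <pid> is a placeholder for the number jps prints):

jps -m | grep server-1.properties    # find the pid of the broker running server-1.properties
kill <pid>                           # stop that broker
kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic my_replication_topic
# one of the surviving brokers should now be Leader, and broker 1 drops out of Isr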

 

5. Producing and Consuming Messages with the Kafka Java API

 


  
  public class KafkaProperties {
      public static final String ZOOKEEPER = "192.168.42.85:2181";
      public static final String BROKER_LIST = "192.168.42.85:9092";
      public static final String TOPIC = "hello_topic";
      public static final String GROUP_ID = "test_group";
  }

  
  import java.util.Properties;
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Semaphore;

  import kafka.javaapi.producer.Producer;
  import kafka.producer.KeyedMessage;
  import kafka.producer.ProducerConfig;

  // Producer task built on the old Scala producer API that ships with Kafka 0.9.
  public class KafkaProductor implements Runnable {

      private String topic;
      private Producer<Integer, String> producer;

      public KafkaProductor(String topic) {
          this.topic = topic;
          Properties properties = new Properties();
          properties.put("metadata.broker.list", KafkaProperties.BROKER_LIST);
          properties.put("serializer.class", "kafka.serializer.StringEncoder");
          properties.put("request.required.acks", "1");
          ProducerConfig producerConfig = new ProducerConfig(properties);
          producer = new Producer<Integer, String>(producerConfig);
      }

      public String getTopic() {
          return topic;
      }

      public void setTopic(String topic) {
          this.topic = topic;
      }

      public static final int THREAD_COUNT = 100;
      public static final int ALLOW_COUNT = 20;
      // Note: the latch is declared but never counted down in this demo.
      public static final CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
      // Lets at most ALLOW_COUNT of the THREAD_COUNT submitted tasks produce at once.
      public static final Semaphore semaphore = new Semaphore(ALLOW_COUNT);
      public static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

      public static void main(String[] args) throws InterruptedException {
          // Submit 100 producer tasks...
          for (int i = 0; i < THREAD_COUNT; i++) {
              EXECUTOR_SERVICE.submit(new KafkaProductor(KafkaProperties.TOPIC));
          }
          // ...and one consumer thread that prints whatever arrives.
          new Thread(new KafkaCustomer(KafkaProperties.TOPIC)).start();
      }

      @Override
      public void run() {
          try {
              semaphore.acquire();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
          // Each task sends msg0..msg10, one message every two seconds.
          int no = 0;
          while (no <= 10) {
              String message = "msg" + no;
              producer.send(new KeyedMessage<Integer, String>(topic, message));
              no++;
              try {
                  Thread.sleep(2000);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
          semaphore.release();
      }
  }

 


  
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.Properties;

  import kafka.consumer.Consumer;
  import kafka.consumer.ConsumerConfig;
  import kafka.consumer.ConsumerIterator;
  import kafka.consumer.KafkaStream;
  import kafka.javaapi.consumer.ConsumerConnector;

  // Consumer built on the old ZooKeeper-based high-level consumer API from Kafka 0.9.
  public class KafkaCustomer implements Runnable {

      private String topic;
      private ConsumerConnector consumerConnector;

      public KafkaCustomer(String topic) {
          this.topic = topic;
          Properties properties = new Properties();
          properties.put("group.id", KafkaProperties.GROUP_ID);
          properties.put("zookeeper.connect", KafkaProperties.ZOOKEEPER);
          ConsumerConfig consumerConfig = new ConsumerConfig(properties);
          consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
      }

      @Override
      public void run() {
          // Request one stream (thread) for the topic.
          Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
          topicCountMap.put(KafkaProperties.TOPIC, 1);
          Map<String, List<KafkaStream<byte[], byte[]>>> streamMap =
                  consumerConnector.createMessageStreams(topicCountMap);
          KafkaStream<byte[], byte[]> stream = streamMap.get(KafkaProperties.TOPIC).get(0);
          ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
          // hasNext() blocks until the next message arrives.
          while (iterator.hasNext()) {
              String msg = new String(iterator.next().message());
              System.out.println("receive msg ~~~~:" + msg);
          }
      }
  }
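To run the demo, start ZooKeeper and a broker at the addresses in KafkaProperties (192.168.42.85 here), then launch KafkaProductor.main(): it submits 100 producer tasks, of which the semaphore allows 20 to produce concurrently, while the consumer thread prints every message it receives.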

 

6. Collecting Logs with Flume and Writing Them to Kafka (0.9)

Note: newer Flume versions use a different configuration.

exec-memory-avro.conf:


  
  exec-memory-avro.sources = exec-source
  exec-memory-avro.sinks = avro-sink
  exec-memory-avro.channels = memory-channel
  # Describe/configure the source
  exec-memory-avro.sources.exec-source.type = exec
  exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop000/hello.txt
  exec-memory-avro.sources.exec-source.shell = /bin/bash -c
  # Describe the sink
  exec-memory-avro.sinks.avro-sink.type = avro
  exec-memory-avro.sinks.avro-sink.hostname = hadoop000
  exec-memory-avro.sinks.avro-sink.port = 44444
  # Use a channel that buffers events in memory
  exec-memory-avro.channels.memory-channel.type = memory
  exec-memory-avro.channels.memory-channel.capacity = 1000
  exec-memory-avro.channels.memory-channel.transactionCapacity = 100
  # Bind the source and sink to the channel
  exec-memory-avro.sources.exec-source.channels = memory-channel
  exec-memory-avro.sinks.avro-sink.channel = memory-channel

avro-memory-kafka.conf:


  
  avro-memory-kafka.sources = avro-source
  avro-memory-kafka.sinks = kafka-sink
  avro-memory-kafka.channels = memory-channel
  # Describe/configure the source
  avro-memory-kafka.sources.avro-source.bind = hadoop000
  avro-memory-kafka.sources.avro-source.type = avro
  avro-memory-kafka.sources.avro-source.port = 44444
  # Describe the sink
  avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
  avro-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9093
  avro-memory-kafka.sinks.kafka-sink.topic = hello_topic
  avro-memory-kafka.sinks.kafka-sink.batchSize = 5
  avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1
  # Use a channel that buffers events in memory
  avro-memory-kafka.channels.memory-channel.type = memory
  avro-memory-kafka.channels.memory-channel.capacity = 1000
  avro-memory-kafka.channels.memory-channel.transactionCapacity = 100
  # Bind the source and sink to the channel
  avro-memory-kafka.sources.avro-source.channels = memory-channel
  avro-memory-kafka.sinks.kafka-sink.channel = memory-channel

Start Kafka:

kafka-server-start.sh $KAFKA_HOME/config/server-1.properties
 

Start the two Flume agents. Start avro-memory-kafka first, so its Avro source is already listening on port 44444 when the exec agent connects:

flume-ng agent --name avro-memory-kafka --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro-memory-kafka.conf -Dflume.root.logger=INFO,console
 
flume-ng agent --name exec-memory-avro --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-avro.conf -Dflume.root.logger=INFO,console
 

Append a line to the tailed file to generate an event:

echo hello world >> /home/hadoop000/hello.txt
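To verify end-to-end delivery, watch the topic with a console consumer; the appended line should appear there:

kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic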

Source: blog.csdn.net, author: 血煞风雨城2018. Copyright belongs to the original author; contact the author for permission to repost.

Original link: blog.csdn.net/qq_31905135/article/details/85260702
