Big Data: Getting Started with Kafka
1. Downloading and installing ZooKeeper

- Download URL for the CDH build of ZooKeeper:
http://archive.cloudera.com/cdh5/cdh/5/zookeeper-3.4.5-cdh5.7.0.tar.gz
Extract it: tar -zxvf zookeeper-3.4.5-cdh5.7.0.tar.gz
- Configure the environment variables:

  export ZK_HOME=/home/hadoop/app/zookeeper-3.4.5-cdh5.7.0
  export PATH=${ZK_HOME}/bin:$PATH
- Edit the configuration file:

cd $ZK_HOME/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg

  # Change the data directory. The default, /tmp/zookeeper, is a temporary
  # directory whose contents are lost after a reboot.
  dataDir=/home/hadoop/app/tmp/zookeeper
- Start ZooKeeper:
zkServer.sh start

Running zkServer.sh without arguments prints the available subcommands:

JMX enabled by default
Using config: /home/hadoop/app/zookeeper-3.4.5-cdh5.7.0/bin/../conf/zoo.cfg
Usage: /home/hadoop/app/zookeeper-3.4.5-cdh5.7.0/bin/zkServer.sh {start|start-foreground|stop|restart|status|upgrade|print-cmd}
[root@hadoop000 /home/hadoop/app/zookeeper-3.4.5-cdh5.7.0/conf]#

Run jps; if a QuorumPeerMain process is listed, ZooKeeper started successfully.
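Besides jps, connectivity can also be checked from code. Below is a minimal sketch using the ZooKeeper Java client (the zookeeper jar from the installation above, or the org.apache.zookeeper:zookeeper:3.4.5 Maven artifact); hadoop000:2181 is the address used throughout this walkthrough:

import org.apache.zookeeper.ZooKeeper;

public class ZkCheck {
    public static void main(String[] args) throws Exception {
        // Connect to the standalone ZooKeeper started above (address assumed from this setup).
        ZooKeeper zk = new ZooKeeper("hadoop000:2181", 6000, event -> {});
        Thread.sleep(1000); // give the client a moment to finish the handshake
        System.out.println("ZooKeeper state: " + zk.getState()); // expect CONNECTED
        zk.close();
    }
}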

2. Downloading, installing, and configuring Kafka; starting a single-node, single-broker setup

1. Download page: http://kafka.apache.org/downloads
https://archive.apache.org/dist/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
Extract Kafka: tar -zxvf kafka_2.11-0.9.0.0.tgz
Configure the Kafka environment variables:
vim /etc/profile
  export KAFKA_HOME=/home/hadoop/app/kafka_2.11-0.9.0.0
  export PATH=${KAFKA_HOME}/bin:$PATH
2. Edit the configuration file:
vim $KAFKA_HOME/config/server.properties
  ############################# Server Basics #############################

  # Unique id for this broker
  # The id of the broker. This must be set to a unique integer for each broker.
  broker.id=0

  ############################# Socket Server Settings #############################

  listeners=PLAINTEXT://:9092

  # The port the socket server listens on
  #port=9092

  # Hostname the broker will bind to. If not set, the server will bind to all interfaces
  host.name=hadoop000

  ############################# Log Basics #############################

  # Change the log storage directory
  # A comma separated list of directories under which to store log files
  log.dirs=/home/hadoop/app/tmp/kafka-logs

  ############################# Zookeeper #############################

  # Zookeeper connection string (see zookeeper docs for details).
  # This is a comma separated host:port pairs, each corresponding to a zk
  # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
  # You can also append an optional chroot string to the urls to specify the
  # root directory for all kafka znodes.
  zookeeper.connect=hadoop000:2181

  # Timeout in ms for connecting to zookeeper
  zookeeper.connection.timeout.ms=6000
3. Start Kafka.

Running kafka-server-start.sh without arguments prints its usage:

USAGE: /home/hadoop/app/kafka_2.11-0.9.0.0/bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*

Start command:
kafka-server-start.sh $KAFKA_HOME/config/server.properties

Check with jps -m that a Kafka process is running.

3. Basic Kafka operations

1. Create a topic:
kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic hello_topic
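Topics can also be created from code. The sketch below uses the 0.9 admin helpers (kafka.admin.AdminUtils with kafka.utils.ZkUtils, Scala classes that are callable from Java); the exact signatures are assumptions based on the 0.9 client bundled with the broker jar:

import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZkUtils;

public class CreateTopic {
    public static void main(String[] args) {
        // ZooKeeper address from this setup; session/connection timeouts in ms; security disabled.
        ZkUtils zkUtils = ZkUtils.apply("hadoop000:2181", 30000, 30000, false);
        // topic, partitions, replication factor, per-topic config overrides
        AdminUtils.createTopic(zkUtils, "hello_topic", 1, 1, new Properties());
        zkUtils.close();
    }
}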
2. Produce messages to the topic:
kafka-console-producer.sh --broker-list hadoop000:9092 --topic hello_topic

hello
hadoop
spark
kafka
flume
3. Consume messages:
kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic --from-beginning

About --from-beginning: with this flag the consumer replays every message already in the topic; without it, the consumer only sees messages sent after it starts. The same behaviour can be reproduced programmatically, as in the sketch below.
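This sketch uses the new consumer API that ships with Kafka 0.9 (org.apache.kafka.clients.consumer); setting auto.offset.reset=earliest makes a consumer group with no committed offsets start from the beginning of the topic, so the group id here is assumed to be a fresh one:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FromBeginningConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop000:9092");
        props.put("group.id", "from_beginning_demo");  // assumed fresh group id
        props.put("auto.offset.reset", "earliest");    // analogue of --from-beginning
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("hello_topic"));
        while (true) { // sketch only: loop forever, printing whatever arrives
            ConsumerRecords<String, String> records = consumer.poll(1000); // 0.9 API: poll(long)
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }
}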
4. Describe all topics:
kafka-topics.sh --describe --zookeeper hadoop000:2181

Topic:hello_topic       PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: hello_topic      Partition: 0    Leader: 0       Replicas: 0     Isr: 0

5. Describe a specific topic:
kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic hello_topic

Topic:hello_topic       PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: hello_topic      Partition: 0    Leader: 0       Replicas: 0     Isr: 0

In this output, Leader is the id of the broker currently serving the partition, Replicas lists the brokers holding a copy of it, and Isr is the subset of replicas that are in sync with the leader.
4. Single-node, multi-broker deployment and usage

cp server.properties server-1.properties
cp server.properties server-2.properties
cp server.properties server-3.properties

In each copy, change broker.id, the listener port, and the log directory.

server-1.properties:
  # The id of the broker. This must be set to a unique integer for each broker.
  broker.id=1

  ############################# Socket Server Settings #############################

  listeners=PLAINTEXT://:9093

  ############################# Log Basics #############################

  # A comma separated list of directories under which to store log files
  log.dirs=/home/hadoop/app/tmp/kafka-logs-1
server-2.properties:

  # The id of the broker. This must be set to a unique integer for each broker.
  broker.id=2

  ############################# Socket Server Settings #############################

  listeners=PLAINTEXT://:9094

  ############################# Log Basics #############################

  # A comma separated list of directories under which to store log files
  log.dirs=/home/hadoop/app/tmp/kafka-logs-2
server-3.properties:

  # The id of the broker. This must be set to a unique integer for each broker.
  broker.id=3

  ############################# Socket Server Settings #############################

  listeners=PLAINTEXT://:9095

  ############################# Log Basics #############################

  # A comma separated list of directories under which to store log files
  log.dirs=/home/hadoop/app/tmp/kafka-logs-3
Start the three Kafka brokers:
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties

Create a topic:
kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 3 --partitions 1 --topic my_replication_topic

Send messages:
kafka-console-producer.sh --broker-list hadoop000:9093,hadoop000:9094,hadoop000:9095 --topic my_replication_topic

Receive messages:
kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic my_replication_topic

This setup demonstrates Kafka's fault tolerance: even after one of the brokers is killed, messages are still delivered. Re-running the --describe command above will show the newly elected leader and the shrunken Isr for the partition.
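The replication guarantee can also be exercised programmatically. The sketch below uses the new producer API bundled with 0.9 (org.apache.kafka.clients.producer) and requests acknowledgement from all in-sync replicas, so sends keep succeeding across a broker failure as long as the partition retains a leader; the broker addresses are the three listeners configured above:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ReplicatedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop000:9093,hadoop000:9094,hadoop000:9095");
        props.put("acks", "all");  // wait for all in-sync replicas to acknowledge
        props.put("retries", "3"); // retry through a leader re-election
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("my_replication_topic", "msg" + i));
            }
        } // close() flushes any pending sends
    }
}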
5. Producing and consuming messages with the Kafka Java API
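The three classes below use the legacy producer (kafka.javaapi.producer) and high-level consumer (kafka.consumer / kafka.javaapi.consumer) clients that ship inside the broker jar; with Maven this is presumably the org.apache.kafka:kafka_2.11:0.9.0.0 dependency (coordinate inferred from the version installed above, so treat it as an assumption).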
  
// Shared connection settings used by the producer and consumer examples.
public class KafkaProperties {

    public static String ZOOKEEPER = "192.168.42.85:2181";

    public static String BROKER_LIST = "192.168.42.85:9092";

    public static String TOPIC = "hello_topic";

    public static String GROUP_ID = "test_group";
}
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Producer built on the legacy (pre-0.9) client API that still ships with Kafka 0.9.
public class KafkaProductor implements Runnable {

    private String topic;
    private Producer<Integer, String> producer;

    public KafkaProductor(String topic) {
        this.topic = topic;
        Properties properties = new Properties();
        properties.put("metadata.broker.list", KafkaProperties.BROKER_LIST);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1"); // wait for the leader's ack only
        ProducerConfig producerConfig = new ProducerConfig(properties);
        producer = new Producer<Integer, String>(producerConfig);
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public static final int THREAD_COUNT = 100;
    public static final int ALLOW_COUNT = 20;
    // Declared in the original but never counted down; kept for fidelity.
    public static final CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
    // At most ALLOW_COUNT of the THREAD_COUNT producer tasks send concurrently.
    public static final Semaphore semaphore = new Semaphore(ALLOW_COUNT);
    public static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < THREAD_COUNT; i++) {
            EXECUTOR_SERVICE.submit(new KafkaProductor(KafkaProperties.TOPIC));
        }

        new Thread(new KafkaCustomer(KafkaProperties.TOPIC)).start();
    }

    @Override
    public void run() {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        int no = 0;
        while (no <= 10) {
            String message = "msg" + no;
            producer.send(new KeyedMessage<Integer, String>(topic, message));
            no++;
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        semaphore.release();
    }
}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

// Consumer built on the legacy high-level consumer API.
public class KafkaCustomer implements Runnable {

    private String topic;
    private ConsumerConnector consumerConnector;

    public KafkaCustomer(String topic) {
        this.topic = topic;
        Properties properties = new Properties();
        properties.put("group.id", KafkaProperties.GROUP_ID);
        properties.put("zookeeper.connect", KafkaProperties.ZOOKEEPER);
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    }

    @Override
    public void run() {
        // Ask for one stream (one consuming thread) for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(KafkaProperties.TOPIC, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap =
                consumerConnector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = streamMap.get(KafkaProperties.TOPIC).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        // hasNext() blocks until the next message arrives.
        while (iterator.hasNext()) {
            String msg = new String(iterator.next().message());
            System.out.println("receive msg ~~~~:" + msg);
        }
    }
}
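To try the example, start ZooKeeper and the single broker from section 2, then run KafkaProductor.main: the thread pool submits THREAD_COUNT producer tasks, the semaphore throttles them to ALLOW_COUNT concurrent senders, each task writes msg0 through msg10 to hello_topic, and the consumer thread prints every message it receives. Note that kafka.javaapi.producer.Producer and the high-level ConsumerConnector are the legacy Scala clients that 0.9 still bundles; later Kafka releases removed them in favour of the org.apache.kafka.clients producer and consumer used in the sketches in sections 3 and 4.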
     
    
 
6. Using Flume to collect logs and write them to Kafka 0.9

(Newer Flume releases use a different Kafka sink configuration.)

Write exec-memory-avro.conf:
  exec-memory-avro.sources = exec-source
  exec-memory-avro.sinks = avro-sink
  exec-memory-avro.channels = memory-channel

  # Describe/configure the source
  exec-memory-avro.sources.exec-source.type = exec
  exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop000/hello.txt
  exec-memory-avro.sources.exec-source.shell = /bin/bash -c

  # Describe the sink
  exec-memory-avro.sinks.avro-sink.type = avro
  exec-memory-avro.sinks.avro-sink.hostname = hadoop000
  exec-memory-avro.sinks.avro-sink.port = 44444

  # Use a channel which buffers events in memory
  exec-memory-avro.channels.memory-channel.type = memory
  exec-memory-avro.channels.memory-channel.capacity = 1000
  exec-memory-avro.channels.memory-channel.transactionCapacity = 100

  # Bind the source and sink to the channel
  exec-memory-avro.sources.exec-source.channels = memory-channel
  exec-memory-avro.sinks.avro-sink.channel = memory-channel
Write avro-memory-kafka.conf:

  avro-memory-kafka.sources = avro-source
  avro-memory-kafka.sinks = kafka-sink
  avro-memory-kafka.channels = memory-channel

  # Describe/configure the source
  avro-memory-kafka.sources.avro-source.type = avro
  avro-memory-kafka.sources.avro-source.bind = hadoop000
  avro-memory-kafka.sources.avro-source.port = 44444

  # Describe the sink
  avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
  avro-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9093
  avro-memory-kafka.sinks.kafka-sink.topic = hello_topic
  avro-memory-kafka.sinks.kafka-sink.batchSize = 5
  avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1

  # Use a channel which buffers events in memory
  avro-memory-kafka.channels.memory-channel.type = memory
  avro-memory-kafka.channels.memory-channel.capacity = 1000
  avro-memory-kafka.channels.memory-channel.transactionCapacity = 100

  # Bind the source and sink to the channel
  avro-memory-kafka.sources.avro-source.channels = memory-channel
  avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
Start the Kafka broker:
kafka-server-start.sh $KAFKA_HOME/config/server-1.properties

Start the two Flume agents (start avro-memory-kafka first, since it listens on the avro port):
flume-ng agent --name avro-memory-kafka --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro-memory-kafka.conf -Dflume.root.logger=INFO,console
flume-ng agent --name exec-memory-avro --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-avro.conf -Dflume.root.logger=INFO,console

Append a test line to the tailed file:
echo hello world >> /home/hadoop000/hello.txt
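If both agents are wired up correctly, each appended line travels exec source → memory channel → avro sink → avro source → memory channel → Kafka sink; a console consumer on hello_topic (the section 3 command, with --zookeeper hadoop000:2181) should then print hello world.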
Source: blog.csdn.net, author 血煞风雨城2018; all rights belong to the original author. Original post: blog.csdn.net/qq_31905135/article/details/85260702