kafka配置命令

举报
bigdata张凯翔 发表于 2021/03/25 23:09:29 2021/03/25
【摘要】 nohup /export/servers/kafka_2.11-1.0.0/bin/kafka-server-start.sh /export/servers/kafka_2.11-1.0.0/config/server.properties > /dev/null 2>&1 & 关闭 bin/kafka-server-stop.sh 3.10.3 创建topic bin/k...

nohup /export/servers/kafka_2.11-1.0.0/bin/kafka-server-start.sh
/export/servers/kafka_2.11-1.0.0/config/server.properties > /dev/null 2>&1 &

关闭

bin/kafka-server-stop.sh
3.10.3 创建topic
bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic pyg
注意:对于消费者,kafka中有两个设置的地方:对于老的消费者,由--zookeeper参数设置;对于新的消费者,由--bootstrap-server参数设置
3.10.4 启动消费者
bin/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic pyg
启动生产者:
bin/kafka-console-producer.sh --broker-list node01:9092 --topic pyg

自学kafkaAPI小总结:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.itzkx.Kafka_all</groupId> <artifactId>Kafka_all</artifactId> <packaging>pom</packaging> <version>1.0-SNAPSHOT</version> <modules> <module>kafka</module> </modules>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>Kafka_all</artifactId> <groupId>cn.itzkx.Kafka_all</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>kafka</artifactId>

<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.1</version> </dependency>
</dependencies>
</project>

消费者

package cn.itzkx.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * @ClassName OrderConsumer
 * @Description TODO 订单的消费者,获取数据,获取到数据即可以将数据保存到订单表中
 */
public class OrderConsumer { public static void main(String[] args) { //创建连接对象 Properties props = new Properties(); //消费者配置参数 props.put("bootstrap.servers", "node01:9092");//连接到哪个kafka集群 props.put("group.id", "test1");//设置消费者组 props.put("enable.auto.commit", "true");//自动提交,offset是否会自动保存到kafka的对应的topic上 props.put("auto.commit.interval.ms", "1000");//每1秒钟提交一次偏移量 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props); //消费数据 //订阅topic 参数是多个topic的一个集合 kafkaConsumer.subscribe(Arrays.asList("spider-test")); while (true){ //拉去100毫秒内的数据 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);//拉去数据 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { //打印数据 System.out.println("消费到的数据:"+consumerRecord.value()); } } }
}

生产者

package cn.itzkx.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * @ClassName OrderConsumer
 * @Description TODO 订单的消费者,获取数据,获取到数据即可以将数据保存到订单表中
 */
public class OrderConsumer { public static void main(String[] args) { //创建连接对象 Properties props = new Properties(); //消费者配置参数 props.put("bootstrap.servers", "node01:9092");//连接到哪个kafka集群 props.put("group.id", "test1");//设置消费者组 props.put("enable.auto.commit", "true");//自动提交,offset是否会自动保存到kafka的对应的topic上 props.put("auto.commit.interval.ms", "1000");//每1秒钟提交一次偏移量 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props); //消费数据 //订阅topic 参数是多个topic的一个集合 kafkaConsumer.subscribe(Arrays.asList("spider-test")); while (true){ //拉去100毫秒内的数据 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);//拉去数据 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { //打印数据 System.out.println("消费到的数据:"+consumerRecord.value()); } } }
}

文章来源: www.jianshu.com,作者:百忍成金的虚竹,版权归原作者所有,如需转载,请联系作者。

原文链接:www.jianshu.com/p/a456b6ab11c8

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。