个人介绍

这个人很懒,什么都没有留下

感兴趣或擅长的领域

暂无数据
个人勋章
TA还没获得勋章~
成长雷达
0
0
0
0
0

个人资料

个人介绍

这个人很懒,什么都没有留下

感兴趣或擅长的领域

暂无数据

达成规则

他的回复:
您好1.“开启了kerberos的集群中运行spark任务来读取另一个非kerberos集群的kafka”是的。2.本地idea连接的时候有做kafka节点的域名映射吗?        这是程序的Kafka相关配置,应该是用bootStrap直接指定了Ip地址和端口,应该不需要域名映射。直接就能连接成功。val sparkConf = new SparkConf().setAppName("SparkStreaming")      val scc = new StreamingContext(sparkConf,Seconds(10))      val kafkapara : Map[String, Object] = Map[String, Object](        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "10.172.140.153:9093,10.172.140.154:9093,10.172.140.155:9093",        ConsumerConfig.GROUP_ID_CONFIG -> "spark_test2",        "key.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",        "value.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",        "auto.offset.reset"->"latest",        "enable.auto.commit"->(false: java.lang.Boolean),        "security.protocol" -> "SASL_PLAINTEXT",        "sasl.mechanism"->"PLAIN",        "sasl.jaas.config"->"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"&&&&\" password=\"&&&&\";"      )      //kafka的泛型 传入什么样的KV      val kafkaDataDs: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](        //上下文的环境对象        scc,        //位置策略,采集结点与计算节点如何匹配,        LocationStrategies.PreferConsistent,        //        ConsumerStrategies.Subscribe[String,String](Set("acm_task"),kafkapara)      )