His reply:
Hello.
1. "Running a Spark job on a Kerberos-enabled cluster to read Kafka on another, non-Kerberos cluster" — yes, that is the scenario.
2. "Did you set up hostname mappings for the Kafka nodes when connecting from local IDEA?" — Below is the program's Kafka configuration. Since bootstrap.servers specifies IP addresses and ports directly, hostname mapping should not be needed; it connects successfully as-is.

```scala
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val sparkConf = new SparkConf().setAppName("SparkStreaming")
val scc = new StreamingContext(sparkConf, Seconds(10))

val kafkapara: Map[String, Object] = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "10.172.140.153:9093,10.172.140.154:9093,10.172.140.155:9093",
  ConsumerConfig.GROUP_ID_CONFIG -> "spark_test2",
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "security.protocol" -> "SASL_PLAINTEXT",
  "sasl.mechanism" -> "PLAIN",
  "sasl.jaas.config" -> "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"&&&&\" password=\"&&&&\";"
)

// Kafka record generics: the key/value types to deserialize into
val kafkaDataDs: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
  // the streaming context
  scc,
  // location strategy: how Kafka partitions are matched to compute nodes
  LocationStrategies.PreferConsistent,
  // consumer strategy: subscribe to the topic with the config above
  ConsumerStrategies.Subscribe[String, String](Set("acm_task"), kafkapara)
)
```
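One caveat worth checking: bootstrap.servers is only used for the initial metadata fetch. After that, the client reconnects to the addresses the brokers advertise (their advertised.listeners), so if those are hostnames rather than IPs, the local machine still needs to resolve them. A minimal sketch to inspect what the brokers advertise, using Kafka's AdminClient with the same SASL settings as above (the single bootstrap address and the redacted credentials are taken from the config in this thread; this assumes a reachable cluster and is not runnable offline):

```scala
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.172.140.153:9093")
props.put("security.protocol", "SASL_PLAINTEXT")
props.put("sasl.mechanism", "PLAIN")
props.put("sasl.jaas.config",
  "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"&&&&\" password=\"&&&&\";")

val admin = AdminClient.create(props)
try {
  // describeCluster returns the broker nodes as advertised by the cluster;
  // if host is a hostname here, the client needs a working mapping for it
  val nodes = admin.describeCluster().nodes().get().asScala
  nodes.foreach(n => println(s"broker ${n.id}: ${n.host}:${n.port}"))
} finally {
  admin.close()
}
```

If this prints IP addresses, no hostname mapping is needed; if it prints hostnames, add them to /etc/hosts (or DNS) on the machine running IDEA.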