云社区 博客 博客详情
云社区 博客 博客详情

spark streaming连接kafka引发"partition.assignment.strategy"异常处理

AllEmpty 发表于 2020-03-18 11:03:02 03-18 11:03
AllEmpty 发表于 2020-03-18 11:03:02 2020-03-18
0
0

【摘要】   服务器运行环境:spark 2.4.4 + scall 2.11.12 + kafka 2.2.2  由于业务相对简单,kafka只有固定topics,所以一直使用下面脚本执行实时流计算spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4 --py-files /data/service...

  服务器运行环境:spark 2.4.4 + scall 2.11.12 + kafka 2.2.2

  由于业务相对简单,kafka只有固定topics,所以一直使用下面脚本执行实时流计算

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4 --py-files /data/service/xxx.zip /data/service/xxx.py

  代码中使用pyspark.streaming.kafka的KafkaUtils来创建spark streaming与kafka的连接,运行了好长时间都没有出现过问题

  随着新业务接入,在新功能中kafka需要使用动态topics方式,要用到正则表达式,查了KafkaUtils源码和相关资料,发现它不支持动态topics方式,需要使用spark-streaming-kafka-0-10才能支持

  查看文档http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html 与 http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html 后,使用结构化流structured-streaming来实现

  实现代码:

import sysfrom pyspark.sql import SparkSessiondef process_row(row):    # Write row to storage
    passif __name__ == "__main__":    if len(sys.argv) != 4:        print("""
        Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>        """, file=sys.stderr)
        sys.exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]

    spark = SparkSession\
        .builder\
        .appName("StructuredKafkaWordCount")\
        .getOrCreate()    # Create DataSet representing the stream of input lines from kafka
    ds = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option(subscribeType, topics)\
        .load()\
        .selectExpr("CAST(value AS STRING)")

    ds.printSchema()
    query = ds.writeStream.foreach(process_row).start()
    query.awaitTermination()


  执行提交任务命令

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 /data/service/demo.py master:9092 subscribePattern event.log.*

  提交后一直报下面错误

org.apache.kafka.common.config.ConfigException: 
Missing required configuration "partition.assignment.strategy" which has no default value

  查了好多资料,都说需要添加参数,配置Kafka分区分配策略,并将readStream修改为:

    ds = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option("kafka.partition.assignment.strategy", "range")\
        .option(subscribeType, topics)\
        .load()


  再次运行异常信息改为无法连接kafka了,弄了整整一天人都快崩溃了还没搞定

  还好最终查找https://xbuba.com/questions/44959483,大牛提示说有可能是kafka0.8版本的jar与kafka0.10的jar冲突原因造成的

  使用命令查找

find / -name 'spark-streaming-kafka*'
find / -name 'spark-sql-kafka*'

  发现在/root/.ivy2/cache/org.apache.spark/ 目录下面存在spark-streaming-kafka-0-8_2.11 与 spark-sql-kafka-0-10_2.11 文件夹和相关的jar文件

  将spark-streaming-kafka-0-8_2.11删除后执行代码就正常运行了

  由于老脚本用的还是kafka0.8,为了兼容两个版本能同时运行,需要将/root/.ivy2/cache/org.apache.spark/ 目录下面kafka0.8与kafka0.10两个版本的jar全部清除

  然后登录https://repo1.maven.org/maven2/org/apache/spark/ 下载spark-streaming-kafka-0-8与spark-sql-kafka-0-10对应的jar下来,并将提交命令spark-submit的参数改为:

spark-submit --jars /data/service/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar --py-files /data/service/xxx.zip /data/service/xxx.py
spark-submit --jars /data/service/spark-sql-kafka-0-10_2.11-2.4.4.jar /data/service/demo.py master:9092 subscribePattern event.log.*

  修改后两个脚本运行都没有问题(PS:老脚本原想直接用org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4包来启动,执行后直接暴错,提示说要改为org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4才行)


登录后可下载附件,请登录或者注册

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:huaweicloud.bbs@huawei.com进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
评论文章 //点赞 收藏 0
点赞
分享文章到微博
分享文章到朋友圈

评论 (0)


0/1000
评论

登录后可评论,请 登录注册

评论

您没有权限执行当前操作

温馨提示

您确认删除评论吗?

确定
取消
温馨提示

您确认删除评论吗?

删除操作无法恢复,请谨慎操作。

确定
取消
温馨提示

您确认删除博客吗?

确定
取消

确认删除

您确认删除博客吗?

确认删除

您确认删除评论吗?

温馨提示

登录超时或用户已下线,请重新登录!!!

确定
取消