Spark 操作 kafka

举报
武师叔 发表于 2022/09/29 16:38:03 2022/09/29
【摘要】 theme: condensed-night-purple Spark 操作 kafka携手创作,共同成长!这是我参与「掘金日新计划 · 8 月更文挑战」的第30天,点击查看活动详情Spark Streaming提供了两类内置的streaming源:Basic sources :直接在StreamingContext API中可用的源。例如,文件系统和socket连接 Advanced s...

theme: condensed-night-purple

Spark 操作 kafka

携手创作,共同成长!这是我参与「掘金日新计划 · 8 月更文挑战」的第30天,点击查看活动详情


Spark Streaming提供了两类内置的streaming源:

Basic sources :直接在StreamingContext API中可用的源。例如,文件系统和socket连接 Advanced sources :像Kafka,Flume,Kinesis等这样的源,可通过额外的程序类获得

消费

  1. 先把数据接收过来,转换为spark streaming中的数据结构Dstream。接收数据的方式有两种:1.利用Receiver接收数据,2.直接从kafka读取数据。

在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch。

0.10以后只保留了direct模式(Reveiver模式不适合生产环境),并且0.10版本API有变化(更加强大)

生产

与读数据不同,Spark并没有提供统一的接口用于写入Kafka,所以我们需要使用底层Kafka接口进行包装。 最直接的做法我们可以想到如下这种方式:

input.foreachRDD(rdd =>
  // 不能在这里创建KafkaProducer
  rdd.foreachPartition(partition =>
    partition.foreach{
      case x:String=>{
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        println(x)
        val producer = new KafkaProducer[String,String](props)
        val message=new ProducerRecord[String, String]("output",null,x)
        producer.send(message)
      }
    }
  )
)Copy to clipboardErrorCopied

但是这种方式缺点很明显,对于每个partition的每条记录,我们都需要创建KafkaProducer,然后利用producer进行输出操作,注意这里我们并不能将KafkaProducer的新建任务放在foreachPartition外边,因为KafkaProducer是不可序列化的(not serializable)。显然这种做法是不灵活且低效的,因为每条记录都需要建立一次连接。如何解决呢?

首先,我们需要将KafkaProducer利用lazy val的方式进行包装如下:

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。