Spark-Redis入门篇

举报
我爱次火锅锅 发表于 2020/09/27 14:23:40 2020/09/27
【摘要】 Spark 是专为大规模数据处理而设计的快速通用的计算引擎,起源于UC Berkeley AMP lab的一个研究项目。相比传统的Hadoop(MapReduce) ,Spark的性能快了将近100x倍。Spark在计算中用到的数据可能会存在DWS、HBase或者HDFS上,其读写速度都和Spark计算的速度相差甚远。而Redis基于内存的读写可以成功解决这个问题,于是诞生了Spark-Redis

Spark 是专为大规模数据处理而设计的快速通用的计算引擎,是起源于UC Berkeley AMP lab的一个研究项目,于2010年开源,2013年加入Apache基金会,如今拥有50万Meetup成员,开源社区1300位开发者,各大公司参与开发和使用Spark。相比传统的Hadoop(MapReduce) ,Spark的性能快了将近100x倍。

Spark在计算中用到的数据可能会存在DWS、HBase或者HDFS上,其读写速度都和Spark计算的速度相差甚远。而Redis基于内存的读写则可以成功解决这个问题,于是诞生了Spark-Redis。。。

配置Config

在maven的pom.xml中添加依赖:

<dependencies>

    <dependency>

      <groupId>com.redislabs</groupId>

      <artifactId>spark-redis_2.12</artifactId>

      <version>2.4.2</version>

    </dependency>

  </dependencies>

在SBT中添加:

libraryDependencies += "com.redislabs" %% "spark-redis" % "2.4.2"

在spark-shell中使用spark-redis的库:

$ bin/spark-shell --jars <path-to>/spark-redis-<version>-jar-with-dependencies.jar
$ bin/spark-shell --jars <path-to>/spark-redis-<version>-jar-with-dependencies.jar --conf "spark.redis.host=localhost" --conf "spark.redis.port=6379" --conf "spark.redis.auth=passwd"

Spark-Redis工程: 

下图是Spark-Redis的一个文件目录,其中重要的类主要有RedisRDD、DefaultSource、RedisSourceRelation等。


RedisRDD中定义了RedisKVRDD、RedisListRDD、RedisZSetRDD、RedisKeysRDD。因为Redis支持五种数据类型:string(字符串),hash(哈希),list(列表),set(集合)及zset(sorted set:有序集合)。

DefaultSource中定义了createRelation方法,在执行create、insert语句的时候会用到这个方法。

override def createRelation(sqlContext: SQLContext, mode: SaveMode,
                            parameters: Map[String, String], data: DataFrame): BaseRelation = {
  val relation = new RedisSourceRelation(sqlContext, parameters, userSpecifiedSchema = None)
  mode match {
    case Append => relation.insert(data, overwrite = false)
    case Overwrite => relation.insert(data, overwrite = true)
    case ErrorIfExists =>
      if (relation.nonEmpty) {
        throw new IllegalStateException("SaveMode is set to ErrorIfExists and dataframe " +
          "already exists in Redis and contains data.")
      }
      relation.insert(data, overwrite = false)
    case Ignore =>
      if (relation.isEmpty) {
        relation.insert(data, overwrite = false)
      }
  }

  relation
}

RedisSourceRelation中定义了buildScan、scanRows、insert等重要的方法。当执行select查询时,首先通过sc.fromRedisKeyPattern获取RedisKeysRDD,也即所有keys。所以还需要filter columns,然后根据filter之后的keys从redis中读取数据。

override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
  val keysRdd = sc.fromRedisKeyPattern(dataKeyPattern, partitionNum = numPartitions)
  if (requiredColumns.isEmpty) {
    keysRdd.map { _ =>
      new GenericRow(Array[Any]())
    }
  } else {
    // filter schema columns, it should be in the same order as given 'requiredColumns'
    val requiredSchema = {
      val fieldsMap = schema.fields.map(f => (f.name, f)).toMap
      val requiredFields = requiredColumns.map { c =>
        fieldsMap(c)
      }
      StructType(requiredFields)
    }
    val keyType =
      if (persistenceModel == SqlOptionModelBinary) {
        RedisDataTypeString
      } else {
        RedisDataTypeHash
      }
    keysRdd.mapPartitions { partition =>
      // grouped iterator to only allocate memory for a portion of rows
      partition.grouped(iteratorGroupingSize).flatMap { batch =>
        groupKeysByNode(redisConfig.hosts, batch.iterator)
          .flatMap { case (node, keys) =>
            scanRows(node, keys, keyType, requiredSchema, requiredColumns)
          }
      }
    }
  }
}

Insert方法定义了将数据写入redis的逻辑。

override def insert(data: DataFrame, overwrite: Boolean): Unit = {
  val schema = userSpecifiedSchema.getOrElse(data.schema)
  // write schema, so that we can load dataframe back
  currentSchema = saveSchema(schema)
  if (overwrite) {
    // truncate the table
    sc.fromRedisKeyPattern(dataKeyPattern).foreachPartition { partition =>
      groupKeysByNode(redisConfig.hosts, partition).foreach { case (node, keys) =>
        val conn = node.connect()
        foreachWithPipeline(conn, keys) { (pipeline, key) =>
          (pipeline: PipelineBase).del(key) // fix ambiguous reference to overloaded definition
        }
        conn.close()
      }
    }
  }

  // write data
  data.foreachPartition { partition =>
    val taskContext = TaskContext.get()
    var recordsCounter = 0L
    // grouped iterator to only allocate memory for a portion of rows
    partition.grouped(iteratorGroupingSize).foreach { batch =>
      // the following can be optimized to not create a map
      val rowsWithKey: Map[String, Row] = batch.map(row => dataKeyId(row) -> row).toMap
      groupKeysByNode(redisConfig.hosts, rowsWithKey.keysIterator).foreach { case (node, keys) =>
        val conn = node.connect()
        foreachWithPipeline(conn, keys) { (pipeline, key) =>
          val row = rowsWithKey(key)
          val encodedRow = persistence.encodeRow(keyName, row)
          persistence.save(pipeline, key, encodedRow, ttl)
        }
        conn.close()
      }
    }
  }
}

Redis是支持持久化的,所以BinaryRedisPersistence和HashRedisPersistence都是跟持久化相关的两个类。还有很多其他重要的类这里不做介绍了,可以在https://github.com/RedisLabs/spark-redis中看相关的代码。

结语

这也是我第一次接触spark-redis,因为在解问题单的时候涉及到这一块,所以在git上下载了spark-redis,其中doc中详细讲解了rdd、dataframe、streaming等相关概念。网上关于spark-redis的介绍比较少,所以建议从git上clone源码,这样会有个更清晰的认知。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。