Spark-Redis入门篇
【摘要】 Spark 是专为大规模数据处理而设计的快速通用的计算引擎,起源于UC Berkeley AMP lab的一个研究项目。相比传统的Hadoop(MapReduce) ,Spark的性能快了将近100x倍。Spark在计算中用到的数据可能会存在DWS、HBase或者HDFS上,其读写速度都和Spark计算的速度相差甚远。而Redis基于内存的读写可以成功解决这个问题,于是诞生了Spark-Redis
Spark 是专为大规模数据处理而设计的快速通用的计算引擎,是起源于UC Berkeley AMP lab的一个研究项目,于2010年开源,2013年加入Apache基金会,如今拥有50万Meetup成员,开源社区1300位开发者,各大公司参与开发和使用Spark。相比传统的Hadoop(MapReduce) ,Spark的性能快了将近100x倍。
Spark在计算中用到的数据可能会存在DWS、HBase或者HDFS上,其读写速度都和Spark计算的速度相差甚远。而Redis基于内存的读写则可以成功解决这个问题,于是诞生了Spark-Redis。。。
配置Config
在maven的pom.xml中添加依赖:
<dependencies>
<dependency>
<groupId>com.redislabs</groupId>
<artifactId>spark-redis_2.12</artifactId>
<version>2.4.2</version>
</dependency>
</dependencies>
在SBT中添加:
libraryDependencies += "com.redislabs" %% "spark-redis" % "2.4.2"
在spark-shell中使用spark-redis的库:
$ bin/spark-shell --jars <path-to>/spark-redis-<version>-jar-with-dependencies.jar
$ bin/spark-shell --jars <path-to>/spark-redis-<version>-jar-with-dependencies.jar --conf "spark.redis.host=localhost" --conf "spark.redis.port=6379" --conf "spark.redis.auth=passwd"
Spark-Redis工程:
下图是Spark-Redis的一个文件目录,其中重要的类主要有RedisRDD、DefaultSource、RedisSourceRelation等。
RedisRDD中定义了RedisKVRDD、RedisListRDD、RedisZSetRDD、RedisKeysRDD。因为Redis支持五种数据类型:string(字符串),hash(哈希),list(列表),set(集合)及zset(sorted set:有序集合)。
DefaultSource中定义了createRelation方法,在执行create、insert语句的时候会用到这个方法。
override def createRelation(sqlContext: SQLContext, mode: SaveMode,
parameters: Map[String, String], data: DataFrame): BaseRelation = {
val relation = new RedisSourceRelation(sqlContext, parameters, userSpecifiedSchema = None)
mode match {
case Append => relation.insert(data, overwrite = false)
case Overwrite => relation.insert(data, overwrite = true)
case ErrorIfExists =>
if (relation.nonEmpty) {
throw new IllegalStateException("SaveMode is set to ErrorIfExists and dataframe " +
"already exists in Redis and contains data.")
}
relation.insert(data, overwrite = false)
case Ignore =>
if (relation.isEmpty) {
relation.insert(data, overwrite = false)
}
}
relation
}
RedisSourceRelation中定义了buildScan、scanRows、insert等重要的方法。当执行select查询时,首先通过sc.fromRedisKeyPattern获取RedisKeysRDD,也即所有keys。所以还需要filter columns,然后根据filter之后的keys从redis中读取数据。
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
val keysRdd = sc.fromRedisKeyPattern(dataKeyPattern, partitionNum = numPartitions)
if (requiredColumns.isEmpty) {
keysRdd.map { _ =>
new GenericRow(Array[Any]())
}
} else {
// filter schema columns, it should be in the same order as given 'requiredColumns'
val requiredSchema = {
val fieldsMap = schema.fields.map(f => (f.name, f)).toMap
val requiredFields = requiredColumns.map { c =>
fieldsMap(c)
}
StructType(requiredFields)
}
val keyType =
if (persistenceModel == SqlOptionModelBinary) {
RedisDataTypeString
} else {
RedisDataTypeHash
}
keysRdd.mapPartitions { partition =>
// grouped iterator to only allocate memory for a portion of rows
partition.grouped(iteratorGroupingSize).flatMap { batch =>
groupKeysByNode(redisConfig.hosts, batch.iterator)
.flatMap { case (node, keys) =>
scanRows(node, keys, keyType, requiredSchema, requiredColumns)
}
}
}
}
}
Insert方法定义了将数据写入redis的逻辑。
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
val schema = userSpecifiedSchema.getOrElse(data.schema)
// write schema, so that we can load dataframe back
currentSchema = saveSchema(schema)
if (overwrite) {
// truncate the table
sc.fromRedisKeyPattern(dataKeyPattern).foreachPartition { partition =>
groupKeysByNode(redisConfig.hosts, partition).foreach { case (node, keys) =>
val conn = node.connect()
foreachWithPipeline(conn, keys) { (pipeline, key) =>
(pipeline: PipelineBase).del(key) // fix ambiguous reference to overloaded definition
}
conn.close()
}
}
}
// write data
data.foreachPartition { partition =>
val taskContext = TaskContext.get()
var recordsCounter = 0L
// grouped iterator to only allocate memory for a portion of rows
partition.grouped(iteratorGroupingSize).foreach { batch =>
// the following can be optimized to not create a map
val rowsWithKey: Map[String, Row] = batch.map(row => dataKeyId(row) -> row).toMap
groupKeysByNode(redisConfig.hosts, rowsWithKey.keysIterator).foreach { case (node, keys) =>
val conn = node.connect()
foreachWithPipeline(conn, keys) { (pipeline, key) =>
val row = rowsWithKey(key)
val encodedRow = persistence.encodeRow(keyName, row)
persistence.save(pipeline, key, encodedRow, ttl)
}
conn.close()
}
}
}
}
Redis是支持持久化的,所以BinaryRedisPersistence和HashRedisPersistence都是跟持久化相关的两个类。还有很多其他重要的类这里不做介绍了,可以在https://github.com/RedisLabs/spark-redis中看相关的代码。
结语
这也是我第一次接触spark-redis,因为在解问题单的时候涉及到这一块,所以在git上下载了spark-redis,其中doc中详细讲解了rdd、dataframe、streaming等相关概念。网上关于spark-redis的介绍比较少,所以建议从git上clone源码,这样会有个更清晰的认知。
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
评论(0)