Getting Started with Spark-Redis
Spark is a fast, general-purpose compute engine designed for large-scale data processing. It began as a research project at UC Berkeley's AMP Lab, was open-sourced in 2010, and joined the Apache Foundation in 2013; today it counts around 500,000 Meetup members and some 1,300 open-source contributors, and is developed and used by many large companies. Compared with traditional Hadoop (MapReduce), Spark can be up to about 100x faster.
The data that Spark computes over may sit in DWS, HBase, or HDFS, whose read/write speeds lag far behind Spark's compute speed. Redis, being an in-memory store, removes exactly this bottleneck, and that is how Spark-Redis came about.
Configuration
Add the dependency to Maven's pom.xml:
<dependencies>
    <dependency>
        <groupId>com.redislabs</groupId>
        <artifactId>spark-redis_2.12</artifactId>
        <version>2.4.2</version>
    </dependency>
</dependencies>
Or add it in SBT:
libraryDependencies += "com.redislabs" %% "spark-redis" % "2.4.2"
To use the spark-redis library in spark-shell (the second form also passes the Redis connection settings via --conf):
$ bin/spark-shell --jars <path-to>/spark-redis-<version>-jar-with-dependencies.jar
$ bin/spark-shell --jars <path-to>/spark-redis-<version>-jar-with-dependencies.jar --conf "spark.redis.host=localhost" --conf "spark.redis.port=6379" --conf "spark.redis.auth=passwd"
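The same connection settings can also be supplied from application code instead of the command line. A minimal sketch, assuming a local master and placeholder host/port/password values:

import org.apache.spark.sql.SparkSession

// spark.redis.* keys mirror the --conf flags above; the values are placeholders.
val spark = SparkSession.builder()
  .appName("spark-redis-demo")
  .master("local[*]")
  .config("spark.redis.host", "localhost")
  .config("spark.redis.port", "6379")
  .config("spark.redis.auth", "passwd")
  .getOrCreate()

val sc = spark.sparkContext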
The Spark-Redis project:
The figure below shows the file layout of the Spark-Redis project; the most important classes are RedisRDD, DefaultSource, and RedisSourceRelation.
RedisRDD defines RedisKVRDD, RedisListRDD, RedisZSetRDD, and RedisKeysRDD, mirroring the five data types Redis supports: string, hash, list, set, and zset (sorted set).
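Each of these RDD classes corresponds to one of the Redis types. As a rough sketch of the plain string (KV) case, assuming sc is the SparkContext configured above and that the implicit RDD functions come from the com.redislabs.provider.redis package (the key names are illustrative):

import com.redislabs.provider.redis._   // adds fromRedisKV / toRedisKV to SparkContext

// Write plain string key/value pairs, then read them back by key pattern.
val kvRDD = sc.parallelize(Seq(("user:1", "alice"), ("user:2", "bob")))
sc.toRedisKV(kvRDD)

val readBack = sc.fromRedisKV("user:*")   // RDD[(String, String)] backed by RedisKVRDD
readBack.collect().foreach(println)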
DefaultSource defines the createRelation method, which is invoked when create and insert statements are executed.
override def createRelation(sqlContext: SQLContext, mode: SaveMode,
                            parameters: Map[String, String], data: DataFrame): BaseRelation = {
  val relation = new RedisSourceRelation(sqlContext, parameters, userSpecifiedSchema = None)
  mode match {
    case Append => relation.insert(data, overwrite = false)
    case Overwrite => relation.insert(data, overwrite = true)
    case ErrorIfExists =>
      if (relation.nonEmpty) {
        throw new IllegalStateException("SaveMode is set to ErrorIfExists and dataframe " +
          "already exists in Redis and contains data.")
      }
      relation.insert(data, overwrite = false)
    case Ignore =>
      if (relation.isEmpty) {
        relation.insert(data, overwrite = false)
      }
  }
  relation
}
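From the caller's side, createRelation is reached through the DataFrame writer, with each SaveMode mapping to one branch of the match above. A minimal sketch, assuming the org.apache.spark.sql.redis source name and a hypothetical person table (spark is the session from the configuration sketch earlier):

import org.apache.spark.sql.SaveMode

// Build a small DataFrame and push it into Redis under the "person" key prefix.
val df = spark.createDataFrame(Seq(("alice", 30), ("bob", 25))).toDF("name", "age")
df.write
  .format("org.apache.spark.sql.redis")
  .option("table", "person")        // key prefix used for the stored rows
  .mode(SaveMode.Append)            // Append / Overwrite / ErrorIfExists / Ignore
  .save()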
RedisSourceRelation defines the key methods buildScan, scanRows, and insert. When a select query is executed, sc.fromRedisKeyPattern is called first to obtain a RedisKeysRDD, i.e. all matching keys; the required columns are then filtered, and the data for those keys is read back from Redis.
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
  val keysRdd = sc.fromRedisKeyPattern(dataKeyPattern, partitionNum = numPartitions)
  if (requiredColumns.isEmpty) {
    keysRdd.map { _ =>
      new GenericRow(Array[Any]())
    }
  } else {
    // filter schema columns, it should be in the same order as given 'requiredColumns'
    val requiredSchema = {
      val fieldsMap = schema.fields.map(f => (f.name, f)).toMap
      val requiredFields = requiredColumns.map { c => fieldsMap(c) }
      StructType(requiredFields)
    }
    val keyType =
      if (persistenceModel == SqlOptionModelBinary) {
        RedisDataTypeString
      } else {
        RedisDataTypeHash
      }
    keysRdd.mapPartitions { partition =>
      // grouped iterator to only allocate memory for a portion of rows
      partition.grouped(iteratorGroupingSize).flatMap { batch =>
        groupKeysByNode(redisConfig.hosts, batch.iterator)
          .flatMap { case (node, keys) =>
            scanRows(node, keys, keyType, requiredSchema, requiredColumns)
          }
      }
    }
  }
}
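The read path that exercises buildScan looks like this; projecting a subset of columns is what ends up in requiredColumns. The sketch assumes the same person table written above:

// Load rows back from Redis and project a single column; only "name" reaches requiredColumns.
val loaded = spark.read
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .load()

loaded.select("name").show()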
The insert method defines the logic for writing data into Redis.
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
  val schema = userSpecifiedSchema.getOrElse(data.schema)
  // write schema, so that we can load dataframe back
  currentSchema = saveSchema(schema)
  if (overwrite) {
    // truncate the table
    sc.fromRedisKeyPattern(dataKeyPattern).foreachPartition { partition =>
      groupKeysByNode(redisConfig.hosts, partition).foreach { case (node, keys) =>
        val conn = node.connect()
        foreachWithPipeline(conn, keys) { (pipeline, key) =>
          (pipeline: PipelineBase).del(key) // fix ambiguous reference to overloaded definition
        }
        conn.close()
      }
    }
  }

  // write data
  data.foreachPartition { partition =>
    val taskContext = TaskContext.get()
    var recordsCounter = 0L
    // grouped iterator to only allocate memory for a portion of rows
    partition.grouped(iteratorGroupingSize).foreach { batch =>
      // the following can be optimized to not create a map
      val rowsWithKey: Map[String, Row] = batch.map(row => dataKeyId(row) -> row).toMap
      groupKeysByNode(redisConfig.hosts, rowsWithKey.keysIterator).foreach { case (node, keys) =>
        val conn = node.connect()
        foreachWithPipeline(conn, keys) { (pipeline, key) =>
          val row = rowsWithKey(key)
          val encodedRow = persistence.encodeRow(keyName, row)
          persistence.save(pipeline, key, encodedRow, ttl)
        }
        conn.close()
      }
    }
  }
}
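In this method, dataKeyId(row) determines the Redis key each row is stored under, and the ttl passed to persistence.save controls expiry. Both appear to be configurable from the writer; the option names key.column and ttl below come from the spark-redis documentation rather than from the code shown here, and the values are illustrative:

// Reusing the df from the write sketch above:
// use the "name" column as the Redis key and expire rows after 100 seconds.
df.write
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .option("key.column", "name")
  .option("ttl", 100)
  .mode(SaveMode.Overwrite)
  .save()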
BinaryRedisPersistence and HashRedisPersistence are the two classes concerned with how rows are persisted into Redis (as serialized binary strings or as Redis hashes, respectively). There are many other important classes that are not covered here; the code is available at https://github.com/RedisLabs/spark-redis.
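Which of the two persistence classes is used ties back to the persistenceModel check seen in buildScan (SqlOptionModelBinary). Presumably it can be switched at write time roughly like this; the option name model and the values hash/binary are assumptions based on those constants and the project documentation:

// Store each row as a serialized binary string instead of a Redis hash
// (assumed option name "model"; "hash" is the other value).
df.write
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .option("model", "binary")
  .mode(SaveMode.Append)
  .save()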
Conclusion
This was my first contact with spark-redis; I ran into it while resolving a ticket, so I downloaded spark-redis from GitHub, and its doc folder explains RDDs, DataFrames, Streaming, and other related concepts in detail. There is relatively little material about spark-redis online, so I recommend cloning the source from GitHub to get a clearer picture.