客快物流 Big Data Project (101): Real-Time OLAP Development
Real-Time OLAP Development
I. Real-Time ETL Processing
To analyze the logistics metrics with ClickHouse, the data must first be stored in ClickHouse.
Business workflow:
II. Custom Spark SQL Data Sources Based on DataSourceV2
1. Data Source API V1
Spark 1.3 introduced Data Source API V1. With it we can easily read data from a variety of sources, and Spark's SQL optimizer can push optimizations such as column pruning and filter pushdown into the source (a minimal V1 sketch follows the lists below).
This version of the Data Source API has the following strengths:
- The interfaces are very simple to implement
- It covers most common use cases
At the same time, it has several limitations:
- Limited extensibility; it is hard to push down additional operators
- No support for columnar reads
- Writes are not transactional
- No partitioning or sort-order information is exposed
- No streaming support
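For context, a minimal V1-style source consists of a RelationProvider plus a BaseRelation that mixes in PrunedFilteredScan; the sketch below (class names and data are made up, not part of this project) shows where column pruning and filter pushdown arrive:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan, RelationProvider}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
// Hypothetical V1 data source returning two hard-coded rows.
class SimpleSourceV1 extends RelationProvider {
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation =
    new SimpleRelation(sqlContext)
}
class SimpleRelation(val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {
  override def schema: StructType = StructType(Seq(
    StructField("id", IntegerType),
    StructField("name", StringType)))
  // requiredColumns and filters are where column pruning and filter pushdown are handed to the source;
  // filters are advisory in V1, Spark re-applies them afterwards.
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(Row(1, "a"), Row(2, "b")))
      .map(row => Row.fromSeq(requiredColumns.map {
        case "id"   => row.getInt(0)
        case "name" => row.getString(1)
      }))
}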
2. Data Source API V2
To address these shortcomings, the community introduced Data Source API V2 starting with Apache Spark 2.3.0. It keeps the original functionality while fixing the problems of Data Source API V1: it no longer depends on the upper-level APIs and its extensibility is greatly improved.
Data Source API V2 has the following strengths:
- The DataSourceV2 API is written in Java
- It does not depend on the upper-level APIs (DataFrame/RDD)
- It is easy to extend; new optimizations can be added while keeping backward compatibility
- It can expose physical information such as size and partitioning
- It supports streaming sources and sinks
- It provides a flexible, powerful, and transactional write API
V2 features available in Spark 2.3:
- Column scans and row scans
- Column pruning and filter pushdown
- Basic statistics and data partitioning
- A transactional write API
- Micro-batch and continuous streaming sources/sinks
III. Implementing an Input Source with DataSourceV2
1. ReadSupport & WriteSupport
To use Data Source API V2 we need the classes in its package. For a reader, implementing the ReadSupport interface is enough, as shown below:
Code:
/**
* Custom Spark SQL data source based on the DataSourceV2 interfaces:
* 1. Extend DataSourceV2 to register the data source with Spark
* 2. Mix in ReadSupport to support reading data
* 3. Mix in WriteSupport to support writing data
*/
class CustomDataSourceV2 extends DataSourceV2 with ReadSupport with WriteSupport {
/**
* Create the reader
*
* @param options user-supplied options
* @return the custom DataSourceReader
*/
override def createReader(options: DataSourceOptions): DataSourceReader = ???
/**
* Create the writer
*
* @param jobId jobId
* @param schema schema
* @param mode save mode
* @param options user-supplied options
* @return Optional[custom DataSourceWriter]
*/
override def createWriter(jobId: String, schema: StructType, mode: SaveMode, options: DataSourceOptions): Optional[DataSourceWriter] = ???
}
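Once registered on the classpath, a DataSourceV2 implementation is addressed by its fully qualified class name. A quick usage sketch (assuming an existing SparkSession named spark and that the class above lives in a package such as com.example):
// Spark instantiates the class reflectively from the format string.
val df = spark.read
  .format("com.example.CustomDataSourceV2") // replace with the real fully qualified class name
  .option("someKey", "someValue")           // options end up in DataSourceOptions
  .load()
df.show()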
2. DataSourceReader & DataSourceWriter
Above we implemented the ReadSupport interface and overrode createReader. Next we implement the DataSourceReader (and DataSourceWriter) interfaces, as shown below:
/**
* Custom DataSourceReader
* Extends DataSourceReader
* Override readSchema to produce the schema
* Override planInputPartitions to define the per-partition splits and read logic
* @param options options
*/
case class CustomDataSourceV2Reader(options: Map[String, String]) extends DataSourceReader {
/**
* Schema of the columns to read
* @return
*/
override def readSchema(): StructType = ???
/**
* Per-partition splits and read logic
* @return
*/
override def planInputPartitions(): util.List[InputPartition[InternalRow]] = ???
}
/**
* Custom DataSourceWriter
* Extends DataSourceWriter
* Override createWriterFactory to create the DataWriter factory
* Override commit to handle the commit messages from all partitions
* Override abort, called when a write fails, to roll back the transaction
* @param dataSourceOptions options
*/
class CustomDataSourceWriter(dataSourceOptions: DataSourceOptions) extends DataSourceWriter {
/**
* Create the DataWriter factory
* @return DataWriterFactory
*/
override def createWriterFactory(): DataWriterFactory[InternalRow] = ???
/**
* commit
* @param writerCommitMessages commit messages from all partitions
* Called once per write job
*/
override def commit(writerCommitMessages: Array[WriterCommitMessage]): Unit = ???
/** *
* abort
* @param writerCommitMessages called when a write fails; used to roll back the transaction
*/
override def abort(writerCommitMessages: Array[WriterCommitMessage]): Unit = ???
}
3. Read/Write Implementation
The last piece is partitioned reading. DataSource V1 lacked partition support, while DataSource V2 handles partitioning fully through the planInputPartitions method above.
There we decide how many partitions are used to read the source. For a TextInputFormat, for example, we could obtain the number of splits of the underlying file and turn each split into one partition, read by one task. For simplicity, a single partition is used here, i.e. List[InputPartition[InternalRow]].asJava.
Implementing DataSourceV2 for Spark SQL closely mirrors writing a custom Structured Streaming source: the idea is the same, although the concrete implementation differs.
The main steps are as follows (a compact in-memory sketch follows this list):
- Create an XXXDataSource class that extends DataSourceV2 and ReadSupport, and override ReadSupport's createReader method to return your custom DataSourceReader, e.g. an XXXDataSourceReader instance
- Create an XXXDataSourceReader class that extends DataSourceReader; override readSchema to return the source's schema, and override planInputPartitions to return one or more custom partition instances (InputPartition; called DataReaderFactory in Spark 2.3)
- Create the partition class, e.g. XXXInputPartition, extending InputPartition, and override createPartitionReader to return a custom reader instance
- Create the custom reader, e.g. XXXInputPartitionReader, extending InputPartitionReader; override next() to tell Spark whether another record exists (which triggers get()), override get() to return the current record, and override close() to release resources
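As a minimal, self-contained illustration of these four pieces (class names and data are made up for the sketch and are not part of the logistics project):
import java.util
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String
// Hypothetical in-memory source that returns two hard-coded rows.
class MemoryDataSource extends DataSourceV2 with ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader = new MemoryReader
}
class MemoryReader extends DataSourceReader {
  override def readSchema(): StructType = new StructType().add("id", IntegerType).add("name", StringType)
  // A single partition keeps the example simple.
  override def planInputPartitions(): util.List[InputPartition[InternalRow]] =
    util.Arrays.asList(new MemoryInputPartition)
}
class MemoryInputPartition extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] = new MemoryPartitionReader
}
class MemoryPartitionReader extends InputPartitionReader[InternalRow] {
  private val rows = Iterator((1, "a"), (2, "b"))
  private var current: (Int, String) = _
  override def next(): Boolean = { val hasNext = rows.hasNext; if (hasNext) current = rows.next(); hasNext }
  override def get(): InternalRow = InternalRow(current._1, UTF8String.fromString(current._2))
  override def close(): Unit = ()
}
// Usage: spark.read.format(classOf[MemoryDataSource].getName).load().show()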
IV. Writing a Custom Data Source for ClickHouse
Implementation steps:
- Create the ClickHouseDataSourceV2 class in the cn.it.logistics.etl.realtime.ext.clickhouse package of the logistics-etl module
- Implement the ReadSupport, WriteSupport, and StreamWriteSupport interfaces
- Implement the methods of each interface in turn:
- createReader (batch read)
- createWriter (batch write)
- createStreamWriter (streaming write)
- Create the parameter object that carries the ClickHouse connection settings (ClickHouseOptions)
- Create a utility class for ClickHouse operations (ClickHouseHelper):
- Implement a method that obtains a ClickHouse connection
- Implement a method that creates the table
- Implement a method that generates the INSERT SQL statement
- Implement a method that generates the UPDATE SQL statement
- Implement a method that generates the DELETE SQL statement
- Implement a method that executes SQL updates in batches
- Create a test singleton object that reads data from ClickHouse and writes data into ClickHouse
Implementation:
- Create the ClickHouseDataSourceV2 class in the cn.it.logistics.etl.realtime.ext.clickhouse package of the logistics-etl module
package cn.it.logistics.etl.realtime.ext.clickhouse
/**
* @ClassName ClickHouseDataSourceV2
* @Description ClickHouse data source implementation extending Spark SQL DataSourceV2
*/
class ClickHouseDataSourceV2 {
}
- Implement the ReadSupport, WriteSupport, and StreamWriteSupport interfaces
/**
* @ClassName ClickHouseDataSourceV2
* @Description ClickHouse data source implementation extending Spark SQL DataSourceV2
*/
class ClickHouseDataSourceV2 extends DataSourceV2 with ReadSupport with WriteSupport with StreamWriteSupport {
}
- Implement the methods of each interface in turn:
- createReader (batch read)
- createWriter (batch write)
- createStreamWriter (streaming write)
/**
* @ClassName ClickHouseDataSourceV2
* @Description ClickHouse data source implementation extending Spark SQL DataSourceV2
*/
class ClickHouseDataSourceV2 extends DataSourceV2 with ReadSupport with WriteSupport with StreamWriteSupport {
/** Batch read */
override def createReader(options: DataSourceOptions): DataSourceReader = ???
/** Batch write */
override def createWriter(writeUUID: String, schema: StructType, mode: SaveMode, options: DataSourceOptions): Optional[DataSourceWriter] = ???
/** Streaming write */
override def createStreamWriter(queryId: String, schema: StructType, mode: OutputMode, options: DataSourceOptions): StreamWriter = ???
}
1. Batch Read
Implementation steps:
- Define a ClickHouseDataSourceReader class implementing the DataSourceReader interface
- Implement the following methods:
- readSchema() (builds the schema object from the structure of the ClickHouse table)
- planInputPartitions() (the per-partition read logic)
- Define the per-partition class ClickHouseInputPartition implementing the InputPartition interface, with:
- createPartitionReader (creates the partition reader)
- Define the partition reader ClickHouseInputPartitionReader implementing the InputPartitionReader and Serializable interfaces, with:
- next() (whether there is another record; true means more data is available)
- get() (read the current record)
- close() (close the JDBC connection and release resources)
- Wire up the createReader method of ClickHouseDataSourceV2
Implementation:
- Define a ClickHouseDataSourceReader class implementing the DataSourceReader interface
/**
* Batch read of data from the ClickHouse database
*/
class ClickHouseDataSourceReader(options: ClickHouseOptions) extends DataSourceReader {
}
- Implement the following methods:
- readSchema() (builds the schema object from the structure of the ClickHouse table)
- planInputPartitions() (the per-partition read logic)
/**
* Batch read of data from the ClickHouse database
*/
class ClickHouseDataSourceReader(options: ClickHouseOptions) extends DataSourceReader {
//instantiate the ClickHouseHelper utility class
val ckHelper = new ClickHouseHelper(options)
private val schema: StructType = ckHelper.getSparkTableSchema
/**
* Reading returns a DataFrame (an RDD plus a schema),
* so expose the table's structure (schema) here
* @return
*/
override def readSchema(): StructType = schema
/**
* Per-partition split/read logic (returns all partitions)
* @return
*/
override def planInputPartitions(): util.List[InputPartition[InternalRow]] = util.Arrays.asList(new ClickHouseInputPartition(schema, options))
}
- Implement the getSparkTableSchema method in the ClickHouseHelper utility class
package cn.it.logistics.etl.realtime.ext
import java.sql.{Connection, Date, PreparedStatement, ResultSet, Statement}
import java.text.SimpleDateFormat
import java.util
import org.apache.commons.lang3.StringUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.javatuples.Triplet
import ru.yandex.clickhouse.domain.ClickHouseDataType
import ru.yandex.clickhouse.response.{ClickHouseResultSet, ClickHouseResultSetMetaData}
import ru.yandex.clickhouse.{ClickHouseConnection, ClickHouseDataSource, ClickHouseStatement}
import ru.yandex.clickhouse.settings.ClickHouseProperties
import org.apache.spark.sql.types.{BooleanType, DataType, DataTypes, DateType, DoubleType, FloatType, IntegerType, LongType, StringType, StructField, StructType}
import scala.collection.mutable.ArrayBuffer
/**
* Utility class for ClickHouse operations
*/
class ClickHouseHelper(options: ClickHouseOptions) extends Logging{
private val opType: String = options.getOpTypeField
private var connection: ClickHouseConnection = getConnection
private val id: String = options.getPrimaryKey
/**
* Obtain a ClickHouse connection
*/
def getConnection = {
//get the ClickHouse connection string
val url: String = options.getURL
//create the ClickHouseDataSource object
val clickHouseDataSource: ClickHouseDataSource = new ClickHouseDataSource(url, new ClickHouseProperties())
//return the ClickHouse connection
clickHouseDataSource.getConnection
}
/**
* Return the schema of the given table
* @return StructType: the schema of the Spark DataFrame
*/
def getSparkTableSchema: StructType = {
import collection.JavaConversions._
val clickHouseTableSchema: util.LinkedList[Triplet[String, String, String]] = getClickHouseTableSchema
//println(clickHouseTableSchema)
val fileds = ArrayBuffer[StructField]()
//build the schema from the ClickHouse table's columns and column types
for (trp <- clickHouseTableSchema) {
fileds += StructField(trp.getValue0, getSparkSqlType(trp.getValue1))
}
//return the StructType, which is the schema
StructType(fileds)
}
/**
* Return the collection of the ClickHouse table's columns and their types
*/
def getClickHouseTableSchema = {
//collection of columns
val fileds: util.LinkedList[Triplet[String, String, String]] = new util.LinkedList[Triplet[String, String, String]]()
//query the table to obtain its column metadata
//ClickHouse connection objects
var connection: ClickHouseConnection = null
var statement: ClickHouseStatement = null
var resultSet: ClickHouseResultSet = null
var metaData: ClickHouseResultSetMetaData = null
try {
//obtain the connection
connection = getConnection
statement = connection.createStatement()
//we only need the table's columns and their types, not its data, so use a query condition that can never match
val sql: String = s"select * FROM ${options.getFullTable} where 1=0"
resultSet = statement.executeQuery(sql).asInstanceOf[ClickHouseResultSet]
//metadata of the given table
metaData = resultSet.getMetaData.asInstanceOf[ClickHouseResultSetMetaData]
val columnCount: Int = metaData.getColumnCount
for (i <- 1 to columnCount) {
val columnName: String = metaData.getColumnName(i)
val columnTypeName: String = metaData.getColumnTypeName(i)
val javaTypeName: String = ClickHouseDataType.fromTypeString(columnTypeName).getJavaClass.getSimpleName
println("columnTypeName:"+columnTypeName)
println("javaTypeName:"+javaTypeName)
fileds.add(new Triplet(columnName, columnTypeName, javaTypeName))
}
} catch {
case ex: Exception => ex.printStackTrace()
} finally {
if (statement != null) statement.close()
if (connection != null) connection.close()
}
fileds
}
def closeAll(connection: Connection = null, st: Statement = null, ps: PreparedStatement = null, rs: ResultSet = null): Unit = {
try {
if (rs != null && !rs.isClosed) rs.close()
if (ps != null && !ps.isClosed) ps.close()
if (st != null && !st.isClosed) st.close()
if (connection != null && !connection.isClosed) connection.close()
} catch {
case e: Exception => e.printStackTrace()
}
}
}
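ClickHouseHelper (and the reader/writer classes below) depends on a ClickHouseOptions class that carries the connection settings; it is not listed in this excerpt. A minimal sketch, assuming option keys such as clickhouse.url, clickhouse.table, clickhouse.auto.create, clickhouse.primary.key and clickhouse.operate.field (the key names and defaults are assumptions, not the project's actual ones), could look like this:
import java.util.{Map => JMap}
import scala.collection.JavaConverters._
/** Carries the options passed through the DataFrame reader/writer (sketch; key names are assumptions). */
class ClickHouseOptions(parameters: JMap[String, String]) extends Serializable {
  private val options: Map[String, String] = parameters.asScala.toMap
  // JDBC URL of the ClickHouse server, e.g. jdbc:clickhouse://node1:8123/logistics
  def getURL: String = options.getOrElse("clickhouse.url", "jdbc:clickhouse://localhost:8123/default")
  // Fully qualified table name, e.g. logistics.tbl_areas
  def getFullTable: String = options.getOrElse("clickhouse.table", "")
  // Whether the target table should be created automatically from the DataFrame schema
  def autoCreateTable: Boolean = options.getOrElse("clickhouse.auto.create", "false").toBoolean
  // Primary key column used for the ORDER BY clause and for UPDATE/DELETE statements
  def getPrimaryKey: String = options.getOrElse("clickhouse.primary.key", "id")
  // Name of the column that carries the operation type (insert/update/delete) of each row
  def getOpTypeField: String = options.getOrElse("clickhouse.operate.field", "")
}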
- Define the per-partition class ClickHouseInputPartition implementing the InputPartition interface, with:
- createPartitionReader (creates the partition reader)
package cn.it.logistics.etl.realtime.ext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
import org.apache.spark.sql.types.StructType
/**
* @ClassName ClickHouseInputPartition
* @Description Batch-mode ClickHouse partition implementation
*/
class ClickHouseInputPartition(schema: StructType, options: ClickHouseOptions) extends InputPartition[InternalRow] {
/**
* Create the partition reader
* @return
*/
override def createPartitionReader(): InputPartitionReader[InternalRow] = new ClickHouseInputPartitionReader(schema, options)
}
- Define the partition reader ClickHouseInputPartitionReader implementing the InputPartitionReader and Serializable interfaces, with:
- next() (whether there is another record; true means more data is available)
- get() (read the current record)
- close() (close the JDBC connection and release resources)
package cn.it.logistics.etl.realtime.ext
import java.io.Serializable
import java.sql.{ResultSet, SQLException}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.unsafe.types.UTF8String
import ru.yandex.clickhouse.{ClickHouseConnection, ClickHouseStatement}
/**
* @ClassName ClickHouseInputPartitionReader
* @Description Batch-mode ClickHouse partition reader implementation
*/
class ClickHouseInputPartitionReader(schema: StructType, options: ClickHouseOptions) extends InputPartitionReader[InternalRow] with Logging with Serializable{
val helper = new ClickHouseHelper(options)
var connection: ClickHouseConnection = null
var st: ClickHouseStatement = null
var rs: ResultSet = null
/**
* Whether there is another record
* @return boolean
*/
override def next(): Boolean = {
//initialize the connection, statement and result set on the first call or after they have been closed
if (null == connection || connection.isClosed || null == st || st.isClosed || null == rs || rs.isClosed){
connection = helper.getConnection
st = connection.createStatement()
rs = st.executeQuery(helper.getSelectStatement(schema))
println(/**logInfo**/s"Initialized the ClickHouse connection.")
}
if(null != rs && !rs.isClosed) rs.next() else false
}
/**
* Fetch the current record
* When next returns true, Spark calls get to fetch the record
* @return Row
*/
override def get(): InternalRow = {
val fields = schema.fields
val length = fields.length
val record = new Array[Any](length)
for (i <- 0 until length) {
val field = fields(i)
val name = field.name
val dataType = field.dataType
try {
dataType match {
case DataTypes.BooleanType => record(i) = rs.getBoolean(name)
case DataTypes.DateType => record(i) = DateTimeUtils.fromJavaDate(rs.getDate(name))
case DataTypes.DoubleType => record(i) = rs.getDouble(name)
case DataTypes.FloatType => record(i) = rs.getFloat(name)
case DataTypes.IntegerType => record(i) = rs.getInt(name)
case DataTypes.LongType => record(i) = rs.getLong(name)
case DataTypes.ShortType => record(i) = rs.getShort(name)
case DataTypes.StringType => record(i) = UTF8String.fromString(rs.getString(name))
case DataTypes.TimestampType => record(i) = DateTimeUtils.fromJavaTimestamp(rs.getTimestamp(name))
case DataTypes.BinaryType => record(i) = rs.getBytes(name)
case DataTypes.NullType => record(i) = StringUtils.EMPTY
}
} catch {
case e: SQLException => logError(e.getStackTrace.mkString("", scala.util.Properties.lineSeparator, scala.util.Properties.lineSeparator))
}
}
new GenericInternalRow(record)
}
/**
* Release resources
*/
override def close(): Unit = {helper.closeAll(connection, st, null, rs)}
}
- Implement the getSelectStatement method in the ClickHouseHelper utility class
def getSelectStatement(schema: StructType): String = {
s"SELECT ${schema.fieldNames.mkString(",")} FROM ${options.getFullTable}"
}
- Wire up the createReader method of ClickHouseDataSourceV2
/**
* Batch read of data
* options: the options supplied via .option(...) when the data is loaded
* */
override def createReader(options: DataSourceOptions): DataSourceReader = new ClickHouseDataSourceReader(new ClickHouseOptions(options.asMap()))
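With createReader wired up, the source can be exercised from a SparkSession (a sketch; the option keys follow the hypothetical ClickHouseOptions above and must match whatever your options class actually reads, and the format string must be the fully qualified name of your ClickHouseDataSourceV2 class):
// Assumes an existing SparkSession named spark.
val clickhouseDF = spark.read
  .format("cn.it.logistics.etl.realtime.ext.clickhouse.ClickHouseDataSourceV2")
  .option("clickhouse.url", "jdbc:clickhouse://node1:8123/logistics") // assumed key
  .option("clickhouse.table", "logistics.tbl_areas")                  // assumed key
  .load()
clickhouseDF.printSchema()
clickhouseDF.show(10, truncate = false)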
2. Batch Write
Implementation steps:
- Define a ClickHouseWriter class implementing the StreamWriter interface
- Implement the following methods:
- createWriterFactory (returns the custom DataWriterFactory)
- commit (commits a batch or a streaming epoch)
- abort (called when the write fails)
- Define ClickHouseDataWriterFactory implementing the DataWriterFactory interface, with:
- createDataWriter (creates the per-partition data writer)
- Define ClickHouseDataWriter implementing the DataWriter and Serializable interfaces, with:
- commit() (commit the data)
- write() (write a record)
- abort() (called when the write fails)
- Wire up the createWriter method of ClickHouseDataSourceV2
Implementation:
- Define a ClickHouseWriter class implementing the StreamWriter interface
- Implement the following methods:
- createWriterFactory (returns the custom DataWriterFactory)
- commit (commits a batch or a streaming epoch)
- abort (called when the write fails)
package cn.it.logistics.etl.realtime.ext
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
/**
* @ClassName ClickHouseWriter
* @Description Write implementation supporting both batch and streaming
*/
class ClickHouseWriter(writeUuidOrQueryId: String, schema: StructType, batchMode: SaveMode, streamMode: OutputMode, options: ClickHouseOptions) extends StreamWriter {
/**
* Override createWriterFactory to return the custom DataWriterFactory
* @return
*/
override def createWriterFactory(): DataWriterFactory[InternalRow] = new ClickHouseDataWriterFactory(writeUuidOrQueryId, schema, batchMode, streamMode, options)
/** Batch writer commit */
override def commit(messages: Array[WriterCommitMessage]): Unit = {}
/** Batch writer abort */
override def abort(messages: Array[WriterCommitMessage]): Unit = {}
/** Streaming writer commit */
override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
/** Streaming writer abort */
override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
}
- Define ClickHouseDataWriterFactory implementing the DataWriterFactory interface, with:
- createDataWriter (creates the per-partition data writer)
package cn.it.logistics.etl.realtime.ext
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
/**
* @ClassName ClickHouseDataWriterFactory
* @Description DataWriter factory used to instantiate ClickHouseDataWriter
*/
class ClickHouseDataWriterFactory(writeUUID: String, schema: StructType, batchMode: SaveMode, streamMode: OutputMode, options: ClickHouseOptions) extends DataWriterFactory[InternalRow] {
override def createDataWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] = new ClickHouseDataWriter(writeUUID, schema, batchMode, streamMode, options)
}
- Define ClickHouseDataWriter implementing the DataWriter and Serializable interfaces, with:
- commit() (commit the data)
- write() (write a record)
- abort() (called when the write fails)
package cn.it.logistics.etl.realtime.ext
import java.io.Serializable
import org.apache.commons.lang3.StringUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import scala.collection.mutable.ArrayBuffer
/**
* @ClassName ClickHouseDataWriter
* @Description ClickHouse data write implementation
*/
class ClickHouseDataWriter(writeUUID: String, schema: StructType, batchMode: SaveMode, streamMode: OutputMode, options: ClickHouseOptions) extends DataWriter[InternalRow] with Logging with Serializable {
val helper = new ClickHouseHelper(options)
val opType = options.getOpTypeField
private val sqls = ArrayBuffer[String]()
private val autoCreateTable: Boolean = options.autoCreateTable
private val init = if (autoCreateTable) {
val createSQL: String = helper.createTable(options.getFullTable, schema)
println(/**logInfo**/s"==== Table initialization SQL: $createSQL")
helper.executeUpdate(createSQL)
}
val fields = schema.fields
override def commit(): WriterCommitMessage = {
println("executeUpdateBatch:" + sqls.length)
if (sqls.length > 0) {
helper.executeUpdateBatch(sqls)
}
val batchSQL = sqls.mkString("\n")
// logDebug(batchSQL)
println(batchSQL)
new WriterCommitMessage {
override def toString: String = s"Batched INSERT SQL: $batchSQL"
}
}
override def write(record: InternalRow): Unit = {
if(StringUtils.isEmpty(opType)) {
throw new RuntimeException("The opTypeField option was not provided, so the persistence type of the data cannot be determined!")
}
var sqlStr: String = helper.getStatement(options.getFullTable, schema, record)
println(sqlStr)
logDebug(s"==== Assembled INSERT SQL statement: $sqlStr")
try {
if (StringUtils.isEmpty(sqlStr)) {
val msg = "==== Failed to assemble the INSERT SQL statement because it is NULL or EMPTY!"
logError(msg)
throw new RuntimeException(msg)
}
//Thread.sleep(options.getInterval())
// streaming mode
if (null == batchMode) {
if (streamMode == OutputMode.Append) {
sqls += sqlStr
// val state = helper.executeUpdate(sqlStr)
// println(s"==== Executed under OutputMode.Append: $sqlStr\nstatus: $state")
}
else if(streamMode == OutputMode.Complete) {logError("==== Writing under OutputMode.Complete is not implemented; add it in ClickHouseDataWriter.write!")}
else if(streamMode == OutputMode.Update) {logError("==== Writing under OutputMode.Update is not implemented; add it in ClickHouseDataWriter.write!")}
else {logError(s"==== Unknown output mode; add the corresponding write logic in ClickHouseDataWriter.write!")}
// batch mode
} else {
if (batchMode == SaveMode.Append) {
sqls += sqlStr
//val state = helper.executeUpdate(sqlStr)
//println(s"==== Executed under SaveMode.Append: $sqlStr\nstatus: $state")
}
else if(batchMode == SaveMode.Overwrite) {logError("==== Writing under SaveMode.Overwrite is not implemented; add it in ClickHouseDataWriter.write!")}
else if(batchMode == SaveMode.ErrorIfExists) {logError("==== Writing under SaveMode.ErrorIfExists is not implemented; add it in ClickHouseDataWriter.write!")}
else if(batchMode == SaveMode.Ignore) {logError("==== Writing under SaveMode.Ignore is not implemented; add it in ClickHouseDataWriter.write!")}
else {logError(s"==== Unknown save mode; add the corresponding write logic in ClickHouseDataWriter.write!")}
}
} catch {
case e: Exception => logError(e.getMessage)
}
}
override def abort(): Unit = {}
}
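To make the write path concrete: assuming the op-type column is named opType and a row arrives as (id=1, name="beijing", opType="insert"), helper.getStatement (defined in the full helper below) delegates to getInsertStatement, which drops the op-type column and appends the sign and version columns. The table name and values here are illustrative only:
// Hypothetical input row: id=1, name="beijing", opType="insert"
// Generated statement (version is System.currentTimeMillis() at write time):
// INSERT INTO logistics.tbl_test(id,name,sign,version) VALUES(1,'beijing',1,1700000000000)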
- Implement the insert, update, and delete related methods in the ClickHouseHelper utility class (full listing):
package cn.it.logistics.etl.realtime.ext
import java.sql.{Connection, Date, PreparedStatement, ResultSet, Statement}
import java.text.SimpleDateFormat
import java.util
import org.apache.commons.lang3.StringUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.javatuples.Triplet
import ru.yandex.clickhouse.domain.ClickHouseDataType
import ru.yandex.clickhouse.response.{ClickHouseResultSet, ClickHouseResultSetMetaData}
import ru.yandex.clickhouse.{ClickHouseConnection, ClickHouseDataSource, ClickHouseStatement}
import ru.yandex.clickhouse.settings.ClickHouseProperties
import org.apache.spark.sql.types.{BooleanType, DataType, DataTypes, DateType, DoubleType, FloatType, IntegerType, LongType, StringType, StructField, StructType}
import scala.collection.mutable.ArrayBuffer
/**
* Utility class for ClickHouse operations
*/
class ClickHouseHelper(options: ClickHouseOptions) extends Logging{
private val opType: String = options.getOpTypeField
private var connection: ClickHouseConnection = getConnection
private val id: String = options.getPrimaryKey
/**
* Obtain a ClickHouse connection
*/
def getConnection = {
//get the ClickHouse connection string
val url: String = options.getURL
//create the ClickHouseDataSource object
val clickHouseDataSource: ClickHouseDataSource = new ClickHouseDataSource(url, new ClickHouseProperties())
//return the ClickHouse connection
clickHouseDataSource.getConnection
}
/**
* Return the schema of the given table
* @return StructType: the schema of the Spark DataFrame
*/
def getSparkTableSchema: StructType = {
import collection.JavaConversions._
val clickHouseTableSchema: util.LinkedList[Triplet[String, String, String]] = getClickHouseTableSchema
//println(clickHouseTableSchema)
val fileds = ArrayBuffer[StructField]()
//build the schema from the ClickHouse table's columns and column types
for (trp <- clickHouseTableSchema) {
fileds += StructField(trp.getValue0, getSparkSqlType(trp.getValue1))
}
//return the StructType, which is the schema
StructType(fileds)
}
/**
* Return the collection of the ClickHouse table's columns and their types
*/
def getClickHouseTableSchema = {
//collection of columns
val fileds: util.LinkedList[Triplet[String, String, String]] = new util.LinkedList[Triplet[String, String, String]]()
//query the table to obtain its column metadata
//ClickHouse connection objects
var connection: ClickHouseConnection = null
var statement: ClickHouseStatement = null
var resultSet: ClickHouseResultSet = null
var metaData: ClickHouseResultSetMetaData = null
try {
//obtain the connection
connection = getConnection
statement = connection.createStatement()
//we only need the table's columns and their types, not its data, so use a query condition that can never match
val sql: String = s"select * FROM ${options.getFullTable} where 1=0"
resultSet = statement.executeQuery(sql).asInstanceOf[ClickHouseResultSet]
//metadata of the given table
metaData = resultSet.getMetaData.asInstanceOf[ClickHouseResultSetMetaData]
val columnCount: Int = metaData.getColumnCount
for (i <- 1 to columnCount) {
val columnName: String = metaData.getColumnName(i)
val columnTypeName: String = metaData.getColumnTypeName(i)
val javaTypeName: String = ClickHouseDataType.fromTypeString(columnTypeName).getJavaClass.getSimpleName
println("columnTypeName:"+columnTypeName)
println("javaTypeName:"+javaTypeName)
fileds.add(new Triplet(columnName, columnTypeName, javaTypeName))
}
} catch {
case ex: Exception => ex.printStackTrace()
} finally {
if (statement != null) statement.close()
if (connection != null) connection.close()
}
fileds
}
/** Build a CREATE TABLE statement (VersionedCollapsingMergeTree with extra sign/version columns) from the Spark schema */
def createTable(table: String, schema: StructType): String = {
val cols = ArrayBuffer[String]()
for (field <- schema.fields) {
val dataType = field.dataType
val ckColName = field.name
if (ckColName != opType) {
var ckColType = getClickhouseSqlType(dataType)
if (!StringUtils.isEmpty(ckColType)) {
if (ckColType.toLowerCase == "string") {
ckColType = "Nullable(" + ckColType + ")"
}
}
cols += ckColName + " " + ckColType
}
}
s"CREATE TABLE IF NOT EXISTS $table(${cols.mkString(",")},sign Int8,version UInt64) ENGINE=VersionedCollapsingMergeTree(sign, version) ORDER BY $id"
}
def executeUpdate(sql: String): Int = {
var state = 0;
var st: ClickHouseStatement = null;
try {
if (null == connection || connection.isClosed) {
connection = getConnection
}
st = connection.createStatement()
state = st.executeUpdate(sql)
} catch {
case e: Exception => logError(s"Statement execution failed: $sql\n${e.getMessage}")
} finally {
//closeAll(connection, st)
}
state
}
/** Execute a batch of generated statements, merging consecutive INSERT ... VALUES statements into a single multi-row INSERT */
def executeUpdateBatch(sqlArray: ArrayBuffer[String]) = {
//the statements may contain insert, update and delete operations
var batchSQL: StringBuffer = new StringBuffer()
var statement: ClickHouseStatement = null
try {
if (connection == null || connection.isClosed) {
connection = getConnection
}
statement = connection.createStatement()
//whether an INSERT statement has been seen
var insertFlag: Boolean = false
for (i <- 0 until sqlArray.length) {
val line: String = sqlArray(i)
//insert operation
if (line.toLowerCase.contains("insert") && line.toLowerCase.contains("values")) {
//position of the VALUES keyword
val offset: Int = line.indexOf("VALUES")
if (!insertFlag) {
//first INSERT seen
//take the prefix up to and including the VALUES keyword, e.g. INSERT INTO tbl_areas(citycode,id,lat,level,lng,mername,name,pid,pinyin,sname,yzcode,sign,version) VALUES
val prefix: String = line.substring(0, offset + 6)
batchSQL.append(prefix)
}
//take the value list that follows VALUES
val suffix: String = line.substring(offset + 6)
batchSQL.append(suffix)
insertFlag = true
}
else if (line.toLowerCase.contains("update")) { //update operation
//if inserts appeared before this update, flush the batched inserts first
if (insertFlag) {
println("Batched INSERT statement: " + batchSQL.toString)
statement.executeUpdate(batchSQL.toString)
}
println("单条更新操作的sql语句:" + line)
statement.executeUpdate(line)
batchSQL = new StringBuffer()
insertFlag = false
} else if (line.toLowerCase().contains("delete")) { //delete operation
//if inserts appeared before this delete, flush the batched inserts first
if (insertFlag) {
println("Batched INSERT statement: " + batchSQL.toString)
statement.executeUpdate(batchSQL.toString)
}
println("单条删除操作的sql语句:" + line)
statement.executeUpdate(line)
batchSQL = new StringBuffer()
insertFlag = false
}
//if only inserts were batched, flush them after the last statement has been processed
if (!batchSQL.toString.isEmpty() && i == sqlArray.length - 1) {
println("拼接好的批量更新操作的sql语句:" + batchSQL.toString)
statement.executeUpdate(batchSQL.toString)
}
}
} catch {
case ex: Exception => ex.printStackTrace()
}
}
/** Dispatch to the proper statement builder according to the value of the op-type column */
def getStatement(table: String, schema: StructType, record: InternalRow): String = {
val opTypeValue: String = getFieldValue(opType, schema, record).toString
if (opTypeValue.toLowerCase() == "insert") {
getInsertStatement(table, schema, record)
}
else if (opTypeValue.toLowerCase() == "delete") {
getDeleteStatement(table, schema, record)
}
else if (opTypeValue.toLowerCase() == "update") {
getUpdateStatement(table, schema, record)
}
else {
""
}
}
def getSelectStatement(schema: StructType): String = {
s"SELECT ${schema.fieldNames.mkString(",")} FROM ${options.getFullTable}"
}
def getInsertStatement(table: String, schema: StructType, data: InternalRow): String = {
val fields = schema.fields
val names = ArrayBuffer[String]()
val values = ArrayBuffer[String]()
// // if the DataFrame columns match the database columns exactly, build the SQL with the full column list
// if (data.numFields == fields.length) {
// } else { // otherwise only the columns present in the DataFrame are added to the SQL
// }
for (i <- 0 until fields.length) {
val field = fields(i)
val fieldType = field.dataType
val fieldName = field.name
if (fieldName != opType) {
val fieldValue = fieldType match {
case DataTypes.BooleanType => if (data.isNullAt(i)) "NULL" else s"${data.getBoolean(i)}"
case DataTypes.DoubleType => if (data.isNullAt(i)) "NULL" else s"${data.getDouble(i)}"
case DataTypes.FloatType => if (data.isNullAt(i)) "NULL" else s"${data.getFloat(i)}"
case DataTypes.IntegerType => if (data.isNullAt(i)) "NULL" else s"${data.getInt(i)}"
case DataTypes.LongType => if (data.isNullAt(i)) "NULL" else s"${data.getLong(i)}"
case DataTypes.ShortType => if (data.isNullAt(i)) "NULL" else s"${data.getShort(i)}"
case DataTypes.StringType => if (data.isNullAt(i)) "NULL" else s"'${data.getUTF8String(i).toString.trim}'"
case DataTypes.DateType => if (data.isNullAt(i)) "NULL" else s"'${new SimpleDateFormat("yyyy-MM-dd").format(new Date(data.getInt(i).toLong * 86400000L))}'" //DateType is stored as days since the epoch in InternalRow
case DataTypes.TimestampType => if (data.isNullAt(i)) "NULL" else s"'${new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(data.getLong(i) / 1000))}'"
case DataTypes.BinaryType => if (data.isNullAt(i)) "NULL" else s"${data.getBinary(i)}"
case DataTypes.NullType => "NULL"
}
names += fieldName
values += fieldValue
}
}
if (names.length > 0 && values.length > 0) {
names += ("sign", "version")
values += ("1", System.currentTimeMillis().toString)
}
val strSql = s"INSERT INTO $table(${names.mkString(",")}) VALUES(${values.mkString(",")})"
strSql
}
def getDeleteStatement(table: String, schema: StructType, data: InternalRow): String = {
val fields = schema.fields
val primaryKeyFields = if (options.getPrimaryKey.isEmpty) {
fields.filter(field => field.name == "id")
} else {
fields.filter(field => field.name == options.getPrimaryKey)
}
if (primaryKeyFields.length > 0) {
val primaryKeyField = primaryKeyFields(0)
val primaryKeyValue = getFieldValue(primaryKeyField.name, schema, data)
s"ALTER TABLE $table DELETE WHERE ${primaryKeyField.name} = $primaryKeyValue"
} else {
logError("==== 找不到主键,无法生成删除SQL!")
""
}
}
def getUpdateStatement(table: String, schema: StructType, data: InternalRow): String = {
val fields = schema.fields
val primaryKeyFields = if (options.getPrimaryKey.isEmpty) {
fields.filter(field => field.name == "id")
} else {
fields.filter(field => field.name == options.getPrimaryKey)
}
if (primaryKeyFields.length > 0) {
val primaryKeyField = primaryKeyFields(0)
val primaryKeyValue = getFieldValue(primaryKeyField.name, schema, data)
//exclude the primary key and the op-type column from the SET clause
val noPrimaryKeyFields = fields.filter(field => field.name != primaryKeyField.name && field.name != opType)
val sets = ArrayBuffer[String]()
for (i <- 0 until noPrimaryKeyFields.length) {
val noPrimaryKeyField = noPrimaryKeyFields(i)
val set = noPrimaryKeyField.name + "=" + getFieldValue(noPrimaryKeyField.name, schema, data).toString
sets += set
}
s"ALTER TABLE $table UPDATE ${sets.mkString(",")} WHERE ${primaryKeyField.name}=$primaryKeyValue"
} else {
logError("==== 找不到主键,无法生成修改SQL!")
""
}
}
private def getFieldValue(fieldName: String, schema: StructType, data: InternalRow): Any = {
var flag = true
var fieldValue: String = null
val fields = schema.fields
for (i <- 0 until fields.length if flag) {
val field = fields(i)
if (fieldName == field.name) {
fieldValue = field.dataType match {
case DataTypes.BooleanType => if (data.isNullAt(i)) "NULL" else s"${data.getBoolean(i)}"
case DataTypes.DoubleType => if (data.isNullAt(i)) "NULL" else s"${data.getDouble(i)}"
case DataTypes.FloatType => if (data.isNullAt(i)) "NULL" else s"${data.getFloat(i)}"
case DataTypes.IntegerType => if (data.isNullAt(i)) "NULL" else s"${data.getInt(i)}"
case DataTypes.LongType => if (data.isNullAt(i)) "NULL" else s"${data.getLong(i)}"
case DataTypes.ShortType => if (data.isNullAt(i)) "NULL" else s"${data.getShort(i)}"
case DataTypes.StringType => if (data.isNullAt(i)) "NULL" else s"${data.getUTF8String(i).toString.trim}"
case DataTypes.DateType => if (data.isNullAt(i)) "NULL" else s"'${new SimpleDateFormat("yyyy-MM-dd").format(new Date(data.getInt(i).toLong * 86400000L))}'" //DateType is stored as days since the epoch in InternalRow
case DataTypes.TimestampType => if (data.isNullAt(i)) "NULL" else s"${new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(data.getLong(i) / 1000))}"
case DataTypes.BinaryType => if (data.isNullAt(i)) "NULL" else s"${data.getBinary(i)}"
case DataTypes.NullType => "NULL"
}
flag = false
}
}
fieldValue
}
def closeAll(connection: Connection = null, st: Statement = null, ps: PreparedStatement = null, rs: ResultSet = null): Unit = {
try {
if (rs != null && !rs.isClosed) rs.close()
if (ps != null && !ps.isClosed) ps.close()
if (st != null && !st.isClosed) st.close()
if (connection != null && !connection.isClosed) connection.close()
} catch {
case e: Exception => e.printStackTrace()
}
}
/**
* IntervalYear (Types.INTEGER, Integer.class, true, 19, 0),
* IntervalQuarter (Types.INTEGER, Integer.class, true, 19, 0),
* IntervalMonth (Types.INTEGER, Integer.class, true, 19, 0),
* IntervalWeek (Types.INTEGER, Integer.class, true, 19, 0),
* IntervalDay (Types.INTEGER, Integer.class, true, 19, 0),
* IntervalHour (Types.INTEGER, Integer.class, true, 19, 0),
* IntervalMinute (Types.INTEGER, Integer.class, true, 19, 0),
* IntervalSecond (Types.INTEGER, Integer.class, true, 19, 0),
* UInt64 (Types.BIGINT, BigInteger.class, false, 19, 0),
* UInt32 (Types.INTEGER, Long.class, false, 10, 0),
* UInt16 (Types.SMALLINT, Integer.class, false, 5, 0),
* UInt8 (Types.TINYINT, Integer.class, false, 3, 0),
* Int64 (Types.BIGINT, Long.class, true, 20, 0, "BIGINT"),
* Int32 (Types.INTEGER, Integer.class, true, 11, 0, "INTEGER", "INT"),
* Int16 (Types.SMALLINT, Integer.class, true, 6, 0, "SMALLINT"),
* Int8 (Types.TINYINT, Integer.class, true, 4, 0, "TINYINT"),
* Date (Types.DATE, Date.class, false, 10, 0),
* DateTime (Types.TIMESTAMP, Timestamp.class, false, 19, 0, "TIMESTAMP"),
* Enum8 (Types.VARCHAR, String.class, false, 0, 0),
* Enum16 (Types.VARCHAR, String.class, false, 0, 0),
* Float32 (Types.FLOAT, Float.class, true, 8, 8, "FLOAT"),
* Float64 (Types.DOUBLE, Double.class, true, 17, 17, "DOUBLE"),
* Decimal32 (Types.DECIMAL, BigDecimal.class, true, 9, 9),
* Decimal64 (Types.DECIMAL, BigDecimal.class, true, 18, 18),
* Decimal128 (Types.DECIMAL, BigDecimal.class, true, 38, 38),
* Decimal (Types.DECIMAL, BigDecimal.class, true, 0, 0, "DEC"),
* UUID (Types.OTHER, UUID.class, false, 36, 0),
* String (Types.VARCHAR, String.class, false, 0, 0, "LONGBLOB", "MEDIUMBLOB", "TINYBLOB", "MEDIUMTEXT", "CHAR", "VARCHAR", "TEXT", "TINYTEXT", "LONGTEXT", "BLOB"),
* FixedString (Types.CHAR, String.class, false, -1, 0, "BINARY"),
* Nothing (Types.NULL, Object.class, false, 0, 0),
* Nested (Types.STRUCT, String.class, false, 0, 0),
* Tuple (Types.OTHER, String.class, false, 0, 0),
* Array (Types.ARRAY, Array.class, false, 0, 0),
* AggregateFunction (Types.OTHER, String.class, false, 0, 0),
* Unknown (Types.OTHER, String.class, false, 0, 0);
*
* @param clickhouseDataType
* @return
*/
private def getSparkSqlType(clickhouseDataType: String) = clickhouseDataType match {
case "IntervalYear" => DataTypes.IntegerType
case "IntervalQuarter" => DataTypes.IntegerType
case "IntervalMonth" => DataTypes.IntegerType
case "IntervalWeek" => DataTypes.IntegerType
case "IntervalDay" => DataTypes.IntegerType
case "IntervalHour" => DataTypes.IntegerType
case "IntervalMinute" => DataTypes.IntegerType
case "IntervalSecond" => DataTypes.IntegerType
case "UInt64" => DataTypes.LongType //DataTypes.IntegerType;
case "UInt32" => DataTypes.LongType
case "UInt16" => DataTypes.IntegerType
case "UInt8" => DataTypes.IntegerType
case "Int64" => DataTypes.LongType
case "Int32" => DataTypes.IntegerType
case "Int16" => DataTypes.IntegerType
case "Int8" => DataTypes.IntegerType
case "Date" => DataTypes.DateType
case "DateTime" => DataTypes.TimestampType
case "Enum8" => DataTypes.StringType
case "Enum16" => DataTypes.StringType
case "Float32" => DataTypes.FloatType
case "Float64" => DataTypes.DoubleType
case "Decimal32" => DataTypes.createDecimalType
case "Decimal64" => DataTypes.createDecimalType
case "Decimal128" => DataTypes.createDecimalType
case "Decimal" => DataTypes.createDecimalType
case "UUID" => DataTypes.StringType
case "String" => DataTypes.StringType
case "FixedString" => DataTypes.StringType
case "Nothing" => DataTypes.NullType
case "Nested" => DataTypes.StringType
case "Tuple" => DataTypes.StringType
case "Array" => DataTypes.StringType
case "AggregateFunction" => DataTypes.StringType
case "Unknown" => DataTypes.StringType
case _ => DataTypes.NullType
}
private def getClickhouseSqlType(sparkDataType: DataType) = sparkDataType match {
case DataTypes.ByteType => "Int8"
case DataTypes.ShortType => "Int16"
case DataTypes.IntegerType => "Int32"
case DataTypes.FloatType => "Float32"
case DataTypes.DoubleType => "Float64"
case DataTypes.LongType => "Int64"
case DataTypes.DateType => "DateTime"
case DataTypes.TimestampType => "DateTime"
case DataTypes.StringType => "String"
case DataTypes.NullType => "String"
}
}
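For reference, given a hypothetical schema (id: Long, name: String, plus the op-type column) and primary key id, createTable above would generate DDL along these lines (the table name is illustrative):
// CREATE TABLE IF NOT EXISTS logistics.tbl_test(id Int64,name Nullable(String),sign Int8,version UInt64)
//   ENGINE=VersionedCollapsingMergeTree(sign, version) ORDER BY id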
- Wire up the createWriter and createStreamWriter methods of ClickHouseDataSourceV2
/** Batch write */
override def createWriter(writeUUID: String, schema: StructType, mode: SaveMode, options: DataSourceOptions): Optional[DataSourceWriter] = Optional.of(new ClickHouseWriter(writeUUID, schema, mode, null, new ClickHouseOptions(options.asMap())))
/** Streaming write */
override def createStreamWriter(queryId: String, schema: StructType, mode: OutputMode, options: DataSourceOptions): StreamWriter = new ClickHouseWriter(queryId, schema, null, mode, new ClickHouseOptions(options.asMap()))
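Finally, the step list above calls for a small test object that reads from and writes to ClickHouse. A sketch along these lines (the format class name, URL, table names and option keys are assumptions and must match your ClickHouseOptions implementation) exercises the batch read and batch write paths:
import org.apache.spark.sql.{SaveMode, SparkSession}
// Hypothetical test driver; adjust the format class name, URL and tables to your environment.
object ClickHouseDataSourceTest {
  private val format = "cn.it.logistics.etl.realtime.ext.clickhouse.ClickHouseDataSourceV2"
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ClickHouseDataSourceTest").master("local[*]").getOrCreate()
    import spark.implicits._
    // Batch read: goes through createReader -> ClickHouseDataSourceReader
    val df = spark.read.format(format)
      .option("clickhouse.url", "jdbc:clickhouse://node1:8123/logistics") // assumed key
      .option("clickhouse.table", "logistics.tbl_areas")                  // assumed key
      .load()
    df.show(10, truncate = false)
    // Batch write: goes through createWriter -> ClickHouseWriter; each row must carry the op-type column
    val toWrite = Seq((1L, "beijing", "insert"), (2L, "shanghai", "insert")).toDF("id", "name", "opType")
    toWrite.write.format(format)
      .mode(SaveMode.Append) // SaveMode.Append is the only mode implemented in ClickHouseDataWriter.write
      .option("clickhouse.url", "jdbc:clickhouse://node1:8123/logistics")
      .option("clickhouse.table", "logistics.tbl_test")
      .option("clickhouse.auto.create", "true")     // assumed key
      .option("clickhouse.primary.key", "id")       // assumed key
      .option("clickhouse.operate.field", "opType") // assumed key; the write path requires this column
      .save()
    spark.stop()
  }
}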