在项目交付中经常会遇到各种异构数据库间的迁移替换,该场景重度依赖迁移工具完成同步。目前华为云上没有现成的数据迁移工具能够支持Clickhouse到GaussDB(DWS)的迁移,本文提供一种思路,通过Spark代码完成ClickHouse中表数据读取,并写入到GaussDB(DWS)中完成迁移。
样例参考代码如下:
#! /bin/bash/env python3 from pyspark.sql import SparkSession from datetime import datetime, timedelta import sys def ch2dws(): startTime = datetime.now() print("===== {}任务开始时间{}:".format("order2dws", startTime.strftime("%Y-%m-%d %H:%M:%S.%f"))) spark = SparkSession.builder.master("yarn").appName("ch2dws").getOrCreate() sql = """ select kid , ym , ymareaid , amount , order_total from xxxxx.xxxxxx oia where orderday='2021-06-03' limit 3000 """ print(sql) df = spark.read.format("jdbc").options( url="jdbc:clickhouse://xxxxx:8123/xxxxxxxx", driver="ru.yandex.clickhouse.ClickHouseDriver", dbtable="(%s) tp" % sql, user="xxxxxxxxx", password="xxxxxxxx").load() # append df.write.mode(saveMode="overwrite") \ .format("jdbc") \ .option("driver", "org.postgresql.Driver") \ .option("url", "jdbc:postgresql://xxxxxxxxx:8000/xxxxxxxxxx") \ .option("useSSL", "false") \ .option("user", "xxxxxxxxxx") \ .option("password", "xxxxxxxxx") \ .option("batchsize", 10000) \ .option("dbtable", "xxxxxxxxxx") \ .option("isolationLevel", "NONE") \ .option("truncate", False) \ .save() spark.stop() endTime = datetime.now() print("任务结束时间:{},总耗时{}".format(endTime, timedelta(seconds=(endTime - startTime).total_seconds()))) if __name__ == '__main__': # date_input = [it for it in sys.argv[1:]] ch2dws() |
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
评论(0)