OLAP: Reading and Writing ClickHouse Data
ClickHouse is a columnar database management system (DBMS) open-sourced in 2016 by the Russian company Yandex. It is written in C++ and is built for online analytical processing (OLAP), producing analytical reports in real time from SQL queries. OLAP (On-Line Analytical Processing) focuses on analysis; in terms of database operations, OLAP workloads are dominated by queries. OLTP (On-Line Transaction Processing) focuses on transactions; its database operations are mainly inserts, updates, and deletes.
ClickHouse is a fully columnar DBMS. It lets you create databases and tables, load data, and run queries at runtime without reconfiguring or restarting the server, and it scales linearly while remaining simple, highly reliable, and fault-tolerant. In the big-data space it did not build on the Hadoop ecosystem; instead it uses locally attached storage, so its I/O path avoids the constraints of the Hadoop stack. It can be deployed at considerable scale in production because its linear scalability and reliability guarantees natively support shard + replication architectures. It also exposes a direct SQL interface and ships with a fairly rich set of native clients. And it is fast.
Reading and Writing ClickHouse from Java
- Add the dependency
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>
- Read data from a single-node ClickHouse table in Java
import java.sql.ResultSet;
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

ClickHouseProperties props = new ClickHouseProperties();
props.setUser("default");
props.setPassword("123456");
BalancedClickhouseDataSource dataSource =
        new BalancedClickhouseDataSource("jdbc:clickhouse://hadoop10:8123/default", props);
// try-with-resources ensures the connection, statement, and result set are closed
try (ClickHouseConnection conn = dataSource.getConnection();
     ClickHouseStatement statement = conn.createStatement();
     ResultSet rs = statement.executeQuery("select id,name,age from test")) {
    while (rs.next()) {
        int id = rs.getInt("id");
        String name = rs.getString("name");
        int age = rs.getInt("age");
        System.out.println("id = " + id + ", name = " + name + ", age = " + age);
    }
}
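Writes go through the same JDBC interface. Below is a minimal batched-insert sketch against the `test(id, name, age)` table used above; the `ClickHouseInsert` class and its `buildInsertSql` helper are illustrative assumptions for this article, not part of the driver's API. ClickHouse strongly favors a few large batches over many single-row inserts, hence the `addBatch`/`executeBatch` pattern.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class ClickHouseInsert {

    // Build a parameterized INSERT statement for the given table and columns.
    // Helper defined here for illustration; not part of the JDBC driver.
    static String buildInsertSql(String table, String[] cols) {
        String placeholders = "?" + ", ?".repeat(cols.length - 1);
        return "INSERT INTO " + table + " (" + String.join(",", cols)
                + ") VALUES (" + placeholders + ")";
    }

    // Insert all rows in one batch over an already-open JDBC connection
    // (e.g. one obtained from the BalancedClickhouseDataSource above).
    static void insertRows(Connection conn, Object[][] rows) throws SQLException {
        String sql = buildInsertSql("test", new String[]{"id", "name", "age"});
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (Object[] row : rows) {
                ps.setInt(1, (Integer) row[0]);
                ps.setString(2, (String) row[1]);
                ps.setInt(3, (Integer) row[2]);
                ps.addBatch();
            }
            ps.executeBatch();
        }
    }
}
```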
How to Import Data into ClickHouse
Generate test data with Python
import csv
import random
from datetime import datetime, timedelta

# Generate test data
rows = []
start_date = datetime(2022, 1, 1)
for i in range(1, 1000001):
    date = start_date + timedelta(days=random.randint(0, 365))
    product = random.choice(['Product A', 'Product B', 'Product C'])
    channel = random.choice(['Online', 'Offline'])
    customer = f"Customer {random.randint(1, 1000)}"
    region = random.choice(['North', 'South', 'East', 'West'])
    sales = round(random.uniform(100, 1000), 2)
    quantity = random.randint(1, 10)
    row = [date.strftime('%Y-%m-%d'), product, channel, customer, region, sales, quantity]
    rows.append(row)

# Write the test data to a CSV file
with open('sales_data.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['Date', 'Product', 'Channel', 'Customer', 'Region', 'Sales', 'Quantity'])
    writer.writerows(rows)
Remove the header line from the file (the plain CSV input format expects data rows only): sed -i "1d" /opt/data/sales_data.csv

Then create the target table in ClickHouse:
CREATE TABLE sales (
date Date,
product String,
channel String,
customer String,
region String,
sales Float32,
quantity Int32
) ENGINE = MergeTree()
ORDER BY date;
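Once the table is loaded, a typical OLAP-style aggregation over it looks like the query below (shown for illustration; the column names come from the table definition above):

```sql
SELECT region,
       sum(sales)    AS total_sales,
       sum(quantity) AS total_quantity
FROM sales
GROUP BY region
ORDER BY total_sales DESC;
```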
Option 1: use the built-in clickhouse-client command to import a CSV file from the Linux filesystem into ClickHouse
cat sales_data.csv | clickhouse-client --host=hadoop10 --port=9000 --user=default --password=123456 --query="INSERT INTO default.sales FORMAT CSV"
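After the import finishes, the row count can be checked with the same client (the one-million-row count assumes the generator script above was run unchanged):

```
clickhouse-client --host=hadoop10 --port=9000 --user=default --password=123456 \
  --query="SELECT count() FROM default.sales"
```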
Option 2: use DataX to import a file from HDFS into ClickHouse
- Install the clickhousewriter plugin
Unzip clickhousewriter.zip and upload it to the /opt/installs/datax/plugin/writer/ directory of the DataX installation.
- Write the JSON job script for the extraction
{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      }
    },
    "content": [
      {
        "reader": {
          "name": "hdfsreader",
          "parameter": {
            "path": "/sales_data.csv",
            "defaultFS": "hdfs://hadoop10:8020",
            "column": ["*"],
            "fileType": "csv",
            "encoding": "UTF-8"
          }
        },
        "writer": {
          "name": "clickhousewriter",
          "parameter": {
            "username": "default",
            "password": "123456",
            "column": ["date", "product", "channel", "customer", "region", "sales", "quantity"],
            "connection": [
              {
                "jdbcUrl": "jdbc:clickhouse://hadoop10:8123/default",
                "table": ["sales"]
              }
            ],
            "preSql": [],
            "postSql": [],
            "batchSize": 65536,
            "batchByteSize": 134217728,
            "dryRun": false,
            "writeMode": "insert"
          }
        }
      }
    ]
  }
}
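With the plugin installed and the job file saved (the filename clickhouse_job.json below is a hypothetical choice), the job is launched with DataX's standard entry script:

```
cd /opt/installs/datax
python bin/datax.py job/clickhouse_job.json
```

On completion DataX prints a job summary including the number of records read and written, which should match the line count of the source file.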