MySQL到ES近实时同步方案
一、场景描述
受限于MySQL的ACID特性以及可扩展性,其检索能力以及OLAP/OLTP处理能力相对比较弱。CSS/ES支持文本、时间、数字、空间坐标、向量等数据类型构建索引,拥有非常强大的数据检索与分析能力;通过将CSS/ES通过作为MySQL的备数据库,可以补齐MySQL中的数据检索能力与OLAP与OLTP处理能力。
如下方案用于将Mysql中的数据近实时同步到CSS/ES集群。
二、前置条件
- RDS实例已创建好。
- 需要近实时同步的表中有增量字段,并且字段的类型为时间或数字类型。
三、操作步骤
-
创建ES/CSS实例。
a) 版本建议选择7.6.2版本
b) 建议打开安全模式,但是关闭https访问
c) 安全组中需要放开安全组内9300通信,不然集群会创建失败 -
根据MySQL中的表结构,在ES/CSS中新建索引。
-
创建ECS实例.
a) VPC与安全组与RDS保持一致
b) OS建议选择:CentOS 8.0
c) 绑定公网访问,以方便后续工具安装 -
安装java。
a) 建议选择OpenJDK 1.8,相关命令如下:yum install java
-
安装Logstash。
a) 下载OSS版本的Logstash,7.x版本的ES集群建议使用Logstash OSS 7.10.0),相关命令如下:
wget https://artifacts.elastic.co/downloads/logstash/logstash-oss-7.10.0-linux-x86_64.tar.gz
b) 下载后进行解压,解压命令样例:
tar -zxvf logstash-oss-7.10.0-linux-x86_64.tar.gz
-
下载mysql驱动。
a) 选择对应版本的 mysql-connector-java包,样例如下:
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.21/mysql-connector-java-8.0.21.jar
-
配置Logstash。
a) 修改Jvm参数。
将jvm.options中的-Xms1g -Xmx1g优化为内存的一半(比如4u16g的节点,上述两个参数修改为-Xms8g -Xmx8g)。
b) 配置logstash-sample.conf。
说明:Logstash的配置由3部分组成,分别为:input,filter,output;其中input说明数据来源,filter中执行相关的ETL处理,output中说明数据的目的端。input { } filter{ } output{ }
b.1) 配置Input。
假设mydb数据库中有一张表person,其包含如下3个字段:id bigint(20) UNSIGNED not null primary key auto_increment comment "主键" createdat datetime not null default now() comment "创建时间" desc varchar(50) character set utf8mb4 comment "备注信息"
那么其input参考如下:
input { jdbc { jdbc_driver_library => "/root/logstash-7.10.0/lib/mysql-connector-java-8.0.21.jar" jdbc_driver_class => "com.mysql.cj.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://192.168.0.113:3306/mydb" jdbc_user => "root" jdbc_password => "********" jdbc_paging_enabled => true jdbc_page_size => 1000 schedule => "* * * * *" statement => "select * from person where createat >= :sql_last_value" use_column_value => true tracking_column_type => "timestamp" tracking_column => "createat" last_run_metadata_path => "/root/logstash-7.10.0/config/checkpoint" } }
其中:
tracking_column 为增量字段,用于区分增量数据,需要跟statement中的字段保持一致。
tracking_column_type为增量字段的类型,可以是timestamp或numeric类型。
last_run_metadata_path中记录的是起始值,其数据类型需要跟tracking_column保持一致。随着每次调度的执行,该值会自动进行刷新。
schedule为调度周期,其格式与crontab保持一致,当前为1分钟同步一次。b.2) 配置filter。
filter { mutate { remove_field => ["@timestamp", "@version"] } }
b.3) 配置output。
场景一:ES 为安全模式,但不开启httpsoutput { elasticsearch { hosts => ["http://192.168.0.72:9200"] index => "person" user => "admin" password => "********" document_id => "%{id}" ilm_enabled => false } }
其中:
document_id中的字段为MySQL表中主键。这是MySQL中的数据与ES中数据保持一致的关键。场景二:ES为安全模式,并且开启https
output { elasticsearch { hosts => ["https://192.168.0.117:9200"] index => "person" user => "admin" password => "********" document_id => "%{id}" cacert => "/opt/CloudSearchService.cer" ssl_certificate_verification => false ilm_enabled => false ssl => true } }
其中:
document_id中的字段为MySQL表中主键。这是MySQL中的数据与ES中数据保持一致的关键。
cacert需要到ES/CSS管理控制台->集群基本信息中下载。 -
运行Logstash
a) 运行命令如下:nohup ./bin/logstash -f ./config/logstash-sample.conf --path.data=
b) 到CSS/ES中观察同步结果。
- 点赞
- 收藏
- 关注作者
评论(0)