Deploying a Recommender System from Scratch: 22. Recommender System Case Study (12): Near-Real-Time MySQL-to-ES Synchronization with canal
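My first blog post of 2021 covers near-real-time synchronization of MySQL data to ES with canal. canal works by posing as a MySQL slave and reading the MySQL binlog directly. Pairing it with logstash guards against data being lost during collection for whatever reason, which improves availability. The index created in elasticsearch7 in turn prevents the same data from being collected twice, which keeps each record unique.
I meant to write this last year. At that time I had already deployed canal-1.1.5 with elasticsearch7.6.2: full synchronization worked, but incremental synchronization did not. This time I deploy canal-1.1.4 because that version adds a Python client (https://github.com/haozi3156666/canal-python); I did not pick canal-1.1.3 because it does not support the Python client.
canal-1.1.4 does not support elasticsearch7 out of the box, but it can be made to support elasticsearch7.6.2 by modifying the source. Open the pom.xml of the elasticsearch module and change the elasticsearch version from 6.4.3 to 7.6.2, then rebuild with mvn clean package -DskipTests.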
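If the build fails because of a bad repository link, fix it in the top-level pom.xml by changing http://repo1.maven.org/maven2 to https://repo1.maven.org/maven2. Compilation then stops with the following errors: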
[ERROR] /C:/Users/wy1/Desktop/canal/canal-1.1.4/canal-canal-1.1.4/client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java:[225,56] incompatible types: org.apache.lucene.search.TotalHits cannot be converted to long
[ERROR] /C:/Users/wy1/Desktop/canal/canal-1.1.4/canal-canal-1.1.4/client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESConnection.java:[420,47] method bulk in class org.elasticsearch.client.RestHighLevelClient cannot be applied to given types;
[ERROR] required: org.elasticsearch.action.bulk.BulkRequest,org.elasticsearch.client.RequestOptions
[ERROR] found: org.elasticsearch.action.bulk.BulkRequest
[ERROR] reason: actual and formal argument lists differ in length
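How to fix them:
(1) Go to ESAdapter.java:[225,56] and append .value, changing long rowCount = response.getHits().getTotalHits(); to long rowCount = response.getHits().getTotalHits().value;
(2) Go to ESConnection.java:[420,47] and add , RequestOptions.DEFAULT as a second argument inside the parentheses of the bulk call on line 420. When the build then finishes without errors, compilation has succeeded.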
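1.1 MySQL character set. I had asked about this on the 大数据全栈成长论坛 forum, but solved it the same day; many thanks to 考过IE励志当攻城狮 for the help. Changing the character set with set character_set_client=utf8 or set names utf8 does not survive a restart; the change only sticks when it is made in the my.cnf configuration file. Adding character_set_client=utf8 under [client] fails with mysql: unknown variable 'character_set_client=utf8'. The correct setting there is default-character-set=utf8; after a restart the character set is changed as expected.
To deploy canal, the following lines must be added under [mysqld] in my.cnf: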
log-bin = mysql-bin
binlog-format=ROW
server_id=1
Check whether log_bin is enabled in MySQL:
show variables like '%bin%';
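Besides show variables, the my.cnf file itself can be checked before MySQL is restarted. The following is a minimal Python sketch, assuming the file is plain INI without !include or !includedir directives; the function name check_mysqld_for_canal is made up for this illustration and is not part of canal or MySQL.

```python
import configparser


def _norm(name):
    # MySQL accepts '-' and '_' interchangeably in option names.
    return name.strip().lower().replace("_", "-")


def check_mysqld_for_canal(my_cnf_text):
    """Return a list of problems in the [mysqld] section (empty if none).

    Simplified reader: it does not follow !include / !includedir lines.
    """
    parser = configparser.ConfigParser(
        allow_no_value=True, strict=False, interpolation=None
    )
    parser.read_string(my_cnf_text)
    if not parser.has_section("mysqld"):
        return ["no [mysqld] section found"]

    options = {_norm(k): (v or "").strip() for k, v in parser.items("mysqld")}

    problems = []
    if "log-bin" not in options:
        problems.append("log-bin is not set")
    if options.get("binlog-format", "").upper() != "ROW":
        problems.append("binlog-format must be ROW")
    if options.get("server-id", "") in ("", "0"):
        problems.append("server-id must be set to a non-zero value")
    return problems


# The settings from this post, as they would appear in my.cnf.
SAMPLE = """
[client]
default-character-set=utf8
[mysqld]
log-bin = mysql-bin
binlog-format=ROW
server_id=1
"""

if __name__ == "__main__":
    print(check_mysqld_for_canal(SAMPLE) or "my.cnf looks ready for canal")
```

An empty list means the three settings canal depends on are present; the running server should still be confirmed with the show variables query.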
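1.2 Create the account and grant privileges. Log in to MySQL, create a user, and grant it privileges. ALL PRIVILEGES is granted below; canal's own QuickStart grants only SELECT, REPLICATION SLAVE and REPLICATION CLIENT.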
CREATE USER canal IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
2 Install canal.deployer
2.1 Create the directory and extract the package
mkdir -p /opt/modules/canal
tar -zxvf /opt/canal-1.1.4/canal.deployer-1.1.4.tar.gz -C /opt/modules/canal
cd /opt/modules/canal
2.2 Edit the configuration
vi /opt/modules/canal/conf/example/instance.properties
canal.instance.mysql.slaveId=1234
canal.instance.gtidon=false
canal.instance.master.address=192.168.56.102:3306
canal.instance.tsdb.enable=true
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false
canal.instance.filter.regex=dianpingdb.shop
canal.mq.topic=example
canal.mq.partition=0
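To get a feel for what canal.instance.filter.regex=dianpingdb.shop lets through, here is a minimal Python sketch. It assumes canal treats the value as a comma-separated list of regular expressions, each of which has to match the whole schema.table name; the function matches_canal_filter is hypothetical and only approximates that behaviour, it is not canal's own code.

```python
import re


def matches_canal_filter(filter_regex, schema, table):
    """Approximate check: does schema.table pass the filter value?

    Assumption: comma-separated patterns, each matched against the
    full "schema.table" string.
    """
    full_name = f"{schema}.{table}"
    patterns = [p.strip() for p in filter_regex.split(",") if p.strip()]
    return any(re.fullmatch(p, full_name) for p in patterns)


if __name__ == "__main__":
    for table in ("shop", "category", "seller"):
        passed = matches_canal_filter("dianpingdb.shop", "dianpingdb", table)
        print(table, "passes" if passed else "is filtered out")
```

Under this reading, dianpingdb.shop passes only the shop table. If changes to the category and seller tables that the adapter SQL joins should also trigger updates, a broader pattern such as dianpingdb\..* (written dianpingdb\\..* in instance.properties) would probably be needed.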
2.3 Start canal
cd /opt/modules/canal
sh bin/startup.sh
The following log output shows that canal.deployer has started successfully.
2021-01-01 23:15:38.106 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2021-01-01 23:15:38.271 [destination = example , address = /192.168.56.102:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2021-01-01 23:15:38.271 [destination = example , address = /192.168.56.102:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2021-01-01 23:15:39.169 [destination = example , address = /192.168.56.102:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000004,position=4,serverId=1,gtid=<null>,timestamp=1609486247000] cost : 882ms , the next step is binlog dump
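Reading these lines by eye works, but the same check can be scripted. The sketch below is a small Python helper, assuming the log wording stays the same as in the lines above; summarize_canal_log is a hypothetical name, and the file path passed to it is up to the reader.

```python
import re

# Pulls the binlog file name and offset out of the EntryPosition[...] text.
POSITION_RE = re.compile(
    r"journalName=(?P<journal>[^,\]]+),position=(?P<position>\d+)"
)


def summarize_canal_log(log_text):
    """Report whether the instance started and where binlog reading begins."""
    started = "start successful" in log_text
    position = None
    for line in log_text.splitlines():
        if "find start position successfully" in line:
            found = POSITION_RE.search(line)
            if found:
                position = (found.group("journal"), int(found.group("position")))
    return {"started": started, "binlog_position": position}
```

Called with the contents of the instance log, it returns a small dictionary that a deployment script could assert on before moving on to canal-adapter.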
3 Install canal-adapter
3.1 Create the directory and extract the package
mkdir -p /opt/modules/canal-adapter
tar -zxvf /opt/canal-1.1.4/canal.adapter-1.1.4.tar.gz -C /opt/modules/canal-adapter
cd /opt/modules/canal-adapter
3.2 Edit the configuration
vi /opt/modules/canal-adapter/conf/application.yml
canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 192.168.56.102:11111
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  username:
  password:
  vhost:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.56.102:3306/dianpingdb?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es # named es7 in canal-1.1.5
        hosts: 192.168.56.102:9300
        properties:
          cluster.name: wuyicom
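The adapter depends on three endpoints from this file: the canal server, MySQL, and the Elasticsearch transport port. A quick reachability check before starting it can save some log reading. This is a minimal Python sketch using plain TCP connects; the helper names and the ENDPOINTS table are made up here, and a successful connect only shows that the port is open, not that the service behind it is healthy.

```python
import socket


def parse_host_port(address):
    """Split "host:port" into (host, port)."""
    host, _, port = address.rpartition(":")
    if not host or not port.isdigit():
        raise ValueError(f"expected host:port, got {address!r}")
    return host, int(port)


def is_reachable(address, timeout=3.0):
    """Return True if a TCP connection to address can be opened."""
    host, port = parse_host_port(address)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Addresses copied from application.yml above.
ENDPOINTS = {
    "canal server": "192.168.56.102:11111",
    "MySQL": "192.168.56.102:3306",
    "Elasticsearch transport": "192.168.56.102:9300",
}


def report(endpoints=ENDPOINTS):
    """Map each endpoint name to True/False reachability."""
    return {name: is_reachable(addr) for name, addr in endpoints.items()}
```

Calling report() on the adapter host returns one True or False per endpoint.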
touch /opt/modules/canal-adapter/conf/es/shop.yml
vi /opt/modules/canal-adapter/conf/es/shop.yml
dataSourceKey: defaultDS
destination: example
groupId:
esMapping:
  _index: shop
  _type: _doc
  _id: id
  upsert: true
  sql: "select a.id,a.name,a.tags,concat(a.latitude,',',a.longitude) as location,a.remark_score,a.price_per_man,a.category_id,b.name as category_name,a.seller_id,c.remark_score as seller_remark_score,c.disabled_flag as seller_disabled_flag from shop a left join category b on a.category_id = b.id left join seller c on c.id = a.seller_id"
  commitBatch: 3000
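3.3 After configuring canal-adapter, do not start it right away: the target index shop has to be defined in ES first. Note: compared with the canal-1.1.5 configuration, canal-1.1.4 needs - name: es instead of - name: es7; canal-1.1.4 then finds shop.yml under conf/es on its own. Once the adapter is started, changes made to the MySQL data are synchronized incrementally to es7.6.2.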
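The index definition itself is not shown in this post, so the following is only a sketch of what creating it could look like from Python. The field names come from the column aliases in the shop.yml SQL; every field type, the SHOP_MAPPING name, the build_create_index_request helper, and the HTTP port 9200 are assumptions. The one type that follows fairly directly from the SQL is location: concat(latitude, ',', longitude) yields a "lat,lon" string, which is a format Elasticsearch accepts for geo_point.

```python
import json
import urllib.request

# Assumed field types; adjust to the real column types in dianpingdb.
SHOP_MAPPING = {
    "mappings": {
        "properties": {
            "id": {"type": "integer"},
            "name": {"type": "text"},
            "tags": {"type": "text"},
            "location": {"type": "geo_point"},  # "lat,lon" string from the SQL
            "remark_score": {"type": "double"},
            "price_per_man": {"type": "integer"},
            "category_id": {"type": "integer"},
            "category_name": {"type": "keyword"},
            "seller_id": {"type": "integer"},
            "seller_remark_score": {"type": "double"},
            "seller_disabled_flag": {"type": "integer"},
        }
    }
}


def build_create_index_request(base_url, index="shop", body=SHOP_MAPPING):
    """Build (but do not send) the PUT request that creates the index."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/{index}",
        data=json.dumps(body).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# To send it (assuming ES listens for HTTP on port 9200):
#   req = build_create_index_request("http://192.168.56.102:9200")
#   urllib.request.urlopen(req)
```

Building the request separately from sending it makes it easy to inspect the URL and JSON body first; the same body could equally be sent with curl or from Kibana.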