从零开始实施推荐系统的落地部署——22.推荐系统案例(十二)使用canal做mysql准实时数据同步到ES

举报
wuyicom 发表于 2021/01/03 00:10:56 2021/01/03
【摘要】     我的2021年的第一篇博客写的是canal做mysql准实时数据同步到ES,它的特点是伪装mysql slave,直接监听mysql binlog。搭配logstash防止数据采集是因某些原因丢失,起到高可用的作用。而elasticsearch7创建索引,其功能避免重复采集数据,已确保数据的唯一性。原本是去年应该写的,已经部署过canal-1.1.5搭配elasticsearch7....

canal1.jpg

    我的2021年的第一篇博客写的是canalmysql准实时数据同步到ES,它的特点是伪装mysql slave,直接监听mysql binlog。搭配logstash防止数据采集是因某些原因丢失,起到高可用的作用。而elasticsearch7创建索引,其功能避免重复采集数据,已确保数据的唯一性。原本是去年应该写的,已经部署过canal-1.1.5搭配elasticsearch7.6.2,能全量同步,却无法做到增量同步。现在选择canal-1.1.4部署是因为这个版本新增python客户端(https://github.com/haozi3156666/canal-python),而不选择canal-1.1.3,是因为它不支持python客户端。虽然canal-1.1.4不支持elasticsearch7,但是可以通过修改源码来实现支持elasticsearch7.6.2。进入到elasticsearchpom.xml里,把elasticsearch6.4.3的版本改为7.6.2。使用mvn clean package –DskipTests 重新打包编译。

2.png

    出现上面这个情况,是链接有问题,在主目录的pom.xml里面把http://repo1.maven.org/maven2改为https://repo1.maven.org/maven2即可。编译过程中出现下面情况:

[ERROR] /C:/Users/wy1/Desktop/canal/canal-1.1.4/canal-canal-1.1.4/client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter

/es/ESAdapter.java:[225,56] 不兼容的类型: org.apache.lucene.search.TotalHits

无法转换为long

[ERROR] /C:/Users/wy1/Desktop/canal/canal-1.1.4/canal-canal-1.1.4/client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESConnection.java:[420,47] 无法将类 org.elasticsearch.client.RestHighLevelClient中的方法 bulk应用到给定类型;

[ERROR] 需要: org.elasticsearch.action.bulk.BulkRequest,org.elasticsearch.client.RequestOptions

[ERROR] 找到: org.elasticsearch.action.bulk.BulkRequest

[ERROR] 原因: 实际参数列表和形式参数列表长度不同

解决的办法:

1)定位到ESAdapter.java:[225,56]long rowCount = response.getHits().getTotalHits();添加.value改成long rowCount = response.getHits().getTotalHits().value;

(2)定位到ESConnection.java:[420,47],在420行的括号里添加,RequestOptions.DEFAULT即可。出现下面情况,说明已经编译成功。

3.png

1.1关于mysql的字符设置问题,之前有在大数据全栈成长论坛提问过,不过在当天已经解决。同时非常感谢考过IE励志当攻城狮的帮助。使用set character_set_client=utf8set names utf8修改mysql数据库的字符编码,重启系统后无效,要在my.cnf配置文件修改才有效。在【client】下面添加character_set_client=utf8会报错mysql: unknown variable 'character_set_client=utf8'。应该修改为default-character-set=utf8,启动后修改字符编码成功。部署canal,必须在my.cnf的在[mysqld]下面的位置的添加:

log-bin = mysql-bin

binlog-format=ROW

server_id=1

查看mysql是否打卡log_bin

show variables like '%bin%';

   1.2创建授权。进入mysql创建用户以及给此用户权限。

CREATE USER canal IDENTIFIED BY 'canal'; 

GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;  

FLUSH PRIVILEGES;

 

2安装安装canal. Deployer

2.1 建立目录和解压安装包

mkdir -p /opt/modules/canal

tar -zxvf /opt/canal-1.1.4/canal.deployer-1.1.4.tar.gz -C /opt/modules/canal

cd /opt/modules/canal

2.2 修改配置

vi /opt/modules/canal/conf/example/instance.properties

canal.instance.mysql.slaveId=1234

canal.instance.gtidon=false

canal.instance.master.address=192.168.56.102:3306

canal.instance.tsdb.enable=true

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

canal.instance.connectionCharset = UTF-8

canal.instance.enableDruid=false

canal.instance.filter.regex=dianpingdb.shop

canal.mq.topic=example

canal.mq.partition=0

2.3启动canal

cd /opt/modules/canal

sh bin/startup.sh

出现下面情况说明canal.deployer已经启动成功。

2021-01-01 23:15:38.106 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....

2021-01-01 23:15:38.271 [destination = example , address = /192.168.56.102:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position

2021-01-01 23:15:38.271 [destination = example , address = /192.168.56.102:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status

2021-01-01 23:15:39.169 [destination = example , address = /192.168.56.102:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000004,position=4,serverId=1,gtid=<null>,timestamp=1609486247000] cost : 882ms , the next step is binlog dump

 

  3 安装 canal-adapter

  3.1建立目录和解压安装包

mkdir -p /opt/modules/canal-adapter

tar -zxvf /opt/canal-1.1.4/canal.adapter-1.1.4.tar.gz -C /opt/modules/canal-adapter

cd /opt/modules/canal-adapter

  3.2修改配置

vi /opt/modules/canal-adapter/conf/application.yml

canal.conf:

  mode: tcp # kafka rocketMQ

  canalServerHost: 192.168.56.102:11111

 

  batchSize: 500

  syncBatchSize: 1000

  retries: 0

  timeout:

  accessKey:

  secretKey:

  username:

  password:

  vhost:

  srcDataSources:

    defaultDS:

      url: jdbc:mysql://192.168.56.102:3306/dianpingdb?useUnicode=true

      username: canal

      password: canal

  canalAdapters:

  - instance: example

    groups:

    - groupId: g1

      outerAdapters:

      - name: logger

      - name: es7

        hosts: 192.168.56.102:9300

        properties:

          cluster.name: wuyicom

touch /opt/modules/canal-adapter/conf/es/shop.yml

vi /opt/modules/canal-adapter/conf/es/shop.yml

dataSourceKey: defaultDS

destination: example

groupId:

esMapping:

        _index: shop

        _type: _doc

        _id: id

        upsert: true

        sql: "select a.id,a.name,a.tags,concat(a.latitude,',',a.longitude) as location,a.remark_score,a.price_per_man,a.category_id,b.name as category_name,a.seller_id,c.remark_score as seller_remark_score,c.disabled_flag as seller_disabled_flag from shop a left join category b on a.category_id = b.id left join seller c on c.id = a.seller_id"

        commitBatch: 3000

3.3在配置完成canal-adapter后还不要着急启动,需要事先在es中定义我们指定的索引shop。注意:canal-1.1.5版本跟canal-1.1.4的配置不同的是-name: es7改为-name: es即可,canal-1.1.4会主动找到conf/esshop.yml。启动后,修改mysql的数据,数据增量同步到es7.6.2

4.PNG

5.PNG

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。