大数据物流项目:业务服务器和大数据服务器(四)

举报
Maynor学长 发表于 2022/06/21 20:46:40 2022/06/21
【摘要】 theme: smartblue持续创作,加速成长!这是我参与「掘金日新计划 · 6 月更文挑战」的第8天,点击查看活动详情 业务服务器和大数据服务器 01-[复习]-上次课程内容回顾主要讲解2个方面内容:物流项目业务数据(数据库部署)和OGG 实时采集Oracle数据库表的数据。1、物流项目业务数据 实际物流快递公司来说,有很多业务系统,使用不同类型数据库存储数据,在此仅仅以2个业务系统...

业务服务器和大数据服务器

1612173547954

01-[复习]-上次课程内容回顾

主要讲解2个方面内容:物流项目业务数据(数据库部署)和OGG 实时采集Oracle数据库表的数据。

1、物流项目业务数据
	实际物流快递公司来说,有很多业务系统,使用不同类型数据库存储数据,在此仅仅以2个业务系统为例
	- 物流系统Logistics
		使用Oracle数据库
	- CRM系统客户关系管理系统
		使用MySQL数据库
	统一采用Docker容器部署业务数据库,为了方便学习业务:业务数据实时采集
	【针对上述2个业务系统数据,进行实时增量采集,分别使用不同采集框架:】
		- 物流系统Logistics,Oracle数据库使用OGG采集
		- CRM系统,MySQL数据库使用Canal采集
		【将OGG和Canal采用Docker容器部署框架,简化运维,环境构建】
	【node1.itcast.cn虚拟机上,有3个容器Container】
	- mysql 容器:运行MySQL数据库
	- canal-server 容器:运行Canal Server服务,采集MySQL数据
	- myoracle 容器:运行Oracle数据库和OGG服务框架,采集数据

2OGG 实时采集Oracle数据库表的数据
	- OGG 是什么,功能是啥
	- 基本原理
		源端SRC:捕获Oracle数据库日志数据
			MGR(管理者)、EXTRACT(提取进程)、本地缓存(Local TrailFile)、Pump进程(发送数据到目标端)
		目标端DST:解析日志数据,发送目标系统
			GR(管理者)、Collector(收集进程)、远程缓存(Remote TrailFile)、Replicat进程(复制进程,解析数据,发送Kafka或者其他)
	- 启动Oracle数据库和OGG服务,测试数据实时采集

02-[了解]-第4天:课程内容提纲

​ 主要讲解:如何将业务系统(仅仅以物流系统Logistics和客户关系管理系统CRM为例)实时增量采集数据到分布式消息队列Kafka(1个业务系统存储1个Topic:一对一)。

  • 、MySQL数据库(CRM系统):使用Canal框架实时采集

1615771459449

1)、Canal 实时采集MySQL数据库表数据【掌握】
	原理架构
	配置
	高可用HA集群,新版本
	使用测试
2)、大数据服务相关说明
	基于CM安装部署CDH,单机伪分布式
	CM安装CDH架构原理
3)、扩展:
	Oracle和MySQL数据库表数据实时同步,还有其他框架,比如flume,Maxwell

03-理解]-Canal 数据同步之MySQL binlog日志

​ Canal是阿里巴巴开发数据实时同步框架,原理与OGG基本类似,都是捕获数据库日志数据,进行解析,将其发送到目标端(比如Kafka 消息队列)。新版Canal 1.1版本

==针对MySQL数据库来说,日志数据:binlog日志,二进制日志。==

binlog中存储四种类型日志数据:插入insert、更新update、删除delete和清空truncate

1612326012096

默认情况下,MySQL数据库没有开启binlog日志,向表中CUDT操作时,不会记录日志到文件中。

1615781586606

[root@node1 ~]# docker exec -it mysql /bin/bash
root@8b5cd2152ed9:/# 
root@8b5cd2152ed9:/# more /etc/mysql/my.cnf
## 相关配置解析:
## [mysqld]
## log-bin=mysql-bin #添加这一行就ok
## binlog-format=ROW #选择row模式
## server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
## expire_logs_days=7 # binlog文件保存7天
## max_binlog_size=500m # 每个binlog日志文件大小

1615781688969

root@8b5cd2152ed9:/# mysql -uroot -p123456
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 6
Server version: 5.7.30-log MySQL Community Server (GPL)

Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> 

1615781803587

可以查看MySQL数据库binlog日志文件存储目录

root@8b5cd2152ed9: ~# cd /var/lib/mysql
root@8b5cd2152ed9:/var/lib/mysql# ls -l
total 205364
-rw-r-----. 1 mysql mysql       56 May 29  2020 auto.cnf
-rw-------. 1 mysql mysql     1676 May 29  2020 ca-key.pem
-rw-r--r--. 1 mysql mysql     1112 May 29  2020 ca.pem
drwxr-x---. 2 mysql mysql       20 Jul  4  2020 canal_tsdb
-rw-r--r--. 1 mysql mysql     1112 May 29  2020 client-cert.pem
-rw-------. 1 mysql mysql     1676 May 29  2020 client-key.pem
drwxr-x---. 2 mysql mysql      186 Jul  4  2020 crm
-rw-r-----. 1 mysql mysql      576 Mar 13 08:50 ib_buffer_pool
-rw-r-----. 1 mysql mysql 50331648 Mar 15 02:18 ib_logfile0
-rw-r-----. 1 mysql mysql 50331648 Mar 15 02:18 ib_logfile1
-rw-r-----. 1 mysql mysql 79691776 Mar 15 02:18 ibdata1
-rw-r-----. 1 mysql mysql 12582912 Mar 15 02:22 ibtmp1
drwxr-x---. 2 mysql mysql     4096 May 29  2020 mysql
-rw-r-----. 1 mysql mysql 17271182 Mar 13 16:47 mysql-bin.000004
-rw-r-----. 1 mysql mysql      177 Mar 13 08:50 mysql-bin.000005
-rw-r-----. 1 mysql mysql      154 Mar 15 02:18 mysql-bin.000006
-rw-r-----. 1 mysql mysql       57 Mar 15 02:18 mysql-bin.index
drwxr-x---. 2 mysql mysql     8192 May 29  2020 performance_schema
-rw-------. 1 mysql mysql     1680 May 29  2020 private_key.pem
-rw-r--r--. 1 mysql mysql      452 May 29  2020 public_key.pem
-rw-r--r--. 1 mysql mysql     1112 May 29  2020 server-cert.pem
-rw-------. 1 mysql mysql     1676 May 29  2020 server-key.pem
drwxr-x---. 2 mysql mysql     8192 May 29  2020 sys
drwxr-x---. 2 mysql mysql       60 May 29  2020 test

04-[理解]-Canal 数据同步之工作原理

Canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

1612334014005

Canal起源于,阿里巴巴杭州和美国机房数据同步备份,衍生框架,目前支持功能与版本如下:

1615790940107

Canal框架原理,参考==MySQL数据主从复制原理==。

1612334597112

  • 1)、MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看);变更数据,记录日志:biglog
  • 2)、MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log);IO 线程,获取binlog
  • 3)、MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据;SQL线程,执行语句

1612334674650

  • 1)、canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • 2)、MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • 3)、canal 解析 binary log 对象(原始为 byte 流);

1612334913065

上图中功能,属于Canal新版才有的(解析binlog日志,发送到Kafka消息队列)。

1615791603097

05-[理解]-Canal 数据同步之Canal 架构

Canal框架使用Java语言编写,启动服务Server,每个服务配置多个实例Instance,如下结构所示:

1615791782074

  • 1)、Canal Server 代表一个 Canal 运行实例,对应于一个 JVM;
  • 2)、Instance 对应于一个数据队列 (1个 Canal Server 对应 1…n 个 Instance)
    • 监控某个数据库database binlog日志,需要将数据,发送到Kafka消息队列和Es索引index
    • 发送到Kafka,称为1个实例:instance(binlog -> instance -> kafka)
    • 发送到Es,称为1个实例:instance(binlog -> instance -> es)
  • 3)、Instance下的子模块
    • eventParser: 数据源接入,模拟 slave 协议和 master 进行交互,协议解析
    • eventSink: ParserStore 链接器,进行数据过滤,加工,分发的工作
    • eventStore:数据存储
    • metaManager:增量订阅 & 消费信息管理器

1612335671638

​ 总结:CanalServer来说,启动1个服务,运行多个instance实例(每个实例对应一个sink),每个实例instance包含四个部分:eventParse、eventSink、eventStore、metaManager

EventParser在向MySQL发送dump命令之前会先从Log Position中获取上次解析成功的位置(如果是第一次启动,则获取初始指定位置或者当前数据段binlog位点)。

canal架构图.png

  • 1)、EventSink起到一个类似channel的功能,可以对数据进行过滤、分发/路由(1:n)、归并(n:1)和加
    工。EventSink是连接EventParser和EventStore的桥梁
  • 2)、EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据
    存储和读取的位置。

06-[掌握]-Canal 数据同步之Docker 安装部署

​ Canal 1.1.x版本开始,支持Docker容器部署,所以本项目中,采用Docker容器部署CanalServer。

node1.itcast.cn机器上安装部署CanalServer,首先搜索获取canal镜像,然后pull拉取镜像,最后创建容器

文档:https://github.com/alibaba/canal/wiki/Docker-QuickStart、

1615792713761

​ Canal的好处在于对业务代码没有侵入,因为是基于监听binlog日志去进行同步数据的。实时性也能做到准实时,其实是很多企业一种比较常见的数据同步的方案。

1612336004227

​ 具体安装部署,已经完成,只需要启动CanalServer容器即可,再启动Server服务,但是需要掌握如何配置,以及一些参数含义。

# 拉取镜像
docker pull canal/canal-server:v1.1.2

# 创建容器
docker run -d --name canal-server \
-e canal.instance.master.address=192.168.88.10:3306 \
-e canal.instance.dbUsername=root \
-e canal.instance.dbPassword=123456 \
-p 11111:11111 \
canal/canal-server:v1.1.2

docker start canal
# 进入容器
docker exec -it canal-server /bin/bash

# 切换到canal-server安装目录bin下,重启服务
[root@28888fad98c9 admin]# cd canal-server/bin/
[root@28888fad98c9 bin]# ./restart.sh 
[root@28888fad98c9 bin]# jps
305 CanalLauncher
321 Jps

1615793128774

接下来,查看CanalServer配置:

  • 1)、CanalServer配置文件:canal-server/conf/canal.properties

1615793313924

  • 2)、Instance实例配置文件:canal-server/conf/example/instance.properties

1615793419104

启动node2.itcast.cn中Kafka消息队列,准备测试,看是否将MySQL表数据实时同步到Kafka中。

07-[理解]-Canal 数据同步之CRM数据同步Kafka

​ 测试Canal功能,首先启动MySQL数据库,然后启动Kafka消息队列,最后启动CanalServer服务。

针对物流项目来说,以客户关系管理系统CRM,同步MySQL数据库表的数据。

  • 1)、插入数据测试
-- 插入数据INSERT
INSERT INTO `crm_address` VALUES (10001, '葛秋红', null, '17*******47', '恒大影城南侧小金庄', '130903', null, '2020-02-02 18:51:39', '2020-02-02 18:51:39', null);

插入数据到MySQL数据库表,Kafka中数据:

{
  "data": [
    {
      "id": "10001",
      "name": "葛秋红",
      "tel": null,
      "mobile": "17*******47",
      "detail_addr": "恒大影城南侧小金庄",
      "area_id": "130903",
      "gis_addr": null,
      "cdt": "2020-02-02 18:51:39",
      "udt": "2020-02-02 18:51:39",
      "remark": null
    }
  ],
  "database": "crm",
  "es": 1615794443000,
  "id": 2,
  "isDdl": false,
  "mysqlType": {
    "id": "bigint(20)",
    "name": "varchar(50)",
    "tel": "varchar(20)",
    "mobile": "varchar(20)",
    "detail_addr": "varchar(100)",
    "area_id": "bigint(20)",
    "gis_addr": "varchar(20)",
    "cdt": "datetime",
    "udt": "datetime",
    "remark": "varchar(100)"
  },
  "old": null,
  "sql": "",
  "sqlType": {
    "id": -5,
    "name": 12,
    "tel": 12,
    "mobile": 12,
    "detail_addr": 12,
    "area_id": -5,
    "gis_addr": 12,
    "cdt": 93,
    "udt": 93,
    "remark": 12
  },
  "table": "crm_address",
  "ts": 1615794443644,
  "type": "INSERT"
}
  • 2)、更新数据

更新MySQL数据库表的数据,查看Kafka中数据格式:

{
  "data": [
    {
      "id": "1245",
      "name": "贝贝",
      "tel": null,
      "mobile": "18*******57",
      "detail_addr": "美林家装广场3号门(晓东村涵馨公寓对面)",
      "area_id": "530111",
      "gis_addr": null,
      "cdt": "2020-02-02 18:51:00",
      "udt": "2020-02-02 18:51:00",
      "remark": null
    }
  ],
  "database": "crm",
  "es": 1621498703000,
  "id": 3,
  "isDdl": false,
  "mysqlType": {
    "id": "bigint(20)",
    "name": "varchar(50)",
    "tel": "varchar(20)",
    "mobile": "varchar(20)",
    "detail_addr": "varchar(100)",
    "area_id": "bigint(20)",
    "gis_addr": "varchar(20)",
    "cdt": "datetime",
    "udt": "datetime",
    "remark": "varchar(100)"
  },
  "old": [
    {
      "name": "贝贝婆婆"
    }
  ],
  "sql": "",
  "sqlType": {
    "id": -5,
    "name": 12,
    "tel": 12,
    "mobile": 12,
    "detail_addr": 12,
    "area_id": -5,
    "gis_addr": 12,
    "cdt": 93,
    "udt": 93,
    "remark": 12
  },
  "table": "crm_address",
  "ts": 1621498703502,
  "type": "UPDATE"
}
  • 3)、删除数据测试

删除MySQL数据库表的数据,查看Kafka中数据:

{
  "data": [
    {
      "id": "10001",
      "name": "葛红红",
      "tel": null,
      "mobile": "17*******47",
      "detail_addr": "恒大影城南侧小金庄",
      "area_id": "130903",
      "gis_addr": null,
      "cdt": "2020-02-02 18:51:39",
      "udt": "2020-02-02 18:51:39",
      "remark": null
    }
  ],
  "database": "crm",
  "es": 1615794586000,
  "id": 4,
  "isDdl": false,
  "mysqlType": {
    "id": "bigint(20)",
    "name": "varchar(50)",
    "tel": "varchar(20)",
    "mobile": "varchar(20)",
    "detail_addr": "varchar(100)",
    "area_id": "bigint(20)",
    "gis_addr": "varchar(20)",
    "cdt": "datetime",
    "udt": "datetime",
    "remark": "varchar(100)"
  },
  "old": null,
  "sql": "",
  "sqlType": {
    "id": -5,
    "name": 12,
    "tel": 12,
    "mobile": 12,
    "detail_addr": 12,
    "area_id": -5,
    "gis_addr": 12,
    "cdt": 93,
    "udt": 93,
    "remark": 12
  },
  "table": "crm_address",
  "ts": 1615794586746,
  "type": "DELETE"
}

​ 可以发现,Canal数据同步实时性很高,针对插入数据:INSERT、更新数据:UPDATE和删除数据:DELETE

  • 4)、清空表数据测试
    • 执行DDL语句:TRUNCATE TABLE crm_address ;
{
  "data": null,
  "database": "crm",
  "es": 1615794718000,
  "id": 5,
  "isDdl": true,
  "mysqlType": null,
  "old": null,
  "sql": "/* ApplicationName\u003dDBeaver 6.3.4 - SQLEditor \u003cScript-4.sql\u003e */ TRUNCATE TABLE crm_address",
  "sqlType": null,
  "table": "crm_address",
  "ts": 1615794719249,
  "type": "TRUNCATE"
}

08-[了解]-Canal 数据同步之集群高可用HA

​ Canal 1.1.4 版本开始,提供集群高可用HA,运行2个CanalServer服务,一个为Active,一个为Standby,当Active宕掉以后,Standby接收工作,继续进行数据实时同步功能。

  • 1)、首先,Canal集群同步数据数据流转示意图:

    业务系统 -> MySQL(业务主库) -> Canal 集群 -> Kafak Topic -> 实时应用程序 :统计分析 -> Redis

1612338707148

Canal 集群高可用如何实现的,如下图所示,核心点2个:

  • 1)、CanalServer服务启动2个:running(active)、Standby
  • 2)、依赖Zookeeper监控CanalServer状态,并且存储状态

1612338755220

比如CanalServer中有2个Instance实例时,高可用集群如何工作:

1612338926977

09-[扩展]-数据实时同步之Flume 实时采集同步

​ 如何将业务服务器业务数据(MySQL和Oracle数据库)实时同步到Kafka消息队列,分别使用Canal和OGG框架,也可以使用前面学习Flume Agent 实时采集。

首先,回顾Flume Agent组成结构:三个部分组成

  • 1)、Source数据源,从哪里获取

    ​ | source -> push 推送 -> channel

  • 2)、Channel管道,缓冲数据

  • 3)、Sink终端,将数据写入到哪里去

    ​ | sink <- pull 拉取 <- channel

所以,Flume框架完全可以实时获取数据库日志文件数据,经过解析,最终写入Kafka

Agent component diagram

伟大网友实现自定义FlumeSource,从数据库实时获取数据,模块:flume-ng-sql-source

​ 网址:https://github.com/keedio/flume-ng-sql-source

1615799282413

10-[扩展]-数据实时同步之Maxwell 实时采集同步

​ 在大数据领域中,除了Canal以外,国外提供框架:Maxwell,轻量级,专门针对MySQL数据库数据实时同步到Kafka消息队列(数据都是JSON格式数据)。

1、官网:
	http://maxwells-daemon.io/
2、源码:
	https://github.com/zendesk/maxwell			

1615734115490

Maxwell文档:http://maxwells-daemon.io/quickstart/

mysql> insert into `test`.`maxwell` set id = 1, daemon = 'Stanislaw Lem';
  maxwell: {
    "database": "test",
    "table": "maxwell",
    "type": "insert",
    "ts": 1449786310,
    "xid": 940752,
    "commit": true,
    "data": { "id":1, "daemon": "Stanislaw Lem" }
  }

 mysql> update test.maxwell set daemon = 'firebus!  firebus!' where id = 1;
  maxwell: {
    "database": "test",
    "table": "maxwell",
    "type": "update",
    "ts": 1449786341,
    "xid": 940786,
    "commit": true,
    "data": {"id":1, "daemon": "Firebus!  Firebus!"},
    "old":  {"daemon": "Stanislaw Lem"}
  }

11-[掌握]-大数据服务器之CM安装架构及目录

​ 针对整个物流项目来说,1台虚拟机安装部署大数据环境:基于CM6.2.1安装CDH6.2.1

关于CM功能及CM安装,不再过多赘述,项目还是要注重于业务及数据和实现。

1615795864918

提供虚拟机【node2.itcast.cn】解压后,导入VMWare 软件中,启动虚拟机即可(选择我已移动该虚拟机)

  • 1)、启动之前,设置node2.itcast.cn内存:4GB或者6GB或者8GB即可
  • 2)、第一次启动虚拟机时,很慢很慢很慢,由于启动所有服务(CM安装CDH服务),时间5分钟以上
  • 3)、测试时,需要什么服务,启动什么服务,不要全部启动(耗内存,耗性能,没有任何意义)

1615796135090

架构原理:==CM如何安装CDH框架,原理架构是什么?????==

image-20210520171157255

  • 2)、Cloudera 将所有==大数据框架放在某个目录==,打成包:parcel

    • 首先parcel包,存储在CMServer主机的目录:/opt/cloudera/parcel-repo

    1615796586659

1615796536760

  • 3)、CMServer服务,将parcel包,分发给所有大数据集群机器:Cloudera Agent

    • 当所有集群机器下载parcel包以后,解压:/opt/cloudera/parcels

    1615796680185

    • 每个框架安装目录:/opt/cloudera/parcels/CDH/lib/框架名称

    1615796799129

    • 进入Kafka软件安装目录:/opt/cloudera/parcels/CDH/lib/kafka

    1615796842444

下图表示使用CM安装CDH架构原理图,多多理解。

1612340738839

12-[掌握]-大数据服务器之CDH框架安装细节

当使用CM安装CDH集群以后,可以通过CM界面UI,管理整个集群服务和查看监控运行状态。

  • 第一、框架用户,实际项目中按照大数据各个框架时,不建议使用root用户,创建普通用户。

​ 使用CM安装CDH集群时,针对每个框架(比如,HDFS、MAPREDUCE、YARN等)创建用户,不可以被用于登录操作系统。以框架名称创建用户,所有用户名为框架名称。

  1. 第一点:linux系统:/etc/passwd

    1615797831875

  2. 第二点:启动服务,所使使用用户

    1615797948191

  • 第二、服务配置

​ 使用CM安装CDH组件时,配置分为2类:

1615798359130

  1. 第一类、服务端Server配置,启动服务Server时加载配置文件

    • 存储在数据库中,物流项目中配置的是MySQL数据库:scm

    1615798311485

  2. 第二类、客户端Client配置,客户端连接服务时配置信息

    • 存储在配置文件,在系统目录:/etc/xx/conf,其中xx表示框架名称
    • 以Kafka框架为例:

    1615798345969

  • 第三、服务日志

​ 使用CM安装CDH组件时,默认启动服务时,日志文件存储目录:/var/log/xx/

1615798415864

  • 第四、切换用户

由于每个框架都有自己用户,所以有时候对框架进行操作时,需要切换到框架用户,比如在HDFS文件系统上创建目录,使用hdfs用户,如下所示切换用户:

# root切换框架用户方式
sudo -u userName

# 举例说明:
[root@node2 ~]# sudo -u hdfs hdfs dfs -mkdir -p /datas
[root@node2 ~]# sudo -u hdfs hdfs dfs -ls

1615798634938

学习:从零搭建Canal实时采集数据

1、Linux操作系统安装MySQL 5.65.78.x也行
	安装MySQL数据库
	
2、采用Docker容器部署安装Canal,并且配置
	Docker容器部署Canal
	
3、搭建Kafka单机版
	JDK、Zookeeper和Kafka
	
4、联动测试
	向MySQL表中写入数据,Canal实时采集到,Kafka分布式消息队列

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。