大数据物流项目:业务服务器和大数据服务器(四)
业务服务器和大数据服务器
01-[复习]-上次课程内容回顾
主要讲解2个方面内容:物流项目业务数据(数据库部署)和OGG 实时采集Oracle数据库表的数据。
1、物流项目业务数据
实际物流快递公司来说,有很多业务系统,使用不同类型数据库存储数据,在此仅仅以2个业务系统为例
- 物流系统Logistics
使用Oracle数据库
- CRM系统客户关系管理系统
使用MySQL数据库
统一采用Docker容器部署业务数据库,为了方便学习业务:业务数据实时采集
【针对上述2个业务系统数据,进行实时增量采集,分别使用不同采集框架:】
- 物流系统Logistics,Oracle数据库使用OGG采集
- CRM系统,MySQL数据库使用Canal采集
【将OGG和Canal采用Docker容器部署框架,简化运维,环境构建】
【node1.itcast.cn虚拟机上,有3个容器Container】
- mysql 容器:运行MySQL数据库
- canal-server 容器:运行Canal Server服务,采集MySQL数据
- myoracle 容器:运行Oracle数据库和OGG服务框架,采集数据
2、OGG 实时采集Oracle数据库表的数据
- OGG 是什么,功能是啥
- 基本原理
源端SRC:捕获Oracle数据库日志数据
MGR(管理者)、EXTRACT(提取进程)、本地缓存(Local TrailFile)、Pump进程(发送数据到目标端)
目标端DST:解析日志数据,发送目标系统
GR(管理者)、Collector(收集进程)、远程缓存(Remote TrailFile)、Replicat进程(复制进程,解析数据,发送Kafka或者其他)
- 启动Oracle数据库和OGG服务,测试数据实时采集
02-[了解]-第4天:课程内容提纲
主要讲解:如何将业务系统(仅仅以物流系统Logistics和客户关系管理系统CRM为例)实时增量采集数据到分布式消息队列Kafka(1个业务系统存储1个Topic:一对一)。
- 、MySQL数据库(CRM系统):使用
Canal
框架实时采集
1)、Canal 实时采集MySQL数据库表数据【掌握】
原理架构
配置
高可用HA集群,新版本
使用测试
2)、大数据服务相关说明
基于CM安装部署CDH,单机伪分布式
CM安装CDH架构原理
3)、扩展:
Oracle和MySQL数据库表数据实时同步,还有其他框架,比如flume,Maxwell
03-理解]-Canal 数据同步之MySQL binlog日志
Canal是阿里巴巴开发数据实时同步框架,原理与OGG基本类似,都是捕获数据库日志数据,进行解析,将其发送到目标端(比如Kafka 消息队列)。新版Canal 1.1版本
==针对MySQL数据库来说,日志数据:
binlog日志
,二进制日志。==
默认情况下,MySQL数据库没有开启binlog日志,向表中CUDT操作时,不会记录日志到文件中。
[root@node1 ~]# docker exec -it mysql /bin/bash
root@8b5cd2152ed9:/#
root@8b5cd2152ed9:/# more /etc/mysql/my.cnf
## 相关配置解析:
## [mysqld]
## log-bin=mysql-bin #添加这一行就ok
## binlog-format=ROW #选择row模式
## server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
## expire_logs_days=7 # binlog文件保存7天
## max_binlog_size=500m # 每个binlog日志文件大小
root@8b5cd2152ed9:/# mysql -uroot -p123456
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 6
Server version: 5.7.30-log MySQL Community Server (GPL)
Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql>
可以查看MySQL数据库binlog日志文件存储目录
root@8b5cd2152ed9: ~# cd /var/lib/mysql
root@8b5cd2152ed9:/var/lib/mysql# ls -l
total 205364
-rw-r-----. 1 mysql mysql 56 May 29 2020 auto.cnf
-rw-------. 1 mysql mysql 1676 May 29 2020 ca-key.pem
-rw-r--r--. 1 mysql mysql 1112 May 29 2020 ca.pem
drwxr-x---. 2 mysql mysql 20 Jul 4 2020 canal_tsdb
-rw-r--r--. 1 mysql mysql 1112 May 29 2020 client-cert.pem
-rw-------. 1 mysql mysql 1676 May 29 2020 client-key.pem
drwxr-x---. 2 mysql mysql 186 Jul 4 2020 crm
-rw-r-----. 1 mysql mysql 576 Mar 13 08:50 ib_buffer_pool
-rw-r-----. 1 mysql mysql 50331648 Mar 15 02:18 ib_logfile0
-rw-r-----. 1 mysql mysql 50331648 Mar 15 02:18 ib_logfile1
-rw-r-----. 1 mysql mysql 79691776 Mar 15 02:18 ibdata1
-rw-r-----. 1 mysql mysql 12582912 Mar 15 02:22 ibtmp1
drwxr-x---. 2 mysql mysql 4096 May 29 2020 mysql
-rw-r-----. 1 mysql mysql 17271182 Mar 13 16:47 mysql-bin.000004
-rw-r-----. 1 mysql mysql 177 Mar 13 08:50 mysql-bin.000005
-rw-r-----. 1 mysql mysql 154 Mar 15 02:18 mysql-bin.000006
-rw-r-----. 1 mysql mysql 57 Mar 15 02:18 mysql-bin.index
drwxr-x---. 2 mysql mysql 8192 May 29 2020 performance_schema
-rw-------. 1 mysql mysql 1680 May 29 2020 private_key.pem
-rw-r--r--. 1 mysql mysql 452 May 29 2020 public_key.pem
-rw-r--r--. 1 mysql mysql 1112 May 29 2020 server-cert.pem
-rw-------. 1 mysql mysql 1676 May 29 2020 server-key.pem
drwxr-x---. 2 mysql mysql 8192 May 29 2020 sys
drwxr-x---. 2 mysql mysql 60 May 29 2020 test
04-[理解]-Canal 数据同步之工作原理
Canal
[kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
Canal起源于,阿里巴巴杭州和美国机房数据同步备份,衍生框架,目前支持功能与版本如下:
Canal框架原理,参考==MySQL数据主从复制原理==。
- 1)、MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过
show binlog events
进行查看);变更数据,记录日志:biglog- 2)、MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log);IO 线程,获取binlog
- 3)、MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据;SQL线程,执行语句
- 1)、canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- 2)、MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- 3)、canal 解析 binary log 对象(原始为 byte 流);
上图中功能,属于Canal新版才有的(解析binlog日志,发送到Kafka消息队列)。
05-[理解]-Canal 数据同步之Canal 架构
Canal框架使用Java语言编写,启动
服务Server
,每个服务配置多个实例Instance,如下结构所示:
- 1)、Canal Server 代表一个 Canal 运行实例,对应于一个 JVM;
- 2)、Instance 对应于一个数据队列 (1个 Canal Server 对应 1…n 个 Instance)
- 监控某个数据库database binlog日志,需要将数据,发送到Kafka消息队列和Es索引index
- 发送到Kafka,称为1个实例:
instance
(binlog -> instance -> kafka)- 发送到Es,称为1个实例:
instance
(binlog -> instance -> es)- 3)、Instance下的子模块
eventParser
: 数据源接入,模拟 slave 协议和 master 进行交互,协议解析eventSink
: Parser 和 Store 链接器,进行数据过滤,加工,分发的工作eventStore
:数据存储metaManager
:增量订阅 & 消费信息管理器
总结:CanalServer来说,启动1个服务,运行多个instance实例(每个实例对应一个sink),每个实例instance包含四个部分:
eventParse、eventSink、eventStore、metaManager
。EventParser在向MySQL发送dump命令之前会先从Log Position中获取上次解析成功的位置(如果是第一次启动,则获取初始指定位置或者当前数据段binlog位点)。
- 1)、EventSink起到一个类似channel的功能,可以对数据进行过滤、分发/路由(1:n)、归并(n:1)和加
工。EventSink是连接EventParser和EventStore的桥梁- 2)、EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据
存储和读取的位置。
06-[掌握]-Canal 数据同步之Docker 安装部署
Canal 1.1.x版本开始,支持Docker容器部署,所以本项目中,采用Docker容器部署CanalServer。
在
node1.itcast.cn
机器上安装部署CanalServer,首先搜索获取canal镜像,然后pull拉取镜像,最后创建容器
Canal的好处在于对业务代码没有侵入,因为是基于监听binlog日志去进行同步数据的。实时性也能做到准实时,其实是很多企业一种比较常见的数据同步的方案。
具体安装部署,已经完成,只需要启动CanalServer容器即可,再启动Server服务,但是需要掌握如何配置,以及一些参数含义。
# 拉取镜像
docker pull canal/canal-server:v1.1.2
# 创建容器
docker run -d --name canal-server \
-e canal.instance.master.address=192.168.88.10:3306 \
-e canal.instance.dbUsername=root \
-e canal.instance.dbPassword=123456 \
-p 11111:11111 \
canal/canal-server:v1.1.2
docker start canal
# 进入容器
docker exec -it canal-server /bin/bash
# 切换到canal-server安装目录bin下,重启服务
[root@28888fad98c9 admin]# cd canal-server/bin/
[root@28888fad98c9 bin]# ./restart.sh
[root@28888fad98c9 bin]# jps
305 CanalLauncher
321 Jps
接下来,查看CanalServer配置:
- 1)、CanalServer配置文件:
canal-server/conf/canal.properties
- 2)、Instance实例配置文件:
canal-server/conf/example/instance.properties
启动
node2.itcast.cn
中Kafka消息队列,准备测试,看是否将MySQL表数据实时同步到Kafka中。
07-[理解]-Canal 数据同步之CRM数据同步Kafka
测试Canal功能,首先启动MySQL数据库,然后启动Kafka消息队列,最后启动CanalServer服务。
- 1)、插入数据测试
-- 插入数据INSERT
INSERT INTO `crm_address` VALUES (10001, '葛秋红', null, '17*******47', '恒大影城南侧小金庄', '130903', null, '2020-02-02 18:51:39', '2020-02-02 18:51:39', null);
插入数据到MySQL数据库表,Kafka中数据:
{
"data": [
{
"id": "10001",
"name": "葛秋红",
"tel": null,
"mobile": "17*******47",
"detail_addr": "恒大影城南侧小金庄",
"area_id": "130903",
"gis_addr": null,
"cdt": "2020-02-02 18:51:39",
"udt": "2020-02-02 18:51:39",
"remark": null
}
],
"database": "crm",
"es": 1615794443000,
"id": 2,
"isDdl": false,
"mysqlType": {
"id": "bigint(20)",
"name": "varchar(50)",
"tel": "varchar(20)",
"mobile": "varchar(20)",
"detail_addr": "varchar(100)",
"area_id": "bigint(20)",
"gis_addr": "varchar(20)",
"cdt": "datetime",
"udt": "datetime",
"remark": "varchar(100)"
},
"old": null,
"sql": "",
"sqlType": {
"id": -5,
"name": 12,
"tel": 12,
"mobile": 12,
"detail_addr": 12,
"area_id": -5,
"gis_addr": 12,
"cdt": 93,
"udt": 93,
"remark": 12
},
"table": "crm_address",
"ts": 1615794443644,
"type": "INSERT"
}
- 2)、更新数据
更新MySQL数据库表的数据,查看Kafka中数据格式:
{
"data": [
{
"id": "1245",
"name": "贝贝",
"tel": null,
"mobile": "18*******57",
"detail_addr": "美林家装广场3号门(晓东村涵馨公寓对面)",
"area_id": "530111",
"gis_addr": null,
"cdt": "2020-02-02 18:51:00",
"udt": "2020-02-02 18:51:00",
"remark": null
}
],
"database": "crm",
"es": 1621498703000,
"id": 3,
"isDdl": false,
"mysqlType": {
"id": "bigint(20)",
"name": "varchar(50)",
"tel": "varchar(20)",
"mobile": "varchar(20)",
"detail_addr": "varchar(100)",
"area_id": "bigint(20)",
"gis_addr": "varchar(20)",
"cdt": "datetime",
"udt": "datetime",
"remark": "varchar(100)"
},
"old": [
{
"name": "贝贝婆婆"
}
],
"sql": "",
"sqlType": {
"id": -5,
"name": 12,
"tel": 12,
"mobile": 12,
"detail_addr": 12,
"area_id": -5,
"gis_addr": 12,
"cdt": 93,
"udt": 93,
"remark": 12
},
"table": "crm_address",
"ts": 1621498703502,
"type": "UPDATE"
}
- 3)、删除数据测试
删除MySQL数据库表的数据,查看Kafka中数据:
{
"data": [
{
"id": "10001",
"name": "葛红红",
"tel": null,
"mobile": "17*******47",
"detail_addr": "恒大影城南侧小金庄",
"area_id": "130903",
"gis_addr": null,
"cdt": "2020-02-02 18:51:39",
"udt": "2020-02-02 18:51:39",
"remark": null
}
],
"database": "crm",
"es": 1615794586000,
"id": 4,
"isDdl": false,
"mysqlType": {
"id": "bigint(20)",
"name": "varchar(50)",
"tel": "varchar(20)",
"mobile": "varchar(20)",
"detail_addr": "varchar(100)",
"area_id": "bigint(20)",
"gis_addr": "varchar(20)",
"cdt": "datetime",
"udt": "datetime",
"remark": "varchar(100)"
},
"old": null,
"sql": "",
"sqlType": {
"id": -5,
"name": 12,
"tel": 12,
"mobile": 12,
"detail_addr": 12,
"area_id": -5,
"gis_addr": 12,
"cdt": 93,
"udt": 93,
"remark": 12
},
"table": "crm_address",
"ts": 1615794586746,
"type": "DELETE"
}
可以发现,Canal数据同步实时性很高,针对插入数据:
INSERT
、更新数据:UPDATE
和删除数据:DELETE
。
- 4)、清空表数据测试
- 执行DDL语句:
TRUNCATE TABLE crm_address ;
- 执行DDL语句:
{
"data": null,
"database": "crm",
"es": 1615794718000,
"id": 5,
"isDdl": true,
"mysqlType": null,
"old": null,
"sql": "/* ApplicationName\u003dDBeaver 6.3.4 - SQLEditor \u003cScript-4.sql\u003e */ TRUNCATE TABLE crm_address",
"sqlType": null,
"table": "crm_address",
"ts": 1615794719249,
"type": "TRUNCATE"
}
08-[了解]-Canal 数据同步之集群高可用HA
Canal 1.1.4 版本开始,提供集群高可用HA,运行2个CanalServer服务,一个为Active,一个为Standby,当Active宕掉以后,Standby接收工作,继续进行数据实时同步功能。
-
1)、首先,Canal集群同步数据数据流转示意图:
业务系统 -> MySQL(业务主库) -> Canal 集群 -> Kafak Topic -> 实时应用程序 :统计分析 -> Redis
Canal 集群高可用如何实现的,如下图所示,核心点2个:
- 1)、CanalServer服务启动2个:running(active)、Standby
- 2)、依赖Zookeeper监控CanalServer状态,并且存储状态
比如CanalServer中有2个Instance实例时,高可用集群如何工作:
09-[扩展]-数据实时同步之Flume 实时采集同步
如何将业务服务器业务数据(MySQL和Oracle数据库)实时同步到Kafka消息队列,分别使用Canal和OGG框架,也可以使用前面学习Flume Agent 实时采集。
1)、Source数据源,从哪里获取
| source -> push 推送 -> channel
2)、Channel管道,缓冲数据
3)、Sink终端,将数据写入到哪里去
|
sink <- pull 拉取 <- channel
伟大网友实现自定义FlumeSource,从数据库实时获取数据,模块:
flume-ng-sql-source
10-[扩展]-数据实时同步之Maxwell 实时采集同步
在大数据领域中,除了Canal以外,国外提供框架:
Maxwell
,轻量级,专门针对MySQL数据库数据实时同步到Kafka消息队列(数据都是JSON格式数据)。
1、官网:
http://maxwells-daemon.io/
2、源码:
https://github.com/zendesk/maxwell
Maxwell文档:http://maxwells-daemon.io/quickstart/
mysql> insert into `test`.`maxwell` set id = 1, daemon = 'Stanislaw Lem';
maxwell: {
"database": "test",
"table": "maxwell",
"type": "insert",
"ts": 1449786310,
"xid": 940752,
"commit": true,
"data": { "id":1, "daemon": "Stanislaw Lem" }
}
mysql> update test.maxwell set daemon = 'firebus! firebus!' where id = 1;
maxwell: {
"database": "test",
"table": "maxwell",
"type": "update",
"ts": 1449786341,
"xid": 940786,
"commit": true,
"data": {"id":1, "daemon": "Firebus! Firebus!"},
"old": {"daemon": "Stanislaw Lem"}
}
11-[掌握]-大数据服务器之CM安装架构及目录
针对整个物流项目来说,1台虚拟机安装部署大数据环境:
基于CM6.2.1安装CDH6.2.1
。
提供虚拟机【
node2.itcast.cn
】解压后,导入VMWare 软件中,启动虚拟机即可(选择我已移动该虚拟机)
- 1)、启动之前,设置node2.itcast.cn内存:
4GB
或者6GB
或者8GB
即可- 2)、第一次启动虚拟机时,很慢很慢很慢,由于启动所有服务(CM安装CDH服务),时间5分钟以上
- 浏览器登录:http://node2.itcast.cn:7180/cmf/,
admin/admin
- 将所有服务关闭:各个框架服务和CMS服务,而且CMS服务永远给关闭
- 3)、测试时,需要什么服务,启动什么服务,不要全部启动(耗内存,耗性能,没有任何意义)
架构原理:==CM如何安装CDH框架,原理架构是什么?????==
1)、CM安装CDH集群时,架构属于主从分布式架构(Master/Slaves)
主节点(老大):CMS(ClouderaManagerServer),在一台机器上运行
从节点(小弟):安装服务(Cloudera Agent),所有大数据集群机器
2)、Cloudera 将所有==大数据框架放在某个目录==,
打成包:parcel
- 首先parcel包,存储在CMServer主机的目录:
/opt/cloudera/parcel-repo
3)、CMServer服务,将parcel包,分发给所有大数据集群机器:Cloudera Agent
- 当所有集群机器下载parcel包以后,解压:
/opt/cloudera/parcels
- 每个框架安装目录:
/opt/cloudera/parcels/CDH/lib/框架名称
- 进入Kafka软件安装目录:
/opt/cloudera/parcels/CDH/lib/kafka
12-[掌握]-大数据服务器之CDH框架安装细节
当使用CM安装CDH集群以后,可以通过CM界面UI,管理整个集群服务和查看监控运行状态。
- 第一、框架用户,实际项目中按照大数据各个框架时,不建议使用root用户,创建普通用户。
使用CM安装CDH集群时,针对每个框架(比如,HDFS、MAPREDUCE、YARN等)创建用户,不可以被用于登录操作系统。
以框架名称创建用户,所有用户名为框架名称。
第一点:linux系统:
/etc/passwd
第二点:启动服务,所使使用用户
- 第二、服务配置
使用CM安装CDH组件时,配置分为2类:
第一类、服务端Server配置,启动服务Server时加载配置文件
- 存储在数据库中,物流项目中配置的是MySQL数据库:
scm
第二类、客户端Client配置,客户端连接服务时配置信息
- 存储在配置文件,在系统目录:
/etc/xx/conf
,其中xx表示框架名称
- 以Kafka框架为例:
- 第三、服务日志
使用CM安装CDH组件时,默认启动服务时,日志文件存储目录:
/var/log/xx/
- 第四、切换用户
由于每个框架都有自己用户,所以有时候对框架进行操作时,需要切换到框架用户,比如在HDFS文件系统上创建目录,使用
hdfs
用户,如下所示切换用户:
# root切换框架用户方式
sudo -u userName
# 举例说明:
[root@node2 ~]# sudo -u hdfs hdfs dfs -mkdir -p /datas
[root@node2 ~]# sudo -u hdfs hdfs dfs -ls
学习:从零搭建Canal实时采集数据
1、Linux操作系统安装MySQL 5.6、5.7、8.x也行
安装MySQL数据库
2、采用Docker容器部署安装Canal,并且配置
Docker容器部署Canal
3、搭建Kafka单机版
JDK、Zookeeper和Kafka
4、联动测试
向MySQL表中写入数据,Canal实时采集到,Kafka分布式消息队列
- 点赞
- 收藏
- 关注作者
评论(0)