MySQL 实战:4 种将数据同步到 Elasticsearch (ES) 的方案

举报
鱼弦 发表于 2025/01/22 09:29:08 2025/01/22
【摘要】 MySQL 实战:4 种将数据同步到 Elasticsearch (ES) 的方案将 MySQL 数据同步到 Elasticsearch(ES)是常见的需求,尤其是在需要全文搜索、实时分析和复杂查询的场景中。以下是 4 种常见的同步方案及其实现。 1. 方案介绍基于 Logstash 的同步:使用 Logstash 的 JDBC 插件从 MySQL 读取数据并写入 ES。适合定时批量同步。...

MySQL 实战:4 种将数据同步到 Elasticsearch (ES) 的方案

将 MySQL 数据同步到 Elasticsearch(ES)是常见的需求,尤其是在需要全文搜索、实时分析和复杂查询的场景中。以下是 4 种常见的同步方案及其实现。


1. 方案介绍

  1. 基于 Logstash 的同步
    • 使用 Logstash 的 JDBC 插件从 MySQL 读取数据并写入 ES。
    • 适合定时批量同步。
  2. 基于 Binlog 的实时同步
    • 使用 MySQL 的 Binlog 实时捕获数据变更,并通过 Canal 或 Debezium 同步到 ES。
    • 适合实时同步。
  3. 基于 Kafka 的消息队列同步
    • 将 MySQL 数据变更发布到 Kafka,然后通过消费者将数据写入 ES。
    • 适合高并发、分布式场景。
  4. 基于应用程序的双写同步
    • 在应用程序中同时写入 MySQL 和 ES。
    • 适合简单场景,但需要保证数据一致性。

2. 应用场景

  1. 全文搜索:将 MySQL 数据同步到 ES 以实现高效的全文搜索。
  2. 实时分析:实时同步数据以支持实时分析和可视化。
  3. 日志处理:将日志数据从 MySQL 同步到 ES 进行集中管理和分析。
  4. 数据备份:将 MySQL 数据同步到 ES 作为数据备份和冗余。

3. 原理解释

基于 Logstash 的同步

Logstash 是一个数据收集和处理的工具,通过 JDBC 插件定期从 MySQL 查询数据并写入 ES。

基于 Binlog 的实时同步

MySQL 的 Binlog 记录了所有的数据变更操作(如 INSERT、UPDATE、DELETE)。通过解析 Binlog,可以实时捕获数据变更并同步到 ES。

基于 Kafka 的消息队列同步

Kafka 是一个分布式消息队列,可以将 MySQL 数据变更发布到 Kafka,然后通过消费者将数据写入 ES。

基于应用程序的双写同步

在应用程序中同时写入 MySQL 和 ES,确保数据一致性。

算法原理流程图

基于 Logstash 的同步:
1. Logstash 定期从 MySQL 查询数据
2. Logstash 将数据写入 ES

基于 Binlog 的实时同步:
1. 解析 MySQL 的 Binlog
2. 捕获数据变更
3. 将数据变更同步到 ES

基于 Kafka 的消息队列同步:
1. 将 MySQL 数据变更发布到 Kafka
2. Kafka 消费者将数据写入 ES

基于应用程序的双写同步:
1. 应用程序同时写入 MySQL 和 ES

4. 代码实现

场景 1:基于 Logstash 的同步

Logstash 配置文件

input {
  jdbc {
    jdbc_driver_library => "/path/to/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "root"
    jdbc_password => "password"
    schedule => "* * * * *" # 每分钟执行一次
    statement => "SELECT * FROM mytable"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "mytable"
  }
}

场景 2:基于 Binlog 的实时同步

Canal 配置

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=password
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=.*\\..*

Canal 客户端代码

public class CanalClient {
    public static void main(String[] args) {
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        connector.connect();
        connector.subscribe(".*\\..*");
        while (true) {
            Message message = connector.getWithoutAck(100);
            for (CanalEntry.Entry entry : message.getEntries()) {
                if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                        // 处理数据变更并写入 ES
                    }
                }
            }
            connector.ack(message.getId());
        }
    }
}

场景 3:基于 Kafka 的消息队列同步

Kafka 生产者代码

public class KafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("mytable", "key", "value"));
        producer.close();
    }
}

Kafka 消费者代码

public class KafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("mytable"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // 处理数据并写入 ES
            }
        }
    }
}

场景 4:基于应用程序的双写同步

Java 代码

public class DoubleWrite {
    public void saveData(String data) {
        // 写入 MySQL
        jdbcTemplate.update("INSERT INTO mytable (data) VALUES (?)", data);

        // 写入 ES
        IndexRequest request = new IndexRequest("mytable");
        request.source("data", data);
        client.index(request, RequestOptions.DEFAULT);
    }
}

5. 测试步骤

  1. 配置 Logstash、Canal、Kafka 或应用程序。
  2. 运行同步任务,检查数据是否成功同步到 ES。
  3. 调整配置参数,观察同步效果。

6. 部署场景

  • Logstash:适合定时批量同步。
  • Canal/Debezium:适合实时同步。
  • Kafka:适合高并发、分布式场景。
  • 双写:适合简单场景,但需要保证数据一致性。

7. 材料链接


8. 总结

  • 将 MySQL 数据同步到 ES 是常见的需求,有多种实现方案。
  • 根据具体场景选择合适的同步方案,可以提高数据同步的效率和可靠性。
  • 通过 Logstash、Canal、Kafka 或双写方案,可以实现高效的数据同步。

9. 未来展望

  • 自动化同步:结合自动化工具实现更高效的数据同步。
  • 数据一致性:研究更高效的数据一致性保证机制。
  • 云原生集成:将同步方案集成到云原生环境中,提供更灵活的部署和管理。

通过掌握 MySQL 数据同步到 ES 的技术,你可以在全文搜索、实时分析和数据备份等领域开发出高效的系统。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。