物联网平台MySQL:大事务拆分与批处理方案

举报
Yeats_Liao 发表于 2025/11/26 09:09:38 2025/11/26
【摘要】 1 物联网项目中的数据库事务优化实践 1.1 从一次生产故障说起去年我们的智能工厂监控系统出了个问题。当时需要批量更新几万台设备的状态信息,结果一个大事务直接把数据库给锁死了,整个车间的设备监控界面都卡住不动。事务本质上就是把多个数据库操作打包在一起,要么全成功,要么全失败。MySQL的事务有四个特点:原子性、一致性、隔离性、持久性,简称ACID。这些特性保证了数据不会乱套。但是当事务变得...

1 物联网项目中的数据库事务优化实践

1.1 从一次生产故障说起

去年我们的智能工厂监控系统出了个问题。当时需要批量更新几万台设备的状态信息,结果一个大事务直接把数据库给锁死了,整个车间的设备监控界面都卡住不动。

事务本质上就是把多个数据库操作打包在一起,要么全成功,要么全失败。MySQL的事务有四个特点:原子性、一致性、隔离性、持久性,简称ACID。这些特性保证了数据不会乱套。

但是当事务变得很大的时候,问题就来了。

1.2 大事务带来的麻烦

在我们的物联网项目里,大事务主要会造成三个问题:

数据库被锁太久
想象一下,你要更新10万台传感器的数据,如果放在一个事务里,整个设备表就被锁住了。其他想查看设备状态的用户只能干等着,系统响应变得很慢。

服务器资源吃紧
大事务会消耗大量CPU、内存和磁盘IO。我们之前遇到过一次,更新设备固件版本信息时,服务器内存直接爆了,差点宕机。

回滚成本高
如果中途出错需要回滚,那就更麻烦了。几万条数据的回滚操作可能要跑好几分钟,这期间数据库基本处于半死不活的状态。

2 分批处理:化整为零的解决思路

既然大事务有这么多问题,那我们就把它拆开。核心想法很简单:把大量数据操作分成小块,一块一块地处理。这样每个小事务执行时间短,锁表时间也短,系统整体运行更平稳。

2.1 数据库连接工具类

先准备一个连接数据库的工具类,这个在物联网项目里经常用到:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class IoTDatabaseHelper {
    // 物联网设备数据库连接地址
    private static final String DATABASE_URL = "jdbc:mysql://localhost:3306/iot_device_platform";
    // 数据库用户名
    private static final String DB_USERNAME = "iot_admin";
    // 数据库密码
    private static final String DB_PASSWORD = "iot_2024_secure";

    public static Connection getConnection() throws SQLException {
        // 获取数据库连接
        return DriverManager.getConnection(DATABASE_URL, DB_USERNAME, DB_PASSWORD);
    }
}

2.2 设备状态批量更新实现

假设我们有个device_status表,存储着各种传感器和设备的状态信息。表结构包含设备ID和状态数据两个主要字段。现在要批量更新这些设备的在线状态,每次处理100台设备:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class DeviceStatusBatchUpdater {
    // 每批处理的设备数量
    private static final int DEVICES_PER_BATCH = 100;

    public static void main(String[] args) {
        Connection dbConnection = null;
        PreparedStatement updateStatement = null;
        
        try {
            // 连接物联网设备数据库
            dbConnection = IoTDatabaseHelper.getConnection();
            // 关闭自动提交,手动控制事务
            dbConnection.setAutoCommit(false);

            // 更新设备状态的SQL语句
            String updateSql = "UPDATE device_status SET online_status = ? WHERE device_id = ?";
            updateStatement = dbConnection.prepareStatement(updateSql);

            int processedCount = 0;
            
            // 模拟更新1000台设备的状态,实际项目中这里会从设备管理系统获取数据
            for (int deviceIndex = 1; deviceIndex <= 1000; deviceIndex++) {
                String newOnlineStatus = "online_" + System.currentTimeMillis();
                int deviceId = deviceIndex;
                
                // 设置SQL参数
                updateStatement.setString(1, newOnlineStatus);
                updateStatement.setInt(2, deviceId);
                // 添加到批处理队列
                updateStatement.addBatch();

                // 达到批次大小时,执行这一批更新
                if (++processedCount % DEVICES_PER_BATCH == 0) {
                    updateStatement.executeBatch();
                    dbConnection.commit();
                    updateStatement.clearBatch();
                    
                    System.out.println("已处理 " + processedCount + " 台设备");
                }
            }
            
            // 处理剩余的设备(不足一个批次的部分)
            if (processedCount % DEVICES_PER_BATCH != 0) {
                updateStatement.executeBatch();
                dbConnection.commit();
                System.out.println("最后一批设备处理完成,总共更新了 " + processedCount + " 台设备");
            }
            
        } catch (SQLException e) {
            // 出错时回滚事务
            if (dbConnection != null) {
                try {
                    dbConnection.rollback();
                    System.out.println("更新失败,已回滚事务");
                } catch (SQLException rollbackError) {
                    rollbackError.printStackTrace();
                }
            }
            e.printStackTrace();
        } finally {
            // 清理资源
            try {
                if (updateStatement != null) {
                    updateStatement.close();
                }
                if (dbConnection != null) {
                    dbConnection.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

这段代码的关键点:

  1. 通过dbConnection.setAutoCommit(false)关闭自动提交,这样我们可以控制什么时候提交事务
  2. 每处理100台设备就执行一次executeBatch()commit(),把这批数据写入数据库
  3. clearBatch()清空批处理队列,准备处理下一批设备
  4. 最后别忘了处理不足100台的剩余设备
  5. 如果中途出错,用rollback()回滚当前批次的操作

2.3 用MyBatis实现批量更新

如果你的物联网项目用的是MyBatis框架,也可以用它的批处理功能:

import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class IoTDeviceMyBatisBatchUpdater {
    private static SqlSessionFactory sessionFactory;

    static {
        // 加载MyBatis配置
        InputStream configStream = IoTDeviceMyBatisBatchUpdater.class
                .getClassLoader().getResourceAsStream("mybatis-config.xml");
        sessionFactory = new SqlSessionFactoryBuilder().build(configStream);
    }

    public static void main(String[] args) {
        // 开启批处理模式的SqlSession
        SqlSession batchSession = sessionFactory.openSession(ExecutorType.BATCH);
        
        try {
            DeviceStatusMapper deviceMapper = batchSession.getMapper(DeviceStatusMapper.class);
            List<DeviceStatusInfo> deviceList = new ArrayList<>();
            
            // 准备要更新的设备状态数据
            for (int i = 1; i <= 1000; i++) {
                DeviceStatusInfo device = new DeviceStatusInfo();
                device.setDeviceId(i);
                device.setOnlineStatus("active_" + i);
                device.setLastHeartbeat(System.currentTimeMillis());
                deviceList.add(device);
            }

            int batchSize = 100;
            for (int i = 0; i < deviceList.size(); i++) {
                deviceMapper.updateDeviceStatus(deviceList.get(i));
                
                // 每100台设备提交一次
                if ((i + 1) % batchSize == 0) {
                    batchSession.commit();
                    batchSession.clearCache();
                    System.out.println("批次 " + ((i + 1) / batchSize) + " 处理完成");
                }
            }
            
            // 处理最后一批
            if (deviceList.size() % batchSize != 0) {
                batchSession.commit();
                System.out.println("所有设备状态更新完成");
            }
            
        } catch (Exception e) {
            batchSession.rollback();
            System.out.println("批量更新失败:" + e.getMessage());
            e.printStackTrace();
        } finally {
            batchSession.close();
        }
    }
}

// 设备状态Mapper接口
interface DeviceStatusMapper {
    void updateDeviceStatus(DeviceStatusInfo deviceInfo);
}

// 设备状态信息实体类
class DeviceStatusInfo {
    private int deviceId;
    private String onlineStatus;
    private long lastHeartbeat;
    private String deviceLocation;

    // getter和setter方法
    public int getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(int deviceId) {
        this.deviceId = deviceId;
    }

    public String getOnlineStatus() {
        return onlineStatus;
    }

    public void setOnlineStatus(String onlineStatus) {
        this.onlineStatus = onlineStatus;
    }

    public long getLastHeartbeat() {
        return lastHeartbeat;
    }

    public void setLastHeartbeat(long lastHeartbeat) {
        this.lastHeartbeat = lastHeartbeat;
    }

    public String getDeviceLocation() {
        return deviceLocation;
    }

    public void setDeviceLocation(String deviceLocation) {
        this.deviceLocation = deviceLocation;
    }
}

MyBatis的批处理模式使用起来更简洁:

  1. ExecutorType.BATCH创建批处理会话
  2. 调用Mapper方法添加更新操作到批处理队列
  3. 定期调用commit()clearCache()提交并清理缓存
  4. 出错时用rollback()回滚

3 实际应用中的注意事项

3.1 批次大小的选择

批次大小不是越小越好,也不是越大越好。太小了会增加网络开销和事务开销,太大了又回到了大事务的老问题。

在我们的物联网项目中,通常这样选择:

  • 设备状态更新:100-500条/批次
  • 传感器数据写入:1000-2000条/批次
  • 设备配置更新:50-100条/批次

具体数值需要根据你的服务器性能和业务特点来调整。

3.2 错误处理策略

分批处理时,如果某一批出错了,不会影响其他批次。但你需要记录哪些数据处理失败了,后续可以重新处理。

可以考虑添加重试机制和失败记录:

// 简单的重试逻辑示例
int retryCount = 0;
int maxRetries = 3;
boolean batchSuccess = false;

while (!batchSuccess && retryCount < maxRetries) {
    try {
        updateStatement.executeBatch();
        dbConnection.commit();
        batchSuccess = true;
    } catch (SQLException e) {
        retryCount++;
        System.out.println("批次处理失败,重试第 " + retryCount + " 次");
        if (retryCount >= maxRetries) {
            // 记录失败的批次信息,后续人工处理
            logFailedBatch(currentBatchData);
        }
    }
}

3.3 监控和日志

在生产环境中,建议添加详细的处理进度日志,方便监控和排查问题:

long startTime = System.currentTimeMillis();
System.out.println("开始批量更新设备状态,预计处理 " + totalDevices + " 台设备");

// ... 批处理逻辑 ...

long endTime = System.currentTimeMillis();
System.out.println("批量更新完成,耗时 " + (endTime - startTime) + " 毫秒");

无论用JDBC还是MyBatis,核心思路都是一样的:把大任务拆成小任务,分批执行,控制好事务边界。这样既保证了数据一致性,又避免了大事务带来的性能问题。

在物联网项目中,设备数量往往很大,数据更新频繁,掌握这种分批处理的方法很有必要。合理的批次大小加上完善的错误处理,可以让你的系统在处理大量数据时依然保持稳定和高效。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。