物联网平台MySQL:大事务拆分与批处理方案
1 物联网项目中的数据库事务优化实践
1.1 从一次生产故障说起
去年我们的智能工厂监控系统出了个问题。当时需要批量更新几万台设备的状态信息,结果一个大事务直接把数据库给锁死了,整个车间的设备监控界面都卡住不动。
事务本质上就是把多个数据库操作打包在一起,要么全成功,要么全失败。MySQL的事务有四个特点:原子性、一致性、隔离性、持久性,简称ACID。这些特性保证了数据不会乱套。
但是当事务变得很大的时候,问题就来了。
1.2 大事务带来的麻烦
在我们的物联网项目里,大事务主要会造成三个问题:
数据库被锁太久
想象一下,你要更新10万台传感器的数据,如果放在一个事务里,整个设备表就被锁住了。其他想查看设备状态的用户只能干等着,系统响应变得很慢。
服务器资源吃紧
大事务会消耗大量CPU、内存和磁盘IO。我们之前遇到过一次,更新设备固件版本信息时,服务器内存直接爆了,差点宕机。
回滚成本高
如果中途出错需要回滚,那就更麻烦了。几万条数据的回滚操作可能要跑好几分钟,这期间数据库基本处于半死不活的状态。
2 分批处理:化整为零的解决思路
既然大事务有这么多问题,那我们就把它拆开。核心想法很简单:把大量数据操作分成小块,一块一块地处理。这样每个小事务执行时间短,锁表时间也短,系统整体运行更平稳。
2.1 数据库连接工具类
先准备一个连接数据库的工具类,这个在物联网项目里经常用到:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class IoTDatabaseHelper {
// 物联网设备数据库连接地址
private static final String DATABASE_URL = "jdbc:mysql://localhost:3306/iot_device_platform";
// 数据库用户名
private static final String DB_USERNAME = "iot_admin";
// 数据库密码
private static final String DB_PASSWORD = "iot_2024_secure";
public static Connection getConnection() throws SQLException {
// 获取数据库连接
return DriverManager.getConnection(DATABASE_URL, DB_USERNAME, DB_PASSWORD);
}
}
2.2 设备状态批量更新实现
假设我们有个device_status表,存储着各种传感器和设备的状态信息。表结构包含设备ID和状态数据两个主要字段。现在要批量更新这些设备的在线状态,每次处理100台设备:
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class DeviceStatusBatchUpdater {
// 每批处理的设备数量
private static final int DEVICES_PER_BATCH = 100;
public static void main(String[] args) {
Connection dbConnection = null;
PreparedStatement updateStatement = null;
try {
// 连接物联网设备数据库
dbConnection = IoTDatabaseHelper.getConnection();
// 关闭自动提交,手动控制事务
dbConnection.setAutoCommit(false);
// 更新设备状态的SQL语句
String updateSql = "UPDATE device_status SET online_status = ? WHERE device_id = ?";
updateStatement = dbConnection.prepareStatement(updateSql);
int processedCount = 0;
// 模拟更新1000台设备的状态,实际项目中这里会从设备管理系统获取数据
for (int deviceIndex = 1; deviceIndex <= 1000; deviceIndex++) {
String newOnlineStatus = "online_" + System.currentTimeMillis();
int deviceId = deviceIndex;
// 设置SQL参数
updateStatement.setString(1, newOnlineStatus);
updateStatement.setInt(2, deviceId);
// 添加到批处理队列
updateStatement.addBatch();
// 达到批次大小时,执行这一批更新
if (++processedCount % DEVICES_PER_BATCH == 0) {
updateStatement.executeBatch();
dbConnection.commit();
updateStatement.clearBatch();
System.out.println("已处理 " + processedCount + " 台设备");
}
}
// 处理剩余的设备(不足一个批次的部分)
if (processedCount % DEVICES_PER_BATCH != 0) {
updateStatement.executeBatch();
dbConnection.commit();
System.out.println("最后一批设备处理完成,总共更新了 " + processedCount + " 台设备");
}
} catch (SQLException e) {
// 出错时回滚事务
if (dbConnection != null) {
try {
dbConnection.rollback();
System.out.println("更新失败,已回滚事务");
} catch (SQLException rollbackError) {
rollbackError.printStackTrace();
}
}
e.printStackTrace();
} finally {
// 清理资源
try {
if (updateStatement != null) {
updateStatement.close();
}
if (dbConnection != null) {
dbConnection.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
这段代码的关键点:
- 通过
dbConnection.setAutoCommit(false)关闭自动提交,这样我们可以控制什么时候提交事务 - 每处理100台设备就执行一次
executeBatch()和commit(),把这批数据写入数据库 - 用
clearBatch()清空批处理队列,准备处理下一批设备 - 最后别忘了处理不足100台的剩余设备
- 如果中途出错,用
rollback()回滚当前批次的操作
2.3 用MyBatis实现批量更新
如果你的物联网项目用的是MyBatis框架,也可以用它的批处理功能:
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
public class IoTDeviceMyBatisBatchUpdater {
private static SqlSessionFactory sessionFactory;
static {
// 加载MyBatis配置
InputStream configStream = IoTDeviceMyBatisBatchUpdater.class
.getClassLoader().getResourceAsStream("mybatis-config.xml");
sessionFactory = new SqlSessionFactoryBuilder().build(configStream);
}
public static void main(String[] args) {
// 开启批处理模式的SqlSession
SqlSession batchSession = sessionFactory.openSession(ExecutorType.BATCH);
try {
DeviceStatusMapper deviceMapper = batchSession.getMapper(DeviceStatusMapper.class);
List<DeviceStatusInfo> deviceList = new ArrayList<>();
// 准备要更新的设备状态数据
for (int i = 1; i <= 1000; i++) {
DeviceStatusInfo device = new DeviceStatusInfo();
device.setDeviceId(i);
device.setOnlineStatus("active_" + i);
device.setLastHeartbeat(System.currentTimeMillis());
deviceList.add(device);
}
int batchSize = 100;
for (int i = 0; i < deviceList.size(); i++) {
deviceMapper.updateDeviceStatus(deviceList.get(i));
// 每100台设备提交一次
if ((i + 1) % batchSize == 0) {
batchSession.commit();
batchSession.clearCache();
System.out.println("批次 " + ((i + 1) / batchSize) + " 处理完成");
}
}
// 处理最后一批
if (deviceList.size() % batchSize != 0) {
batchSession.commit();
System.out.println("所有设备状态更新完成");
}
} catch (Exception e) {
batchSession.rollback();
System.out.println("批量更新失败:" + e.getMessage());
e.printStackTrace();
} finally {
batchSession.close();
}
}
}
// 设备状态Mapper接口
interface DeviceStatusMapper {
void updateDeviceStatus(DeviceStatusInfo deviceInfo);
}
// 设备状态信息实体类
class DeviceStatusInfo {
private int deviceId;
private String onlineStatus;
private long lastHeartbeat;
private String deviceLocation;
// getter和setter方法
public int getDeviceId() {
return deviceId;
}
public void setDeviceId(int deviceId) {
this.deviceId = deviceId;
}
public String getOnlineStatus() {
return onlineStatus;
}
public void setOnlineStatus(String onlineStatus) {
this.onlineStatus = onlineStatus;
}
public long getLastHeartbeat() {
return lastHeartbeat;
}
public void setLastHeartbeat(long lastHeartbeat) {
this.lastHeartbeat = lastHeartbeat;
}
public String getDeviceLocation() {
return deviceLocation;
}
public void setDeviceLocation(String deviceLocation) {
this.deviceLocation = deviceLocation;
}
}
MyBatis的批处理模式使用起来更简洁:
- 用
ExecutorType.BATCH创建批处理会话 - 调用Mapper方法添加更新操作到批处理队列
- 定期调用
commit()和clearCache()提交并清理缓存 - 出错时用
rollback()回滚
3 实际应用中的注意事项
3.1 批次大小的选择
批次大小不是越小越好,也不是越大越好。太小了会增加网络开销和事务开销,太大了又回到了大事务的老问题。
在我们的物联网项目中,通常这样选择:
- 设备状态更新:100-500条/批次
- 传感器数据写入:1000-2000条/批次
- 设备配置更新:50-100条/批次
具体数值需要根据你的服务器性能和业务特点来调整。
3.2 错误处理策略
分批处理时,如果某一批出错了,不会影响其他批次。但你需要记录哪些数据处理失败了,后续可以重新处理。
可以考虑添加重试机制和失败记录:
// 简单的重试逻辑示例
int retryCount = 0;
int maxRetries = 3;
boolean batchSuccess = false;
while (!batchSuccess && retryCount < maxRetries) {
try {
updateStatement.executeBatch();
dbConnection.commit();
batchSuccess = true;
} catch (SQLException e) {
retryCount++;
System.out.println("批次处理失败,重试第 " + retryCount + " 次");
if (retryCount >= maxRetries) {
// 记录失败的批次信息,后续人工处理
logFailedBatch(currentBatchData);
}
}
}
3.3 监控和日志
在生产环境中,建议添加详细的处理进度日志,方便监控和排查问题:
long startTime = System.currentTimeMillis();
System.out.println("开始批量更新设备状态,预计处理 " + totalDevices + " 台设备");
// ... 批处理逻辑 ...
long endTime = System.currentTimeMillis();
System.out.println("批量更新完成,耗时 " + (endTime - startTime) + " 毫秒");
无论用JDBC还是MyBatis,核心思路都是一样的:把大任务拆成小任务,分批执行,控制好事务边界。这样既保证了数据一致性,又避免了大事务带来的性能问题。
在物联网项目中,设备数量往往很大,数据更新频繁,掌握这种分批处理的方法很有必要。合理的批次大小加上完善的错误处理,可以让你的系统在处理大量数据时依然保持稳定和高效。
- 点赞
- 收藏
- 关注作者
评论(0)