HarmonyOS分布式数据库开发小知识:跨设备数据一致性保障

举报
Jack20 发表于 2026/06/19 22:47:24 2026/06/19
【摘要】 单机数据库追求ACID,分布式数据库追求的是"最终一致性"。但"最终"是多久?一致性如何保证?这是分布式数据库的核心命题。 一、背景与动机:为什么需要分布式数据库? 1.1 传统数据库的局限传统数据库设计假设:数据存储在单一节点。但在多设备协同场景下,这个假设失效了:场景1:跨设备查询用户在手机上保存了一篇文章在平板上打开应用,想查看收藏列表传统方案:数据在手机本地,平板查不到场景2:数据一...

单机数据库追求ACID,分布式数据库追求的是"最终一致性"。但"最终"是多久?一致性如何保证?这是分布式数据库的核心命题。

一、背景与动机:为什么需要分布式数据库?

1.1 传统数据库的局限

传统数据库设计假设:数据存储在单一节点。但在多设备协同场景下,这个假设失效了:

场景1:跨设备查询

  • 用户在手机上保存了一篇文章
  • 在平板上打开应用,想查看收藏列表
  • 传统方案:数据在手机本地,平板查不到

场景2:数据一致性

  • 用户在设备A修改了数据
  • 立即在设备B查询,期望看到最新值
  • 传统方案:无法保证,可能读到旧数据

场景3:离线操作

  • 用户在飞机上编辑了文档(离线)
  • 家人在家里的设备上也编辑了同一文档
  • 两个设备上线后,如何合并?

1.2 分布式数据库的设计目标

HarmonyOS分布式数据库的设计目标:
图片.png

二、核心原理:一致性模型

2.1 CAP理论

分布式系统不可能同时满足:

  • C (Consistency):一致性,所有节点同时看到相同数据
  • A (Availability):可用性,每个请求都能得到响应
  • P (Partition tolerance):分区容错,网络分区时系统仍能运行

HarmonyOS的选择:AP优先,C通过应用层实现。

2.2 一致性级别

HarmonyOS支持多种一致性级别:

enum ConsistencyLevel {
  // 强一致性:读写都要等待同步完成
  STRONG = 'strong',
  
  // 最终一致性:允许短暂不一致,最终会一致
  EVENTUAL = 'eventual',
  
  // 因果一致性:有因果关系的操作按顺序可见
  CAUSAL = 'causal',
  
  // 会话一致性:同一会话内读己之写
  SESSION = 'session'
}

2.3 数据同步流程

sequenceDiagram
    participant App as 应用层
    participant LocalDB as 本地数据库
    participant SyncEngine as 同步引擎
    participant RemoteDB as 远端数据库
    
    App->>LocalDB: 写入数据
    LocalDB->>LocalDB: 本地持久化
    LocalDB-->>App: 写入成功
    
    par 异步同步
        LocalDB->>SyncEngine: 触发同步任务
        SyncEngine->>SyncEngine: 生成变更集
        SyncEngine->>RemoteDB: 推送变更
        RemoteDB->>RemoteDB: 应用变更
        RemoteDB-->>SyncEngine: 确认
        SyncEngine->>LocalDB: 更新同步状态
    end
    
    App->>LocalDB: 读取数据
    LocalDB->>LocalDB: 检查本地数据
    LocalDB-->>App: 返回数据
    
    classDef primary fill:#4A90E2,stroke:#2E5C8A,stroke-width:2px,color:#fff
    classDef warning fill:#F5A623,stroke:#C17D10,stroke-width:2px,color:#fff
    classDef info fill:#7ED321,stroke:#5BA318,stroke-width:2px,color:#fff
    
    class App primary
    class LocalDB,SyncEngine warning
    class RemoteDB info

2.4 版本向量机制

为了追踪数据版本,分布式数据库使用版本向量:

// 版本向量示例
interface VersionVector {
  // 每个设备的版本号
  versions: Map<string, number>;
  
  // 比较操作
  compare(other: VersionVector): 'before' | 'after' | 'concurrent';
  
  // 合并操作
  merge(other: VersionVector): VersionVector;
}

// 示例:设备A修改了2次,设备B修改了1次
const vv1: VersionVector = {
  versions: new Map([
    ['deviceA', 2],
    ['deviceB', 1]
  ])
};

// 设备B又修改了1次
const vv2: VersionVector = {
  versions: new Map([
    ['deviceA', 2],
    ['deviceB', 2]
  ])
};

// vv2 > vv1,vv2是更新的版本

三、代码实战:构建分布式数据库应用

3.1 基础示例:创建分布式数据库

import distributedData from '@ohos.data.distributedData';
import { BusinessError } from '@ohos.base';

/**
 * 分布式数据库管理器
 * 提供数据库的创建、配置、基础操作
 */
export class DistributedDBManager {
  private kvManager: distributedData.KVManager | null = null;
  private kvStore: distributedData.KVStore | null = null;
  
  /**
   * 初始化分布式数据库
   * 创建支持跨设备同步的KV存储
   */
  async init(context: Context): Promise<void> {
    try {
      // 创建KV管理器
      this.kvManager = distributedData.createKVManager({
        bundleName: context.applicationInfo.name,
        userId: 0
      });
      
      // 数据库配置
      const options: distributedData.Options = {
        // 基础配置
        createIfMissing: true,           // 不存在则创建
        encrypt: false,                  // 是否加密
        backup: true,                    // 是否备份
        
        // 同步配置
        autoSync: true,                  // 自动同步
        kvStoreType: distributedData.KVStoreType.DEVICE_COLLABORATION,  // 设备协同类型
        
        // 安全配置
        securityLevel: distributedData.SecurityLevel.S1,  // 安全级别S1
        
        // 一致性配置
        consistency: distributedData.ConsistencyLevel.EVENTUAL  // 最终一致性
      };
      
      // 创建KV存储
      this.kvStore = await this.kvManager.getKVStore('distributed_db', options);
      
      // 注册数据变更监听
      this.setupDataChangeListener();
      
      // 注册同步状态监听
      this.setupSyncStatusListener();
      
      console.info('[DistributedDB] 数据库初始化成功');
    } catch (error) {
      const err = error as BusinessError;
      console.error(`[DistributedDB] 初始化失败: ${err.code} - ${err.message}`);
      throw error;
    }
  }
  
  /**
   * 写入数据
   * @param key 数据键
   * @param value 数据值
   */
  async put(key: string, value: any): Promise<void> {
    if (!this.kvStore) {
      throw new Error('数据库未初始化');
    }
    
    try {
      // 序列化数据
      const serialized = typeof value === 'string' ? value : JSON.stringify(value);
      
      // 写入数据
      await this.kvStore.put(key, serialized);
      
      console.info(`[DistributedDB] 数据写入成功: ${key}`);
    } catch (error) {
      console.error(`[DistributedDB] 写入失败: ${error}`);
      throw error;
    }
  }
  
  /**
   * 读取数据
   * @param key 数据键
   * @returns 数据值
   */
  async get(key: string): Promise<any> {
    if (!this.kvStore) {
      throw new Error('数据库未初始化');
    }
    
    try {
      const value = await this.kvStore.get(key);
      
      // 尝试反序列化
      try {
        return JSON.parse(value as string);
      } catch {
        return value;  // 不是JSON,直接返回
      }
    } catch (error) {
      console.error(`[DistributedDB] 读取失败: ${error}`);
      throw error;
    }
  }
  
  /**
   * 删除数据
   * @param key 数据键
   */
  async delete(key: string): Promise<void> {
    if (!this.kvStore) {
      throw new Error('数据库未初始化');
    }
    
    try {
      await this.kvStore.delete(key);
      console.info(`[DistributedDB] 数据删除成功: ${key}`);
    } catch (error) {
      console.error(`[DistributedDB] 删除失败: ${error}`);
      throw error;
    }
  }
  
  /**
   * 设置数据变更监听
   * 当本地或远端数据变更时触发
   */
  private setupDataChangeListener(): void {
    if (!this.kvStore) return;
    
    // 监听本地数据变更
    this.kvStore.on('dataChange', distributedData.SubscribeType.SUBSCRIBE_TYPE_LOCAL, 
      (data: distributedData.ChangeNotification) => {
        console.info('[DistributedDB] 本地数据变更:');
        this.handleChangeNotification(data);
      });
    
    // 监听远端数据变更
    this.kvStore.on('dataChange', distributedData.SubscribeType.SUBSCRIBE_TYPE_REMOTE, 
      (data: distributedData.ChangeNotification) => {
        console.info('[DistributedDB] 远端数据变更:');
        this.handleChangeNotification(data);
      });
    
    // 监听所有数据变更(本地+远端)
    this.kvStore.on('dataChange', distributedData.SubscribeType.SUBSCRIBE_TYPE_ALL, 
      (data: distributedData.ChangeNotification) => {
        console.info('[DistributedDB] 数据变更(本地+远端):');
        this.handleChangeNotification(data);
      });
  }
  
  /**
   * 设置同步状态监听
   */
  private setupSyncStatusListener(): void {
    if (!this.kvStore) return;
    
    // 监听同步完成
    this.kvStore.on('syncComplete', (data: distributedData.SyncCompleteNotification) => {
      if (data.success) {
        console.info(`[DistributedDB] 同步成功,设备: ${data.deviceId}`);
      } else {
        console.error(`[DistributedDB] 同步失败,设备: ${data.deviceId}, 错误: ${data.error}`);
      }
    });
  }
  
  /**
   * 处理数据变更通知
   */
  private handleChangeNotification(data: distributedData.ChangeNotification): void {
    // 更新的数据
    if (data.updateEntries && data.updateEntries.length > 0) {
      console.info(`  更新: ${data.updateEntries.length}`);
      for (const entry of data.updateEntries) {
        console.info(`    - ${entry.key}: ${entry.value}`);
      }
    }
    
    // 新增的数据
    if (data.insertEntries && data.insertEntries.length > 0) {
      console.info(`  新增: ${data.insertEntries.length}`);
      for (const entry of data.insertEntries) {
        console.info(`    + ${entry.key}: ${entry.value}`);
      }
    }
    
    // 删除的数据
    if (data.deleteEntries && data.deleteEntries.length > 0) {
      console.info(`  删除: ${data.deleteEntries.length}`);
      for (const entry of data.deleteEntries) {
        console.info(`    x ${entry.key}`);
      }
    }
  }
}

3.2 进阶示例:事务与批量操作

import distributedData from '@ohos.data.distributedData';

/**
 * 分布式数据库事务管理器
 * 提供事务支持和批量操作能力
 */
export class DistributedDBTransaction {
  private kvStore: distributedData.KVStore | null = null;
  
  /**
   * 批量写入数据
   * 原子操作,要么全部成功,要么全部失败
   */
  async batchPut(entries: Array<{key: string, value: any}>): Promise<void> {
    if (!this.kvStore) {
      throw new Error('数据库未初始化');
    }
    
    try {
      console.info(`[Transaction] 开始批量写入 ${entries.length} 条数据`);
      
      // 转换为KVStore的Entry格式
      const kvEntries: distributedData.Entry[] = entries.map(item => ({
        key: item.key,
        value: typeof item.value === 'string' ? item.value : JSON.stringify(item.value)
      }));
      
      // 批量写入
      await this.kvStore.putBatch(kvEntries);
      
      console.info('[Transaction] 批量写入成功');
    } catch (error) {
      console.error(`[Transaction] 批量写入失败: ${error}`);
      throw error;
    }
  }
  
  /**
   * 批量读取数据
   */
  async batchGet(keys: string[]): Promise<Map<string, any>> {
    if (!this.kvStore) {
      throw new Error('数据库未初始化');
    }
    
    try {
      const entries = await this.kvStore.getEntries(keys);
      const result = new Map<string, any>();
      
      for (const entry of entries) {
        // 尝试反序列化
        try {
          result.set(entry.key, JSON.parse(entry.value as string));
        } catch {
          result.set(entry.key, entry.value);
        }
      }
      
      console.info(`[Transaction] 批量读取 ${result.size} 条数据`);
      return result;
    } catch (error) {
      console.error(`[Transaction] 批量读取失败: ${error}`);
      throw error;
    }
  }
  
  /**
   * 批量删除数据
   */
  async batchDelete(keys: string[]): Promise<void> {
    if (!this.kvStore) {
      throw new Error('数据库未初始化');
    }
    
    try {
      await this.kvStore.deleteBatch(keys);
      console.info(`[Transaction] 批量删除 ${keys.length} 条数据成功`);
    } catch (error) {
      console.error(`[Transaction] 批量删除失败: ${error}`);
      throw error;
    }
  }
  
  /**
   * 条件查询
   * 查询满足前缀条件的所有数据
   */
  async queryByPrefix(prefix: string): Promise<Array<{key: string, value: any}>> {
    if (!this.kvStore) {
      throw new Error('数据库未初始化');
    }
    
    try {
      // 使用getEntries查询前缀匹配的数据
      const entries = await this.kvStore.getEntries(prefix);
      
      const result: Array<{key: string, value: any}> = [];
      for (const entry of entries) {
        if (entry.key.startsWith(prefix)) {
          try {
            result.push({
              key: entry.key,
              value: JSON.parse(entry.value as string)
            });
          } catch {
            result.push({
              key: entry.key,
              value: entry.value
            });
          }
        }
      }
      
      console.info(`[Transaction] 前缀查询 ${prefix} 返回 ${result.length} 条数据`);
      return result;
    } catch (error) {
      console.error(`[Transaction] 查询失败: ${error}`);
      throw error;
    }
  }
  
  /**
   * 复合事务操作
   * 示例:转账操作(原子性)
   */
  async transfer(fromKey: string, toKey: string, amount: number): Promise<void> {
    if (!this.kvStore) {
      throw new Error('数据库未初始化');
    }
    
    try {
      // 读取当前余额
      const fromBalance = await this.get(fromKey) as number;
      const toBalance = await this.get(toKey) as number;
      
      // 检查余额是否充足
      if (fromBalance < amount) {
        throw new Error('余额不足');
      }
      
      // 计算新余额
      const newFromBalance = fromBalance - amount;
      const newToBalance = toBalance + amount;
      
      // 批量更新(原子操作)
      await this.batchPut([
        { key: fromKey, value: newFromBalance },
        { key: toKey, value: newToBalance }
      ]);
      
      console.info(`[Transaction] 转账成功: ${fromKey} -> ${toKey}, 金额: ${amount}`);
    } catch (error) {
      console.error(`[Transaction] 转账失败: ${error}`);
      throw error;
    }
  }
  
  private async get(key: string): Promise<any> {
    const value = await this.kvStore!.get(key);
    try {
      return JSON.parse(value as string);
    } catch {
      return value;
    }
  }
}

3.3 高级示例:一致性控制与冲突处理

import distributedData from '@ohos.data.distributedData';

/**
 * 一致性级别枚举
 */
enum Consistency {
  STRONG = 'strong',      // 强一致性
  EVENTUAL = 'eventual',  // 最终一致性
  CAUSAL = 'causal'       // 因果一致性
}

/**
 * 数据版本信息
 */
interface DataVersion {
  version: number;
  timestamp: number;
  deviceId: string;
  vectorClock: Map<string, number>;
}

/**
 * 高级分布式数据库管理器
 * 提供一致性控制和冲突处理能力
 */
export class AdvancedDistributedDB {
  private kvStore: distributedData.KVStore | null = null;
  private consistencyLevel: Consistency = Consistency.EVENTUAL;
  private localDeviceId: string = '';
  
  /**
   * 初始化
   */
  async init(): Promise<void> {
    // 获取本地设备ID
    this.localDeviceId = await this.getDeviceId();
    
    // 创建KVStore
    const kvManager = distributedData.createKVManager({
      bundleName: 'com.example.myapp',
      userId: 0
    });
    
    const options: distributedData.Options = {
      createIfMissing: true,
      autoSync: true,
      kvStoreType: distributedData.KVStoreType.DEVICE_COLLABORATION,
      securityLevel: distributedData.SecurityLevel.S1,
      
      // 配置冲突解决策略
      conflictResolver: this.createConflictResolver()
    };
    
    this.kvStore = await kvManager.getKVStore('advanced_db', options);
  }
  
  /**
   * 设置一致性级别
   */
  setConsistency(level: Consistency): void {
    this.consistencyLevel = level;
    console.info(`[AdvancedDB] 一致性级别设置为: ${level}`);
  }
  
  /**
   * 写入数据(带版本控制)
   */
  async putWithVersion(key: string, value: any): Promise<void> {
    if (!this.kvStore) {
      throw new Error('数据库未初始化');
    }
    
    // 读取当前版本信息
    let currentVersion: DataVersion;
    try {
      const existing = await this.kvStore.get(key);
      const existingData = JSON.parse(existing as string);
      currentVersion = existingData._version;
    } catch {
      // 数据不存在,创建初始版本
      currentVersion = {
        version: 0,
        timestamp: 0,
        deviceId: '',
        vectorClock: new Map()
      };
    }
    
    // 构建新版本
    const newVersion: DataVersion = {
      version: currentVersion.version + 1,
      timestamp: Date.now(),
      deviceId: this.localDeviceId,
      vectorClock: this.incrementVectorClock(currentVersion.vectorClock)
    };
    
    // 包装数据
    const wrappedData = {
      data: value,
      _version: newVersion
    };
    
    // 写入
    await this.kvStore.put(key, JSON.stringify(wrappedData));
    
    // 强一致性:等待同步完成
    if (this.consistencyLevel === Consistency.STRONG) {
      await this.waitForSync();
    }
    
    console.info(`[AdvancedDB] 数据写入成功: ${key}, 版本: ${newVersion.version}`);
  }
  
  /**
   * 读取数据(带一致性保证)
   */
  async getWithConsistency(key: string): Promise<any> {
    if (!this.kvStore) {
      throw new Error('数据库未初始化');
    }
    
    // 强一致性:先同步最新数据
    if (this.consistencyLevel === Consistency.STRONG) {
      await this.syncLatest(key);
    }
    
    // 读取数据
    const value = await this.kvStore.get(key);
    const wrappedData = JSON.parse(value as string);
    
    console.info(`[AdvancedDB] 数据读取成功: ${key}, 版本: ${wrappedData._version.version}`);
    return wrappedData.data;
  }
  
  /**
   * 比较并交换(CAS操作)
   * 原子操作,用于乐观锁
   */
  async compareAndSwap(key: string, expectedVersion: number, newValue: any): Promise<boolean> {
    if (!this.kvStore) {
      throw new Error('数据库未初始化');
    }
    
    try {
      // 读取当前数据
      const current = await this.kvStore.get(key);
      const currentData = JSON.parse(current as string);
      
      // 检查版本是否匹配
      if (currentData._version.version !== expectedVersion) {
        console.warn(`[AdvancedDB] CAS失败: 版本不匹配 (期望: ${expectedVersion}, 实际: ${currentData._version.version})`);
        return false;
      }
      
      // 版本匹配,执行更新
      await this.putWithVersion(key, newValue);
      
      console.info(`[AdvancedDB] CAS成功: ${key}`);
      return true;
    } catch (error) {
      console.error(`[AdvancedDB] CAS异常: ${error}`);
      return false;
    }
  }
  
  /**
   * 创建冲突解决器
   */
  private createConflictResolver(): distributedData.ConflictResolver {
    const self = this;
    
    return {
      resolve(local: distributedData.Entry, remote: distributedData.Entry): distributedData.Entry {
        try {
          const localData = JSON.parse(local.value as string);
          const remoteData = JSON.parse(remote.value as string);
          
          const localVersion = localData._version as DataVersion;
          const remoteVersion = remoteData._version as DataVersion;
          
          // 比较版本向量
          const comparison = self.compareVectorClocks(
            localVersion.vectorClock,
            remoteVersion.vectorClock
          );
          
          if (comparison === 'after') {
            // 本地更新
            return local;
          } else if (comparison === 'before') {
            // 远端更新
            return remote;
          } else {
            // 并发修改,需要合并
            return self.mergeConflict(local, remote, localData, remoteData);
          }
        } catch (error) {
          // 解析失败,使用时间戳优先
          console.warn(`[AdvancedDB] 冲突解决失败,退化为时间戳优先: ${error}`);
          return local;
        }
      }
    };
  }
  
  /**
   * 合并冲突
   */
  private mergeConflict(local: distributedData.Entry, remote: distributedData.Entry,
                        localData: any, remoteData: any): distributedData.Entry {
    // 简单策略:版本号大者胜出
    if (localData._version.version > remoteData._version.version) {
      return local;
    } else if (localData._version.version < remoteData._version.version) {
      return remote;
    } else {
      // 版本号相同,时间戳优先
      if (localData._version.timestamp > remoteData._version.timestamp) {
        return local;
      } else {
        return remote;
      }
    }
  }
  
  /**
   * 递增版本向量
   */
  private incrementVectorClock(vc: Map<string, number>): Map<string, number> {
    const newVc = new Map(vc);
    const current = newVc.get(this.localDeviceId) || 0;
    newVc.set(this.localDeviceId, current + 1);
    return newVc;
  }
  
  /**
   * 比较版本向量
   */
  private compareVectorClocks(vc1: Map<string, number>, vc2: Map<string, number>): 
    'before' | 'after' | 'concurrent' {
    let allGreaterOrEqual = true;
    let allLessOrEqual = true;
    let atLeastOneDifferent = false;
    
    const allKeys = new Set([...vc1.keys(), ...vc2.keys()]);
    
    for (const key of allKeys) {
      const v1 = vc1.get(key) || 0;
      const v2 = vc2.get(key) || 0;
      
      if (v1 > v2) {
        allLessOrEqual = false;
        atLeastOneDifferent = true;
      } else if (v1 < v2) {
        allGreaterOrEqual = false;
        atLeastOneDifferent = true;
      }
    }
    
    if (allGreaterOrEqual && atLeastOneDifferent) return 'after';
    if (allLessOrEqual && atLeastOneDifferent) return 'before';
    return 'concurrent';
  }
  
  /**
   * 等待同步完成
   */
  private async waitForSync(): Promise<void> {
    return new Promise((resolve) => {
      if (!this.kvStore) {
        resolve();
        return;
      }
      
      const handler = (data: distributedData.SyncCompleteNotification) => {
        if (data.success) {
          this.kvStore?.off('syncComplete', handler);
          resolve();
        }
      };
      
      this.kvStore.on('syncComplete', handler);
      
      // 超时处理
      setTimeout(() => {
        this.kvStore?.off('syncComplete', handler);
        resolve();
      }, 5000);
    });
  }
  
  /**
   * 同步最新数据
   */
  private async syncLatest(key: string): Promise<void> {
    // 触发同步
    const devices = await this.getConnectedDevices();
    for (const deviceId of devices) {
      await this.kvStore?.sync(deviceId, distributedData.SyncMode.PULL);
    }
  }
  
  private async getDeviceId(): Promise<string> {
    return 'device_' + Date.now();
  }
  
  private async getConnectedDevices(): Promise<string[]> {
    return [];
  }
}

四、踩坑与注意事项

4.1 数据大小限制

问题:单条数据超过1MB会导致失败。

解决方案:大数据分片存储

/**
 * 大数据分片工具
 */
export class LargeDataSharding {
  private kvStore: distributedData.KVStore;
  private readonly CHUNK_SIZE = 512 * 1024;  // 512KB
  
  /**
   * 分片存储大数据
   */
  async putLarge(key: string, data: ArrayBuffer): Promise<void> {
    const chunks: distributedData.Entry[] = [];
    const totalSize = data.byteLength;
    const chunkCount = Math.ceil(totalSize / this.CHUNK_SIZE);
    
    // 分片
    for (let i = 0; i < chunkCount; i++) {
      const start = i * this.CHUNK_SIZE;
      const end = Math.min(start + this.CHUNK_SIZE, totalSize);
      const chunk = data.slice(start, end);
      
      chunks.push({
        key: `${key}_chunk_${i}`,
        value: chunk
      });
    }
    
    // 元数据
    const meta = {
      totalSize,
      chunkCount,
      chunkSize: this.CHUNK_SIZE
    };
    
    chunks.push({
      key: `${key}_meta`,
      value: JSON.stringify(meta)
    });
    
    // 批量写入
    await this.kvStore.putBatch(chunks);
  }
  
  /**
   * 读取大数据
   */
  async getLarge(key: string): Promise<ArrayBuffer> {
    // 读取元数据
    const metaData = await this.kvStore.get(`${key}_meta`);
    const meta = JSON.parse(metaData as string);
    
    // 合并分片
    const result = new ArrayBuffer(meta.totalSize);
    const resultView = new Uint8Array(result);
    
    for (let i = 0; i < meta.chunkCount; i++) {
      const chunk = await this.kvStore.get(`${key}_chunk_${i}`);
      const chunkData = chunk as ArrayBuffer;
      const chunkView = new Uint8Array(chunkData);
      
      const start = i * meta.chunkSize;
      resultView.set(chunkView, start);
    }
    
    return result;
  }
}

4.2 查询性能问题

问题:大量数据时,全量查询性能差。

解决方案:使用索引和分页

/**
 * 分页查询工具
 */
export class PaginationQuery {
  private kvStore: distributedData.KVStore;
  
  /**
   * 分页查询
   * @param prefix 键前缀
   * @param page 页码(从1开始)
   * @param pageSize 每页大小
   */
  async queryPage(prefix: string, page: number, pageSize: number): Promise<PageResult> {
    // 查询所有匹配前缀的数据
    const allEntries = await this.kvStore.getEntries(prefix);
    
    // 过滤匹配的键
    const matched = allEntries.filter(e => e.key.startsWith(prefix));
    
    // 排序(按键名)
    matched.sort((a, b) => a.key.localeCompare(b.key));
    
    // 分页
    const startIndex = (page - 1) * pageSize;
    const endIndex = startIndex + pageSize;
    const pageData = matched.slice(startIndex, endIndex);
    
    return {
      data: pageData.map(e => ({
        key: e.key,
        value: this.parseValue(e.value)
      })),
      total: matched.length,
      page,
      pageSize,
      totalPages: Math.ceil(matched.length / pageSize)
    };
  }
  
  private parseValue(value: any): any {
    try {
      return JSON.parse(value as string);
    } catch {
      return value;
    }
  }
}

interface PageResult {
  data: Array<{key: string, value: any}>;
  total: number;
  page: number;
  pageSize: number;
  totalPages: number;
}

4.3 内存溢出问题

问题:大量数据加载到内存导致OOM。

解决方案:流式读取

/**
 * 流式读取工具
 */
export class StreamReader {
  private kvStore: distributedData.KVStore;
  
  /**
   * 流式遍历所有数据
   * @param prefix 键前缀
   * @param callback 每条数据的回调
   */
  async streamForEach(prefix: string, 
                      callback: (key: string, value: any) => Promise<void>): Promise<void> {
    const BATCH_SIZE = 100;
    let processed = 0;
    
    // 查询所有匹配的数据
    const allEntries = await this.kvStore.getEntries(prefix);
    
    // 分批处理
    for (let i = 0; i < allEntries.length; i += BATCH_SIZE) {
      const batch = allEntries.slice(i, i + BATCH_SIZE);
      
      for (const entry of batch) {
        if (entry.key.startsWith(prefix)) {
          await callback(entry.key, this.parseValue(entry.value));
          processed++;
        }
      }
      
      // 让出执行权
      await new Promise(resolve => setTimeout(resolve, 0));
    }
    
    console.info(`[StreamReader] 共处理 ${processed} 条数据`);
  }
  
  private parseValue(value: any): any {
    try {
      return JSON.parse(value as string);
    } catch {
      return value;
    }
  }
}

五、HarmonyOS 6适配指南

5.1 API变更

5.1.1 Schema定义

// HarmonyOS 6新增:Schema定义
import distributedData from '@kit.ArkData';

const schema: distributedData.Schema = {
  version: 1,
  
  // 字段定义
  fields: [
    { name: 'id', type: distributedData.FieldType.STRING, nullable: false },
    { name: 'title', type: distributedData.FieldType.STRING, nullable: true },
    { name: 'content', type: distributedData.FieldType.STRING, nullable: true },
    { name: 'timestamp', type: distributedData.FieldType.LONG, nullable: false },
    { name: 'author', type: distributedData.FieldType.STRING, nullable: true }
  ],
  
  // 索引定义
  indexes: [
    { name: 'idx_timestamp', fields: ['timestamp'], type: distributedData.IndexType.BTREE },
    { name: 'idx_author', fields: ['author'], type: distributedData.IndexType.HASH }
  ]
};

const options: distributedData.Options = {
  createIfMissing: true,
  autoSync: true,
  kvStoreType: distributedData.KVStoreType.DEVICE_COLLABORATION,
  securityLevel: distributedData.SecurityLevel.S1,
  schema: schema  // 应用Schema
};

5.1.2 查询API增强

// HarmonyOS 6新增:结构化查询
import distributedData from '@kit.ArkData';

const query: distributedData.Query = {
  // 字段过滤
  fields: ['id', 'title', 'timestamp'],
  
  // 条件过滤
  where: {
    operator: 'AND',
    conditions: [
      { field: 'timestamp', op: '>=', value: Date.now() - 7 * 24 * 60 * 60 * 1000 },
      { field: 'author', op: '=', value: '张三' }
    ]
  },
  
  // 排序
  orderBy: [
    { field: 'timestamp', direction: 'DESC' }
  ],
  
  // 分页
  limit: 20,
  offset: 0
};

const results = await kvStore.query(query);

5.2 行为变更

5.2.1 自动压缩

// HarmonyOS 6:自动压缩大值
const options: distributedData.Options = {
  createIfMissing: true,
  autoSync: true,
  kvStoreType: distributedData.KVStoreType.DEVICE_COLLABORATION,
  securityLevel: distributedData.SecurityLevel.S1,
  
  // HarmonyOS 6新增:压缩配置
  compression: true,
  compressionThreshold: 1024,  // 超过1KB自动压缩
  compressionAlgorithm: 'zstd'  // 使用zstd算法
};

5.3 性能优化

/**
 * HarmonyOS 6性能优化配置
 */
export class HarmonyOS6DBOptimization {
  getOptimizedOptions(): distributedData.Options {
    return {
      createIfMissing: true,
      autoSync: true,
      kvStoreType: distributedData.KVStoreType.DEVICE_COLLABORATION,
      securityLevel: distributedData.SecurityLevel.S1,
      
      // Schema定义(提升查询性能)
      schema: this.defineSchema(),
      
      // 压缩配置
      compression: true,
      compressionThreshold: 1024,
      
      // 缓存配置
      cache: {
        enabled: true,
        maxSize: 50 * 1024 * 1024,  // 50MB缓存
        policy: 'lru'
      },
      
      // 批量操作配置
      batch: {
        maxSize: 1000,  // 单批最多1000条
        parallel: true  // 并行处理
      },
      
      // 同步配置
      sync: {
        mode: 'incremental',  // 增量同步
        batchSize: 100,       // 每批100条
        retryPolicy: {
          maxRetries: 3,
          backoff: 'exponential'
        }
      }
    };
  }
  
  private defineSchema(): distributedData.Schema {
    return {
      version: 1,
      fields: [
        { name: 'id', type: distributedData.FieldType.STRING, nullable: false },
        { name: 'data', type: distributedData.FieldType.STRING, nullable: true },
        { name: 'timestamp', type: distributedData.FieldType.LONG, nullable: false }
      ],
      indexes: [
        { name: 'idx_time', fields: ['timestamp'], type: distributedData.IndexType.BTREE }
      ]
    };
  }
}

六、总结

分布式数据库是HarmonyOS分布式能力的核心基础设施。通过本文的深度解析,我们掌握了:

核心要点

  1. 理论基础:CAP理论、一致性模型、版本向量机制
  2. API使用:数据库创建、数据读写、事务操作、批量处理
  3. 一致性控制:强一致性、最终一致性、因果一致性的实现
  4. 冲突处理:版本向量比较、CAS操作、自定义冲突解决
  5. 避坑指南:数据大小限制、查询性能、内存溢出等问题解决
  6. 版本适配:HarmonyOS 6的Schema、查询API、性能优化

最佳实践

  • 选择合适的一致性级别:关键数据用强一致性,普通数据用最终一致性
  • 使用版本控制:避免数据覆盖,支持冲突检测
  • 批量操作:减少网络开销,提升性能
  • 大数据分片:突破单条数据大小限制
  • 流式处理:避免内存溢出

架构建议

应用层 → 业务数据模型
    ↓
数据访问层 → CRUD操作封装
    ↓
分布式数据库 → 一致性保证
    ↓
同步引擎 → 跨设备同步
    ↓
传输层 → 网络通信

掌握分布式数据库,你的应用才能真正实现"数据随设备而动,设备随用户而在"的分布式愿景。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。