HarmonyOS APP跨进程回调:RemoteObject回调机制
【摘要】 HarmonyOS APP跨进程回调:RemoteObject回调机制双向通信的艺术:让服务端也能主动通知客户端 一、背景与动机:为什么需要跨进程回调? 1.1 单向通信的局限前面的RPC示例都是单向的:客户端调用服务端方法,服务端返回结果。但有些场景需要服务端主动通知客户端:下载进度通知:下载服务需要实时通知进度消息推送:IM服务需要推送新消息状态变化:服务状态变化时通知所有客户端事件监...
HarmonyOS APP跨进程回调:RemoteObject回调机制
双向通信的艺术:让服务端也能主动通知客户端
一、背景与动机:为什么需要跨进程回调?
1.1 单向通信的局限
前面的RPC示例都是单向的:客户端调用服务端方法,服务端返回结果。但有些场景需要服务端主动通知客户端:
- 下载进度通知:下载服务需要实时通知进度
- 消息推送:IM服务需要推送新消息
- 状态变化:服务状态变化时通知所有客户端
- 事件监听:服务端事件需要通知订阅者
// ❌ 单向通信:客户端轮询
setInterval(async () => {
const progress = await downloadService.getProgress()
updateUI(progress)
}, 1000) // 每1秒查询一次,效率低
// ✅ 双向通信:服务端主动通知
downloadService.registerCallback((progress) => {
updateUI(progress) // 实时更新
})
1.2 回调的本质
跨进程回调的本质是反向RPC:

- 客户端调用服务端:客户端是Caller,服务端是Callee
- 服务端回调客户端:服务端是Caller,客户端是Callee
1.3 实现原理

二、核心原理:RemoteObject回调机制
2.1 RemoteObject传递
RemoteObject可以像普通数据一样在进程间传递:
// 客户端:创建Callback对象
class MyCallback extends RemoteObject {
onResult(result: string) {
// 处理回调
}
}
const callback = new MyCallback()
// 将Callback传递给服务端
service.registerCallback(callback.asObject())
当RemoteObject跨进程传递时:
- 发送方:RemoteObject → 女柄(handle)
- 接收方:句柄 → RemoteProxy
2.2 Callback接口定义
// 回调接口
interface IDownloadCallback {
onProgress(progress: number): void
onComplete(result: DownloadResult): void
onError(error: string): void
}
// 服务端保存的是RemoteProxy
// 调用callback.onProgress()实际是RPC调用
2.3 生命周期管理

三、代码实战:下载服务回调
3.1 定义回调接口
// IDownloadCallback.ets
import { IRemoteObject } from '@kit.IPCKit'
/**
* 下载回调接口
*/
export interface IDownloadCallback {
/**
* 进度回调
* @param progress 进度百分比 (0-100)
* @param downloaded 已下载字节数
* @param total 总字节数
*/
onProgress(progress: number, downloaded: number, total: number): void
/**
* 下载完成
* @param result 下载结果
*/
onComplete(result: DownloadResult): void
/**
* 下载失败
* @param error 错误信息
*/
onError(error: string): void
/**
* 获取RemoteObject引用
*/
asObject(): IRemoteObject
}
/**
* 下载结果
*/
export interface DownloadResult {
filePath: string // 文件路径
fileSize: number // 文件大小
duration: number // 下载耗时(毫秒)
averageSpeed: number // 平均速度(字节/秒)
}
// 回调方法码
export const enum CallbackCode {
ON_PROGRESS = 1,
ON_COMPLETE = 2,
ON_ERROR = 3
}
// 回调接口描述符
export const CALLBACK_DESCRIPTOR = 'com.example.IDownloadCallback'
3.2 客户端实现Callback
// DownloadCallback.ets
import { RemoteObject, MessageParcel, IRemoteObject } from '@kit.IPCKit'
import { IDownloadCallback, DownloadResult, CallbackCode, CALLBACK_DESCRIPTOR } from './IDownloadCallback'
import { EventHandler, EventRunner } from '@kit.BasicServicesKit'
import hilog from '@ohos.hilog'
const TAG = 'DownloadCallback'
const DOMAIN = 0xFF00
/**
* 下载回调实现(客户端)
* 继承RemoteObject,作为服务端回调的目标
*/
export class DownloadCallback extends RemoteObject implements IDownloadCallback {
private mainHandler: EventHandler
// 回调函数
private onProgressCallback: ((progress: number, downloaded: number, total: number) => void) | null = null
private onCompleteCallback: ((result: DownloadResult) => void) | null = null
private onErrorCallback: ((error: string) => void) | null = null
constructor() {
super(CALLBACK_DESCRIPTOR)
// 获取主线程Handler,确保回调在主线程执行
const mainRunner = EventRunner.getMainEventRunner()
this.mainHandler = new EventHandler(mainRunner)
hilog.info(DOMAIN, TAG, 'DownloadCallback created')
}
/**
* 设置进度回调
*/
setOnProgress(callback: (progress: number, downloaded: number, total: number) => void): void {
this.onProgressCallback = callback
}
/**
* 设置完成回调
*/
setOnComplete(callback: (result: DownloadResult) => void): void {
this.onCompleteCallback = callback
}
/**
* 设置错误回调
*/
setOnError(callback: (error: string) => void): void {
this.onErrorCallback = callback
}
/**
* 处理远程请求(服务端调用)
*/
onRemoteRequest(code: number, data: MessageParcel, reply: MessageParcel): boolean {
hilog.info(DOMAIN, TAG, `onRemoteRequest: code=${code}`)
switch (code) {
case CallbackCode.ON_PROGRESS:
return this.handleProgress(data)
case CallbackCode.ON_COMPLETE:
return this.handleComplete(data)
case CallbackCode.ON_ERROR:
return this.handleError(data)
default:
hilog.warn(DOMAIN, TAG, `Unknown code: ${code}`)
return false
}
}
/**
* 处理进度回调
*/
private handleProgress(data: MessageParcel): boolean {
const progress = data.readInt()
const downloaded = data.readLong()
const total = data.readLong()
// 转发到主线程
this.mainHandler.sendEvent({
eventId: 1,
param: { progress, downloaded, total }
})
return true
}
/**
* 处理完成回调
*/
private handleComplete(data: MessageParcel): boolean {
const resultJson = data.readString()
const result: DownloadResult = JSON.parse(resultJson)
this.mainHandler.sendEvent({
eventId: 2,
param: result
})
return true
}
/**
* 处理错误回调
*/
private handleError(data: MessageParcel): boolean {
const error = data.readString()
this.mainHandler.sendEvent({
eventId: 3,
param: error
})
return true
}
/**
* 设置主线程事件处理
*/
setupEventHandler(): void {
this.mainHandler.setEventHandler((eventId, param) => {
switch (eventId) {
case 1:
// 进度回调
const progressData = param as { progress: number, downloaded: number, total: number }
if (this.onProgressCallback) {
this.onProgressCallback(
progressData.progress,
progressData.downloaded,
progressData.total
)
}
break
case 2:
// 完成回调
if (this.onCompleteCallback) {
this.onCompleteCallback(param as DownloadResult)
}
break
case 3:
// 错误回调
if (this.onErrorCallback) {
this.onErrorCallback(param as string)
}
break
}
})
}
/**
* 实现接口方法(本地调用)
*/
onProgress(progress: number, downloaded: number, total: number): void {
if (this.onProgressCallback) {
this.onProgressCallback(progress, downloaded, total)
}
}
onComplete(result: DownloadResult): void {
if (this.onCompleteCallback) {
this.onCompleteCallback(result)
}
}
onError(error: string): void {
if (this.onErrorCallback) {
this.onErrorCallback(error)
}
}
asObject(): IRemoteObject {
return this
}
}
3.3 服务端管理回调
// DownloadService.ets
import { RemoteObject, MessageParcel, IRemoteObject } from '@kit.IPCKit'
import { IDownloadCallback, DownloadResult, CallbackCode, CALLBACK_DESCRIPTOR } from './IDownloadCallback'
import hilog from '@ohos.hilog'
const TAG = 'DownloadService'
const DOMAIN = 0xFF00
/**
* 下载服务(服务端)
*/
export class DownloadService extends RemoteObject {
// 下载任务列表
private downloadTasks: Map<string, DownloadTask> = new Map()
// 回调列表(每个任务可能有多个回调)
private callbacks: Map<string, Set<IRemoteObject>> = new Map()
// 回调代理缓存
private callbackProxies: Map<IRemoteObject, CallbackProxy> = new Map()
constructor() {
super('com.example.IDownloadService')
hilog.info(DOMAIN, TAG, 'DownloadService created')
}
/**
* 处理远程请求
*/
onRemoteRequest(code: number, data: MessageParcel, reply: MessageParcel): boolean {
switch (code) {
case ServiceCode.START_DOWNLOAD:
return this.handleStartDownload(data, reply)
case ServiceCode.REGISTER_CALLBACK:
return this.handleRegisterCallback(data, reply)
case ServiceCode.UNREGISTER_CALLBACK:
return this.handleUnregisterCallback(data, reply)
case ServiceCode.CANCEL_DOWNLOAD:
return this.handleCancelDownload(data, reply)
default:
return false
}
}
/**
* 处理开始下载
*/
private handleStartDownload(data: MessageParcel, reply: MessageParcel): boolean {
const url = data.readString()
const savePath = data.readString()
// 生成任务ID
const taskId = `task_${Date.now()}`
// 创建下载任务
const task = new DownloadTask(taskId, url, savePath, this)
this.downloadTasks.set(taskId, task)
// 启动下载
task.start()
// 返回任务ID
reply.writeString(taskId)
hilog.info(DOMAIN, TAG, `Download started: ${taskId}`)
return true
}
/**
* 处理注册回调
*/
private handleRegisterCallback(data: MessageParcel, reply: MessageParcel): boolean {
const taskId = data.readString()
const callbackRemote = data.readRemoteObject()
if (!callbackRemote) {
hilog.error(DOMAIN, TAG, 'Callback remote is null')
return false
}
// 添加到回调列表
if (!this.callbacks.has(taskId)) {
this.callbacks.set(taskId, new Set())
}
this.callbacks.get(taskId).add(callbackRemote)
// 创建代理(如果还没有)
if (!this.callbackProxies.has(callbackRemote)) {
this.callbackProxies.set(callbackRemote, new CallbackProxy(callbackRemote))
}
hilog.info(DOMAIN, TAG, `Callback registered for task: ${taskId}`)
reply.writeBoolean(true)
return true
}
/**
* 处理注销回调
*/
private handleUnregisterCallback(data: MessageParcel, reply: MessageParcel): boolean {
const taskId = data.readString()
const callbackRemote = data.readRemoteObject()
if (this.callbacks.has(taskId)) {
this.callbacks.get(taskId).delete(callbackRemote)
}
hilog.info(DOMAIN, TAG, `Callback unregistered for task: ${taskId}`)
reply.writeBoolean(true)
return true
}
/**
* 处理取消下载
*/
private handleCancelDownload(data: MessageParcel, reply: MessageParcel): boolean {
const taskId = data.readString()
const task = this.downloadTasks.get(taskId)
if (task) {
task.cancel()
this.downloadTasks.delete(taskId)
this.callbacks.delete(taskId)
}
reply.writeBoolean(true)
return true
}
/**
* 通知进度(由DownloadTask调用)
*/
notifyProgress(taskId: string, progress: number, downloaded: number, total: number): void {
const callbacks = this.callbacks.get(taskId)
if (!callbacks) {
return
}
for (const remote of callbacks) {
try {
const proxy = this.callbackProxies.get(remote)
if (proxy) {
proxy.onProgress(progress, downloaded, total)
}
} catch (err) {
hilog.error(DOMAIN, TAG, `Notify progress failed: ${err.message}`)
// 移除失效的回调
callbacks.delete(remote)
}
}
}
/**
* 通知完成
*/
notifyComplete(taskId: string, result: DownloadResult): void {
const callbacks = this.callbacks.get(taskId)
if (!callbacks) {
return
}
for (const remote of callbacks) {
try {
const proxy = this.callbackProxies.get(remote)
if (proxy) {
proxy.onComplete(result)
}
} catch (err) {
hilog.error(DOMAIN, TAG, `Notify complete failed: ${err.message}`)
}
}
// 清理
this.callbacks.delete(taskId)
this.downloadTasks.delete(taskId)
}
/**
* 通知错误
*/
notifyError(taskId: string, error: string): void {
const callbacks = this.callbacks.get(taskId)
if (!callbacks) {
return
}
for (const remote of callbacks) {
try {
const proxy = this.callbackProxies.get(remote)
if (proxy) {
proxy.onError(error)
}
} catch (err) {
hilog.error(DOMAIN, TAG, `Notify error failed: ${err.message}`)
}
}
// 清理
this.callbacks.delete(taskId)
this.downloadTasks.delete(taskId)
}
}
/**
* 回调代理(服务端使用)
*/
class CallbackProxy {
private remote: IRemoteObject
constructor(remote: IRemoteObject) {
this.remote = remote
}
onProgress(progress: number, downloaded: number, total: number): void {
const data = MessageParcel.create()
const reply = MessageParcel.create()
try {
data.writeInt(progress)
data.writeLong(downloaded)
data.writeLong(total)
this.remote.sendRequest(CallbackCode.ON_PROGRESS, data, reply)
} finally {
data.reclaim()
reply.reclaim()
}
}
onComplete(result: DownloadResult): void {
const data = MessageParcel.create()
const reply = MessageParcel.create()
try {
data.writeString(JSON.stringify(result))
this.remote.sendRequest(CallbackCode.ON_COMPLETE, data, reply)
} finally {
data.reclaim()
reply.reclaim()
}
}
onError(error: string): void {
const data = MessageParcel.create()
const reply = MessageParcel.create()
try {
data.writeString(error)
this.remote.sendRequest(CallbackCode.ON_ERROR, data, reply)
} finally {
data.reclaim()
reply.reclaim()
}
}
}
/**
* 下载任务
*/
class DownloadTask {
private taskId: string
private url: string
private savePath: string
private service: DownloadService
private cancelled: boolean = false
constructor(taskId: string, url: string, savePath: string, service: DownloadService) {
this.taskId = taskId
this.url = url
this.savePath = savePath
this.service = service
}
async start(): Promise<void> {
const startTime = Date.now()
const totalSize = 10 * 1024 * 1024 // 模拟10MB文件
let downloaded = 0
// 模拟下载
while (downloaded < totalSize && !this.cancelled) {
// 模拟下载一块数据
await new Promise(resolve => setTimeout(resolve, 100))
const chunk = Math.min(100 * 1024, totalSize - downloaded)
downloaded += chunk
const progress = Math.floor((downloaded / totalSize) * 100)
// 通知进度
this.service.notifyProgress(this.taskId, progress, downloaded, totalSize)
}
if (this.cancelled) {
return
}
// 下载完成
const duration = Date.now() - startTime
const result: DownloadResult = {
filePath: this.savePath,
fileSize: totalSize,
duration: duration,
averageSpeed: totalSize / (duration / 1000)
}
this.service.notifyComplete(this.taskId, result)
}
cancel(): void {
this.cancelled = true
}
}
// 服务方法码
const enum ServiceCode {
START_DOWNLOAD = 1,
REGISTER_CALLBACK = 2,
UNREGISTER_CALLBACK = 3,
CANCEL_DOWNLOAD = 4
}
3.4 客户端使用示例
// DownloadPage.ets
import { DownloadCallback } from './DownloadCallback'
import { DownloadServiceProxy } from './DownloadServiceProxy'
import hilog from '@ohos.hilog'
const TAG = 'DownloadPage'
const DOMAIN = 0xFF00
@Entry
@Component
struct DownloadPage {
@State progress: number = 0
@State downloaded: string = '0 MB'
@State total: string = '0 MB'
@State status: string = '未开始'
@State isDownloading: boolean = false
private serviceProxy: DownloadServiceProxy = new DownloadServiceProxy()
private callback: DownloadCallback = null
private taskId: string = ''
async aboutToAppear() {
// 创建回调
this.callback = new DownloadCallback()
this.callback.setupEventHandler()
// 设置回调函数
this.callback.setOnProgress((progress, downloaded, total) => {
this.progress = progress
this.downloaded = this.formatSize(downloaded)
this.total = this.formatSize(total)
})
this.callback.setOnComplete((result) => {
this.status = '下载完成'
this.isDownloading = false
hilog.info(DOMAIN, TAG, `Download complete: ${result.filePath}`)
})
this.callback.setOnError((error) => {
this.status = `下载失败: ${error}`
this.isDownloading = false
})
// 连接服务
await this.serviceProxy.connect()
}
aboutToDisappear() {
this.serviceProxy.disconnect()
}
/**
* 开始下载
*/
async startDownload(): Promise<void> {
this.isDownloading = true
this.status = '下载中...'
this.progress = 0
// 开始下载
this.taskId = await this.serviceProxy.startDownload(
'https://example.com/file.zip',
'/data/local/tmp/file.zip'
)
// 注册回调
await this.serviceProxy.registerCallback(this.taskId, this.callback.asObject())
hilog.info(DOMAIN, TAG, `Download started: ${this.taskId}`)
}
/**
* 取消下载
*/
async cancelDownload(): Promise<void> {
if (this.taskId) {
await this.serviceProxy.cancelDownload(this.taskId)
this.status = '已取消'
this.isDownloading = false
}
}
/**
* 格式化大小
*/
private formatSize(bytes: number): string {
if (bytes < 1024) {
return `${bytes} B`
} else if (bytes < 1024 * 1024) {
return `${(bytes / 1024).toFixed(2)} KB`
} else {
return `${(bytes / 1024 / 1024).toFixed(2)} MB`
}
}
build() {
Column() {
Text('下载服务示例')
.fontSize(24)
.fontWeight(FontWeight.Bold)
.margin({ bottom: 30 })
// 进度条
Progress({ value: this.progress, total: 100, type: ProgressType.Linear })
.width('80%')
.margin({ bottom: 10 })
// 进度信息
Text(`${this.progress}%`)
.fontSize(18)
.margin({ bottom: 10 })
Text(`${this.downloaded} / ${this.total}`)
.fontSize(14)
.fontColor('#666666')
.margin({ bottom: 20 })
// 状态
Text(this.status)
.fontSize(16)
.margin({ bottom: 30 })
// 操作按钮
Row() {
Button('开始下载')
.enabled(!this.isDownloading)
.onClick(() => this.startDownload())
.margin({ right: 10 })
Button('取消下载')
.enabled(this.isDownloading)
.onClick(() => this.cancelDownload())
}
}
.width('100%')
.height('100%')
.justifyContent(FlexAlign.Center)
}
}
四、踩坑与注意事项
4.1 坑一:回调在Binder线程执行
问题:回调在Binder线程执行,不能直接更新UI。
// ❌ 直接更新UI
onRemoteRequest(code: number, data: MessageParcel, reply: MessageParcel): boolean {
const progress = data.readInt()
this.progress = progress // 可能不在主线程!
return true
}
// ✅ 转发到主线程
onRemoteRequest(code: number, data: MessageParcel, reply: MessageParcel): boolean {
const progress = data.readInt()
// 使用EventHandler转发到主线程
this.mainHandler.sendEvent({
eventId: 1,
param: progress
})
return true
}
4.2 坑二:回调对象被回收
问题:客户端Callback对象被GC回收,服务端调用失败。
// ❌ 局部变量,可能被回收
async registerCallback() {
const callback = new DownloadCallback() // 局部变量
await service.registerCallback(callback.asObject())
// 方法返回后,callback可能被GC
}
// ✅ 成员变量,保持引用
class DownloadManager {
private callback: DownloadCallback = null // 成员变量
async registerCallback() {
this.callback = new DownloadCallback()
await service.registerCallback(this.callback.asObject())
}
// 销毁时清理
destroy() {
this.callback = null
}
}
4.3 坑三:回调列表无限增长
问题:服务端保存的回调列表可能无限增长,导致内存泄漏。
// ❌ 只添加不移除
registerCallback(taskId: string, callback: IRemoteObject) {
this.callbacks.add(callback)
// 没有移除机制
}
// ✅ 定期清理失效回调
class CallbackManager {
private callbacks: Set<IRemoteObject> = new Set()
register(callback: IRemoteObject) {
this.callbacks.add(callback)
}
unregister(callback: IRemoteObject) {
this.callbacks.delete(callback)
}
// 定期清理失效回调
cleanup() {
for (const callback of this.callbacks) {
try {
// 尝试调用,检测是否有效
callback.sendRequest(0, MessageParcel.create(), MessageParcel.create())
} catch (err) {
// 回调失效,移除
this.callbacks.delete(callback)
}
}
}
}
4.4 坑四:客户端进程退出
问题:客户端进程退出后,服务端不知道,继续回调会失败。
// ✅ 检测客户端退出
class DownloadService {
private callbacks: Map<string, IRemoteObject> = new Map()
notifyProgress(taskId: string, progress: number) {
const callback = this.callbacks.get(taskId)
if (!callback) {
return
}
try {
const proxy = new CallbackProxy(callback)
proxy.onProgress(progress)
} catch (err) {
// 回调失败,客户端可能已退出
hilog.warn(DOMAIN, TAG, `Callback failed, removing: ${err.message}`)
this.callbacks.delete(taskId)
}
}
}
4.5 坑五:回调顺序问题
问题:多个回调可能乱序到达。
// ❌ 假设回调有序
callback.onProgress(10)
callback.onProgress(20)
callback.onProgress(30)
// 可能收到顺序:20, 10, 30
// ✅ 使用序号保证顺序
interface ProgressData {
sequence: number // 序号
progress: number
}
// 客户端按序号排序处理
private lastSequence: number = 0
handleProgress(data: ProgressData) {
if (data.sequence > this.lastSequence) {
this.progress = data.progress
this.lastSequence = data.sequence
}
// 忽略旧数据
}
五、HarmonyOS 6适配指南
5.1 API变更
| 变更项 | HarmonyOS 5 | HarmonyOS 6 |
|---|---|---|
| RemoteObject传递 | writeRemoteObject |
同上 |
| 异步回调 | 无 | 新增sendRequestAsync |
| 回调状态检测 | 无 | 新增isCallbackValid |
5.2 新增功能
// HarmonyOS 6新增:异步回调
class CallbackProxy {
private remote: IRemoteObject
async onProgressAsync(progress: number): Promise<void> {
const data = MessageParcel.create()
const reply = MessageParcel.create()
try {
data.writeInt(progress)
// 异步发送
await this.remote.sendRequestAsync(CallbackCode.ON_PROGRESS, data, reply)
} finally {
data.reclaim()
reply.reclaim()
}
}
}
// HarmonyOS 6新增:回调状态检测
import { rpc } from '@kit.IPCKit'
function isCallbackValid(callback: IRemoteObject): boolean {
return rpc.isRemoteObjectValid(callback)
}
// HarmonyOS 6新增:DeathRecipient
class MyDeathRecipient implements rpc.DeathRecipient {
onRemoteDied() {
// 远程对象死亡通知
hilog.info(DOMAIN, TAG, 'Remote object died')
}
}
// 注册死亡监听
const recipient = new MyDeathRecipient()
callback.addDeathRecipient(recipient)
六、总结
6.1 核心要点回顾

6.2 最佳实践清单
- [ ] 回调转发到主线程执行
- [ ] 保持Callback对象的引用
- [ ] 定期清理失效回调
- [ ] 处理客户端退出情况
- [ ] 使用序号保证回调顺序
- [ ] 注册DeathRecipient监听
- [ ] 使用异步回调提高性能
下一篇预告:《通信安全:加密通道与认证》——深入鸿蒙通信的安全机制。
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)