【愚公系列】2023年12月 HarmonyOS教学课程 060-ArkTS语言基础类库(并发)

举报
愚公搬代码 发表于 2023/12/31 20:35:21 2023/12/31
【摘要】 🏆 作者简介,愚公搬代码🏆《头衔》:华为云特约编辑,华为云云享专家,华为开发者专家,华为产品云测专家,CSDN博客专家,CSDN商业化专家,阿里云专家博主,阿里云签约作者,腾讯云优秀博主,腾讯云内容共创官,掘金优秀博主,51CTO博客专家等。🏆《近期荣誉》:2023年华为云十佳博主,2022年CSDN博客之星TOP2,2022年华为云十佳博主等。🏆《博客内容》:.NET、Java、...

🏆 作者简介,愚公搬代码
🏆《头衔》:华为云特约编辑,华为云云享专家,华为开发者专家,华为产品云测专家,CSDN博客专家,CSDN商业化专家,阿里云专家博主,阿里云签约作者,腾讯云优秀博主,腾讯云内容共创官,掘金优秀博主,51CTO博客专家等。
🏆《近期荣誉》:2023年华为云十佳博主,2022年CSDN博客之星TOP2,2022年华为云十佳博主等。
🏆《博客内容》:.NET、Java、Python、Go、Node、前端、IOS、Android、鸿蒙、Linux、物联网、网络安全、大数据、人工智能、U3D游戏、小程序等相关领域知识。
🏆🎉欢迎 👍点赞✍评论⭐收藏

🚀一、并发

并发是指在一个时间段内,多个事件、任务或操作同时进行或者交替进行的方式。在计算机科学中,特指多个任务或程序同时执行的能力。并发可以提升系统的吞吐量、响应速度和资源利用率,并能更好地处理多用户、多线程和分布式的场景。常见的并发模型有多线程、多进程、多任务、协程等。

🔎1.并发概述

HarmonyOS系统提供的异步并发和多线程并发两种处理策略:

异步并发 多线程并发
概念 异步代码在执行到一定程度后会被暂停,以便在未来某个时间点继续执行。同一时间只有一段代码在执行。 允许在同一时间段内同时执行多段代码。主线程继续响应用户操作和更新UI的同时,后台也能执行耗时操作。
优势 1. 提高系统的响应速度和吞吐量。
2. 提高资源利用率,避免浪费。
3. 便于处理多用户和分布式场景。
1. 避免应用出现卡顿,提升用户体验。
2. 充分利用多核处理器的性能优势。
3. 能够同时进行耗时操作和响应用户操作。
应用场景 1. 网络请求。
2. 文件读写操作。
3. 异步任务的处理。
1. 后台数据处理和计算。
2. 图片和视频处理。
3. 并行计算任务的处理。
处理方式 通过回调函数或者Future对象来处理异步任务的结果。 通过创建多个线程或线程池,并通过线程间的通信来处理并发任务。
注意事项和挑战 1. 需要合理处理好异步任务的执行顺序和依赖关系。
2. 需要避免出现死锁和竞争条件。
3. 异步任务的错误处理需要注意。
1. 需要注意线程安全问题,避免竞争条件和数据一致性问题。
2. 需要合理管理线程池,避免资源浪费和线程过多导致的性能问题。
3. 需要避免死锁和线程间的等待资源导致的性能问题。

ArkTS系统提供的异步并发和多线程:

异步并发 多线程并发
概念 异步并发能力由Promise和async/await提供,适用于单次I/O任务的开发场景。 多线程并发能力由TaskPool和Worker提供,适用于CPU密集型任务、I/O密集型任务和同步任务等并发场景。
应用场景 单次I/O任务的开发场景,例如网络请求、文件读写操作等。 CPU密集型任务、I/O密集型任务和同步任务等并发场景。
处理方式 使用Promise和async/await来处理异步任务的结果。 使用TaskPool和Worker来处理并发任务。
注意事项和挑战 1. 需要合理处理好异步任务的执行顺序和依赖关系。
2. 需要避免出现死锁和竞争条件。
3. 异步任务的错误处理需要注意。
1. 需要注意线程安全问题,避免竞争条件和数据一致性问题。
2. 需要合理管理线程池,避免资源浪费和线程过多导致的性能问题。
3. 需要避免死锁和线程间的等待资源导致的性能问题。

🔎2.异步并发

🦋2.1 异步并发概述

☀️2.1.1 Promise

Promise是一种用于处理异步操作的对象。它表示一个可能还未完成的操作,并提供了一系列方法来处理操作的结果或错误。Promise对象有三种状态:pending(进行中)、fulfilled(已完成)和rejected(已失败)。当操作完成时,Promise对象将会从pending状态转变为fulfilled或rejected状态,并调用相应的回调函数。使用Promise可以更加方便地管理异步操作,并避免回调函数嵌套过多的问题。

Promise是一种用于处理异步操作的对象。它可以认为是一个代理,用来代表一个尚未完成但最终会完成的操作。

Promise的定义:

const promise = new Promise((resolve, reject) => {
  // 异步操作
  // 如果操作成功,调用resolve(value)
  // 如果操作失败,调用reject(error)
});

Promise构造函数接受一个函数作为参数,该函数被称为执行器(executor)。执行器会立即执行,并传入两个参数resolve和reject。在异步操作完成时,调用resolve传递最终的结果,或调用reject传递错误信息。

Promise的使用:

promise
  .then((value) => {
    // 当异步操作成功时,执行这里的回调函数
    console.log('操作成功:', value);
  })
  .catch((error) => {
    // 当异步操作失败时,执行这里的回调函数
    console.error('操作失败:', error);
  })
  .finally(() => {
    // 无论异步操作成功或失败都会执行这里的回调函数
    console.log('操作完成');
  });

通过then方法可以注册成功回调函数,通过catch方法可以注册失败回调函数,通过finally方法可以注册最终回调函数。当异步操作完成后,Promise会根据操作的结果调用相应的回调函数。

Promise还提供了一些其他的方法,例如all、race等,用于处理多个Promise对象的并行或竞争操作。

☀️2.1.2 async/await

async/await是一种用于处理异步操作的语法糖(syntactic sugar),它基于Promise对象提供了一种更直观、更方便的方式来编写和处理异步代码。

async/await的定义和使用如下:

  • async:async关键字用于修饰函数,表示该函数是一个异步函数。异步函数会自动返回一个Promise对象。
async function foo() {
  // 异步操作
  return result;
}
  • await:await关键字只能在async函数内部使用,用于暂停异步函数的执行,等待一个Promise对象的状态变为resolved(成功)或rejected(失败),然后返回该Promise的结果。
async function myAsyncFunction() {
  const result = await new Promise((resolve) => {
    setTimeout(() => {
      resolve('Hello, world!');
    }, 3000);
  });
  console.info(String(result)); // 输出: Hello, world!
}

myAsyncFunction();

在async函数中使用await关键字可以实现类似同步代码的连续执行效果,而不需要嵌套使用回调函数或链式调用then方法。

async/await的优点包括:

  • 代码可读性更高,更接近同步代码的写法,易于理解和维护。
  • 可以在代码中使用try/catch语句来捕获和处理异步操作产生的错误。
  • 可以使用常规的控制流语法(如循环、条件语句)来组织和管理异步代码的执行顺序。

使用async/await时仍然依赖于Promise对象来处理异步操作。async/await只是一种更加简洁和易读的语法,本质上仍然是基于Promise的异步编程模式。

🦋2.2 单次I/O任务开发指导

import fs from '@ohos.file.fs';
import common from '@ohos.app.ability.common';

async function write(data: string, file: fs.File): Promise<void> {
  fs.write(file.fd, data).then((writeLen: number) => {
    console.info('write data length is: ' + writeLen)
  }).catch((err) => {
    console.error(`Failed to write data. Code is ${err.code}, message is ${err.message}`);
  })
}

async function testFunc(): Promise<void> {
  let context = getContext() as common.UIAbilityContext;
  let filePath: string = context.filesDir + "/test.txt"; // 应用文件路径
  let file: fs.File = await fs.open(filePath, fs.OpenMode.READ_WRITE | fs.OpenMode.CREATE);
  write('Hello World!', file).then(() => {
    console.info('Succeeded in writing data.');
  }).catch((err) => {
    console.error(`Failed to write data. Code is ${err.code}, message is ${err.message}`);
  })
  fs.close(file);
}
testFunc();

@Entry
@Component
struct WebComponent {
  build() {
    Column() {
      Text("Hello World")
    }
  }
}

在这里插入图片描述

🔎3.多线程并发

🦋3.1 多线程并发概述

☀️3.1.1 简介

Actor并发模型是一种用于并发计算的编程模型。在该模型中,计算被抽象为一组独立的Actor,每个Actor都有自己的状态和行为,并且可以通过消息传递进行通信和协作。

在Actor模型中,每个Actor都可以接收异步消息,并且根据消息内容和当前状态来做出相应的响应。当一个Actor接收到消息时,它可以执行一系列的计算,修改自己的状态,并发送消息给其他Actor或者自己。

Actor之间的消息传递是异步的,所以发送消息的Actor不需要等待接收消息的Actor的响应,从而实现并发执行。由于每个Actor都是独立的,它们之间不存在共享状态,因此不需要进行锁机制和同步操作,避免了一些常见的并发编程问题,如死锁和竞争条件。

ArkTS语言选择的并发模型就是Actor。

☀️3.1.2 数据传输对象

ArkTS语言支持传输的数据对象可以分为下面四种:

数据对象类型 特点
普通对象 不能被传输和复制,只能在创建它们的线程中使用
可转移对象 可以在不同线程之间传输,但是不能在多个线程间共享
可共享对象 可以在多个线程间共享,但是不能在不同线程之间传输
Native绑定对象 与底层原生代码绑定,无法直接在应用程序中使用
🌈3.1.1.1 普通对象

普通对象的传输是通过结构化克隆算法进行序列化的。结构化克隆算法可以递归地拷贝传输对象,因此支持的对象类型非常丰富。

基础类型(除Symbol)、Date、String、RegExp、Array、Map、Set、Object(仅限简单对象,即通过“{}”或者“new Object”创建的)以及ArrayBuffer、TypedArray 都是支持序列化的类型。

需要注意的是,普通对象只能传递属性,不能传递其原型和方法。

🌈3.1.1.2 可转移对象

可转移对象(Transferable object)是指在多线程编程中,用于在不同线程之间传输数据的对象。在传输过程中,不需要对该对象内容进行拷贝,而是通过地址转移的方式进行序列化。其中,ArrayBuffer是一种可转移对象的例子。

在传输过程中,发送线程会将ArrayBuffer的所有权转移给接收线程。这意味着,在发送线程中,一旦传输完成,该ArrayBuffer将变为不可用,不允许再进行访问。接收线程可以获得该ArrayBuffer的所有权,并可以自由地对其进行访问和操作。

通过使用可转移对象,可以避免在多线程之间复制大量数据的开销,提高数据传输的效率。但需要注意的是,一旦转移完成,发送线程将丧失对该ArrayBuffer的所有权,如果发送线程还想要对其进行访问,就需要重新获取所有权或者重新创建一个新的ArrayBuffer。

// 定义可转移对象
let buffer = new ArrayBuffer(100);
🌈3.1.1.3 可共享对象

共享对象SharedArrayBuffer可以通过使用Atomics对象中提供的方法来实现原子操作,保证在多线程环境下的数据同步。这些方法包括add、sub、and、or、xor等,可以保证操作的原子性,避免数据的不一致性。

在使用SharedArrayBuffer时,需要注意以下几点:

  1. SharedArrayBuffer只能在支持多线程的环境中使用。
  2. 对SharedArrayBuffer的访问必须通过Atomics对象中的方法进行,以确保操作的原子性。
  3. 对SharedArrayBuffer的修改需要先获取锁,以避免多线程同时修改导致的数据不一致性。
  4. 由于SharedArrayBuffer的特性,需要格外小心防止数据竞争和死锁等问题。
// 定义可共享对象,可以使用Atomics进行操作
let sharedBuffer = new SharedArrayBuffer(1024);
🌈3.1.1.4 Native绑定对象

Native绑定对象是一种与底层系统功能进行了绑定。通过Native绑定对象,可以直接访问和调用底层系统提供的功能,如操作系统、网络、文件系统等。

Native绑定对象通常由宿主环境提供,以便与底层功能进行交互。不同的宿主环境提供的Native绑定对象可能不同,因为它们对底层系统功能的访问方式和级别可能不同。

通过Native绑定对象,可以调用底层系统提供的功能,如创建文件、读取网络数据、调用操作系统API等。

Native绑定对象 功能
Context 提供应用程序组件上下文信息,允许访问系统服务和资源
RemoteObject 实现远程通信的功能,允许在不同进程间传递对象的引用,共享对象的状态和方法

🦋3.2 TaskPool和Worker的对比

☀️3.2.1 实现特点对比

实现 TaskPool Worker
内存模型 线程间隔离,内存不共享。 线程间隔离,内存不共享。
参数传递机制 采用标准的结构化克隆算法(Structured Clone)进行序列化、反序列化,完成参数传递。支持ArrayBuffer转移和SharedArrayBuffer共享。 采用标准的结构化克隆算法(Structured Clone)进行序列化、反序列化,完成参数传递。支持ArrayBuffer转移和SharedArrayBuffer共享。
参数传递 直接传递,无需封装,默认进行transfer。 消息对象唯一参数,需要自己封装。
方法调用 直接将方法传入调用。 在Worker线程中进行消息解析并调用对应方法。
返回值 异步调用后默认返回。 主动发送消息,需在onmessage解析赋值。
生命周期 TaskPool自行管理生命周期,无需关心任务负载高低。 开发者自行管理Worker的数量及生命周期。
任务池个数上限 自动管理,无需配置。 同个进程下,最多支持同时开启8个Worker线程。
任务执行时长上限 无限制。 无限制。
设置任务的优先级 不支持。 不支持。
执行任务的取消 支持取消任务队列中等待的任务。 不支持。

☀️3.2.2 适用场景对比

TaskPool偏向独立任务维度,任务在线程中执行,不需要关注线程的生命周期。超长任务(大于3分钟)会被系统自动回收。

适用场景:

  1. 有关联的一系列同步任务,例如在需要创建和使用不同句柄的场景中,每次创建的句柄需要永久保存。这种情况需要使用Worker来管理线程生命周期。
  2. 需要频繁取消任务的场景,例如图库大图浏览,为了提升用户体验,同时缓存当前图片左右侧各2张图片。当用户往一侧滑动跳到下一张图片时,需要取消另一侧的一个缓存任务。这种情况下,使用TaskPool来管理任务会更适合。

Worker偏向线程的维度,支持长时间占据线程执行,需要主动管理线程的生命周期。

适用场景:

  1. 需要长时间占用线程执行的任务,例如网络请求、数据库操作等。这种情况下,使用Worker可以保持线程的稳定性和性能。

另外,在大量或者调度点较分散的任务场景下,例如大型应用的多个模块包含多个耗时任务,使用8个Worker去做负载管理可能不方便。这种情况下,推荐使用TaskPool来管理任务。

☀️3.2.3 TaskPool运作机制

在这里插入图片描述
TaskPool是一个任务调度和执行的工具,支持开发者将任务封装在主线程中,并将任务提交给任务队列。系统会自动选择合适的工作线程来执行任务,并将结果返回给主线程。TaskPool提供简洁易用的接口,支持任务的执行和取消操作。同时,TaskPool限制工作线程数量的上限为4。

☀️3.2.4 Worker运作机制

在这里插入图片描述
Worker子线程都是独立的实例,拥有自己的基础设施、对象和代码段。Worker子线程与宿主线程之间的通信是通过消息传递来实现的。Worker通过序列化机制与宿主线程进行相互通信,并完成命令和数据的交互。

具体来说,当宿主线程有任务需要执行时,它会将任务封装成消息,并将消息发送给TaskPool中的任务队列。TaskPool会选择一个合适的Worker子线程来接收任务消息。Worker子线程接收到任务消息后,会根据消息的内容,执行相应的任务。在任务执行完成后,Worker子线程会将执行结果封装成消息,并通过序列化机制,将消息发送回宿主线程。

这种基于消息传递和序列化机制的通信方式,使得Worker子线程和宿主线程可以独立运行,并且可以在不同线程和进程之间进行通信。宿主线程通过发送消息给Worker子线程,来分配任务和接收任务执行结果。Worker子线程通过接收消息来获取任务和向宿主线程发送执行结果。这种设计可以提高系统的并发性能和响应能力,同时也避免了多线程编程中的并发问题。

☀️3.2.5 TaskPool注意事项

说明 内容
实现任务的函数需要使用装饰器@Concurrent标注 仅支持在.ets文件中使用
实现任务的函数入参需满足序列化支持的类型 详情请参见数据传输对象
由于不同线程中上下文对象是不同的 TaskPool工作线程只能使用线程安全的库,例如UI相关的非线程安全库不能使用
序列化传输的数据量大小限制 16MB

☀️3.2.6 Worker注意事项

说明 内容
创建Worker时,传入的Worker.ts路径在不同版本有不同的规则 详情请参见文件路径注意事项
Worker创建后需要手动管理生命周期 最多同时运行的Worker子线程数量为8个,详情请参见生命周期注意事项
Ability类型的Module支持使用Worker,Library类型的Module不支持使用Worker
创建Worker不支持使用其他Module的Worker.ts文件 即不支持跨模块调用Worker
由于不同线程中上下文对象是不同的 Worker线程只能使用线程安全的库,例如UI相关的非线程安全库不能使用
序列化传输的数据量大小限制 16MB
🌈3.2.6.1 文件路径注意事项
// 导入模块
import worker from '@ohos.worker';

// 写法一
// Stage模型-目录同级(entry模块下,workers目录与pages目录同级)
const worker1 = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts', {name:"first worker in Stage model"});
// Stage模型-目录不同级(entry模块下,workers目录是pages目录的子目录)
const worker2 = new worker.ThreadWorker('entry/ets/pages/workers/MyWorker.ts');

// 写法二
// Stage模型-目录同级(entry模块下,workers目录与pages目录同级),假设bundlename是com.example.workerdemo
const worker3 = new worker.ThreadWorker('@bundle:com.example.workerdemo/entry/ets/workers/worker');
// Stage模型-目录不同级(entry模块下,workers目录是pages目录的子目录),假设bundlename是com.example.workerdemo
const worker4 = new worker.ThreadWorker('@bundle:com.example.workerdemo/entry/ets/pages/workers/worker');
🌈3.2.6.2 生命周期注意事项

Worker的创建和销毁耗费性能,建议开发者合理管理已创建的Worker并重复使用。Worker空闲时也会一直运行,因此当不需要Worker时,可以调用terminate()接口或parentPort.close()方法主动销毁Worker。若Worker处于已销毁或正在销毁等非运行状态时,调用其功能接口,会抛出相应的错误。(Too many workers, the number of workers exceeds the maximum.

🦋3.3 @Concurrent装饰器:校验并发函数

在HarmonyOS中,@Concurrent装饰器用于标识一个方法需要在工作线程中执行。该装饰器可以应用于普通的方法或者回调方法。

使用@Concurrent装饰器的方法会在一个工作线程中执行,不会阻塞主线程的运行。这对于一些耗时操作或者需要与其他服务进行交互的方法非常有用。在方法执行完成后,可以使用HarmonyOS提供的线程间通信机制将结果传递回主线程。

import taskpool from '@ohos.taskpool';

@Concurrent
function add(num1: number, num2: number): number {
  return num1 + num2;
}

async function ConcurrentFunc(): Promise<void> {
  try {
    let task: taskpool.Task = new taskpool.Task(add, 1, 2);
    console.info("taskpool res is: " + await taskpool.execute(task));
  } catch (e) {
    console.error("taskpool execute error is: " + e);
  }
}

@Entry
@Component
struct Index {
  @State message: string = 'Hello World'

  build() {
    Row() {
      Column() {
        Text(this.message)
          .fontSize(50)
          .fontWeight(FontWeight.Bold)
          .onClick(() => {
            ConcurrentFunc();
          })
      }
      .width('100%')
    }
    .height('100%')
  }
}

在这里插入图片描述

🦋3.4 CPU密集型任务开发指导

☀️3.4.1 使用TaskPool进行图像直方图处理

import taskpool from '@ohos.taskpool';

@Concurrent
function imageProcessing(dataSlice: ArrayBuffer) {
  // 步骤1: 具体的图像处理操作及其他耗时操作
  return dataSlice;
}

function histogramStatistic(pixelBuffer: ArrayBuffer) {
  // 步骤2: 分成三段并发调度
  let number = pixelBuffer.byteLength / 3;
  let buffer1 = pixelBuffer.slice(0, number);
  let buffer2 = pixelBuffer.slice(number, number * 2);
  let buffer3 = pixelBuffer.slice(number * 2);

  let task1 = new taskpool.Task(imageProcessing, buffer1);
  let task2 = new taskpool.Task(imageProcessing, buffer2);
  let task3 = new taskpool.Task(imageProcessing, buffer3);

  taskpool.execute(task1).then((ret: ArrayBuffer[]) => {
    // 步骤3: 结果处理
  });
  taskpool.execute(task2).then((ret: ArrayBuffer[]) => {
    // 步骤3: 结果处理
  });
  taskpool.execute(task3).then((ret: ArrayBuffer[]) => {
    // 步骤3: 结果处理
  });
}

@Entry
@Component
struct Index {
  @State message: string = 'Hello World'

  build() {
    Row() {
      Column() {
        Text(this.message)
          .fontSize(50)
          .fontWeight(FontWeight.Bold)
          .onClick(() => {
            let data: ArrayBuffer;
            histogramStatistic(data);
          })
      }
      .width('100%')
    }
    .height('100%')
  }
}

☀️3.4.2 使用Worker进行长时间数据分析

1、创建Worker
在这里插入图片描述

在这里插入图片描述
2、UI主线程

import worker from '@ohos.worker';

const workerInstance = new worker.ThreadWorker('entry/ets/workers/Worker.ts');
// 接收Worker子线程的结果
workerInstance.onmessage = function(e) {
  // data:Worker线程发送的信息
  let data = e.data;
  console.info('MyWorker.ts onmessage');
}

workerInstance.onerror = function (d) {
  // 接收Worker子线程的错误信息
}

// 向Worker子线程发送训练消息
workerInstance.postMessage({ 'type': 0 });
// 向Worker子线程发送预测消息
workerInstance.postMessage({ 'type': 1, 'value': [90, 5] });

在这里插入图片描述
3、Worker.ts 子线程

import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';

let workerPort: ThreadWorkerGlobalScope = worker.workerPort;

// 定义训练模型及结果 
let result;

// 定义预测函数
function predict(x) {
  return result[x];
}

// 定义优化器训练过程
function optimize() {
  result = {};
}

// Worker线程的onmessage逻辑
workerPort.onmessage = function (e: MessageEvents) {
  let data = e.data
  // 根据传输的数据的type选择进行操作
  switch (data.type) {
    case 0:
    // 进行训练
      optimize();
    // 训练之后发送主线程训练成功的消息
      workerPort.postMessage({ type: 'message', value: 'train success.' });
      break;
    case 1:
    // 执行预测
      const output = predict(data.value);
    // 发送主线程预测的结果
      workerPort.postMessage({ type: 'predict', value: output });
      break;
    default:
      workerPort.postMessage({ type: 'message', value: 'send message is invalid' });
      break;
  }
}

4、线程销毁

主线程接收线程销毁后的执行逻辑

// Worker线程销毁后,执行onexit回调方法
workerInstance.onexit = function() {
  console.info("main thread terminate");
}

方式一:在宿主线程中通过调用terminate()方法销毁Worker线程,并终止Worker接收消息。

// 销毁Worker线程
workerInstance.terminate();

方式二:在Worker线程中通过调用close()方法主动销毁Worker线程,并终止Worker接收消息。

// 销毁线程
workerPort.close();

🦋3.5 I/O密集型任务开发指导

import fs from '@ohos.file.fs';
import taskpool from '@ohos.taskpool';

// 定义并发函数,内部密集调用I/O能力
@Concurrent
async function concurrentTest(fileList: string[]) {
  // 写入文件的实现
  async function write(data, filePath) {
    let file = await fs.open(filePath, fs.OpenMode.READ_WRITE);
    await fs.write(file.fd, data);
    fs.close(file);
  }
  // 循环写文件操作
  for (let i = 0; i < fileList.length; i++) {
    write('Hello World!', fileList[i]).then(() => {
      console.info(`Succeeded in writing the file. FileList: ${fileList[i]}`);
    }).catch((err) => {
      console.error(`Failed to write the file. Code is ${err.code}, message is ${err.message}`)
      return false;
    })
  }
  return true;
}

let filePath1 = ...; // 应用文件路径
let filePath2 = ...;

// 使用TaskPool执行包含密集I/O的并发函数
// 数组较大时,I/O密集型任务任务分发也会抢占主线程,需要使用多线程能力
taskpool.execute(concurrentTest, [filePath1, filePath2]).then((ret) => {
  // 调度结果处理
  console.info(`The result: ${ret}`);
})

🦋3.6 同步任务开发指导

在异步编程中,任务同步是指在多个异步任务之间进行协调和同步执行的过程。当存在多个异步任务需要按照一定的顺序或条件进行执行时,任务同步可以确保任务按照预期的顺序或条件进行执行,以避免竞态条件或程序错误。

常见的任务同步方式包括:

  1. 回调函数:通过在一个异步任务完成后触发回调函数来执行下一个任务。

  2. Promise/异步函数:使用Promise或异步函数的异步链式调用,通过then或await等关键字确保任务按顺序执行。

  3. 线程间通信:通过消息队列或信号量等机制,在异步任务之间传递消息或信号,使得任务按特定的顺序或条件执行。

  4. 锁或互斥体:使用锁或互斥体等同步机制,在异步任务之间实现互斥访问,确保任务按照顺序执行。

任务同步的目的是确保异步任务能够按照一定的顺序或条件执行,以避免竞态条件、数据错误或逻辑错误。

☀️3.6.1 使用TaskPool处理同步任务

// Handle.ts 代码
export default class Handle {
  static getInstance() {
    // 返回单例对象
  }

  static syncGet() {
    // 同步Get方法
    return;
  }

  static syncSet(num: number) {
    // 同步Set方法
    return;
  }
}
// Index.ets代码
import taskpool from '@ohos.taskpool';
import Handle from './Handle'; // 返回静态句柄

// 步骤1: 定义并发函数,内部调用同步方法
@Concurrent
function func(num: number) {
  // 调用静态类对象中实现的同步等待调用
  Handle.syncSet(num);
  // 或者调用单例对象中实现的同步等待调用
  Handle.getInstance().syncGet();
  return true;
}

// 步骤2: 创建任务并执行
async function asyncGet() {
  // 创建task并传入函数func
  let task = new taskpool.Task(func, 1);
  // 执行task任务,获取结果res
  let res = await taskpool.execute(task);
  // 对同步逻辑后的结果进行操作
  console.info(String(res));
}

@Entry
@Component
struct Index {
  @State message: string = 'Hello World';

  build() {
    Row() {
      Column() {
        Text(this.message)
          .fontSize(50)
          .fontWeight(FontWeight.Bold)
          .onClick(() => {
            // 步骤3: 执行并发操作
            asyncGet();
          })
      }
      .width('100%')
      .height('100%')
    }
  }
}

☀️3.6.2 使用Worker处理关联的同步任务

1、UI界面

import worker from '@ohos.worker';

@Entry
@Component
struct Index {
  @State message: string = 'Hello World';

  build() {
    Row() {
      Column() {
        Text(this.message)
          .fontSize(50)
          .fontWeight(FontWeight.Bold)
          .onClick(() => {
            let w = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
            w.onmessage = function (d) {
              // 接收Worker子线程的结果
            }
            w.onerror = function (d) {
              // 接收Worker子线程的错误信息
            }
            // 向Worker子线程发送Set消息
            w.postMessage({'type': 0, 'data': 'data'})
            // 向Worker子线程发送Get消息
            w.postMessage({'type': 1})
            // ...
            // 根据实际业务,选择时机以销毁线程
            w.terminate()
          })
      }
      .width('100%')
    }
    .height('100%')
  }
}

2、Worker.ts

// handle.ts代码
export default class Handle {
  syncGet() {
    return;
  }

  syncSet(num: number) {
    return;
  }
}

// Worker.ts代码
import worker, { ThreadWorkerGlobalScope, MessageEvents } from '@ohos.worker';
import Handle from './handle.ts'  // 返回句柄

var workerPort : ThreadWorkerGlobalScope = worker.workerPort;

// 无法传输的句柄,所有操作依赖此句柄
var handler = new Handle()

// Worker线程的onmessage逻辑
workerPort.onmessage = function(e : MessageEvents) {
  switch (e.data.type) {
    case 0:
      handler.syncSet(e.data.data);
      workerPort.postMessage('success set');
    case 1:
      handler.syncGet();
      workerPort.postMessage('success get');
  }
}

🚀感谢:给读者的一封信

亲爱的读者,

我在这篇文章中投入了大量的心血和时间,希望为您提供有价值的内容。这篇文章包含了深入的研究和个人经验,我相信这些信息对您非常有帮助。

如果您觉得这篇文章对您有所帮助,我诚恳地请求您考虑赞赏1元钱的支持。这个金额不会对您的财务状况造成负担,但它会对我继续创作高质量的内容产生积极的影响。

我之所以写这篇文章,是因为我热爱分享有用的知识和见解。您的支持将帮助我继续这个使命,也鼓励我花更多的时间和精力创作更多有价值的内容。

如果您愿意支持我的创作,请扫描下面二维码,您的支持将不胜感激。同时,如果您有任何反馈或建议,也欢迎与我分享。

在这里插入图片描述

再次感谢您的阅读和支持!

最诚挚的问候, “愚公搬代码”

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。