Cartographer中的线程池操作

举报
Hermit_Rabbit 发表于 2022/10/21 22:27:11 2022/10/21
【摘要】 0. 简介在设计复杂的运行程序时,我们经常需要创建一定数量的线程,然而很多时候线程不都是一直执行的,会存在一些线程处于空闲状态。所以通过线程池的方式,可以有效的对线程进行分配。若线程池中有空闲线程,则从线程池中取出一个空闲的线程处理该任务,任务处理完后,该线程被放到线程池中;若线程池中无空闲线程,则将任务放入任务队列等待线程池中有线程空闲,这样的处理方式可以避免线程在建立与销毁时存在的开销...

0. 简介

在设计复杂的运行程序时,我们经常需要创建一定数量的线程,然而很多时候线程不都是一直执行的,会存在一些线程处于空闲状态。所以通过线程池的方式,可以有效的对线程进行分配。若线程池中有空闲线程,则从线程池中取出一个空闲的线程处理该任务,任务处理完后,该线程被放到线程池中;若线程池中无空闲线程,则将任务放入任务队列等待线程池中有线程空闲,这样的处理方式可以避免线程在建立与销毁时存在的开销。
在这里插入图片描述

1. 基础知识

2. Cartographer中的线程池

关键参数

  1. 任务队列(task_queue_):每个线程的DoWork()线程空闲时都会通过反复读取该队列来获得任务,各线程通过互斥锁防止同时读取。
  2. 等待队列(tasks_not_ready_):尚未ready的任务队列,其其依赖的任务(dependent_tasks_)还没准备好,直到dependent_tasks_都完成了,主任务(Task)才能执行将tasks_not_ready转为task_queue_。
  3. 任务依赖(dependent_tasks_):dependent_tasks_都完成了,主任务(Task)才能执行

核心函数

创建线程池:

ThreadPool::ThreadPool(int num_threads) {
  absl::MutexLock locker(&mutex_);
  for (int i = 0; i != num_threads; ++i) {
    pool_.emplace_back([this]() { ThreadPool::DoWork(); });
  }
}

thread_pool::DoWork():

初始化的时候每个线程都执行该死循环函数ThreadPool::DoWork(),并直到析构才返回,只要task_queue有任务,就执行操作。

void ThreadPool::DoWork() {
#ifdef __linux__
  // This changes the per-thread nice level of the current thread on Linux. We
  // do this so that the background work done by the thread pool is not taking
  // away CPU resources from more important foreground threads.
  CHECK_NE(nice(10), -1);
#endif
  const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
    return !task_queue_.empty() || !running_;
  };
  for (;;) {
    std::shared_ptr<Task> task;
    {
      absl::MutexLock locker(&mutex_);
      mutex_.Await(absl::Condition(&predicate));
      if (!task_queue_.empty()) {
        task = std::move(task_queue_.front());
        task_queue_.pop_front();
      } else if (!running_) {
        return;
      }
    }
    CHECK(task);
    CHECK_EQ(task->GetState(), common::Task::DEPENDENCIES_COMPLETED);
    Execute(task.get());
  }
}

在初始化成功并拿到task_queue_后,该Task将会按以下几个状态顺序执行:

  enum State { NEW, DISPATCHED, DEPENDENCIES_COMPLETED, RUNNING, COMPLETED };
  • NEW(新建任务,还未schedule到线程池)

  • DISPATCHED(任务已经schedule到线程池)

  • DEPENDENCIES_COMPLETED(任务依赖已经执行完成)

  • RUNNING(任务执行中)

  • COMPLETED(任务完成)

task->SetWorkItem(task)

新建task实例,状态默认为NEW,然后通过task->SetWorkItem设置任务(示例中运行的函数为DrainWorkQueue)

  auto scan_matcher_task = absl::make_unique<common::Task>();
  scan_matcher_task->SetWorkItem(
      [&submap_scan_matcher, &scan_matcher_options]() {
        submap_scan_matcher.fast_correlative_scan_matcher =
            absl::make_unique<scan_matching::FastCorrelativeScanMatcher2D>(
                *submap_scan_matcher.grid, scan_matcher_options);
      });
  submap_scan_matcher.creation_task_handle =
      thread_pool_->Schedule(std::move(scan_matcher_task));

task2->AddDependency(task1)

该函数是可选的,有依赖的任务才需添加,其含义是task2依赖于task1,只有在task1执行完后,task2才能执行。

  auto constraint_task = absl::make_unique<common::Task>();
  constraint_task->SetWorkItem([=]() LOCKS_EXCLUDED(mutex_) {
    ComputeConstraint(submap_id, submap, node_id, true, /* match_full_submap */
                      constant_data, transform::Rigid2d::Identity(),
                      *scan_matcher, constraint);
  });
  constraint_task->AddDependency(scan_matcher->creation_task_handle);
  auto constraint_task_handle =
      thread_pool_->Schedule(std::move(constraint_task));

thread_pool->Schedule(task)

将task赋给tasks_not_ready_并将task状态变为DISPATCHED,判断其依赖的任务是否加载完成,若完成则将状态置为DEPENDENCIES_COMPLETED,然后task加入task_queue_并从tasks_not_ready_移除等待线程执行任务;若依赖未完成,则等待依赖的task执行完。在cartographer中存在有不同的数据用于动态加载,可能存在有依赖,此时Schedule起到了层级的作用。

std::weak_ptr<Task> ThreadPool::Schedule(std::unique_ptr<Task> task) {
  std::shared_ptr<Task> shared_task;
  {
    std::lock_guard<std::mutex> lock(mutex_);
    auto insert_result =
        tasks_not_ready_.insert(std::make_pair(task.get(), std::move(task)));
    if (!insert_result.second) {
      return insert_result.first->second;
      // throw std::runtime_error("a task has been scheduled twice");
    }
    shared_task = insert_result.first->second;
  }
  SetThreadPool(shared_task.get());
  return shared_task;
}

…详情请参照古月居

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。