门闩和栅栏

门闩和栅栏是比较简单的线程同步机制,其能使一些线程阻塞,直到计数器变为零时解除阻塞。首先,不要把栅栏和内存栅栏混为一谈。C++ 20/23中,我们假设有三种门闩和栅栏:std::latchstd::barrierstd::flex_barrier

首先,要回答两个问题:

  1. 这三种同步线程的机制有什么不同?std::latch只能使用一次,但是std::barrierstd::flex_barrier可以使用多次。此外,std::flex_barrier允许计数器变为0时执行一个函数。

  2. 哪些支持的门闩和栅栏的用例,在C++11和C++14中无法通过future、线程或条件变量与锁结合来实现呢?门闩和栅栏并不涉及新的用例,但它们使用起来要容易得多。通常是在内部使用无锁机制,所以它们还具有更高的性能。

std::latch

std::latch门闩是一个倒计时器,该值可以在构造函数中设置。门闩可以通过使用latch.count_down_and_wait来减小计数,并阻塞线程,直到计数器变为0。另外,latch.count_down(n)可以将计数器减少n,而不进行阻塞。如果没有给出参数,n默认为1。门闩也有latch.is_ready可以用来检查计数器是否为零,以及latch.wait会阻塞线程,直到计数器变为零。std::latch的计数器不能增加或重置,因此不能复用。

下面是来自N4204提案的一个简短代码段。

void DoWork(threadpool *pool){
  latch completion_latch(NTASKS);
  for (int i = 0; i < NTASKS; ++i){
    pool->add_task([&]{
      // perform work
        ...
        completion_latch.count_down();
    }); 
  }
  // Block until work is done
  completion_latch.wait();
}

std::latch completion_latch在其构造函数中将计数器设置为NTASKS (第2行),线程池执行NTASKS(第4 - 8行)个任务。每个任务结束时(第7行),计数器递减。第11行是运行DoWork函数的线程,以及工作流的栅栏。这样,线程就会阻塞,直到所有任务都完成。

std::barrierstd::latch非常相似。

std::barrier

std::latchstd::barrier之间的区别是,std::barrier计数器可以重置,所以可以多次地使用。计数器变为零之后,立即进入完成阶段。与std::flex_barrier有关,std::barrier有一个空的完成阶段。std::barrier有两个有趣的成员函数:std::arrive_and_waitstd::arrive_and_drop。当std::arrive_and_wait在同步点阻塞时,std::arrive_and_drop会从相关线程集中,删除自己的线程。未指定此函数是否阻塞,直到完成阶段结束。这里没有对函数块进行指定,是否到完成阶段才算结束。

N4204提案

该建议使用vector<thread*>,并将动态分配的线程推给vector:workers.push_back(new thread([&]{ ... }))。这会产生内存泄漏。应该将线程放到std::unique_ptr中,或者直接在vector中进行创建: workers.emplace_back([&]{ ... }),这个适用于std::barrierstd::flex_barrier。本例中使用std::flex_barrier的名称有点迷,例如:std::flex_barrier被称为notifying_barrier。所以我把名字改成flex_barrier,会更容易理解一些。此外,代表线程数量的n_threads没有初始化,我把它初始化为NTASKS。

深入研究std::flex_barrier和完成阶段之前,这里给出一个简短的示例,演示std::barrier的用法。

std::barrier

void DoWork(){
  Tasks& tasks;
  int n_threads{NTASKS};
  vector<thread*> workes;

  barrier task_barrier(n_threads);

  for (int i = 0; i < n_threads; ++i){
    workers.push_back(new thread([&]{
      bool active = ture;
      while(active){
        Task task = tasks.get();
        // perform task
        ...
        task_barrier.arrive_and_wait();
      }
    });
  }
  // Read each stage of the task until all stages are complete.
  while(!finished()){
    GetNextStage(tasks);
  }
}

第6行中的barrier用于协调多个执行线程,线程的数量是n_threads(第3行),每个线程通过tasks.get()获取(第12行中)任务,执行该任务并阻塞(第15行),直到所有线程完成其任务为止。之后,在第12行接受一个新任务,active在第11行返回true。

std::barrier不同,std::flex_barrier多一个构造函数。

std::flex_barrier

此构造函数接受在完成阶段调用可调用单元。可调用单元必须返回一个数字,使用这个数字设置计数器的值,返回-1意味着计数器在下一次迭代中保持相同的计数器值,而小于-1的数字是不允许的。

完成阶段会执行以下步骤:

  1. 阻塞全部线程

  2. 任意个线程解除阻塞,并执行可调用单元。

  3. 如果完成阶段已经完成,那么所有线程都将解除阻塞。

下面的段代码展示了std::flex_barrier的用法

void DoWork(){
  Tasks& tasks;
  int initial_threads;
  int n_threads{NTASKS};
  atomic<int> current_threads(initial_threads);
  vector<thread*> workers;

  // Create a flex_barrier, and set a lambda that will be
  // invoked every time the barrier counts down. If one or more
  // active threads have completed, reduce the number of threads.
  std::function rf = [&]{return current_threads;};
  flex_barrier task_barrier(n_threads, rf);

  for (int i = 0; i < n_threads; ++i){
   workers.push_back(new thread([&]{
     bool active = true;
     while(active) {
       Task task = tasks.get();
       // perform task
          ...
       if (finished(task)){
         current_threads--;
         active = false;
       }
       task_barrier.arrive_and_wait();
     }     
    })); 
  }

  // Read each stage of the task until all stages are cpmplete.
  while(!finished()){
    GetNextStage(tasks);
  }
}

这个例子采用了与std::barrier类似的策略,不同的是这次std::flex_barrier计数器是在运行时进行调整,所以std::flex_barrier task_barrier在第11行获得一个Lambda函数。这个Lambda函数通过引用获取变量current_thread:[&] { return current_threads; }。变量在第21行进行递减,如果线程完成了任务,则将active设置为false。因此,计数器在完成阶段是递减的。

std::barrierstd::latch相比,std::flex_barrier可以增加计数器。

可以在cppreference.com上阅读关于std::latchstd::barrierstd::flex_barrier的更多细节。

Last updated