1️⃣ 添加 std::jthread 的动机

std::thread的缺点

缺点一

  • std::thread要求在其生命周期结束时,若表示正在运行的线程,则调用join()(等待线程结束)或detach()(让线程在后台运行)。
    • 若两者都没有调用,析构函数会立即导致异常的程序终止(在某些系统上导致段错误)。

    • 由于这个原因,下面的代码会出现错误(除非不关心程序异常的终止)

      1
      2
      3
      4
      void foo()
      {
      std::thread t{task, name, val};
      }
    • 当没有调用join()detach()就表示正在运行的线程t在析构时,程序会调用std:terminate(),后者调用std::abort()

缺点二

  • 即使使用join()来等待正在运行的线程结束,任然会有一个严重的问题
    1
    2
    3
    4
    5
    6
    void foo()
    {
    std::thread t{task, name, val};
    ...
    t.join();
    }
    • 这段代码还可能导致程序异常终止,线程开始和调用join()之间的foo()之间的代码发生异常时(或者控制流从未到达join()),就没有调用t.join()
  • 常规解决方案
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    void foo ()
    {
    ...
    std::thread t(task, name, val);
    try {
    ...
    }
    catch (...)
    {
    t.join();
    throw;
    }
    t.join() ;
    ...
    }
    • 这里,通过确保在离开作用域时调用join()来对异常作出反应,而不解决异常。
      • 不幸的是,这可能会导致阻塞(永远)。
      • 调用detach()也是一个问题,因为线程在程序的后台继续运行,使用CPU时间和资源,而这些时间和资源现在可能会销毁。
    • 若在更复杂的上下文中使用多个线程,问题会变得更糟,并且会产生非常糟糕的代码。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      void foo()
      {
      std::thread tl(taskl, name, val);
      std::thread t2;
      try {
      t2 = std::thread(task2, name, val);
      }
      catch (...)
      {
      t1.join();
      if (t2.joinable())
      {
      t2.join();
      }
      throw;
      }
      t1.join() ;
      t2.join() ;
      ...
      }
      • 启动第一个线程之后,启动第二个线程可能会抛出异常,因此启动第二个线程必须在try子句中进行。
      • 另一方面,可在同一范围内使用和join()两个线程。为了满足这两个需求,必须前向声明第二个线程,并在第一个线程的try子句中对其进行赋值。
      • 此外,对于异常,必须检查第二个线程是否启动,对没有关联线程的线程对象调用join()会导致异常。
        另一个问题是,对两个线程调用join()可能会花费大量时间(甚至永远)。
    • 注意,不能 “杀死” 已经启动的线程。线程不是进程,线程只能通过结束自身或结束整个程序来结束。

使用 std::jthread

  • std::jthread解决了这些问题,它是RAII类型。若线程是可汇入的(“j” 代表 “汇入”),析构函数会自动调用join()。这让上面的复杂代码变简单了:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    void foo ()
    {
    // start thread calling taskl() with name and val as arguments:
    std::jthread tl(taskl, name, val);
    // start thread calling task2 () with name and val as arguments:
    std::jthread t2(task2, name, val);
    // wait for threads to finish:
    t1.join();
    t2.join();
    }
    • 使用std::jthread就不再存在导致异常程序终止的危险,也不需要异常处理。
    • 为了支持尽可能容易地切换到std:jthread类,该类提供了与std:thread相同的 API,包括:
      • 使用相同的头文件<thread>
      • 当调用get_id()时返回std:thread:id(std:jthread::id类型只是一个别名类型)
      • 提供静态成员hardware_concurrency()
    • 只需用std:jthread替换std:thread并重新编译,代码就会变得更安全。

std::jthread更多的特性

停止令牌和停止回调

  • std:jthread做的更多:提供了一种使用 停止令牌 发出取消信号的机制,这些令牌由jthread的析构函数在调用join()之前使用,由线程启动的可调用对象(函数、函数对象或Lambda)必须支持此请求:

    • 若可调用对象只为所有传递的参数提供参数,将忽略停止请求:
      1
      2
      3
      4
      5
      void task (std::string S, double value)
      {
      ... // join() waits until this code ends
      }

    • 为了响应停止请求,可调用对象可以添加一个新的可选第一个类型为std:stop_token的参数,并不时检查是否请求了停止:
      1
      2
      3
      4
      5
      6
      7
      void task (std::stop_token st,std::string s, double value)
      {
      while (!st.stop_requested()) // stop requested (e.g., by the destructor)?
      {
      ... // ensure we check from time to time
      }
      }
  • 所以,std::jthread提供了一种 协作机制 来表示线程不应该再运行。它是 “协作的” ,因为该机制不会杀死正在运行的线程(因为C++线程根本不支持杀死线程,所以杀死线程的操作可能很容易使
    程序处于损坏状态)。为了响应停止请求,已启动的线程必须声明停止令牌作为附加的第一个参数,
    并使用它不时的检查是否应该继续运行。也可以手动请求已经启动的jthread停止。例如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    void foo()
    {
    // start thread calling task() with name and val as arguments:
    std::jthread t(task, name, val);
    if(...)
    {
    t.request_stop();
    }
    ...
    // wait for thread to finish:
    t.join();
    ...
    }
  • 此外,还有另一种对停止请求作出反应的方法:可以为停止令牌注册回调,该回调将在请求停止时自动调用:

    1
    2
    3
    4
    5
    6
    7
    void task (std::stop_token st,std::string S, double value)
    {
    std::stop_callback cb{st, [] {
    ... // called on a stop request
    }};
    ...
    }
  • 因此,停止执行task()线程的请求(无论是显式调用request_stop()还是由析构函数引起)都会调用注册为停止回调的lambda

    • 请注意,回调通常由请求停止的线程调用
    • 停止回调函数cb的生命周期结束时,析构函数自动注销回调函数,以便在之后发出停止信号时不再调用该回调函数,也可以通过这种方式注册任意数量的可调用对象(函数、函数对象或Lambda)。
    • 停止机制比最初看起来更灵活:
      • 可以传递句柄来请求停止,并传递令牌来检查请求的停止。
      • 支持条件变量,这样一个有信号的停止就可以中断一个等待。
      • 也可以独立于std:jthread使用这种机制来请求和检查停止。
  • 线程、停止源、停止令牌和停止回调之间也没有生命周期约束,存储停止状态的位置在堆上分配。当线程和使用此状态的最后一个停止源、停止令牌或停止回调函数销毁时,用于停止状态的内存将释放。

停止令牌和条件变量

  • 当请求停止时,线程可能会因等待条件变量的通知而阻塞。
  • 停止令牌的回调接口也支持这种情况。可以使用传递的停止令牌为条件变量调用 wait(),以便在请求停止时暂停等待。
    • 由于技术原因,必须使用 std::condition_variable_any 类型作为条件变量。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      include <iostream>
      #include <queue>
      #include <thread>
      #include <stop_token>
      #include <mutex>
      #include <condition_variable>
      using namespace
      std::literals; // for duration literals

      int main()
      {
      std::queue<std::string> messages;
      std::mutex messagesMx;
      std::condition_variable_any messagescv;

      // start thread that prints messages that occur in the queue:
      std:jthread tl{ [s] (std:stop_token st){
      while (Ist.stop_requested())
      {
      std::string msg;
      {
      // wait for the next message:
      std::unique_lock lock(messagesMx);
      if (!messagescv.wait(lock, st,[&] {
      return Imessages.empty();}))
      {
      return; // stop requested
      }
      // retrieve the next message out of the queue:
      msg - messages.front();
      messages.pop () :
      }
      // print the next message:
      std::cout << "msg: " << msg << std::endl,
      }
      }};


      // store 3 messages and notify one waiting thread each time:
      for (std::string s : ("Tic", "Tac", "Toe"))
      {
      std:scoped_lock Ig(messagesMx);
      messages.push(s);
      messagesCv.notify_one();
      }

      // after some time
      // - store l message and notify all waiting threads:
      std::this_thread::sleep_for(ls);
      {
      std::scoped_1ock Ig(messagesMx);
      messages.push("done");
      messagesCV.notify_all();
      }

      // after some time
      //- end program (requests stop, which interrupts wait())
      std::this_thread::sleep_for(ls);
      }
  • 用停止令牌调用 wait(),在这里传递停止令牌信号来停止线程,所以wait现在可能会结束的原因有两个:
    • 一个通知 (队列不再为空)
    • 要求停止
  • wait() 的返回值表示是否满足条件。若返回 false,则结束 wait() 的原因是请求停止,则可以做出相应的反应。

2️⃣ 停止来源和停止令牌

基本概念

  • C++20 不仅为线程提供了停止令牌。它是一种通用机制,可以异步请求停止,并使用各种方式
    对该请求作出响应。

机制

  • C++20 标准库可创建一个共享的停止状态。默认情况下,停止状态不会发出信号。
  • 类型为 std::stop_source 的停止源可以在其关联的共享停止状态下请求停止
  • std::stop_token 类型的停止令牌可用于,在其关联的共享停止状态下响应停止请求
    • 可以主动轮询是否请求了停止
    • 或者注册一个类型为 std::stop_callback 的回调,该回调将在请求停止时调用。
  • 当停止请求发出,就不能撤销 (后续的停止请求无效)。
  • 可以复制和移动停止源和停止令牌,允许在代码的多个位置发出停止信号或对停止做出反应。
    • 复制源代码或令牌的成本相对较低,因此按值传递可避免生命周期问题。
    • 然而,复制并不像传递整型值或原始指针那么简单,更像是传递一个共享指针。若经常将其传递给子函数,那么通过引用传递可能会更好。
  • 该机制是线程安全的,可以在并发情况下使用。停止请求、检查请求的停止,以及对注册或取消注册的回调调用需要同步,并且当最后一个用户 (停止源、停止令牌或停止回调) 销毁时,关联的共享停止状态将自动销毁。

创建停止源和停止令牌

1
2
3
4
5
#include <stop_token>

// create stop_source and stop_token:
std::stop_source ssrc; // creates a shared stop state
std::stop_token stok{ssrc.get_token()}; // creates a token for the stop state
  • 第一步是简单地创建 stop_source 对象,该对象提供了请求停止的 API。构造函数还创建关联的共享停止状态,就可以向停止源请求 stop_token 对象,该对象提供对停止请求作出反应的 API(通过轮询或注册回调)。
  • 然后,可以将令牌 (或源) 传递给位置/线程,以在可能请求停止的位置可能对停止做出反应的位置之间建立异步通信
  • 没有其他方法可以创建具有关联的共享停止状态的停止令牌,停止令牌的默认构造函数没有关联的停止状态。

停止源和停止令牌的详情

停止源的详情