线程模块

线程的作用

  • 在现代 C++ 网络服务器框架中,线程模块是实现并发处理、任务调度和系统资源充分利用的核心组件。Sylar 框架中的线程模块(Thread 类)封装了 POSIX pthread 接口,简化了线程的创建、命名、同步与生命周期管理。

整体框架

Thread

模块解析

私有成员变量

  • m_name :当前子线程的名字,在构造函数中我们需要给每一个线程赋予名字。
  • m_id :当前子线程的真实内核级线程IDLinux 下使用 pid_t 唯一标识一个线程。
  • m_thread :当前子线程的线程标识符,用于表示一个线程对象
  • m_cb :当前子线程绑定的回调函数,同样在构造函数中需要给每一个线程赋予它们所需要执行的函数
  • m_semphore :当前子线程的信号量,用于进行线程之间的同步

重要成员函数

  • 构造函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    Thread::Thread(std::function<void()> cb, const std::string &name)
    : m_cb(cb)
    , m_name(name)
    {
    if (name.empty())
    {
    m_name = "UNKNOWN";
    }
    int rt = pthread_create(&m_thread, nullptr, &Thread::run, this);
    if (rt)
    {
    SYLAR_LOG_ERROR(g_logger) << "pthread create thread fail , rt=" << rt << "name="<< name;
    throw std::logic_error("pthread create error");
    }
    m_semaphore.wait();
    }
    • 参数解析
      • cb :当前线程绑定的回调函数
      • name :当前线程的名称。
    • 函数解析
      • 通过调用系统API pthread_create ,创建一个新线程,并将该线程的执行函数设置为 Thread 类中的静态函数 run,同时将当前对象的指针 this 作为参数进行传入。之后,新线程的 ID 被保存在成员变量 m_thread 中。
      • 与此同时,主线程执行 m_semaphore.wait() 等待子线程进行线程的初始化以及完成对应的回调函数。
  • 析构函数 ~Thread()

    1
    2
    3
    4
    5
    6
    7
    Thread::~Thread()
    {
    if (m_thread)
    {
    pthread_detach(m_thread);
    }
    }
    • 函数解析
      • 如果主线程已经要结束了,但是子线程仍然存在,就子线程设置为分离线程。这表示主线程不会去调用 pthread_join 来等待它退出。
  • run

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    void* Thread::run(void* arg)
    {
    Thread* thread = (Thread*)(arg);
    t_thread = thread;
    t_thread_name = thread->m_name;
    thread->m_id = sylar::GetThreadId();
    pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());

    std::function<void()> cb;
    cb.swap(thread->m_cb);

    thread->m_semaphore.notify();
    cb();
    return 0;
    }
    • 参数解析
      • argvoid* 标识可以接收任意类型的指针参数,在此刻 void* 主要接受的是构造函数里 pthread_create 传入的 this 指针,即 Thread*
    • 函数解析
      • t_thread 的类型是静态线程局部变量 Thread*
      • thread_local 表示每个线程都有对当前变量的副本,即每个线程对线程变量都是独立的,不会相互干扰。
      • 由于每个 Thread 传入的参数都是 this 指针(指向自己的指针)。所以 t_thread 保存的都是当前的 Thread 对象的指针。t_thread_name 保存的是当前 Thread 对象的名称。之后,指针 thread 设置当前 Thread 的线程 id,通过系统 API 设置当前线程的名称。最后,通过信号量 m_semphore 唤醒当时等待在构造函数里的主线程,然后子线程执行完回调函数 cb 就终止了。
  • GetThis

    1
    2
    3
    4
    5
    6
    static thread_local Thread* t_thread = nullptr;
    ...
    Thread* Thread::GetThis()
    {
    return t_thread;
    }
    • 函数解析
      • 可知 t_thread 保存的就是当前 Thread 对象的指针。
      • 函数 GetThis() 返回当前线程所对应的 Thread 对象指针。
  • GetName

    1
    2
    3
    4
    const std::string& Thread::GetName()
    {
    return t_thread_name;
    }
    • 函数解析
      • 可知 t_thread_name 保存的就是当前 Thread 对象的名字。
      • 函数 GetName() 返回当前线程所对应的 Thread 对象的名字。

线程同步模块

线程同步的作用

  • 线程同步的作用主要在于保证多线程环境下对共享资源的安全访问,防止出现数据竞争、死锁、脏读等问题,从而确保程序行为的正确性和一致性。

为什么需要线程同步

  • 数据竞争(Race Condition):多个线程并发访问同一个资源,读写操作交叉执行,导致数据状态不一致。
  • 脏读/写:一个线程正在写数据,另一个线程同时读取,读到的是未完成或错误的中间状态。
  • 死锁(Deadlock):两个或多个线程互相等待对方释放资源,导致程序无法继续执行。

信号量

  • Semphore 的作用

    • 信号量(Semaphore)是一个计数器,用于控制对某个共享资源的访问数量,实现线程之间的同步与互斥。
  • 基本原理

    • 信号量维护一个整型计数器 count :
      • 当线程执行 wait()P 操作):
        • 如果 count > 0,则将 count–,允许线程继续执行;
        • 如果 count == 0,线程会阻塞等待资源可用。
      • 当线程执行 notify()V 操作):
        • count++,表示释放一个资源;
        • 如果有线程在等待,将唤醒其中一个。
  • 整体框架

    Semphore

  • 函数解析

    • 构造函数

      1
      2
      3
      4
      5
      6
      7
      Semaphore::Semaphore(uint32_t count)
      {
      if (sem_init(&m_semaphore, 0, count))
      {
      throw std::logic_error("sem_init error");
      }
      }
    • 析构函数

      1
      2
      3
      4
      5
      Semaphore::~Semaphore()
      {
      sem_destroy(&m_semaphore);

      }
    • wait

      1
      2
      3
      4
      5
      6
      7
      void Semaphore::wait()
      {
      if (sem_wait(&m_semaphore))
      {
      throw std::logic_error("sem_wait error");
      }
      }
    • notify

      1
      2
      3
      4
      5
      6
      7
      void Semaphore::notify()
      {
      if (sem_post(&m_semaphore))
      {
      throw std::logic_error("sem_post error");
      }
      }

互斥量

  • Mutex 的作用

    • 互斥量( Mutex )是一种用于线程间互斥访问共享资源的同步原语。它保证同一时间只有一个线程可以访问某段临界区critical section)代码或共享资源。
  • 为什么需要 Mutex

    • 防止多线程同时访问共享资源造成数据竞态(Race Condition)。
      • 多个线程并发访问共享资源(如变量、文件、数据库)时,如果不加保护,会出现不可预测的错误。互斥量能防止这种情况。
    • 保证临界区代码的线程安全
      • 在临界区(修改共享变量的代码段)前加锁,退出时解锁,保证任何时刻最多只有一个线程运行这段代码。
  • 基本原理

    • 加锁( lock
      • 如果没有被锁定,当前线程获得锁,继续执行;
      • 如果已被锁定,当前线程将阻塞直到获得锁。
    • 解锁( unlock )
      • 释放锁,如果有阻塞线程,会唤醒其中一个。
  • 整体框架

    Mutex

  • 函数解析

    • 构造函数

      1
      2
      3
      4
      Mutex()
      {
      pthread_mutex_init(&mutex, nullptr);
      }
    • 析构函数

      1
      2
      3
      4
      ~Mutex()
      {
      pthread_mutex_destroy(&mutex);
      }
    • lock

      1
      2
      3
      4
      void lock()
      {
      pthread_mutex_lock(&mutex);
      }
    • unlock

      1
      2
      3
      4
      void unlock()
      {
      pthread_mutex_unlock(&mutex);
      }

读写锁

  • RWMutex 的作用

    • 读写锁是一种高级同步原语,适用于:读多写少 的共享资源访问场景。它允许多个线程同时读取(读锁共享)以及写操作时独占访问(写锁互斥)。
  • 为什么需要 RWMutex

    • 支持高并发读
      • 多个线程可以同时加读锁,避免阻塞,从而提升读取性能。
    • 写操作安全
      • 当一个线程加写锁时,会阻塞其他所有的读锁或写锁请求,确保写操作的原子性和一致性。
    • 适用于读多写少的共享资源
  • 基本原理

    • 基本状态变量

      1
      2
      3
      4
      5
      6
      int readers = 0;          // 当前持有读锁的线程数
      int writers = 0; // 当前持有写锁的线程数(最多为1)
      int write_requests = 0; // 正在等待获取写锁的线程数
      std::mutex mtx; // 内部互斥锁保护状态变量
      std::condition_variable reader_cv;
      std::condition_variable writer_cv;
    • 加读锁(读线程调用)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
        lock(mtx);
      while (writers > 0 || write_requests > 0)
      {
      reader_cv.wait(mtx); // 等待写线程释放或避免饿死写线程
      }
      readers++;
      unlock(mtx);
      ```
      - 加写锁(写线程调用)

      ```cpp
      lock(mtx);
      write_requests++;
      while (readers > 0 || writers > 0)
      {
      writer_cv.wait(mtx); // 等待所有读和写线程释放锁
      }
      write_requests--;
      writers = 1;
      unlock(mtx);
      ```

      - 解锁

      ```cpp
      lock(mtx);
      if (当前线程是读锁)
      {
      readers--;
      if (readers == 0 && write_requests > 0)
      writer_cv.notify_one(); // 唤醒写线程
      }
      else { // 写锁
      writers = 0;
      if (write_requests > 0)
      writer_cv.notify_one(); // 写优先
      else
      reader_cv.notify_all(); // 唤醒所有读线程
      }
      unlock(mtx);
  • 整体框架

    RWMutex

  • 函数解析

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    class RWMutex : Noncopyable
    {
    public:
    using ReadLock = ReadScopedLockImpl<RWMutex>;
    using WriteLock = WriteScopedLockImpl<RWMutex>;

    RWMutex()
    {
    pthread_rwlock_init(&m_lock,nullptr);
    }

    ~RWMutex()
    {
    pthread_rwlock_destroy(&m_lock);
    }

    void rdlock()
    {
    pthread_rwlock_rdlock(&m_lock);
    }

    void wrlock()
    {
    pthread_rwlock_wrlock(&m_lock);
    }

    void unlock()
    {
    pthread_rwlock_unlock(&m_lock);
    }
    private:
    pthread_rwlock_t m_lock; // 读写锁
    };