线程模块
线程模块
线程的作用
- 在现代
C++
网络服务器框架中,线程模块是实现并发处理、任务调度和系统资源充分利用的核心组件。Sylar
框架中的线程模块(Thread
类)封装了POSIX
的pthread
接口,简化了线程的创建、命名、同步与生命周期管理。
整体框架
模块解析
私有成员变量
m_name
:当前子线程的名字,在构造函数中我们需要给每一个线程赋予名字。m_id
:当前子线程的真实内核级线程ID,Linux
下使用pid_t
唯一标识一个线程。m_thread
:当前子线程的线程标识符,用于表示一个线程对象。m_cb
:当前子线程绑定的回调函数,同样在构造函数中需要给每一个线程赋予它们所需要执行的函数m_semphore
:当前子线程的信号量,用于进行线程之间的同步。
重要成员函数
构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16Thread::Thread(std::function<void()> cb, const std::string &name)
: m_cb(cb)
, m_name(name)
{
if (name.empty())
{
m_name = "UNKNOWN";
}
int rt = pthread_create(&m_thread, nullptr, &Thread::run, this);
if (rt)
{
SYLAR_LOG_ERROR(g_logger) << "pthread create thread fail , rt=" << rt << "name="<< name;
throw std::logic_error("pthread create error");
}
m_semaphore.wait();
}- 参数解析
cb
:当前线程绑定的回调函数。name
:当前线程的名称。
- 函数解析
- 通过调用系统API
pthread_create
,创建一个新线程,并将该线程的执行函数设置为Thread
类中的静态函数run
,同时将当前对象的指针this
作为参数进行传入。之后,新线程的ID
被保存在成员变量m_thread
中。 - 与此同时,主线程执行
m_semaphore.wait()
等待子线程进行线程的初始化以及完成对应的回调函数。
- 通过调用系统API
- 参数解析
析构函数
~Thread()
1
2
3
4
5
6
7Thread::~Thread()
{
if (m_thread)
{
pthread_detach(m_thread);
}
}- 函数解析
- 如果主线程已经要结束了,但是子线程仍然存在,就子线程设置为分离线程。这表示主线程不会去调用
pthread_join
来等待它退出。
- 如果主线程已经要结束了,但是子线程仍然存在,就子线程设置为分离线程。这表示主线程不会去调用
- 函数解析
run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15void* Thread::run(void* arg)
{
Thread* thread = (Thread*)(arg);
t_thread = thread;
t_thread_name = thread->m_name;
thread->m_id = sylar::GetThreadId();
pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());
std::function<void()> cb;
cb.swap(thread->m_cb);
thread->m_semaphore.notify();
cb();
return 0;
}- 参数解析
arg
:void*
标识可以接收任意类型的指针参数,在此刻void*
主要接受的是构造函数里pthread_create
传入的this
指针,即Thread*
。
- 函数解析
t_thread
的类型是静态线程局部变量Thread*
。thread_local
表示每个线程都有对当前变量的副本,即每个线程对线程变量都是独立的,不会相互干扰。- 由于每个
Thread
传入的参数都是this
指针(指向自己的指针)。所以t_thread
保存的都是当前的Thread
对象的指针。t_thread_name
保存的是当前Thread
对象的名称。之后,指针thread
设置当前Thread
的线程id
,通过系统API
设置当前线程的名称。最后,通过信号量m_semphore
唤醒当时等待在构造函数里的主线程,然后子线程执行完回调函数cb
就终止了。
- 参数解析
GetThis
1
2
3
4
5
6static thread_local Thread* t_thread = nullptr;
...
Thread* Thread::GetThis()
{
return t_thread;
}- 函数解析
- 可知
t_thread
保存的就是当前Thread
对象的指针。 - 函数
GetThis()
返回当前线程所对应的Thread
对象指针。
- 可知
- 函数解析
GetName
1
2
3
4const std::string& Thread::GetName()
{
return t_thread_name;
}- 函数解析
- 可知
t_thread_name
保存的就是当前Thread
对象的名字。 - 函数
GetName()
返回当前线程所对应的Thread
对象的名字。
- 可知
- 函数解析
线程同步模块
线程同步的作用
- 线程同步的作用主要在于保证多线程环境下对共享资源的安全访问,防止出现数据竞争、死锁、脏读等问题,从而确保程序行为的正确性和一致性。
为什么需要线程同步
- 数据竞争(
Race Condition
):多个线程并发访问同一个资源,读写操作交叉执行,导致数据状态不一致。 - 脏读/写:一个线程正在写数据,另一个线程同时读取,读到的是未完成或错误的中间状态。
- 死锁(
Deadlock
):两个或多个线程互相等待对方释放资源,导致程序无法继续执行。
信号量
Semphore
的作用- 信号量(
Semaphore
)是一个计数器,用于控制对某个共享资源的访问数量,实现线程之间的同步与互斥。
- 信号量(
基本原理
- 信号量维护一个整型计数器
count
:- 当线程执行
wait()
(P
操作):- 如果
count
> 0,则将count
–,允许线程继续执行; - 如果
count
== 0,线程会阻塞等待资源可用。
- 如果
- 当线程执行
notify()
(V
操作):- 将
count
++,表示释放一个资源; - 如果有线程在等待,将唤醒其中一个。
- 将
- 当线程执行
- 信号量维护一个整型计数器
整体框架
函数解析
构造函数
1
2
3
4
5
6
7Semaphore::Semaphore(uint32_t count)
{
if (sem_init(&m_semaphore, 0, count))
{
throw std::logic_error("sem_init error");
}
}析构函数
1
2
3
4
5Semaphore::~Semaphore()
{
sem_destroy(&m_semaphore);
}wait
1
2
3
4
5
6
7void Semaphore::wait()
{
if (sem_wait(&m_semaphore))
{
throw std::logic_error("sem_wait error");
}
}notify
1
2
3
4
5
6
7void Semaphore::notify()
{
if (sem_post(&m_semaphore))
{
throw std::logic_error("sem_post error");
}
}
互斥量
Mutex
的作用- 互斥量(
Mutex
)是一种用于线程间互斥访问共享资源的同步原语。它保证同一时间只有一个线程可以访问某段临界区(critical section
)代码或共享资源。
- 互斥量(
为什么需要
Mutex
- 防止多线程同时访问共享资源造成数据竞态(
Race Condition
)。- 多个线程并发访问共享资源(如变量、文件、数据库)时,如果不加保护,会出现不可预测的错误。互斥量能防止这种情况。
- 保证临界区代码的线程安全
- 在临界区(修改共享变量的代码段)前加锁,退出时解锁,保证任何时刻最多只有一个线程运行这段代码。
- 防止多线程同时访问共享资源造成数据竞态(
基本原理
- 加锁(
lock
)- 如果没有被锁定,当前线程获得锁,继续执行;
- 如果已被锁定,当前线程将阻塞直到获得锁。
- 解锁(
unlock
)- 释放锁,如果有阻塞线程,会唤醒其中一个。
- 加锁(
整体框架
函数解析
构造函数
1
2
3
4Mutex()
{
pthread_mutex_init(&mutex, nullptr);
}析构函数
1
2
3
4~Mutex()
{
pthread_mutex_destroy(&mutex);
}lock
1
2
3
4void lock()
{
pthread_mutex_lock(&mutex);
}unlock
1
2
3
4void unlock()
{
pthread_mutex_unlock(&mutex);
}
读写锁
RWMutex
的作用- 读写锁是一种高级同步原语,适用于:读多写少 的共享资源访问场景。它允许多个线程同时读取(读锁共享)以及写操作时独占访问(写锁互斥)。
为什么需要
RWMutex
- 支持高并发读
- 多个线程可以同时加读锁,避免阻塞,从而提升读取性能。
- 写操作安全
- 当一个线程加写锁时,会阻塞其他所有的读锁或写锁请求,确保写操作的原子性和一致性。
- 适用于读多写少的共享资源
- 支持高并发读
基本原理
基本状态变量
1
2
3
4
5
6int readers = 0; // 当前持有读锁的线程数
int writers = 0; // 当前持有写锁的线程数(最多为1)
int write_requests = 0; // 正在等待获取写锁的线程数
std::mutex mtx; // 内部互斥锁保护状态变量
std::condition_variable reader_cv;
std::condition_variable writer_cv;加读锁(读线程调用)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40lock(mtx);
while (writers > 0 || write_requests > 0)
{
reader_cv.wait(mtx); // 等待写线程释放或避免饿死写线程
}
readers++;
unlock(mtx);
```
- 加写锁(写线程调用)
```cpp
lock(mtx);
write_requests++;
while (readers > 0 || writers > 0)
{
writer_cv.wait(mtx); // 等待所有读和写线程释放锁
}
write_requests--;
writers = 1;
unlock(mtx);
```
- 解锁
```cpp
lock(mtx);
if (当前线程是读锁)
{
readers--;
if (readers == 0 && write_requests > 0)
writer_cv.notify_one(); // 唤醒写线程
}
else { // 写锁
writers = 0;
if (write_requests > 0)
writer_cv.notify_one(); // 写优先
else
reader_cv.notify_all(); // 唤醒所有读线程
}
unlock(mtx);
整体框架
函数解析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33class RWMutex : Noncopyable
{
public:
using ReadLock = ReadScopedLockImpl<RWMutex>;
using WriteLock = WriteScopedLockImpl<RWMutex>;
RWMutex()
{
pthread_rwlock_init(&m_lock,nullptr);
}
~RWMutex()
{
pthread_rwlock_destroy(&m_lock);
}
void rdlock()
{
pthread_rwlock_rdlock(&m_lock);
}
void wrlock()
{
pthread_rwlock_wrlock(&m_lock);
}
void unlock()
{
pthread_rwlock_unlock(&m_lock);
}
private:
pthread_rwlock_t m_lock; // 读写锁
};
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 GYu的妙妙屋!