对于多路复用,DeskFlow设计了两个类型 — SocketMultiplexer、ISocketMultiplexerJob:
- SocketMultiplexer是套接字多路复用器的核心实现,借助poll来管理多个网络套接字(ISocket)的异步I/O。
- ISocketMultiplexerJob是一个抽象类型,是套接字I/O任务的抽象。
多路复用器在其主线程的任务循环中检测套接字的状态,当状态发生变化时(可读、可写、出错),多路复用器就会调用套接字任务处理这些IO状态,而套接字的I/O被封装为“多路复用器事件”。
1. 多路复用器设计
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
class SocketMultiplexer
{
public:
SocketMultiplexer();
SocketMultiplexer(SocketMultiplexer const &) = delete;
SocketMultiplexer(SocketMultiplexer &&) = delete;
~SocketMultiplexer();
SocketMultiplexer &operator=(SocketMultiplexer const &) = delete;
SocketMultiplexer &operator=(SocketMultiplexer &&) = delete;
/* 将ISocket对象与对应的IO任务对象添加到列表中(线程安全),更新m_update=true */
void addSocket(ISocket *, ISocketMultiplexerJob *);
void removeSocket(ISocket *); //
static SocketMultiplexer *getInstance(); // 多路复用器单例
private:
/* 核心数据结构类型 */
using SocketJobs = std::list<ISocketMultiplexerJob *>; // 多路复用器任务列表,“任务”用于处理I/O,这些“任务”是与ISocket绑定的
using JobCursor = SocketJobs::iterator; // 任务列表迭代器
using SocketJobMap = std::map<ISocket *, JobCursor>; // ISocket和SocketJobs中任务的映射关系,方便通过IScoket对象查找对应的Job
[[noreturn]] void serviceThread(void *); // 轮询所有注册的 socket,检测事件并调用回调(ISocketMultiplexerJob)
/*
* 任务列表的迭代器游标,用于防止多线程并发时的迭代器失效的问题。
* 不再使用使用begin和end迭代器遍历列表,而是使用newCursor()和nextCursor()返回的迭代器来遍历。
*/
JobCursor newCursor(); // 将游标(m_cursorMark)插入列表开头,作为遍历的起点。
JobCursor nextCursor(JobCursor); // 在列表中找到一个有效的job,并返回。
void deleteCursor(JobCursor); // 移除游标。
/* 读写共享资源时,先调用lockJobListLock(),再调用lockJobList(),然后开始读写共享资源 */
void lockJobListLock(); // 获取 m_jobListLockLocked
void lockJobList(); // 获取 m_jobListLock,获取成功后会释放m_jobListLockLocked
void unlockJobList(); // 释放 m_jobListLock
private:
Mutex *m_mutex = nullptr; // 保护共享资源m_socketJobs、m_socketJobMap等
/* 多路复用器主线程,构造函数中启动 */
Thread *m_thread = nullptr;
/* 任务列表更新标志 */
bool m_update = false;
/* 通知服务线程有新 job 需要处理,由unlockJobList()更新 */
CondVar<bool> *m_jobsReady = nullptr;
/*
* 两个锁结合使用,防止死锁:
* 使用时先锁lockJobListLock,成功后再锁m_jobListLock,然后开始读写共享资源。
*/
CondVar<bool> *m_jobListLock = nullptr; // 这是m_socketJobs的锁
CondVar<bool> *m_jobListLockLocked = nullptr; // 这是m_jobListLock的锁
/* 两个锁的拥有者,用于检查拥有锁的是不是当前线程 */
Thread *m_jobListLocker = nullptr;
Thread *m_jobListLockLocker = nullptr;
SocketJobs m_socketJobs = {}; // 任务列表
SocketJobMap m_socketJobMap = {}; // 套接字与任务的对应关系
ISocketMultiplexerJob *m_cursorMark = nullptr; //
};
|
主线程实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
[[noreturn]] void SocketMultiplexer::serviceThread(void *)
{
std::vector<IArchNetwork::PollEntry> pfds;
IArchNetwork::PollEntry pfd;
for (;;)
{
Thread::testCancel(); // 检查是否需要取消线程
{
// 等待,直到任务列表中有任务
Lock lock(m_mutex);
while (!(bool)*m_jobsReady) {
m_jobsReady->wait();
}
}
// 有任务了,先上锁
lockJobListLock();
lockJobList();
// ...
// 这里就是poll的逻辑代码,监控一组ISocket,有事件就执行对应的任务。
// ...
// 解锁
unlockJobList();
}
}
|
2. 多路复用器任务
这里重点关注一下m_method成员,这是多路复用器任务,他的返回值类型还是多路复用器任务类型,ISocketMultiplexerJob::run()函数会去执行这个任务,并且返回值类型也是ISocketMultiplexerJob, 也就是说,执行一个任务后返回一个任务,这种方式允许在运行时链式任务处理,例如在连接建立过程中从连接处理任务切换到数据传输任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
/*
* 抽象基类: 定义了套接字多路复用器任务的核心功能。
* 该接口采用了事件驱动的设计模式,当套接字状态发生变化时(可读、可写、出错),多路复用器会调用 run() 方法处理这些事件。
*/
class ISocketMultiplexerJob : public IInterface
{
public:
virtual ISocketMultiplexerJob *run(bool readable, bool writable, bool error) = 0;
virtual ArchSocket getSocket() const = 0;
virtual bool isReadable() const = 0;
virtual bool isWritable() const = 0;
};
/* 派生类型 */
template <class T>
class TSocketMultiplexerMethodJob : public ISocketMultiplexerJob
{
public:
using Method = ISocketMultiplexerJob *(T::*)(ISocketMultiplexerJob *, bool, bool, bool);
TSocketMultiplexerMethodJob(T *object, Method method, ArchSocket socket, bool readable, bool writeable);
TSocketMultiplexerMethodJob(TSocketMultiplexerMethodJob const &) = delete;
TSocketMultiplexerMethodJob(TSocketMultiplexerMethodJob &&) = delete;
~TSocketMultiplexerMethodJob() override;
TSocketMultiplexerMethodJob &operator=(TSocketMultiplexerMethodJob const &) = delete;
TSocketMultiplexerMethodJob &operator=(TSocketMultiplexerMethodJob &&) = delete;
ISocketMultiplexerJob *run(bool readable, bool writable, bool error) override;
ArchSocket getSocket() const override;
bool isReadable() const override; // m_readable
bool isWritable() const override; // m_writable
private:
T *m_object; // 任务的来源,事件驱动框架中
Method m_method; // 多路复用器任务函数
ArchSocket m_socket; // 任务对应的Socket
bool m_readable;
bool m_writable;
void *m_arg;
};
|
虚函数实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
/*
* 运行多路复用器任务,允许在运行时动态改变套接字的处理逻辑,例如在连接建立过程中从连接处理任务切换到数据传输任务。
* 返回自身: 继续使用当前任务处理后续事件
* 返回新任务: 替换当前任务,改变处理方式
* 返回 nullptr:停止处理该套接字事件
*/
template <class T>
inline ISocketMultiplexerJob *TSocketMultiplexerMethodJob<T>::run(bool read, bool write, bool error)
{
if (m_object != nullptr) {
return (m_object->*m_method)(this, read, write, error);
}
return nullptr;
}
|