1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
class TCPSocket : public IDataSocket
{
public:
TCPSocket(
IEventQueue *events, SocketMultiplexer *socketMultiplexer,
IArchNetwork::AddressFamily family = IArchNetwork::AddressFamily::INet
);
TCPSocket(IEventQueue *events, SocketMultiplexer *socketMultiplexer, ArchSocket socket);
~TCPSocket() override;
// ...
// ...
uint32_t read(void *buffer, uint32_t n) override; // 从接收缓冲区读取n个字节到buffer中。
void write(const void *buffer, uint32_t n) override; // 将buffer中的字节写入发送缓冲区,并触发异步发送流程(发送事件)
void flush() override; // 等待发送缓冲区刷新完成。
void shutdownInput() override; // 关闭Socket的接收端,清空发送缓冲区,并从多路复用器移除该Socket对应的任务。
void shutdownOutput() override;
bool isReady() const override; // 接收缓冲区有数据
uint32_t getSize() const override; // 接收缓冲区字节数
// 发起连接,并向多路复用器注册读写任务。
void connect(const NetworkAddress &) override;
/*
* 创建并返回与套接字关联的异步I/O任务对象(多路复用器任务)。
* 如果suocket还没有建立连接,就创建连接建立任务,也就是serviceConnecting。
* 如果已经建立建立,那就创建读写任务,也就是serviceConnected。
*/
virtual ISocketMultiplexerJob *newJob();
protected:
/* 表示任务执行的结果,多路复用器是以 “任务链” 的形式运行任务 */
enum class JobResult
{
Break = -1, // 强制终止任务链,用于处理不可恢复错误(如断开连接)
Retry, // 保持当前任务继续执行,适用于暂时性阻塞(如等待I/O)
New // 需要创建新任务,上一个任务已经完成。
};
// ...
/*
* 非阻塞I/O,并处理网络断开情况,由serviceConnected调用。
* doRead(): 读取网络数据保存到接收缓冲区中,当输入缓冲区从空变为有数据时,通过事件队列通知上层有数据到达当输入缓冲区从空变为有数据时,通过事件队列通知上层有数据到达。
* doWrite():将发送缓冲区中的数据发送出去。
*/
virtual JobResult doRead();
virtual JobResult doWrite();
/*
* 注册任务到多路复用器。
* 如果参数为nullptr,就移除多路复用器中与当前Socket绑定的任务。
* 如果不为nullptr,就注册该任务到多路复用器。
*/
void setJob(ISocketMultiplexerJob *);
// ...
// 获取/设置状态的接口
// ...
void sendEvent(EventTypes);
/* 刷新发送缓冲区(从输出缓冲区移除已成功发送的字节数),并唤醒所有等待m_flushed的线程。*/
void discardWrittenData(int bytesWrote);
IEventQueue *m_events;
StreamBuffer m_inputBuffer; // 接收缓冲区
StreamBuffer m_outputBuffer; // 发送缓冲区
private:
void init(); // 初始化TCP套接字的状态并配置网络参数,并且禁用禁用Nagle算法。
void sendConnectionFailedEvent(const char *);
void onInputShutdown(); // 清空接收缓存区。
void onOutputShutdown();
void onConnected(); // 建立连接后更新Socekt状态。
void onDisconnected(); // 断开连接后更新Socket状态。
/* 多路复用器任务 */
ISocketMultiplexerJob *serviceConnecting(ISocketMultiplexerJob *, bool, bool, bool);
ISocketMultiplexerJob *serviceConnected(ISocketMultiplexerJob *, bool, bool, bool);
bool m_readable; // Socket是否可读
bool m_writable;
bool m_connected; // 连接状态
Mutex m_mutex;
ArchSocket m_socket;
CondVar<bool> m_flushed; // 用于在多线程场景下同步TCP套接字的"数据刷新完成"状态。当发送缓冲区的数据被完全发送到网络后,通过 m_flushed 通知等待的线程继续执行。
SocketMultiplexer *m_socketMultiplexer;
};
|