1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
|
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <iostream>
#include "event_loop.h"
#include "acceptor.h"
#include "tcp_connection.h"
EventLoop::EventLoop(Acceptor &acceptor) :
epfd_(CreateEpollFd()),
evtList_(1024),
isLooping_(false),
acceptor_(acceptor),
evtfd_(CreateEventFd()) {
//将listenfd放在红黑树上进行监听(socket)
int listenfd = acceptor_.Fd();
AddEpollReadFd(listenfd);
AddEpollReadFd(evtfd_);
}
EventLoop::~EventLoop() {
close(epfd_);
close(evtfd_);
}
//循环与否
void EventLoop::Loop() {
isLooping_ = true;
while(isLooping_)
{
WaitEpollFd();
}
}
void EventLoop::Unloop() {
isLooping_ = false;
}
//封装类epoll_wait函数
void EventLoop::WaitEpollFd() {
int nready = 0;
do {
nready = epoll_wait(epfd_, &*evtList_.begin(), evtList_.size(), 3000);
} while(-1 == nready && errno == EINTR);
if(-1 == nready) {
std::cerr << "-1 == nready" << std::endl;
return;
} else if(0 == nready) {
std::cout << ">>epoll_wait timeout" << std::endl;
} else {
//可以判断一下,文件描述符是不是已经达到了1024
//如果达到1024就需要进行扩容
if(nready == (int)evtList_.size()) {
evtList_.reserve(2 * nready);
}
for(int idx = 0; idx < nready; ++idx) {
int fd = evtList_[idx].data.fd;
//查看文件描述符是不是listenfd
if(fd == acceptor_.Fd()) { // 事件1,有新连接
if(evtList_[idx].events & EPOLLIN) {
//处理新的连接
HandleNewConnection();
}
} else if (fd == evtfd_) { //事件2,有连接的任务处理完成了
if(evtList_[idx].events & EPOLLIN) {
HandleRead();
DoPendingFunctors(); //主线程中发送被线程池处理好的数据
}
} else { //事件3,连接有数据到达
if(evtList_[idx].events & EPOLLIN) {
//处理老的连接
HandleMessage(fd);
}
}
}
}
}
//处理新的连接
void EventLoop::HandleNewConnection() {
int connfd = acceptor_.Accept();
if(connfd < 0) {
perror("handleNewConnection accept");
return;
}
AddEpollReadFd(connfd);
//就表明三次握手已经建立成功了
/* TcpConnection con(connfd); */
TcpConnectionPtr con(new TcpConnection(connfd, this));
//将三个回调函数注册给TcpConnection
con->SetNewConnectionCallback(onNewConnectionCb_);//连接建立的注册
con->SetMessageCallback(onMessageCb_);//消息到达的注册
con->SetCloseCallback(onCloseCb_);//连接断开的注册
//以键值对的形式存起来
/* _conns.insert(std::make_pair(connfd, con)); */
conns_[connfd] = con;
con->HandleNewConnectionCallback();
}
//处理老的连接上的消息
void EventLoop::HandleMessage(int fd) {
auto it = conns_.find(fd);
if(it != conns_.end()) {
//连接是存在的,可以进行数据的收发
bool flag = it->second->IsClosed();//读的时候客户端是不是与服务器断开
if(flag) {
//连接已经断开
it->second->HandleCloseCallback();//连接断开的事件的触发时机已经到达
DelEpollReadFd(fd);//将文件描述符从红黑树上摘除掉
conns_.erase(it);//同时将该链接从map中删除
} else {
it->second->HandleMessageCallback();//消息到达事件的触发时机已经到达
}
} else {
//连接不存在,可以直接让程序退出来
std::cout << "连接不存在" << std::endl;
return;
}
}
//epfd的创建
int EventLoop::CreateEpollFd() {
int fd = ::epoll_create(100);
if(fd < 0) {
perror("epoll_create");
return fd;
}
return fd;
}
//监听文件描述符
void EventLoop::AddEpollReadFd(int fd) {
struct epoll_event evt;
evt.events = EPOLLIN;
evt.data.fd = fd;
int ret = ::epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &evt);
if(ret < 0) {
perror("epoll_ctl add");
return;
}
}
//取消文件描述符的监听
void EventLoop::DelEpollReadFd(int fd) {
struct epoll_event evt;
evt.events = EPOLLIN;
evt.data.fd = fd;
int ret = ::epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, &evt);
if(ret < 0) {
perror("epoll_ctl add");
return;
}
}
void EventLoop::SetNewConnectionCallback(TcpConnectionCallback &&cb) {
onNewConnectionCb_ = std::move(cb);
}
void EventLoop::SetMessageCallback(TcpConnectionCallback &&cb) {
onMessageCb_ = std::move(cb);
}
void EventLoop::SetCloseCallback(TcpConnectionCallback &&cb) {
onCloseCb_ = std::move(cb);
}
void EventLoop::HandleRead() {
uint64_t two;
ssize_t ret = read(evtfd_, &two, sizeof(uint64_t));
if(ret != sizeof(uint64_t)) {
perror("read");
return;
}
}
void EventLoop::DoPendingFunctors() { //将处理好的数据发送回去,处理函数已经绑定了连接和数据,所以直接执行就好了
std::vector<Functor> tmp;
{
std::lock_guard<std::mutex> lc(std::mutex);
tmp.swap(pendings_);
}
for (auto& cb : tmp) {
cb();
}
}
int EventLoop::CreateEventFd() {
int fd = eventfd(10, 0);
if(fd < 0) {
perror("eventfd");
return fd;
}
return fd;
}
void EventLoop::Wakeup() {
uint64_t one = 1;
ssize_t ret = write(evtfd_, &one, sizeof(uint64_t));
if(ret != sizeof(uint64_t)) {
perror("write");
return;
}
}
void EventLoop::RunInLoop(Functor&& cb) {
{
std::lock_guard<std::mutex> lc(mutex_);
pendings_.push_back(std::move(cb)); //交给主线程去执行回调函数,将处理好的数据发送回去
}
Wakeup();
}
|