Featured image of post reactor模型

reactor模型

实现reactor模型

引言

reactor模型是一种事件驱动的设计模式,广泛应用于并发编程中,特别是在网络编程和高性能服务器设计中。它的基本思想是通过集中式的事件分发机制来处理输入/输出事件和相关的操作。

前置知识:线程池

reactor实现简要介绍

  • 本案例实现的reactor模型用于服务器端,可以和用户端建立多个连接,每当用户有数据发送过来时,就需要通过线程池执行对应的任务,当任务结束以后,再由主线程将数据发送回用户

两个重要的类

EventLoop类

  • 事件循环监听机制,主要会监听三个事件
    • 连接建立
    • 消息到达,通常会将处理消息的任务交给线程池去处理
    • 消息处理完成,将处理好的消息发送回去

TcpConnection类

  • 该类主要记录服务器这边的所有连接,并提供接受和发送数据

三个回调函数(通常由自己去实现,可以更改实现,去完成不同的业务)

连接建立

  • 我们可以打印连接信息

消息达到

  • 在监听消息达到,并且用户没有断开时执行
  • 我们去接受消息,然后将消息和连接和线程池任务绑定,将任务放到线程池的任务队列中,让线程去执行任务

连接断开

  • 在监听消息到达时执行,当发现消息到达并且结果为0时,说明用户断开,执行回调函数。
  • 我们可以打印连接断开信息

线程池任务处理逻辑 (通常由自己去实现,可以更改实现,去完成不同的业务)

  • 先根据msg进行对应的任务,然后将msg2交给主函数去发送(将发送能力打包成一个可调用对象,交给主函数)
  • 三个回调函数和线程池任务处理逻辑都在main函数所在文件夹中进行定义

源码实现

InetAddress(ip地址和端口号对象)

  • 封装了ip和端口号
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
#ifndef INET_ADDRESS_H_
#define INET_ADDRESS_H_

#include <arpa/inet.h>
#include <string>

class InetAddress{
public:
    InetAddress(const std::string& ip, unsigned short port);
    InetAddress(const struct sockaddr_in& addr);
    ~InetAddress() = default;
    auto Ip() const -> std::string;
    auto Port() const -> unsigned short;
    auto GetPtr() const -> const struct sockaddr_in*;

private:
    struct sockaddr_in addr_;

};

#endif
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include "inet_address.h"

#include <strings.h>

InetAddress::InetAddress(const std::string& ip, unsigned short port) {
    ::bzero(&addr_,sizeof(struct sockaddr_in));
    addr_.sin_family = AF_INET;
    addr_.sin_port = htons(port);
    addr_.sin_addr.s_addr = inet_addr(ip.c_str());
}

InetAddress::InetAddress(const struct sockaddr_in& addr) : addr_(addr) {

}

std::string InetAddress::Ip() const {
    return std::string(inet_ntoa(addr_.sin_addr));
}

unsigned short InetAddress::Port() const {
    return ntohs(addr_.sin_port);
}

const struct sockaddr_in* InetAddress::GetPtr() const {
    return &addr_;
}

Socket对象

  • 封装了socket文件描述符
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
#ifndef SOCKET_H_
#define SOCKET_H_

class Socket {
public:
    Socket();
    explicit Socket(int fd);
    ~Socket();
    auto Fd() const -> int;

    Socket(const Socket&) = delete;
    void operator=(const Socket&) = delete;

private:
    int fd_;
};

#endif
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <unistd.h>

#include "socket.h"

Socket::Socket() {
    fd_ = ::socket(AF_INET,SOCK_STREAM, 0);
    if (fd_ < 0) {
        perror("socket");
        return;
    }
}

Socket::Socket(int fd) : fd_(fd) {}

Socket::~Socket() {
    close(fd_);
}

int Socket::Fd() const {
    return fd_;
}

SocketIO对象 (用于发送接受数据)

  • 封装了通过socket进行收发数据的操作
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
#ifndef SOCKET_IO_H_
#define SOCKET_IO_H_

class SocketIO
{
public:
    explicit SocketIO(int fd);
    ~SocketIO();
    auto Readn(char *buf, int len) -> int;
    auto ReadLine(char *buf, int len) -> int;
    auto Writen(const char *buf, int len) -> int;

private:
    int fd_;
};

#endif
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>

#include "socket_io.h"

SocketIO::SocketIO(int fd) : fd_(fd) {}

SocketIO::~SocketIO() {
    close(fd_);
}

//len = 10000    1500/6     1000/1
int SocketIO::Readn(char *buf, int len) {
    int left = len;
    char *pstr = buf;
    int ret = 0;

    while(left > 0) {
        ret = read(fd_, pstr, left);
        if(-1 == ret && errno == EINTR) {
            continue;
        } else if(-1 == ret) {
            perror("read error -1");
            return -1;
        } else if(0 == ret) {
            break;
        } else {
            pstr += ret;
            left -= ret;
        }
    }

    return len - left;
}

int SocketIO::ReadLine(char *buf, int len) {
    int left = len - 1;
    char *pstr = buf;
    int ret = 0, total = 0;

    while(left > 0) {
        //MSG_PEEK不会将缓冲区中的数据进行清空,只会进行拷贝操作
        ret = recv(fd_, pstr, left, MSG_PEEK);
        if(-1 == ret && errno == EINTR) {
            continue;
        } else if(-1 == ret) {
            perror("readLine error -1");
            return -1;
        } else if(0 == ret) {
            break;
        } else {
            for(int idx = 0; idx < ret; ++idx) {
                if(pstr[idx] == '\n') {
                    int sz = idx + 1;
                    Readn(pstr, sz);
                    pstr += sz;
                    *pstr = '\0';//C风格字符串以'\0'结尾

                    return total + sz;
                }
            }
            Readn(pstr, ret);//从内核态拷贝到用户态
            total += ret;
            pstr += ret;
            left -= ret;
        }
    }
    *pstr = '\0';

    return total - left;

}

int SocketIO::Writen(const char *buf, int len) {
    int left = len;
    const char *pstr = buf;
    int ret = 0;

    while(left > 0) {
        ret = write(fd_, pstr, left);
        if(-1 == ret && errno == EINTR) {
            continue;
        } else if(-1 == ret) {
            perror("writen error -1");
            return -1;
        } else if(0 == ret) {
            break;
        } else {
            pstr += ret;
            left -= ret;
        }
    }
    return len - left;
}

Acceptor对象(用于建立tcp连接)

  • 封装了通过socket建立tcp连接的过程
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#ifndef ACCEPTOR_H_
#define ACCEPTOR_H_

#include <string>

#include "socket.h"
#include "inet_address.h"

class Acceptor{
public:
    Acceptor(const std::string& ip, unsigned short port);
    ~Acceptor() = default;
    auto Ready() -> void;		//服务器端准备
    auto Accept() -> int;		//服务器端获得新连接
    auto Fd() -> int;

private:
    auto SetReuseAddr() -> void;
    auto SetReusePort() -> void;
    auto Bind() -> void;
    auto Listen() -> void;

private:
    Socket sock_;       //welcome socket
    InetAddress addr_;
};

#endif
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include <stdio.h>

#include "acceptor.h"

Acceptor::Acceptor(const std::string& ip, unsigned short port) :
sock_(),
addr_(ip,port) {}

void Acceptor::Ready() {
    SetReuseAddr();
    SetReusePort();
    Bind();
    Listen();
}

void Acceptor::SetReuseAddr() {
    int on = 1;
    int ret = setsockopt(sock_.Fd(), SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    if (ret) {
        perror("setsockopt");
        return;
    }
}

void Acceptor::SetReusePort() {
    int on = 1;
    int ret = setsockopt(sock_.Fd(), SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on));
    if (ret) {
        perror("setsockopt");
        return;
    }
}

void Acceptor::Bind() {
    int ret = ::bind(sock_.Fd(),(struct sockaddr*)addr_.GetPtr(), sizeof(struct sockaddr));
    if (-1 == ret) {
        perror("bind");
        return;
    }
}

void Acceptor::Listen() {
    int ret = ::listen(sock_.Fd(),128);
    if (-1 == ret) {
        perror("listen");
        return;
    }
}

int Acceptor::Accept() {
    int connfd = ::accept(sock_.Fd(),nullptr,nullptr);
    if (-1 == connfd) {
        perror("accept");
        return -1;
    }
    return connfd;
}

int Acceptor::Fd() {
    return sock_.Fd();
}

EventLoop类

  • 实现了事件循环机制,通过io多路复用对各种事件进行了监听
  • 存储了连接所有连接fd和封装的连接类,可以进行连接上的处理
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#ifndef EVENT_LOOP_H_
#define EVENT_LOOP_H_

#include <vector>
#include <map>
#include <memory>
#include <functional>
#include <mutex>

class Acceptor;
class TcpConnection;

using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
using TcpConnectionCallback = std::function<void(const TcpConnectionPtr &)>;

class EventLoop
{
    using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
    using Functor = std::function<void()>;
public:
    EventLoop(Acceptor &acceptor);
    ~EventLoop();

    //循环与否
    auto Loop() -> void;
    auto Unloop() -> void;

private:
    //封装类epoll_wait函数
    auto WaitEpollFd() -> void;

    //处理新的连接
    auto HandleNewConnection() -> void;

    //处理老的连接上的消息
    auto HandleMessage(int fd) -> void;

    //epfd的创建
    auto CreateEpollFd() -> int;

    //监听文件描述符
    auto AddEpollReadFd(int fd) -> void;

    //取消文件描述符的监听
    auto DelEpollReadFd(int fd) -> void;

public:
    auto SetNewConnectionCallback(TcpConnectionCallback&& cb) -> void;
    auto SetMessageCallback(TcpConnectionCallback&& cb) -> void;
    auto SetCloseCallback(TcpConnectionCallback&& cb) -> void;

private:
    auto HandleRead() -> void;
    auto DoPendingFunctors() -> void;
    auto CreateEventFd() -> int;
    
public:
    auto Wakeup() -> void;
    auto RunInLoop(std::function<void()>&& f) -> void;

private:
    int epfd_;//epoll_create创建的文件描述符
    std::vector<struct epoll_event> evtList_;//存放满足条件的文件描述符的数据结构
    bool isLooping_;//标识循环是否在运行的标志
    Acceptor& acceptor_;//因为需要调用其中的accept函数
    std::map<int, TcpConnectionPtr> conns_;//存放的是文件描述符与连接的键值对

    TcpConnectionCallback onNewConnectionCb_;//连接建立
    TcpConnectionCallback onMessageCb_;//消息到达
    TcpConnectionCallback onCloseCb_;//连接断开

    int evtfd_;     //用于通知有消息处理完成,并放入到pendings_中了
    std::vector<Functor> pendings_;     //消息处理完后
    std::mutex mutex_;
};

#endif
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>

#include <iostream>

#include "event_loop.h"
#include "acceptor.h"
#include "tcp_connection.h"


EventLoop::EventLoop(Acceptor &acceptor) : 
epfd_(CreateEpollFd()), 
evtList_(1024), 
isLooping_(false), 
acceptor_(acceptor),
evtfd_(CreateEventFd()) {
    //将listenfd放在红黑树上进行监听(socket)
    int listenfd = acceptor_.Fd();
    AddEpollReadFd(listenfd);
    AddEpollReadFd(evtfd_);
}

EventLoop::~EventLoop() {
    close(epfd_);
    close(evtfd_);
}

//循环与否
void EventLoop::Loop() {
    isLooping_ = true;
    while(isLooping_)
    {
        WaitEpollFd();
    }
}

void EventLoop::Unloop() {
    isLooping_ = false;
}

//封装类epoll_wait函数
void EventLoop::WaitEpollFd() {
    int nready = 0;
    do {
        nready = epoll_wait(epfd_, &*evtList_.begin(), evtList_.size(), 3000);
    } while(-1 == nready && errno == EINTR);

    if(-1 == nready) {
        std::cerr << "-1 == nready" << std::endl;
        return;
    } else if(0 == nready) {
        std::cout << ">>epoll_wait timeout" << std::endl;
    } else {
        //可以判断一下,文件描述符是不是已经达到了1024
        //如果达到1024就需要进行扩容
        if(nready == (int)evtList_.size()) {
            evtList_.reserve(2 * nready);
        }

        for(int idx = 0; idx < nready; ++idx) {
            int fd = evtList_[idx].data.fd;
            //查看文件描述符是不是listenfd
            if(fd == acceptor_.Fd()) {           // 事件1,有新连接 
                if(evtList_[idx].events & EPOLLIN) {
                    //处理新的连接
                    HandleNewConnection();
                }
            } else if (fd == evtfd_) {            //事件2,有连接的任务处理完成了
                if(evtList_[idx].events & EPOLLIN) {
                    HandleRead();
                    DoPendingFunctors();        //主线程中发送被线程池处理好的数据
                }
            } else {                              //事件3,连接有数据到达
                if(evtList_[idx].events & EPOLLIN) {
                    //处理老的连接
                    HandleMessage(fd);
                }
            }
        }
    }
}

//处理新的连接
void EventLoop::HandleNewConnection() {
    int connfd = acceptor_.Accept();
    if(connfd < 0) {
        perror("handleNewConnection accept");
        return;
    }

    AddEpollReadFd(connfd);

    //就表明三次握手已经建立成功了
    /* TcpConnection con(connfd); */
    TcpConnectionPtr con(new TcpConnection(connfd, this));

    //将三个回调函数注册给TcpConnection
    con->SetNewConnectionCallback(onNewConnectionCb_);//连接建立的注册
    con->SetMessageCallback(onMessageCb_);//消息到达的注册
    con->SetCloseCallback(onCloseCb_);//连接断开的注册

    //以键值对的形式存起来
    /* _conns.insert(std::make_pair(connfd, con)); */
    conns_[connfd] = con;

    con->HandleNewConnectionCallback(); 
}

//处理老的连接上的消息
void EventLoop::HandleMessage(int fd) {
    auto it = conns_.find(fd);
    if(it != conns_.end()) {
        //连接是存在的,可以进行数据的收发
        bool flag = it->second->IsClosed();//读的时候客户端是不是与服务器断开
        if(flag) {
            //连接已经断开
            it->second->HandleCloseCallback();//连接断开的事件的触发时机已经到达
            DelEpollReadFd(fd);//将文件描述符从红黑树上摘除掉
            conns_.erase(it);//同时将该链接从map中删除
        } else {
            it->second->HandleMessageCallback();//消息到达事件的触发时机已经到达
        }
    } else {
        //连接不存在,可以直接让程序退出来
        std::cout << "连接不存在" << std::endl;
        return;
    }
}

//epfd的创建
int EventLoop::CreateEpollFd() {
    int fd = ::epoll_create(100);
    if(fd < 0) {
        perror("epoll_create");
        return fd;
    }
    return fd;
}

//监听文件描述符
void EventLoop::AddEpollReadFd(int fd) {
    struct epoll_event evt;
    evt.events = EPOLLIN;
    evt.data.fd = fd;

    int ret = ::epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &evt);
    if(ret < 0) {
        perror("epoll_ctl add");
        return;
    }
}

//取消文件描述符的监听
void EventLoop::DelEpollReadFd(int fd) {
    struct epoll_event evt;
    evt.events = EPOLLIN;
    evt.data.fd = fd;

    int ret = ::epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, &evt);
    if(ret < 0) {
        perror("epoll_ctl add");
        return;
    }
}

void EventLoop::SetNewConnectionCallback(TcpConnectionCallback &&cb) {
    onNewConnectionCb_ = std::move(cb);
}

void EventLoop::SetMessageCallback(TcpConnectionCallback &&cb) {
    onMessageCb_ = std::move(cb);
}

void EventLoop::SetCloseCallback(TcpConnectionCallback &&cb) {
    onCloseCb_ = std::move(cb);
}

void EventLoop::HandleRead() {
    uint64_t two;
    ssize_t ret = read(evtfd_, &two, sizeof(uint64_t));
    if(ret != sizeof(uint64_t)) {
        perror("read");
        return;
    }
}

void EventLoop::DoPendingFunctors() {       //将处理好的数据发送回去,处理函数已经绑定了连接和数据,所以直接执行就好了
    std::vector<Functor> tmp;
    {   
        std::lock_guard<std::mutex> lc(std::mutex);
        tmp.swap(pendings_);
    }
    for (auto& cb : tmp) {
        cb();
    }
}

int EventLoop::CreateEventFd() {
    int fd = eventfd(10, 0);
    if(fd < 0) {
        perror("eventfd");
        return fd;
    }

    return fd;
}

void EventLoop::Wakeup() {
    uint64_t one = 1;
    ssize_t ret = write(evtfd_, &one, sizeof(uint64_t));
    if(ret != sizeof(uint64_t)) {
        perror("write");
        return;
    }
}

void EventLoop::RunInLoop(Functor&& cb) {
    {       
        std::lock_guard<std::mutex> lc(mutex_);
        pendings_.push_back(std::move(cb));             //交给主线程去执行回调函数,将处理好的数据发送回去
    }
    Wakeup();
} 

TcpConnection类

  • tcp连接类,负责连接相关的操作

  • 可以调用底层的sockio类进行数据的收发,同时main里面的回调函数最终是通过连接类来执行的。(只是执行的时机由EventLoop类控制,在EventLoop类中调用TcpConnection的方法执行回调函数)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#ifndef TCP_CONNECTION_H_
#define TCP_CONNECTION_H_

#include <memory>
#include <functional>
#include <string>

#include "socket.h"
#include "socket_io.h"
#include "inet_address.h"
#include "event_loop.h"

class TcpConnection;

using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
using TcpConnectionCallback = std::function<void(const TcpConnectionPtr &)>;

class TcpConnection : public std::enable_shared_from_this<TcpConnection> {
public:
    explicit TcpConnection(int fd, EventLoop* loop);
    ~TcpConnection();
    auto Send(const std::string &msg) -> void;
    auto Receive() -> std::string;

    auto IsClosed() const -> bool;

    //为了方便调试的函数
    auto ToString() -> std::string;

    auto SetNewConnectionCallback(const TcpConnectionCallback& cb) -> void;
    auto SetMessageCallback(const TcpConnectionCallback& cb) -> void;
    auto SetCloseCallback(const TcpConnectionCallback& cb) -> void;

    auto HandleNewConnectionCallback() -> void;
    auto HandleMessageCallback() -> void;
    auto HandleCloseCallback() -> void;

    auto SendInLoop(const std::string& msg) -> void;

private:
    //获取本端地址与对端地址
    auto GetLocalAddr() -> InetAddress;
    auto GetPeerAddr() -> InetAddress;

private:
    SocketIO sockIO_;
    EventLoop* loop_;

    //为了调试而加入的三个数据成员
    Socket sock_;
    InetAddress localAddr_;
    InetAddress peerAddr_;

    TcpConnectionCallback onNewConnectionCb_;//连接建立
    TcpConnectionCallback onMessageCb_;//消息到达
    TcpConnectionCallback onCloseCb_;//连接断开
};

#endif 
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#include <iostream>
#include <sstream>
#include <functional>

#include "tcp_connection.h"

TcpConnection::TcpConnection(int fd, EventLoop* loop) :
sockIO_(fd),
loop_(loop),
sock_(fd),
localAddr_(GetLocalAddr()),
peerAddr_(GetPeerAddr()) {}

TcpConnection::~TcpConnection() {}

void TcpConnection::Send(const std::string &msg) {
    sockIO_.Writen(msg.c_str(), msg.size());
}

std::string TcpConnection::Receive() {
    char buff[65535] = {0};
    sockIO_.ReadLine(buff, sizeof(buff));

    return std::string(buff);
}

//可以通过该函数判断读的数据是不是空的,也就是有没有断开
bool TcpConnection::IsClosed() const {
    char buf[10]= {0};
    int ret = ::recv(sock_.Fd(), buf, sizeof(buf), MSG_PEEK);
    return (0 == ret);
}

std::string TcpConnection::ToString() {
    std::ostringstream oss;
    oss << localAddr_.Ip() << ":"
        << localAddr_.Port() << "---->"
        << peerAddr_.Ip() << ":"
        << peerAddr_.Port();

    return oss.str();
}

void TcpConnection::SendInLoop(const std::string& msg) {
    auto f = std::bind(&TcpConnection::Send,this,msg);      //Send函数绑定好TcpConnection对象和处理好的msg
    if (loop_) {
        //发送操作交给eventloop,放入pendings数组中,并调用wakeup,让eventloop的evtfd_可读
        //这样事件循环就能检测到事件,将pendings里的操作发送回去
        loop_->RunInLoop(std::move(f));        
    }
}

//获取本端的网络地址信息
InetAddress TcpConnection::GetLocalAddr() {
    struct sockaddr_in addr;
    socklen_t len = sizeof(struct sockaddr );
    //获取本端地址的函数getsockname
    int ret = getsockname(sock_.Fd(), (struct sockaddr *)&addr, &len);
    if(-1 == ret) {
        perror("getsockname");
    }

    return InetAddress(addr);
}

//获取对端的网络地址信息
InetAddress TcpConnection::GetPeerAddr() {
    struct sockaddr_in addr;
    socklen_t len = sizeof(struct sockaddr );
    //获取对端地址的函数getpeername
    int ret = getpeername(sock_.Fd(), (struct sockaddr *)&addr, &len);
    if(-1 == ret) {
        perror("getpeername");
    }

    return InetAddress(addr);
}

void TcpConnection::SetNewConnectionCallback(const TcpConnectionCallback &cb) {
    onNewConnectionCb_ = cb;
}

void TcpConnection::SetMessageCallback(const TcpConnectionCallback &cb) {
    onMessageCb_ = cb;
}

void TcpConnection::SetCloseCallback(const TcpConnectionCallback &cb) {
    onCloseCb_ = cb;
}

//三个回调的执行
void TcpConnection::HandleNewConnectionCallback() {
    if(onNewConnectionCb_) {
        /* _onNewConnectionCb(shared_ptr<TcpConnection>(this)); */
        onNewConnectionCb_(shared_from_this());
    } else {
        std::cout << "_onNewConnectionCb == nullptr" << std::endl;
    }
}

void TcpConnection::HandleMessageCallback()
{
    if(onMessageCb_) {
        onMessageCb_(shared_from_this());
    } else {
        std::cout << "_onMessageCb == nullptr" << std::endl;
    }
}

void TcpConnection::HandleCloseCallback()
{
    if(onCloseCb_) {
        onCloseCb_(shared_from_this());
    } else {
        std::cout << "_onCloseCb == nullptr" << std::endl;
    }
}

TcpServer类(封装EventLoop和Acceptor类)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#ifndef TCP_SERVER_H_
#define TCP_SERVER_H_

#include <string>

#include "acceptor.h"
#include "event_loop.h"

class TcpServer{

    using Callback = std::function<void(const TcpConnectionPtr &)>;
public:
    TcpServer(const std::string& ip, unsigned short port);
    ~TcpServer() = default;

    auto Start() -> void;
    auto Stop() -> void;
    auto SetAllCallback(Callback&& cb1, Callback&& cb2, Callback&& cb3) -> void;

private:
    Acceptor acceptor_;
    EventLoop loop_;

};

#endif
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
#include "tcp_server.h"

TcpServer::TcpServer(const std::string& ip, unsigned short port) : 
acceptor_(ip,port), 
loop_(acceptor_) {}

void TcpServer::Start() {
    acceptor_.Ready();      //创建socket连接,等待连接
    loop_.Loop();           //开启事件循环
}

void TcpServer::Stop() {
    loop_.Unloop();
}

//设置处理事件回调函数
void TcpServer::SetAllCallback(Callback&& cb1, Callback&& cb2, Callback&& cb3) {
    loop_.SetNewConnectionCallback(std::move(cb1));
    loop_.SetMessageCallback(std::move(cb2));
    loop_.SetCloseCallback(std::move(cb3));
}

main函数文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#include <iostream>
#include <string>
#include <functional>

#include "acceptor.h"
#include "tcp_connection.h"
#include "event_loop.h"
#include "tcp_server.h"
#include "thread_pool.h"

class MyTask{
public:
    MyTask(const std::string& msg, TcpConnectionPtr con) : 
    msg_(msg),
    con_(con){
        std::cout << "construct " << msg << std::endl;
        std::cout << "construct2 " << msg_ << std::endl;
    }

    void process(){         //线程池中线程执行的业务处理逻辑
        std::string msg2 = "";
        std::cout << "before process: " << msg_ << std::endl;
        for (auto c : msg_) {
            if (c >= 'a' && c <= 'z') {
                msg2 += (c - 32);
            } else {
                msg2 += c;
            }
        }
        msg2 += '\n';
        std::cout << "after process: " << msg2 << std::endl;

        //处理完成后,发送交给主线程去处理
        //先将信息msg2交给TcpConnection,将msg2和connnction的send函数进行绑定,形成一个具有发送msg2能力的可调用对象void()
        //再将这个可调用对象交给EventLoop,放入到pendings中(所有线程处理完数据后,形成的可调用对象集合) 
        //这里还会去让EventLoop的向evtfd_写入数据,让evtfd_可读,这样下一个loop循环检测到就去执行集合里的所有可调用对象
        //即在主函数中将数据发送出去了
        con_->SendInLoop(msg2);       
    }

private:
    std::string msg_;
    TcpConnectionPtr con_;
};

//连接建立
void onNewConnection(const TcpConnectionPtr &con)
{
    std::cout << con->ToString() << " has connected!" << std::endl;
}

//文件描述符可读(消息到达)
void onMessage(const TcpConnectionPtr &con, ThreadPool& pool)
{
    std::string msg = con->Receive();
    std::cout << ">>recv client msg = " << msg << std::endl;

    //业务处理逻辑函数交给线程池取完成
    std::shared_ptr<MyTask> task = std::make_shared<MyTask>(msg,con);
    pool.AddTask(std::bind(&MyTask::process,task));
}

//连接断开
void onClose(const TcpConnectionPtr &con)
{
    std::cout << con->ToString() << " has closed!" << std::endl;
}
    
void test()
{
    TcpServer tcpServer("127.0.0.1", 8888);
    ThreadPool pool(4,10);
    pool.Start();
    tcpServer.SetAllCallback(onNewConnection,
        std::bind(&onMessage,std::placeholders::_1,std::ref(pool)),
        onClose);
    tcpServer.Start();
}

int main(int argc, char *argv[])
{
    test();
    return 0;
}

线程池相关类(之前博客有相关介绍)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#ifndef TASKQUEUE_H_
#define TASKQUEUE_H_

#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>

class TaskQueue{
    //表示任务的类型
    using ElemType = std::function<void()>;
 
public:
    explicit TaskQueue(int que_size);
    ~TaskQueue() = default;
    auto Push(ElemType&& ptask) -> void;
    auto Pop() -> ElemType;
    [[nodiscard]] auto IsFull() const -> bool;
    [[nodiscard]] auto IsEmpty() const -> bool;
    auto WakeUp() -> void;

private:
    size_t que_size_;
    std::queue<ElemType> que_;			//用于存放任务,任务应当是一个void()的可调用对象
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    bool flag_;  //为了唤醒所有的工作线程,可以让while退出
};

#endif
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include "task_queue.h"

TaskQueue::TaskQueue(const int que_size) : 
que_size_(que_size),
mutex_(),
not_full_(),
not_empty_(),
flag_(true)
{   
}

void TaskQueue::Push(ElemType&& ptask) {
    std::unique_lock lock(mutex_);
    while(IsFull()) {
        //等待not_full_来唤醒
        not_full_.wait(lock);
    }
    que_.push(std::move(ptask));
    not_empty_.notify_one();
}

TaskQueue::ElemType TaskQueue::Pop() {
    std::unique_lock lock(mutex_);
    while(IsEmpty() && flag_) {
        //等待not_empty_来唤醒
        not_empty_.wait(lock);
    }
    if (flag_) {
        ElemType task = que_.front();
        que_.pop();
        not_full_.notify_one();
        return task;
    } else {
        return nullptr;
    }
}

bool TaskQueue::IsFull() const {
    return que_.size() == que_size_;
}

bool TaskQueue::IsEmpty() const {
    return que_.empty();
}

void TaskQueue::WakeUp() {
    flag_ = false;
    not_empty_.notify_all();
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#ifndef THREAD_POOL_H_
#define THREAD_POOL_H_

#include <memory>
#include <vector>
#include <functional>
#include <thread>

#include "task_queue.h"

class ThreadPool {
    using Task = std::function<void()>;
    friend class WorkThread;
public:
    ThreadPool(size_t thread_num, size_t que_size);
    ~ThreadPool() = default;
    auto Start() -> void;
    auto Stop() -> void;
    auto AddTask(Task&& task) -> void;

private:
    auto GetTask() -> Task;
    auto DoTask() -> void;

private:
    size_t thread_num_;
    size_t que_size_; 
    std::vector<std::unique_ptr<std::thread>> threads_;
    TaskQueue task_que_;
    bool is_exit_;
};

#endif
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <iostream>
#include <functional>
#include <thread>
#include <chrono>

#include "thread_pool.h"

ThreadPool::ThreadPool(const size_t thread_num, const size_t que_size) :
thread_num_(thread_num),
que_size_(que_size),
task_que_(que_size_),
is_exit_(false) {
    threads_.reserve(thread_num_);
}

void ThreadPool::Start() {
    for (size_t i = 0; i < thread_num_; i++) {
        threads_.push_back(std::make_unique<std::thread>(std::bind(&ThreadPool::DoTask,this)));
    }
}

void ThreadPool::Stop() {
    //确保任务队列里的任务可以执行完
    while(!task_que_.IsEmpty()) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    std::this_thread::sleep_for(std::chrono::seconds(2));
    is_exit_ = true;
    task_que_.WakeUp();
    for (size_t i = 0; i < thread_num_; i++) {
        threads_[i]->join();
    }
}

void ThreadPool::AddTask(Task&& task) {
    if (task) {
        task_que_.Push(std::move(task));
    }
} 

ThreadPool::Task ThreadPool::GetTask() {
    return task_que_.Pop();
}

void ThreadPool::DoTask() {
    while(!is_exit_) {
        if (Task task = GetTask()) {
            task();
        }
    } 
}

使用教学

  • 主要代码写在MyTask类和三个回调函数中

  • 核心业务代码可以在MyTask中完成

  • 三个回调函数可以用来打印对应信息或进行其他处理