从实例看muduo网络库各模块交互过程
@[toc]
muduo网络库的核心代码模块
1、channel
2、Poller 和它的子类 EpollPoller
3、EventLoop
4、Thread、EventLoopThread、EventLoopThreadPool
5、Sock、Acceptor
6、Buffer
7、TcpServer、TCPConnection
至于其他还有Logger模块,就不是重点了吧。
各模块功能解释
经过我三天的研究,以及之前的源码铺垫,整理出来了第一个版本,当然后面会持续更新,预计更新到国庆节回来,那个版本应该是能看了。
Channel
根据收到的事件,调用相应的回调。
==一个channel绑定一个fd==
生命周期:新连接产生 -> 该连接断开。
由poller管理,从属于loop,可配置epoll监听事件。
Poller
muduo中多路事件分发器的核心模块,包含了一个 channel 数组,同时也是一个抽象基类(我只继承了epoll模块),
可以说:One loop per poller.
EpollPoller
实现了:
epoll_create:构造函数
epoll_ctl:一堆的 enable 函数
epoll_wait:poll方法。
通过epoll_wait,将有事件的channel通过参数传递给EventLoop。
此处参数:==events[i].data.ptr==。(经验呐!!!我觉得有这么一点,这篇就亮了!!!还不止呢。)
void EpollPoller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const {
//for(Channel* channel:activeChannels){
//这样剧让不行了!!!
//eventloop 即将拿到它的poller返回的所有发生事件列表
for (int i = 0; i < numEvents; ++i) {
Channel* channel = static_cast<Channel*>(events_[i].data.ptr); //666666666,这一行代码,N年的功力,你接得住吗?
channel->set_revents(events_[i].events);
activeChannels->push_back(channel);
}
}
void EpollPoller::update(int operation, Channel* channel) {
epoll_event event;
int fd = channel->fd();
memset(&event, 0, sizeof(event));
event.events = channel->events();
event.data.ptr = channel; //上面那行,配合上这行看
event.data.fd = fd;
if (::epoll_ctl(epollfd_, operation, fd, &event) < 0) {
if (operation == EPOLL_CTL_DEL) {
LOG_ERROR("epoll_ctl del error:%d\n", errno);
}
else {
LOG_FATAL("epoll_ctl add/mod error:%d\n", errno);
}
}
}
EventLoop
事件循环,One loop per thread,per poller,many channels,per wakeupchannel.
这个 wakeupchannel 是干嘛的呢?专门用于监听唤醒 eventfd 相应的 loop,这个事件通知机制没有见过吧,反正我是第一次见,基于文件描述符的,据说比 condition 要高档一些,condition都显得有点老了,这个比较年轻。
//通过轮询的方式唤醒channel
int createEventfd() {
//创建一个能被用户应用程序用于时间等待唤醒机制的eventfd对象
//eventfd 单纯的使用文件描述符实现的线程间的通知机制,可以很好的融入select、poll、epoll的I/O复用机制中
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0) {
LOG_ERROR("Failed in eventfd%d\n", errno);
}
return evtfd;
}
每一个 EventLoop,都配备有一个wakeupfd,有一个wakeup channel 专门负责处理 wakeupfd 事件。
处理办法:随便读个数据,唤醒本 loop 起来干活了。(每一个 EventLoop 都监听了 wakeupchannel 的 EPOLL_IN 事件)
loop() 开始运行后,
1、通过poll函数进行epoll_wait,获取activeChannel。
2、唤醒相应channel。
3、执行doPendingFunction 方法(这里默认有子loop。)
这里又是个巧夺天工的设计:
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for (const Functor& functor : functors)
{
functor();
}
callingPendingFunctors_ = false;
}
看这一行:functors.swap(pendingFunctors_);
这里为什么要把 pendingFunctors_ 置换出来?这个置换有意思吗?那可太有意思了。
如果不置换,直接拿着 pendingFunctors_ 去执行,这个资源是不是要被锁住?那接下来有新事件过来要放哪里?再开个pendingFunctors_ 2号吗?
这样一置换,相当于这些事件可以并发执行了。
有意思吧。
再看这个queueInLoop 和runInLoop:
void EventLoop::runInLoop(Functor cb) {
if (isInLoopThread()) {
cb();
}
else {
queueInLoop(std::move(cb));
}
}
void EventLoop::queueInLoop(Functor cb) {
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.push_back(std::move(cb));
}
if (!isInLoopThread() || callingPendingFunctors_) {
//callingPendingFunctors_:我是在本线程,而且我还在执行回调,有啥事情赶紧的拿过来,不然一会儿loop转一圈过去又阻塞了
wakeup();
}
}
如果有子loop,主loop只处理 Acceptor(runInLoop),剩下的全都在queueInLoop -> wakeup 子Loop->handleRead(或者事件已就绪)->唤醒 channel。
EventLoopThread
功能:用于绑定一个 loop 和一个thread。
类成员:一个 EventLoop 指针,一个thread 对象,锁、条件变量、回调等。
one loop per thread 在此处体现:
startloop:启动底层新线程,执行回调,配置 loop 并返回,创建一个独立的 loop,并开启事件循环。
EventLoopThreadPool
事件循环线程池。
包含一个baseloop的指针,第一个EventLoopThread的vector,以及一个EventLoop的vector。
start:创建一定数量的事件循环线程,添加到 std::vector<std::unique_ptr<EventLoopThread>>
。并启动这些线程,添加到std::vector<EventLoop*>
中。
GetNextLoop:如果工作在多线程中,baseloop 会默认以轮询的方式分配channel给subloop。
TcpServer
负责处理新连接。
Acceptor、EventLoopThreadPool、TCPConnection。
TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const std::string& nameArg,
Option option = kNoReusePort);
//设置底层subloop个数
void setThreadNum(int numThreads);
//开启服务器监听
void start();
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const std::string& nameArg,
Option option = kNoReusePort)
:loop_(CheckLoopNotNull(loop)),
ipport_(listenAddr.toIpPort()),
name_(nameArg),
acceptor_(new Accept(loop, listenAddr, option == kReusePort)),
//在这里对sock进行了初始化,不过还没有监听,更没有accept,关于accept在后续章节再提,快了
//只有在 server start 之后才会listen,listen到才会去accept
threadpool_(new EventLoopThreadPool(loop, name_)),
//构建一个 EventLoopThreadPool (可以视为mainreactor)对象,
//不过也就是构建一下,不干啥,关于EventLoopThreadPool的章节后面会提
//start之后会创建制定数量的线程,并绑定新的loop,返回地址。
connectionCallback_(),
messageCallback_(),
nextConnId_(1)
{
//当有新用户连接时,会执行NewConnectionCallback
acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2));
//在Acceptor 的handleread方法了被调用
}
//开启服务器监听
void TcpServer::start() {
if (started_ == 0) { //防止被多次start
threadpool_->start(threadInitCallback_); //EventLoopThreadPool的start
loop_->runInLoop(std::bind(&Accept::listen, acceptor_.get())); //这里拿来run的loop就是mainloop
++started_;
}
}
//当有新链接来的时候,acceptor会调用这个回调
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {
//根据轮询算法,选择一个subloop,唤醒subloop
EventLoop* ioloop = threadpool_->GetNextLoop();
char buf[64] = { 0 };
snprintf(buf, sizeof buf, "-%s#%d", ipport_.c_str(), nextConnId_);
++nextConnId_;
std::string connName = name_ + buf;
LOG_INFO("TcpConnnection::newConnection [%s] -new connection [%s] from %s \n",
name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());
//通过sockfd获取其本机IP
sockaddr_in local;
::bzero(&local, sizeof local);
socklen_t addrlen = sizeof local;
if (::getSockname(sockfd, (sockaddr*)&local, &addrlen) < 0) {
//pass,日志打印,写漏了
}
InetAddress localAddr(::getLocalAddr(sockfd));
//根据连接成功的fd,创建TCPConnection连接对象
//一个连接对应一个 TCPConnectionptr 管理
//关于TCPConnection的事情也是接下来展开
TcpConnectionptr conn(new TcpConnection(ioloop, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn;
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
ioloop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
//把当前connfd封装成channel分发给subloop
}
//设置底层subloop个数
void TcpServer::setThreadNum(int numThreads) {
threadpool_->setThreadNum(numThreads);
}
TcpConnection
一个conn对应一个fd。
创建channel、绑定读、写、关闭、错误回调。
从实际应用出发
直接看这段代码:
int main(int argc, char **argv){
EventLoop loop;
InetAddress addr(ip, port);
ChatServer server(&loop,addr,"ChatServer");
server.start();
loop.loop();
return 0;
}
这里面先初始化了一个 loop 的对象,这是一个baseloop,设置wakeup回调类型,以及时间发生后回调操作,监听wakeup channel 的EPOLLIN事件。
然后是 ChatServer server(&loop,addr,“ChatServer”);
1、构建Acceptor,执行到bind之后,为acceptchannel设置 ReadCallback 回调,绑定了监听套接字,在回调函数中有 accept 和 NewConnectionCallBack 回调。
2、构建EventLoopThreadPool,啥也不干。
3、设置 newConnectionCallBack。
4、server.start(); 开启服务监听,将Acceptor::listen 函数绑定在 loop上,开启listen,配置channel enableReading。
EventLoopThreadPool.start 创建事件循环线程,并运行起来。
将线程和事件循环绑定起来。
5、loop.loop() 启动主loop。
再看这段代码:
void ChatServer::onMessage(const TcpConnectionPtr &conn, Buffer *buff, Timestamp time){
string buf = buff->retrieveAllAsString();
json js = json::parse(buf);
//通过msgid获取业务回调,进行网络模块和任务模块之间的解耦合
auto msgHandler = ChatService::instance()->getHandle(js["msgid"].get<int>());
//回调消息绑定好的事件处理器,执行相应的业务处理
msgHandler(conn,js,time);
//成功解耦
}
1、调用Buffer模块转码数据(这个模块的设计也很nice,可惜我还没把握住)。
2、调用JSON解析数据。
3、conn->send
在当前线程:sendInLoop;
不在当前线程:channel-> runInLoop->queueInLoop,(为啥不直接调用呢。。)
且先总结到此处,夜以深了,我该去想我该想的人了,哎、
- 点赞
- 收藏
- 关注作者
评论(0)