从实例看muduo网络库各模块交互过程

举报
看,未来 发表于 2021/10/08 09:55:15 2021/10/08
【摘要】 @[toc] muduo网络库的核心代码模块1、channel2、Poller 和它的子类 EpollPoller3、EventLoop4、Thread、EventLoopThread、EventLoopThreadPool5、Sock、Acceptor6、Buffer7、TcpServer、TCPConnection至于其他还有Logger模块,就不是重点了吧。 各模块功能解释经过我三天的...

请添加图片描述

@[toc]

muduo网络库的核心代码模块

1、channel
2、Poller 和它的子类 EpollPoller
3、EventLoop
4、Thread、EventLoopThread、EventLoopThreadPool
5、Sock、Acceptor
6、Buffer
7、TcpServer、TCPConnection

至于其他还有Logger模块,就不是重点了吧。


各模块功能解释

经过我三天的研究,以及之前的源码铺垫,整理出来了第一个版本,当然后面会持续更新,预计更新到国庆节回来,那个版本应该是能看了。


Channel

根据收到的事件,调用相应的回调。

==一个channel绑定一个fd==
生命周期:新连接产生 -> 该连接断开。

由poller管理,从属于loop,可配置epoll监听事件。


Poller

muduo中多路事件分发器的核心模块,包含了一个 channel 数组,同时也是一个抽象基类(我只继承了epoll模块),
可以说:One loop per poller.

EpollPoller

实现了:
epoll_create:构造函数
epoll_ctl:一堆的 enable 函数
epoll_wait:poll方法。
通过epoll_wait,将有事件的channel通过参数传递给EventLoop。

此处参数:==events[i].data.ptr==。(经验呐!!!我觉得有这么一点,这篇就亮了!!!还不止呢。)

void EpollPoller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const {

    //for(Channel* channel:activeChannels){
        //这样剧让不行了!!!

    //eventloop 即将拿到它的poller返回的所有发生事件列表
    for (int i = 0; i < numEvents; ++i) {
        Channel* channel = static_cast<Channel*>(events_[i].data.ptr);  //666666666,这一行代码,N年的功力,你接得住吗?
        channel->set_revents(events_[i].events);
        activeChannels->push_back(channel);
    }
}

void EpollPoller::update(int operation, Channel* channel) {
    epoll_event event;
    int fd = channel->fd();

    memset(&event, 0, sizeof(event));
    event.events = channel->events();
    event.data.ptr = channel;           //上面那行,配合上这行看
    event.data.fd = fd;

    if (::epoll_ctl(epollfd_, operation, fd, &event) < 0) {
        if (operation == EPOLL_CTL_DEL) {
            LOG_ERROR("epoll_ctl del error:%d\n", errno);
        }
        else {
            LOG_FATAL("epoll_ctl add/mod error:%d\n", errno);
        }
    }
}

EventLoop

事件循环,One loop per thread,per poller,many channels,per wakeupchannel.

这个 wakeupchannel 是干嘛的呢?专门用于监听唤醒 eventfd 相应的 loop,这个事件通知机制没有见过吧,反正我是第一次见,基于文件描述符的,据说比 condition 要高档一些,condition都显得有点老了,这个比较年轻。

//通过轮询的方式唤醒channel
int createEventfd() {
    //创建一个能被用户应用程序用于时间等待唤醒机制的eventfd对象
    //eventfd 单纯的使用文件描述符实现的线程间的通知机制,可以很好的融入select、poll、epoll的I/O复用机制中
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (evtfd < 0) {
        LOG_ERROR("Failed in eventfd%d\n", errno);
    }
    return evtfd;
}

每一个 EventLoop,都配备有一个wakeupfd,有一个wakeup channel 专门负责处理 wakeupfd 事件。
处理办法:随便读个数据,唤醒本 loop 起来干活了。(每一个 EventLoop 都监听了 wakeupchannel 的 EPOLL_IN 事件)


loop() 开始运行后,
1、通过poll函数进行epoll_wait,获取activeChannel。
2、唤醒相应channel。
3、执行doPendingFunction 方法(这里默认有子loop。)
这里又是个巧夺天工的设计:

void EventLoop::doPendingFunctors()
{
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;

    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }

    for (const Functor& functor : functors)
    {
        functor();
    }
    callingPendingFunctors_ = false;
}

看这一行:functors.swap(pendingFunctors_);

这里为什么要把 pendingFunctors_ 置换出来?这个置换有意思吗?那可太有意思了。

如果不置换,直接拿着 pendingFunctors_ 去执行,这个资源是不是要被锁住?那接下来有新事件过来要放哪里?再开个pendingFunctors_ 2号吗?

这样一置换,相当于这些事件可以并发执行了。

有意思吧。


再看这个queueInLoop 和runInLoop:

void EventLoop::runInLoop(Functor cb) {
    if (isInLoopThread()) {
        cb();
    }
    else {
        queueInLoop(std::move(cb));
    }
}

void EventLoop::queueInLoop(Functor cb) {

    {
        std::unique_lock<std::mutex> lock(mutex_);
        pendingFunctors_.push_back(std::move(cb));
    }

    if (!isInLoopThread() || callingPendingFunctors_) {
        //callingPendingFunctors_:我是在本线程,而且我还在执行回调,有啥事情赶紧的拿过来,不然一会儿loop转一圈过去又阻塞了
        wakeup();
    }
}

如果有子loop,主loop只处理 Acceptor(runInLoop),剩下的全都在queueInLoop -> wakeup 子Loop->handleRead(或者事件已就绪)->唤醒 channel。


EventLoopThread

功能:用于绑定一个 loop 和一个thread。
类成员:一个 EventLoop 指针,一个thread 对象,锁、条件变量、回调等。

one loop per thread 在此处体现:

startloop:启动底层新线程,执行回调,配置 loop 并返回,创建一个独立的 loop,并开启事件循环。


EventLoopThreadPool

事件循环线程池。
包含一个baseloop的指针,第一个EventLoopThread的vector,以及一个EventLoop的vector。

start:创建一定数量的事件循环线程,添加到 std::vector<std::unique_ptr<EventLoopThread>>。并启动这些线程,添加到std::vector<EventLoop*>中。
GetNextLoop:如果工作在多线程中,baseloop 会默认以轮询的方式分配channel给subloop。


TcpServer

负责处理新连接。
Acceptor、EventLoopThreadPool、TCPConnection。

TcpServer(EventLoop* loop,
        const InetAddress& listenAddr,
        const std::string& nameArg,
        Option option = kNoReusePort);
        
//设置底层subloop个数
void setThreadNum(int numThreads);

//开启服务器监听
void start();
TcpServer::TcpServer(EventLoop* loop,
    const InetAddress& listenAddr,
    const std::string& nameArg,
    Option option = kNoReusePort)
    :loop_(CheckLoopNotNull(loop)),
    ipport_(listenAddr.toIpPort()),
    name_(nameArg),
    acceptor_(new Accept(loop, listenAddr, option == kReusePort)),	
    //在这里对sock进行了初始化,不过还没有监听,更没有accept,关于accept在后续章节再提,快了
    //只有在 server start 之后才会listen,listen到才会去accept
    
    threadpool_(new EventLoopThreadPool(loop, name_)),
		//构建一个 EventLoopThreadPool (可以视为mainreactor)对象,
		//不过也就是构建一下,不干啥,关于EventLoopThreadPool的章节后面会提
		//start之后会创建制定数量的线程,并绑定新的loop,返回地址。
	
    connectionCallback_(),
    messageCallback_(),
    nextConnId_(1)
{
    //当有新用户连接时,会执行NewConnectionCallback
    acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2));
    //在Acceptor 的handleread方法了被调用
}

//开启服务器监听
void TcpServer::start() {
    if (started_ == 0) {  //防止被多次start
        threadpool_->start(threadInitCallback_);	//EventLoopThreadPool的start
        loop_->runInLoop(std::bind(&Accept::listen, acceptor_.get()));	//这里拿来run的loop就是mainloop
        ++started_;
    }
}

//当有新链接来的时候,acceptor会调用这个回调
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {
    //根据轮询算法,选择一个subloop,唤醒subloop
    EventLoop* ioloop = threadpool_->GetNextLoop();
    char buf[64] = { 0 };
    snprintf(buf, sizeof buf, "-%s#%d", ipport_.c_str(), nextConnId_);
    ++nextConnId_;
    std::string connName = name_ + buf;

    LOG_INFO("TcpConnnection::newConnection [%s] -new connection [%s] from %s \n",
        name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());

    //通过sockfd获取其本机IP
    sockaddr_in local;
    ::bzero(&local, sizeof local);
    socklen_t addrlen = sizeof local;

    if (::getSockname(sockfd, (sockaddr*)&local, &addrlen) < 0) {
				//pass,日志打印,写漏了
    }
    InetAddress localAddr(::getLocalAddr(sockfd));

    //根据连接成功的fd,创建TCPConnection连接对象
		//一个连接对应一个 TCPConnectionptr 管理
		//关于TCPConnection的事情也是接下来展开
    TcpConnectionptr conn(new TcpConnection(ioloop, connName, sockfd, localAddr, peerAddr));
    connections_[connName] = conn;

    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);
    conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
    
    ioloop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
    //把当前connfd封装成channel分发给subloop

}

//设置底层subloop个数
void TcpServer::setThreadNum(int numThreads) {
    threadpool_->setThreadNum(numThreads);
}

TcpConnection

一个conn对应一个fd。

创建channel、绑定读、写、关闭、错误回调。


从实际应用出发

直接看这段代码:

int main(int argc, char **argv){

    EventLoop loop;
    InetAddress addr(ip, port);
    ChatServer server(&loop,addr,"ChatServer");

    server.start();
    loop.loop();
    return 0;
}

这里面先初始化了一个 loop 的对象,这是一个baseloop,设置wakeup回调类型,以及时间发生后回调操作,监听wakeup channel 的EPOLLIN事件。

然后是 ChatServer server(&loop,addr,“ChatServer”);
1、构建Acceptor,执行到bind之后,为acceptchannel设置 ReadCallback 回调,绑定了监听套接字,在回调函数中有 accept 和 NewConnectionCallBack 回调。

2、构建EventLoopThreadPool,啥也不干。

3、设置 newConnectionCallBack。

4、server.start(); 开启服务监听,将Acceptor::listen 函数绑定在 loop上,开启listen,配置channel enableReading。
EventLoopThreadPool.start 创建事件循环线程,并运行起来。
将线程和事件循环绑定起来。

5、loop.loop() 启动主loop。


再看这段代码:

void ChatServer::onMessage(const TcpConnectionPtr &conn, Buffer *buff, Timestamp time){
    string buf = buff->retrieveAllAsString();

    json js = json::parse(buf);

    //通过msgid获取业务回调,进行网络模块和任务模块之间的解耦合
    auto msgHandler = ChatService::instance()->getHandle(js["msgid"].get<int>());
    
    //回调消息绑定好的事件处理器,执行相应的业务处理
    msgHandler(conn,js,time);

    //成功解耦
}

1、调用Buffer模块转码数据(这个模块的设计也很nice,可惜我还没把握住)。
2、调用JSON解析数据。
3、conn->send

在当前线程:sendInLoop;
不在当前线程:channel-> runInLoop->queueInLoop,(为啥不直接调用呢。。)


且先总结到此处,夜以深了,我该去想我该想的人了,哎、

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。