IO缓冲区设计 -- from muduo
@[toc]
Buffer 设计要点
1、对外表现为一块连续的内存。
2、长度可自行增长,以适应不同大小的消息。
3、内部以vector<char>
保存数据,而不是string。
在网络编程中,如何设计并使用缓冲区?一方面我们希望减少系统调用,一次读的数据越多越好;另一方面我们又希望减少内存占用,大多数时候大块缓冲区的利用率很低。muduo利用read + 栈上空间解决了这个问题。
在栈上准备一个 65536 字节的 extrabuf ,然后利用 read 来读取数据,iovec 有两块,第一块指向 muduo Buffer 中的 Writable 字节,另一块指向栈上的 extrabuf。这样如果读入的数据不多,那么全部都读到 Buffer 里面;如果长度过长,则会读到栈上的extrabuf,然后再 append 到 Buffer 中。
放码过来
讲真的,这个模块我垂涎已久了。
muduo里面其他模块,要么耦合的有点大,要么就是小的可怜,意义不大。像这种以我的能力能单独拿出来的而且设计的还不错的模块不多。
代码我基本都精简过了,放心,不影响阅读,甚至还能拿去溜一圈。
1、缓冲区分段介绍
/// A buffer class modeled after org.jboss.netty.buffer.ChannelBuffer
///
/// @code
/// +-------------------+------------------+------------------+
/// | prependable bytes | readable bytes | writable bytes |
/// | | (CONTENT) | |
/// +-------------------+------------------+------------------+
/// | | | |
/// 0 <= readerIndex <= writerIndex <= size
1、readerIndex:可读地址开始,由构造函数传参确定。
2、writerIndex:可写地址开始,由构造函数传参确定。不出意外和上面那个使用同一个参数。
3、size:总长度。
4、可读空间长度 = writerIndex - readerIndex
5、可写空间长度 = size - writerIndex
6、prependable bytes:==预留空间==
自从上次见识了 event[i].data.ptr 预留空间的妙用之后,现在我对预留空间总是有一种特殊的情感在里面。
其他问题后面会说。
2、缓冲区构造
class Buffer : public muduo::copyable
{
private:
std::vector<char> buffer_;
size_t readerIndex_;
size_t writerIndex_;
public:
static const size_t kCheapPrepend = 8; //预留八个字节
static const size_t kInitialSize = 1024; //初始化长度1k
explicit Buffer(size_t initialSize = kInitialSize)
: buffer_(kCheapPrepend + initialSize),
readerIndex_(kCheapPrepend),
writerIndex_(kCheapPrepend)
{
}
//获取可读长度
size_t readableBytes() const
{ return writerIndex_ - readerIndex_; }
//获取可读长度
size_t writableBytes() const
{ return buffer_.size() - writerIndex_; }
//获取预留 + 闲置缓冲区长度(为什么会有闲置呢?自己想想)
size_t prependableBytes() const
{ return readerIndex_; }
//获取写指针
const char* peek() const
{ return begin() + readerIndex_; }
3、线程安全问题
微观来看,Buffer 不是线程安全的,因为 vector 不是线程安全的。
宏观来看,Buffer 是线程安全的。一个 inputbuffer 只会在一条线程中被使用;outputbuffer 也是一样的道理,如果是本线程使用send,那就在本线程使用send,如果不是本线程使用send,那就移交到本线程去 send,反正本线程的 buffer 就是只能在本线程使用,别的线程想都不要想。
那,遇到跨线程函数使用了 buffer 呢?那就拷贝给它、
void swap(Buffer& rhs)
{
buffer_.swap(rhs.buffer_);
std::swap(readerIndex_, rhs.readerIndex_);
std::swap(writerIndex_, rhs.writerIndex_);
}
内存整理
前面放了个小问题去思考,为什么会有闲置呢?因为频繁读写就会造成闲置啊。
我写500字节进去,读了400字节走,是吧,自己画个图。
下面是读取之后的复位操作:
string retrieveAllAsString()
{
return retrieveAsString(readableBytes());
}
string retrieveAsString(size_t len)
{
//这里有个断言,这个 len 是不会比可读缓冲区长度要长的
string result(peek(), len);
retrieve(len);
return result;
}
void retrieve(size_t len)
{
//所以我就不知道这里来个分支干嘛?进得去 else 吗
if (len < readableBytes())
{
readerIndex_ += len;
}
else
{
retrieveAll();
}
}
void retrieveAll()
{
readerIndex_ = kCheapPrepend;
writerIndex_ = kCheapPrepend;
}
void retrieveUntil(const char* end)
{
retrieve(end - peek());
}
append
前边不是讲到了 append 嘛。
顺带放一些短代码进来吧,也不知道该放哪里去了。话说这些接口不用暴露出来吧,这么多方法我也记不住啊,弄个门面嘛。
void append(const StringPiece& str)
{
append(str.data(), str.size());
}
void append(const char* /*restrict*/ data, size_t len)
{
ensureWritableBytes(len);
std::copy(data, data+len, beginWrite());
hasWritten(len);
}
void ensureWritableBytes(size_t len)
{
if (writableBytes() < len)
{
makeSpace(len); //这个莫急
}
}
char* beginWrite()
{ return begin() + writerIndex_; }
const char* beginWrite() const
{ return begin() + writerIndex_; }
void hasWritten(size_t len)
{
assert(len <= writableBytes());
writerIndex_ += len;
}
void unwrite(size_t len)
{
assert(len <= readableBytes());
writerIndex_ -= len;
}
private:
char* begin()
{ return &*buffer_.begin(); }
const char* begin() const
{ return &*buffer_.begin(); }
长度不够用了怎么办?
为什么要 append?
因为缓冲区不够长了,先写到栈空间去,然后刷写回来。
在刷写回来之前,是不是得把缓冲区扩容一下嘛。
扩容分两种:
1、先自身挤一挤,比方说把闲置缓冲区回收回来,把可读内容往前挪挪。
2、挤一挤还不够用,那就在后面扩容吧。
void makeSpace(size_t len) //len:重要存储数据的总长度
{
//如果可写缓冲区的长度加闲置缓冲区的长度不够用
if (writableBytes() + prependableBytes() < len + kCheapPrepend)
{
buffer_.resize(writerIndex_+len);
}
else
{
//那就往前挪挪
size_t readable = readableBytes();
std::copy(begin()+readerIndex_,
begin()+writerIndex_,
begin()+kCheapPrepend);
readerIndex_ = kCheapPrepend;
writerIndex_ = readerIndex_ + readable;
}
}
};
栈上空间的使用
I/O vector,与readv和wirtev操作相关的结构体。
#include <sys/uio.h>
/* Structure for scatter/gather I/O. */
struct iovec{
void *iov_base; /* Pointer to data. */
size_t iov_len; /* Length of data. */
};
成员iov_base指向一个缓冲区,这个缓冲区是存放readv所接收的数据或是writev将要发送的数据。
成员iov_len确定了接收的最大长度以及实际写入的长度。
const size_t Buffer::kCheapPrepend;
const size_t Buffer::kInitialSize;
ssize_t Buffer::readFd(int fd, int* savedErrno)
{
// saved an ioctl()/FIONREAD call to tell how much to read
char extrabuf[65536];
struct iovec vec[2];
const size_t writable = writableBytes();
vec[0].iov_base = begin()+writerIndex_;
vec[0].iov_len = writable;
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;
// when there is enough space in this buffer, don't read into extrabuf.
// when extrabuf is used, we read 128k-1 bytes at most.
const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
const ssize_t n = sockets::readv(fd, vec, iovcnt);
if (n < 0)
{
*savedErrno = errno;
}
else if (implicit_cast<size_t>(n) <= writable)
{
writerIndex_ += n;
}
else
{
writerIndex_ = buffer_.size();
append(extrabuf, n - writable);
}
return n;
}
- 点赞
- 收藏
- 关注作者
评论(0)