ClickHouse源码分析:Buffer的原理
【摘要】 这里以WriteBufferFromPocoSocket为例,记录在ClickHouse中Buffer类的原理。此源码来源于v22.3.2-lts版本。
这里以WriteBufferFromPocoSocket为例,记录在ClickHouse中Buffer类的原理。此源码来源于v22.3.2-lts版本。
一、Buffer类的继承关系
二、基本实现
BufferBase中的实现
class BufferBase
{
public:
/** Cursor in the buffer. The position of write or read. */
using Position = char *;
/** A reference to the range of memory. */
struct Buffer
{
Buffer(Position begin_pos_, Position end_pos_) : begin_pos(begin_pos_), end_pos(end_pos_) {}
inline Position begin() const { return begin_pos; }
inline Position end() const { return end_pos; }
inline size_t size() const { return size_t(end_pos - begin_pos); }
inline void resize(size_t size) { end_pos = begin_pos + size; }
inline bool empty() const { return size() == 0; }
inline void swap(Buffer & other)
{
std::swap(begin_pos, other.begin_pos);
std::swap(end_pos, other.end_pos);
}
private:
Position begin_pos;
Position end_pos; /// 1 byte after the end of the buffer
};
/** The constructor takes a range of memory to use for the buffer.
* offset - the starting point of the cursor. ReadBuffer must set it to the end of the range, and WriteBuffer - to the beginning.
*/
BufferBase(Position ptr, size_t size, size_t offset)
: pos(ptr + offset), working_buffer(ptr, ptr + size), internal_buffer(ptr, ptr + size) {}
void set(Position ptr, size_t size, size_t offset)
{
internal_buffer = Buffer(ptr, ptr + size);
working_buffer = Buffer(ptr, ptr + size);
pos = ptr + offset;
}
/// get buffer
inline Buffer & internalBuffer() { return internal_buffer; }
/// get the part of the buffer from which you can read / write data
inline Buffer & buffer() { return working_buffer; }
/// get (for reading and modifying) the position in the buffer
inline Position & position() { return pos; }
/// offset in bytes of the cursor from the beginning of the buffer
inline size_t offset() const { return size_t(pos - working_buffer.begin()); }
/// How many bytes are available for read/write
inline size_t available() const { return size_t(working_buffer.end() - pos); }
inline void swap(BufferBase & other)
{
internal_buffer.swap(other.internal_buffer);
working_buffer.swap(other.working_buffer);
std::swap(pos, other.pos);
}
/** How many bytes have been read/written, counting those that are still in the buffer. */
size_t count() const { return bytes + offset(); }
/** Check that there is more bytes in buffer after cursor. */
bool ALWAYS_INLINE hasPendingData() const { return available() > 0; }
bool isPadded() const { return padded; }
protected:
void resetWorkingBuffer()
{
/// Move position to the end of buffer to trigger call of 'next' on next reading.
/// Discard all data in current working buffer to prevent wrong assumptions on content
/// of buffer, e.g. for optimizations of seeks in seekable buffers.
working_buffer.resize(0);
pos = working_buffer.end();
}
/// Read/write position.
Position pos;
/** How many bytes have been read/written, not counting those that are now in the buffer.
* (counting those that were already used and "removed" from the buffer)
*/
size_t bytes = 0;
/** A piece of memory that you can use.
* For example, if internal_buffer is 1MB, and from a file for reading it was loaded into the buffer
* only 10 bytes, then working_buffer will be 10 bytes in size
* (working_buffer.end() will point to the position immediately after the 10 bytes that can be read).
*/
Buffer working_buffer;
/// A reference to a piece of memory for the buffer.
Buffer internal_buffer;
/// Indicator of 15 bytes pad_right
bool padded{false};
};
这个类中主要方法和变量的关系可以从下图中看出:
set函数:用于设置buffer的位置和大小
begin_pos:表示buffer的起始位置;
end_pos:表示buffer的结束位置;
pos:表示当前已经写(或读)的位置;
WriteBuffer的基本实现
class WriteBuffer : public BufferBase
{
public:
inline void next()
{
if (!offset())
return;
bytes += offset();
try
{
nextImpl();
}
catch (...)
{
/** If the nextImpl() call was unsuccessful, move the cursor to the beginning,
* so that later (for example, when the stack was expanded) there was no second attempt to write data.
*/
pos = working_buffer.begin();
throw;
}
pos = working_buffer.begin();
}
inline void nextIfAtEnd()
{
if (!hasPendingData()) // buffer写满了,则next。pos指向buffer开头
next();
}
void write(const char * from, size_t n)
{
if (finalized)
throw Exception{ErrorCodes::LOGICAL_ERROR, "Cannot write to finalized buffer"};
size_t bytes_copied = 0;
/// Produces endless loop
assert(!working_buffer.empty());
while (bytes_copied < n)
{
nextIfAtEnd();
size_t bytes_to_copy = std::min(static_cast<size_t>(working_buffer.end() - pos), n - bytes_copied);
memcpy(pos, from + bytes_copied, bytes_to_copy);
pos += bytes_to_copy;
bytes_copied += bytes_to_copy;
}
}
inline void write(char x)
{
if (finalized)
throw Exception{ErrorCodes::LOGICAL_ERROR, "Cannot write to finalized buffer"};
nextIfAtEnd();
*pos = x;
++pos;
}
private:
virtual void nextImpl() { throw Exception("Cannot write after end of buffer.", ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER); }
};
next():
更新已写的字节数(bytes),对buffer中已经使用的内存进行处理(nextImpl()进行实际的处理,比如WriteBufferFromPocoSocket,则是将数据发送出去),将pos重新指向buffer的头部,下次写的数据会将之前的数据覆盖
nextIfAtEnd():
判断buffer是否已经写满,如果写满了,则通过nextImpl处理buffer中已写的数据
write(const char * from, size_t n):
将数据从from拷贝到buffer,并更新pos
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)