ClickHouse Source Code Analysis: How Buffers Work

ZhjDayDayUp, posted on 2022/12/27 14:39:56
[Summary] Using WriteBufferFromPocoSocket as an example, this post records how the Buffer classes in ClickHouse work. The source code is taken from the v22.3.2-lts release.

1. Inheritance hierarchy of the Buffer classes

[Figure: inheritance diagram of the Buffer classes]

2. Basic implementation

Implementation in BufferBase

class BufferBase
{
public:
    /** Cursor in the buffer. The position of write or read. */
    using Position = char *;

    /** A reference to the range of memory. */
    struct Buffer
    {
        Buffer(Position begin_pos_, Position end_pos_) : begin_pos(begin_pos_), end_pos(end_pos_) {}

        inline Position begin() const { return begin_pos; }
        inline Position end() const { return end_pos; }
        inline size_t size() const { return size_t(end_pos - begin_pos); }
        inline void resize(size_t size) { end_pos = begin_pos + size; }
        inline bool empty() const { return size() == 0; }

        inline void swap(Buffer & other)
        {
            std::swap(begin_pos, other.begin_pos);
            std::swap(end_pos, other.end_pos);
        }

    private:
        Position begin_pos;
        Position end_pos;        /// 1 byte after the end of the buffer
    };

    /** The constructor takes a range of memory to use for the buffer.
      * offset - the starting point of the cursor. ReadBuffer must set it to the end of the range, and WriteBuffer - to the beginning.
      */
    BufferBase(Position ptr, size_t size, size_t offset)
        : pos(ptr + offset), working_buffer(ptr, ptr + size), internal_buffer(ptr, ptr + size) {}

    void set(Position ptr, size_t size, size_t offset)
    {
        internal_buffer = Buffer(ptr, ptr + size);
        working_buffer = Buffer(ptr, ptr + size);
        pos = ptr + offset;
    }

    /// get buffer
    inline Buffer & internalBuffer() { return internal_buffer; }

    /// get the part of the buffer from which you can read / write data
    inline Buffer & buffer() { return working_buffer; }

    /// get (for reading and modifying) the position in the buffer
    inline Position & position() { return pos; }

    /// offset in bytes of the cursor from the beginning of the buffer
    inline size_t offset() const { return size_t(pos - working_buffer.begin()); }

    /// How many bytes are available for read/write
    inline size_t available() const { return size_t(working_buffer.end() - pos); }

    inline void swap(BufferBase & other)
    {
        internal_buffer.swap(other.internal_buffer);
        working_buffer.swap(other.working_buffer);
        std::swap(pos, other.pos);
    }

    /** How many bytes have been read/written, counting those that are still in the buffer. */
    size_t count() const { return bytes + offset(); }

    /** Check that there is more bytes in buffer after cursor. */
    bool ALWAYS_INLINE hasPendingData() const { return available() > 0; }

    bool isPadded() const { return padded; }

protected:
    void resetWorkingBuffer()
    {
        /// Move position to the end of buffer to trigger call of 'next' on next reading.
        /// Discard all data in current working buffer to prevent wrong assumptions on content
        /// of buffer, e.g. for optimizations of seeks in seekable buffers.
        working_buffer.resize(0);
        pos = working_buffer.end();
    }

    /// Read/write position.
    Position pos;

    /** How many bytes have been read/written, not counting those that are now in the buffer.
      * (counting those that were already used and "removed" from the buffer)
      */
    size_t bytes = 0;

    /** A piece of memory that you can use.
      * For example, if internal_buffer is 1MB, and from a file for reading it was loaded into the buffer
      *  only 10 bytes, then working_buffer will be 10 bytes in size
      *  (working_buffer.end() will point to the position immediately after the 10 bytes that can be read).
      */
    Buffer working_buffer;

    /// A reference to a piece of memory for the buffer.
    Buffer internal_buffer;

    /// Indicator of 15 bytes pad_right
    bool padded{false};
};

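The relationship between the main methods and member variables of this class is shown in the figure below:

[Figure: layout of a buffer, showing begin_pos, end_pos and pos]
set(): sets the memory location and size of the buffer, and places the cursor at the given offset;
begin_pos: the start of the buffer (a member of the nested Buffer struct);
end_pos: the end of the buffer, pointing one byte past its last usable byte;
pos: the current write (or read) position, so offset() is pos - begin_pos and available() is end_pos - pos;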
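
The cursor arithmetic described above can be sketched in isolation. This is a minimal illustration, not ClickHouse code: MiniBuffer and availableAfterWriting are hypothetical names introduced here, and the working_buffer / internal_buffer distinction is dropped so that only begin_pos, end_pos and pos remain.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for BufferBase: only the cursor arithmetic is kept.
struct MiniBuffer
{
    char * begin_pos;   // start of the usable memory range
    char * end_pos;     // one byte past the end of the range
    char * pos;         // current read/write cursor

    MiniBuffer(char * ptr, std::size_t size, std::size_t offset)
        : begin_pos(ptr), end_pos(ptr + size), pos(ptr + offset) {}

    // Bytes between the start of the range and the cursor.
    std::size_t offset() const { return static_cast<std::size_t>(pos - begin_pos); }

    // Bytes between the cursor and the end of the range.
    std::size_t available() const { return static_cast<std::size_t>(end_pos - pos); }
};

// Hypothetical helper: advances the cursor by n bytes, as a write would,
// and reports how much room is left in a buffer of the given capacity.
std::size_t availableAfterWriting(std::size_t capacity, std::size_t n)
{
    static char memory[64];
    assert(capacity <= sizeof(memory) && n <= capacity);

    MiniBuffer buf(memory, capacity, /*offset=*/0);  // a write buffer starts at the beginning
    buf.pos += n;                                     // pretend n bytes were written

    // The cursor always splits the range into a used part and a free part.
    assert(buf.offset() + buf.available() == capacity);
    return buf.available();
}
```

With a 16-byte range, moving the cursor 5 bytes forward leaves 11 bytes available. Constructing the same struct with offset equal to size, as the constructor comment prescribes for ReadBuffer, gives available() == 0, which is what forces a refill on the first read.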

Basic implementation of WriteBuffer

class WriteBuffer : public BufferBase
{
public:
    inline void next()
    {
        if (!offset())
            return;
        bytes += offset();

        try
        {
            nextImpl();
        }
        catch (...)
        {
            /** If the nextImpl() call was unsuccessful, move the cursor to the beginning,
              * so that later (for example, when the stack was expanded) there was no second attempt to write data.
              */
            pos = working_buffer.begin();
            throw;
        }

        pos = working_buffer.begin();
    }

    inline void nextIfAtEnd()
    {
        if (!hasPendingData()) // the buffer is full: call next(), which resets pos to the start of the buffer
            next();
    }


    void write(const char * from, size_t n)
    {
        if (finalized)
            throw Exception{ErrorCodes::LOGICAL_ERROR, "Cannot write to finalized buffer"};

        size_t bytes_copied = 0;

        /// Produces endless loop
        assert(!working_buffer.empty());

        while (bytes_copied < n)
        {
            nextIfAtEnd();
            size_t bytes_to_copy = std::min(static_cast<size_t>(working_buffer.end() - pos), n - bytes_copied);
            memcpy(pos, from + bytes_copied, bytes_to_copy);
            pos += bytes_to_copy;
            bytes_copied += bytes_to_copy;
        }
    }

    inline void write(char x)
    {
        if (finalized)
            throw Exception{ErrorCodes::LOGICAL_ERROR, "Cannot write to finalized buffer"};
        nextIfAtEnd();
        *pos = x;
        ++pos;
    }
private:
    virtual void nextImpl() { throw Exception("Cannot write after end of buffer.", ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER); }
};

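next()
Returns immediately if nothing has been written. Otherwise it adds offset() to the count of bytes written (bytes), hands the used part of the buffer to nextImpl() for the actual processing (WriteBufferFromPocoSocket, for example, sends the data out over the socket), and then resets pos to the start of the buffer, so the next writes overwrite the previous data. pos is reset even when nextImpl() throws, so the same data is not written a second time.
nextIfAtEnd()
Checks whether the buffer is full (no bytes left after pos); if it is, calls next(), which processes the written data through nextImpl().
write(const char * from, size_t n)
Copies n bytes from from into the buffer and advances pos. The copy proceeds in chunks: whenever the buffer fills up, nextIfAtEnd() flushes it and copying continues from the start of the buffer.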
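
To make the flush-and-rewind cycle concrete, here is a simplified sketch under stated assumptions. MiniWriteBuffer and StringSinkBuffer are hypothetical classes written for this illustration; they follow the shape of WriteBuffer::next() and write() shown above but leave out the finalized check and the exception handling, and the sink is a std::string rather than a socket.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical simplified write buffer; not ClickHouse code.
class MiniWriteBuffer
{
public:
    explicit MiniWriteBuffer(std::size_t capacity)
        : memory(capacity), pos(memory.data()) { assert(capacity > 0); }

    virtual ~MiniWriteBuffer() = default;

    // Flush the used part of the buffer and rewind the cursor.
    void next()
    {
        if (offset() == 0)
            return;                  // nothing written since the last flush
        bytes += offset();
        nextImpl();                  // the subclass decides where the data goes
        pos = memory.data();         // later writes overwrite the flushed data
    }

    // Copy n bytes in chunks, flushing whenever the buffer is full.
    void write(const char * from, std::size_t n)
    {
        std::size_t copied = 0;
        while (copied < n)
        {
            if (available() == 0)    // same role as nextIfAtEnd()
                next();
            std::size_t chunk = std::min(available(), n - copied);
            std::memcpy(pos, from + copied, chunk);
            pos += chunk;
            copied += chunk;
        }
    }

    std::size_t offset() const { return static_cast<std::size_t>(pos - memory.data()); }
    std::size_t available() const { return memory.size() - offset(); }
    std::size_t count() const { return bytes + offset(); }

protected:
    virtual void nextImpl() = 0;

    std::vector<char> memory;        // owned storage, declared before pos
    char * pos;                      // write cursor into memory
    std::size_t bytes = 0;           // bytes already flushed
};

// Hypothetical sink: appends each flushed chunk to a string
// where a socket-backed buffer would send it over the network.
class StringSinkBuffer : public MiniWriteBuffer
{
public:
    using MiniWriteBuffer::MiniWriteBuffer;

    std::string sink;
    int flushes = 0;

private:
    void nextImpl() override
    {
        sink.append(memory.data(), offset());
        ++flushes;
    }
};
```

Writing the 11 bytes of "hello world" into a StringSinkBuffer with a capacity of 4 triggers two flushes during write(): the sink holds "hello wo", the last 3 bytes are still in the buffer, and count() already reports 11. An explicit next() then pushes the remainder out, which is why callers of a buffered writer have to flush before relying on the data having reached its destination.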
