手撕环形队列系列三:多生产者并行写入

举报
实力程序员 发表于 2021/08/04 09:21:13 2021/08/04
【摘要】 本文是手撕环形队列系列的第三篇,之前的文章链接如下:《手撕环形队列》《手撕环形队列系列二:无锁实现高并发》之前系列文章介绍的环形队列,已经能够支持多生产者和多消费者并发操作,数据操作模式为:生产者准备好数据,然后push()方法放入环形队列中;消费者准备好接收的缓冲区,然后调用pop()方法将数据写入接收缓冲区中。这种模式下,生产者准备数据过程中,需要分配内存。把数据写入环形队列后,这个内存...

本文是手撕环形队列系列的第三篇,之前的文章链接如下:
《手撕环形队列》
《手撕环形队列系列二:无锁实现高并发》


之前系列文章介绍的环形队列,已经能够支持多生产者和多消费者并发操作,数据操作模式为:
生产者准备好数据,然后push()方法放入环形队列中;
消费者准备好接收的缓冲区,然后调用pop()方法将数据写入接收缓冲区中。

这种模式下,生产者准备数据过程中,需要分配内存。把数据写入环形队列后,这个内存就不需要了,得释放掉。这样,会导致生产者频繁进行内存分配和释放的动作。

因此,对性能要求更苛刻的系统,希望生产者能直接使用环形队列中的内存,直接写入,这样就不需要生产者再分配内存了。

这种思路下,环形队列和生产者之间的交互关系变为:
1) 生产者调用ring_queue.request_push()方法,获得一个可以写入的token;
2) 生产者通过token 获取内存地址,向这个地址进行数据写入, 把数据都写好;
3) 生产者调用 token.finish()方法, 通知环形队列已经完成写入。

多个生产者可以并行操作,用图示意如下:

2021-08-04_01.jpg

上图中,环形队列中的元素,用三种不同颜色来区分:
可写入区域:可以写入的位置区域;
正在写入区域:已经分配出去,producer 正在进行写入的区域
写入完成区域:producer 写入完成,并且环形队列已经更新其状态的区域。

多个producer(p1, p2, p3) 并发在环形队列尾部写入,它们的速度不一定相同,因此并发写入区域会形成正在写入区域、写入完成区域互相交错的情况
这种情况下,消费者读取时,把tail 作为上限,并且要判断区域是已写入完成状态,才可以读取进行消费。


支持多生产者并行提交的环形队列,C语言实现代码如下。
本文仅展示了多生产者并行写入,消费者依然是串行消费。(依照本文的思路,小伙伴们可以继续扩展,让多个消费者也能并行消费。)

// ring_queue.h
#ifndef RING_QUEUE_H
#define RING_QUEUE_H

typedef struct ring_queue_t {
    char* pbuf;
    char* pstate;
    int item_size;
    int capacity;

    volatile int write_flag;
    volatile int read_flag;

    volatile int head;
    volatile int tail;
    volatile int same_cycle;
} ring_queue_t;

typedef struct ring_queue_token_t {
    ring_queue_t* pqueue;
    void* pitem;
    int idx;
} ring_queue_token_t;

int ring_queue_init(ring_queue_t* pqueue, int item_size, int capacity);
void ring_queue_destroy(ring_queue_t* pqueue);

int ring_queue_request_push(ring_queue_t* pqueue, ring_queue_token_t* ptoken);
void* ring_queue_token_item(ring_queue_token_t* ptoken);
int ring_queue_token_finish(ring_queue_token_t* ptoken);

int ring_queue_pop(ring_queue_t* pqueue, void* pitem);

int ring_queue_is_empty(ring_queue_t* pqueue);
int ring_queue_is_full(ring_queue_t* pqueue);

#endif


// ring_queue.c
#include "ring_queue.h"

#include <stdlib.h>
#include <string.h>

#define CAS(ptr, old, new) __sync_bool_compare_and_swap(ptr, old, new)

enum ItemState {
    ITEM_BLANK,
    ITEM_WRITING,
    ITEM_WRITED,
};

int ring_queue_init(ring_queue_t* pqueue, int item_size, int capacity) {
    memset(pqueue, 0, sizeof(*pqueue));
    pqueue->pbuf = (char*)malloc(item_size * capacity);
    pqueue->pstate = (char*)malloc(capacity);
    if (!pqueue->pbuf || !pqueue->pstate) {
        return -1;
    }

    pqueue->item_size = item_size;
    pqueue->capacity = capacity;
    pqueue->same_cycle = 1;
    memset(pqueue->pstate, ITEM_BLANK, capacity);
    return 0;
}

void ring_queue_destroy(ring_queue_t* pqueue) {
    free(pqueue->pbuf);
    free(pqueue->pstate);
    memset(pqueue, 0, sizeof(*pqueue));
}

int ring_queue_request_push(ring_queue_t* pqueue, ring_queue_token_t* ptoken) {
    // try to lock write flag
    while (1) {
        if (ring_queue_is_full(pqueue)) {
            return -1;
        }

        if (CAS(&pqueue->write_flag, 0, 1)) {   // set write flag successfully
            break;
        }
    }

    // generate token
    int idx = pqueue->tail;
    pqueue->pstate[idx] = ITEM_WRITING;
    ptoken->pqueue = pqueue;
    ptoken->idx = idx;
    ptoken->pitem = pqueue->pbuf + (idx * pqueue->item_size);
    idx = (idx + 1) % pqueue->capacity;
    if (0 == idx) { // a new cycle
        pqueue->same_cycle = 0;     // tail is not the same cycle with head
    }
    pqueue->tail = idx;

    // unlock write flag
    CAS(&pqueue->write_flag, 1, 0);

    return 0;
}

void* ring_queue_token_item(ring_queue_token_t* ptoken) {
    return ptoken->pitem;
}

int ring_queue_token_finish(ring_queue_token_t* ptoken) {
    // set item commit state
    ring_queue_t* pq = ptoken->pqueue;
    pq->pstate[ptoken->idx] = ITEM_WRITED;

    return 0;
}

int ring_queue_pop(ring_queue_t* pqueue, void* pitem) {
    // try to set read flag
    while (1) {
        if (ring_queue_is_empty(pqueue)) {
            return -1;
        }

        if (CAS(&pqueue->read_flag, 0, 1)) {    // set read flag successfully
            break;
        }
    }

    int iret = -2;
    if (ITEM_WRITED == pqueue->pstate[pqueue->head]) {
        // read data
        memcpy(pitem, pqueue->pbuf + pqueue->head * pqueue->item_size, pqueue->item_size);
        pqueue->head = (pqueue->head + 1) % pqueue->capacity;
        if (0 == pqueue->head) {
            pqueue->same_cycle = 1;     // head is now the same cycle with tail
        }
        iret = 0;   // succ
    }

    // reset read flag
    CAS(&pqueue->read_flag, 1, 0);

    return iret;
}


int ring_queue_is_empty(ring_queue_t* pqueue) {
    return (pqueue->head == pqueue->tail) && pqueue->same_cycle;
}

int ring_queue_is_full(ring_queue_t* pqueue) {
    return (pqueue->head == pqueue->tail) && !pqueue->same_cycle;
}


写个简单的程序,测试一下(不是多线程的,仅为了基本功能演示用):

// test_ring_queue.c
#include "ring_queue.h"
#include <stdio.h>

static void test_push(ring_queue_t* pq, ring_queue_token_t* ptk, int val);
static void test_pop(ring_queue_t* pq);

int main() {
    ring_queue_t queue, *pq = &queue;
    int iret = ring_queue_init(pq, sizeof(int), 3);
    iret = ring_queue_is_empty(pq);
    printf("ring_queue is%s empty!\n", iret ? "" : " not");

    ring_queue_token_t tk1, tk2, tk3, tk4;
    iret = ring_queue_request_push(pq, &tk1);
    iret = ring_queue_request_push(pq, &tk2);
    iret = ring_queue_request_push(pq, &tk3);
    iret = ring_queue_request_push(pq, &tk4);

    int val = 1;
    test_push(pq, &tk1, val++);
    test_push(pq, &tk2, val++);
    test_push(pq, &tk3, val++);

    iret = ring_queue_is_full(pq);
    printf("ring_queue is%s full!\n", iret ? "" : " not");

    test_pop(pq);

    iret = ring_queue_request_push(pq, &tk4);
    test_push(pq, &tk4, val++);

    test_pop(pq);
    test_pop(pq);
    test_pop(pq);
    test_pop(pq);

    ring_queue_destroy(pq);
    return 0;
}

static void test_push(ring_queue_t* pq, ring_queue_token_t* ptk, int val) {
    int* p = (int*)ring_queue_token_item(ptk);
    *p = val;
    int iret = ring_queue_token_finish(ptk);
    if (0 == iret) {
        printf("ring_queue_push succ, val = %d\n", val);
    } else {
        printf("ring_queue_push failed! iret = %d\n", iret);
    }
}

static void test_pop(ring_queue_t* pq) {
    int val = -1;
    int iret = ring_queue_pop(pq, &val);
    if (0 == iret) {
        printf("ring_queue_pop succ, val = %d\n", val);
    } else {
        printf("ring_queue_pop failed! iret = %d\n", iret);
    }
}

编译后,运行测试程序,输出如下:

$ ./test_ring_queue
ring_queue is empty!
ring_queue_push succ, val = 1
ring_queue_push succ, val = 2
ring_queue_push succ, val = 3
ring_queue is full!
ring_queue_pop succ, val = 1
ring_queue_push succ, val = 4
ring_queue_pop succ, val = 2
ring_queue_pop succ, val = 3
ring_queue_pop succ, val = 4
ring_queue_pop failed! iret = -1


我的微信号是 实力程序员,欢迎大家转发至朋友圈,分享给更多的朋友。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

举报
请填写举报理由
0/200