手撕环形队列系列三:多生产者并行写入
本文是手撕环形队列系列的第三篇,之前的文章链接如下:
《手撕环形队列》
《手撕环形队列系列二:无锁实现高并发》
之前系列文章介绍的环形队列,已经能够支持多生产者和多消费者并发操作,数据操作模式为:
生产者准备好数据,然后push()方法放入环形队列中;
消费者准备好接收的缓冲区,然后调用pop()方法将数据写入接收缓冲区中。
这种模式下,生产者准备数据过程中,需要分配内存。把数据写入环形队列后,这个内存就不需要了,得释放掉。这样,会导致生产者频繁进行内存分配和释放的动作。
因此,对性能要求更苛刻的系统,希望生产者能直接使用环形队列中的内存,直接写入,这样就不需要生产者再分配内存了。
这种思路下,环形队列和生产者之间的交互关系变为:
1) 生产者调用ring_queue.request_push()方法,获得一个可以写入的token;
2) 生产者通过token 获取内存地址,向这个地址进行数据写入, 把数据都写好;
3) 生产者调用 token.finish()方法, 通知环形队列已经完成写入。
多个生产者可以并行操作,用图示意如下:
上图中,环形队列中的元素,用三种不同颜色来区分:
可写入区域:可以写入的位置区域;
正在写入区域:已经分配出去,producer 正在进行写入的区域
写入完成区域:producer 写入完成,并且环形队列已经更新其状态的区域。
多个producer(p1, p2, p3) 并发在环形队列尾部写入,它们的速度不一定相同,因此并发写入区域会形成正在写入区域、写入完成区域互相交错的情况。
这种情况下,消费者读取时,把tail 作为上限,并且要判断区域是已写入完成状态,才可以读取进行消费。
支持多生产者并行提交的环形队列,C语言实现代码如下。
本文仅展示了多生产者并行写入,消费者依然是串行消费。(依照本文的思路,小伙伴们可以继续扩展,让多个消费者也能并行消费。)
// ring_queue.h
#ifndef RING_QUEUE_H
#define RING_QUEUE_H
typedef struct ring_queue_t {
char* pbuf;
char* pstate;
int item_size;
int capacity;
volatile int write_flag;
volatile int read_flag;
volatile int head;
volatile int tail;
volatile int same_cycle;
} ring_queue_t;
typedef struct ring_queue_token_t {
ring_queue_t* pqueue;
void* pitem;
int idx;
} ring_queue_token_t;
int ring_queue_init(ring_queue_t* pqueue, int item_size, int capacity);
void ring_queue_destroy(ring_queue_t* pqueue);
int ring_queue_request_push(ring_queue_t* pqueue, ring_queue_token_t* ptoken);
void* ring_queue_token_item(ring_queue_token_t* ptoken);
int ring_queue_token_finish(ring_queue_token_t* ptoken);
int ring_queue_pop(ring_queue_t* pqueue, void* pitem);
int ring_queue_is_empty(ring_queue_t* pqueue);
int ring_queue_is_full(ring_queue_t* pqueue);
#endif
// ring_queue.c
#include "ring_queue.h"
#include <stdlib.h>
#include <string.h>
#define CAS(ptr, old, new) __sync_bool_compare_and_swap(ptr, old, new)
enum ItemState {
ITEM_BLANK,
ITEM_WRITING,
ITEM_WRITED,
};
int ring_queue_init(ring_queue_t* pqueue, int item_size, int capacity) {
memset(pqueue, 0, sizeof(*pqueue));
pqueue->pbuf = (char*)malloc(item_size * capacity);
pqueue->pstate = (char*)malloc(capacity);
if (!pqueue->pbuf || !pqueue->pstate) {
return -1;
}
pqueue->item_size = item_size;
pqueue->capacity = capacity;
pqueue->same_cycle = 1;
memset(pqueue->pstate, ITEM_BLANK, capacity);
return 0;
}
void ring_queue_destroy(ring_queue_t* pqueue) {
free(pqueue->pbuf);
free(pqueue->pstate);
memset(pqueue, 0, sizeof(*pqueue));
}
int ring_queue_request_push(ring_queue_t* pqueue, ring_queue_token_t* ptoken) {
// try to lock write flag
while (1) {
if (ring_queue_is_full(pqueue)) {
return -1;
}
if (CAS(&pqueue->write_flag, 0, 1)) { // set write flag successfully
break;
}
}
// generate token
int idx = pqueue->tail;
pqueue->pstate[idx] = ITEM_WRITING;
ptoken->pqueue = pqueue;
ptoken->idx = idx;
ptoken->pitem = pqueue->pbuf + (idx * pqueue->item_size);
idx = (idx + 1) % pqueue->capacity;
if (0 == idx) { // a new cycle
pqueue->same_cycle = 0; // tail is not the same cycle with head
}
pqueue->tail = idx;
// unlock write flag
CAS(&pqueue->write_flag, 1, 0);
return 0;
}
void* ring_queue_token_item(ring_queue_token_t* ptoken) {
return ptoken->pitem;
}
int ring_queue_token_finish(ring_queue_token_t* ptoken) {
// set item commit state
ring_queue_t* pq = ptoken->pqueue;
pq->pstate[ptoken->idx] = ITEM_WRITED;
return 0;
}
int ring_queue_pop(ring_queue_t* pqueue, void* pitem) {
// try to set read flag
while (1) {
if (ring_queue_is_empty(pqueue)) {
return -1;
}
if (CAS(&pqueue->read_flag, 0, 1)) { // set read flag successfully
break;
}
}
int iret = -2;
if (ITEM_WRITED == pqueue->pstate[pqueue->head]) {
// read data
memcpy(pitem, pqueue->pbuf + pqueue->head * pqueue->item_size, pqueue->item_size);
pqueue->head = (pqueue->head + 1) % pqueue->capacity;
if (0 == pqueue->head) {
pqueue->same_cycle = 1; // head is now the same cycle with tail
}
iret = 0; // succ
}
// reset read flag
CAS(&pqueue->read_flag, 1, 0);
return iret;
}
int ring_queue_is_empty(ring_queue_t* pqueue) {
return (pqueue->head == pqueue->tail) && pqueue->same_cycle;
}
int ring_queue_is_full(ring_queue_t* pqueue) {
return (pqueue->head == pqueue->tail) && !pqueue->same_cycle;
}
写个简单的程序,测试一下(不是多线程的,仅为了基本功能演示用):
// test_ring_queue.c
#include "ring_queue.h"
#include <stdio.h>
static void test_push(ring_queue_t* pq, ring_queue_token_t* ptk, int val);
static void test_pop(ring_queue_t* pq);
int main() {
ring_queue_t queue, *pq = &queue;
int iret = ring_queue_init(pq, sizeof(int), 3);
iret = ring_queue_is_empty(pq);
printf("ring_queue is%s empty!\n", iret ? "" : " not");
ring_queue_token_t tk1, tk2, tk3, tk4;
iret = ring_queue_request_push(pq, &tk1);
iret = ring_queue_request_push(pq, &tk2);
iret = ring_queue_request_push(pq, &tk3);
iret = ring_queue_request_push(pq, &tk4);
int val = 1;
test_push(pq, &tk1, val++);
test_push(pq, &tk2, val++);
test_push(pq, &tk3, val++);
iret = ring_queue_is_full(pq);
printf("ring_queue is%s full!\n", iret ? "" : " not");
test_pop(pq);
iret = ring_queue_request_push(pq, &tk4);
test_push(pq, &tk4, val++);
test_pop(pq);
test_pop(pq);
test_pop(pq);
test_pop(pq);
ring_queue_destroy(pq);
return 0;
}
static void test_push(ring_queue_t* pq, ring_queue_token_t* ptk, int val) {
int* p = (int*)ring_queue_token_item(ptk);
*p = val;
int iret = ring_queue_token_finish(ptk);
if (0 == iret) {
printf("ring_queue_push succ, val = %d\n", val);
} else {
printf("ring_queue_push failed! iret = %d\n", iret);
}
}
static void test_pop(ring_queue_t* pq) {
int val = -1;
int iret = ring_queue_pop(pq, &val);
if (0 == iret) {
printf("ring_queue_pop succ, val = %d\n", val);
} else {
printf("ring_queue_pop failed! iret = %d\n", iret);
}
}
编译后,运行测试程序,输出如下:
$ ./test_ring_queue
ring_queue is empty!
ring_queue_push succ, val = 1
ring_queue_push succ, val = 2
ring_queue_push succ, val = 3
ring_queue is full!
ring_queue_pop succ, val = 1
ring_queue_push succ, val = 4
ring_queue_pop succ, val = 2
ring_queue_pop succ, val = 3
ring_queue_pop succ, val = 4
ring_queue_pop failed! iret = -1
我的微信号是 实力程序员,欢迎大家转发至朋友圈,分享给更多的朋友。
- 点赞
- 收藏
- 关注作者
评论(0)