libaio学习
libaio是linux版本的aio库,aio指的是异步io。这里记录下libaio的使用,主要过程为:libaio的初始化,io请求的下发和回收,libaio销毁。
libaio的接口
libaio的初始化
int io_setup(int maxevents, io_context_t *ctxp);
// maxevents表示能处理的最大事件数,同时初始化结构io_context_t,调用之前ctxp的内容必须初始化为0
提交IO读写请求
long io_submit (io_context_t ctx_id, long nr, struct iocb **iocbpp);
// 提交任务前,必须先填充iocb结构体。可以使用io_prep_pwrite和io_prep_pread接口生成iocb的结构体,这其中涉及到fd,需要先open对应的文件,并设置为O_DIRECT标志。另外,buff需要先对齐。
struct io_iocb_common {
PADDEDptr(void *buf, __pad1); // 缓存空间
PADDEDul(nbytes, __pad2); // 字节数
long long offset; // 偏移
long long __pad3;
unsigned flags;
unsigned resfd;
}; /* result code is the amount read or -'ve errno */
struct iocb {
PADDEDptr(void *data, __pad1); /* Return in the io completion event */ 用户回调函数,可以通过io_set_callback设置
PADDED(unsigned key, __pad2); /* For use in identifying io requests */
short aio_lio_opcode;
short aio_reqprio;
int aio_fildes;
union {
struct io_iocb_common c;
struct io_iocb_vector v;
struct io_iocb_poll poll;
struct io_iocb_sockaddr saddr;
} u;
};
void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_t count, long long offset)
{
memset(iocb, 0, sizeof(*iocb));
iocb->aio_fildes = fd;
iocb->aio_lio_opcode = IO_CMD_PREAD;
iocb->aio_reqprio = 0;
iocb->u.c.buf = buf;
iocb->u.c.nbytes = count;
iocb->u.c.offset = offset;
}
void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, size_t count, long long offset)
{
memset(iocb, 0, sizeof(*iocb));
iocb->aio_fildes = fd;
iocb->aio_lio_opcode = IO_CMD_PWRITE;
iocb->aio_reqprio = 0;
iocb->u.c.buf = buf; // 缓存空间
iocb->u.c.nbytes = count; // 字节数
iocb->u.c.offset = offset; // 偏移
}
// 这里需要注意:buf必须是按照扇区对齐的(512的倍数),可以使用posix_memalign进行对齐。
(1)为什么是O_DIRECT标志
O_DIRECT标志指采用DIRECT I/O,相对于此,有个Buffered I/O。后者需要将用户空间的数据拷贝到内核缓存区,然后再拷贝到磁盘,这操作造成了CPU资源和内存资源的浪费。而DIRECT I/O,不需要通过内核缓存数据,直接将用户空间数据拷贝到磁盘。所以选择O_DIRECT标志,利于减少内存拷贝和CPU利用率。但是这样做的话,需要用户保证内存对齐,否则数据是无法正常写入的,可以使用posix_memalign进行内存对齐。读写缓冲区的地址、内容的大小、文件偏移必须是扇区的倍数(通常是512字节)。
(2)posix_memalign的使用
#include <stdlib.h>
int posix_memalign (void **memptr,
size_t alignment,
size_t size);
如果调用posix_memalign()成功时,会返回size字节的动态内存,并且这块内存的地址是alignment的倍数。参数alignment必须是2的幂,还是void指针的大小的倍数。返回的内存块的地址放在了memptr里面,函数返回值是0。
如果调用失败时,没有内存会被分配,memptr的值没有被定义,返回如下错误码之一:
EINVAL:参数不是2的幂,或者不是void指针的倍数。
ENOMEM:没有足够的内存去满足函数的请求。
要注意的是,对于这个函数,errno不会被设置,只能通过返回值得到。
由posix_memalign()获得的内存通过free( )释放。
IO请求返回
读写io请求下发后,使用io_getevents函数等待io结束信号。
int io_getevents(io_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);
// 返回events数组,events为数组首地址;
// nr为数组长度,即最大返回的event数;
// min_nr为最少返回的event数;
// timieout表示超时时间,填写NULL表示无等待超时
// io_event的结构体内容:
struct io_event {undefined
PADDEDptr(void *data, __pad1);
PADDEDptr(struct iocb *obj, __pad2);
PADDEDul(res, __pad3);
PADDEDul(res2, __pad4);
};
// res为实际完成的字节数
// res2位读写成功的状态,0表示成功
// obj为io_submit中下发的内容
IO取消或者销毁
int io_destroy(io_context_t ctx); // 跟io_setup对应
int io_cancel(io_context_t ctx, struct iocb *iocb, struct io_event *evt); // 取消之前通过io_submit传入的iocb
实例
单独使用aio
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<libaio.h>
#include<fcntl.h>
#include<errno.h>
int aio_write(void *buff, struct iocb* p)
{
strcpy(buff, "hello world");
int fd = open("./aiotest.txt", O_WRONLY | O_CREAT | O_APPEND | O_DIRECT, 0644);
io_prep_pwrite(p, fd, buff, 1024, 0);
return fd;
}
int aio_read(void *buff, struct iocb* p)
{
int fd = open("./aiotest.txt", O_RDONLY | O_CREAT | O_APPEND | O_DIRECT, 0644);
io_prep_pread(p, fd, buff, 1024, 0);
return fd;
}
int main()
{
io_context_t ctx;
memset(&ctx, 0, sizeof(ctx));
int ret = io_setup(10, &ctx);
if (ret != 0)
{
printf("io_setup fail: errno=%d, ret=%d\n", errno, ret);// 注意,errno并没有记录相关的错误
return -1;
}
struct iocb iocbpp;
struct iocb *p = &iocbpp;
void *buff = NULL;
posix_memalign(&buff, 512, 1024);
int fd = aio_read(buff, p); // aio_write(buff, p);
ret = io_submit(ctx, 1, &p);
if (ret != 1)
{
io_destroy(ctx);
free(buff);
printf("io_submit fail, errno=%d, ret=%d\n", errno, ret);
return -1;
}
struct io_event events[10];
if (io_getevents(ctx, 1, 10, events, NULL) != 1)
{
printf("io_getevents fail\n");
close(fd);
io_destroy(ctx);
free(buff);
return -1;
}
printf("read result: %s\n", events[0].obj->u.c.buf);
printf("read res: %s\n", buff);
close(fd);
io_destroy(ctx);
free(buff);
return 0;
}
aio和epoll结合
需要使用到io_set_eventfd,将evenfd写到iocb中,io_set_eventfd的定义如下:
void io_set_eventfd(struct iocb *iocb, int eventfd)
{
iocb->u.c.flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
iocb->u.c.resfd = eventfd;
}
具体步骤:
(1)创建eventfd
efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
(2)将eventfd设置到iocb中
io_set_eventfd(iocb, efd);
(3)提交aio请求
io_submit(ctx, NUM_EVENTS, iocb);
(4)创建epollfd,并将eventfd加到epoll中
epfd = epoll_create(1);
epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent);
epoll_wait(epfd, &epevent, 1, -1);
(5)当eventfd可读时,从eventfd读出完成IO请求的数量,并用io_getevents获取这些IO
read(efd, &finished_aio, sizeof(finished_aio);
r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);
具体实例:
// aio.cpp
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<libaio.h>
#include<fcntl.h>
#include<errno.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
int aio_write(void *buff, struct iocb* p)
{
strcpy((char*)buff, "hello world");
int fd = open("./aiotest.txt", O_WRONLY | O_CREAT | O_APPEND | O_DIRECT, 0644);
io_prep_pwrite(p, fd, buff, 1024, 0);
return fd;
}
int aio_read(void *buff, struct iocb* p)
{
int fd = open("./aiotest.txt", O_RDONLY | O_CREAT | O_APPEND | O_DIRECT, 0644);
io_prep_pread(p, fd, buff, 1024, 0);
return fd;
}
int main()
{
io_context_t ctx;
memset(&ctx, 0, sizeof(ctx));
int ret = io_setup(10, &ctx);
if (ret != 0)
{
printf("io_setup fail: errno=%d, ret=%d\n", errno, ret);
return -1;
}
struct iocb iocbpp;
struct iocb *p = &iocbpp;
void *buff = NULL;
posix_memalign(&buff, 512, 1024);
int fd = aio_read(buff, p); // aio_write(buff, p);
int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
io_set_eventfd(p, efd);
ret = io_submit(ctx, 1, &p);
if (ret != 1)
{
io_destroy(ctx);
free(buff);
printf("io_submit fail, errno=%d, ret=%d\n", errno, ret);
return -1;
}
int epfd = epoll_create(1);
struct epoll_event event;
event.events = EPOLLET | EPOLLIN;
event.data.fd = efd;
epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &event);
uint64_t data;
while(true)
{
ret = epoll_wait(epfd, &event, 1, -1);
if (ret > 0)
{
ret = read(efd, &data, sizeof(data)); // 这里会阻塞
if (ret < 0)
{
printf("read fail\n");
}
printf("success read from efd, read %d bytes, data is %d\n", ret, data);
struct io_event events[10];
if (io_getevents(ctx, 1, 10, events, NULL) != 1)
{
printf("io_getevents fail\n");
close(fd);
io_destroy(ctx);
free(buff);
return -1;
}
printf("read result: %s\n", events[0].obj->u.c.buf);
}
else if (ret == 0)
{
printf("epoll wait timeout\n");
break;
}
else
{
printf("epoll wait error\n");
break;
}
}
close(fd);
io_destroy(ctx);
free(buff);
return 0;
}
参考:
https://blog.csdn.net/enlaihe/article/details/112320114?spm=1001.2014.3001.5506
https://blog.csdn.net/cjsycyl/article/details/9332241 (epoll和aio结合)
https://zhuanlan.zhihu.com/p/145394613?from_voters_page=true (讲得挺清楚)
- 点赞
- 收藏
- 关注作者
评论(0)