DAOS 源码解析之 daos_api
DAOS (Distributed Asynchronous Object Storage) 是一个开源的对象存储系统,专为大规模分布式非易失性内存设计,利用了 SCM 和 NVMe 等的下一代 NVM 技术。 DAOS 同时在硬件之上提供了键值存储接口,提供了诸如事务性非阻塞 I/O、具有自我修复的高级数据保护、端到端数据完整性、细粒度数据控制和弹性存储的高级数据保护,从而优化性能并降低成本。
本文以 Release 1.1.4 版本为标准,解析 include/daos_api.h
中的具体实现。include/daos_api.h
包含了 DAOS 系统主要的 API 接口。
daos_rank_list_parse
daos_rank_list_parse
函数从带有分隔符参数的字符串生成服务器标识符列表。
该函数是 daos_pool_connect
的辅助函数,主要用于生成其所需的服务器标识符列表。
参数:
str [in]
:用于生成服务器标识符列表的带有分隔符参数的字符串。sep [in]
:分隔符,例如在dmg
中使用:
作为分隔符。
返回值:
- 分配的标识符列表,用户需要负责调用
d_rank_list_free
释放列表。
d_rank_list_t *daos_rank_list_parse(const char *str, const char *sep)
{
d_rank_t * buf;
int cap = 8;
d_rank_list_t *ranks = NULL;
char * s, *s_saved;
char * p;
int n = 0;
// 分配空间给 buf
D_ALLOC_ARRAY(buf, cap);
if (buf == NULL)
// 给 buf 分配空间失败,不用释放 s_saved 和 buf,直接返回空指针
D_GOTO(out, ranks = NULL);
// 将 str 指向的字符串复制到字符串指针 s_saved 上去
// s_saved 没被初始化,在复制时会给这个指针分配空间
D_STRNDUP(s_saved, str, strlen(str));
s = s_saved;
if (s == NULL)
// 给 s_saved 分配空间失败,不用释放 buf,返回空指针
D_GOTO(out_buf, ranks = NULL);
// 用 seq 分割字符串
while ((s = strtok_r(s, sep, &p)) != NULL) {
// 当前缓冲区已满,扩展缓冲区至原来的两倍,并将数据复制过去
if (n == cap) {
d_rank_t *buf_new;
int cap_new;
cap_new = cap * 2;
D_ALLOC_ARRAY(buf_new, cap_new);
if (buf_new == NULL)
D_GOTO(out_s, ranks = NULL);
memcpy(buf_new, buf, sizeof(*buf_new) * n);
D_FREE(buf);
buf = buf_new;
cap = cap_new;
}
// 存储标识符
buf[n] = atoi(s);
n++;
s = NULL;
}
if (n > 0) {
// 为 ranks 分配空间,数组长度为 n
ranks = daos_rank_list_alloc(n);
if (ranks == NULL)
// 给 ranks 分配空间失败,释放 s_saved 和 buf,返回空指针
D_GOTO(out_s, ranks = NULL);
// 将缓冲区里的标识符复制到 ranks 中
memcpy(ranks->rl_ranks, buf, sizeof(*buf) * n);
}
out_s:
// 释放 s_saved 的空间
D_FREE(s_saved);
out_buf:
// 释放 buf 的空间
D_FREE(buf);
out:
return ranks;
}
结构体 d_rank_list_t
封装一个数组,用于存储服务器标识符:
typedef struct {
// 数组,用于存储服务器标识
d_rank_t *rl_ranks;
// 数组长度
uint32_t rl_nr;
} d_rank_list_t;
d_rank_t
表示服务器标识符:
typedef uint32_t d_rank_t;
daos_tx_open
daos_tx_open
函数在容器句柄上打开一个事务。该事务句柄可以用于容器中需要事务提交的 IOs。
参数:
coh [in]
:容器句柄。th [out]
:返回的事务句柄。flags [in]
:事务标志(例如DAOS_TF_RDONLY
)。ev [in]
:结束事件,该参数是可选的,可以为NULL
。当该参数为NULL
时,该函数在阻塞模式下运行。
返回值:
- 如果成功,返回 0。
- 如果失败,返回负数。
int
daos_tx_open(daos_handle_t coh, daos_handle_t *th, uint64_t flags,
daos_event_t *ev)
{
daos_tx_open_t *args;
tse_task_t * task;
int rc;
// 判断 *args 大小是否与 daos_tx_open_t 的预期大小相等
DAOS_API_ARG_ASSERT(*args, TX_OPEN);
// 创建新任务 dc_tx_open,并将其与输入事件 ev 关联
// 如果事件 ev 为 NULL,则将获取私有事件
rc = dc_task_create(dc_tx_open, NULL, ev, &task);
if (rc)
// dc_task_create 成功返回 0,失败返回负数
return rc;
// 从 task 中获取参数
args = dc_task_get_args(task);
args->coh = coh;
args->th = th;
args->flags = flags;
// 调度创建的任务 task
// 如果该任务的关联事件是私有事件,则此函数将等待任务完成
// 否则它将立即返回,并通过测试事件或在 EQ 上轮询找到其完成情况
//
// 第二个参数 instant 为 true,表示任务将立即执行
return dc_task_schedule(task, true);
}
结构体 daos_handle_t
表示各种 DAOS 组件(如容器、对象等)的通用句柄:
typedef struct {
uint64_t cookie;
} daos_handle_t;
daos_tx_open
函数的 flags
为下列两种类型:
enum {
// 只读事务
DAOS_TF_RDONLY = (1 << 0),
// 当存在与事务相关的修改时,不要复制调用方的数据缓冲区
// 在事务 daos_tx_commit 操作完成前,缓冲区必须保持不变
//
// 无论此标志如何,始终复制键的缓冲区
// 它们可以在相应的操作完成后释放或调整用途
DAOS_TF_ZERO_COPY = (1 << 1),
};
结构体 daos_event_t
表示事件或事件队列:
typedef struct daos_event {
int ev_error;
// 该结构体仅限内部使用,禁止更改
struct {
uint64_t space[19];
} ev_private;
// 仅限 debug 模式时使用
uint64_t ev_debug;
} daos_event_t;
结构体 daos_tx_open_t
表示事务打开参数:
typedef struct {
// 打开的容器句柄
daos_handle_t coh;
// 返回的打开的事务句柄
daos_handle_t *th;
// 事务标志
uint64_t flags;
} daos_tx_open_t;
结构体 tse_sched_t
用来跟踪调度程序下的所有任务:
typedef struct {
int ds_result;
// 与调度程序关联的用户数据(例如 completion cb)
void *ds_udata;
// daos 的内部调度计划
struct {
uint64_t ds_space[48];
} ds_private;
} tse_sched_t;
daos_tx_commit
daos_tx_commit
函数用于提交事务。
如果操作成功,则事务句柄不能再用于任何新的 IO 操作。
如果返回 -DER_TX_RESTART
,调用方需要使用相同的事务句柄调用 daos_tx_restart
函数重新启动该事务,执行此事务的调用方代码,然后再次调用 daos_tx_commit
。
参数:
th [in]
:要提交的事务句柄。ev [in]
:结束事件,该参数是可选的,可以为NULL
。当该参数为NULL
时,该函数在阻塞模式下运行。
返回值:
- 如果成功,返回 0。
- 如果失败,返回负数。可能的错误值为:
-DER_NO_HDL
:无效的事务句柄-DER_INVAL
:无效的参数-DER_TX_RESTART
:事物需要重启(例如,由于冲突)
int
daos_tx_commit(daos_handle_t th, daos_event_t *ev)
{
daos_tx_commit_t *args;
tse_task_t * task;
int rc;
// 判断 *args 大小是否与 daos_tx_commit_t 的预期大小相等
DAOS_API_ARG_ASSERT(*args, TX_COMMIT);
// 创建新任务 daos_tx_commit,并将其与输入事件 ev 关联
// 如果事件 ev 为 NULL,则将获取私有事件
rc = dc_task_create(dc_tx_commit, NULL, ev, &task);
if (rc)
// dc_task_create 成功返回 0,失败返回负数
return rc;
// 从 task 中获取参数
args = dc_task_get_args(task);
args->th = th;
args->flags = 0;
// 调度创建的任务 task
// 如果该任务的关联事件是私有事件,则此函数将等待任务完成
// 否则它将立即返回,并通过测试事件或在 EQ 上轮询找到其完成情况
//
// 第二个参数 instant 为 true,表示任务将立即执行
return dc_task_schedule(task, true);
}
结构体 daos_tx_commit_t
表示事务提交参数:
typedef struct {
// 打开的事务句柄
daos_handle_t th;
// 事务标志,控制提交行为,例如重试
uint32_t flags;
} daos_tx_commit_t;
daos_tx_open_snap
daos_tx_open_snap
函数从快照创建只读事务。
该函数不会创建快照,但只有一个读取事务会被 daos_cont_create_snap
函数创建的快照中读取出来。
如果用户传递的 epoch
代表的不是快照,或者快照已被删除,那么读取该事务可能会获得未定义的结果。
参数:
coh [in]
:容器句柄。epoch [in]
:要读取的快照的 epoch。th [out]
:返回的只读事务句柄。ev [in]
:结束事件,该参数是可选的,可以为NULL
。当该参数为NULL
时,该函数在阻塞模式下运行。
返回值:
- 如果成功,返回 0。
- 如果失败,返回负数。
int
daos_tx_open_snap(daos_handle_t coh, daos_epoch_t epoch, daos_handle_t *th,
daos_event_t *ev)
{
daos_tx_open_snap_t *args;
tse_task_t * task;
int rc;
// 判断 *args 大小是否与 daos_tx_open_snap_t 的预期大小相等
DAOS_API_ARG_ASSERT(*args, TX_OPEN_SNAP);
// 创建新任务 dc_tx_open_snap,并将其与输入事件 ev 关联
// 如果事件 ev 为 NULL,则将获取私有事件
rc = dc_task_create(dc_tx_open_snap, NULL, ev, &task);
if (rc)
// dc_task_create 成功返回 0,失败返回负数
return rc;
// 从 task 中获取参数
args = dc_task_get_args(task);
args->coh = coh;
args->epoch = epoch;
args->th = th;
// 调度创建的任务 task
// 如果该任务的关联事件是私有事件,则此函数将等待任务完成
// 否则它将立即返回,并通过测试事件或在 EQ 上轮询找到其完成情况
//
// 第二个参数 instant 为 true,表示任务将立即执行
return dc_task_schedule(task, true);
}
结构体 daos_tx_open_snap_t
表示事务快照打开参数:
typedef struct {
// 打开的容器句柄
daos_handle_t coh;
// 要从中读取事务的持久快照的 epoch
daos_epoch_t epoch;
// 返回的打开的事务句柄
daos_handle_t *th;
} daos_tx_open_snap_t;
daos_tx_abort
daos_tx_abort
函数会中止对事务的所有修改,事务句柄将不能用于任何新的 IO 操作。
参数:
th [in]
:要终止的事务句柄。ev [in]
:结束事件,该参数是可选的,可以为NULL
。当该参数为NULL
时,该函数在阻塞模式下运行。
返回值:
- 如果成功,返回 0。
- 如果失败,返回负数。
int
daos_tx_abort(daos_handle_t th, daos_event_t *ev)
{
daos_tx_abort_t *args;
tse_task_t * task;
int rc;
// 判断 *args 大小是否与 daos_tx_abort_t 的预期大小相等
DAOS_API_ARG_ASSERT(*args, TX_ABORT);
// 创建新任务 dc_tx_open_snap,并将其与输入事件 ev 关联
// 如果事件 ev 为 NULL,则将获取私有事件
rc = dc_task_create(dc_tx_abort, NULL, ev, &task);
if (rc)
// dc_task_create 成功返回 0,失败返回负数
return rc;
// 从 task 中获取参数
args = dc_task_get_args(task);
args->th = th;
// 调度创建的任务 task
// 如果该任务的关联事件是私有事件,则此函数将等待任务完成
// 否则它将立即返回,并通过测试事件或在 EQ 上轮询找到其完成情况
//
// 第二个参数 instant 为 true,表示任务将立即执行
return dc_task_schedule(task, true);
}
结构体 daos_tx_abort_t
表示事务中断参数:
typedef struct {
// 打开的事务句柄
daos_handle_t th;
} daos_tx_abort_t;
daos_tx_close
daos_tx_close
函数用于关闭事务句柄。
这是一个本地操作,不需要 RPC 参与。
参数:
th [in]
:要释放的事务句柄。ev [in]
:结束事件,该参数是可选的,可以为NULL
。当该参数为NULL
时,该函数在阻塞模式下运行。
返回值:
- 如果成功,返回 0。
- 如果失败,返回负数。
int
daos_tx_close(daos_handle_t th, daos_event_t *ev)
{
daos_tx_close_t *args;
tse_task_t * task;
int rc;
// 判断 *args 大小是否与 daos_tx_close_t 的预期大小相等
DAOS_API_ARG_ASSERT(*args, TX_CLOSE);
// 创建新任务 dc_tx_open_snap,并将其与输入事件 ev 关联
// 如果事件 ev 为 NULL,则将获取私有事件
rc = dc_task_create(dc_tx_close, NULL, ev, &task);
if (rc)
// dc_task_create 成功返回 0,失败返回负数
return rc;
// 从 task 中获取参数
args = dc_task_get_args(task);
args->th = th;
// 调度创建的任务 task
// 如果该任务的关联事件是私有事件,则此函数将等待任务完成
// 否则它将立即返回,并通过测试事件或在 EQ 上轮询找到其完成情况
//
// 第二个参数 instant 为 true,表示任务将立即执行
return dc_task_schedule(task, true);
}
结构体 daos_tx_close_t
表示事务关闭参数:
typedef struct {
// 打开的事务句柄
daos_handle_t th;
} daos_tx_close_t;
daos_tx_restart
daos_tx_restart
函数用于遇到 -DER_TX_RESTART
错误后重新启动事务,将删除通过事务句柄发出的所有 IOs 操作。
重新启动的事务能否观察到此事务最初打开后提交的任何冲突修改是未定义的。如果调用者重试事务有其它的目的,则应打开新的事务。
这是一个本地操作,不涉及RPC。
参数:
th [in]
:要重启的事务句柄。ev [in]
:结束事件,该参数是可选的,可以为NULL
。当该参数为NULL
时,该函数在阻塞模式下运行。
返回值:
- 如果成功,返回 0。
- 如果失败,返回负数。
int
daos_tx_restart(daos_handle_t th, daos_event_t *ev)
{
daos_tx_restart_t *args;
tse_task_t * task;
int rc;
// 判断 *args 大小是否与 daos_tx_restart_t 的预期大小相等
DAOS_API_ARG_ASSERT(*args, TX_RESTART);
// 创建新任务 dc_tx_open_snap,并将其与输入事件 ev 关联
// 如果事件 ev 为 NULL,则将获取私有事件
rc = dc_task_create(dc_tx_restart, NULL, ev, &task);
if (rc)
// dc_task_create 成功返回 0,失败返回负数
return rc;
// 从 task 中获取参数
args = dc_task_get_args(task);
args->th = th;
// 调度创建的任务 task
// 如果该任务的关联事件是私有事件,则此函数将等待任务完成
// 否则它将立即返回,并通过测试事件或在 EQ 上轮询找到其完成情况
//
// 第二个参数 instant 为 true,表示任务将立即执行
return dc_task_schedule(task, true);
}
结构体 daos_tx_close_t
表示事务关闭参数:
typedef struct {
// 打开的事务句柄
daos_handle_t th;
} daos_tx_close_t;
daos_tx_hdl2epoch
daos_tx_hdl2epoch
函数返回与事务句柄关联的 epoch。
一个 epoch 在事务开始时可能不可用,需要当事务成功提交后才可用。
此函数为当前系统的特定实现,它只能用于测试和调试目的。
参数:
th [in]
:事务句柄。epoch [out]
:需要返回的 epoch 值
返回值:
- 如果成功,返回 0。
- 如果失败,返回负数。
-DER_UNINIT
表示当前 epoch 不可用。
int
daos_tx_hdl2epoch(daos_handle_t th, daos_epoch_t *epoch)
{
return dc_tx_hdl2epoch(th, epoch);
}
daos_epoch_t
为 64 位无符号整数,表示 epoch:
typedef uint64_t daos_epoch_t;
daos_anchor_init
daos_anchor_init
函数用于初始化一个迭代器锚。
参数:
anchor [in]
:待初始化的迭代器锚。opts [in]
:初始化选项(保留)。
static inline int
daos_anchor_init(daos_anchor_t *anchor, unsigned int opts)
{
// 初始化
*anchor = DAOS_ANCHOR_INIT;
return 0;
}
DAOS_ANCHOR_INIT
是初始化宏函数,将 daos_anchor_t
的 da_type
置为 DAOS_ANCHOR_TYPE_ZERO
:
#define DAOS_ANCHOR_INIT ((daos_anchor_t){DAOS_ANCHOR_TYPE_ZERO})
结构体 daos_anchor_t
用于迭代过程中的锚定:
#define DAOS_ANCHOR_BUF_MAX 104
typedef struct {
// daos_anchor_type_t 类型
uint16_t da_type;
uint16_t da_shard;
// daos_anchor_flags 枚举量
uint32_t da_flags;
// 记录 EC 编解码对象的每个分片的偏移量
uint64_t da_sub_anchors;
uint8_t da_buf[DAOS_ANCHOR_BUF_MAX];
} daos_anchor_t;
da_type
包括如下类型:
typedef enum {
DAOS_ANCHOR_TYPE_ZERO = 0,
DAOS_ANCHOR_TYPE_HKEY = 1,
DAOS_ANCHOR_TYPE_KEY = 2,
DAOS_ANCHOR_TYPE_EOF = 3,
} daos_anchor_type_t;
daos_anchor_fini
daos_anchor_fini
函数用于结束迭代,释放迭代过程中分配的空间:
static inline void
daos_anchor_fini(daos_anchor_t *anchor)
{
// 当前为空,未来可能会添加释放空间的功能
}
daos_anchor_is_eof
daos_anchor_is_eof
函数用于迭代 anchor
时判断结束符:
static inline bool
daos_anchor_is_eof(daos_anchor_t *anchor)
{
return anchor->da_type == DAOS_ANCHOR_TYPE_EOF;
}
相关信息
Emai: debugzhang@163.com
- 点赞
- 收藏
- 关注作者
评论(0)