DAOS 源码解析之 daos_api

举报
debugzhang 发表于 2021/04/01 21:20:46 2021/04/01
【摘要】 DAOS (Distributed Asynchronous Object Storage) 是一个开源的对象存储系统,专为大规模分布式非易失性内存设计,利用了 SCM 和 NVMe 等的下一代 NVM 技术。 DAOS 同时在硬件之上提供了键值存储接口,提供了诸如事务性非阻塞 I/O、具有自我修复的高级数据保护、端到端数据完整性、细粒度数据控制和弹性存储的高级数据保护,从而优化性能并降低成本。

DAOS (Distributed Asynchronous Object Storage) 是一个开源的对象存储系统,专为大规模分布式非易失性内存设计,利用了 SCM 和 NVMe 等的下一代 NVM 技术。 DAOS 同时在硬件之上提供了键值存储接口,提供了诸如事务性非阻塞 I/O、具有自我修复的高级数据保护、端到端数据完整性、细粒度数据控制和弹性存储的高级数据保护,从而优化性能并降低成本。

本文以 Release 1.1.4 版本为标准,解析 include/daos_api.h 中的具体实现。include/daos_api.h 包含了 DAOS 系统主要的 API 接口。

daos_rank_list_parse

daos_rank_list_parse 函数从带有分隔符参数的字符串生成服务器标识符列表。

该函数是 daos_pool_connect 的辅助函数,主要用于生成其所需的服务器标识符列表。

参数:

  • str [in]:用于生成服务器标识符列表的带有分隔符参数的字符串。
  • sep [in]:分隔符,例如在 dmg 中使用 : 作为分隔符。

返回值:

  • 分配的标识符列表,用户需要负责调用 d_rank_list_free 释放列表。
d_rank_list_t *daos_rank_list_parse(const char *str, const char *sep)
{
	d_rank_t *     buf;
	int            cap   = 8;
	d_rank_list_t *ranks = NULL;
	char *         s, *s_saved;
	char *         p;
	int            n = 0;

	// 分配空间给 buf 
	D_ALLOC_ARRAY(buf, cap);
	if (buf == NULL)
		// 给 buf 分配空间失败,不用释放 s_saved 和 buf,直接返回空指针
		D_GOTO(out, ranks = NULL);
	
	// 将 str 指向的字符串复制到字符串指针 s_saved 上去
	// s_saved 没被初始化,在复制时会给这个指针分配空间
	D_STRNDUP(s_saved, str, strlen(str));
	s = s_saved;
	if (s == NULL)
		// 给 s_saved 分配空间失败,不用释放 buf,返回空指针
		D_GOTO(out_buf, ranks = NULL);

	// 用 seq 分割字符串
	while ((s = strtok_r(s, sep, &p)) != NULL) {
		// 当前缓冲区已满,扩展缓冲区至原来的两倍,并将数据复制过去
		if (n == cap) {
			d_rank_t *buf_new;
			int       cap_new;

			cap_new = cap * 2;
			D_ALLOC_ARRAY(buf_new, cap_new);
			if (buf_new == NULL)
				D_GOTO(out_s, ranks = NULL);
			memcpy(buf_new, buf, sizeof(*buf_new) * n);
			D_FREE(buf);
			buf = buf_new;
			cap = cap_new;
		}
		// 存储标识符
		buf[n] = atoi(s);
		n++;
		s = NULL;
	}

	if (n > 0) {
		// 为 ranks 分配空间,数组长度为 n
		ranks = daos_rank_list_alloc(n);
		if (ranks == NULL)
			// 给 ranks 分配空间失败,释放 s_saved 和 buf,返回空指针
			D_GOTO(out_s, ranks = NULL);
		// 将缓冲区里的标识符复制到 ranks 中
		memcpy(ranks->rl_ranks, buf, sizeof(*buf) * n);
	}
out_s:
	// 释放 s_saved 的空间
	D_FREE(s_saved);
out_buf:
	// 释放 buf 的空间
	D_FREE(buf);
out:
	return ranks;
}

结构体 d_rank_list_t 封装一个数组,用于存储服务器标识符:

typedef struct {
	// 数组,用于存储服务器标识
	d_rank_t	*rl_ranks;
	// 数组长度
	uint32_t	rl_nr;
} d_rank_list_t;

d_rank_t 表示服务器标识符:

typedef uint32_t d_rank_t;

daos_tx_open

daos_tx_open 函数在容器句柄上打开一个事务。该事务句柄可以用于容器中需要事务提交的 IOs。

参数:

  • coh [in]:容器句柄。
  • th [out]:返回的事务句柄。
  • flags [in]:事务标志(例如 DAOS_TF_RDONLY)。
  • ev [in]:结束事件,该参数是可选的,可以为 NULL。当该参数为 NULL 时,该函数在阻塞模式下运行。

返回值:

  • 如果成功,返回 0。
  • 如果失败,返回负数。
int
daos_tx_open(daos_handle_t coh, daos_handle_t *th, uint64_t flags,
	     daos_event_t *ev)
{
	daos_tx_open_t *args;
	tse_task_t *    task;
	int             rc;

	// 判断 *args 大小是否与 daos_tx_open_t 的预期大小相等
	DAOS_API_ARG_ASSERT(*args, TX_OPEN);
  
	// 创建新任务 dc_tx_open,并将其与输入事件 ev 关联
	// 如果事件 ev 为 NULL,则将获取私有事件
	rc = dc_task_create(dc_tx_open, NULL, ev, &task);
	if (rc)
		// dc_task_create 成功返回 0,失败返回负数
		return rc;

	// 从 task 中获取参数
	args        = dc_task_get_args(task);
	args->coh   = coh;
	args->th    = th;
	args->flags = flags;

	// 调度创建的任务 task
	// 如果该任务的关联事件是私有事件,则此函数将等待任务完成
	// 否则它将立即返回,并通过测试事件或在 EQ 上轮询找到其完成情况
	//
	// 第二个参数 instant 为 true,表示任务将立即执行
	return dc_task_schedule(task, true);
}

结构体 daos_handle_t 表示各种 DAOS 组件(如容器、对象等)的通用句柄:

typedef struct {
	uint64_t	cookie;
} daos_handle_t;

daos_tx_open 函数的 flags 为下列两种类型:

enum {
	// 只读事务
	DAOS_TF_RDONLY		= (1 << 0),
	// 当存在与事务相关的修改时,不要复制调用方的数据缓冲区
	// 在事务 daos_tx_commit 操作完成前,缓冲区必须保持不变
	// 
	// 无论此标志如何,始终复制键的缓冲区
	// 它们可以在相应的操作完成后释放或调整用途
	DAOS_TF_ZERO_COPY	= (1 << 1),
};

结构体 daos_event_t 表示事件或事件队列:

typedef struct daos_event {
	int			ev_error;
  
	// 该结构体仅限内部使用,禁止更改
	struct {
		uint64_t space[19];
	}			ev_private;
  
	// 仅限 debug 模式时使用
	uint64_t	ev_debug;
} daos_event_t;

结构体 daos_tx_open_t 表示事务打开参数:

typedef struct {
	// 打开的容器句柄
	daos_handle_t	coh;
	// 返回的打开的事务句柄
	daos_handle_t	*th;
	// 事务标志
	uint64_t		flags;
} daos_tx_open_t;

结构体 tse_sched_t 用来跟踪调度程序下的所有任务:

typedef struct {
	int		ds_result;

	// 与调度程序关联的用户数据(例如 completion cb)
	void	*ds_udata;

	// daos 的内部调度计划
	struct {
		uint64_t	ds_space[48];
	}		ds_private;
} tse_sched_t;

daos_tx_commit

daos_tx_commit 函数用于提交事务。

如果操作成功,则事务句柄不能再用于任何新的 IO 操作。

如果返回 -DER_TX_RESTART,调用方需要使用相同的事务句柄调用 daos_tx_restart 函数重新启动该事务,执行此事务的调用方代码,然后再次调用 daos_tx_commit

参数:

  • th [in]:要提交的事务句柄。
  • ev [in]:结束事件,该参数是可选的,可以为 NULL。当该参数为 NULL 时,该函数在阻塞模式下运行。

返回值:

  • 如果成功,返回 0。
  • 如果失败,返回负数。可能的错误值为:
    • -DER_NO_HDL:无效的事务句柄
    • -DER_INVAL:无效的参数
    • -DER_TX_RESTART:事物需要重启(例如,由于冲突)
int
daos_tx_commit(daos_handle_t th, daos_event_t *ev)
{
	daos_tx_commit_t *args;
	tse_task_t *      task;
	int               rc;

	// 判断 *args 大小是否与 daos_tx_commit_t 的预期大小相等
	DAOS_API_ARG_ASSERT(*args, TX_COMMIT);
  
	// 创建新任务 daos_tx_commit,并将其与输入事件 ev 关联
	// 如果事件 ev 为 NULL,则将获取私有事件
	rc = dc_task_create(dc_tx_commit, NULL, ev, &task);
	if (rc)
		// dc_task_create 成功返回 0,失败返回负数
		return rc;

	// 从 task 中获取参数
	args        = dc_task_get_args(task);
	args->th    = th;
	args->flags = 0;

	// 调度创建的任务 task
	// 如果该任务的关联事件是私有事件,则此函数将等待任务完成
	// 否则它将立即返回,并通过测试事件或在 EQ 上轮询找到其完成情况
	//
	// 第二个参数 instant 为 true,表示任务将立即执行
	return dc_task_schedule(task, true);
}

结构体 daos_tx_commit_t 表示事务提交参数:

typedef struct {
	// 打开的事务句柄
	daos_handle_t	th;
	// 事务标志,控制提交行为,例如重试
	uint32_t		flags;
} daos_tx_commit_t;

daos_tx_open_snap

daos_tx_open_snap 函数从快照创建只读事务。

该函数不会创建快照,但只有一个读取事务会被 daos_cont_create_snap 函数创建的快照中读取出来。

如果用户传递的 epoch 代表的不是快照,或者快照已被删除,那么读取该事务可能会获得未定义的结果。

参数:

  • coh [in]:容器句柄。
  • epoch [in]:要读取的快照的 epoch。
  • th [out]:返回的只读事务句柄。
  • ev [in]:结束事件,该参数是可选的,可以为 NULL。当该参数为 NULL 时,该函数在阻塞模式下运行。

返回值:

  • 如果成功,返回 0。
  • 如果失败,返回负数。
int
daos_tx_open_snap(daos_handle_t coh, daos_epoch_t epoch, daos_handle_t *th,
		  daos_event_t *ev)
{
	daos_tx_open_snap_t *args;
	tse_task_t *         task;
	int                  rc;

	// 判断 *args 大小是否与 daos_tx_open_snap_t 的预期大小相等
	DAOS_API_ARG_ASSERT(*args, TX_OPEN_SNAP);
  
	// 创建新任务 dc_tx_open_snap,并将其与输入事件 ev 关联
	// 如果事件 ev 为 NULL,则将获取私有事件
	rc = dc_task_create(dc_tx_open_snap, NULL, ev, &task);
	if (rc)
		// dc_task_create 成功返回 0,失败返回负数
		return rc;

	// 从 task 中获取参数
	args        = dc_task_get_args(task);
	args->coh   = coh;
	args->epoch = epoch;
	args->th    = th;

	// 调度创建的任务 task
	// 如果该任务的关联事件是私有事件,则此函数将等待任务完成
	// 否则它将立即返回,并通过测试事件或在 EQ 上轮询找到其完成情况
	//
	// 第二个参数 instant 为 true,表示任务将立即执行
	return dc_task_schedule(task, true);
}

结构体 daos_tx_open_snap_t 表示事务快照打开参数:

typedef struct {
	// 打开的容器句柄
	daos_handle_t	coh;
	// 要从中读取事务的持久快照的 epoch
	daos_epoch_t	epoch;
	// 返回的打开的事务句柄
	daos_handle_t	*th;
} daos_tx_open_snap_t;

daos_tx_abort

daos_tx_abort 函数会中止对事务的所有修改,事务句柄将不能用于任何新的 IO 操作。

参数:

  • th [in]:要终止的事务句柄。
  • ev [in]:结束事件,该参数是可选的,可以为 NULL。当该参数为 NULL 时,该函数在阻塞模式下运行。

返回值:

  • 如果成功,返回 0。
  • 如果失败,返回负数。
int
daos_tx_abort(daos_handle_t th, daos_event_t *ev)
{
	daos_tx_abort_t *args;
	tse_task_t *     task;
	int              rc;

	// 判断 *args 大小是否与 daos_tx_abort_t 的预期大小相等
	DAOS_API_ARG_ASSERT(*args, TX_ABORT);
  
	// 创建新任务 dc_tx_open_snap,并将其与输入事件 ev 关联
	// 如果事件 ev 为 NULL,则将获取私有事件
	rc = dc_task_create(dc_tx_abort, NULL, ev, &task);
	if (rc)
		// dc_task_create 成功返回 0,失败返回负数
		return rc;
	
	// 从 task 中获取参数
	args     = dc_task_get_args(task);
	args->th = th;

	// 调度创建的任务 task
	// 如果该任务的关联事件是私有事件,则此函数将等待任务完成
	// 否则它将立即返回,并通过测试事件或在 EQ 上轮询找到其完成情况
	//
	// 第二个参数 instant 为 true,表示任务将立即执行
	return dc_task_schedule(task, true);
}

结构体 daos_tx_abort_t 表示事务中断参数:

typedef struct {
	// 打开的事务句柄
	daos_handle_t	th;
} daos_tx_abort_t;

daos_tx_close

daos_tx_close 函数用于关闭事务句柄。

这是一个本地操作,不需要 RPC 参与。

参数:

  • th [in]:要释放的事务句柄。
  • ev [in]:结束事件,该参数是可选的,可以为 NULL。当该参数为 NULL 时,该函数在阻塞模式下运行。

返回值:

  • 如果成功,返回 0。
  • 如果失败,返回负数。
int
daos_tx_close(daos_handle_t th, daos_event_t *ev)
{
	daos_tx_close_t *args;
	tse_task_t *     task;
	int              rc;

	// 判断 *args 大小是否与 daos_tx_close_t 的预期大小相等
	DAOS_API_ARG_ASSERT(*args, TX_CLOSE);
  
	// 创建新任务 dc_tx_open_snap,并将其与输入事件 ev 关联
	// 如果事件 ev 为 NULL,则将获取私有事件
	rc = dc_task_create(dc_tx_close, NULL, ev, &task);
	if (rc)
		// dc_task_create 成功返回 0,失败返回负数
		return rc;

	// 从 task 中获取参数
	args     = dc_task_get_args(task);
	args->th = th;

	// 调度创建的任务 task
	// 如果该任务的关联事件是私有事件,则此函数将等待任务完成
	// 否则它将立即返回,并通过测试事件或在 EQ 上轮询找到其完成情况
	//
	// 第二个参数 instant 为 true,表示任务将立即执行
	return dc_task_schedule(task, true);
}

结构体 daos_tx_close_t 表示事务关闭参数:

typedef struct {
	// 打开的事务句柄
	daos_handle_t	th;
} daos_tx_close_t;

daos_tx_restart

daos_tx_restart 函数用于遇到 -DER_TX_RESTART 错误后重新启动事务,将删除通过事务句柄发出的所有 IOs 操作。

重新启动的事务能否观察到此事务最初打开后提交的任何冲突修改是未定义的。如果调用者重试事务有其它的目的,则应打开新的事务。

这是一个本地操作,不涉及RPC。

参数:

  • th [in]:要重启的事务句柄。
  • ev [in]:结束事件,该参数是可选的,可以为 NULL。当该参数为 NULL 时,该函数在阻塞模式下运行。

返回值:

  • 如果成功,返回 0。
  • 如果失败,返回负数。
int
daos_tx_restart(daos_handle_t th, daos_event_t *ev)
{
	daos_tx_restart_t *args;
	tse_task_t *       task;
	int                rc;

	// 判断 *args 大小是否与 daos_tx_restart_t 的预期大小相等
	DAOS_API_ARG_ASSERT(*args, TX_RESTART);
  
	// 创建新任务 dc_tx_open_snap,并将其与输入事件 ev 关联
	// 如果事件 ev 为 NULL,则将获取私有事件
	rc = dc_task_create(dc_tx_restart, NULL, ev, &task);
	if (rc)
		// dc_task_create 成功返回 0,失败返回负数
		return rc;

	// 从 task 中获取参数
	args     = dc_task_get_args(task);
	args->th = th;

	// 调度创建的任务 task
	// 如果该任务的关联事件是私有事件,则此函数将等待任务完成
	// 否则它将立即返回,并通过测试事件或在 EQ 上轮询找到其完成情况
	//
	// 第二个参数 instant 为 true,表示任务将立即执行
	return dc_task_schedule(task, true);
}

结构体 daos_tx_close_t 表示事务关闭参数:

typedef struct {
	// 打开的事务句柄
	daos_handle_t	th;
} daos_tx_close_t;

daos_tx_hdl2epoch

daos_tx_hdl2epoch 函数返回与事务句柄关联的 epoch。

一个 epoch 在事务开始时可能不可用,需要当事务成功提交后才可用。

此函数为当前系统的特定实现,它只能用于测试和调试目的。

参数:

  • th [in]:事务句柄。
  • epoch [out]:需要返回的 epoch 值

返回值:

  • 如果成功,返回 0。
  • 如果失败,返回负数。
    • -DER_UNINIT 表示当前 epoch 不可用。
int
daos_tx_hdl2epoch(daos_handle_t th, daos_epoch_t *epoch)
{
	return dc_tx_hdl2epoch(th, epoch);
}

daos_epoch_t 为 64 位无符号整数,表示 epoch:

typedef uint64_t	daos_epoch_t;

daos_anchor_init

daos_anchor_init 函数用于初始化一个迭代器锚。

参数:

  • anchor [in]:待初始化的迭代器锚。
  • opts [in]:初始化选项(保留)。
static inline int
daos_anchor_init(daos_anchor_t *anchor, unsigned int opts)
{
	// 初始化
	*anchor = DAOS_ANCHOR_INIT;
	return 0;
}

DAOS_ANCHOR_INIT 是初始化宏函数,将 daos_anchor_t 的 da_type 置为 DAOS_ANCHOR_TYPE_ZERO

#define DAOS_ANCHOR_INIT	((daos_anchor_t){DAOS_ANCHOR_TYPE_ZERO})

结构体 daos_anchor_t 用于迭代过程中的锚定:

#define DAOS_ANCHOR_BUF_MAX	104
typedef struct {
	// daos_anchor_type_t 类型
	uint16_t	da_type;
	uint16_t	da_shard;
	// daos_anchor_flags 枚举量
	uint32_t	da_flags;
	// 记录 EC 编解码对象的每个分片的偏移量
	uint64_t	da_sub_anchors;
	uint8_t		da_buf[DAOS_ANCHOR_BUF_MAX];
} daos_anchor_t;

da_type 包括如下类型:

typedef enum {
	DAOS_ANCHOR_TYPE_ZERO	= 0,
	DAOS_ANCHOR_TYPE_HKEY	= 1,
	DAOS_ANCHOR_TYPE_KEY	= 2,
	DAOS_ANCHOR_TYPE_EOF	= 3,
} daos_anchor_type_t;

daos_anchor_fini

daos_anchor_fini 函数用于结束迭代,释放迭代过程中分配的空间:

static inline void
daos_anchor_fini(daos_anchor_t *anchor)
{
	// 当前为空,未来可能会添加释放空间的功能
}

daos_anchor_is_eof

daos_anchor_is_eof 函数用于迭代 anchor 时判断结束符:

static inline bool
daos_anchor_is_eof(daos_anchor_t *anchor)
{
	return anchor->da_type == DAOS_ANCHOR_TYPE_EOF;
}

相关信息

Emai: debugzhang@163.com

DAOS: https://github.com/daos-stack/daos

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。