云原生网关 APISIX 的核心流程以源码分析的方式剖析其工作原理-2

举报
苏州程序大白 发表于 2022/04/21 11:53:52 2022/04/21
【摘要】 @[TOC](云原生网关 APISIX 的核心流程以源码分析的方式剖析其工作原理-2) ✨博主介绍🌊 作者主页:苏州程序大白🌊 作者简介:🏆CSDN人工智能域优质创作者🥇,苏州市凯捷智能科技有限公司创始之一,目前合作公司富士康、歌尔等几家新能源公司💬如果文章对你有帮助,欢迎关注、点赞、收藏💅 有任何问题欢迎私信,看到会及时回复💅关注苏州程序大白,分享粉丝福利 etcdetcd ...

@[TOC](云原生网关 APISIX 的核心流程以源码分析的方式剖析其工作原理-2)

✨博主介绍

🌊 作者主页:苏州程序大白

🌊 作者简介:🏆CSDN人工智能域优质创作者🥇,苏州市凯捷智能科技有限公司创始之一,目前合作公司富士康、歌尔等几家新能源公司

💬如果文章对你有帮助,欢迎关注、点赞、收藏

💅 有任何问题欢迎私信,看到会及时回复
💅关注苏州程序大白,分享粉丝福利

etcd

etcd 在 APISIX 中作用相同与 PostgreSQL 之于 Kong,内部使用 lua-resty-etcd 作为客户端,使用 timer 定时执行和长轮询获取跟踪 etcd 中数据的变化。

这里的优化点与 Kong 一样,在 init_by_lua 阶段进行数据的 warm up,之后数据会 fork 到其他的进程中。

It does not really make much sense to use this library in the context of init_by_lua because the cache will not get shared by any of the worker processes (unless you just want to “warm up” the cache with predefined items which will get inherited by the workers via fork()).8

初始化

读取 etcd 数据到全局单例的 Lua table。

-- 初始化 etcd
function _M.init()
    local local_conf, err = config_local.local_conf()
    if not local_conf then
        return nil, err
    end

    if table.try_read_attr(local_conf, "apisix", "disable_sync_configuration_during_start") then
        return true
    end

    -- 获取 etcd cli
    local etcd_cli, err = get_etcd()
    if not etcd_cli then
        return nil, "failed to start a etcd instance: " .. err
    end

    local etcd_conf = local_conf.etcd
    local prefix = etcd_conf.prefix
    -- 加载 etcd 所有数据到 lua table 中, 单例模式
    local res, err = readdir(etcd_cli, prefix, create_formatter(prefix))
    if not res then
        return nil, err
    end

    return true
end

对数据进行格式化,存入 Lua table 中:

-- 创建格式化 formatter
local function create_formatter(prefix)
    -- 返回闭包函数, 对 etcd 返回的结果进行格式化
    -- 格式个毛, 这就是个 hook 函数
    return function (res)
        res.body.nodes = {}

        local dirs
        if is_http then
            dirs = constants.HTTP_ETCD_DIRECTORY
        else
            dirs = constants.STREAM_ETCD_DIRECTORY
        end

        local curr_dir_data
        local curr_key
        for _, item in ipairs(res.body.kvs) do
            if curr_dir_data then
                -- 将匹配的内容插入 table
                if core_str.has_prefix(item.key, curr_key) then
                    table.insert(curr_dir_data, etcd_apisix.kvs_to_node(item))
                    goto CONTINUE
                end

                curr_dir_data = nil
            end

            -- 截取 prefix 后的 key
            local key = sub_str(item.key, #prefix + 1)
            if dirs[key] then
                -- single item
                loaded_configuration[key] = {
                    body = etcd_apisix.kvs_to_node(item),
                    headers = res.headers,
                }
            else
                -- 前缀一致
                local key = sub_str(item.key, #prefix + 1, #item.key - 1) -- 去掉末尾的 /
                -- ensure the same key hasn't been handled as single item
                if dirs[key] and not loaded_configuration[key] then
                    loaded_configuration[key] = {
                        body = {
                            nodes = {},
                        },
                        headers = res.headers,
                    }
                    curr_dir_data = loaded_configuration[key].body.nodes
                    curr_key = item.key
                end
            end

            ::CONTINUE::
        end

        return res
    end
end

这部分逻辑在 init_by_lua 执行,fork 到其他子进程。

数据校验

schema_def.lua 文件中定义了所有储存数据结构的 schema 校验规则,使用 jsonschema 库进行数据校验。

core/schema.lua 中使用 LRU 缓存校验器。

load_full_data 函数加载数据结构所需的 etcd kvs,并进行数据转换、校验、格式化、执行回调。

在这里插入图片描述

local function load_full_data(self, dir_res, headers)
    local err
    local changed = false

    if self.single_item then
        -- table size 为 1
        ...
        -- 执行逻辑与下面数组格式类似
    else
        if not dir_res.nodes then
            dir_res.nodes = {}
        end

        self.values = new_tab(#dir_res.nodes, 0)
        self.values_hash = new_tab(0, #dir_res.nodes)

        for _, item in ipairs(dir_res.nodes) do
            local key = short_key(self, item.key)
            local data_valid = true
            
            -- 数据格式校验...

            -- schema 校验...

            -- 过滤器...

            if data_valid then
                changed = true
                insert_tab(self.values, item)
                self.values_hash[key] = #self.values

                item.value.id = key
                item.clean_handlers = {}

                -- 执行回调
                if self.filter then
                    self.filter(item)
                end
            end
			-- 更新 mvcc 版本
            self:upgrade_version(item.modifiedIndex)
        end
    end
	...
    self.need_reload = false
end

后台数据同步

利用 etcd watch 机制进行数据变更的同步。
在这里插入图片描述

-- 定时器自动同步 etcd 数据
local function _automatic_fetch(premature, self)
    if premature then
        return
    end

    local i = 0
    while not exiting() and self.running and i <= 32 do
        i = i + 1

        local ok, err = xpcall(function()
            if not self.etcd_cli then
                local etcd_cli, err = get_etcd()
                if not etcd_cli then
                    error("failed to create etcd instance for key ["
                          .. self.key .. "]: " .. (err or "unknown"))
                end
                self.etcd_cli = etcd_cli
            end

            -- 同步数据
            local ok, err = sync_data(self)
            if err then
                if err ~= "timeout" and err ~= "Key not found"
                    and self.last_err ~= err then
                    log.error("failed to fetch data from etcd: ", err, ", ",
                              tostring(self))
                end

                if err ~= self.last_err then
                    self.last_err = err
                    self.last_err_time = ngx_time()
                else
                    if ngx_time() - self.last_err_time >= 30 then
                        self.last_err = nil
                    end
                end

                ngx_sleep(self.resync_delay + rand() * 0.5 * self.resync_delay)
            elseif not ok then
                -- no error. reentry the sync with different state
                ngx_sleep(0.05)
            end

        end, debug.traceback)

        if not ok then
            log.error("failed to fetch data from etcd: ", err, ", ",
                      tostring(self))
            ngx_sleep(self.resync_delay + rand() * 0.5 * self.resync_delay)
            break
        end
    end

    -- 进行下一次循环
    if not exiting() and self.running then
        ngx_timer_at(0, _automatic_fetch, self)
    end
end

配置同步

封装上述的逻辑提供给 routespluginsservices 等数据结构使用,每个数据结构监听自己的 prefix,同步数据并执行回调,通常在回调逻辑上触发更新,例如重新构建 Router、重新构建 plugins table 等。

-- etcd 配置创建
function _M.new(key, opts)
    local local_conf, err = config_local.local_conf()
    if not local_conf then
        return nil, err
    end

    -- etcd 重新同步事件 5, 与 Kong 重新 poll db 数据一致
    local etcd_conf = local_conf.etcd
    local prefix = etcd_conf.prefix
    local resync_delay = etcd_conf.resync_delay
    if not resync_delay or resync_delay < 0 then
        resync_delay = 5
    end

    local automatic = opts and opts.automatic
    local item_schema = opts and opts.item_schema
    local filter_fun = opts and opts.filter
    local timeout = opts and opts.timeout
    local single_item = opts and opts.single_item
    local checker = opts and opts.checker

    local obj = setmetatable({
        etcd_cli = nil,
        key = key and prefix .. key,
        automatic = automatic,
        item_schema = item_schema,
        checker = checker,
        sync_times = 0,
        running = true,
        conf_version = 0,
        values = nil,
        need_reload = true,
        routes_hash = nil,
        prev_index = 0,
        last_err = nil,
        last_err_time = nil,
        resync_delay = resync_delay,
        timeout = timeout,
        single_item = single_item,
        filter = filter_fun,
    }, mt)

    if automatic then
        -- timer 定时获取数据
        if not key then
            return nil, "missing `key` argument"
        end

        -- 从单例 table 获取 etcd 数据, 进行处理
        if loaded_configuration[key] then
            local res = loaded_configuration[key]
            -- 清空 table
            loaded_configuration[key] = nil -- tried to load

            log.notice("use loaded configuration ", key)

            local dir_res, headers = res.body, res.headers
            -- 加载数据并校验数据, 过滤数据
            load_full_data(obj, dir_res, headers)
        end

        -- 创建定时器自动同步
        ngx_timer_at(0, _automatic_fetch, obj)

    else
        local etcd_cli, err = get_etcd()
        if not etcd_cli then
            return nil, "failed to start a etcd instance: " .. err
        end
        obj.etcd_cli = etcd_cli
    end

    if key then
        created_obj[key] = obj
    end

    return obj
end

Router

APISIX 的 Router 匹配基于压缩字典树(Radix Tree)实现,主要使用 lua-resty-radixtree 库。内置多种解析模式,这里只关注 HTTP 默认的 radixtree_uri 实现。

路由构建

core.config.new 调用的是 etcd 库(config_etcd.lua)维护的配置同步方法,返回原表,可以访问从 etcd 同步的数据。core.schema.route 包含了 route 这个数据结构的 schema 及校验规则,check_route 内部检查 route 直接绑定 plugin 的数据结构。

APISIX 引入 route 直接绑定 plugin 的简化配置,不需要额外创建 plugin 对象。

-- 初始化 router
function _M.init_worker(filter)
    local user_routes, err = core.config.new("/routes", {
            automatic = true, -- 自动同步
            item_schema = core.schema.route,
            checker = check_route,
            filter = filter,
        })
    if not user_routes then
        error("failed to create etcd instance for fetching /routes : " .. err)
    end

    return user_routes
end

filter 是回调函数,下述的流程中会注入。

路由初始化

router.http_init_worker 中进行 Router 初始化。

在这里插入图片描述

-- attach common methods if the router doesn't provide its custom implementation
local function attach_http_router_common_methods(http_router)
    ...

    if http_router.init_worker == nil then
        http_router.init_worker = function (filter)
            -- 添加路由
            http_router.user_routes = http_route.init_worker(filter)
        end
    end
end


function _M.http_init_worker()
    local conf = core.config.local_conf()
    -- 默认的匹配模式
    local router_http_name = "radixtree_uri"
    local router_ssl_name = "radixtree_sni"

    if conf and conf.apisix and conf.apisix.router then
        router_http_name = conf.apisix.router.http or router_http_name
        router_ssl_name = conf.apisix.router.ssl or router_ssl_name
    end

    -- 创建 router 实例
    local router_http = require("apisix.http.router." .. router_http_name)
    -- 修改 router 的 table
    attach_http_router_common_methods(router_http)
    -- 初始化路由
    -- 调用 apisix.http.route.init_worker 方法
    -- 从 etcd 获取数据并执行回调
    -- filter 为格式化, 解析 upstream
    router_http.init_worker(filter)
    _M.router_http = router_http

    local router_ssl = require("apisix.ssl.router." .. router_ssl_name)
    router_ssl.init_worker()
    _M.router_ssl = router_ssl

    _M.api = require("apisix.api_router")

    ...
end

http_router.user_routes 储存在 router 的 table 中,会在路由匹配时用到(懒加载)。

路由匹配

access_by_lua 阶段中进行路由匹配,将匹配结果(route、service)传递到 ctx 中供 balancer 请求上游。

do
    local uri_routes = {}
    local uri_router
    local match_opts = {}
    
    function _M.match(api_ctx)
        -- 从 module 的 user_routes 属性获取路由, 在 etcd route 变化时回调添加
        local user_routes = _M.user_routes
        if not cached_version or cached_version ~= user_routes.conf_version then
            uri_router = base_router.create_radixtree_uri_router(user_routes.values,
                                                                uri_routes, false)
            cached_version = user_routes.conf_version
        end

        if not uri_router then
            core.log.error("failed to fetch valid `uri` router: ")
            return true
        end

        return base_router.match_uri(uri_router, match_opts, api_ctx)
    end

end

radixtree 路由匹配库提供了匹配成功回调 handler,匹配成功后传递到 ctx 中。

core.table.insert(uri_routes, {
                ...
                handler = function (api_ctx, match_opts)
                    api_ctx.matched_params = nil
                    api_ctx.matched_route = route
                    api_ctx.curr_req_matched = match_opts.matched
                end
            })

Balancer

Balancer 部分与 Kong 逻辑一致,甚至代码里函数名都一样,主要逻辑是 Service/Upstream 节点解析、负载均衡策略、健康检查与失败重试。

APISIX 支持的一特性是外部服务发现,Kong 中默认支持通过 DNS 解析 Service host,根据 AAAA、A、SRV 记录添加 IP 与优先级,APISIX 支持了从 consul、eruka 和其他注册中心获取 IP 地址列表,并同步节点数据(长轮询)。

服务发现

如果 serivce host 是域名, 通过外部注册中心进行服务发现,获取上游 IP 列表。

function _M.set_by_route(route, api_ctx)
   	...
    -- 如果 serivce host 是域名, 通过 discovery 发现, dns 解析
    if up_conf.service_name then
        ...
        -- 外部注册中心
        local dis = discovery[up_conf.discovery_type]
        if not dis then
            return 500, "discovery " .. up_conf.discovery_type .. " is uninitialized"
        end

        -- 从注册中心数据源(缓存本地 table)获取 IP
        local new_nodes, err = dis.nodes(up_conf.service_name)
        if not new_nodes then
            return HTTP_CODE_UPSTREAM_UNAVAILABLE, "no valid upstream node: " .. (err or "nil")
        end

        ...
    end

    -- 将 upstream 节点信息存入 ctx
    set_directly(api_ctx, up_conf.type .. "#upstream_" .. tostring(up_conf),
                 api_ctx.conf_version, up_conf)

    local nodes_count = up_conf.nodes and #up_conf.nodes or 0
    if nodes_count == 0 then
        return HTTP_CODE_UPSTREAM_UNAVAILABLE, "no valid upstream node"
    end
	...

    set_upstream_scheme(api_ctx, up_conf)

    local ok, err = fill_node_info(up_conf, api_ctx.upstream_scheme, false)
    if not ok then
        return 503, err
    end
    ...

    local scheme = up_conf.scheme
    if (scheme == "https" or scheme == "grpcs") and up_conf.tls then
        ...
    end

    return
end

负载均衡

不同于 Kong 使用自己封装的 lua-resty-dns-client/balancer 作为负载均衡器,APISIX 基于 lua-resty-balancer 封装了负载均衡策略,基于 lua-resty-healthcheck(fork 版本)实现节点健康检查。

API 网关的负载均衡策略(Kong/APISIX)都是基于 OpenResty lua-resty-core/balancer 提供的负载均衡函数实现,set_current_peer 设置当前请求上游地址,set_more_tries 设置请求失败重试次数,get_last_failure 获取上一次请求失败结果判断是否需要继续重试,set_timeouts 设置单个请求超时时间。
在这里插入图片描述
set_balancer_opts 设置 Nginx Balancer 参数。

-- set_balancer_opts will be called in balancer phase and before any tries
local function set_balancer_opts(route, ctx)
    local up_conf = ctx.upstream_conf

    -- If the matched route has timeout config, prefer to use the route config.
    local timeout = nil
    if route and route.value and route.value.timeout then
        timeout = route.value.timeout
    else
        if up_conf.timeout then
            timeout = up_conf.timeout
        end
    end
    -- 设置 Nginx 请求超时时间
    if timeout then
        local ok, err = set_timeouts(timeout.connect, timeout.send,
                                     timeout.read)
        if not ok then
            core.log.error("could not set upstream timeouts: ", err)
        end
    end

    local retries = up_conf.retries
    if not retries or retries < 0 then
        retries = #up_conf.nodes - 1
    end

    -- 设置 Nginx 失败重试次数
    if retries > 0 then
        local ok, err = set_more_tries(retries)
        ...
    end
end

access_by_lua 阶段中服务发现,调用 balancer 库获取 peer 节点,balancer_by_lua 中从 ctx 中获取 peer 节点信息,访问后端节点,若失败重试(该阶段再次被调用),重新获取 peer 节点,重新创建请求(recreate_request())再次访问后端节点。

Plugin

插件机制也与 Kong 类似,插件开发者可以定义 Schema 配置数据结构,以及 Handler 注入 Nginx 请求生命周期,API 网关提供核心的库供开发者使用(SDK)。

APISIX 相比 Kong,开源的插件较多,插件 Schema 便于编写,同时插件只需要单文件,而 Kong 的插件通常是单独一个仓库,不方便维护。但是考虑到插件需要单独的 Test::Nginx 单元测试,单独一个仓库也未尝不可(Kong 还说了以后会把 Github 项目主仓库的插件代码移到单独的仓库)。

具体各个阶段执行逻辑应该与 Kong 相同,即部分阶段插件开协程并发执行,部分阶段避免数据竞争,插件顺序执行。

值得注意的一点是 APISIX 生命周期里没有 rewrite_by_lua 阶段,插件实现的该阶段会在 access_by_lua 中优先于 access_by_lua 插件逻辑执行。

The apisix run both “.access” and “.rewrite” in the “access” phase.9

插件加载

插件列表从本地 yaml 文件获取,同时监听本地文件变化,同步配置;插件配置信息从 etcd 获取。

local init_plugins_syncer
do
    local plugins_conf

    function init_plugins_syncer()
        local err
        -- 储存插件的配置信息, 一条 kv
        plugins_conf, err = core.config.new("/plugins", {
            automatic = true, -- 后台创建 timer watch etcd 自动同步配置
            item_schema = core.schema.plugins,
            single_item = true,
            -- filter 方法中访问到 etcd kv 的 item, 这里进行插件加载的回调
            -- 每次 etcd 插件配置变动, 自动同步
            filter = function(item)
                -- we need to pass 'item' instead of plugins_conf because
                -- the latter one is nil at the first run
                _M.load(item)
            end,
        })
        if not plugins_conf then
            error("failed to create etcd instance for fetching /plugins : " .. err)
        end
    end
end

插件列表会储存到 Lua table 中:

-- 加载插件
local function load(plugin_names)
    local processed = {}
    for _, name in ipairs(plugin_names) do
        if processed[name] == nil then
            processed[name] = true
        end
    end

    core.log.warn("new plugins: ", core.json.delay_encode(processed))

    -- 移除已经存在的 module
    for name in pairs(local_plugins_hash) do
        unload_plugin(name)
    end

    core.table.clear(local_plugins)
    core.table.clear(local_plugins_hash)

    -- 加载插件
    for name in pairs(processed) do
        load_plugin(name, local_plugins)
    end

    -- 插件排序, priority 越高的插件越先执行, 与 Kong 同样
    -- sort by plugin's priority
    if #local_plugins > 1 then
        sort_tab(local_plugins, sort_plugin)
    end

    -- 打印调试日志
    for i, plugin in ipairs(local_plugins) do
        ...
    end

    return true
end

插件配置信息 plugin_meta 也加载到 Lua table 中,在插件匹配的时候会获取。

插件匹配

插件过滤,遍历插件列表,匹配开启的插件,O(n) 操作 plugin.filter(route)

-- 插件配置绑定
function _M.filter(user_route, plugins)
    ...

    plugins = plugins or core.tablepool.fetch("plugins", 32, 0)
    for _, plugin_obj in ipairs(local_plugins) do
        local name = plugin_obj.name
        local plugin_conf = user_plugin_conf[name]

        -- 插件和插件配置存入
        if type(plugin_conf) == "table" and not plugin_conf.disable then
            core.table.insert(plugins, plugin_obj)
            core.table.insert(plugins, plugin_conf)
        end
    end

    trace_plugins_info_for_debug(plugins)

    return plugins
end

插件执行

这里以 access_by_lua 阶段插件执行逻辑为例,根据 Route、Service 匹配插件,创建临时 Table 储存 plugin 和 plugin_conf,存入 ctx 中。

        -- 插件过滤, 遍历插件列表, 匹配开启的插件, O(n)
        local plugins = plugin.filter(route)
        api_ctx.plugins = plugins

        -- fake 执行 rewrite 阶段
        plugin.run_plugin("rewrite", plugins, api_ctx)
        if api_ctx.consumer then
            local changed
            route, changed = plugin.merge_consumer_route(
                route,
                api_ctx.consumer,
                api_ctx
            )

            core.log.info("find consumer ", api_ctx.consumer.username,
                          ", config changed: ", changed)

            if changed then
                core.table.clear(api_ctx.plugins)
                api_ctx.plugins = plugin.filter(route, api_ctx.plugins)
            end
        end
        -- 执行 access 阶段
        plugin.run_plugin("access", plugins, api_ctx)
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。