【技术方案分享】FunctionGraph实现 CDN日志转存OBS_demo-python版本
【摘要】 针对当前CDN日志模块不支持转储能力,本文分享了FunctionGraph服务实现 CDN日志转存OBS的demo,此demo能够实现CDN日志实时转存到OBS,详情内容请查看文章内容。
1.服务说明
FunctionGraph服务:基于事件驱动的函数托管计算服务。通过函数工作流,只需编写业务函数代码并设置运行的条件,无需配置和管理服务器等基础设施,函数以弹性、免运维、高可靠的方式运行。
2.方案说明
2.1 需求及痛点
需求:实现CDN日志实时转存到OBS
痛点:当前CDN日志模块不支持转储能力,需要客户代码实现该能力
2.2 方案设计
方案说明:
- 定时触发器,固定频率启动函数
- 函数请求CDN日志接口,获取日志下载链接
- 函数请求下载链接,将下载的日志上传到OBS固定目录下。
2.3 服务创建
2.3.1 确认CDN日志记录状态
- 在华为云控制台查看CDN日志记录是否正常,如图可知日志会有5个小时的延迟。
2.3.2 FunctionGraph新建函数
- 登录华为云控制台,进入“管理与部署>统一身份认证服务”,选择“委托”页签,点击创建“委托”。
- 下一步,进入委托配置权限界面。作用范围:全局服务;权限选择:OBS OperateAccess 、CDN LogsReadOnlyAccess。点击确认
- 华为云控制台,选择“计算>函数工作流”,进入函数工作流界面,点击右上角“创建函数”,输入图片中的信息,点击创建函数
- 创建后,进入代码编辑界面,将 Demo代码 的代码内容复制到在线IDE
- 点击配置按钮,配置“环境变量”
注:
url :https://cdn.myhuaweicloud.com/v1.0/cdn/logs(CDN日志查询的url)。
domain_name :xxx.com(需要转存日志的CDN加速域名)。
obsAddress :xxx.com(用于存日志的OBS桶域名)。
destBucket :**(用于存日志的OBS桶文件夹名称)。
- 在函数配置界面选择“触发器”,点击右侧“创建触发器”,配置内容如下,点击确定。(实例代码中日志转储的日志为最新生成的近一小时日志)
- 在函数配置界面,单击右上角“请选择测试事件”下拉框,选择“配置测试事件”,配置内容如下,点击保存即可。
- 在函数配置界面,单击右上角“请选择测试事件”下拉框,选择“配置测试事件”,配置内容如下,点击保存即可。
- 点击函数的保存按钮即可,函数配置成功。
3.FunctionGraph适用说明
该FunctionGraph函数工作流适用于CDN日志转储OBS,转储规则为增量转储,可通过配置触发器以及修改代码实现定时增量转储固定周期的CDN日志。
4.Demo代码
# -*- coding:utf-8 -*-
import requests
import datetime
import time
import os
import sys
import json
from com.obs.client.obs_client import ObsClient
from urllib.parse import urlparse
if sys.version_info.major == 2 or not sys.version > '3':
import httplib
else:
import http.client as httplib
current_file_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(current_file_path)
TEMP_ROOT_PATH = "/tmp/" # OBS文件下载后的存储路径
region = 'china' # 默认值,用于FunctionGraph连接OBS使用
secure = True # 默认值,用于FunctionGraph连接OBS使用
signature = 'v4' # 默认值,用于FunctionGraph连接OBS使用
port = 443 # 默认值,用于FunctionGraph连接OBS使用
path_style = True # 默认值,用于FunctionGraph连接OBS使用
def handler(event, context):
#引用context的logger方法
logger = context.getLogger()
#CDN日志记录查询接口的起始时间,转换成时间戳;当前为近6个小时的日志记录
timer = (datetime.datetime.now()-datetime.timedelta(hours=6)).strftime("%Y-%m-%d %H")
timeStamp = int(time.mktime(time.strptime(timer,"%Y-%m-%d %H"))*1000)
#配置日志转储的起始时间点,转换成时间戳
timeStr = (datetime.datetime.now()-datetime.timedelta(hours=6)).strftime("%Y-%m-%d %H")
timeStrStamp = int(time.mktime(time.strptime(timeStr,"%Y-%m-%d %H"))*1000)
#配置日志转储的终止时间点,转换成时间戳
timeEnd = (datetime.datetime.now()-datetime.timedelta(hours=5)).strftime("%Y-%m-%d %H")
timeSEndStamp= int(time.mktime(time.strptime(timeEnd,"%Y-%m-%d %H"))*1000)
#CDN日志记录查询接口参数字段,显示数量为 pageSize* pageNumber
pageSize = 5000
pageNumber = 1
#https请求加上verify=False忽略SSL验证后会出现警告,使用以下代码忽略警告
requests.packages.urllib3.disable_warnings()
#queryDate 日志的产日日期,用于日志存储路径使用
queryDate = (datetime.datetime.now()-datetime.timedelta(hours=6)).strftime("%Y-%m-%d")
#使用start函数
start(context, queryDate, timeStamp, pageSize, pageNumber,timeStrStamp,timeSEndStamp)
#定义start函数,进行日志记录查询
def start(context, queryDate, timeStamp, pageSize, pageNumber,timeStrStamp,timeSEndStamp):
#引用context的logger方法
logger = context.getLogger()
#引用context的中用户参数(新建函数后配置的参数),引用url,domain_name
logUrl = context.getUserData('url')
domainName = context.getUserData('domain_name')
#request的请求参数
params = {'query_date': timeStamp, 'domain_name': domainName, 'page_size': pageSize, 'page_number': pageNumber, 'enterprise_project_id':'ALL'}
#request的请求头
headers = {'Content-Type': 'application/json;charset=UTF-8', 'X-Auth-Token': context.getToken()}
#发起日志记录接口的请求,
res = requests.get(logUrl, params=params, headers=headers, verify=False)
# 请求状态码判断,若非200,打印响应信息
if res.status_code != 200:
logger.info("query log urls: " + res.url + ", error: " + res.text)
return ("query log urls: " + res.url + ", error: " + res.text)
#创建res.text的python对象
resJson = json.loads(res.text)
#打印res.text的内容
logger.info(res.text)
#将响应信息中的total的值赋予total
total = resJson['total']
#定义变量i ,用于循环计数
i = 0
#从resjson logs值中循环取值
for val in resJson['logs']:
i += 1
#打印 val中的link的值
logger.info(val["link"])
#val中的start_time的值赋予start_time
start_time = val["start_time"]
#取时间段内日志记录
if int(start_time) >= timeStrStamp and int(start_time) < timeSEndStamp:
# link分为6个部分,协议、位置、路径、参数、查询、片段。
url = urlparse(val["link"])
# 将url 中的请求地址根据“:”进行分片
netlocs = url.netloc.split(":")
# 创建连接
conn = httplib.HTTPConnection(netlocs[0], int(netlocs[1]))
conn.request('GET', url.path + "?" + url.query)
# CDN日志,OBS的存储路径
objName = os.path.join(val["domain_name"], queryDate, val["name"])
#使用 put_content_to_obs函数
put_content_to_obs(context, objName, conn.getresponse())
else:
continue
# 判断转储的日志量与total对比;不满足则重新执行start函数
if i > total:
start(context, queryDate, timeStamp, pageSize, pageNumber + 1,timeStrStamp,timeSEndStamp)
#定义put_content_to_obs函数,上传日志到OBS
def put_content_to_obs(context, objName, content):
#引用context中的方法获取ak,sk
ak = context.getAccessKey()
sk = context.getSecretKey()
#引用context中的用户定义变量obsAddress,destBucket
obsAddress = context.getUserData('obsAddress')
destBucket = context.getUserData('destBucket')
#创建OBS上传实例
TestObs = ObsClient(access_key_id=ak, secret_access_key=sk,
is_secure=secure, server=obsAddress, signature=signature, path_style=path_style, region=region,
ssl_verify=False, port=port,
max_retry_count=5, timeout=20)
#上传日志
resp = TestObs.putContent(destBucket, objName, content=content)
#判断执行结果
if resp.status < 300:
print('requestId:', resp.requestId)
else:
print('errorCode:', resp.errorCode)
print('errorMessage:', resp.errorMessage)
5.Demo代码说明
5.1 注释说明
#CDN日志记录查询接口的起始时间,转换成时间戳;当前为近6个小时的日志记录
timer = (datetime.datetime.now()-datetime.timedelta(hours=6)).strftime("%Y-%m-%d %H")
timeStamp = int(time.mktime(time.strptime(timer,"%Y-%m-%d %H"))*1000)
#配置日志转储的起始时间点,转换成时间戳
timeStr = (datetime.datetime.now()-datetime.timedelta(hours=6)).strftime("%Y-%m-%d %H")
timeStrStamp = int(time.mktime(time.strptime(timeStr,"%Y-%m-%d %H"))*1000)
#配置日志转储的终止时间点,转换成时间戳
timeEnd = (datetime.datetime.now()-datetime.timedelta(hours=5)).strftime("%Y-%m-%d %H")
timeSEndStamp= int(time.mktime(time.strptime(timeEnd,"%Y-%m-%d %H"))*1000)
示例说明:
假设代码执行时间为 2021-12-30 20:30
日志记录查询接口的查询时间为 timestamp=2021-12-30 14:00 至 2021-12-30 20:30
由于CDN的日志生成时间延迟6小时,则 2021-12-30 14:00 至 2021-12-30 20:30 的时间内只有2021-12-30 14:00 的日志文件生成。
由于设置触发器每小时触发一次,即可每小时上传一次最新生成的日志,实现增量上传。
5.2 Context说明
方法名 | 方法说明 |
---|---|
getRequestID() | 获取请求ID。 |
getRemainingTimeInMilliSeconds () | 获取函数剩余运行时间。 |
getAccessKey() | 获取用户委托的AccessKey(有效期24小时),使用该方法需要给函数配置委托。 |
getSecretKey() | 获取用户委托的SecretKey(有效期24小时),使用该方法需要给函数配置委托。 |
getUserData(string key) | 通过key获取用户通过环境变量传入的值。 |
getFunctionName() | 获取函数名称。 |
getRunningTimeInSeconds () | 获取函数超时时间。 |
getVersion() | 获取函数的版本。 |
getMemorySize() | 分配的内存。 |
getCPUNumber() | 获取函数占用的CPU资源,单位为millicore(1 core=1000 millicores)。取值与MemorySize成比例,默认是128M内存占0.1个核(100 millicores),函数占用的CPU为基础CPU:200 millicores,再加上内存按比例占用的CPU,计算方法:内存/128 * 100 + 200。 |
getProjectID() | 获取projectID。 |
getPackage() | 获取函数组。 |
getToken() | 获取用户委托的token(有效期24小时),使用该方法需要给函数配置委托。 |
getLogger() | 获取context提供的logger方法,返回一个日志输出类,通过使用其info方法按“时间-请求ID-输出内容”的格式输出日志。如调用info方法输出日志:logg = context.getLogger() logg.info("hello") |
6.实测结果
- FunctionGraph函数界面,点击测试按钮。查看执行结果以及转储到OBS中的日志。
a.FunctionGraph 函数界面
b.OBS界面查看日志
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)