通过RES接入实时行为数据

举报
ai-res 发表于 2020/08/24 11:10:19 2020/08/24
【摘要】 在RES推荐系统的数据源接入中,支持接入实时行为数据,同时基于实时行为可以完成近线召回,画像更新等实时策略,完成信息流实时传递、召回、画像快速更新等功能

在RES推荐系统的数据源接入中,支持接入实时行为数据,同时基于实时行为可以完成近线召回,画像更新等实时策略,完成信息流实时传递、召回、画像快速更新等功能;

下面介绍如何使用RES接入实时行为数据:


  • 首先登录res系统,创建数据源任务,完成数据结构和数据导入计算

然后在近线数据源选择一个计算规格如:2CU

用户画像实时导入右侧,单击打开按钮,在弹出的对话框中进行确认,启动用户画像导入,同样操作启动物品和行为数据导入

运行成功后,如下图


  • RES通过DIS SDK上传实时数据用户实时日数据并做近线处理,如果需要,请您按照需求下载DIS SDK

1.    初始化DIS客户端,使用代码初始化DIS SDK客户端实例,代码样例如下。具体方式请参见初始化DIS客户端

// 创建DIS客户端实例
DIS dic = DISClientBuilder.standard()
        .withEndpoint("YOUR_ENDPOINT")
        .withAk("YOUR_AK")
        .withSk("YOUR_SK")
        .withProjectId("YOUR_PROJECT_ID")
        .withRegion("YOUR_REGION")
        .build();

o    “YOUR_AK”“YOUR_SK”即访问密钥,获取方式请参见获取访问密钥其中,各参数说明如下:

o    “YOUR_PROJECT_ID”为项目ID“YOUR_REGION”为区域ID,获取方式请参见获取项目名称、项目ID、区域ID

2.    获取需要上传通道的IDstreamId)。

o    单击近线数据源的详情”,获取通道ID;

3.    上传实时数据,示例代码如下,其中,“streamId”的配置值要与步骤2通道ID”的值一致。

// 配置通道ID
String streamId = "xxxx";
// 配置上传的数据
String message = "hello   world.";
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamId(streamId);
List < PutRecordsRequestEntry > putRecordsRequestEntryList = new ArrayList < PutRecordsRequestEntry > ();
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
for (int i = 0; i < 3; i++) {
        PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
        putRecordsRequestEntry.setData(buffer);
        putRecordsRequestEntry.setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000)));
        putRecordsRequestEntryList.add(putRecordsRequestEntry);
}
putRecordsRequest.setRecords(putRecordsRequestEntryList);
 
LOGGER.info("========== BEGIN PUT ============");
 
PutRecordsResult putRecordsResult = null;
try {
        putRecordsResult = dic.putRecords(putRecordsRequest);
} catch (DISClientException e) {
        LOGGER.error("Failed to   get a normal response, please check params and retry. Error message   [{}]",
               e.getMessage(),
               e);
} catch (Exception e) {
        LOGGER.error(e.getMessage(), e);
}


通过通道下发数据成功后,res对应的fink作业既可以获取到通道数据内容,进而实时更新到ESobs数据,为后续离线或近线、在线召回使用。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。