GeminiDB for Cassandra 流功能介绍

举报
geminidb_fans 发表于 2020/02/25 19:22:16 2020/02/25
【摘要】 1 使用GeminiDB for Cassandra流捕获表活动1.1 功能介绍当存储在GeminiDB for Cassandra集群中某张表的某项目发生变更时,其他的程序能够做出相应的变更,比如:一个应用程序更改了GeminiDB for Cassandra集群中的某行数据,另一个应用程序能够读取到这行数据变更并做出相应的动作。GeminiDB for Cassand...

1      使用GeminiDB for Cassandra流捕获表活动

1.1      功能介绍

当存储在GeminiDB for Cassandra集群中某张表的某项目发生变更时,其他的程序能够做出相应的变更,比如:一个应用程序更改了GeminiDB for Cassandra集群中的某行数据,另一个应用程序能够读取到这行数据变更并做出相应的动作。

GeminiDB for Cassandra支持这种类似的场景。流表捕获原表的变动,并存储24小时之后过期。客户端通过SDK可访问流表并获取数据修改前后的项目数据。

GeminiDB for Cassandra流是一种有关表中的项目修改的有序信息流,当启动某张表的流时,GeminiDB for Cassandra流表捕获原表的数据项目的更改信息。当应用程序在表中插入、更新、或者删除某条数据时,流表都会记录一条修改数据的流记录。流记录包含对表中某条数据所做的数据修改的相关信息,包含修改前后的新旧数据记录。原表中修改的每个项目,流表中的记录将按照对应的实际修改的顺序显示。

流表会实时的监控原表的记录,包括插入、删除、更新操作,以便能够在其他情况使用,但不包含DDL操作记录;流表使用GeminiDB for Cassandra的物化视图实现,遵循物化视图的相关限制,比如有必须先删除物化视图表之后才能删除原表,对应必须要先删除流表才能删除原表。

1.2      使用场景

主要用于Cassandra往大数据/ESElasticsearch)同步数据变化的场景,支撑客户对应业务开展。

1.3      功能特色

华为GeminiDB for Cassandra专有能力,原生Cassandra不支持。

2      GeminiDB for Cassandra流使用方法概述

image.png

GeminiDB for Cassandra原表与流表维护两个独立的表。在开启原表的流开关之后,访问原表并且有操作原表数据会记录到对应的流表中,要读取处理流表记录,通过访问流表,访问方式与数据库其他表的访问方式相同,见第四、五章节访问方法。

3      开启关闭流

启用流的方式:使用alter table KS.TABLE with stream_enabled=true 语句启用流,当前流支持新旧映像。

关闭流:使用alter table KS.TABLE with stream_enabled=false 可以随时禁用流。关闭流之后当前流表中的数据不会立即删除,24之后删除,原表中数据的变更不会再记录到流表中。举例:

CREATE TABLE ks.table   ( id int, name text, addr text,age int,email text,PRIMARY KEY (name, id));     // 创建表

Alter TABLE ks.table   with stream_enabled=true;     // 开启流

Desc ks.table;     // 查看流表是否创建

INSERT INTO ks.table   (name , id , addr , age , email ) VALUES ('xiaoxin',31,'beijing1',33,'xiaoxin@163.com');   // 向原表写入数据

select * from ks."table$streaming";   // 查看流表是否有相应数据产生

Alter TABLE ks.table   with stream_enabled=false;   // 关闭流表

 

4      读取和处理流记录

应用程序要读取和处理流时,应用程序需要通过SDK连接到C*流表进行相应操作。

流表中的每条流记录均代表一个原表中数据的修改,每条流记录都会有一个时间信息,标识这条流产生的时间信息。每条流记录会在24小时后自动删除。

流表结构:

CREATE TABLE ks.table$streaming (

       @shardID   text,

       @eventID   timeuuid,

       pk,

       ck,

       @newOldImage   boolean,

      @eventName text,

       co1,

       co2,  

       PRIMARY   KEY (@shardID, @eventID, pk, ck, @newOldImage)  //pk,ck,为原表的pk,ck, co1,co2是原表的普通列

);

如上,流表中包含几个特殊的字段:"@shardID"是分区键;"@eventID"是由插入时间生成的timeuuid,代表流数据产生的时间;"@newOldImage"代表新旧映像,0表示旧映像,1表示新映像;"@eventName"代表操作事件如"insert""update""delete"

迭代处理流表的数据时请使用流表的分区加上时间戳范围访问。查询分区时使用,返回分区列表:

select stream_shards from   system_schema.tables  where   keyspace_name='ks' and table_name='table';

例如:

cqlsh:ks> select stream_shards from system_schema.tables  where keyspace_name='ks' and   table_name='table1';

 

 stream_shards

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

 ['-9223372036854775808',   '-6148914691236517206', '-4611686018427387905', '-3843071682022823258',   '-1537228672809129303', '-2', '1537228672809129299', '4611686018427387901',   '5270498306774157603', '6148914691236517202', '7686143364045646492',   '7686143364045646503']

 

(1 rows)

使用分区+时间遍历流表数据,范围查询时使用"@eventID"时间查询,每次默认返回的大小根据数据量大小决定,下次迭代使用时间继续往后迭代。例如:

select * From ks."table$streaming"   where "@shardID" = '-9223372036854775808' and "@eventID"   > a64a8340-e999-11e9-a7bd-cb5f001f61df limit 50;

5      接口说明

5.1      GetShardIterator

public static List<String>   GetShardIterator(Cluster cluster, String keySpace, String tableName)

功能: 获取流表的shard信息

入参:

Cluster cluster:集群的cluster信息,连接数据库使用

String keySpace:要查询流数据的数据库名称

String tableName:要查询流数据的表名称

出参:

       返回List<String> 一组shard集合,GetRecords接口中使用

5.2      GetRecords

public static StreamInfo   GetRecords(Cluster cluster, String keySpace, TableEvent tableEvent)

功能: 获取流表的具体数据

入参:

Cluster cluster:集群的cluster信息,连接数据库使用

String keySpace:要查询流数据的数据库名称

TableEvent tableEvent:要查询流数据相关信息,TableEvent结构如下:

       String table:要查询流数据的表名称

String shardID:要查询流数据的shardID

String eventID:要查询流数据的时间戳信息

int limitRow:要查询流数据的限制条数,没有指定情况下默认是100

出参:

返回一组StreamInfo数据;具体结构如下:

       String shardID:流数据的shardID

String table:流数据的原表名

List<RowInfo> columns:一组流数据的集合,RowInfo具体结构如下:

String eventID:流数据的时间戳信息

String operateType:操作类型,例如: INSERTUPDATEDELETE

List<DataItem> Keys: 流数据对应的原表的主键信息

List<DataItem> NewImage: 新映像的信息

List<DataItem> OldImage: 旧映像的信息

5.3      GetShardIterator

public static List<String>   GetShardIterator(Session session, String keySpace, String tableName)

功能: 获取流表的shard信息

入参:

Session session:数据库集群的连接session,调用函数之后session需要调用者关闭

String keySpace:要查询流数据的数据库名称

String tableName:要查询流数据的表名称

出参:

       返回List<String> 一组shard集合,GetRecords接口中使用

5.4      GetRecords

public static StreamInfo GetRecords(Session   session, String keySpace, TableEvent tableEvent)

功能: 获取流表的具体数据

入参:

Session session:数据库集群的连接session,调用函数之后session需要调用者关闭;

String keySpace:要查询流数据的数据库名称;

TableEvent tableEvent:要查询流数据相关信息,TableEvent结构如下:

       String table:要查询流数据的表名称;

String shardID:要查询流数据的shardID

String eventID:要查询流数据的时间戳信息;

int limitRow:要查询流数据的限制条数,没有指定情况下默认是100

List<ColumnMetadata> primaryKey 要查询流数据的表的主键名称类型信息

出参:

返回一组StreamInfo数据;具体结构如下:

       String shardID:流数据的shardID

String table:流数据的原表名

List<RowInfo> columns:一组流数据的集合,RowInfo具体结构如下:

String eventID:流数据的时间戳信息

String operateType:操作类型,例如: INSERTUPDATEDELETE

List<DataItem> Keys: 流数据对应的原表的主键信息

List<DataItem> NewImage: 新映像的信息

List<DataItem> OldImage: 旧映像的信息

5.5      GetRecords返回结果范例

{

       "ShardID":   "-4611686018427387905",

       "Table":   "tb1",

       "Records":   [{

              "EventID":   "52236080-efb5-11e9-9c62-49626763b3dc",

              "OperateType":   "INSERT",

              "Keys":   [{

                     "columnName":   "name",

                     "value":   "zhoujielun",

                     "type":   "varchar"

              },   {

                     "columnName":   "id",

                     "value":   31,

                     "type":   "int"

              }],

              "NewImage":   [{

                     "columnName":   "name",

                     "value":   "zhoujielun",

                     "type":   "varchar"

              },   {

                     "columnName":   "id",

                     "value":   31,

                     "type":   "int"

              },   {

                     "columnName":   "addr",

                     "value":   "宇宙中心",

                     "type":   "varchar"

              },   {

                     "columnName":   "age",

                     "value":   33,

                     "type":   "int"

              },   {

                     "columnName":   "email",

                     "value":   "zhoujielun.com",

                     "type":   "varchar"

              }],

              "OldImage":   []

       },   {

              "EventID":   "52255c50-efb5-11e9-9c62-49626763b3dc",

              "OperateType":   "UPDATE",

              "Keys":   [{

                     "columnName":   "name",

                     "value":   "zhoujielun",

                     "type":   "varchar"

              },   {

                     "columnName":   "id",

                     "value":   32,

                     "type":   "int"

              }],

              "NewImage":   [{

                     "columnName":   "name",

                     "value":   "zhoujielun",

                     "type":   "varchar"

              },   {

                     "columnName":   "id",

                     "value":   32,

                     "type":   "int"

              },   {

                     "columnName":   "addr",

                     "value":   "宇宙中心",

                     "type":   "varchar"

              },   {

                     "columnName":   "age",

                     "value":   33,

                     "type":   "int"

              },   {

                     "columnName":   "email",

                     "value":   "zhoujielun.com",

                     "type":   "varchar"

              }],

              "OldImage":   [{

                     "columnName":   "name",

                     "value":   "zhoujielun",

                     "type":   "varchar"

              },   {

                     "columnName":   "id",

                     "value":   32,

                     "type":   "int"

              },   {

                     "columnName":   "addr",

                     "value":   "宇宙中心",

                     "type":   "varchar"

              },   {

                     "columnName":   "age",

                     "value":   33,

                     "type":   "int"

              },   {

                     "columnName":   "email",

                     "value":   "zhoujielun.com",

                     "type":   "varchar"

              }]

       },   {

              "EventID":   "52261fa0-efb5-11e9-9c62-49626763b3dc",

              "OperateType":   "UPDATE",

              "Keys":   [{

                     "columnName":   "name",

                     "value":   "zhoujielun",

                     "type":   "varchar"

              },   {

                     "columnName":   "id",

                     "value":   33,

                     "type":   "int"

              }],

              "NewImage":   [{

                     "columnName":   "name",

                     "value":   "zhoujielun",

                     "type":   "varchar"

              },   {

                     "columnName":   "id",

                     "value":   33,

                     "type":   "int"

              },   {

                     "columnName":   "addr",

                     "value":   "宇宙中心",

                     "type":   "varchar"

              },   {

                     "columnName":   "age",

                     "value":   33,

                     "type":   "int"

              },   {

                     "columnName":   "email",

                     "value":   "zhoujielun.com",

                     "type":   "varchar"

              }],

              "OldImage":   [{

                     "columnName":   "name",

                     "value":   "zhoujielun",

                     "type":   "varchar"

              },   {

                     "columnName":   "id",

                     "value":   33,

                     "type":   "int"

              },   {

                     "columnName":   "addr",

                     "value":   "宇宙中心",

                     "type":   "varchar"

              },   {

                     "columnName":   "age",

                     "value":   33,

                     "type":   "int"

              },   {

                     "columnName":   "email",

                     "value":   "zhoujielun.com",

                     "type":   "varchar"

              }]

       }]

}

 

5.6      接口使用demo1

package com.huawei.hwcloud.stream;
 
 
 
import com.datastax.driver.core.Cluster;
 
import com.datastax.driver.core.ColumnMetadata;
 
import com.google.gson.Gson;
 
import com.google.gson.GsonBuilder;
 
import com.huawei.hwcloud.stream.req.RowInfo;
 
import com.huawei.hwcloud.stream.req.StreamInfo;
 
import com.huawei.hwcloud.stream.req.TableEvent;
 
 
import java.util.List;
 
 
 
public class Main {
 
   
public static void main(String[] args) {
 
        Cluster cluster = Cluster.builder().addContactPoint(
"xxx.95.xxx.201").withPort(9042).build();
 
//        Cluster cluster = Cluster.builder().addContactPoint(endpoint).withPort(port).withCredentials(username, password).build();
 
       
List<ColumnMetadata> pm = cluster.getMetadata().getKeyspace("test").getTable("tb1").getPrimaryKey();
        System.
out.println(pm);
        List streamShards =
null;
       
try {
            streamShards = StreamFetcher.GetShardIterator(cluster,
"test", "tb1");
        }
       
catch (Exception e) {
            e.printStackTrace();
        }
        System.
out.println(streamShards);
 
        TableEvent tableEvent =
new TableEvent();
        tableEvent.setEventID(
"43e0eeb0-ee80-11e9-9c62-49626763b3dc");
        tableEvent.setShardID(
"-4611686018427387905");
        tableEvent.setTable(
"tb1");
        tableEvent.setLimitRow(
6);
 
        StreamInfo streamInfo =
null;
        
try {
            streamInfo = StreamFetcher.GetRecords(cluster,
"test", tableEvent);
        }
       
catch (Exception e) {
            e.printStackTrace();
        }
 
        Gson gson =
new GsonBuilder().create();
        String line = gson.toJson(streamInfo);
        System.
out.println(line);
 
        System.
out.println(streamInfo.getColumns().size());
       
for (RowInfo rowInfo: streamInfo.getColumns()) {
            System.
out.println(rowInfo.toString());
        }
 
        System.exit(
0);
    }
}
 

 

 

5.7      接口使用demo2

package com.huawei.hwcloud.stream;
 
 
 
import com.datastax.driver.core.Cluster;
 
import com.datastax.driver.core.ColumnMetadata;
 
import com.datastax.driver.core.Session;
 
import com.google.gson.Gson;
 
import com.google.gson.GsonBuilder;
 
import com.huawei.hwcloud.stream.req.RowInfo;
 
import com.huawei.hwcloud.stream.req.StreamInfo;
 
import com.huawei.hwcloud.stream.req.TableEvent;
 
import com.huawei.hwcloud.stream.utils.WrapperCassandraSession;
 
 
import java.util.List;
 
 
public class Main2
{
 
   
public static void main(String[] args) {
 
        Cluster cluster = Cluster.builder().addContactPoint(
"XXX.95.XXX.201").withPort(9042).build();
 
//        Cluster cluster = Cluster.builder().addContactPoint(endpoint).withPort(port).withCredentials(username, password).build();
 
       
List<ColumnMetadata> pk = cluster.getMetadata().getKeyspace("test").getTable("tb1").getPrimaryKey();
        System.
out.println(pk);
 
        Session session = cluster.connect();
        List<String> streamShards = StreamFetcher.GetShardIterator(session,
"test", "tb1");
        System.
out.println(streamShards);
 
 
        TableEvent tableEvent =
new TableEvent();
        tableEvent.setEventID(
"43e0eeb0-ee80-11e9-9c62-49626763b3dc");
        tableEvent.setShardID(
"-4611686018427387905");
        tableEvent.setTable(
"tb1");
        tableEvent.setLimitRow(
6);
        tableEvent.setPrimaryKey(pk);
 
        StreamInfo streamInfo = StreamFetcher.GetRecords(session,
"test", tableEvent);
 
        Gson gson =
new GsonBuilder().create();
        String line = gson.toJson(streamInfo);
        System.
out.println(line);
 
        System.
out.println(streamInfo.getColumns().size());
       
for (RowInfo rowInfo: streamInfo.getColumns()) {
            System.
out.println(rowInfo.toString());
        }
 
        session.close();
 
        System.exit(
0);
    }
}
 

 

 

6      功能约束

1)流表中的数据保留24小时。

2)流表中的数据会占用数据库的磁盘空间。

3)通过CQL语句不能创建带有"$streaming"后缀的流表。

4)流表可以通过drop MATERIALIZED VIEW ks."table$streaming";进行删除,流表使用物化视图实现,遵从物化视图的限制要求。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。