扩展Druid.io支持GaussDB T 数据库[转]

举报
GaussDB-虎哥 发表于 2020/03/05 11:49:49 2020/03/05
【摘要】 需要扩展druid.io使之支持高斯数据库。之前druid.io使用mysql关系数据库来存储元数据,现在主要是将元数据存储方式改为使用高斯数据库存储。在Druid.io官网上找到Druid.io模块扩展的

转自 http://3ms.huawei.com/km/blogs/details/6220889


为了满足某产品的诉求,需要扩展druid.io使之支持高斯数据库。之前druid.io使用mysql关系数据库来存储元数据,现在主要是将元数据存储方式改为使用高斯数据库存储。

Druid.io官网上找到Druid.io模块扩展的官方文档:

https://druid.apache.org/docs/latest/development/modules.html
通过分析代码了解到Druid.io为了使得代码具有很强的扩展性使用了Guice框架来进行开发,下面我们来看看如何基于Guice框架扩展Druid.io支持高斯数据库。
1、创建一个高斯元数据存储的模块
在源代码druid\extensions-core目录下创建一个名为gaussdb-metadata-storage的maven模块。

创建类ZenithMetadataStorageModule继承SQLMetadataStorageDruidModule这个元数据存储基类,并实现DruidModule这个模块接口:

package org.gaussdb.metadata.storage;

import java.util.List;

import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;

import io.druid.guice.LazySingleton;
import io.druid.guice.PolyBind;
import io.druid.guice.SQLMetadataStorageDruidModule;
import io.druid.initialization.DruidModule;
import io.druid.metadata.MetadataStorageActionHandlerFactory;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.MetadataStorageProvider;
import io.druid.metadata.NoopMetadataStorageProvider;
import io.druid.metadata.SQLMetadataConnector;

public class ZenithMetadataStorageModule extends SQLMetadataStorageDruidModule implements DruidModule
{
    public static final String TYPE = "gaussdb";
    
    public ZenithMetadataStorageModule()
    {
        super(TYPE);
        // TODO Auto-generated constructor stub
    }

    @Override
    public List<? extends Module> getJacksonModules()
    {
        // TODO Auto-generated method stub
        return ImmutableList.of();
    }
    
    @Override
    public void configure(Binder binder)
    {
      super.configure(binder);

      PolyBind
          .optionBinder(binder, Key.get(MetadataStorageProvider.class))
          .addBinding(TYPE)
          .to(NoopMetadataStorageProvider.class)
          .in(LazySingleton.class);

      PolyBind
          .optionBinder(binder, Key.get(MetadataStorageConnector.class))
          .addBinding(TYPE)
          .to(ZenithConnector.class)
          .in(LazySingleton.class);

      PolyBind
          .optionBinder(binder, Key.get(SQLMetadataConnector.class))
          .addBinding(TYPE)
          .to(ZenithConnector.class)
          .in(LazySingleton.class);

      PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandlerFactory.class))
              .addBinding(TYPE)
              .to(ZenithMetadataStorageActionHandlerFactory.class)
              .in(LazySingleton.class);
    }
}

同时需要引入对高斯驱动的依赖


<dependency>
            <groupId>com.huawei.gauss</groupId>
            <artifactId>com.huawei.gauss.jdbc.ZenithDriver</artifactId>
            <version>V300R001C00SPC100B210</version>
</dependency>

2、注册高斯元数据存储模块
2.1、修改配置
Druid.io所有核心扩展可开箱即用。通过将其名称添加到common.runtime.properties配置文件的druid.extensions.loadList属性来加载绑定扩展。

druid.extensions.loadList=["gaussdb-metadata-storage"]

并增加高斯数据库的参数配置如下:


# For GaussDB:
druid.metadata.storage.type=gaussdb
druid.metadata.storage.connector.connectURI=jdbc:zenith:@10.243.49.xx:1611
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=xxx

注意此处的druid.metadata.storage.type与ZenithMetadataStorageModule代码中的TYPE是对应的。
2.2、需要在jar的META-INF / services目录中打包一个额外的文件注册模块。
通过在src / main / resources目录中创建文件,它应该是一个文本文件,类似:

META-INF/services/org.apache.druid.initialization.DruidModule,

其中包含实现DruidModule的包限定类的新行分隔列表,类似:

org.gaussdb.metadata.storage.ZenithMetadataStorageModule
此时,当jar被添加到类路径或作为扩展时,Druid.io会注意到该文件并将实例化Module的实例。 模块应该有一个默认的构造函数,但是如果你需要访问运行时配置属性,它可以有一个带@Inject的方法来获取从Guice注入的一个Properties对象。

3、验证元数据存储的实现
3.1、打包
distribution工程是专门用来打包的,为了保证druid.io打包的时候将该jar包打入,需要修改distribution工程的pom文件。

在exec-maven-plugin这个插件中增加参数:

<argument>-c</argument>
<argument>io.druid.extensions:gaussdb-metadata-storage</argument>

最终在druid-0.12.0-bin.tar.gz\druid-0.12.0\extensions\gaussdb-metadata-storage中可以看到jar包及高斯驱动包。

3.2、适配高斯语法

目前为止我们已经可以部署druid了,但是运行的时候会报很多错误和异常,这些错误一般是由于代码中存在与高斯语法不一致的地方导致的,druid.io使用jdbi进行关系数据库的操作下面一一列出对这块操作的修改。

1、ZenithConnector类中:

package org.gaussdb.metadata.storage;

import java.sql.SQLException;
import java.util.List;
import java.util.Map;

import org.apache.commons.dbcp2.BasicDataSource;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;

import com.google.common.base.Supplier;
import com.google.inject.Inject;

import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.SQLMetadataConnector;

public class ZenithConnector extends SQLMetadataConnector
{
    private static final Logger log = new Logger(ZenithConnector.class);
    
    private static final String PAYLOAD_TYPE = "BLOB";
    
    private static final String SERIAL_TYPE = "Integer AUTO_INCREMENT";
    
    private static final String QUOTE_STRING = "";
    
    public static final int DEFAULT_STREAMING_RESULT_SIZE = 100;
    
    private final DBI dbi;
    
    @Inject
    public ZenithConnector(Supplier<MetadataStorageConnectorConfig> config,
            Supplier<MetadataStorageTablesConfig> tablesConfigSupplier)
    {
        super(config, tablesConfigSupplier);
        
        final BasicDataSource datasource = getDatasource();
        
        datasource.setDriverClassLoader(getClass().getClassLoader());
        datasource.setDriverClassName("com.huawei.gauss.jdbc.ZenithDriver");
        
        this.dbi = new DBI(datasource);
        
        log.info("Configured Zenith as metadata storage");
    }
    
    @Override
    protected String getPayloadType()
    {
        return PAYLOAD_TYPE;
    }
    
    @Override
    protected String getSerialType()
    {
        return SERIAL_TYPE;
    }
    
    @Override
    public String getQuoteString()
    {
        return QUOTE_STRING;
    }
    
    @Override
    protected int getStreamingFetchSize()
    {
        return DEFAULT_STREAMING_RESULT_SIZE;
    }
    
    @Override
    public boolean tableExists(Handle handle, String tableName)
    {        
        try
        {
            String owner = handle.getConnection().getMetaData().getUserName();
            List<Map<String, Object>> list = handle
                    .createQuery(
                            "select * from dba_tables where owner= :owner and table_name= :tableName")
                    .bind("owner", owner.toUpperCase())
                    .bind("tableName", tableName.toUpperCase()).list();
            log.info("ZenithConnector----" + list.size());
            return !list.isEmpty();
        }
        catch (SQLException e)
        {
            log.error("table is not Exists");            
        }
        return false;
    }
    
    @Override
    public Void insertOrUpdate(final String tableName, final String keyColumn,
        final String valueColumn, final String key, final byte[] value)
        throws Exception
    {
        return getDBI().withHandle(new HandleCallback<Void>()
        {
            @Override
            public Void withHandle(Handle handle) throws Exception
            {
                handle.createStatement(StringUtils.format(
                        "INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value",
                        tableName,
                        keyColumn,
                        valueColumn))
                        .bind("key", key)
                        .bind("value", value)
                        .execute();
                return null;
            }
        });
    }
    
    @Override
    public DBI getDBI()
    {
        return dbi;
    }
    
}

上面代码中有几个关键地方解释下:


PAYLOAD_TYPE = "BLOB" ;//  高斯下CLOB方式不行,必须用BLOB
SERIAL_TYPE = "Integer AUTO_INCREMENT";  // 主键自增方式
QUOTE_STRING = " ";  // 高斯对空字符串的处理必须转换为空格
DEFAULT_STREAMING_RESULT_SIZE = 100;  // 高斯中必须不能为0

另外高斯中对查询数据表是否存在时需要使用用户名参数,可以从连接对象中获取:

-

Java 代码


String owner = handle.getConnection().getMetaData().getUserName();

2、解决高斯Do not support addBatch问题

druid-server工程中SQLMetadataConnector类:

public void createTable(final String tableName, final Iterable<String> sql)
  {
    try {
      retryWithHandle(
          new HandleCallback<Void>()
          {
            @Override
            public Void withHandle(Handle handle)
            {
            	if (!tableExists(handle, tableName)) {
                    log.info("Creating table[%s]", tableName);
//                  final Batch batch = handle.createBatch();
                  for (String s : sql) {
//                      log.info(s);
//                    batch.add(s);
                      handle.execute(s);
                  }
//                  batch.execute();
                  } else {
                    log.info("Table[%s] already exists", tableName);
                  }
                  return null;
            }
          }
      );
    }

3、解决字段名与保留关键字冲突问题:

druid-server工程中SQLMetadataConnector类:

将sql语句中所有start改为starttime,end改为endtime

  public void createPendingSegmentsTable(final String tableName)
  {
    createTable(
        tableName,
        ImmutableList.of(
            StringUtils.format(
                "CREATE TABLE %1$s (\n"
                + "  id VARCHAR(255) NOT NULL,\n"
                + "  dataSource VARCHAR(255) NOT NULL,\n"
                + "  created_date VARCHAR(255) NOT NULL,\n"
                + "  starttime VARCHAR(255) NOT NULL,\n"
                + "  endtime VARCHAR(255) NOT NULL,\n"
                + "  sequence_name VARCHAR(255) NOT NULL,\n"
                + "  sequence_prev_id VARCHAR(255),\n"
                + "  sequence_name_prev_id_sha1 VARCHAR(255) NOT NULL,\n"
                + "  payload %2$s NOT NULL,\n"
                + "  PRIMARY KEY (id),\n"
                + "  UNIQUE (sequence_name_prev_id_sha1)\n"
                + ")",
//                tableName, getPayloadType(), getQuoteString()
                tableName, getPayloadType()
            )
        )
    );
  }

druid-server工程中IndexerSQLMetadataStorageCoordinator类:

start和end字符串与高斯关键字冲突,将sql语句中所有start改为starttime,end改为endtime

4、sequence_prev_id改为可以为空。

-

Java 代码


final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;


此处的previousSegmentIdNotNull会作为sequence_prev_id插入,但高斯不支持null或“”字符串的插入,所以sequence_prev_id字段要设置为可以为空。


5、字段名大小写问题:

druid-server工程中SQLMetadataSegmentManager类396行:
stringObjectMap中的key转为大写再取值,因为高斯数据库查询返回的字段名是大写的



【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。