扩展Druid.io支持GaussDB T 数据库[转]
转自 http://3ms.huawei.com/km/blogs/details/6220889
为了满足某产品的诉求,需要扩展druid.io使之支持高斯数据库。之前druid.io使用mysql关系数据库来存储元数据,现在主要是将元数据存储方式改为使用高斯数据库存储。
在Druid.io官网上找到Druid.io模块扩展的官方文档:
https://druid.apache.org/docs/latest/development/modules.html
通过分析代码了解到Druid.io为了使得代码具有很强的扩展性使用了Guice框架来进行开发,下面我们来看看如何基于Guice框架扩展Druid.io支持高斯数据库。
1、创建一个高斯元数据存储的模块
在源代码druid\extensions-core目录下创建一个名为gaussdb-metadata-storage的maven模块。
创建类ZenithMetadataStorageModule继承SQLMetadataStorageDruidModule这个元数据存储基类,并实现DruidModule这个模块接口:
package org.gaussdb.metadata.storage;
import java.util.List;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import io.druid.guice.LazySingleton;
import io.druid.guice.PolyBind;
import io.druid.guice.SQLMetadataStorageDruidModule;
import io.druid.initialization.DruidModule;
import io.druid.metadata.MetadataStorageActionHandlerFactory;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.MetadataStorageProvider;
import io.druid.metadata.NoopMetadataStorageProvider;
import io.druid.metadata.SQLMetadataConnector;
public class ZenithMetadataStorageModule extends SQLMetadataStorageDruidModule implements DruidModule
{
    public static final String TYPE = "gaussdb";
    
    public ZenithMetadataStorageModule()
    {
        super(TYPE);
        // TODO Auto-generated constructor stub
    }
    @Override
    public List<? extends Module> getJacksonModules()
    {
        // TODO Auto-generated method stub
        return ImmutableList.of();
    }
    
    @Override
    public void configure(Binder binder)
    {
      super.configure(binder);
      PolyBind
          .optionBinder(binder, Key.get(MetadataStorageProvider.class))
          .addBinding(TYPE)
          .to(NoopMetadataStorageProvider.class)
          .in(LazySingleton.class);
      PolyBind
          .optionBinder(binder, Key.get(MetadataStorageConnector.class))
          .addBinding(TYPE)
          .to(ZenithConnector.class)
          .in(LazySingleton.class);
      PolyBind
          .optionBinder(binder, Key.get(SQLMetadataConnector.class))
          .addBinding(TYPE)
          .to(ZenithConnector.class)
          .in(LazySingleton.class);
      PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandlerFactory.class))
              .addBinding(TYPE)
              .to(ZenithMetadataStorageActionHandlerFactory.class)
              .in(LazySingleton.class);
    }
}同时需要引入对高斯驱动的依赖
<dependency> <groupId>com.huawei.gauss</groupId> <artifactId>com.huawei.gauss.jdbc.ZenithDriver</artifactId> <version>V300R001C00SPC100B210</version> </dependency>
2、注册高斯元数据存储模块
2.1、修改配置
Druid.io所有核心扩展可开箱即用。通过将其名称添加到common.runtime.properties配置文件的druid.extensions.loadList属性来加载绑定扩展。
druid.extensions.loadList=["gaussdb-metadata-storage"]
并增加高斯数据库的参数配置如下:
# For GaussDB: druid.metadata.storage.type=gaussdb druid.metadata.storage.connector.connectURI=jdbc:zenith:@10.243.49.xx:1611 druid.metadata.storage.connector.user=druid druid.metadata.storage.connector.password=xxx
注意此处的druid.metadata.storage.type与ZenithMetadataStorageModule代码中的TYPE是对应的。
2.2、需要在jar的META-INF / services目录中打包一个额外的文件注册模块。
通过在src / main / resources目录中创建文件,它应该是一个文本文件,类似:
META-INF/services/org.apache.druid.initialization.DruidModule,
其中包含实现DruidModule的包限定类的新行分隔列表,类似:
org.gaussdb.metadata.storage.ZenithMetadataStorageModule
此时,当jar被添加到类路径或作为扩展时,Druid.io会注意到该文件并将实例化Module的实例。 模块应该有一个默认的构造函数,但是如果你需要访问运行时配置属性,它可以有一个带@Inject的方法来获取从Guice注入的一个Properties对象。
3、验证元数据存储的实现
3.1、打包
distribution工程是专门用来打包的,为了保证druid.io打包的时候将该jar包打入,需要修改distribution工程的pom文件。
在exec-maven-plugin这个插件中增加参数:
<argument>-c</argument> <argument>io.druid.extensions:gaussdb-metadata-storage</argument>
最终在druid-0.12.0-bin.tar.gz\druid-0.12.0\extensions\gaussdb-metadata-storage中可以看到jar包及高斯驱动包。
3.2、适配高斯语法
目前为止我们已经可以部署druid了,但是运行的时候会报很多错误和异常,这些错误一般是由于代码中存在与高斯语法不一致的地方导致的,druid.io使用jdbi进行关系数据库的操作,下面一一列出对这块操作的修改。
1、ZenithConnector类中:
package org.gaussdb.metadata.storage;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import org.apache.commons.dbcp2.BasicDataSource;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.SQLMetadataConnector;
public class ZenithConnector extends SQLMetadataConnector
{
    private static final Logger log = new Logger(ZenithConnector.class);
    
    private static final String PAYLOAD_TYPE = "BLOB";
    
    private static final String SERIAL_TYPE = "Integer AUTO_INCREMENT";
    
    private static final String QUOTE_STRING = "";
    
    public static final int DEFAULT_STREAMING_RESULT_SIZE = 100;
    
    private final DBI dbi;
    
    @Inject
    public ZenithConnector(Supplier<MetadataStorageConnectorConfig> config,
            Supplier<MetadataStorageTablesConfig> tablesConfigSupplier)
    {
        super(config, tablesConfigSupplier);
        
        final BasicDataSource datasource = getDatasource();
        
        datasource.setDriverClassLoader(getClass().getClassLoader());
        datasource.setDriverClassName("com.huawei.gauss.jdbc.ZenithDriver");
        
        this.dbi = new DBI(datasource);
        
        log.info("Configured Zenith as metadata storage");
    }
    
    @Override
    protected String getPayloadType()
    {
        return PAYLOAD_TYPE;
    }
    
    @Override
    protected String getSerialType()
    {
        return SERIAL_TYPE;
    }
    
    @Override
    public String getQuoteString()
    {
        return QUOTE_STRING;
    }
    
    @Override
    protected int getStreamingFetchSize()
    {
        return DEFAULT_STREAMING_RESULT_SIZE;
    }
    
    @Override
    public boolean tableExists(Handle handle, String tableName)
    {        
        try
        {
            String owner = handle.getConnection().getMetaData().getUserName();
            List<Map<String, Object>> list = handle
                    .createQuery(
                            "select * from dba_tables where owner= :owner and table_name= :tableName")
                    .bind("owner", owner.toUpperCase())
                    .bind("tableName", tableName.toUpperCase()).list();
            log.info("ZenithConnector----" + list.size());
            return !list.isEmpty();
        }
        catch (SQLException e)
        {
            log.error("table is not Exists");            
        }
        return false;
    }
    
    @Override
    public Void insertOrUpdate(final String tableName, final String keyColumn,
        final String valueColumn, final String key, final byte[] value)
        throws Exception
    {
        return getDBI().withHandle(new HandleCallback<Void>()
        {
            @Override
            public Void withHandle(Handle handle) throws Exception
            {
                handle.createStatement(StringUtils.format(
                        "INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value",
                        tableName,
                        keyColumn,
                        valueColumn))
                        .bind("key", key)
                        .bind("value", value)
                        .execute();
                return null;
            }
        });
    }
    
    @Override
    public DBI getDBI()
    {
        return dbi;
    }
    
}上面代码中有几个关键地方解释下:
PAYLOAD_TYPE = "BLOB" ;// 高斯下CLOB方式不行,必须用BLOB SERIAL_TYPE = "Integer AUTO_INCREMENT"; // 主键自增方式 QUOTE_STRING = " "; // 高斯对空字符串的处理必须转换为空格 DEFAULT_STREAMING_RESULT_SIZE = 100; // 高斯中必须不能为0
另外高斯中对查询数据表是否存在时需要使用用户名参数,可以从连接对象中获取:
String owner = handle.getConnection().getMetaData().getUserName();
2、解决高斯Do not support addBatch问题
druid-server工程中SQLMetadataConnector类:
public void createTable(final String tableName, final Iterable<String> sql)
  {
    try {
      retryWithHandle(
          new HandleCallback<Void>()
          {
            @Override
            public Void withHandle(Handle handle)
            {
            	if (!tableExists(handle, tableName)) {
                    log.info("Creating table[%s]", tableName);
//                  final Batch batch = handle.createBatch();
                  for (String s : sql) {
//                      log.info(s);
//                    batch.add(s);
                      handle.execute(s);
                  }
//                  batch.execute();
                  } else {
                    log.info("Table[%s] already exists", tableName);
                  }
                  return null;
            }
          }
      );
    }3、解决字段名与保留关键字冲突问题:
druid-server工程中SQLMetadataConnector类:
将sql语句中所有start改为starttime,end改为endtime
  public void createPendingSegmentsTable(final String tableName)
  {
    createTable(
        tableName,
        ImmutableList.of(
            StringUtils.format(
                "CREATE TABLE %1$s (\n"
                + "  id VARCHAR(255) NOT NULL,\n"
                + "  dataSource VARCHAR(255) NOT NULL,\n"
                + "  created_date VARCHAR(255) NOT NULL,\n"
                + "  starttime VARCHAR(255) NOT NULL,\n"
                + "  endtime VARCHAR(255) NOT NULL,\n"
                + "  sequence_name VARCHAR(255) NOT NULL,\n"
                + "  sequence_prev_id VARCHAR(255),\n"
                + "  sequence_name_prev_id_sha1 VARCHAR(255) NOT NULL,\n"
                + "  payload %2$s NOT NULL,\n"
                + "  PRIMARY KEY (id),\n"
                + "  UNIQUE (sequence_name_prev_id_sha1)\n"
                + ")",
//                tableName, getPayloadType(), getQuoteString()
                tableName, getPayloadType()
            )
        )
    );
  }druid-server工程中IndexerSQLMetadataStorageCoordinator类:
start和end字符串与高斯关键字冲突,将sql语句中所有start改为starttime,end改为endtime
4、sequence_prev_id改为可以为空。
final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;
此处的previousSegmentIdNotNull会作为sequence_prev_id插入,但高斯不支持null或“”字符串的插入,所以sequence_prev_id字段要设置为可以为空。
5、字段名大小写问题:
druid-server工程中SQLMetadataSegmentManager类396行:
stringObjectMap中的key转为大写再取值,因为高斯数据库查询返回的字段名是大写的。
- 点赞
 - 收藏
 - 关注作者
 
            
           
评论(0)