扩展Druid.io支持GaussDB T 数据库[转]
转自 http://3ms.huawei.com/km/blogs/details/6220889
为了满足某产品的诉求,需要扩展druid.io使之支持高斯数据库。之前druid.io使用mysql关系数据库来存储元数据,现在主要是将元数据存储方式改为使用高斯数据库存储。
在Druid.io官网上找到Druid.io模块扩展的官方文档:
https://druid.apache.org/docs/latest/development/modules.html
通过分析代码了解到Druid.io为了使得代码具有很强的扩展性使用了Guice框架来进行开发,下面我们来看看如何基于Guice框架扩展Druid.io支持高斯数据库。
1、创建一个高斯元数据存储的模块
在源代码druid\extensions-core目录下创建一个名为gaussdb-metadata-storage的maven模块。
创建类ZenithMetadataStorageModule继承SQLMetadataStorageDruidModule这个元数据存储基类,并实现DruidModule这个模块接口:
package org.gaussdb.metadata.storage; import java.util.List; import com.fasterxml.jackson.databind.Module; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Key; import io.druid.guice.LazySingleton; import io.druid.guice.PolyBind; import io.druid.guice.SQLMetadataStorageDruidModule; import io.druid.initialization.DruidModule; import io.druid.metadata.MetadataStorageActionHandlerFactory; import io.druid.metadata.MetadataStorageConnector; import io.druid.metadata.MetadataStorageProvider; import io.druid.metadata.NoopMetadataStorageProvider; import io.druid.metadata.SQLMetadataConnector; public class ZenithMetadataStorageModule extends SQLMetadataStorageDruidModule implements DruidModule { public static final String TYPE = "gaussdb"; public ZenithMetadataStorageModule() { super(TYPE); // TODO Auto-generated constructor stub } @Override public List<? extends Module> getJacksonModules() { // TODO Auto-generated method stub return ImmutableList.of(); } @Override public void configure(Binder binder) { super.configure(binder); PolyBind .optionBinder(binder, Key.get(MetadataStorageProvider.class)) .addBinding(TYPE) .to(NoopMetadataStorageProvider.class) .in(LazySingleton.class); PolyBind .optionBinder(binder, Key.get(MetadataStorageConnector.class)) .addBinding(TYPE) .to(ZenithConnector.class) .in(LazySingleton.class); PolyBind .optionBinder(binder, Key.get(SQLMetadataConnector.class)) .addBinding(TYPE) .to(ZenithConnector.class) .in(LazySingleton.class); PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandlerFactory.class)) .addBinding(TYPE) .to(ZenithMetadataStorageActionHandlerFactory.class) .in(LazySingleton.class); } }
同时需要引入对高斯驱动的依赖
<dependency> <groupId>com.huawei.gauss</groupId> <artifactId>com.huawei.gauss.jdbc.ZenithDriver</artifactId> <version>V300R001C00SPC100B210</version> </dependency>
2、注册高斯元数据存储模块
2.1、修改配置
Druid.io所有核心扩展可开箱即用。通过将其名称添加到common.runtime.properties配置文件的druid.extensions.loadList属性来加载绑定扩展。
druid.extensions.loadList=["gaussdb-metadata-storage"]
并增加高斯数据库的参数配置如下:
# For GaussDB: druid.metadata.storage.type=gaussdb druid.metadata.storage.connector.connectURI=jdbc:zenith:@10.243.49.xx:1611 druid.metadata.storage.connector.user=druid druid.metadata.storage.connector.password=xxx
注意此处的druid.metadata.storage.type与ZenithMetadataStorageModule代码中的TYPE是对应的。
2.2、需要在jar的META-INF / services目录中打包一个额外的文件注册模块。
通过在src / main / resources目录中创建文件,它应该是一个文本文件,类似:
META-INF/services/org.apache.druid.initialization.DruidModule,
其中包含实现DruidModule的包限定类的新行分隔列表,类似:
org.gaussdb.metadata.storage.ZenithMetadataStorageModule
此时,当jar被添加到类路径或作为扩展时,Druid.io会注意到该文件并将实例化Module的实例。 模块应该有一个默认的构造函数,但是如果你需要访问运行时配置属性,它可以有一个带@Inject的方法来获取从Guice注入的一个Properties对象。
3、验证元数据存储的实现
3.1、打包
distribution工程是专门用来打包的,为了保证druid.io打包的时候将该jar包打入,需要修改distribution工程的pom文件。
在exec-maven-plugin这个插件中增加参数:
<argument>-c</argument> <argument>io.druid.extensions:gaussdb-metadata-storage</argument>
最终在druid-0.12.0-bin.tar.gz\druid-0.12.0\extensions\gaussdb-metadata-storage中可以看到jar包及高斯驱动包。
3.2、适配高斯语法
目前为止我们已经可以部署druid了,但是运行的时候会报很多错误和异常,这些错误一般是由于代码中存在与高斯语法不一致的地方导致的,druid.io使用jdbi进行关系数据库的操作,下面一一列出对这块操作的修改。
1、ZenithConnector类中:
package org.gaussdb.metadata.storage; import java.sql.SQLException; import java.util.List; import java.util.Map; import org.apache.commons.dbcp2.BasicDataSource; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.tweak.HandleCallback; import com.google.common.base.Supplier; import com.google.inject.Inject; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; import io.druid.metadata.MetadataStorageConnectorConfig; import io.druid.metadata.MetadataStorageTablesConfig; import io.druid.metadata.SQLMetadataConnector; public class ZenithConnector extends SQLMetadataConnector { private static final Logger log = new Logger(ZenithConnector.class); private static final String PAYLOAD_TYPE = "BLOB"; private static final String SERIAL_TYPE = "Integer AUTO_INCREMENT"; private static final String QUOTE_STRING = ""; public static final int DEFAULT_STREAMING_RESULT_SIZE = 100; private final DBI dbi; @Inject public ZenithConnector(Supplier<MetadataStorageConnectorConfig> config, Supplier<MetadataStorageTablesConfig> tablesConfigSupplier) { super(config, tablesConfigSupplier); final BasicDataSource datasource = getDatasource(); datasource.setDriverClassLoader(getClass().getClassLoader()); datasource.setDriverClassName("com.huawei.gauss.jdbc.ZenithDriver"); this.dbi = new DBI(datasource); log.info("Configured Zenith as metadata storage"); } @Override protected String getPayloadType() { return PAYLOAD_TYPE; } @Override protected String getSerialType() { return SERIAL_TYPE; } @Override public String getQuoteString() { return QUOTE_STRING; } @Override protected int getStreamingFetchSize() { return DEFAULT_STREAMING_RESULT_SIZE; } @Override public boolean tableExists(Handle handle, String tableName) { try { String owner = handle.getConnection().getMetaData().getUserName(); List<Map<String, Object>> list = handle .createQuery( "select * from dba_tables where owner= :owner and table_name= :tableName") .bind("owner", owner.toUpperCase()) .bind("tableName", tableName.toUpperCase()).list(); log.info("ZenithConnector----" + list.size()); return !list.isEmpty(); } catch (SQLException e) { log.error("table is not Exists"); } return false; } @Override public Void insertOrUpdate(final String tableName, final String keyColumn, final String valueColumn, final String key, final byte[] value) throws Exception { return getDBI().withHandle(new HandleCallback<Void>() { @Override public Void withHandle(Handle handle) throws Exception { handle.createStatement(StringUtils.format( "INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value", tableName, keyColumn, valueColumn)) .bind("key", key) .bind("value", value) .execute(); return null; } }); } @Override public DBI getDBI() { return dbi; } }
上面代码中有几个关键地方解释下:
PAYLOAD_TYPE = "BLOB" ;// 高斯下CLOB方式不行,必须用BLOB SERIAL_TYPE = "Integer AUTO_INCREMENT"; // 主键自增方式 QUOTE_STRING = " "; // 高斯对空字符串的处理必须转换为空格 DEFAULT_STREAMING_RESULT_SIZE = 100; // 高斯中必须不能为0
另外高斯中对查询数据表是否存在时需要使用用户名参数,可以从连接对象中获取:
String owner = handle.getConnection().getMetaData().getUserName();
2、解决高斯Do not support addBatch问题
druid-server工程中SQLMetadataConnector类:
public void createTable(final String tableName, final Iterable<String> sql) { try { retryWithHandle( new HandleCallback<Void>() { @Override public Void withHandle(Handle handle) { if (!tableExists(handle, tableName)) { log.info("Creating table[%s]", tableName); // final Batch batch = handle.createBatch(); for (String s : sql) { // log.info(s); // batch.add(s); handle.execute(s); } // batch.execute(); } else { log.info("Table[%s] already exists", tableName); } return null; } } ); }
3、解决字段名与保留关键字冲突问题:
druid-server工程中SQLMetadataConnector类:
将sql语句中所有start改为starttime,end改为endtime
public void createPendingSegmentsTable(final String tableName) { createTable( tableName, ImmutableList.of( StringUtils.format( "CREATE TABLE %1$s (\n" + " id VARCHAR(255) NOT NULL,\n" + " dataSource VARCHAR(255) NOT NULL,\n" + " created_date VARCHAR(255) NOT NULL,\n" + " starttime VARCHAR(255) NOT NULL,\n" + " endtime VARCHAR(255) NOT NULL,\n" + " sequence_name VARCHAR(255) NOT NULL,\n" + " sequence_prev_id VARCHAR(255),\n" + " sequence_name_prev_id_sha1 VARCHAR(255) NOT NULL,\n" + " payload %2$s NOT NULL,\n" + " PRIMARY KEY (id),\n" + " UNIQUE (sequence_name_prev_id_sha1)\n" + ")", // tableName, getPayloadType(), getQuoteString() tableName, getPayloadType() ) ) ); }
druid-server工程中IndexerSQLMetadataStorageCoordinator类:
start和end字符串与高斯关键字冲突,将sql语句中所有start改为starttime,end改为endtime
4、sequence_prev_id改为可以为空。
final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;
此处的previousSegmentIdNotNull会作为sequence_prev_id插入,但高斯不支持null或“”字符串的插入,所以sequence_prev_id字段要设置为可以为空。
5、字段名大小写问题:
druid-server工程中SQLMetadataSegmentManager类396行:
stringObjectMap中的key转为大写再取值,因为高斯数据库查询返回的字段名是大写的。
- 点赞
- 收藏
- 关注作者
评论(0)