扩展Druid.io支持GaussDB T 数据库[转]
转自 http://3ms.huawei.com/km/blogs/details/6220889
为了满足某产品的诉求,需要扩展druid.io使之支持高斯数据库。之前druid.io使用mysql关系数据库来存储元数据,现在主要是将元数据存储方式改为使用高斯数据库存储。
在Druid.io官网上找到Druid.io模块扩展的官方文档:
https://druid.apache.org/docs/latest/development/modules.html
通过分析代码了解到Druid.io为了使得代码具有很强的扩展性使用了Guice框架来进行开发,下面我们来看看如何基于Guice框架扩展Druid.io支持高斯数据库。
1、创建一个高斯元数据存储的模块
在源代码druid\extensions-core目录下创建一个名为gaussdb-metadata-storage的maven模块。
创建类ZenithMetadataStorageModule继承SQLMetadataStorageDruidModule这个元数据存储基类,并实现DruidModule这个模块接口:
package org.gaussdb.metadata.storage;
import java.util.List;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import io.druid.guice.LazySingleton;
import io.druid.guice.PolyBind;
import io.druid.guice.SQLMetadataStorageDruidModule;
import io.druid.initialization.DruidModule;
import io.druid.metadata.MetadataStorageActionHandlerFactory;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.MetadataStorageProvider;
import io.druid.metadata.NoopMetadataStorageProvider;
import io.druid.metadata.SQLMetadataConnector;
public class ZenithMetadataStorageModule extends SQLMetadataStorageDruidModule implements DruidModule
{
public static final String TYPE = "gaussdb";
public ZenithMetadataStorageModule()
{
super(TYPE);
// TODO Auto-generated constructor stub
}
@Override
public List<? extends Module> getJacksonModules()
{
// TODO Auto-generated method stub
return ImmutableList.of();
}
@Override
public void configure(Binder binder)
{
super.configure(binder);
PolyBind
.optionBinder(binder, Key.get(MetadataStorageProvider.class))
.addBinding(TYPE)
.to(NoopMetadataStorageProvider.class)
.in(LazySingleton.class);
PolyBind
.optionBinder(binder, Key.get(MetadataStorageConnector.class))
.addBinding(TYPE)
.to(ZenithConnector.class)
.in(LazySingleton.class);
PolyBind
.optionBinder(binder, Key.get(SQLMetadataConnector.class))
.addBinding(TYPE)
.to(ZenithConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandlerFactory.class))
.addBinding(TYPE)
.to(ZenithMetadataStorageActionHandlerFactory.class)
.in(LazySingleton.class);
}
}
同时需要引入对高斯驱动的依赖
<dependency>
<groupId>com.huawei.gauss</groupId>
<artifactId>com.huawei.gauss.jdbc.ZenithDriver</artifactId>
<version>V300R001C00SPC100B210</version>
</dependency>
2、注册高斯元数据存储模块
2.1、修改配置
Druid.io所有核心扩展可开箱即用。通过将其名称添加到common.runtime.properties配置文件的druid.extensions.loadList属性来加载绑定扩展。
druid.extensions.loadList=["gaussdb-metadata-storage"]
并增加高斯数据库的参数配置如下:
# For GaussDB:
druid.metadata.storage.type=gaussdb
druid.metadata.storage.connector.connectURI=jdbc:zenith:@10.243.49.xx:1611
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=xxx
注意此处的druid.metadata.storage.type与ZenithMetadataStorageModule代码中的TYPE是对应的。
2.2、需要在jar的META-INF / services目录中打包一个额外的文件注册模块。
通过在src / main / resources目录中创建文件,它应该是一个文本文件,类似:
META-INF/services/org.apache.druid.initialization.DruidModule,
其中包含实现DruidModule的包限定类的新行分隔列表,类似:
org.gaussdb.metadata.storage.ZenithMetadataStorageModule
此时,当jar被添加到类路径或作为扩展时,Druid.io会注意到该文件并将实例化Module的实例。 模块应该有一个默认的构造函数,但是如果你需要访问运行时配置属性,它可以有一个带@Inject的方法来获取从Guice注入的一个Properties对象。
3、验证元数据存储的实现
3.1、打包
distribution工程是专门用来打包的,为了保证druid.io打包的时候将该jar包打入,需要修改distribution工程的pom文件。
在exec-maven-plugin这个插件中增加参数:
<argument>-c</argument>
<argument>io.druid.extensions:gaussdb-metadata-storage</argument>
最终在druid-0.12.0-bin.tar.gz\druid-0.12.0\extensions\gaussdb-metadata-storage中可以看到jar包及高斯驱动包。
3.2、适配高斯语法
目前为止我们已经可以部署druid了,但是运行的时候会报很多错误和异常,这些错误一般是由于代码中存在与高斯语法不一致的地方导致的,druid.io使用jdbi进行关系数据库的操作,下面一一列出对这块操作的修改。
1、ZenithConnector类中:
package org.gaussdb.metadata.storage;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import org.apache.commons.dbcp2.BasicDataSource;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.SQLMetadataConnector;
public class ZenithConnector extends SQLMetadataConnector
{
private static final Logger log = new Logger(ZenithConnector.class);
private static final String PAYLOAD_TYPE = "BLOB";
private static final String SERIAL_TYPE = "Integer AUTO_INCREMENT";
private static final String QUOTE_STRING = "";
public static final int DEFAULT_STREAMING_RESULT_SIZE = 100;
private final DBI dbi;
@Inject
public ZenithConnector(Supplier<MetadataStorageConnectorConfig> config,
Supplier<MetadataStorageTablesConfig> tablesConfigSupplier)
{
super(config, tablesConfigSupplier);
final BasicDataSource datasource = getDatasource();
datasource.setDriverClassLoader(getClass().getClassLoader());
datasource.setDriverClassName("com.huawei.gauss.jdbc.ZenithDriver");
this.dbi = new DBI(datasource);
log.info("Configured Zenith as metadata storage");
}
@Override
protected String getPayloadType()
{
return PAYLOAD_TYPE;
}
@Override
protected String getSerialType()
{
return SERIAL_TYPE;
}
@Override
public String getQuoteString()
{
return QUOTE_STRING;
}
@Override
protected int getStreamingFetchSize()
{
return DEFAULT_STREAMING_RESULT_SIZE;
}
@Override
public boolean tableExists(Handle handle, String tableName)
{
try
{
String owner = handle.getConnection().getMetaData().getUserName();
List<Map<String, Object>> list = handle
.createQuery(
"select * from dba_tables where owner= :owner and table_name= :tableName")
.bind("owner", owner.toUpperCase())
.bind("tableName", tableName.toUpperCase()).list();
log.info("ZenithConnector----" + list.size());
return !list.isEmpty();
}
catch (SQLException e)
{
log.error("table is not Exists");
}
return false;
}
@Override
public Void insertOrUpdate(final String tableName, final String keyColumn,
final String valueColumn, final String key, final byte[] value)
throws Exception
{
return getDBI().withHandle(new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(StringUtils.format(
"INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value",
tableName,
keyColumn,
valueColumn))
.bind("key", key)
.bind("value", value)
.execute();
return null;
}
});
}
@Override
public DBI getDBI()
{
return dbi;
}
}
上面代码中有几个关键地方解释下:
PAYLOAD_TYPE = "BLOB" ;// 高斯下CLOB方式不行,必须用BLOB
SERIAL_TYPE = "Integer AUTO_INCREMENT"; // 主键自增方式
QUOTE_STRING = " "; // 高斯对空字符串的处理必须转换为空格
DEFAULT_STREAMING_RESULT_SIZE = 100; // 高斯中必须不能为0
另外高斯中对查询数据表是否存在时需要使用用户名参数,可以从连接对象中获取:
String owner = handle.getConnection().getMetaData().getUserName();
2、解决高斯Do not support addBatch问题
druid-server工程中SQLMetadataConnector类:
public void createTable(final String tableName, final Iterable<String> sql)
{
try {
retryWithHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
{
if (!tableExists(handle, tableName)) {
log.info("Creating table[%s]", tableName);
// final Batch batch = handle.createBatch();
for (String s : sql) {
// log.info(s);
// batch.add(s);
handle.execute(s);
}
// batch.execute();
} else {
log.info("Table[%s] already exists", tableName);
}
return null;
}
}
);
}
3、解决字段名与保留关键字冲突问题:
druid-server工程中SQLMetadataConnector类:
将sql语句中所有start改为starttime,end改为endtime
public void createPendingSegmentsTable(final String tableName)
{
createTable(
tableName,
ImmutableList.of(
StringUtils.format(
"CREATE TABLE %1$s (\n"
+ " id VARCHAR(255) NOT NULL,\n"
+ " dataSource VARCHAR(255) NOT NULL,\n"
+ " created_date VARCHAR(255) NOT NULL,\n"
+ " starttime VARCHAR(255) NOT NULL,\n"
+ " endtime VARCHAR(255) NOT NULL,\n"
+ " sequence_name VARCHAR(255) NOT NULL,\n"
+ " sequence_prev_id VARCHAR(255),\n"
+ " sequence_name_prev_id_sha1 VARCHAR(255) NOT NULL,\n"
+ " payload %2$s NOT NULL,\n"
+ " PRIMARY KEY (id),\n"
+ " UNIQUE (sequence_name_prev_id_sha1)\n"
+ ")",
// tableName, getPayloadType(), getQuoteString()
tableName, getPayloadType()
)
)
);
}
druid-server工程中IndexerSQLMetadataStorageCoordinator类:
start和end字符串与高斯关键字冲突,将sql语句中所有start改为starttime,end改为endtime
4、sequence_prev_id改为可以为空。
final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;
此处的previousSegmentIdNotNull会作为sequence_prev_id插入,但高斯不支持null或“”字符串的插入,所以sequence_prev_id字段要设置为可以为空。
5、字段名大小写问题:
druid-server工程中SQLMetadataSegmentManager类396行:
stringObjectMap中的key转为大写再取值,因为高斯数据库查询返回的字段名是大写的。
- 点赞
- 收藏
- 关注作者
评论(0)