聊聊ShardingSphere的SQL归并

举报
周杰伦本人 发表于 2022/11/29 17:32:25 2022/11/29
【摘要】 聊聊ShardingSphere的SQL归并本篇文章源码基于4.0.1版本所谓SQL归并,我们在之前的文章讲到ShardingStatement在执行查询的时候执行完SQL后对结果进行归并,调用的就是MergeEngine的merge()方法,而归并引擎的创建是由工厂类创建的 归并引擎工厂类MergeEngineFactory的newInstance()方法: public stat...

聊聊ShardingSphere的SQL归并

本篇文章源码基于4.0.1版本

所谓SQL归并,我们在之前的文章讲到ShardingStatement在执行查询的时候执行完SQL后对结果进行归并,调用的就是MergeEngine的merge()方法,而归并引擎的创建是由工厂类创建的

归并引擎工厂类

MergeEngineFactory的newInstance()方法:

    public static MergeEngine newInstance(final DatabaseType databaseType, final ShardingRule shardingRule,
                                          final SQLRouteResult routeResult, final RelationMetas relationMetas, final List<QueryResult> queryResults) {
        if (routeResult.getSqlStatementContext() instanceof SelectSQLStatementContext) {
            return new DQLMergeEngine(databaseType, (SelectSQLStatementContext) routeResult.getSqlStatementContext(), queryResults);
        } 
        if (routeResult.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
            return new DALMergeEngine(shardingRule, queryResults, routeResult.getSqlStatementContext(), relationMetas);
        }
        return new TransparentMergeEngine(queryResults);
    }
  1. 如果是查询语句,创建DQLMergeEngine DQL 结果集合并引擎对象
  2. 如果是数据库管理语句,创建DAL 结果集合并引擎DALMergeEngine对象
  3. 既不是查询语句又不是数据库管理语句就创建透明的结果集合并引擎TransparentMergeEngine对象。

所以我们的重点是看一下这个三类的merge()方法

DQL 结果集合并引擎

    public MergedResult merge() throws SQLException {
        if (1 == queryResults.size()) {
            return new IteratorStreamMergedResult(queryResults);
        }
        Map<String, Integer> columnLabelIndexMap = getColumnLabelIndexMap(queryResults.get(0));
        selectSQLStatementContext.setIndexes(columnLabelIndexMap);
        return decorate(build(columnLabelIndexMap));
    }
  1. 当结果集只有一个的时候,进行创建IteratorStreamMergedResult对象进行遍历归并
  2. 不小于一个的时候调用build()方法根据SQL语句创建不同的合并结果对象,decorate()方法中根据不同的数据库类型进行分页合并。我们看一下它的构建方法创建了什么合并结果对象

DQLMergeEngine的build()方法:

    private MergedResult build(final Map<String, Integer> columnLabelIndexMap) throws SQLException {
        if (isNeedProcessGroupBy()) {
            return getGroupByMergedResult(columnLabelIndexMap);
        }
        if (isNeedProcessDistinctRow()) {
            setGroupByForDistinctRow();
            return getGroupByMergedResult(columnLabelIndexMap);
        }
        if (isNeedProcessOrderBy()) {
            return new OrderByStreamMergedResult(queryResults, selectSQLStatementContext.getOrderByContext().getItems());
        }
        return new IteratorStreamMergedResult(queryResults);
    }
  1. 判断是否需要分组,如果是分组SQL,进行分组合并
  2. 判断是否需要去重,如果是带有Distinct去重的SQL,设置去重分组,进行分组合并
  3. 判断是否需要排序,如果需要排序,进行排序合并
  4. 以上都不是就进行遍历合并

遍历合并

遍历合并很简单,就是一个个进行遍历,它是流式合并

分组合并

分组合并会判断 group by 和 order by 顺序是否相同,如果相同就创建GroupByStreamMergedResult对象,也就是流式分组合并,否则创建GroupByMemoryMergedResult对象,进行内存分组合并,对内存的分组这里不展开讲了,其实就将结果集放入内存中,取下一条的时候直接从内存中取。

GroupByStreamMergedResult的next()方法是获取下一条数据,下一条数据的生成是创建聚合单元实例,AggregationUnitFactory的create()方法创建聚合单元实例,

    public static AggregationUnit create(final AggregationType type, final boolean isDistinct) {
        switch (type) {
            case MAX:
                return new ComparableAggregationUnit(false);
            case MIN:
                return new ComparableAggregationUnit(true);
            case SUM:
                return isDistinct ? new DistinctSumAggregationUnit() : new AccumulationAggregationUnit();
            case COUNT:
                return isDistinct ? new DistinctCountAggregationUnit() : new AccumulationAggregationUnit();
            case AVG:
                return isDistinct ? new DistinctAverageAggregationUnit() : new AverageAggregationUnit();
            default:
                throw new UnsupportedOperationException(type.name());
        }
    }

看一下这个方法就知道了是根据不同的聚合类型创建不同的单元实例,ShardingSphere支持的分组合并的函数就是方法中的五种:最大值,最小值,求和,计数,求平均值。

调用单元实例中的 merge 方法来完成聚合值的计算

排序合并

排序合并就是将分片查到的排序结果进行全局排序,对应的对象是OrderByStreamMergedResult,通过它的成员变量中有个比较队列PriorityQueue,这个队列保存的所有分片结果集的第一个元素,它的next()方法中进行排序操作确定下一个结果对象,大体逻辑是取出队列中的第一个元素,然后,然后将下一个元素放入插入队列中,插入后队列会自动进行排序,这样就会实现全局结果一个一个顺序被取出

DAL 结果集合并引擎

DALMergeEngine的merge()方法:

    public MergedResult merge() throws SQLException {
        SQLStatement dalStatement = sqlStatementContext.getSqlStatement();
        if (dalStatement instanceof ShowDatabasesStatement) {
            return new SingleLocalDataMergedResult(Collections.<Object>singletonList(ShardingConstant.LOGIC_SCHEMA_NAME));
        }
        if (dalStatement instanceof ShowTablesStatement || dalStatement instanceof ShowTableStatusStatement || dalStatement instanceof ShowIndexStatement) {
            return new LogicTablesMergedResult(shardingRule, sqlStatementContext, relationMetas, queryResults);
        }
        if (dalStatement instanceof ShowCreateTableStatement) {
            return new ShowCreateTableMergedResult(shardingRule, sqlStatementContext, relationMetas, queryResults);
        }
        return new TransparentMergedResult(queryResults.get(0));
    }

如果是显示数据库的语句,就进行单个本地数据的合并,

如果是显示表的语句或者显示表状态的语句或者显示索引的语句,就进行逻辑表合并

如果是显示创建表的语句,就显示创建表的合并。

否则进行透明结果合并,也就是不进行处理

透明的结果集合并引擎

这里就是不需要对结果集进行处理,直接遍历取结果集

总结

这篇文章我们讲了ShardingSphere的归并引擎,从它的工厂类MergeEngineFactory入手,创建归并引擎实例的时候,会根据SQL语句创建不同类型的归并引擎,在进行归并处理的时候调用的是归并引擎的merge()方法,我们重点讲述了这三类引擎的合并方法

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。