Apache CarbonData 2.0 Preview (Early Look at Key Features)
Author: Chen Liang (陈亮), PMC Chairman and Committer of the Apache CarbonData project
1. Support pre-priming the cache in the Index cache server
Use Case: CarbonData loads the min/max index cache into memory on the first query executed against a table, which slows that first query down. To improve first-query performance, the user can enable the pre-priming feature, which loads the min/max cache into memory on each data load.
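The difference between lazy loading and pre-priming can be sketched in a few lines of plain Python. This is an illustrative model only, not CarbonData code: `IndexCache`, `on_load`, and `query` are hypothetical names, and a segment is reduced to a list of integers.

```python
class IndexCache:
    """Toy min/max index cache keyed by segment id (hypothetical model)."""

    def __init__(self, prepriming=False):
        self.prepriming = prepriming
        self.segments = {}     # segment id -> raw values (stands in for data files)
        self.cache = {}        # segment id -> (min, max)
        self.cache_misses = 0  # how often a query had to build a cache entry

    def on_load(self, segment_id, values):
        """Called once per data load; values must be non-empty."""
        self.segments[segment_id] = list(values)
        if self.prepriming:
            # Pre-priming: build the min/max entry as part of the load.
            self.cache[segment_id] = (min(values), max(values))

    def query(self, value):
        """Return ids of segments whose [min, max] range may contain value."""
        hits = []
        for segment_id, values in self.segments.items():
            if segment_id not in self.cache:
                # Lazy path: the query itself pays for building the entry.
                self.cache_misses += 1
                self.cache[segment_id] = (min(values), max(values))
            low, high = self.cache[segment_id]
            if low <= value <= high:
                hits.append(segment_id)
        return hits


lazy = IndexCache(prepriming=False)
primed = IndexCache(prepriming=True)
for cache in (lazy, primed):
    cache.on_load("seg_0", [3, 5, 8])
    cache.on_load("seg_1", [10, 12])

print(lazy.query(5), lazy.cache_misses)      # first query fills the cache
print(primed.query(5), primed.cache_misses)  # cache was filled at load time
```

In this model the cost of building the cache moves from the first query to the load, which matches the trade-off described above.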
2. Carbon Extension for Spark 2.4, without CarbonSession
Use Case: Because Carbon was tightly integrated with the Spark compute engine, it required a CarbonSession to be created instead of a SparkSession.
To make the integration layer modular, CarbonData now supports the SparkSessionExtensions API, which lets Carbon plug its parser and optimizer into an existing SparkSession.
import org.apache.spark.sql.SparkSession

// conf is assumed to be an existing SparkConf defined elsewhere
val spark = SparkSession
  .builder()
  .config(conf)
  .master("spark://localhost:7077")
  .appName("Test")
  .enableHiveSupport()
  .config("spark.sql.warehouse.dir", "./warehouse")
  // register the Carbon parser and optimizer with the plain SparkSession
  .config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
  .getOrCreate()

spark.sql("""
  CREATE TABLE IF NOT EXISTS test_table (
    id string, name string, city string, age Int)
  STORED AS carbondata""")
3. MV time-series support with rollup and multiple granularities
Use Case: Analytics data such as application performance monitoring, network data, sensor data, events, clicks, banking, and server metrics has to be aggregated and analyzed or monitored over a period of time for business needs. CarbonData supports pre-computation of aggregations and joins through materialized views (MVs), which speeds up queries, and many users need time-series support on top of that.
// xingbie is pinyin for "gender"
spark.sql("""
  CREATE TABLE sales (
    order_time timestamp, user_id string, xingbie string,
    country string, quantity int, price bigint)
  STORED AS carbondata""")

// materialize the per-minute average price
spark.sql("""
  CREATE MATERIALIZED VIEW agg_sales AS
  SELECT timeseries(order_time, 'minute'), avg(price)
  FROM sales
  GROUP BY timeseries(order_time, 'minute')""")

// the same aggregation issued against the base table
spark.sql("""
  SELECT timeseries(order_time, 'minute'), avg(price)
  FROM sales
  GROUP BY timeseries(order_time, 'minute')""")
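Rollup across granularities can be pictured as follows: if a minute-level view keeps a sum and a count per time bucket, an hour-level average can be derived from those partial aggregates rather than from the raw rows. The Python sketch below is a conceptual model under that assumption, not CarbonData's implementation; `truncate`, `aggregate`, and `rollup` are hypothetical helper names.

```python
from collections import defaultdict
from datetime import datetime


def truncate(ts, granularity):
    """Truncate a datetime to the start of its minute or hour."""
    if granularity == "minute":
        return ts.replace(second=0, microsecond=0)
    if granularity == "hour":
        return ts.replace(minute=0, second=0, microsecond=0)
    raise ValueError(f"unsupported granularity: {granularity}")


def aggregate(rows, granularity):
    """Pre-compute (sum, count) of price per time bucket from raw rows."""
    buckets = defaultdict(lambda: [0, 0])
    for order_time, price in rows:
        acc = buckets[truncate(order_time, granularity)]
        acc[0] += price
        acc[1] += 1
    return {start: tuple(acc) for start, acc in buckets.items()}


def rollup(fine, granularity):
    """Derive a coarser aggregate from a finer one without the raw rows."""
    buckets = defaultdict(lambda: [0, 0])
    for bucket_start, (total, count) in fine.items():
        acc = buckets[truncate(bucket_start, granularity)]
        acc[0] += total
        acc[1] += count
    return {start: tuple(acc) for start, acc in buckets.items()}


rows = [
    (datetime(2020, 1, 1, 10, 0, 5), 100),
    (datetime(2020, 1, 1, 10, 0, 40), 300),
    (datetime(2020, 1, 1, 10, 30, 0), 200),
]
by_minute = aggregate(rows, "minute")
by_hour = rollup(by_minute, "hour")
for bucket_start, (total, count) in sorted(by_hour.items()):
    print(bucket_start, total / count)
```

The sketch stores sums and counts instead of averages because an average of per-minute averages is not, in general, the per-hour average.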
4. Support spatial index DataMap
Use Case: Queries that filter on a spatial object, such as a region on a 2D map, used to be treated as full-scan queries, which degraded performance significantly. To remove this limitation, Carbon implements spatial indexing, a technique commonly used by spatial databases that allows a spatial object to be accessed efficiently.
spark.sql("""
  create table source_index(id BIGINT, latitude long, longitude long)
  stored by 'carbondata'
  TBLPROPERTIES (
    'INDEX_HANDLER'='mygeohash',
    'INDEX_HANDLER.mygeohash.type'='geohash',
    'INDEX_HANDLER.mygeohash.sourcecolumns'='longitude, latitude',
    'INDEX_HANDLER.mygeohash.originLatitude'='19.832277',
    'INDEX_HANDLER.mygeohash.gridSize'='50',
    'INDEX_HANDLER.mygeohash.minLongitude'='1.811865',
    'INDEX_HANDLER.mygeohash.maxLongitude'='2.782233',
    'INDEX_HANDLER.mygeohash.minLatitude'='19.832277',
    'INDEX_HANDLER.mygeohash.maxLatitude'='20.225281',
    'INDEX_HANDLER.mygeohash.conversionRatio'='1000000')""")

spark.sql("""
  select * from source_index
  where IN_POLYGON('16.321011 4.123503,16.137676 5.947911,16.560993 5.935276,16.321011 4.123503')""")
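To give a feel for how a grid-based spatial index avoids a full scan, the Python sketch below maps each point to a grid cell and encodes the cell as a single integer by interleaving the bits of its column and row (a Z-order code). This is a simplified illustration of the general technique, not CarbonData's geohash algorithm; `interleave_bits`, `cell_id`, the region origin, and the cell size are all assumptions made for the example.

```python
def interleave_bits(x, y, bits=16):
    """Interleave the low `bits` bits of x and y into one Z-order value.

    Bit i of x goes to position 2*i, bit i of y to position 2*i + 1.
    """
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


def cell_id(longitude, latitude, min_lon, min_lat, cell_size):
    """Map a point inside the region to the Z-order id of its grid cell."""
    col = int((longitude - min_lon) / cell_size)
    row = int((latitude - min_lat) / cell_size)
    return interleave_bits(col, row)


# Hypothetical region with origin (0.0, 0.0) and square cells 0.5 units wide.
points = {
    "a": (0.1, 0.1),  # column 0, row 0
    "b": (0.3, 0.4),  # column 0, row 0 (same cell as "a")
    "c": (1.2, 0.2),  # column 2, row 0
    "d": (0.6, 0.7),  # column 1, row 1
}
index = {}
for name, (lon, lat) in points.items():
    index.setdefault(cell_id(lon, lat, 0.0, 0.0, 0.5), []).append(name)

# A region query only visits the cells that overlap the region.
wanted_cells = {cell_id(0.25, 0.25, 0.0, 0.0, 0.5)}
print(sorted(name for cell in wanted_cells for name in index.get(cell, [])))
```

A polygon filter would first be translated into the set of cell ids it covers, so only rows in those cells need to be read and checked exactly.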
Get more about usage:
5. Support Secondary Index
Use Case: When unsorted data is stored in Carbon, min/max pruning tends to produce false positives. For example, a blocklet holding the integer values 3, 5, and 8 has min=3 and max=8. If the user's filter expression has the value 4, pruning wrongly reports that this blocklet may contain matching data, so the read flow decompresses the page and reads its contents. This causes unnecessary I/O and degrades performance.
To improve query performance, the Secondary Index is built on top of the existing min/max architecture. It is essentially a reverse index from each unique value to the blocklets that contain it, which gives the exact location of the data and minimizes false positives during pruning.
spark.sql("""
  CREATE TABLE sales (
    order_time timestamp, user_id string, xingbie string,
    country string, quantity int, price bigint)
  STORED AS carbondata""")

spark.sql("""
  CREATE INDEX index_sales ON TABLE sales(user_id) AS 'carbondata'""")
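The false-positive example above (values 3, 5, 8 and a filter on 4) can be reproduced with a small Python model. It is a conceptual sketch, not CarbonData's storage layout: blocklets are plain lists, and `minmax_prune`, `build_reverse_index`, and `index_prune` are hypothetical names.

```python
def minmax_prune(blocklets, value):
    """Keep blocklets whose [min, max] range contains the filter value."""
    return [blocklet_id for blocklet_id, values in blocklets.items()
            if min(values) <= value <= max(values)]


def build_reverse_index(blocklets):
    """Map each distinct value to the sorted ids of blocklets containing it."""
    index = {}
    for blocklet_id, values in blocklets.items():
        for value in set(values):
            index.setdefault(value, set()).add(blocklet_id)
    return {value: sorted(ids) for value, ids in index.items()}


def index_prune(reverse_index, value):
    """Return exactly the blocklets that hold the value."""
    return reverse_index.get(value, [])


blocklets = {
    "blocklet_0": [3, 5, 8],  # min=3, max=8, does not contain 4
    "blocklet_1": [4, 9, 1],  # min=1, max=9, contains 4
}
reverse_index = build_reverse_index(blocklets)

print(minmax_prune(blocklets, 4))     # min/max pruning keeps both blocklets
print(index_prune(reverse_index, 4))  # the reverse index keeps only one
```

The reverse index costs extra storage and maintenance on load, which is the price paid for skipping the unnecessary decompression of `blocklet_0`.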
Get more about usage:
6. Support CDC merge functionality
Use Case: In today's data warehouse world, slowly changing dimensions (SCD) and change data capture (CDC) are very common scenarios. Legacy systems such as RDBMSs handle them well because they support transactions.
To keep up with existing database technologies, CarbonData now supports CDC and SCD functionality.
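import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// initframe (initial data) and odsframe (incoming changes, referenced
// through the alias "B") are assumed to be defined elsewhere
initframe.write
  .format("carbondata")
  .option("tableName", "order")
  .mode(SaveMode.Overwrite)
  .save()

val dwframe = sqlContext.read.format("carbondata").option("tableName", "order").load()
val dwSelframe = dwframe.as("A")

// target column -> expression used when a matched row is updated
val updateMap = Map(
  "id" -> "A.id",
  "name" -> "B.name",
  "c_name" -> "B.c_name",
  "quantity" -> "B.quantity",
  "price" -> "B.price",
  "state" -> "B.state").asInstanceOf[Map[Any, Any]]

// update the matched rows whose state has changed
dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id")))
  .whenMatched(col("A.state") =!= col("B.state"))
  .updateExpr(updateMap)
  .execute()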
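The semantics of a conditional "when matched, update" merge can be modelled on plain Python dictionaries. This sketch only illustrates the idea behind the merge call above; it is not the CarbonData API, and the `merge` function, its parameters, and the sample rows are hypothetical.

```python
def merge(target, source, key, matched_condition, update_columns):
    """Apply a conditional 'when matched then update' merge.

    target, source: lists of dict rows; key: name of the join column.
    matched_condition(a, b): extra predicate on a matched (target, source) pair.
    update_columns: columns copied from the source row when updating.
    Returns a new list of rows; the inputs are left unchanged.
    """
    source_by_key = {row[key]: row for row in source}
    merged = []
    for a in target:
        b = source_by_key.get(a[key])
        updated = dict(a)
        if b is not None and matched_condition(a, b):
            updated.update({column: b[column] for column in update_columns})
        merged.append(updated)
    return merged


orders = [
    {"id": 1, "name": "apple", "quantity": 10, "state": "created"},
    {"id": 2, "name": "pear", "quantity": 5, "state": "created"},
]
changes = [
    {"id": 1, "name": "apple", "quantity": 10, "state": "shipped"},
    {"id": 2, "name": "pear", "quantity": 7, "state": "created"},
    {"id": 3, "name": "plum", "quantity": 1, "state": "created"},
]
result = merge(
    orders, changes, key="id",
    matched_condition=lambda a, b: a["state"] != b["state"],
    update_columns=["name", "quantity", "state"],
)
for row in result:
    print(row)
```

Only order 1 is updated here: order 2 matches on `id` but its state is unchanged, and order 3 has no match in the target, so a sketch with only a matched-update clause leaves it out.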
Get more about usage:
7. Support Flink streaming write to CarbonData
Use Case: CarbonData needs to integrate with fault-tolerant streaming dataflow engines such as Apache Flink. Users can build a Flink streaming job and use a Flink sink to write data to Carbon through the CarbonSDK. The Flink sink generates table stage files; the Carbon insert stage command then inserts the data from the stage files into the Carbon table, making it visible to queries.
// Spark (Scala): create the target table
spark.sql("""
  CREATE TABLE test_flink (stringField string, intField int, shortField short)
  STORED AS carbondata""")

// Flink (Java): create the streaming environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
environment.enableCheckpointing(2000L);
environment.setRestartStrategy(RestartStrategies.noRestart());
DataStreamSource<OUT> stream = environment.addSource(/* data source such as Kafka */);

// create the Carbon SDK writer factory with the LOCAL/S3/OBS builder
CarbonWriterFactory factory = CarbonWriterFactory.builder("Local").build(
    dbName, tableName, tablePath, tableProperties, writerProperties, carbonProperties);

// create the stream sink and add it to the stream
StreamingFileSink<IN> streamSink = StreamingFileSink
    .forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory)
    .build();
stream.addSink(streamSink);

// execute the Flink streaming job, which generates the stage files
environment.execute();
Get more about usage:
8. Add segment
Use Case: Many users already have data in other formats such as ORC, Parquet, JSON, and CSV. Until now, users who wanted to migrate to CarbonData for better performance or better features (such as the SDK) had no mechanism other than converting all existing data to CarbonData.
To remove this limitation, the add segment command lets the user add segments of different formats to a Carbon table and query them directly.
alter table test add segment options ('path'='hdfs://usr/oldtable','format'='parquet')
Get more about usage:
9. Hive leverages the index to improve query performance
Use Case: Hive filter expressions are pushed down to Carbon to filter data, which improves query performance.
Usage/Example: When hive.optimize.index.filter is set to true, Hive expressions can be pushed down to Carbon to filter the data.
10. Hive Write support
Use Case: CarbonData now supports writing and reading through the Hive execution engine. This helps users who want to try Carbon without migrating to Spark. Users can also convert their existing Parquet/ORC tables directly to the Carbon format for ETL purposes.
CREATE TABLE hive_carbon_table (
  shortField SMALLINT, intField INT, bigintField BIGINT, doubleField DOUBLE,
  stringField STRING, timestampField TIMESTAMP, decimalField DECIMAL(18,2),
  dateField DATE, charField CHAR(5), floatField FLOAT)
stored by 'org.apache.carbondata.hive.CarbonStorageHandler'
11. Support prestodb-0.217 and prestosql-316
Use Case: Presto currently has two communities, PrestoDB and PrestoSQL. To serve users of both, Carbon now supports prestodb-0.217 and prestosql-316.
Usage: The prestosql profile is selected by default for the Maven build. Users can select the prestodb profile to build CarbonData for PrestoDB.
12. Insert into performance improvement
Use Case: Until now the insert and load commands shared a common code flow, which added considerable overhead to the insert command because features such as BadRecords handling are not required by it.
The load and insert flows have now been separated, and additional optimizations have been applied to the insert command, such as:
1. Rearranging projections instead of rows.
2. Using Spark's InternalRow instead of the Row object.
These optimizations were observed to improve insert performance by 40% on TPC-H data.
13. Optimize bucket table
Use Case: Bucketing distributes the data of a table or partition into multiple files so that similar records end up in the same file. Joins on large datasets normally shuffle a large volume of data, which makes them slow; this shuffle can be avoided when joining on bucket columns. Bucket tables have been made consistent with Spark so that joins on the bucket column avoid the shuffle and run faster.
spark.sql("""
  CREATE TABLE bucket_table1 (stringField string, intField int, shortField short)
  STORED AS carbondata
  TBLPROPERTIES ('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='stringField')""")

spark.sql("""
  CREATE TABLE bucket_table2 (stringField string, intField int, shortField short)
  STORED AS carbondata
  TBLPROPERTIES ('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='stringField')""")
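Why two tables bucketed the same way can be joined without a shuffle is easy to see in a small Python model: when both sides use the same hash function and bucket count, rows with equal keys always land in buckets with the same number, so each bucket pair can be joined on its own. The sketch is illustrative only; `bucket_of`, `write_bucketed`, and `bucket_join` are hypothetical names, and CRC32 is just a stand-in for whatever hash function the engine actually uses.

```python
import zlib


def bucket_of(key, bucket_number):
    """Deterministic bucket id for a string key (CRC32 as a stand-in hash)."""
    return zlib.crc32(key.encode("utf-8")) % bucket_number


def write_bucketed(rows, key_index, bucket_number):
    """Distribute rows into bucket_number lists by the hash of the key column."""
    buckets = [[] for _ in range(bucket_number)]
    for row in rows:
        buckets[bucket_of(row[key_index], bucket_number)].append(row)
    return buckets


def bucket_join(left_buckets, right_buckets, key_index=0):
    """Join bucket i of the left table only with bucket i of the right table."""
    joined = []
    for left, right in zip(left_buckets, right_buckets):
        lookup = {}
        for right_row in right:
            lookup.setdefault(right_row[key_index], []).append(right_row)
        for left_row in left:
            for right_row in lookup.get(left_row[key_index], []):
                joined.append((left_row[key_index], left_row[1], right_row[1]))
    return joined


orders = [("u1", "book"), ("u2", "pen"), ("u1", "lamp"), ("u3", "desk")]
users = [("u1", "Oslo"), ("u2", "Lima"), ("u4", "Rome")]
BUCKETS = 4

joined = bucket_join(write_bucketed(orders, 0, BUCKETS),
                     write_bucketed(users, 0, BUCKETS))
print(sorted(joined))
```

The per-bucket joins never exchange rows with each other, which is the part a distributed engine would otherwise have to do with a shuffle. The result is only correct if both tables use the same bucket count and the same hash function.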
14. PyCarbon support
Use Case: CarbonData now provides a Python API (PyCarbon) for integration with AI frameworks such as TensorFlow, PyTorch, and MXNet. With PyCarbon, an AI framework should be able to read training data faster by leveraging CarbonData's indexing and caching. Because CarbonData is a columnar store, AI developers should also be able to use projection and filtering to pick the data required for training efficiently.
Please refer to the link below to use PyCarbon: https://github.com/apache/carbondata/blob/master/python/README.md
15. Materialized view on all tables, such as Parquet and ORC
Use Case: CarbonData's datamap interface can be used to improve query performance on other formats such as Parquet and ORC. One implementation of the datamap interface is the MV table, which pre-computes aggregation results based on user input. By creating an MV datamap on a Parquet/ORC table, the user can query pre-computed data instead of raw data, which results in better query performance.
This is possible because Carbon redirects the query to the MV datamap instead of the Parquet table.
spark.sql("""
  create table source(
    empname String, designation String, deptno int, deptname String, salary int)
  using parquet""")

spark.sql("""
  create materialized view mv_parquet as
  select empname, deptname, avg(salary)
  from source
  group by empname, deptname""")
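The redirection idea can be sketched in plain Python: a toy catalog keeps pre-computed results keyed by their group-by columns and answers a query from them when the grouping matches, falling back to a scan of the source rows otherwise. This is a deliberately simplified model, not CarbonData's query rewrite logic; `Catalog`, `create_mv`, and `query_avg_salary` are hypothetical names, and refreshing the view after the source changes is ignored.

```python
from collections import defaultdict


def avg_salary(rows, group_by):
    """Scan raw rows and compute avg(salary) per group."""
    acc = defaultdict(lambda: [0, 0])
    for row in rows:
        key = tuple(row[column] for column in group_by)
        acc[key][0] += row["salary"]
        acc[key][1] += 1
    return {key: total / count for key, (total, count) in acc.items()}


class Catalog:
    """Toy planner that redirects a matching query to a materialized view."""

    def __init__(self, source_rows):
        self.source_rows = source_rows
        self.views = {}  # group-by columns -> pre-computed result

    def create_mv(self, group_by):
        self.views[tuple(group_by)] = avg_salary(self.source_rows, group_by)

    def query_avg_salary(self, group_by):
        """Return (result, origin), where origin is 'mv' or 'source'."""
        view = self.views.get(tuple(group_by))
        if view is not None:
            return view, "mv"
        return avg_salary(self.source_rows, group_by), "source"


source = [
    {"empname": "ann", "deptname": "dev", "salary": 100},
    {"empname": "ann", "deptname": "dev", "salary": 300},
    {"empname": "bob", "deptname": "ops", "salary": 200},
]
catalog = Catalog(source)
catalog.create_mv(["empname", "deptname"])

print(catalog.query_avg_salary(["empname", "deptname"]))  # answered from the MV
print(catalog.query_avg_salary(["deptname"]))             # falls back to a scan
```

The user keeps writing queries against `source`; whether the view is used is decided by the planner, which mirrors the behaviour described above.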