Auxiliary Optimization Rules
Kyuubi provides a SQL extension out of the box. Because of version compatibility with Apache Spark, it currently supports only Apache Spark branch-3.1 (i.e. 3.1.1 and 3.1.2); support for newer Apache Spark versions is planned. Building on the adaptive query execution (AQE) framework, Kyuubi can perform the following optimizations.
merging small files automatically
Small files are a long-standing issue with Apache Spark. Kyuubi can merge small files by adding an extra shuffle. It currently handles small files for both datasource tables and Hive tables, and it also optimizes dynamic partition insertion. For example, given a common write query such as INSERT INTO TABLE $table1 SELECT * FROM $table2, Kyuubi introduces an extra shuffle before the write, which eliminates the small files.
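A minimal sketch of the write path described above, assuming a Spark SQL session with the extension already loaded. The Kyuubi config key is taken from the configuration table in this document, and spark.sql.adaptive.enabled is the standard Spark switch for AQE. The table names sales and sales_staging are hypothetical.

```sql
-- AQE is the framework these optimizations build on, so make sure it is on.
SET spark.sql.adaptive.enabled=true;

-- Adds a repartition node at the top of the query plan.
-- Listed as true by default; set explicitly here only for clarity.
SET spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true;

-- Hypothetical tables: the extra shuffle is added before this write.
INSERT INTO TABLE sales SELECT * FROM sales_staging;
```

Running EXPLAIN on the same INSERT statement is one way to check whether the extra repartition node appears in the plan.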
insert shuffle node before Join to make AQE OptimizeSkewedJoin work
In the current implementation, Apache Spark can optimize a skewed join only when it is a standard join, which means the join must have two sort and shuffle nodes. In complex scenarios, however, this assumption is easily broken. Kyuubi can guarantee that the join is standard by adding an extra shuffle node before the join, so that OptimizeSkewedJoin can take effect.
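As a rough illustration, the session below turns on the forced shuffle together with Spark's own skew-join handling. The Kyuubi key comes from the configuration table in this document, and spark.sql.adaptive.skewJoin.enabled is a standard Spark AQE setting. The orders and customers tables and their columns are hypothetical.

```sql
-- Standard Spark AQE switches.
SET spark.sql.adaptive.enabled=true;
SET spark.sql.adaptive.skewJoin.enabled=true;

-- Kyuubi rule: ensure a shuffle node exists before the shuffled join.
-- Listed as false by default, so it has to be enabled explicitly.
SET spark.sql.optimizer.forceShuffleBeforeJoin.enabled=true;

-- Hypothetical join where one customer_id may dominate the data.
SELECT o.order_id, c.name
FROM orders o
JOIN customers c
  ON o.customer_id = c.customer_id;
```

The extra shuffle has a cost of its own, so this sketch assumes the join is skewed enough for the trade-off to pay off.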
stage level config isolation in AQE
spark.sql.adaptive.advisoryPartitionSizeInBytes is a key config in Apache Spark AQE. It controls how much data each task should handle during a shuffle, so it is usually set to 64MB or less to keep parallelism high. Output files, however, are generally expected to be larger, such as 256MB or 512MB. Kyuubi can isolate this config per stage to resolve the conflict, so that partitions in intermediate stages stay small while partitions in the final stage are large.
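The following sketch shows how the two sizes might be configured side by side. It assumes the final-stage override uses the prefix spark.sql.finalStage. in place of spark.sql. on the original key; that prefix is an assumption here and should be checked against the Kyuubi version in use. The 64MB and 256MB values are the ones mentioned in the text above.

```sql
-- Let the final stage read its own copy of the config.
SET spark.sql.optimizer.finalStageConfigIsolation.enabled=true;

-- Intermediate stages: small partitions for enough parallelism.
SET spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB;

-- Final stage: larger partitions so output files are bigger.
-- ASSUMPTION: the final-stage key prefix is spark.sql.finalStage.
SET spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes=256MB;
```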
|Kyuubi Spark SQL extension||Supported Spark version(s)||Available since||EOL||Bundled in Binary release tarball||Maven profile|
Check the matrix to confirm that you are using a supported Spark version, and find the corresponding Kyuubi Spark SQL Extension jar
Get the Kyuubi Spark SQL Extension jar
Each Kyuubi binary release tarball contains only one default version of the Kyuubi Spark SQL Extension jar. If that is the version you are looking for, you can find it under
All supported versions of Kyuubi Spark SQL Extension jar will be deployed to Maven Central
If you prefer, you can compile the Kyuubi Spark SQL Extension jar yourself by activating the corresponding Maven profile in your compile command, e.g. you can get the Kyuubi Spark SQL Extension jar for Spark 3.1 under
extensions/spark/kyuubi-extension-spark-3-1/target when compiling with
Put the Kyuubi Spark SQL extension jar
KyuubiSparkSQLExtension, i.e. add a config into
Now, you can enjoy the Kyuubi SQL Extension.
Kyuubi provides some configs to make these features easy to use.
|spark.sql.optimizer.insertRepartitionBeforeWrite.enabled||true||Add repartition node at the top of query plan. An approach of merging small files.||1.2.0|
|spark.sql.optimizer.insertRepartitionNum||none||The partition number if
|spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum||100||The partition number of each dynamic partition if
|spark.sql.optimizer.forceShuffleBeforeJoin.enabled||false||Ensure shuffle node exists before shuffled join (shj and smj) to make AQE
|spark.sql.optimizer.finalStageConfigIsolation.enabled||false||If true, the final stage can use a different config from the previous stages. The prefix of the final stage config key should be
|spark.sql.analyzer.classification.enabled||false||When true, allows Kyuubi engine to judge this SQL's classification and set
|spark.sql.optimizer.insertZorderBeforeWriting.enabled||true||When true, the target table properties determine whether zorder is inserted. The key properties are: 1)
|spark.sql.optimizer.zorderGlobalSort.enabled||true||When true, a global sort is done using zorder. Note that this can cause data skew if the zorder columns have low cardinality. When false, only a local sort is done using zorder.||1.4.0|
|spark.sql.watchdog.maxPartitions||none||Sets the maximum number of partitions when Spark scans a data source. Specifying this configuration enables MaxPartitionStrategy, which avoids scanning an excessive number of partitions on a partitioned table. It is optional and works only when defined.||1.4.0|
|spark.sql.optimizer.dropIgnoreNonExistent||false||When true, do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies a non-existent database/table/view/function/partition||1.5.0|
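As a short usage sketch, the configs in the table can be set per session with Spark SQL SET statements. The keys below are taken from the table; the table name tmp_report and the limit of 100 partitions are hypothetical values chosen only for illustration.

```sql
-- Do not report an error when the dropped object does not exist.
SET spark.sql.optimizer.dropIgnoreNonExistent=true;
DROP TABLE tmp_report;  -- hypothetical table that may not exist

-- Cap the number of partitions a scan may touch (illustrative value).
SET spark.sql.watchdog.maxPartitions=100;
```

Session-level SET statements affect only the current session, which makes them a low-risk way to try a rule before changing a shared default.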