🦊 Welcome to Kyuubi’s online documentation ✨, v1.9.0-SNAPSHOT

Auxiliary Optimization Rules

Auxiliary Optimization Rules#

Kyuubi provides SQL extension out of box. Due to the version compatibility with Apache Spark, currently we support Apache Spark branch-3.1 and later. And don’t worry, Kyuubi will support the new Apache Spark version in the future. Thanks to the adaptive query execution framework (AQE), Kyuubi can do these optimizations.

Features#

  • merging small files automatically

    Small files is a long time issue with Apache Spark. Kyuubi can merge small files by adding an extra shuffle. Currently, Kyuubi supports handle small files with datasource table and hive table, and also Kyuubi support optimize dynamic partition insertion. For example, a common write query INSERT INTO TABLE $table1 SELECT * FROM $table2, Kyuubi will introduce an extra shuffle before write and then the small files will go away.

  • insert shuffle node before Join to make AQE OptimizeSkewedJoin work

    In current implementation, Apache Spark can only optimize skewed join by the standard join which means a join must have two sort and shuffle node. However, in complex scenario this assuming will be broken easily. Kyuubi can guarantee the join is standard by adding an extra shuffle node before join. So that, OptimizeSkewedJoin can work better.

  • stage level config isolation in AQE

    As we know, spark.sql.adaptive.advisoryPartitionSizeInBytes is a key config in Apache Spark AQE. It controls how big data size per-task should handle during shuffle, so we always use a 64MB or a smaller value to make parallelism enough. However, in general, we expect a file is big enough like 256MB or 512MB. Kyuubi can make the config isolation to solve the conflict so that we can make staging partition data size small and last partition data size big.

Usage#

Kyuubi Spark SQL extension Supported Spark version(s) Available since EOL Bundled in Binary release tarball Maven profile
kyuubi-extension-spark-3-1 3.1.x 1.3.0-incubating N/A 1.3.0-incubating spark-3.1
kyuubi-extension-spark-3-2 3.2.x 1.4.0-incubating N/A 1.4.0-incubating spark-3.2
kyuubi-extension-spark-3-3 3.3.x 1.6.0-incubating N/A 1.6.0-incubating spark-3.3
  1. Check the matrix that if you are using the supported Spark version, and find the corresponding Kyuubi Spark SQL Extension jar

  2. Get the Kyuubi Spark SQL Extension jar

    1. Each Kyuubi binary release tarball only contains one default version of Kyuubi Spark SQL Extension jar, if you are looking for such version, you can find it under $KYUUBI_HOME/extension

    2. All supported versions of Kyuubi Spark SQL Extension jar will be deployed to Maven Central

    3. If you like, you can compile Kyuubi Spark SQL Extension jar by yourself, please activate the corresponding Maven’s profile on you compile command, i.e. you can get Kyuubi Spark SQL Extension jar for Spark 3.1 under extensions/spark/kyuubi-extension-spark-3-1/target when compile with -Pspark-3.1

  3. Put the Kyuubi Spark SQL extension jar kyuubi-extension-spark-*.jar into $SPARK_HOME/jars

  4. Enable KyuubiSparkSQLExtension, i.e. add a config into $SPARK_HOME/conf/spark-defaults.conf, spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension

Now, you can enjoy the Kyuubi SQL Extension.

Additional Configurations#

Kyuubi provides some configs to make these feature easy to use.

Name Default Value Description Since
spark.sql.optimizer.insertRepartitionBeforeWrite.enabled true Add repartition node at the top of query plan. An approach of merging small files. 1.2.0
spark.sql.optimizer.insertRepartitionNum none The partition number if spark.sql.optimizer.insertRepartitionBeforeWrite.enabled is enabled. If AQE is disabled, the default value is spark.sql.shuffle.partitions. If AQE is enabled, the default value is none that means depend on AQE. This config is used for Spark 3.1 only. 1.2.0
spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum 100 The partition number of each dynamic partition if spark.sql.optimizer.insertRepartitionBeforeWrite.enabled is enabled. We will repartition by dynamic partition columns to reduce the small file but that can cause data skew. This config is to extend the partition of dynamic partition column to avoid skew but may generate some small files. 1.2.0
spark.sql.optimizer.forceShuffleBeforeJoin.enabled false Ensure shuffle node exists before shuffled join (shj and smj) to make AQE OptimizeSkewedJoin works (complex scenario join, multi table join). 1.2.0
spark.sql.optimizer.finalStageConfigIsolation.enabled false If true, the final stage support use different config with previous stage. The prefix of final stage config key should be spark.sql.finalStage.. For example, the raw spark config: spark.sql.adaptive.advisoryPartitionSizeInBytes, then the final stage config should be: spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes. 1.2.0
spark.sql.analyzer.classification.enabled false When true, allows Kyuubi engine to judge this SQL's classification and set spark.sql.analyzer.classification back into sessionConf. Through this configuration item, Spark can optimizing configuration dynamic. 1.4.0
spark.sql.optimizer.insertZorderBeforeWriting.enabled true When true, we will follow target table properties to insert zorder or not. The key properties are: 1) kyuubi.zorder.enabled: if this property is true, we will insert zorder before writing data. 2) kyuubi.zorder.cols: string split by comma, we will zorder by these cols. 1.4.0
spark.sql.optimizer.zorderGlobalSort.enabled true When true, we do a global sort using zorder. Note that, it can cause data skew issue if the zorder columns have less cardinality. When false, we only do local sort using zorder. 1.4.0
spark.sql.watchdog.maxPartitions none Set the max partition number when spark scans a data source. Enable maxPartition Strategy by specifying this configuration. Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table, it's optional that works with defined 1.4.0
spark.sql.watchdog.maxFileSize none Set the maximum size in bytes of files when spark scans a data source. Enable maxFileSize Strategy by specifying this configuration. Add maxFileSize Strategy to avoid scan excessive size of files, it's optional that works with defined 1.8.0
spark.sql.optimizer.dropIgnoreNonExistent false When true, do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies a non-existent database/table/view/function/partition 1.5.0
spark.sql.optimizer.rebalanceBeforeZorder.enabled false When true, we do a rebalance before zorder in case data skew. Note that, if the insertion is dynamic partition we will use the partition columns to rebalance. Note that, this config only affects with Spark 3.3.x. 1.6.0
spark.sql.optimizer.rebalanceZorderColumns.enabled false When true and spark.sql.optimizer.rebalanceBeforeZorder.enabled is true, we do rebalance before Z-Order. If it's dynamic partition insert, the rebalance expression will include both partition columns and Z-Order columns. Note that, this config only affects with Spark 3.3.x. 1.6.0
spark.sql.optimizer.twoPhaseRebalanceBeforeZorder.enabled false When true and spark.sql.optimizer.rebalanceBeforeZorder.enabled is true, we do two phase rebalance before Z-Order for the dynamic partition write. The first phase rebalance using dynamic partition column; The second phase rebalance using dynamic partition column Z-Order columns. Note that, this config only affects with Spark 3.3.x. 1.6.0
spark.sql.optimizer.zorderUsingOriginalOrdering.enabled false When true and spark.sql.optimizer.rebalanceBeforeZorder.enabled is true, we do sort by the original ordering i.e. lexicographical order. Note that, this config only affects with Spark 3.3.x. 1.6.0
spark.sql.optimizer.inferRebalanceAndSortOrders.enabled false When ture, infer columns for rebalance and sort orders from original query, e.g. the join keys from join. It can avoid compression ratio regression. 1.7.0
spark.sql.optimizer.inferRebalanceAndSortOrdersMaxColumns 3 The max columns of inferred columns. 1.7.0
spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabled false When true, add repartition even if the original plan does not have shuffle. 1.7.0
spark.sql.optimizer.finalStageConfigIsolationWriteOnly.enabled true When true, only enable final stage isolation for writing. 1.7.0
spark.sql.finalWriteStage.eagerlyKillExecutors.enabled false When true, eagerly kill redundant executors before running final write stage. 1.8.0
spark.sql.finalWriteStage.skipKillingExecutorsForTableCache true When true, skip killing executors if the plan has table caches. 1.8.0
spark.sql.finalWriteStage.retainExecutorsFactor 1.2 If the target executors * factor < active executors, and target executors * factor > min executors, then inject kill executors or inject custom resource profile. 1.8.0
spark.sql.finalWriteStage.resourceIsolation.enabled false When true, make final write stage resource isolation using custom RDD resource profile. 1.8.0
spark.sql.finalWriteStageExecutorCores fallback spark.executor.cores Specify the executor core request for final write stage. It would be passed to the RDD resource profile. 1.8.0
spark.sql.finalWriteStageExecutorMemory fallback spark.executor.memory Specify the executor on heap memory request for final write stage. It would be passed to the RDD resource profile. 1.8.0
spark.sql.finalWriteStageExecutorMemoryOverhead fallback spark.executor.memoryOverhead Specify the executor memory overhead request for final write stage. It would be passed to the RDD resource profile. 1.8.0
spark.sql.finalWriteStageExecutorOffHeapMemory NONE Specify the executor off heap memory request for final write stage. It would be passed to the RDD resource profile. 1.8.0
spark.sql.execution.scriptTransformation.enabled true When false, script transformation is not allowed. 1.9.0