TiSpark User Guide

TiSpark architecture

TiSpark vs TiFlash

TiSpark is a thin layer built for running Apache Spark on top of TiDB/TiKV to answer the complex OLAP queries. It takes advantages of both the Spark platform and the distributed TiKV cluster and seamlessly glues to TiDB, the distributed OLTP database, to provide a Hybrid Transactional/Analytical Processing (HTAP) solution to serve as a one-stop solution for both online transactions and analysis.

TiFlash is another tool that enables HTAP. Both TiFlash and TiSpark allow the use of multiple hosts to execute OLAP queries on OLTP data. TiFlash stores data in a columnar format, which allows more efficient analytical queries. TiFlash and TiSpark can be used together.

What is TiSpark

TiSpark depends on the TiKV cluster and the PD cluster. You also need to set up a Spark cluster. This document provides a brief introduction to how to setup and use TiSpark. It requires some basic knowledge of Apache Spark. For more information, see Apache Spark website.

Deeply integrating with Spark Catalyst Engine, TiSpark provides precise control on computing. This allows Spark to read data from TiKV efficiently. TiSpark also supports index seek, which enables high-speed point query. TiSpark accelerates data queries by pushing computing to TiKV so as to reduce the volume of data to be processed by Spark SQL. Meanwhile, TiSpark can use TiDB built-in statistics to select the best query plan.

With TiSpark and TiDB, you can run both transaction and analysis tasks on the same platform without building and maintaining ETLs. This simplifies the system architecture and reduces the cost of maintenance.

You can use tools of the Spark ecosystem for data processing on TiDB:

  • TiSpark: Data analysis and ETLs
  • TiKV: Data retrieval
  • Scheduling system: Report generation

Also, TiSpark supports distributed writes to TiKV. Compared with writes to TiDB by using Spark and JDBC, distributed writes to TiKV can implement transactions (either all data are written successfully or all writes fail), and the writes are faster.

Requirements

  • TiSpark supports Spark >= 2.3.
  • TiSpark requires JDK 1.8 and Scala 2.11/2.12.
  • TiSpark runs in any Spark mode such as YARN, Mesos, and Standalone.

Since TiSpark is a TiDB connector of Spark, to use it, a running Spark cluster is required.

This document provides basic advice on deploying Spark. Please turn to the Spark official website for detailed hardware recommendations.

For independent deployment of Spark cluster:

  • It is recommended to allocate 32 GB memory for Spark. Reserve at least 25% of the memory for the operating system and the buffer cache.
  • It is recommended to provision at least 8 to 16 cores per machine for Spark. First, you must assign all the CPU cores to Spark.

The following is an example based on the spark-env.sh configuration:

SPARK_EXECUTOR_MEMORY = 32g SPARK_WORKER_MEMORY = 32g SPARK_WORKER_CORES = 8

Get TiSpark

TiSpark is a third-party jar package for Spark that provides the ability to read and write TiKV.

Get mysql-connector-j

The mysql-connector-java dependency is no longer provided because of the limit of the GPL license.

The following versions of TiSpark's jar will no longer include mysql-connector-java.

  • TiSpark > 3.0.1
  • TiSpark > 2.5.1 for TiSpark 2.5.x
  • TiSpark > 2.4.3 for TiSpark 2.4.x

However, TiSpark needs mysql-connector-java for writing and authentication. In such cases, you need to import mysql-connector-java manually using either of the following methods:

  • Put mysql-connector-java into spark jars file.

  • Import mysql-connector-java when you submit a spark job. See the following example:

spark-submit --jars tispark-assembly-3.0_2.12-3.1.0-SNAPSHOT.jar,mysql-connector-java-8.0.29.jar

Choose TiSpark version

You can choose TiSpark version according to your TiDB and Spark version.

TiSpark versionTiDB, TiKV, PD versionSpark versionScala version
2.4.x-scala_2.115.x, 4.x2.3.x, 2.4.x2.11
2.4.x-scala_2.125.x, 4.x2.4.x2.12
2.5.x5.x, 4.x3.0.x, 3.1.x2.12
3.0.x5.x, 4.x3.0.x, 3.1.x, 3.2.x2.12
3.1.x6.x, 5.x, 4.x3.0.x, 3.1.x, 3.2.x, 3.3.x2.12

TiSpark 2.4.4, 2.5.2, 3.0.2 and 3.1.1 are the latest stable versions and are highly recommended.

Get TiSpark jar

You can get the TiSpark jar using one of the following methods:

git clone https://github.com/pingcap/tispark.git

Run the following command under the TiSpark root directory.

// add -Dmaven.test.skip=true to skip the tests mvn clean install -Dmaven.test.skip=true // or you can add properties to specify spark version mvn clean install -Dmaven.test.skip=true -Pspark3.2.1

TiSpark jar's artifact ID

The Artifact ID of TiSpark varies with TiSpark versions.

TiSpark versionArtifact ID
2.4.x-${scala_version}, 2.5.0tispark-assembly
2.5.1tispark-assembly-${spark_version}
3.0.x, 3.1.xtispark-assembly-${spark_version}-${scala_version}

Getting started

This document describes how to use TiSpark in spark-shell.

Start spark-shell

To use TiSpark in spark-shell:

Add the following configuration in spark-defaults.conf:

spark.sql.extensions org.apache.spark.sql.TiExtensions spark.tispark.pd.addresses ${your_pd_address} spark.sql.catalog.tidb_catalog org.apache.spark.sql.catalyst.catalog.TiCatalog spark.sql.catalog.tidb_catalog.pd.addresses ${your_pd_address}

Start spark-shell with the --jars option.

spark-shell --jars tispark-assembly-{version}.jar

Get TiSpark version

You can get TiSpark version information by running the following command in spark-shell:

spark.sql("select ti_version()").collect

Read data using TiSpark

You can use Spark SQL to read data from TiKV.

spark.sql("use tidb_catalog") spark.sql("select count(*) from ${database}.${table}").show

Write data using TiSpark

You can use the Spark DataSource API to write data to TiKV, for which ACID is guaranteed.

val tidbOptions: Map[String, String] = Map( "tidb.addr" -> "127.0.0.1", "tidb.password" -> "", "tidb.port" -> "4000", "tidb.user" -> "root" ) val customerDF = spark.sql("select * from customer limit 100000") customerDF.write .format("tidb") .option("database", "tpch_test") .option("table", "cust_test_select") .options(tidbOptions) .mode("append") .save()

See Data Source API User Guide for more details.

You can also write with Spark SQL since TiSpark 3.1. See insert SQL for more details.

Write data using JDBC DataSource

You can also use Spark JDBC to write to TiDB without the use of TiSpark.

This is beyond the scope of TiSpark. This document only provides an example here. For detailed information, see JDBC To Other Databases.

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions val customer = spark.sql("select * from customer limit 100000") // you might need to repartition the source to make it balanced across nodes // and increase concurrency val df = customer.repartition(32) df.write .mode(saveMode = "append") .format("jdbc") .option("driver", "com.mysql.jdbc.Driver") // replace the host and port with yours and be sure to use rewrite batch .option("url", "jdbc:mysql://127.0.0.1:4000/test?rewriteBatchedStatements=true") .option("useSSL", "false") // as tested, setting to `150` is a good practice .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 150) .option("dbtable", s"cust_test_select") // database name and table name here .option("isolationLevel", "NONE") // set isolationLevel to NONE .option("user", "root") // TiDB user here .save()

Set isolationLevel to NONE to avoid large single transactions which might lead to TiDB OOM and also avoid the ISOLATION LEVEL does not support error (TiDB currently only supports REPEATABLE-READ).

Delete data using TiSpark

You can use Spark SQL to delete data from TiKV.

spark.sql("use tidb_catalog") spark.sql("delete from ${database}.${table} where xxx")

See delete feature for more details.

Work with other data sources

You can use multiple catalogs to read data from different data sources as follows:

// Read from Hive spark.sql("select * from spark_catalog.default.t").show // Join Hive tables and TiDB tables spark.sql("select t1.id,t2.id from spark_catalog.default.t t1 left join tidb_catalog.test.t t2").show

TiSpark configurations

The configurations in the following table can be put together with spark-defaults.conf or passed in the same way as other Spark configuration properties.

KeyDefault valueDescription
spark.tispark.pd.addresses127.0.0.1:2379The addresses of PD clusters, which are split by commas.
spark.tispark.grpc.framesize2147483647The maximum frame size of gRPC response in bytes (default to 2G).
spark.tispark.grpc.timeout_in_sec10The gRPC timeout time in seconds.
spark.tispark.plan.allow_agg_pushdowntrueWhether aggregations are allowed to push down to TiKV (in case of busy TiKV nodes).
spark.tispark.plan.allow_index_readtrueWhether index is enabled in planning (which might cause heavy pressure on TiKV).
spark.tispark.index.scan_batch_size20000The number of row keys in a batch for the concurrent index scan.
spark.tispark.index.scan_concurrency5The maximum number of threads for index scan that retrieves row keys (shared among tasks inside each JVM).
spark.tispark.table.scan_concurrency512The maximum number of threads for table scan (shared among tasks inside each JVM).
spark.tispark.request.command.priorityLowThe value options are Low, Normal, High. This setting impacts the resources allocated in TiKV. Low is recommended because the OLTP workload is not disturbed.
spark.tispark.coprocess.codec_formatchblockRetain the default codec format for coprocessor. Available options are default, chblock and chunk.
spark.tispark.coprocess.streamingfalseWhether to use streaming for response fetching (experimental).
spark.tispark.plan.unsupported_pushdown_exprsA comma-separated list of expressions. In case you have a very old version of TiKV, you might disable the push down of some expressions if they are not supported.
spark.tispark.plan.downgrade.index_threshold1000000000If the range of index scan on one Region exceeds this limit in the original request, downgrade this Region's request to table scan rather than the planned index scan. By default, the downgrade is disabled.
spark.tispark.show_rowidfalseWhether to show row ID if the ID exists.
spark.tispark.db_prefixThe string that indicates the extra prefix for all databases in TiDB. This string distinguishes the databases in TiDB from the Hive databases with the same name.
spark.tispark.request.isolation.levelSIWhether to resolve locks for the underlying TiDB clusters. When you use the "RC", you get the latest version of the record smaller than your tso and ignore the locks. When you use "SI", you resolve the locks and get the records depending on whether the resolved lock is committed or aborted.
spark.tispark.coprocessor.chunk_batch_size1024Rows fetched from coprocessor.
spark.tispark.isolation_read_enginestikv,tiflashList of readable engines of TiSpark, comma separated. Storage engines not listed will not be read.
spark.tispark.stale_readoptionalThe stale read timestamp(ms). See here for more details.
spark.tispark.tikv.tls_enablefalseWhether to enable TiSpark TLS.  
spark.tispark.tikv.trust_cert_collectionThe trusted certificate for TiKV Client, used for verifying the remote PD's certificate, for example, /home/tispark/config/root.pem The file should contain an X.509 certificate collection.
spark.tispark.tikv.key_cert_chainAn X.509 certificate chain file for TiKV Client, for example, /home/tispark/config/client.pem.
spark.tispark.tikv.key_fileA PKCS#8 private key file for TiKV Client, for example, /home/tispark/client_pkcs8.key.
spark.tispark.tikv.jks_enablefalseWhether to use the JAVA key store instead of the X.509 certificate.
spark.tispark.tikv.jks_trust_pathA JKS format certificate for TiKV Client, generated by keytool, for example, /home/tispark/config/tikv-truststore.
spark.tispark.tikv.jks_trust_passwordThe password of spark.tispark.tikv.jks_trust_path.
spark.tispark.tikv.jks_key_pathA JKS format key for TiKV Client, generated by keytool, for example, /home/tispark/config/tikv-clientstore.
spark.tispark.tikv.jks_key_passwordThe password of spark.tispark.tikv.jks_key_path.
spark.tispark.jdbc.tls_enablefalseWhether to enable TLS when using the JDBC connector.
spark.tispark.jdbc.server_cert_storeThe trusted certificate for JDBC. It is a Java keystore (JKS) format certificate generated by keytool, for example, /home/tispark/config/jdbc-truststore. The default value is "", which means TiSpark does not verify the TiDB server.
spark.tispark.jdbc.server_cert_passwordThe password of spark.tispark.jdbc.server_cert_store.
spark.tispark.jdbc.client_cert_storeA PKCS#12 certificate for JDBC. It is a JKS format certificate generated by keytool, for example, /home/tispark/config/jdbc-clientstore. Default is "", which means TiDB server doesn't verify TiSpark.
spark.tispark.jdbc.client_cert_passwordThe password of spark.tispark.jdbc.client_cert_store.
spark.tispark.tikv.tls_reload_interval10sThe interval for checking if there is any reloading certificates. The default value is 10s (10 seconds).
spark.tispark.tikv.conn_recycle_time60sThe interval for cleaning expired connections with TiKV. It takes effect only when certificate reloading is enabled. The default value is 60s (60 seconds).
spark.tispark.host_mappingThe route map used to configure the mapping between public IP addresses and intranet IP addresses. When the TiDB cluster is running on the intranet, you can map a set of intranet IP addresses to public IP addresses for an outside Spark cluster to access. The format is {Intranet IP1}:{Public IP1};{Intranet IP2}:{Public IP2}, for example, 192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9.
spark.tispark.new_collation_enableWhen new collation is enabled on TiDB, this configuration can be set to true. If new collation is not enabled on TiDB, this configuration can be set to false. If this item is not configured, TiSpark configures new collation automatically based on the TiDB version. The configuration rule is as follows: If the TiDB version is greater than or equal to v6.0.0, it is true; otherwise, it is false.
spark.tispark.replica_readleaderThe type of the replica to read. Value options are leader, follower, and learner. Multiple types can be specified at the same time and TiSpark selects the type according to the order.
spark.tispark.replica_read.labelThe label of the target TiKV node. The format is label_x=value_x,label_y=value_y, and the items are connected by logical conjunction.

TLS configurations

TiSpark TLS has two parts: TiKV Client TLS and JDBC connector TLS. To enable TLS in TiSpark, you need to configure both. spark.tispark.tikv.xxx is used for TiKV Client to create a TLS connection with PD and TiKV server. spark.tispark.jdbc.xxx is used for JDBC to connect with TiDB server in TLS connection.

When TiSpark TLS is enabled, you must configure either the X.509 certificate with tikv.trust_cert_collection, tikv.key_cert_chain and tikv.key_file configurations, or the JKS certificate with tikv.jks_enable, tikv.jks_trust_path and tikv.jks_key_path. jdbc.server_cert_store and jdbc.client_cert_store are optional.

TiSpark only supports TLSv1.2 and TLSv1.3.

  • The following is an example of opening TLS configuration with the X.509 certificate in TiKV Client.
spark.tispark.tikv.tls_enable true spark.tispark.tikv.trust_cert_collection /home/tispark/root.pem spark.tispark.tikv.key_cert_chain /home/tispark/client.pem spark.tispark.tikv.key_file /home/tispark/client.key
  • The following is an example of enabling TLS with JKS configurations in TiKV Client.
spark.tispark.tikv.tls_enable true spark.tispark.tikv.jks_enable true spark.tispark.tikv.jks_key_path /home/tispark/config/tikv-truststore spark.tispark.tikv.jks_key_password tikv_trustore_password spark.tispark.tikv.jks_trust_path /home/tispark/config/tikv-clientstore spark.tispark.tikv.jks_trust_password tikv_clientstore_password

When both JKS and X.509 certificates are configured, JKS would have a higher priority. That means TLS builder will use JKS certificate first. Therefore, do not set spark.tispark.tikv.jks_enable=true when you just want to use a common PEM certificate.

  • The following is an example of enabling TLS in JDBC connector.
spark.tispark.jdbc.tls_enable true spark.tispark.jdbc.server_cert_store /home/tispark/jdbc-truststore spark.tispark.jdbc.server_cert_password jdbc_truststore_password spark.tispark.jdbc.client_cert_store /home/tispark/jdbc-clientstore spark.tispark.jdbc.client_cert_password jdbc_clientstore_password

Log4j configuration

When you start spark-shell or spark-sql and run query, you might see the following warnings:

Failed to get database ****, returning NoSuchObjectException Failed to get database ****, returning NoSuchObjectException

where **** is the database name.

The warnings are benign and occurs because Spark cannot find **** in its own catalog. You can just ignore these warnings.

To mute them, append the following text to ${SPARK_HOME}/conf/log4j.properties.

# tispark disable "WARN ObjectStore:568 - Failed to get database" log4j.logger.org.apache.hadoop.hive.metastore.ObjectStore=ERROR

Time zone configuration

Set time zone by using the -Duser.timezone system property (for example, -Duser.timezone=GMT-7), which affects the Timestamp type.

Do not use spark.sql.session.timeZone.

Features

The major features of TiSpark are as follows:

Feature supportTiSpark 2.4.xTiSpark 2.5.xTiSpark 3.0.xTiSpark 3.1.x
SQL select without tidb_catalog
SQL select with tidb_catalog
DataFrame append
DataFrame reads
SQL show databases
SQL show tables
SQL auth
SQL delete
SQL insert
TLS
DataFrame auth

Support for expression index

TiDB v5.0 supports expression index.

TiSpark currently supports retrieving data from tables with expression index, but the expression index will not be used by the planner of TiSpark.

Work with TiFlash

TiSpark can read data from TiFlash via the configuration spark.tispark.isolation_read_engines.

Support for partitioned tables

Read partitioned tables from TiDB

TiSpark can read the range and hash partitioned tables from TiDB.

Currently, TiSpark does not support a MySQL/TiDB partition table syntax select col_name from table_name partition(partition_name). However, you can still use the where condition to filter the partitions.

TiSpark decides whether to apply partition pruning according to the partition type and the partition expression associated with the table.

TiSpark applies partition pruning on range partitioning only when the partition expression is one of the following:

  • column expression
  • YEAR($argument) where the argument is a column and its type is datetime or string literal that can be parsed as datetime.

If partition pruning is not applicable, TiSpark's reading is equivalent to doing a table scan over all partitions.

Write into partitioned tables

Currently, TiSpark only supports writing data into the range and hash partitioned tables under the following conditions:

  • The partition expression is a column expression.
  • The partition expression is YEAR($argument) where the argument is a column and its type is datetime or string literal that can be parsed as datetime.

There are two ways to write into partitioned tables:

  • Use datasource API to write into partition table which supports replace and append semantics.
  • Use delete statement with Spark SQL.

Security

If you are using TiSpark v2.5.0 or a later version, you can authenticate and authorize TiSpark users by using TiDB.

The authentication and authorization feature is disabled by default. To enable it, add the following configurations to the Spark configuration file spark-defaults.conf.

// Enable authentication and authorization spark.sql.auth.enable true // Configure TiDB information spark.sql.tidb.addr $your_tidb_server_address spark.sql.tidb.port $your_tidb_server_port spark.sql.tidb.user $your_tidb_server_user spark.sql.tidb.password $your_tidb_server_password

For more information, see Authorization and authentication through TiDB server.

Other features

Statistics information

TiSpark uses the statistic information for:

  • Determining which index to use in your query plan with the minimum estimated cost.
  • Small table broadcasting, which enables efficient broadcast join.

To allow TiSpark to access statistic information, make sure that relevant tables have been analyzed.

See Introduction to Statistics for more details about how to analyze tables.

Since TiSpark 2.0, statistics information is automatically loaded by default.

FAQ

See TiSpark FAQ.