TiSpark 用户指南

TiSpark 架构

TiSpark vs TiFlash

TiSpark 是 PingCAP 为解决用户复杂 OLAP 需求而推出的产品。它借助 Spark 平台,同时融合 TiKV 分布式集群的优势,和 TiDB 一起为用户一站式解决 HTAP (Hybrid Transactional/Analytical Processing) 的需求。

TiFlash 也是一个解决 HTAP 需求的产品。TiFlash 和 TiSpark 都允许使用多个主机在 OLTP 数据上执行 OLAP 查询。TiFlash 是列式存储,这提供了更高效的分析查询。TiFlash 和 TiSpark 可以同时使用。

TiSpark 是什么

TiSpark 依赖于 TiKV 集群和 Placement Driver (PD),也需要你搭建一个 Spark 集群。本文简单介绍如何部署和使用 TiSpark。本文假设你对 Spark 有基本认知。你可以参阅 Apache Spark 官网了解 Spark 的相关信息。

TiSpark 深度整合了 Spark Catalyst 引擎,可以对计算进行精确的控制,使 Spark 能够高效地读取 TiKV 中的数据。TiSpark 还提供索引支持,帮助实现高速点查。

TiSpark 通过将计算下推到 TiKV 中提升了数据查询的效率,减少了 Spark SQL 需要处理的数据量,通过 TiDB 内置的统计信息选择最优的查询计划。

TiSpark 和 TiDB 可以让用户无需创建和维护 ETL,直接在同一个平台上进行事务和分析两种任务。这简化了系统架构,降低了运维成本。

用户可以在 TiDB 上使用 Spark 生态圈的多种工具进行数据处理,例如:

  • TiSpark:数据分析和 ETL。
  • TiKV:数据检索。
  • 调度系统:生成报表。

除此之外,TiSpark 还提供了分布式写入 TiKV 的功能。与使用 Spark 结合 JDBC 写入 TiDB 的方式相比,分布式写入 TiKV 能够实现事务(要么全部数据写入成功,要么全部都写入失败)。

版本要求

  • TiSpark 支持 Spark 2.3 或以上版本。
  • TiSpark 需要 JDK 1.8 以及 Scala 2.11/2.12 版本。
  • TiSpark 可以运行在任何 Spark 模式上,如 YARNMesos 以及 Standalone

推荐配置

TiSpark 作为 Spark 的 TiDB 连接器,需要 Spark 集群的支持。本文仅提供部署 Spark 的参考建议,对于硬件以及部署细节,请参考 Spark 官方文档

对于独立部署的 Spark 集群,可以参考如下建议配置:

  • 建议为 Spark 分配 32G 以上的内存,并为操作系统和缓存保留至少 25% 的内存。
  • 建议每台机器至少为 Spark 分配 8 到 16 核 CPU。起初,你可以设定将所有 CPU 核分配给 Spark。

可以参考如下的 spark-env.sh 配置文件:

SPARK_EXECUTOR_MEMORY = 32g SPARK_WORKER_MEMORY = 32g SPARK_WORKER_CORES = 8

获取 TiSpark

TiSpark 是 Spark 的第三方 jar 包,提供读写 TiKV 的能力。

获取 mysql-connector-j

由于 GPL 许可证的限制,TiSpark 不再提供 mysql-connector-java 的依赖。以下版本将不再包含 mysql-connector-java

  • TiSpark > 3.0.1
  • TiSpark > 2.5.1 (TiSpark 2.5.x)
  • TiSpark > 2.4.3 (TiSpark 2.4.x)

在使用 TiSpark 写入与鉴权时,仍需要 mysql-connector-java 依赖,因此你需要手动下载,并使用以下方式引入:

  • mysql-connector-java 放入 Spark jars 包中。

  • 在你提交 Spark 任务时,引入 mysql-connector-java,详见以下示例:

    spark-submit --jars tispark-assembly-3.0_2.12-3.1.0-SNAPSHOT.jar,mysql-connector-java-8.0.29.jar

选择 TiSpark 版本

你可以根据 TiDB 和 Spark 版本选择相应的 TiSpark 版本。

TiSpark 版本TiDB、TiKV、PD 版本Spark 版本Scala 版本
2.4.x-scala_2.115.x, 4.x2.3.x, 2.4.x2.11
2.4.x-scala_2.125.x, 4.x2.4.x2.12
2.5.x5.x, 4.x3.0.x, 3.1.x2.12
3.0.x5.x, 4.x3.0.x, 3.1.x, 3.2.x2.12
3.1.x6.x, 5.x, 4.x3.0.x, 3.1.x, 3.2.x, 3.3.x2.12

获取 TiSpark jar 包

你能用以下方式获取 jar 包:

  1. 下载 TiSpark 源码:

    git clone https://github.com/pingcap/tispark.git cd tisapark
  2. 在 TiSpark 根目录运行如下命令:

    // add -Dmaven.test.skip=true to skip the tests mvn clean install -Dmaven.test.skip=true // or you can add properties to specify spark version mvn clean install -Dmaven.test.skip=true -Pspark3.2.1

TiSpark jar 包的 artifact ID

注意不同版本的 TiSpark artifact ID 也不同:

TiSpark 版本Artifact ID
2.4.x-${scala_version}, 2.5.0tispark-assembly
2.5.1tispark-assembly-${spark_version}
3.0.x, 3.1.xtispark-assembly-${spark_version}-${scala_version}

快速开始

本章节将以 spark-shell 为例,介绍如何使用 TiSpark。请保证您已下载 Spark。

启动 spark-shell

spark-defaults.conf 中添加如下配置:

spark.sql.extensions org.apache.spark.sql.TiExtensions spark.tispark.pd.addresses ${your_pd_adress} spark.sql.catalog.tidb_catalog org.apache.spark.sql.catalyst.catalog.TiCatalog spark.sql.catalog.tidb_catalog.pd.addresses ${your_pd_adress}

启动 spark-shell:

spark-shell --jars tispark-assembly-{version}.jar

获取 TiSpark 版本

在 spark-shell 中运行如下命令,可获取 TiSpark 版本信息:

spark.sql("select ti_version()").collect

使用 TiSpark 读取数据

可以通过 Spark SQL 从 TiKV 读取数据:

spark.sql("use tidb_catalog") spark.sql("select count(*) from ${database}.${table}").show

使用 TiSpark 写入数据

通过 Spark DataSource API,可以在保证 ACID 前提下写入数据到 TiKV:

val tidbOptions: Map[String, String] = Map( "tidb.addr" -> "127.0.0.1", "tidb.password" -> "", "tidb.port" -> "4000", "tidb.user" -> "root" ) val customerDF = spark.sql("select * from customer limit 100000") customerDF.write .format("tidb") .option("database", "tpch_test") .option("table", "cust_test_select") .options(tidbOptions) .mode("append") .save()

详见 Data Source API User Guide

在 TiSpark 3.1 之后,你还能通过 Spark SQL 写入 TiSpark 3.1。详见 Insert SQL

通过 JDBC 数据源写入数据

你同样可以在不使用 TiSpark 的情况下使用 Spark JDBC 数据源写入 TiDB。

由于这超过了 TiSpark 的范畴,本文仅简单提供示例,详情请参考 JDBC To Other Databases

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions val customer = spark.sql("select * from customer limit 100000") // 为了平衡各节点以及提高并发数,你可以将数据源重新分区 val df = customer.repartition(32) df.write .mode(saveMode = "append") .format("jdbc") .option("driver", "com.mysql.jdbc.Driver") // 替换为你的主机名和端口地址,并确保开启了重写批处理 .option("url", "jdbc:mysql://127.0.0.1:4000/test?rewriteBatchedStatements=true") .option("useSSL", "false") // 作为测试,建议设置为 150 .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 150) .option("dbtable", s"cust_test_select") // 数据库名和表名 .option("isolationLevel", "NONE") // 如果需要写入较大 Dataframe,推荐将 isolationLevel 设置为 NONE .option("user", "root") // TiDB 用户名 .save()

为了避免大事务导致 OOM 以及 TiDB 报 ISOLATION LEVEL does not support 错误(TiDB 目前仅支持 REPEATABLE-READ),推荐设置 isolationLevelNONE

使用 TiSpark 删除数据

可以使用 Spark SQL 删除 TiKV 数据:

spark.sql("use tidb_catalog") spark.sql("delete from ${database}.${table} where xxx")

详情请参考 delete feature

与其他数据源一起使用

你可以使用多个 catalog 从不同数据源读取数据:

// 从 Hive 读取 spark.sql("select * from spark_catalog.default.t").show // Join Hive 表和 TiDB 表 spark.sql("select t1.id,t2.id from spark_catalog.default.t t1 left join tidb_catalog.test.t t2").show

TiSpark 配置

可以将如下参数配置在 spark-defaults.conf 中,也可以参考 Spark 其他配置,用同样的方式传入。

KeyDefault valueDescription
spark.tispark.pd.addresses127.0.0.1:2379PD 集群的地址,通过逗号分隔。
spark.tispark.grpc.framesize2147483647gRPC 的最大回复大小,单位 bytes(默认 2G)。
spark.tispark.grpc.timeout_in_sec10gRPC 超时时间,单位秒。
spark.tispark.plan.allow_agg_pushdowntrue是否运行聚合下推(为了避免 TiKV 节点繁忙)。
spark.tispark.plan.allow_index_readtrue是否在执行计划中开启 index(可能导致 TiKV 压力过大)。
spark.tispark.index.scan_batch_size20000在 index scan 的一次 batch 中 row keys 的数量。
spark.tispark.index.scan_concurrency5在 index scan 中获取 row keys 的最大线程数(每个 JVM 的所有任务共享)。
spark.tispark.table.scan_concurrency512在 table scan 中的最大线程数(每个 JVM 的所有任务共享)。
spark.tispark.request.command.priorityLow可选项有 LowNormalHigh。修改配置会影响影响 TiKV 的资源分配。建议使用 Low,因为 OLTP 的负载不会被影响。
spark.tispark.coprocess.codec_formatchblockCoprocessor 的编码格式。可选项为 defaultchblockchunk
spark.tispark.coprocess.streamingfalse是否在获取响应时使用 streaming(实验性质)。
spark.tispark.plan.unsupported_pushdown_exprs表达式清单,多个表达式用逗号分隔。为了防止老版本的 TiKV 不支持某些表达式,你可以禁止下推它们。
spark.tispark.plan.downgrade.index_threshold1000000000如果 index scan 请求的范围超过了此限制,该 Region 请求会被降级为 table scan。降级默认关闭。
spark.tispark.show_rowidfalse是否在 row_id 存在时显示它。
spark.tispark.db_prefixTiDB数据库的前缀。该配置可用于在 TiSpark 2.4 中区分同名的 TiDB 和 Hive 数据库。
spark.tispark.request.isolation.levelSI是否为底层的 TiKV 解锁。当你使用 "RC" 级别,你将忽略锁,得到比 tso 更小的最新版本数据。当你使用 "SI" 级别,你将进行解锁并根据该锁对应的事务提交与否获取数据。
spark.tispark.coprocessor.chunk_batch_size1024从 coprocessor 获取的一个 batch 的 Row 数量。
spark.tispark.isolation_read_enginestikv,tiflashTiSpark 读引擎,多个引擎使用逗号分隔。未配置的存储引擎不会被读取。
spark.tispark.stale_readoptionalstale read 时间戳。详情请参考 stale read
spark.tispark.tikv.tls_enablefalse是否开启 TiSpark TLS。
spark.tispark.tikv.trust_cert_collectionTiKV Client 的受信任根证书,用于验证 PD 证书。如 /home/tispark/config/root.pem,该文件需包含 X.509 证书。
spark.tispark.tikv.key_cert_chainTiKV Client 的 X.509 格式的客户端证书链。如 /home/tispark/config/client.pem
spark.tispark.tikv.key_fileTiKV Client 的 PKCS#8 私钥文件。如 /home/tispark/client_pkcs8.key
spark.tispark.tikv.jks_enablefalse是否使用 JAVA key store 而不是 X.509 证书。
spark.tispark.tikv.jks_trust_pathTiKV Client JKS 格式的受信任根证书。由 keytool 生成,如 /home/tispark/config/tikv-truststore
spark.tispark.tikv.jks_trust_passwordspark.tispark.tikv.jks_trust_path 的密码。
spark.tispark.tikv.jks_key_pathTiKV Client JKS 格式的客户端证书。由 keytool 生成,如 /home/tispark/config/tikv-clientstore
spark.tispark.tikv.jks_key_passwordspark.tispark.tikv.jks_key_path 的密码。
spark.tispark.jdbc.tls_enablefalse是否开启 JDBC connector TLS。
spark.tispark.jdbc.server_cert_storeJDBC 的受信任根证书。由 keytool 生产的 Java keystore (JKS) 格式的证书。如 /home/tispark/config/jdbc-truststore。默认值为 "",表示 TiSpark 不会校验 TiDB 服务端。
spark.tispark.jdbc.server_cert_passwordspark.tispark.jdbc.server_cert_store 的密码。
spark.tispark.jdbc.client_cert_storeJDBC 的 PKCS#12 格式的客户端证书。这是由 keytool 生成的 JKS 格式证书。如 /home/tispark/config/jdbc-clientstore。默认值为 "",表示 TiDB 不会校验 TiSpark。
spark.tispark.jdbc.client_cert_passwordspark.tispark.jdbc.client_cert_store 的密码。
spark.tispark.tikv.tls_reload_interval10s重载证书的时间间隔。默认值为 10s
spark.tispark.tikv.conn_recycle_time60s清理 TiKV 失效连接的时间间隔。默认时间为 60s。当重载证书开启时此配置才会生效。
spark.tispark.host_mapping路由映射配置。用于配置公有 IP 地址和私有 IP 地址的映射。当 TiDB 在私有网络上运行时,你可以将一系列内部 IP 地址映射到公网 IP 地址以便 Spark 集群访问。其格式为 {Intranet IP1}:{Public IP1};{Intranet IP2}:{Public IP2},例如 192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9
spark.tispark.new_collation_enable当 TiDB 开启 new collation,推荐将此配置设为true。当 TiDB 关闭 new collation,推荐将此配置设置为 false。在未配置的情况下,TiSpark 会依据 TiDB 版本自动配置 new collation。其规则为:当 TiDB 版本大于等于 v6.0.0 时为 true;否则为 false
spark.tispark.replica_readleader读取副本的类型。可选值为 leaderfollowerlearner。可以同时指定多个类型,TiSpark 会根据顺序选择。
spark.tispark.replica_read.label目标 TiKV 节点的标签。格式为 label_x=value_x,label_y=value_y,各项之间为“逻辑与”的关系。

TLS 配置

TiSpark TLS 分为两部分:TiKV Client TLS 以及 JDBC connector TLS。 前者用于创建和 TiKV/PD 的 TLS 连接,后者用于创建与 TiDB 的 TLS 连接。

当配置 TiKV Client TLS 时,你需要以 X.509 格式的证书配置 tikv.trust_cert_collectiontikv.key_cert_chaintikv.key_file;或者以 JKS 格式的证书配置 tikv.jks_enabletikv.jks_trust_pathtikv.jks_key_path

当配置 JDBC connector TLS 时,你需要配置 spark.tispark.jdbc.tls_enable,而 jdbc.server_cert_storejdbc.client_cert_store 则是可选的。

TiSpark 目前仅持 TLS 1.2 and TLS 1.3。

  • 如下是使用 X.509 证书配置 TiKV Client TLS 的例子:
spark.tispark.tikv.tls_enable true spark.tispark.tikv.trust_cert_collection /home/tispark/root.pem spark.tispark.tikv.key_cert_chain /home/tispark/client.pem spark.tispark.tikv.key_file /home/tispark/client.key
  • 如下是使用 JKS 配置 TiKV Client TLS 的例子:
spark.tispark.tikv.tls_enable true spark.tispark.tikv.jks_enable true spark.tispark.tikv.jks_key_path /home/tispark/config/tikv-truststore spark.tispark.tikv.jks_key_password tikv_trustore_password spark.tispark.tikv.jks_trust_path /home/tispark/config/tikv-clientstore spark.tispark.tikv.jks_trust_password tikv_clientstore_password

当你同时配置 JKS 和 X.509 证书时,JKS 优先级更高。因此,当你只想使用普通的 pem 证书时,不要同时设置 spark.tispark.tikv.jks_enable=true

  • 下面是一个配置 JDBC connector TLS 的例子:
spark.tispark.jdbc.tls_enable true spark.tispark.jdbc.server_cert_store /home/tispark/jdbc-truststore spark.tispark.jdbc.server_cert_password jdbc_truststore_password spark.tispark.jdbc.client_cert_store /home/tispark/jdbc-clientstore spark.tispark.jdbc.client_cert_password jdbc_clientstore_password

时区配置

使用 -Duser.timezone 系统参数来配置时区(比如 -Duser.timezone=GMT-7)。时区会影响 Timestamp 数据类型。

请不要使用 spark.sql.session.timeZone

特性

TiSpark 的主要特性如下:

特性支持TiSpark 2.4.xTiSpark 2.5.xTiSpark 3.0.xTiSpark 3.1.x
SQL select without tidb_catalog
SQL select with tidb_catalog
DataFrame append
DataFrame reads
SQL show databases
SQL show tables
SQL auth
SQL delete
SQL insert
TLS
DataFrame auth

Expression index 支持

TiDB v5.0 开始支持 expression index

TiSpark 目前支持从 expression index 的表中获取数据,但 expression index 不会被 TiSpark 执行计划使用。

TiFlash 支持

TiSpark 能够通过配置 spark.tispark.isolation_read_engines 从 TiFlash 读取数据。

分区表支持

读分区表

TiSpark 目前支持读取 range 与 hash 分区表。

TiSpark 目前不支持 partition table 语法 select col_name from table_name partition(partition_name)。但是,你仍可以使用 where 条件过滤分区。

TiSpark 会根据分区类型、分区表达式以及具体 SQL 决定是否进行分区裁剪。目前,TiSpark 仅支持在 range 分区下,且在下列任一条件下进行分区裁剪:

  • 列表达式,如 partition by col1
  • 形如 YEAR($col) 的 year 函数,其中 col 为列名且类型为 datetime、date 或能被解析为 datetime、date 的 string 字面量。
  • 形如 TO_DAYS($col) 的 to_days 函数,其中 col 为列名且类型为 datetime、date 或能被解析为 datetime、date 的 string 字面量。

如果分区裁剪未被应用,TiSpark 将会读取所有分区表。

写分区表

目前, TiSpark 仅支持写入 range 与 hash 分区表,且需满足以下任一条件:

  • 列表达式,如 partition by col1
  • 形如 YEAR($col) 的 year 函数,其中 col 为列名且类型为 datetime、date 或能被解析为 datetime、date 的 string 字面量。

有两种方式写入分区表:

  • 使用支持 replace 和 append 语义的 Datasource API 写入分区表。
  • 使用 Spark SQL 删除语句。

安全

从 TiSpark v2.5.0 起,你可以通过 TiDB 对 TiSpark 进行鉴权与授权。

该功能默认关闭。要开启该功能,请在 Spark 配置文件 spark-defaults.conf 中添加以下配置项:

// 开启鉴权与授权功能 spark.sql.auth.enable true // 配置 TiDB 信息 spark.sql.tidb.addr $your_tidb_server_address spark.sql.tidb.port $your_tidb_server_port spark.sql.tidb.user $your_tidb_server_user spark.sql.tidb.password $your_tidb_server_password

更多详细信息,请参考 TiSpark 鉴权与授权指南

其他特性

统计信息

TiSpark 可以使用 TiDB 的统计信息:

  • 选择代价最低的索引访问
  • 估算数据大小以决定是否进行广播优化

如果你希望 TiSpark 使用统计信息支持,需要确保所涉及的表已经被分析。参考统计信息简介了解如何进行表分析。

从 TiSpark 2.0 开始,统计信息将会默认被读取。

FAQ

详情请参考 TiSpark FAQ