TiSpark 用户指南

TiSpark 是 PingCAP 为解决用户复杂 OLAP 需求而推出的产品。它借助 Spark 平台,同时融合 TiKV 分布式集群的优势,和 TiDB 一起为用户一站式解决 HTAP (Hybrid Transactional/Analytical Processing) 的需求。TiSpark 依赖于 TiKV 集群和 Placement Driver (PD),也需要你搭建一个 Spark 集群。

本文简单介绍如何部署和使用 TiSpark。本文假设你对 Spark 有基本认知。你可以参阅 Apache Spark 官网了解 Spark 的相关信息。

概述

TiSpark 是将 Spark SQL 直接运行在分布式存储引擎 TiKV 上的 OLAP 解决方案。其架构图如下:

TiSpark Architecture

  • TiSpark 深度整合了 Spark Catalyst 引擎,可以对计算提供精确的控制,使 Spark 能够高效的读取 TiKV 中的数据,提供索引支持以实现高速的点查。
  • 通过多种计算下推减少 Spark SQL 需要处理的数据大小,以加速查询;利用 TiDB 的内建的统计信息选择更优的查询计划。
  • 从数据集群的角度看,TiSpark + TiDB 可以让用户无需进行脆弱和难以维护的 ETL,直接在同一个平台进行事务和分析两种工作,简化了系统架构和运维。
  • 用户借助 TiSpark 项目可以在 TiDB 上使用 Spark 生态圈提供的多种工具进行数据处理。例如,使用 TiSpark 进行数据分析和 ETL;使用 TiKV 作为机器学习的数据源;借助调度系统产生定时报表等等。
  • 除此之外,TiSpark 还提供了分布式写入 TiKV 的功能。相比使用 Spark 结合 JDBC 的方式写入 TiDB,分布式写入 TiKV 可以实现事务(要么全部数据写入成功,要么全部都写入失败),并且写入速度会更快。

环境准备

现有 TiSpark 2.x 版本支持 Spark 2.3.x 和 Spark 2.4.x。如果你希望使用 Spark 2.1.x 版本,需使用 TiSpark 1.x。

TiSpark 需要 JDK 1.8+ 以及 Scala 2.11(Spark2.0+ 默认 Scala 版本)。

TiSpark 可以在 YARN,Mesos,Standalone 等任意 Spark 模式下运行。

推荐配置

本部分描述了 TiKV 与 TiSpark 集群分开部署、Spark 与 TiSpark 集群独立部署,以及 TiSpark 与 TiKV 集群混合部署的建议配置。

TiKV 与 TiSpark 集群分开部署的配置

对于 TiKV 与 TiSpark 分开部署的场景,可以参考如下建议配置:

  • 硬件配置建议

    普通场景可以参考 TiDB 和 TiKV 硬件配置建议,但是如果是偏重分析的场景,可以将 TiKV 节点增加到至少 64G 内存。

Spark 与 TiSpark 集群独立部署的配置

关于 Spark 的详细硬件推荐配置请参考官网,如下是 TiSpark 所需环境的简单描述:

Spark 推荐 32G 内存以上的配额。请在配置中预留 25% 的内存给操作系统。

Spark 推荐每台计算节点配备 CPU 累计 8 到 16 核以上。你可以初始设定分配所有 CPU 核给 Spark。

Spark 的具体配置方式也请参考官方说明。以下为根据 spark-env.sh 配置的范例:

SPARK_EXECUTOR_CORES: 5 SPARK_EXECUTOR_MEMORY: 10g SPARK_WORKER_CORES: 5 SPARK_WORKER_MEMORY: 10g

spark-defaults.conf 中,增加如下配置:

spark.tispark.pd.addresses $your_pd_servers spark.sql.extensions org.apache.spark.sql.TiExtensions

CDH spark 版本中添加如下配置:

spark.tispark.pd.addresses=$your_pd_servers spark.sql.extensions=org.apache.spark.sql.TiExtensions

your_pd_servers 是用逗号分隔的 PD 地址,每个地址使用 地址:端口 的格式。

例如你有一组 PD 在10.16.20.110.16.20.210.16.20.3,那么 PD 配置格式是10.16.20.1:2379,10.16.20.2:2379,10.16.20.3:2379

TiSpark 与 TiKV 集群混合部署的配置

对于 TiKV 与 TiSpark 混合部署的场景,需在原有 TiKV 预留资源之外累加 Spark 所需部分,并分配 25% 的内存作为系统本身占用。

部署 TiSpark

TiSpark 的 jar 包可以在 TiSpark Releases 页面下载对应版本的 jar 包并拷贝到合适的目录。

已有 Spark 集群的部署方式

如果在已有 Spark 集群上运行 TiSpark,无需重启集群。可以使用 Spark 的 --jars 参数将 TiSpark 作为依赖引入:

spark-shell --jars $TISPARK_FOLDER/tispark-${name_with_version}.jar

没有 Spark 集群的部署方式

如果没有使用中的 Spark 集群,推荐使用 Spark Standalone 方式部署。这里简单介绍下 Standalone 部署方式。如果遇到问题,可以去官网寻求帮助;也欢迎在 GitHub 上提 issue

下载安装包并安装

你可以在 Download Apache Spark™ 页面下载 Apache Spark。

对于 Standalone 模式且无需 Hadoop 支持,则选择 Spark 2.3.x 或者 Spark 2.4.x 且带有 Hadoop 依赖的 Pre-build with Apache Hadoop 2.x 任意版本。如有需要配合使用的 Hadoop 集群,则选择对应的 Hadoop 版本号。你也可以选择从源代码自行构建以配合官方 Hadoop 2.x 之前的版本。

如果你已经有了 Spark 二进制文件,并且当前 PATH 为 SPARKPATH,需将 TiSpark jar 包拷贝到 ${SPARKPATH}/jars 目录下。

启动 Master

在选中的 Spark Master 节点执行如下命令:

cd $SPARKPATH
./sbin/start-master.sh

在这步完成以后,屏幕上会打印出一个 log 文件。检查 log 文件确认 Spark-Master 是否启动成功。你可以打开 http://spark-master-hostname:8080 查看集群信息(如果你没有改动 Spark-Master 默认 Port Numebr)。在启动 Spark-Worker 的时候,也可以通过这个面板来确认 Worker 是否已经加入集群。

启动 Worker

类似地,可以用如下命令启动 Spark-Worker 节点:

./sbin/start-slave.sh spark://spark-master-hostname:7077

命令返回以后,即可通过刚才的面板查看这个 Worker 是否已经正确地加入了 Spark 集群。在所有 Worker 节点重复刚才的命令。确认所有的 Worker 都可以正确连接 Master,这样你就拥有了一个 Standalone 模式的 Spark 集群。

Spark SQL shell 和 JDBC 服务器

当前版本的 TiSpark 可以直接使用 spark-sql 和 Spark 的 ThriftServer JDBC 服务器。

一个使用范例

假设你已经按照上述步骤成功启动了 TiSpark 集群,下面简单介绍如何使用 Spark SQL 来做 OLAP 分析。这里我们用名为 tpch 数据库中的 lineitem 表作为范例。

假设你的 PD 节点位于 192.168.1.100,端口为 2379,在 $SPARK_HOME/conf/spark-defaults.conf 加入:

spark.tispark.pd.addresses 192.168.1.100:2379
spark.sql.extensions org.apache.spark.sql.TiExtensions

然后在 Spark-Shell 里像原生 Spark 一样输入下面的命令:

spark.sql("use tpch")
spark.sql("select count(*) from lineitem").show

结果为:

+-------------+ | Count (1) | +-------------+ | 600000000 | +-------------+

Spark SQL 交互 Shell 和原生 Spark 一致:

spark-sql> use tpch;
Time taken: 0.015 seconds
spark-sql> select count(*) from lineitem;
2000 Time taken: 0.673 seconds, Fetched 1 row(s)

SQuirreLSQL 和 hive-beeline 可以使用 JDBC 连接 Thrift 服务器。例如,使用 beeline 连接:

./beeline
Beeline version 1.2.2 by Apache Hive
beeline> !connect jdbc:hive2://localhost:10000
1: jdbc:hive2://localhost:10000> use testdb;
+---------+--+ | Result | +---------+--+ +---------+--+ No rows selected (0.013 seconds)
select count(*) from account;
+-----------+--+ | count(1) | +-----------+--+ | 1000000 | +-----------+--+ 1 row selected (1.97 seconds)

和 Hive 一起使用 TiSpark

TiSpark 可以和 Hive 混合使用。在启动 Spark 之前,需要添加 HADOOP_CONF_DIR 环境变量指向 Hadoop 配置目录并且将 hive-site.xml 拷贝到 $SPARK_HOME/conf 目录下。

val tisparkDF = spark.sql("select * from tispark_table").toDF tisparkDF.write.saveAsTable("hive_table") // save table to hive spark.sql("select * from hive_table a, tispark_table b where a.col1 = b.col1").show // join table across Hive and Tispark

通过 TiSpark 将 DataFrame 批量写入 TiDB

TiSpark 从 v2.3 版本开始原生支持将 DataFrame 批量写入 TiDB 集群,该写入模式通过 TiKV 的两阶段提交协议实现。

TiSpark 批量写入相比 Spark + JDBC 写入,有以下特点:

比较的方面TiSpark 批量写入Spark + JDBC 写入
原子性DataFrame 的数据要么全部写入成功,要么全部写入失败如果在写入过程中 spark 任务失败退出,会出现部分数据写入成功的情况
隔离性写入过程中其他事务对正在写入的数据不可见写入过程中其他事务能看到部分写入成功的数据
错误恢复失败后只需要重新运行 Spark 程序需要业务来实现幂等,例如失败后需要先清理部分写入成功的数据,再重新运行 Spark 程序,并且需要设置 spark.task.maxFailures=1,防止 task 内重试导致数据重复
速度直接写入 TiKV,速度更快通过 TiDB 再写入 TiKV,对速度会有影响

以下通过 scala API 演示如何使用 TiSpark 批量写入:

// select data to write val df = spark.sql("select * from tpch.ORDERS") // write data to tidb df.write. format("tidb"). option("tidb.addr", "127.0.0.1"). option("tidb.port", "4000"). option("tidb.user", "root"). option("tidb.password", ""). option("database", "tpch"). option("table", "target_orders"). mode("append"). save()

如果写入的数据量比较大,且写入时间超过 10 分钟,则需要保证 GC 时间大于写入时间。

update mysql.tidb set VARIABLE_VALUE="6h" where VARIABLE_NAME="tikv_gc_life_time";

详细使用手册请参考该文档

通过 JDBC 将 DataFrame 写入 TiDB

除了使用 TiSpark 将 DataFrame 批量写入 TiDB 集群以外,也可以使用 Spark 原生的 JDBC 支持进行写入:

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions val customer = spark.sql("select * from customer limit 100000") // you might repartition source to make it balance across nodes // and increase concurrency val df = customer.repartition(32) df.write .mode(saveMode = "append") .format("jdbc") .option("driver", "com.mysql.jdbc.Driver") // replace host and port as your and be sure to use rewrite batch .option("url", "jdbc:mysql://127.0.0.1:4000/test?rewriteBatchedStatements=true") .option("useSSL", "false") // As tested, 150 is good practice .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 150) .option("dbtable", s"cust_test_select") // database name and table name here .option("isolationLevel", "NONE") // recommended to set isolationLevel to NONE if you have a large DF to load. .option("user", "root") // TiDB user here .save()

推荐将 isolationLevel 设置为 NONE,否则单一大事务有可能造成 TiDB 服务器内存溢出。

统计信息

TiSpark 可以使用 TiDB 的统计信息:

  1. 选择代价最低的索引或扫表访问
  2. 估算数据大小以决定是否进行广播优化

如果希望使用统计信息支持,需要确保所涉及的表已经被分析。请阅读这份文档了解如何进行表分析。

从 TiSpark 2.0 开始,统计信息将会默认被读取。

TiSpark FAQ

  • Q. 是独立部署还是和现有 Spark/Hadoop 集群共用资源?

    A. 可以利用现有 Spark 集群无需单独部署,但是如果现有集群繁忙,TiSpark 将无法达到理想速度。

  • Q. 是否可以和 TiKV 混合部署?

    A. 如果 TiDB 以及 TiKV 负载较高且运行关键的线上任务,请考虑单独部署 TiSpark;并且考虑使用不同的网卡保证 OLTP 的网络资源不被侵占而影响线上业务。如果线上业务要求不高或者机器负载不大,可以考虑与 TiKV 混合部署。

  • Q. Spark 执行中报 warning:WARN ObjectStore:568 - Failed to get database

    A. Warning 忽略即可,原因是 Spark 找不到对应的 hive 库,因为这个库是在 TIKV 中,而不是在 hive 中。可以考虑调整 log4j 日志,将该参数添加到 spark 下 conf 里 log4j 文件(如果后缀是 template 那先 mv 成后缀 properties)。

  • Q. Spark 执行中报 java.sql.BatchUpdateException: Data Truncated

    A. 写入的数据长度超过了数据库定义的数据类型的长度,可以确认 target table 的字段长度,进行调整。

  • Q. TiSpark 任务是否默认读取 Hive 的元数据?

    A. TiSpark 通过读取 hive-site 里的 meta 来搜寻 hive 的库。如果搜寻不到,就通过读取 tidb meta 搜寻 tidb 库。如果不需要该行为,可不在 hive site 中配置 hive 的 meta。

  • Q. TiSpark 执行 Spark 任务时报:"Error:java.io.InvalidClassException: com.pingcap.tikv.region.TiRegion; local class incompatible: stream classdesc serialVersionUID ..."

    A. 该报错日志中显示 serialVersionUID 冲突,说明存在不同版本的 class 和 TiRegion。因为 TiRegion 是 TiSpark 独有的,所以可能存在多个版本的 TiSpark 包。要解决该报错,请确保集群中各节点的 TiSpark 依赖包版本一致。