取引制限
このドキュメントでは、TiDB におけるトランザクションの制限について簡単に紹介します。
分離レベル
TiDB がサポートする分離レベルはRC (Read Committed)とSI (Snapshot Isolation)で、 SIは基本的にRR (Repeatable Read)分離レベルと同等です。
スナップショット分離はファントム読み取りを回避できます
TiDB の分離レベルSI
はPhantom Readsを回避できますが、ANSI/ISO SQL 標準のRR
は回避できません。
次の 2 つの例は、ファントム読み取りとは何かを示しています。
例 1:トランザクション Aはクエリに従って最初に
n
行を取得し、次にトランザクション Bはこれらのn
行以外のm
行を変更するか、トランザクション Aのクエリに一致するm
行を追加します。トランザクション Aがクエリを再度実行すると、条件に一致する行がn+m
行あることがわかります。幻のようなものなので幻読と呼ぶ。例 2:管理者 Aは、データベース内のすべての学生の成績を特定のスコアから ABCDE の成績に変更しますが、管理者 Bはこの時点で特定のスコアを持つレコードを挿入します。管理者 Aが変更を完了し、まだ変更されていないレコード (管理者 Bによって挿入されたもの) がまだあることに気付いた場合。それはファントムリードです。
SI は書き込みスキューを回避できない
TiDB の SI 分離レベルでは、書き込みスキュー例外を回避できません。 SELECT FOR UPDATE
構文を使用して、書き込みスキューの例外を回避できます。
書き込みスキュー例外は、2 つの同時トランザクションが異なるが関連するレコードを読み取り、各トランザクションが読み取ったデータを更新し、最終的にトランザクションをコミットする場合に発生します。複数のトランザクションで同時に変更できないこれらの関連レコード間に制約がある場合、最終結果は制約に違反します。
たとえば、病院の医師シフト管理プログラムを作成しているとします。病院では通常、複数の医師が同時に待機する必要がありますが、少なくとも 1 人の医師が待機している必要があります。医師は、そのシフト中に少なくとも 1 人の医師が待機している限り、シフトを取り下げることができます (気分が悪い場合など)。
現在、医師Alice
とBob
が待機している状況があります。どちらも気分が悪いので、病気休暇を取ることにしました。彼らはたまたま同時にボタンをクリックします。次のプログラムでこのプロセスをシミュレートしましょう。
- Java
- Golang
package com.pingcap.txn.write.skew;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class EffectWriteSkew {
public static void main(String[] args) throws SQLException, InterruptedException {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl("jdbc:mysql://localhost:4000/test?useServerPrepStmts=true&cachePrepStmts=true");
ds.setUsername("root");
// prepare data
Connection connection = ds.getConnection();
createDoctorTable(connection);
createDoctor(connection, 1, "Alice", true, 123);
createDoctor(connection, 2, "Bob", true, 123);
createDoctor(connection, 3, "Carol", false, 123);
Semaphore txn1Pass = new Semaphore(0);
CountDownLatch countDownLatch = new CountDownLatch(2);
ExecutorService threadPool = Executors.newFixedThreadPool(2);
threadPool.execute(() -> {
askForLeave(ds, txn1Pass, 1, 1);
countDownLatch.countDown();
});
threadPool.execute(() -> {
askForLeave(ds, txn1Pass, 2, 2);
countDownLatch.countDown();
});
countDownLatch.await();
}
public static void createDoctorTable(Connection connection) throws SQLException {
connection.createStatement().executeUpdate("CREATE TABLE `doctors` (" +
" `id` int(11) NOT NULL," +
" `name` varchar(255) DEFAULT NULL," +
" `on_call` tinyint(1) DEFAULT NULL," +
" `shift_id` int(11) DEFAULT NULL," +
" PRIMARY KEY (`id`)," +
" KEY `idx_shift_id` (`shift_id`)" +
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin");
}
public static void createDoctor(Connection connection, Integer id, String name, Boolean onCall, Integer shiftID) throws SQLException {
PreparedStatement insert = connection.prepareStatement(
"INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)");
insert.setInt(1, id);
insert.setString(2, name);
insert.setBoolean(3, onCall);
insert.setInt(4, shiftID);
insert.executeUpdate();
}
public static void askForLeave(HikariDataSource ds, Semaphore txn1Pass, Integer txnID, Integer doctorID) {
try(Connection connection = ds.getConnection()) {
try {
connection.setAutoCommit(false);
String comment = txnID == 2 ? " " : "" + "/* txn #{txn_id} */ ";
connection.createStatement().executeUpdate(comment + "BEGIN");
// Txn 1 should be waiting for txn 2 done
if (txnID == 1) {
txn1Pass.acquire();
}
PreparedStatement currentOnCallQuery = connection.prepareStatement(comment +
"SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ?");
currentOnCallQuery.setBoolean(1, true);
currentOnCallQuery.setInt(2, 123);
ResultSet res = currentOnCallQuery.executeQuery();
if (!res.next()) {
throw new RuntimeException("error query");
} else {
int count = res.getInt("count");
if (count >= 2) {
// If current on-call doctor has 2 or more, this doctor can leave
PreparedStatement insert = connection.prepareStatement( comment +
"UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?");
insert.setBoolean(1, false);
insert.setInt(2, doctorID);
insert.setInt(3, 123);
insert.executeUpdate();
connection.commit();
} else {
throw new RuntimeException("At least one doctor is on call");
}
}
// Txn 2 done, let txn 1 run again
if (txnID == 2) {
txn1Pass.release();
}
} catch (Exception e) {
// If got any error, you should roll back, data is priceless
connection.rollback();
e.printStackTrace();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
TiDB トランザクションを適応させるには、次のコードに従ってユーティリティを書き込みます。
package main
import (
"database/sql"
"fmt"
"sync"
"github.com/pingcap-inc/tidb-example-golang/util"
_ "github.com/go-sql-driver/mysql"
)
func main() {
openDB("mysql", "root:@tcp(127.0.0.1:4000)/test", func(db *sql.DB) {
writeSkew(db)
})
}
func openDB(driverName, dataSourceName string, runnable func(db *sql.DB)) {
db, err := sql.Open(driverName, dataSourceName)
if err != nil {
panic(err)
}
defer db.Close()
runnable(db)
}
func writeSkew(db *sql.DB) {
err := prepareData(db)
if err != nil {
panic(err)
}
waitingChan, waitGroup := make(chan bool), sync.WaitGroup{}
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err = askForLeave(db, waitingChan, 1, 1)
if err != nil {
panic(err)
}
}()
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err = askForLeave(db, waitingChan, 2, 2)
if err != nil {
panic(err)
}
}()
waitGroup.Wait()
}
func askForLeave(db *sql.DB, waitingChan chan bool, goroutineID, doctorID int) error {
txnComment := fmt.Sprintf("/* txn %d */ ", goroutineID)
if goroutineID != 1 {
txnComment = "\t" + txnComment
}
txn, err := util.TiDBSqlBegin(db, true)
if err != nil {
return err
}
fmt.Println(txnComment + "start txn")
// Txn 1 should be waiting until txn 2 is done.
if goroutineID == 1 {
<-waitingChan
}
txnFunc := func() error {
queryCurrentOnCall := "SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ?"
rows, err := txn.Query(queryCurrentOnCall, true, 123)
if err != nil {
return err
}
defer rows.Close()
fmt.Println(txnComment + queryCurrentOnCall + " successful")
count := 0
if rows.Next() {
err = rows.Scan(&count)
if err != nil {
return err
}
}
rows.Close()
if count < 2 {
return fmt.Errorf("at least one doctor is on call")
}
shift := "UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?"
_, err = txn.Exec(shift, false, doctorID, 123)
if err == nil {
fmt.Println(txnComment + shift + " successful")
}
return err
}
err = txnFunc()
if err == nil {
txn.Commit()
fmt.Println("[runTxn] commit success")
} else {
txn.Rollback()
fmt.Printf("[runTxn] got an error, rollback: %+v\n", err)
}
// Txn 2 is done. Let txn 1 run again.
if goroutineID == 2 {
waitingChan <- true
}
return nil
}
func prepareData(db *sql.DB) error {
err := createDoctorTable(db)
if err != nil {
return err
}
err = createDoctor(db, 1, "Alice", true, 123)
if err != nil {
return err
}
err = createDoctor(db, 2, "Bob", true, 123)
if err != nil {
return err
}
err = createDoctor(db, 3, "Carol", false, 123)
if err != nil {
return err
}
return nil
}
func createDoctorTable(db *sql.DB) error {
_, err := db.Exec("CREATE TABLE IF NOT EXISTS `doctors` (" +
" `id` int(11) NOT NULL," +
" `name` varchar(255) DEFAULT NULL," +
" `on_call` tinyint(1) DEFAULT NULL," +
" `shift_id` int(11) DEFAULT NULL," +
" PRIMARY KEY (`id`)," +
" KEY `idx_shift_id` (`shift_id`)" +
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")
return err
}
func createDoctor(db *sql.DB, id int, name string, onCall bool, shiftID int) error {
_, err := db.Exec("INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)",
id, name, onCall, shiftID)
return err
}
SQL ログ:
/* txn 1 */ BEGIN
/* txn 2 */ BEGIN
/* txn 2 */ SELECT COUNT(*) as `count` FROM `doctors` WHERE `on_call` = 1 AND `shift_id` = 123
/* txn 2 */ UPDATE `doctors` SET `on_call` = 0 WHERE `id` = 2 AND `shift_id` = 123
/* txn 2 */ COMMIT
/* txn 1 */ SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = 1 and `shift_id` = 123
/* txn 1 */ UPDATE `doctors` SET `on_call` = 0 WHERE `id` = 1 AND `shift_id` = 123
/* txn 1 */ COMMIT
実行結果:
mysql> SELECT * FROM doctors;
+----+-------+---------+----------+
| id | name | on_call | shift_id |
+----+-------+---------+----------+
| 1 | Alice | 0 | 123 |
| 2 | Bob | 0 | 123 |
| 3 | Carol | 0 | 123 |
+----+-------+---------+----------+
どちらのトランザクションでも、アプリケーションは最初に 2 人以上の医師が待機しているかどうかを確認します。その場合、1 人の医師が安全に休暇を取ることができると想定しています。データベースはスナップショット分離を使用するため、両方のチェックで2
が返され、両方のトランザクションが次の段階に進みます。 Alice
は自分の記録を非番に更新し、 Bob
も非番です。両方のトランザクションが正常にコミットされます。現在、少なくとも 1 人の医師が待機している必要があるという要件に違反する勤務中の医師はいません。次の図 ( Designing Data-Intensive Applicationsから引用) は、実際に何が起こるかを示しています。
ここで、書き込みスキューの問題を回避するために、サンプル プログラムをSELECT FOR UPDATE
を使用するように変更しましょう。
- Java
- Golang
package com.pingcap.txn.write.skew;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class EffectWriteSkew {
public static void main(String[] args) throws SQLException, InterruptedException {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl("jdbc:mysql://localhost:4000/test?useServerPrepStmts=true&cachePrepStmts=true");
ds.setUsername("root");
// prepare data
Connection connection = ds.getConnection();
createDoctorTable(connection);
createDoctor(connection, 1, "Alice", true, 123);
createDoctor(connection, 2, "Bob", true, 123);
createDoctor(connection, 3, "Carol", false, 123);
Semaphore txn1Pass = new Semaphore(0);
CountDownLatch countDownLatch = new CountDownLatch(2);
ExecutorService threadPool = Executors.newFixedThreadPool(2);
threadPool.execute(() -> {
askForLeave(ds, txn1Pass, 1, 1);
countDownLatch.countDown();
});
threadPool.execute(() -> {
askForLeave(ds, txn1Pass, 2, 2);
countDownLatch.countDown();
});
countDownLatch.await();
}
public static void createDoctorTable(Connection connection) throws SQLException {
connection.createStatement().executeUpdate("CREATE TABLE `doctors` (" +
" `id` int(11) NOT NULL," +
" `name` varchar(255) DEFAULT NULL," +
" `on_call` tinyint(1) DEFAULT NULL," +
" `shift_id` int(11) DEFAULT NULL," +
" PRIMARY KEY (`id`)," +
" KEY `idx_shift_id` (`shift_id`)" +
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin");
}
public static void createDoctor(Connection connection, Integer id, String name, Boolean onCall, Integer shiftID) throws SQLException {
PreparedStatement insert = connection.prepareStatement(
"INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)");
insert.setInt(1, id);
insert.setString(2, name);
insert.setBoolean(3, onCall);
insert.setInt(4, shiftID);
insert.executeUpdate();
}
public static void askForLeave(HikariDataSource ds, Semaphore txn1Pass, Integer txnID, Integer doctorID) {
try(Connection connection = ds.getConnection()) {
try {
connection.setAutoCommit(false);
String comment = txnID == 2 ? " " : "" + "/* txn #{txn_id} */ ";
connection.createStatement().executeUpdate(comment + "BEGIN");
// Txn 1 should be waiting for txn 2 done
if (txnID == 1) {
txn1Pass.acquire();
}
PreparedStatement currentOnCallQuery = connection.prepareStatement(comment +
"SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ? FOR UPDATE");
currentOnCallQuery.setBoolean(1, true);
currentOnCallQuery.setInt(2, 123);
ResultSet res = currentOnCallQuery.executeQuery();
if (!res.next()) {
throw new RuntimeException("error query");
} else {
int count = res.getInt("count");
if (count >= 2) {
// If current on-call doctor has 2 or more, this doctor can leave
PreparedStatement insert = connection.prepareStatement( comment +
"UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?");
insert.setBoolean(1, false);
insert.setInt(2, doctorID);
insert.setInt(3, 123);
insert.executeUpdate();
connection.commit();
} else {
throw new RuntimeException("At least one doctor is on call");
}
}
// Txn 2 done, let txn 1 run again
if (txnID == 2) {
txn1Pass.release();
}
} catch (Exception e) {
// If got any error, you should roll back, data is priceless
connection.rollback();
e.printStackTrace();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
package main
import (
"database/sql"
"fmt"
"sync"
"github.com/pingcap-inc/tidb-example-golang/util"
_ "github.com/go-sql-driver/mysql"
)
func main() {
openDB("mysql", "root:@tcp(127.0.0.1:4000)/test", func(db *sql.DB) {
writeSkew(db)
})
}
func openDB(driverName, dataSourceName string, runnable func(db *sql.DB)) {
db, err := sql.Open(driverName, dataSourceName)
if err != nil {
panic(err)
}
defer db.Close()
runnable(db)
}
func writeSkew(db *sql.DB) {
err := prepareData(db)
if err != nil {
panic(err)
}
waitingChan, waitGroup := make(chan bool), sync.WaitGroup{}
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err = askForLeave(db, waitingChan, 1, 1)
if err != nil {
panic(err)
}
}()
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err = askForLeave(db, waitingChan, 2, 2)
if err != nil {
panic(err)
}
}()
waitGroup.Wait()
}
func askForLeave(db *sql.DB, waitingChan chan bool, goroutineID, doctorID int) error {
txnComment := fmt.Sprintf("/* txn %d */ ", goroutineID)
if goroutineID != 1 {
txnComment = "\t" + txnComment
}
txn, err := util.TiDBSqlBegin(db, true)
if err != nil {
return err
}
fmt.Println(txnComment + "start txn")
// Txn 1 should be waiting until txn 2 is done.
if goroutineID == 1 {
<-waitingChan
}
txnFunc := func() error {
queryCurrentOnCall := "SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ?"
rows, err := txn.Query(queryCurrentOnCall, true, 123)
if err != nil {
return err
}
defer rows.Close()
fmt.Println(txnComment + queryCurrentOnCall + " successful")
count := 0
if rows.Next() {
err = rows.Scan(&count)
if err != nil {
return err
}
}
rows.Close()
if count < 2 {
return fmt.Errorf("at least one doctor is on call")
}
shift := "UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?"
_, err = txn.Exec(shift, false, doctorID, 123)
if err == nil {
fmt.Println(txnComment + shift + " successful")
}
return err
}
err = txnFunc()
if err == nil {
txn.Commit()
fmt.Println("[runTxn] commit success")
} else {
txn.Rollback()
fmt.Printf("[runTxn] got an error, rollback: %+v\n", err)
}
// Txn 2 is done. Let txn 1 run again.
if goroutineID == 2 {
waitingChan <- true
}
return nil
}
func prepareData(db *sql.DB) error {
err := createDoctorTable(db)
if err != nil {
return err
}
err = createDoctor(db, 1, "Alice", true, 123)
if err != nil {
return err
}
err = createDoctor(db, 2, "Bob", true, 123)
if err != nil {
return err
}
err = createDoctor(db, 3, "Carol", false, 123)
if err != nil {
return err
}
return nil
}
func createDoctorTable(db *sql.DB) error {
_, err := db.Exec("CREATE TABLE IF NOT EXISTS `doctors` (" +
" `id` int(11) NOT NULL," +
" `name` varchar(255) DEFAULT NULL," +
" `on_call` tinyint(1) DEFAULT NULL," +
" `shift_id` int(11) DEFAULT NULL," +
" PRIMARY KEY (`id`)," +
" KEY `idx_shift_id` (`shift_id`)" +
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")
return err
}
func createDoctor(db *sql.DB, id int, name string, onCall bool, shiftID int) error {
_, err := db.Exec("INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)",
id, name, onCall, shiftID)
return err
}
SQL ログ:
/* txn 1 */ BEGIN
/* txn 2 */ BEGIN
/* txn 2 */ SELECT COUNT(*) AS `count` FROM `doctors` WHERE on_call = 1 AND `shift_id` = 123 FOR UPDATE
/* txn 2 */ UPDATE `doctors` SET on_call = 0 WHERE `id` = 2 AND `shift_id` = 123
/* txn 2 */ COMMIT
/* txn 1 */ SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = 1 FOR UPDATE
At least one doctor is on call
/* txn 1 */ ROLLBACK
実行結果:
mysql> SELECT * FROM doctors;
+----+-------+---------+----------+
| id | name | on_call | shift_id |
+----+-------+---------+----------+
| 1 | Alice | 1 | 123 |
| 2 | Bob | 0 | 123 |
| 3 | Carol | 0 | 123 |
+----+-------+---------+----------+
savepoint
とネストされたトランザクションのサポート
SpringがサポートするPROPAGATION_NESTED
の伝搬動作は、ネストされたトランザクションをトリガーします。これは、現在のトランザクションとは独立して開始される子トランザクションです。ネストされたトランザクションが開始されると、 savepoint
が記録されます。ネストされたトランザクションが失敗した場合、トランザクションはsavepoint
状態にロールバックします。ネストされたトランザクションは外部トランザクションの一部であり、外部トランザクションとともにコミットされます。
次の例は、 savepoint
メカニズムを示しています。
mysql> BEGIN;
mysql> INSERT INTO T2 VALUES(100);
mysql> SAVEPOINT svp1;
mysql> INSERT INTO T2 VALUES(200);
mysql> ROLLBACK TO SAVEPOINT svp1;
mysql> RELEASE SAVEPOINT svp1;
mysql> COMMIT;
mysql> SELECT * FROM T2;
+------+
| ID |
+------+
| 100 |
+------+
ノート:
v6.2.0 以降、TiDB は
savepoint
の機能をサポートしています。 TiDB クラスターが v6.2.0 より前の場合、TiDB クラスターはPROPAGATION_NESTED
の動作をサポートしていません。アプリケーションがPROPAGATION_NESTED
の伝播動作を使用するJava Springフレームワークに基づいている場合は、ネストされたトランザクションのロジックを削除するために、アプリケーション側でそれを適応させる必要があります。
大口取引制限
基本原則は、トランザクションのサイズを制限することです。 KV レベルでは、TiDB には 1 つのトランザクションのサイズに制限があります。 SQL レベルでは、1 行のデータが 1 つの KV エントリにマップされ、追加のインデックスごとに 1 つの KV エントリが追加されます。 SQL レベルでの制限は次のとおりです。
- 単一行レコードの最大サイズは
120 MB
です。 TiDB v5.0 以降のバージョンではperformance.txn-entry-size-limit
で構成できます。以前のバージョンの値は6 MB
です。 - サポートされる単一トランザクションの最大サイズは
10 GB
です。 TiDB v4.0 以降のバージョンではperformance.txn-total-size-limit
で構成できます。以前のバージョンの値は100 MB
です。
サイズ制限と行制限の両方について、トランザクション実行中のトランザクションのエンコードと追加のキーのオーバーヘッドも考慮する必要があることに注意してください。最適なパフォーマンスを実現するには、100 ~ 500 行ごとに 1 つのトランザクションを書き込むことをお勧めします。
自動コミットされたSELECT FOR UPDATE
ステートメントはロックを待機しません
現在、ロックは自動コミットされたSELECT FOR UPDATE
ステートメントに追加されていません。この効果を次の図に示します。
これは、MySQL との既知の非互換性の問題です。この問題は、明示的なBEGIN;COMMIT;
ステートメントを使用して解決できます。