TiDB 和 Python 的简单 CRUD 应用程序

本文档将展示如何使用 TiDB 和 Python 来构造一个简单的 CRUD 应用程序。

第 1 步:启动你的 TiDB 集群

本节将介绍 TiDB 集群的启动方法。

  • TiDB Cloud
  • 本地集群
  • Gitpod

你可以部署一个本地测试的 TiDB 集群或正式的 TiDB 集群。详细步骤,请参考:

基于 Git 的预配置的开发环境:现在就试试

该环境会自动克隆代码,并通过 TiUP 部署测试集群。

第 2 步:获取代码

git clone https://github.com/pingcap-inc/tidb-example-python.git
  • 使用 SQLAlchemy(推荐)
  • 使用 peewee(推荐)
  • 使用 mysqlclient
  • 使用 PyMySQL
  • 使用 mysql-connector-python

SQLAlchemy 为当前比较流行的开源 Python ORM 之一。此处将以 SQLAlchemy 1.4.44 版本进行说明。

import uuid from typing import List from sqlalchemy import create_engine, String, Column, Integer, select, func from sqlalchemy.orm import declarative_base, sessionmaker engine = create_engine('mysql://root:@127.0.0.1:4000/test') Base = declarative_base() Base.metadata.create_all(engine) Session = sessionmaker(bind=engine) class Player(Base): __tablename__ = "player" id = Column(String(36), primary_key=True) coins = Column(Integer) goods = Column(Integer) def __repr__(self): return f'Player(id={self.id!r}, coins={self.coins!r}, goods={self.goods!r})' def random_player(amount: int) -> List[Player]: players = [] for _ in range(amount): players.append(Player(id=uuid.uuid4(), coins=10000, goods=10000)) return players def simple_example() -> None: with Session() as session: # create a player, who has a coin and a goods. session.add(Player(id="test", coins=1, goods=1)) # get this player, and print it. get_test_stmt = select(Player).where(Player.id == "test") for player in session.scalars(get_test_stmt): print(player) # create players with bulk inserts. # insert 1919 players totally, with 114 players per batch. # each player has a random UUID player_list = random_player(1919) for idx in range(0, len(player_list), 114): session.bulk_save_objects(player_list[idx:idx + 114]) # print the number of players count = session.query(func.count(Player.id)).scalar() print(f'number of players: {count}') # print 3 players. three_players = session.query(Player).limit(3).all() for player in three_players: print(player) session.commit() def trade_check(session: Session, sell_id: str, buy_id: str, amount: int, price: int) -> bool: # sell player goods check sell_player = session.query(Player.goods).filter(Player.id == sell_id).with_for_update().one() if sell_player.goods < amount: print(f'sell player {sell_id} goods not enough') return False # buy player coins check buy_player = session.query(Player.coins).filter(Player.id == buy_id).with_for_update().one() if buy_player.coins < price: print(f'buy player {buy_id} coins not enough') return False def trade(sell_id: str, buy_id: str, amount: int, price: int) -> None: with Session() as session: if trade_check(session, sell_id, buy_id, amount, price) is False: return # deduct the goods of seller, and raise his/her the coins session.query(Player).filter(Player.id == sell_id). \ update({'goods': Player.goods - amount, 'coins': Player.coins + price}) # deduct the coins of buyer, and raise his/her the goods session.query(Player).filter(Player.id == buy_id). \ update({'goods': Player.goods + amount, 'coins': Player.coins - price}) session.commit() print("trade success") def trade_example() -> None: with Session() as session: # create two players # player 1: id is "1", has only 100 coins. # player 2: id is "2", has 114514 coins, and 20 goods. session.add(Player(id="1", coins=100, goods=0)) session.add(Player(id="2", coins=114514, goods=20)) session.commit() # player 1 wants to buy 10 goods from player 2. # it will cost 500 coins, but player 1 cannot afford it. # so this trade will fail, and nobody will lose their coins or goods trade(sell_id="2", buy_id="1", amount=10, price=500) # then player 1 has to reduce the incoming quantity to 2. # this trade will be successful trade(sell_id="2", buy_id="1", amount=2, price=100) with Session() as session: traders = session.query(Player).filter(Player.id.in_(("1", "2"))).all() for player in traders: print(player) session.commit() simple_example() trade_example()

相较于直接使用 Driver,SQLAlchemy 屏蔽了创建数据库连接时,不同数据库差异的细节。SQLAlchemy 还封装了大量的操作,如会话管理、基本对象的 CRUD 等,极大地简化了代码量。

Player 类为数据库表在程序内的映射。Player 的每个属性都对应着 player 表的一个字段。SQLAlchemy 使用 Player 类为了给 SQLAlchemy 提供更多的信息,使用了形如以上示例中的 id = Column(String(36), primary_key=True) 的类型定义,用来指示字段类型和其附加属性。id = Column(String(36), primary_key=True) 表示 id 字段为 String 类型,对应数据库类型为 VARCHAR,长度为 36,且为主键。

关于 SQLAlchemy 的更多使用方法,你可以参考 SQLAlchemy 官网

peewee 为当前比较流行的开源 Python ORM 之一。此处将以 peewee 3.15.4 版本进行说明。

import os import uuid from typing import List from peewee import * from playhouse.db_url import connect db = connect('mysql://root:@127.0.0.1:4000/test') class Player(Model): id = CharField(max_length=36, primary_key=True) coins = IntegerField() goods = IntegerField() class Meta: database = db table_name = "player" def random_player(amount: int) -> List[Player]: players = [] for _ in range(amount): players.append(Player(id=uuid.uuid4(), coins=10000, goods=10000)) return players def simple_example() -> None: # create a player, who has a coin and a goods. Player.create(id="test", coins=1, goods=1) # get this player, and print it. test_player = Player.select().where(Player.id == "test").get() print(f'id:{test_player.id}, coins:{test_player.coins}, goods:{test_player.goods}') # create players with bulk inserts. # insert 1919 players totally, with 114 players per batch. # each player has a random UUID player_list = random_player(1919) Player.bulk_create(player_list, 114) # print the number of players count = Player.select().count() print(f'number of players: {count}') # print 3 players. three_players = Player.select().limit(3) for player in three_players: print(f'id:{player.id}, coins:{player.coins}, goods:{player.goods}') def trade_check(sell_id: str, buy_id: str, amount: int, price: int) -> bool: sell_goods = Player.select(Player.goods).where(Player.id == sell_id).get().goods if sell_goods < amount: print(f'sell player {sell_id} goods not enough') return False buy_coins = Player.select(Player.coins).where(Player.id == buy_id).get().coins if buy_coins < price: print(f'buy player {buy_id} coins not enough') return False return True def trade(sell_id: str, buy_id: str, amount: int, price: int) -> None: with db.atomic() as txn: try: if trade_check(sell_id, buy_id, amount, price) is False: txn.rollback() return # deduct the goods of seller, and raise his/her the coins Player.update(goods=Player.goods - amount, coins=Player.coins + price).where(Player.id == sell_id).execute() # deduct the coins of buyer, and raise his/her the goods Player.update(goods=Player.goods + amount, coins=Player.coins - price).where(Player.id == buy_id).execute() except Exception as err: txn.rollback() print(f'something went wrong: {err}') else: txn.commit() print("trade success") def trade_example() -> None: # create two players # player 1: id is "1", has only 100 coins. # player 2: id is "2", has 114514 coins, and 20 goods. Player.create(id="1", coins=100, goods=0) Player.create(id="2", coins=114514, goods=20) # player 1 wants to buy 10 goods from player 2. # it will cost 500 coins, but player 1 cannot afford it. # so this trade will fail, and nobody will lose their coins or goods trade(sell_id="2", buy_id="1", amount=10, price=500) # then player 1 has to reduce the incoming quantity to 2. # this trade will be successful trade(sell_id="2", buy_id="1", amount=2, price=100) # let's take a look for player 1 and player 2 currently after_trade_players = Player.select().where(Player.id.in_(["1", "2"])) for player in after_trade_players: print(f'id:{player.id}, coins:{player.coins}, goods:{player.goods}') db.connect() # recreate the player table db.drop_tables([Player]) db.create_tables([Player]) simple_example() trade_example()

相较于直接使用 Driver,peewee 屏蔽了创建数据库连接时,不同数据库差异的细节。peewee 还封装了大量的操作,如会话管理、基本对象的 CRUD 等,极大地简化了代码量。

Player 类为数据库表在程序内的映射。Player 的每个属性都对应着 player 表的一个字段。peewee 使用 Player 类为了给 peewee 提供更多的信息,使用了形如以上示例中的 id = CharField(max_length=36, primary_key=True) 的类型定义,用来指示字段类型和其附加属性。id = CharField(max_length=36, primary_key=True) 表示 id 字段为 CharField 类型,对应数据库类型为 VARCHAR,长度为 36,且为主键。

关于 peewee 的更多使用方法,你可以参考 peewee 官网

mysqlclient 为当前比较流行的开源 Python Driver 之一。此处将以 mysqlclient 2.1.1 版本进行说明。虽然 Python 的 Driver 相较其他语言,使用也极其方便。但因其不可屏蔽底层实现,需手动管控事务的特性,如果没有大量必须使用 SQL 的场景,仍然推荐使用 ORM 进行程序编写。这可以降低程序的耦合性。

import uuid from typing import List import MySQLdb from MySQLdb import Connection from MySQLdb.cursors import Cursor def get_connection(autocommit: bool = True) -> MySQLdb.Connection: return MySQLdb.connect( host="127.0.0.1", port=4000, user="root", password="", database="test", autocommit=autocommit ) def create_player(cursor: Cursor, player: tuple) -> None: cursor.execute("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", player) def get_player(cursor: Cursor, player_id: str) -> tuple: cursor.execute("SELECT id, coins, goods FROM player WHERE id = %s", (player_id,)) return cursor.fetchone() def get_players_with_limit(cursor: Cursor, limit: int) -> List[tuple]: cursor.execute("SELECT id, coins, goods FROM player LIMIT %s", (limit,)) return cursor.fetchall() def random_player(amount: int) -> List[tuple]: players = [] for _ in range(amount): players.append((uuid.uuid4(), 10000, 10000)) return players def bulk_create_player(cursor: Cursor, players: List[tuple]) -> None: cursor.executemany("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", players) def get_count(cursor: Cursor) -> None: cursor.execute("SELECT count(*) FROM player") return cursor.fetchone()[0] def trade_check(cursor: Cursor, sell_id: str, buy_id: str, amount: int, price: int) -> bool: get_player_with_lock_sql = "SELECT coins, goods FROM player WHERE id = %s FOR UPDATE" # sell player goods check cursor.execute(get_player_with_lock_sql, (sell_id,)) _, sell_goods = cursor.fetchone() if sell_goods < amount: print(f'sell player {sell_id} goods not enough') return False # buy player coins check cursor.execute(get_player_with_lock_sql, (buy_id,)) buy_coins, _ = cursor.fetchone() if buy_coins < price: print(f'buy player {buy_id} coins not enough') return False def trade_update(cursor: Cursor, sell_id: str, buy_id: str, amount: int, price: int) -> None: update_player_sql = "UPDATE player set goods = goods + %s, coins = coins + %s WHERE id = %s" # deduct the goods of seller, and raise his/her the coins cursor.execute(update_player_sql, (-amount, price, sell_id)) # deduct the coins of buyer, and raise his/her the goods cursor.execute(update_player_sql, (amount, -price, buy_id)) def trade(connection: Connection, sell_id: str, buy_id: str, amount: int, price: int) -> None: with connection.cursor() as cursor: if trade_check(cursor, sell_id, buy_id, amount, price) is False: connection.rollback() return try: trade_update(cursor, sell_id, buy_id, amount, price) except Exception as err: connection.rollback() print(f'something went wrong: {err}') else: connection.commit() print("trade success") def simple_example() -> None: with get_connection(autocommit=True) as conn: with conn.cursor() as cur: # create a player, who has a coin and a goods. create_player(cur, ("test", 1, 1)) # get this player, and print it. test_player = get_player(cur, "test") print(f'id:{test_player[0]}, coins:{test_player[1]}, goods:{test_player[2]}') # create players with bulk inserts. # insert 1919 players totally, with 114 players per batch. # each player has a random UUID player_list = random_player(1919) for idx in range(0, len(player_list), 114): bulk_create_player(cur, player_list[idx:idx + 114]) # print the number of players count = get_count(cur) print(f'number of players: {count}') # print 3 players. three_players = get_players_with_limit(cur, 3) for player in three_players: print(f'id:{player[0]}, coins:{player[1]}, goods:{player[2]}') def trade_example() -> None: with get_connection(autocommit=False) as conn: with conn.cursor() as cur: # create two players # player 1: id is "1", has only 100 coins. # player 2: id is "2", has 114514 coins, and 20 goods. create_player(cur, ("1", 100, 0)) create_player(cur, ("2", 114514, 20)) conn.commit() # player 1 wants to buy 10 goods from player 2. # it will cost 500 coins, but player 1 cannot afford it. # so this trade will fail, and nobody will lose their coins or goods trade(conn, sell_id="2", buy_id="1", amount=10, price=500) # then player 1 has to reduce the incoming quantity to 2. # this trade will be successful trade(conn, sell_id="2", buy_id="1", amount=2, price=100) # let's take a look for player 1 and player 2 currently with conn.cursor() as cur: _, player1_coin, player1_goods = get_player(cur, "1") print(f'id:1, coins:{player1_coin}, goods:{player1_goods}') _, player2_coin, player2_goods = get_player(cur, "2") print(f'id:2, coins:{player2_coin}, goods:{player2_goods}') simple_example() trade_example()

Driver 有着更低的封装程度,因此我们可以在程序内见到大量的 SQL。程序内查询到的 Player,与 ORM 不同,因为没有数据对象的存在,Player 将以元组 (tuple) 进行表示。

关于 mysqlclient 的更多使用方法,你可以参考 mysqlclient 官方文档

PyMySQL 为当前比较流行的开源 Python Driver 之一。此处将以 PyMySQL 1.0.2 版本进行说明。虽然 Python 的 Driver 相较其他语言,使用也极其方便。但因其不可屏蔽底层实现,需手动管控事务的特性,如果没有大量必须使用 SQL 的场景,仍然推荐使用 ORM 进行程序编写。这可以降低程序的耦合性。

import uuid from typing import List import pymysql.cursors from pymysql import Connection from pymysql.cursors import DictCursor def get_connection(autocommit: bool = False) -> Connection: return pymysql.connect(host='127.0.0.1', port=4000, user='root', password='', database='test', cursorclass=DictCursor, autocommit=autocommit) def create_player(cursor: DictCursor, player: tuple) -> None: cursor.execute("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", player) def get_player(cursor: DictCursor, player_id: str) -> dict: cursor.execute("SELECT id, coins, goods FROM player WHERE id = %s", (player_id,)) return cursor.fetchone() def get_players_with_limit(cursor: DictCursor, limit: int) -> tuple: cursor.execute("SELECT id, coins, goods FROM player LIMIT %s", (limit,)) return cursor.fetchall() def random_player(amount: int) -> List[tuple]: players = [] for _ in range(amount): players.append((uuid.uuid4(), 10000, 10000)) return players def bulk_create_player(cursor: DictCursor, players: List[tuple]) -> None: cursor.executemany("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", players) def get_count(cursor: DictCursor) -> int: cursor.execute("SELECT count(*) as count FROM player") return cursor.fetchone()['count'] def trade_check(cursor: DictCursor, sell_id: str, buy_id: str, amount: int, price: int) -> bool: get_player_with_lock_sql = "SELECT coins, goods FROM player WHERE id = %s FOR UPDATE" # sell player goods check cursor.execute(get_player_with_lock_sql, (sell_id,)) seller = cursor.fetchone() if seller['goods'] < amount: print(f'sell player {sell_id} goods not enough') return False # buy player coins check cursor.execute(get_player_with_lock_sql, (buy_id,)) buyer = cursor.fetchone() if buyer['coins'] < price: print(f'buy player {buy_id} coins not enough') return False def trade_update(cursor: DictCursor, sell_id: str, buy_id: str, amount: int, price: int) -> None: update_player_sql = "UPDATE player set goods = goods + %s, coins = coins + %s WHERE id = %s" # deduct the goods of seller, and raise his/her the coins cursor.execute(update_player_sql, (-amount, price, sell_id)) # deduct the coins of buyer, and raise his/her the goods cursor.execute(update_player_sql, (amount, -price, buy_id)) def trade(connection: Connection, sell_id: str, buy_id: str, amount: int, price: int) -> None: with connection.cursor() as cursor: if trade_check(cursor, sell_id, buy_id, amount, price) is False: connection.rollback() return try: trade_update(cursor, sell_id, buy_id, amount, price) except Exception as err: connection.rollback() print(f'something went wrong: {err}') else: connection.commit() print("trade success") def simple_example() -> None: with get_connection(autocommit=True) as connection: with connection.cursor() as cur: # create a player, who has a coin and a goods. create_player(cur, ("test", 1, 1)) # get this player, and print it. test_player = get_player(cur, "test") print(test_player) # create players with bulk inserts. # insert 1919 players totally, with 114 players per batch. # each player has a random UUID player_list = random_player(1919) for idx in range(0, len(player_list), 114): bulk_create_player(cur, player_list[idx:idx + 114]) # print the number of players count = get_count(cur) print(f'number of players: {count}') # print 3 players. three_players = get_players_with_limit(cur, 3) for player in three_players: print(player) def trade_example() -> None: with get_connection(autocommit=False) as connection: with connection.cursor() as cur: # create two players # player 1: id is "1", has only 100 coins. # player 2: id is "2", has 114514 coins, and 20 goods. create_player(cur, ("1", 100, 0)) create_player(cur, ("2", 114514, 20)) connection.commit() # player 1 wants to buy 10 goods from player 2. # it will cost 500 coins, but player 1 cannot afford it. # so this trade will fail, and nobody will lose their coins or goods trade(connection, sell_id="2", buy_id="1", amount=10, price=500) # then player 1 has to reduce the incoming quantity to 2. # this trade will be successful trade(connection, sell_id="2", buy_id="1", amount=2, price=100) # let's take a look for player 1 and player 2 currently with connection.cursor() as cur: print(get_player(cur, "1")) print(get_player(cur, "2")) simple_example() trade_example()

Driver 有着更低的封装程度,因此我们可以在程序内见到大量的 SQL。程序内查询到的 Player,与 ORM 不同,因为没有数据对象的存在,Player 将以 dict 进行表示。

关于 PyMySQL 的更多使用方法,你可以参考 PyMySQL 官方文档

mysql-connector-python 为当前比较流行的开源 Python Driver 之一。此处将以 mysql-connector-python 8.0.31 版本进行说明。虽然 Python 的 Driver 相较其他语言,使用也极其方便。但因其不可屏蔽底层实现,需手动管控事务的特性,如果没有大量必须使用 SQL 的场景,仍然推荐使用 ORM 进行程序编写。这可以降低程序的耦合性。

import uuid from typing import List from mysql.connector import connect, MySQLConnection from mysql.connector.cursor import MySQLCursor def get_connection(autocommit: bool = True) -> MySQLConnection: connection = connect(host='127.0.0.1', port=4000, user='root', password='', database='test') connection.autocommit = autocommit return connection def create_player(cursor: MySQLCursor, player: tuple) -> None: cursor.execute("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", player) def get_player(cursor: MySQLCursor, player_id: str) -> tuple: cursor.execute("SELECT id, coins, goods FROM player WHERE id = %s", (player_id,)) return cursor.fetchone() def get_players_with_limit(cursor: MySQLCursor, limit: int) -> List[tuple]: cursor.execute("SELECT id, coins, goods FROM player LIMIT %s", (limit,)) return cursor.fetchall() def random_player(amount: int) -> List[tuple]: players = [] for _ in range(amount): players.append((str(uuid.uuid4()), 10000, 10000)) return players def bulk_create_player(cursor: MySQLCursor, players: List[tuple]) -> None: cursor.executemany("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", players) def get_count(cursor: MySQLCursor) -> int: cursor.execute("SELECT count(*) FROM player") return cursor.fetchone()[0] def trade_check(cursor: MySQLCursor, sell_id: str, buy_id: str, amount: int, price: int) -> bool: get_player_with_lock_sql = "SELECT coins, goods FROM player WHERE id = %s FOR UPDATE" # sell player goods check cursor.execute(get_player_with_lock_sql, (sell_id,)) _, sell_goods = cursor.fetchone() if sell_goods < amount: print(f'sell player {sell_id} goods not enough') return False # buy player coins check cursor.execute(get_player_with_lock_sql, (buy_id,)) buy_coins, _ = cursor.fetchone() if buy_coins < price: print(f'buy player {buy_id} coins not enough') return False def trade_update(cursor: MySQLCursor, sell_id: str, buy_id: str, amount: int, price: int) -> None: update_player_sql = "UPDATE player set goods = goods + %s, coins = coins + %s WHERE id = %s" # deduct the goods of seller, and raise his/her the coins cursor.execute(update_player_sql, (-amount, price, sell_id)) # deduct the coins of buyer, and raise his/her the goods cursor.execute(update_player_sql, (amount, -price, buy_id)) def trade(connection: MySQLConnection, sell_id: str, buy_id: str, amount: int, price: int) -> None: with connection.cursor() as cursor: if trade_check(cursor, sell_id, buy_id, amount, price) is False: connection.rollback() return try: trade_update(cursor, sell_id, buy_id, amount, price) except Exception as err: connection.rollback() print(f'something went wrong: {err}') else: connection.commit() print("trade success") def simple_example() -> None: with get_connection(autocommit=True) as connection: with connection.cursor() as cur: # create a player, who has a coin and a goods. create_player(cur, ("test", 1, 1)) # get this player, and print it. test_player = get_player(cur, "test") print(f'id:{test_player[0]}, coins:{test_player[1]}, goods:{test_player[2]}') # create players with bulk inserts. # insert 1919 players totally, with 114 players per batch. # all players have random uuid print(f'start to insert one by one, it will take a long time') player_list = random_player(1919) for idx in range(0, len(player_list), 114): print(f'inserted {idx} players') bulk_create_player(cur, player_list[idx:idx + 114]) # print the number of players count = get_count(cur) print(f'number of players: {count}') # print 3 players. three_players = get_players_with_limit(cur, 3) for player in three_players: print(f'id:{player[0]}, coins:{player[1]}, goods:{player[2]}') def trade_example() -> None: with get_connection(autocommit=False) as conn: with conn.cursor() as cur: # create two players # player 1: id is "1", has only 100 coins. # player 2: id is "2", has 114514 coins, and 20 goods. create_player(cur, ("1", 100, 0)) create_player(cur, ("2", 114514, 20)) conn.commit() # player 1 wants to buy 10 goods from player 2. # it will cost 500 coins, but player 1 cannot afford it. # so this trade will fail, and nobody will lose their coins or goods trade(conn, sell_id="2", buy_id="1", amount=10, price=500) # then player 1 has to reduce the incoming quantity to 2. # this trade will be successful trade(conn, sell_id="2", buy_id="1", amount=2, price=100) # let's take a look for player 1 and player 2 currently with conn.cursor() as cur: _, player1_coin, player1_goods = get_player(cur, "1") print(f'id:1, coins:{player1_coin}, goods:{player1_goods}') _, player2_coin, player2_goods = get_player(cur, "2") print(f'id:2, coins:{player2_coin}, goods:{player2_goods}') simple_example() trade_example()

Driver 有着更低的封装程度,因此我们可以在程序内见到大量的 SQL。程序内查询到的 Player,与 ORM 不同,因为没有数据对象的存在,Player 将以 tuple 进行表示。

关于 mysql-connector-python 的更多使用方法,你可以参考 mysql-connector-python 官方文档

第 3 步:运行代码

本节将逐步介绍代码的运行方法。

第 3 步第 1 部分:表初始化

本示例需手动初始化表,若你使用本地集群,可直接运行:

  • MySQL CLI
  • MyCLI
mysql --host 127.0.0.1 --port 4000 -u root < player_init.sql
mycli --host 127.0.0.1 --port 4000 -u root --no-warn < player_init.sql

若不使用本地集群,或未安装命令行客户端,请用喜欢的方式(如 Navicat、DBeaver 等 GUI 工具)直接登录集群,并运行 player_init.sql 文件内的 SQL 语句。

第 3 步第 2 部分:TiDB Cloud 更改参数

若你使用了 TiDB Serverless 集群,此处需使用系统本地的 CA 证书,并将证书路径记为 <ca_path> 以供后续指代。请参考以下系统相关的证书路径地址:

  • MacOS / Alpine
  • Debian / Ubuntu / Arch
  • RedHat / Fedora / CentOS / Mageia
  • OpenSUSE

/etc/ssl/cert.pem

/etc/ssl/certs/ca-certificates.crt

/etc/pki/tls/certs/ca-bundle.crt

/etc/ssl/ca-bundle.pem

若设置后仍有证书错误,请查阅 TiDB Serverless 安全连接文档

  • 使用 SQLAlchemy(推荐)
  • 使用 peewee(推荐)
  • 使用 mysqlclient
  • 使用 PyMySQL
  • 使用 mysql-connector-python

若你使用 TiDB Serverless 集群,更改 sqlalchemy_example.pycreate_engine 函数的入参:

engine = create_engine('mysql://root:@127.0.0.1:4000/test')

若你设定的密码为 123456,而且从 TiDB Serverless 集群面板中得到的连接信息为:

  • Endpoint: xxx.tidbcloud.com
  • Port: 4000
  • User: 2aEp24QWEDLqRFs.root

那么此处应将 create_engine 更改为:

engine = create_engine('mysql://2aEp24QWEDLqRFs.root:123456@xxx.tidbcloud.com:4000/test', connect_args={ "ssl_mode": "VERIFY_IDENTITY", "ssl": { "ca": "<ca_path>" } })

若你使用 TiDB Serverless 集群,更改 peewee_example.pyconnect 函数的入参:

db = connect('mysql://root:@127.0.0.1:4000/test')

若你设定的密码为 123456,而且从 TiDB Serverless 集群面板中得到的连接信息为:

  • Endpoint: xxx.tidbcloud.com
  • Port: 4000
  • User: 2aEp24QWEDLqRFs.root

那么此处应将 connect 更改为:

  • peewee 将 PyMySQL 作为 Driver 时:

    db = connect('mysql://2aEp24QWEDLqRFs.root:123456@xxx.tidbcloud.com:4000/test', ssl_verify_cert=True, ssl_ca="<ca_path>")
  • peewee 将 mysqlclient 作为 Driver 时:

    db = connect('mysql://2aEp24QWEDLqRFs.root:123456@xxx.tidbcloud.com:4000/test', ssl_mode="VERIFY_IDENTITY", ssl={"ca": "<ca_path>"})

由于 peewee 会将参数透传至 Driver 中,使用 peewee 时请注意 Driver 的使用类型。

若你使用 TiDB Serverless 集群,更改 mysqlclient_example.pyget_connection 函数:

def get_connection(autocommit: bool = True) -> MySQLdb.Connection: return MySQLdb.connect( host="127.0.0.1", port=4000, user="root", password="", database="test", autocommit=autocommit )

若你设定的密码为 123456,而且从 TiDB Serverless 集群面板中得到的连接信息为:

  • Endpoint: xxx.tidbcloud.com
  • Port: 4000
  • User: 2aEp24QWEDLqRFs.root

那么此处应将 get_connection 更改为:

def get_connection(autocommit: bool = True) -> MySQLdb.Connection: return MySQLdb.connect( host="xxx.tidbcloud.com", port=4000, user="2aEp24QWEDLqRFs.root", password="123456", database="test", autocommit=autocommit, ssl_mode="VERIFY_IDENTITY", ssl={ "ca": "<ca_path>" } )

若你使用 TiDB Serverless 集群,更改 pymysql_example.pyget_connection 函数:

def get_connection(autocommit: bool = False) -> Connection: return pymysql.connect(host='127.0.0.1', port=4000, user='root', password='', database='test', cursorclass=DictCursor, autocommit=autocommit)

若你设定的密码为 123456,而且从 TiDB Serverless 集群面板中得到的连接信息为:

  • Endpoint: xxx.tidbcloud.com
  • Port: 4000
  • User: 2aEp24QWEDLqRFs.root

那么此处应将 get_connection 更改为:

def get_connection(autocommit: bool = False) -> Connection: return pymysql.connect(host='xxx.tidbcloud.com', port=4000, user='2aEp24QWEDLqRFs.root', password='123546', database='test', cursorclass=DictCursor, autocommit=autocommit, ssl_ca='<ca_path>', ssl_verify_cert=True, ssl_verify_identity=True)

若你使用 TiDB Serverless 集群,更改 mysql_connector_python_example.pyget_connection 函数:

def get_connection(autocommit: bool = True) -> MySQLConnection: connection = connect(host='127.0.0.1', port=4000, user='root', password='', database='test') connection.autocommit = autocommit return connection

若你设定的密码为 123456,而且从 TiDB Serverless 集群面板中得到的连接信息为:

  • Endpoint: xxx.tidbcloud.com
  • Port: 4000
  • User: 2aEp24QWEDLqRFs.root

那么此处应将 get_connection 更改为:

def get_connection(autocommit: bool = True) -> MySQLConnection: connection = connect( host="xxx.tidbcloud.com", port=4000, user="2aEp24QWEDLqRFs.root", password="123456", database="test", autocommit=autocommit, ssl_ca='<ca_path>', ssl_verify_identity=True ) connection.autocommit = autocommit return connection

第 3 步第 3 部分:运行

运行前请先安装依赖:

pip3 install -r requirement.txt

当以后需要多次运行脚本时,请在每次运行前先依照表初始化一节再次进行表初始化。

  • 使用 SQLAlchemy(推荐)
  • 使用 peewee(推荐)
  • 使用 mysqlclient
  • 使用 PyMySQL
  • 使用 mysql-connector-python
python3 sqlalchemy_example.py
python3 peewee_example.py
python3 mysqlclient_example.py
python3 pymysql_example.py
python3 mysql_connector_python_example.py

第 4 步:预期输出

  • 使用 SQLAlchemy(推荐)
  • 使用 peewee(推荐)
  • 使用 mysqlclient
  • 使用 PyMySQL
  • 使用 mysql-connector-python