Skip to content

Instantly share code, notes, and snippets.

@mzhang77
Created January 28, 2025 02:39
Show Gist options
  • Save mzhang77/cdaf373be5deb785e06d27d8089f1011 to your computer and use it in GitHub Desktop.
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "mysql-connector-python",
# ]
# ///
import threading
import time
import mysql.connector
from mysql.connector import Error
# Connection settings shared by every mysql.connector.connect() call in this
# script (schema setup, initial insert, and both worker threads).
DB_CONFIG = dict(
    host='127.0.0.1',
    port=4000,
    user='root',
    password='',
    database='test',
    # Autocommit is off, so writes only become durable on an explicit commit().
    autocommit=False,
)
# Recreate table_1 and table_2 from scratch so every run starts from empty
# tables. This executes at module top level, i.e. as soon as the file is run.
connection = mysql.connector.connect(**DB_CONFIG)
try:
    cursor = connection.cursor(dictionary=True)
    try:
        cursor.execute('drop table if exists table_1;')
        cursor.execute('CREATE TABLE `table_1` ( `id` INTEGER NOT NULL PRIMARY KEY, `created_at` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), `value` INTEGER NOT NULL)')
        cursor.execute('drop table if exists table_2')
        cursor.execute('CREATE TABLE `table_2` ( `id` varchar(100) NOT NULL PRIMARY KEY, `created_at` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), `value` INTEGER NOT NULL, `table_1_id` INTEGER NOT NULL)')
    finally:
        # Release the cursor even when one of the DDL statements fails.
        cursor.close()
finally:
    # Always hand the connection back; a failed statement must not leak it.
    connection.close()
def execute_thread_1():
    """Run the first transaction of the scenario on its own connection.

    Issues ``SELECT ... LEFT JOIN (SELECT * FROM table_2) ... FOR UPDATE``
    against ``table_1``, discards the rows, sleeps five seconds with the
    transaction still open, then inserts one row into ``table_2`` and
    commits. Connector errors are printed, not raised.
    """
    # Pre-bind both handles: if connect() or cursor() raises, the finally
    # clause must still be able to inspect them without an UnboundLocalError
    # that would hide the real database error.
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(**DB_CONFIG)
        cursor = connection.cursor(dictionary=True)
        print("Thread 1: Starting transaction")
        cursor.execute(
            "SELECT table_1.id, table_1.value, table_1.created_at, tbl_2.id "
            "FROM table_1 "
            "LEFT JOIN (SELECT * FROM table_2) AS tbl_2 "
            "ON table_1.id = tbl_2.table_1_id "
            "FOR UPDATE;"
        )
        # The result set has to be consumed before the next statement can be
        # sent on this cursor; the rows themselves are not needed.
        _ = cursor.fetchall()
        print("Thread 1: Query executed, holding lock")
        # Simulate processing delay
        time.sleep(5)
        print("Thread 1: Inserting into table_2")
        cursor.execute(
            "INSERT INTO table_2 (id, value, table_1_id) VALUES (%s, %s, %s);",
            ("2", 2, 1),
        )
        connection.commit()
        print("Thread 1: Inserted row and committed transaction")
    except Error as e:
        print(f"Thread 1 Error: {e}")
    finally:
        # Only tear down what was actually opened and is still alive.
        if connection is not None and connection.is_connected():
            if cursor is not None:
                cursor.close()
            connection.close()
            print("Thread 1: Connection closed")
def execute_thread_2():
    """Run the second transaction of the scenario on its own connection.

    Executes the same join twice, first with ``FOR UPDATE`` and then as a
    plain ``SELECT``, printing every returned row after each query so the two
    result sets can be compared. Connector errors are printed, not raised.
    """
    # Both statements share this text and differ only in the trailing clause.
    base_query = (
        "select table_1.id, table_1.value, table_1.created_at, tbl_2.id as tbl_2_id "
        "from table_1 left join (select * from table_2) as tbl_2 "
        "on table_1.id=tbl_2.table_1_id"
    )
    # Pre-bind both handles: if connect() or cursor() raises, the finally
    # clause must still be able to inspect them without an UnboundLocalError
    # that would hide the real database error.
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(**DB_CONFIG)
        cursor = connection.cursor(dictionary=True)
        print("Thread 2: Starting transaction")
        for statement, banner in (
            (base_query + " for update;",
             "Thread 2: Select for update executed, checking results"),
            (base_query + ";",
             "Thread 2: Select executed, checking results"),
        ):
            cursor.execute(statement)
            print(banner)
            for row in cursor.fetchall():
                print(f"Thread 2: Row - {row}")
    except Error as e:
        print(f"Thread 2 Error: {e}")
    finally:
        # Only tear down what was actually opened and is still alive.
        if connection is not None and connection.is_connected():
            if cursor is not None:
                cursor.close()
            connection.close()
            print("Thread 2: Connection closed")
# Script entry point: seed table_1, then run the two transactions concurrently.
if __name__ == "__main__":
    # Insert initial data into table_1
    # Reset the handles first: these names are module globals that still refer
    # to the already-closed objects from the schema setup earlier in the file,
    # and the cleanup below should only act on what this block opened.
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(**DB_CONFIG)
        cursor = connection.cursor()
        cursor.execute("INSERT INTO table_1 (id, value) VALUES (1, 1);")
        connection.commit()
        print("Main: Inserted initial row into table_1")
    except Error as e:
        print(f"Main Error: {e}")
    finally:
        if connection is not None and connection.is_connected():
            if cursor is not None:
                cursor.close()
            connection.close()
    # Start threads
    thread_1 = threading.Thread(target=execute_thread_1)
    thread_2 = threading.Thread(target=execute_thread_2)
    thread_1.start()
    time.sleep(1)  # Ensure thread 1 starts first and holds the lock
    thread_2.start()
    thread_1.join()
    thread_2.join()
    print("Main: All threads completed")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment