custom_lock = "SELECT pg_advisory_xact_lock(%s);" def lock(conn, lock_key): cur = conn.cursor() try: # Acquire the custom lock outside of the transaction cur.execute(custom_lock, (lock_key,)) conn.commit() except Exception as e: # Handle exceptions conn.rollback() print(f"Error acquiring lock: {e}") finally: # Close the cursor cur.close() custom_unlock = "SELECT pg_advisory_unlock(%s);" def unlock(conn, lock_key): cur = conn.cursor() try: # Release the custom lock outside of the transaction cur.execute(custom_unlock, (lock_key,)) conn.commit() except Exception as e: # Handle exceptions conn.rollback() print(f"Error releasing lock: {e}") finally: # Close the cursor cur.close() """lock_key: C'est un entier qui peut être l'ID de la transaction par exemple qu'on peut obtenir comme suit:""" def get_transaction_ID(cur): # Execute the txid_current() function to obtain the current transaction ID cur.execute("SELECT txid_current();") # Fetch the result transaction_id = cur.fetchone()[0] print(f"Current Transaction ID: {transaction_id}") def get_account_id(conn, sender): cur = conn.cursor() try: select_sender_account_query = sql.SQL(""" SELECT account_id FROM account AS a JOIN customer AS c ON c.customer_id = a.customer_id WHERE c.first_name = %s; """) cur.execute(select_sender_account_query, (sender,)) sender_account_id = cur.fetchone()[0] return sender_account_id except Exception as e: print(f"Error getting account id: {e}") finally: cur.close() def pessimistic_method(conn, sender, recipient, amount, transfert_name="0"): cur = conn.cursor() conn.autocommit = False cur.execute("START TRANSACTION;") try: sender_account_id = get_account_id(conn, sender) recipient_account_id = get_account_id(conn, recipient) # todo deadlock ? print(f"{transfert_name}: Waiting for lock {sender} and {recipient}") lock(conn, sender_account_id) print(f"{transfert_name}: Lock {sender}({sender_account_id}) acquired") time.sleep(3) lock(conn, recipient_account_id) print(f"{transfert_name}: Lock {recipient}({recipient_account_id}) acquired") perform_transfer(conn, sender, recipient, amount) print(f"{transfert_name}: Transfer performed : {sender} -> {recipient} ({amount})") # unlock(conn, sender_account_id) # unlock(conn, recipient_account_id) conn.commit() print(f'Transfer completed: {amount} from {sender} to {recipient}') except psycopg2.Error as e: # Handle exceptions conn.rollback() print(f"Error: {e}") finally: # Close the cursor cur.close() # test méthode pessimistes Host = "localhost" Port = 5432 User = "postgres" Password = "postgres" Database = "postgres" conn1 = psycopg2.connect(dbname=Database, host=Host, user=User, password=Password, port=Port) conn2 = psycopg2.connect(dbname=Database, host=Host, user=User, password=Password, port=Port) try: process1 = multiprocessing.Process(target=pessimistic_method, args=(conn1, 'Brandy', 'Crystal', 7336, "1")) process2 = multiprocessing.Process(target=pessimistic_method, args=(conn2, 'Brandy', 'George', 7336, "2")) process1.start() process2.start() process1.join() process2.join() finally: # Retrieve the message after COMMIT using the connection print("Connection status:", conn1.status) print("Connection status:", conn2.status) # Close the connection conn1.close() conn2.close()