Last active
October 22, 2025 17:31
-
-
Save tomfun/26f909789d168d44acbeed08f502cd91 to your computer and use it in GitHub Desktop.
Revisions
-
tomfun renamed this gist
Sep 29, 2023 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
tomfun revised this gist
May 6, 2023 . 1 changed file with 14 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -9,6 +9,20 @@ We will use a practical example to demonstrate how to implement pessimistic lock TypeORM library. Our example will focus on a payment processing scenario, where multiple processes might attempt to update the status of a payment concurrently. ## Potential Sources of Errors In the process of handling payments using an API and a database, there are several potential sources of errors that you may encounter: 1. **Network issues:** Communication between your application and the API server or the database server might be interrupted or delayed due to network problems or server unavailability. 2. **Concurrency issues:** Multiple transactions attempting to perform operations on the same records simultaneously can lead to race conditions, deadlocks, or other inconsistencies. 3. **API errors:** The API server might return errors due to invalid inputs, insufficient funds, or other unexpected issues. 4. **Database errors:** Problems with the database server, such as connection issues or query execution errors, can disrupt the payment processing flow. To address these potential errors, it's essential to design a robust and fault-tolerant payment handling system. Implementing idempotent logic using database transactions, locking mechanisms, and proper error handling can help ensure that your system can recover from errors and continue processing payments correctly, even in the presence of **retries** or failures. ## Introduction to Pessimistic Locking Pessimistic locking is a technique used to prevent concurrent access to a resource, such as a row in a database. -
tomfun revised this gist
May 6, 2023 . 1 changed file with 39 additions and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -276,4 +276,42 @@ await Promise.all([ 00:26 | 2 initially skipped 00:26 | query: COMMIT ``` ## Optimistic Locking Optimistic locking is a concurrency control technique that assumes multiple transactions can complete without affecting each other. It only checks for conflicts when the transaction is ready to commit. If a conflict is detected, the transaction is rolled back and must be retried. ## Pessimistic Locking ### Pessimistic Write This mode locks the selected records for update, preventing other transactions from modifying them until the lock is released. Other transactions trying to update the locked records will have to wait for the lock to be released. ```sql SELECT ... FROM "payment" ... FOR UPDATE; ``` ### Pessimistic Write or Fail This mode attempts to lock the selected records for update. If any of the records are already locked, the query will fail immediately. ```sql SELECT ... FROM "payment" ... FOR UPDATE NOWAIT; ``` ### Pessimistic Partial Write This mode attempts to lock the selected records for update. If any of the records are already locked, it will return the remaining unlocked records and skip the locked ones. ```sql SELECT ... FROM "payment" ... FOR UPDATE SKIP LOCKED; ``` ### Pessimistic Read This mode allows multiple transactions to read the same record simultaneously but prevents any updates to the record until the lock is released. It leads to deadlocks in our example. ```sql SELECT ... FROM "payment" ... FOR SHARE; ``` -
tomfun created this gist
May 6, 2023 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,279 @@ # Pessimistic Locking in Node.js with TypeORM In this article, we will explore pessimistic locking, its main aspects, and how to implement it in Node.js using TypeORM. Pessimistic locking is a strategy to prevent concurrent access to a resource by obtaining exclusive access to the resource before making any changes. This approach ensures that other transactions cannot modify the locked resource until the lock is released. We will use a practical example to demonstrate how to implement pessimistic locking in a Node.js application using the TypeORM library. Our example will focus on a payment processing scenario, where multiple processes might attempt to update the status of a payment concurrently. ## Introduction to Pessimistic Locking Pessimistic locking is a technique used to prevent concurrent access to a resource, such as a row in a database. When a transaction needs to modify a resource, it first obtains a lock on the resource. Other transactions attempting to access the locked resource will either wait for the lock to be released or fail immediately, depending on the locking mode used. Pessimistic locking can help ensure data consistency in scenarios where multiple transactions compete for the same resource. However, it can also lead to reduced performance and potential deadlocks, so it's essential to use it judiciously. ## Implementing Pessimistic Locking with TypeORM In our example, we have a **PaymentEntity** that represents a payment in our system. We want to update the status of a payment atomically and ensure that no other process can change the status while the update is in progress. Here's the code that demonstrates how to implement pessimistic locking using TypeORM: ```ts export class AccountingService { private logger = new Logger(AccountingService.name); @InjectEntityManager() private readonly manager: EntityManager; constructor() { // ... code to initialize variables and run the example } private async justRead(id: number | string, time: number) { // ... code to read the payment status without locking } private async run(id: number, time: number) { // ... code to update the payment status with pessimistic locking } } ``` In this code snippet, we have an AccountingService class that uses the **EntityManager** from **TypeORM** to interact with the database. The **run** method will update the payment status using pessimistic locking, while the **justRead** method will read the payment status without acquiring any lock. Let's take a closer look at the **run** method, which performs the following steps: The `run` method addresses the main problem by performing the following steps within a separate database connection: 1. Obtain a pessimistic lock on the `PaymentEntity` row we want to update. 2. If the payment status is already set to `ok` or `failed`, skip the rest of the steps. 3. Simulate an API call to check/read the status of the completed action using a `setTimeout`. 4. If `done` is `false`, it means we don't have an API call yet. In this case, update the payment status based on the result of the simulated API call. 6. If `done` is `true`, we have the result of an already made API call by another thread/process, and we can update the payment status accordingly. This approach ensures that the status of the payment is updated atomically while preventing other processes from updating the same payment concurrently, thanks to the pessimistic lock. ```ts await queryRunner.startTransaction(); const p = await m.findOne(PaymentEntity, { where: { uuid }, lock: { mode: 'pessimistic_write' }, }); const done = // ...check if we already have called API in another process. e.g. "get payment by saved id" if (!done) { // we will do API call now. e.g. "make a Payment" done = true; p.status = status; await m.save(p); await queryRunner.commitTransaction(); } else { const status = // get its operation status p.status = status; await m.save(p); await queryRunner.commitTransaction(); } ``` Here is **close to reallife** example: ```ts import { EntityManager } from 'typeorm'; async function processPayment(entityManager: EntityManager, paymentId: number) { await entityManager.transaction(async (db) => { const payment = await db.findOne(PaymentEntity, paymentId, { lock: { mode: 'pessimistic_write' }, }); if (payment.status === 'completed' || payment.status === 'failed') { return; } const done = // ...check if we already have called API in another process. e.g. "get payment by saved id" if (!done) { // we will do API call now. e.g. "make a Payment" done = true; const apiCallId = await apiMakePayment(payment); payment.api_call_id = apiCallId; payment.status = apiCallId ? 'completed' : 'failed'; await db.save(payment); } else { const status = // get its operation status payment.status = status; await db.save(payment); } await db.commitTransaction(); }); } // Fictional API call functions async function apiCheckPaymentStatus(apiCallId: string): Promise<string> { // ... } async function apiMakePayment(payment: PaymentEntity): Promise<string> { // ... } ``` ## Launch (stripped) We do not simulate DB error, start from 'new' status, have 3 concurrent jobs and 1 non locking read: ```ts await Promise.all([ run(1, 15000), run(2, 5000), run(3, 1000), justRead('r', 500), ]); ``` The output ```sql 00:00 Server listening at http://0.0.0.0:3005 00:00 query: UPDATE "payment" SET "status" = $1, "updatedAt" = CURRENT_TIMESTAMP WHERE "uuid" = $2 -- PARAMETERS: ["new","e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"] 00:00 begin game 00:00 query: START TRANSACTION 00:00 query: SELECT payment details WHERE "uuid" = $1 LIMIT 1 FOR UPDATE -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"] 00:00 1 got new 00:00 query: START TRANSACTION 00:00 query: SELECT payment details WHERE "uuid" = $1 LIMIT 1 FOR UPDATE -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"] 00:00 query: START TRANSACTION 00:00 query: SELECT payment details WHERE "uuid" = $1 LIMIT 1 FOR UPDATE -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"] 00:00 query: SELECT payment details WHERE "uuid" = $1 LIMIT 1 -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"] 00:00 r jstread got new 00:15 1 API call: OK 00:15 query: SELECT payment details WHERE "uuid" IN ($1) -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"] 00:15 query: UPDATE "payment" SET "status" = $1, "updatedAt" = CURRENT_TIMESTAMP WHERE "uuid" IN ($2) RETURNING "updatedAt" -- PARAMETERS: ["payed","e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"] 00:15 query: COMMIT 00:15 1 done 00:15 3 got payed 00:15 3 initially skipped 00:15 query: COMMIT 00:15 2 got payed 00:15 2 initially skipped 00:15 query: COMMIT ``` ### Already processed start from completed task ```ts // this.manager.update( // PaymentEntity, // { uuid: 'e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac' }, // { status: PaymentStatus.NEW }, // ); ``` Will give ```sql 00:00 | begin game 00:00 | query: START TRANSACTION 00:00 | query: SELECT ... FROM "payment" ... FOR UPDATE -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"] 00:00 | 1 got payed 00:00 | 1 initially skipped 00:00 | query: COMMIT 00:00 | query: START TRANSACTION 00:00 | query: SELECT ... FROM "payment" ... FOR UPDATE -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"] 00:00 | query: START TRANSACTION 00:00 | 2 got payed 00:00 | 2 initially skipped 00:00 | query: COMMIT 00:00 | query: SELECT ... FROM "payment" ... FOR UPDATE -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"] 00:00 | 3 got payed 00:00 | 3 initially skipped 00:00 | query: COMMIT 00:01 | query: SELECT ... FROM "payment" ... -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"] 00:01 | r jstread got payed ``` - The game begins, and multiple transactions are initiated. - Transactions 1, 2, and 3 get payed and initially skipped. - Transaction 1, 2, and 3 are committed. - A read-only transaction 'r' reads the payment information. ### DB Error ```ts // To simulate DB error: // uncomment queryRunner.release(); this.logger.log(`\n${id} simulate db error`); // comment // await m.save(p); ``` This code simulates a database error by releasing the query runner before attempting to save the payment entity. The logger logs a message with the id and the text "simulate db error" to indicate that the error has been simulated. The second block of code simulates a new fresh job: ```ts // To simulate new fresh job: this.manager.update( PaymentEntity, { uuid: 'e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac' }, { status: PaymentStatus.NEW }, ); await new Promise((r) => setTimeout(r, 500)); this.logger.log(`\nbegin game\n`); await Promise.all([ run(1, 15000), run(2, 5000), run(3, 1000), justRead('r', 500), ]).catch((e) => setTimeout(() => console.log(e), 5000)); // log error delayed ``` ```sql 00:00 | begin game 00:01 | query: START TRANSACTION 00:01 | query: SELECT ... FROM "payment" ... FOR UPDATE -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"] 00:01 | 1 got new 00:01 | query: START TRANSACTION 00:01 | query: SELECT ... FROM "payment" ... FOR UPDATE -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"] 00:01 | query: START TRANSACTION 00:01 | query: SELECT ... FROM "payment" ... FOR UPDATE -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"] 00:01 | query: SELECT ... FROM "payment" ... -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"] 00:01 | r jstread got new 00:15 | 1 API call: OK 00:15 | 1 simulate db error 00:15 | QueryRunnerAlreadyReleasedError: Query runner already released. Cannot run queries anymore. 00:25 | 3 got new 00:26 | 3 API already affected - switch status OK 00:26 | query: SELECT ... FROM "payment" ... -- PARAMETERS: ["e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"] 00:26 | query: UPDATE "payment" SET "status" = $1, "updatedAt" = CURRENT_TIMESTAMP WHERE "uuid" IN ($2) RETURNING "updatedAt" -- PARAMETERS: ["payed","e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac"] 00:26 | query: COMMIT 00:26 | 2 got payed 00:26 | 2 initially skipped 00:26 | query: COMMIT ``` This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,108 @@ import { Logger } from '@nestjs/common'; import { InjectEntityManager } from '@nestjs/typeorm/dist/common/typeorm.decorators'; import { EntityManager } from 'typeorm'; import { QueryRunner } from 'typeorm/query-runner/QueryRunner'; import { PaymentEntity, PaymentStatus, } from '../entity'; export class AccountingService { private logger = new Logger(AccountingService.name); @InjectEntityManager() private readonly manager: EntityManager; constructor() { const uuid = 'e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac'; const OK = PaymentStatus.PAYED; const FAILED = PaymentStatus.REFUNDED; const status = Math.random() > 0.8 ? FAILED : OK; const statusText = status === OK ? 'OK' : 'FAILED'; let done = false; const justRead = async (id: number | string, time: number) => { const queryRunner = this.manager.connection.createQueryRunner(); await queryRunner.connect(); try { const m = queryRunner.manager; await new Promise((r) => setTimeout(r, time)); const p = await m.findOne(PaymentEntity, { where: { uuid: 'e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac' }, }); this.logger.log(`\n${id} jstread got ${p && p.status}`); } finally { await queryRunner.release(); } }; const run = async (id: number, time: number) => { const queryRunner = this.manager.connection.createQueryRunner(); await queryRunner.connect(); try { const m = queryRunner.manager; await queryRunner.startTransaction(); /* pessimistic_write = SELECT ... FOR UPDATE - wait for commit pessimistic_write_or_fail = SELECT ... FOR UPDATE NOWAIT - fail if can't lock pessimistic_partial_write = SELECT ... FOR UPDATE SKIP LOCKED - return null pessimistic_read = SELECT ... FOR SHARE - read & deadlocks :( */ const p = await m.findOne(PaymentEntity, { where: { uuid }, lock: { mode: 'pessimistic_write' }, }); this.logger.log(`\n${id} got ${p && p.status}`); if (p.status === OK || p.status === FAILED) { this.logger.log(`\n${id} initially skipped`); await queryRunner.commitTransaction(); return; } await new Promise((r) => setTimeout(r, time)); if (!done) { done = true; p.status = status; this.logger.log(`\n${id} API call: ${statusText}`); // To simulate DB error: // uncomment // queryRunner.release(); // this.logger.log(`\n${id} simulate db error`); // comment await m.save(p); await queryRunner.commitTransaction(); this.logger.log(`\n${id} done`); } else { this.logger.log( `\n${id} API already affected - switch status ${statusText}`, ); p.status = status; await m.save(p); await queryRunner.commitTransaction(); return; } } finally { await queryRunner.release(); } }; setTimeout(async () => { // To simulate new fresh job: this.manager.update( PaymentEntity, { uuid: 'e1b45f7f-9945-4e89-8381-ae4fe7a8c5ac' }, { status: PaymentStatus.NEW }, ); await new Promise((r) => setTimeout(r, 500)); this.logger.log(`\nbegin game\n`); await Promise.all([ run(1, 15000), run(2, 5000), run(3, 1000), justRead('r', 500), ]).catch((e) => setTimeout(() => console.log(e), 5000)); }, 2000); } }