This guide describes a robust, crash-safe, idempotent design for handling large concurrent financial imports (e.g. 1,000 users × 500 records each).
- Consistency & Integrity: No half-imports or duplicates.
- Idempotency: Retries never create duplicates.
- Crash-safety: Survive server crashes, client disconnects, cancellations.
- Scalability: Support many users uploading at once.
- Auditability: Always trace each import.
Client → API → Queue (Bull/Redis) → Worker → Postgres (staging → main)
- Start import → API issues an import_id (UUID).
- Client sends batches → rows inserted into a staging table.
- Deduplication → PK (import_id, batch_no, row_no) ensures idempotency.
- Finalize → Worker moves all rows in one transaction into transactions.
- Mark completed → Audit record in imports table.
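The client side of this flow starts by splitting records into numbered batches so every row keeps a stable (batch_no, row_no) address across retries. A minimal sketch (`chunkRows` is an illustrative helper, not part of any library):

```typescript
// Split records into numbered batches; each row gets a stable
// (batch_no, row_no) key so retried uploads dedupe in staging.
// Illustrative sketch only.
interface Row { transaction_id: string; amount: number; currency: string }
interface BatchRow extends Row { row_no: number }
interface Batch { batch_no: number; rows: BatchRow[] }

function chunkRows(rows: Row[], batchSize: number): Batch[] {
  const batches: Batch[] = [];
  for (let i = 0; i < rows.length; i += batchSize) {
    batches.push({
      batch_no: batches.length + 1,
      rows: rows.slice(i, i + batchSize).map((r, j) => ({ ...r, row_no: i + j + 1 })),
    });
  }
  return batches;
}
```

Because the keys are derived from position rather than generated at send time, re-chunking the same input after a crash yields identical keys.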
- Imports table

```sql
CREATE TABLE imports (
  id uuid PRIMARY KEY,
  user_id bigint NOT NULL,
  status text NOT NULL DEFAULT 'pending',
  total_rows int,
  created_at timestamptz DEFAULT now()
);
```

- Staging table

```sql
CREATE TABLE transactions_staging (
  import_id uuid NOT NULL,
  batch_no int NOT NULL,
  row_no int NOT NULL,
  transaction_id text NOT NULL,
  user_id bigint NOT NULL,
  amount numeric(20,4) NOT NULL,
  currency varchar(3) NOT NULL,
  raw jsonb,
  PRIMARY KEY (import_id, batch_no, row_no)
);
```

- Main table

```sql
CREATE TABLE transactions (
  id bigserial PRIMARY KEY,
  transaction_id text NOT NULL,
  user_id bigint NOT NULL,
  amount numeric(20,4) NOT NULL,
  currency varchar(3) NOT NULL,
  raw jsonb,
  created_at timestamptz DEFAULT now(),
  UNIQUE (user_id, transaction_id)
  -- Note: partitioning this table by created_at (see the scalability notes
  -- below) would require created_at in both the primary key and this
  -- unique constraint, which changes the dedup key used at finalize.
);
```

- Start
POST /imports/start
→ { import_id: "uuid-v4", total_rows: 500 }
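A minimal sketch of the start step, with an in-memory map standing in for the imports table (`startImport` and the map are illustrative names, not a real API):

```typescript
import { randomUUID } from "crypto";

// In-memory stand-in for the imports table -- illustration only.
const imports = new Map<string, { user_id: number; status: string; total_rows: number }>();

// POST /imports/start: issue an import_id and record the pending import.
function startImport(userId: number, totalRows: number) {
  const importId = randomUUID();
  imports.set(importId, { user_id: userId, status: "pending", total_rows: totalRows });
  return { import_id: importId, total_rows: totalRows };
}
```

Recording total_rows up front is what lets finalize verify the row count later.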
- Batch Upload
POST /imports/{import_id}/batch/{n}
Body: [ { row_no: 1, transaction_id: "...", amount: ..., currency: "USD" }, ... ]
- Insert into staging:
```sql
INSERT INTO transactions_staging (...)
VALUES (...)
ON CONFLICT DO NOTHING;
```

- Finalize
POST /imports/{import_id}/finalize
- Verify row count = expected.
- Move rows:
```sql
INSERT INTO transactions (transaction_id, user_id, amount, currency, raw, created_at)
SELECT transaction_id, user_id, amount, currency, raw, now()
FROM transactions_staging
WHERE import_id = $1
ON CONFLICT (user_id, transaction_id) DO NOTHING;
```

- Mark import as completed.
- Idempotent → duplicate batches ignored via PK.
- Crash-safe → staging rows persist until finalize.
- No partial commits → final insert wrapped in a transaction.
- Auditable → import_id links all rows to a user's import session.
- Use Bull/Redis queue for async batch processing.
- Keep job payloads small (just metadata).
- Use PgBouncer for connection pooling.
- Partition transactions table monthly for scalability (note: Postgres requires the partition key, e.g. created_at, to be part of any unique constraint, so the dedup key would become (user_id, transaction_id, created_at)).
- Add dead-letter queue for failed imports.
- Monitor: queue size, DB latency, WAL, job retry counts.
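Keeping job payloads small means enqueuing only the metadata needed to locate staged rows; the worker reads the rows back from transactions_staging. A sketch (the queue name and the Bull options shown in the comment are illustrative):

```typescript
// The job payload carries only metadata; the worker re-reads rows from
// transactions_staging by (import_id, batch_no). Illustrative sketch.
interface BatchJob { import_id: string; batch_no: number }

function buildJobPayload(importId: string, batchNo: number): BatchJob {
  return { import_id: importId, batch_no: batchNo };
}

// With Bull this might look like (not executed here):
// await queue.add("process-batch", buildJobPayload(importId, batchNo),
//   { attempts: 5, backoff: { type: "exponential", delay: 500 } });
```

A metadata-only payload keeps Redis memory flat no matter how large the batches are, and a lost job can always be re-derived from the staging table.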
- Client: /imports/start → gets import_id.
- Sends 5 batches → staged safely.
- Network drops, retries batch → safe (no dupes).
- Calls /finalize → moves data to main table.
- Import marked completed.
- Staging table per import → safe buffering.
- Idempotent keys (import_id, batch_no, row_no) → dedupe retries.
- Finalize transaction → no partial imports.
- Bull + Redis + Postgres → scalable and robust.