ws.md
Created July 30, 2025 07:45
# WebSocket Upload Progress Guide

This guide explains how to use the new WebSocket-based upload flow for Google Drive uploads in Verispotter, powered by **Watermill with PGX** for reliable job processing.

## Overview

Google Drive uploads are now processed asynchronously using **Watermill with PGX** to avoid nginx timeouts and provide better reliability. When you submit a Google Drive URL, the server immediately returns a success response and queues the upload job in a PostgreSQL table using the existing PGX connection pool. Background workers process these jobs while sending real-time progress updates via WebSocket.

### Key Benefits

- **Persistence**: Upload jobs survive server restarts because they are stored in PostgreSQL
- **PGX Integration**: Uses your existing PGX connection pool for consistency
- **Reliability**: Built-in retry mechanisms and error handling
- **Scalability**: Multiple workers can process jobs concurrently
- **Monitoring**: Job status is visible in the database tables
- **Real-time Updates**: WebSocket progress notifications for each file
- **Connection Efficiency**: Shares the same database connections as your main application

## Architecture

```mermaid
graph LR
    A[Client] -->|1. POST /upload| B[REST API]
    B -->|2. Publish Job| C[Watermill PGX]
    C -->|3. Store Job| D[(PostgreSQL Pool)]
    D -->|4. Consume Job| E[Background Worker]
    E -->|5. Process Upload| F[Google Drive + GCS]
    E -->|6. Send Progress| G[WebSocket Hub]
    G -->|7. Real-time Updates| A
```

### Database Schema (PGX)

The system creates these tables using your existing PGX connection pool:

```sql
-- Jobs table (auto-created)
CREATE TABLE watermill_upload_jobs (
    "offset" SERIAL PRIMARY KEY,
    "uuid" UUID NOT NULL,
    "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    "payload" JSONB NOT NULL,
    "metadata" JSONB DEFAULT NULL
);

-- Worker offsets table (auto-created)
CREATE TABLE watermill_offsets_upload_workers (
    "consumer_group" VARCHAR(255) NOT NULL,
    "offset_acked" BIGINT NOT NULL DEFAULT 0,
    "offset_consumed" BIGINT NOT NULL DEFAULT 0,
    "last_processed_transaction_id" BIGINT,
    PRIMARY KEY ("consumer_group")
);
```
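The `payload` column holds each upload job as JSONB. As a rough illustration, the sketch below builds and validates such a payload; the field names are an assumption inferred from the monitoring queries in this guide, and `buildUploadJobPayload` is a hypothetical helper, not part of the server code:

```javascript
// Hypothetical sketch of the JSONB payload stored in watermill_upload_jobs.
// Field names mirror the payload->>'...' keys used by the monitoring queries;
// the actual server-side shape may differ.
function buildUploadJobPayload({ jobId, eventId, uploaderId, gdriveUrl, forceUpload = false }) {
  // Reject jobs that are missing any required identifier.
  for (const [name, value] of Object.entries({ jobId, eventId, uploaderId, gdriveUrl })) {
    if (!value) throw new Error(`missing required field: ${name}`);
  }
  return {
    job_id: jobId,
    event_id: eventId,
    uploader_id: uploaderId,
    gdrive_url: gdriveUrl,
    force_upload: forceUpload,
  };
}

// Example: serialize the payload as it would land in the JSONB column.
const payload = buildUploadJobPayload({
  jobId: 'job-123',
  eventId: 'evt-456',
  uploaderId: 'user-789',
  gdriveUrl: 'https://drive.google.com/drive/folders/FOLDER_ID',
});
console.log(JSON.stringify(payload));
```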
## Upload Flow

### 1. Submit Upload Request

**POST** `/api/events/{event_id}/upload`

```bash
curl -X POST \
  "https://your-domain.com/api/events/EVENT_ID/upload" \
  -H "Authorization: Bearer YOUR_JWT_TOKEN" \
  -H "X-GID-Token: YOUR_GOOGLE_DRIVE_TOKEN" \
  -F "gdrive_url=https://drive.google.com/drive/folders/FOLDER_ID" \
  -F "force_upload=false"
```

**Response (HTTP 202):**

```json
{
  "message": "Upload request received and will be processed in background. Connect to WebSocket for progress updates.",
  "data": {
    "websocket_url": "/ws/events/EVENT_ID/upload-progress",
    "message": "Connect to the WebSocket endpoint to receive real-time upload progress updates"
  }
}
```
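The same request can be issued from Node.js 18+, which ships global `fetch` and `FormData`. This is a hedged sketch: `queueGdriveUpload`, `baseUrl`, and the injectable `fetchImpl` parameter are illustration conveniences, not part of the API:

```javascript
// Sketch: submit the upload request and extract the WebSocket endpoint
// from the 202 response body. fetchImpl is injectable for testing.
async function queueGdriveUpload({ baseUrl, eventId, jwt, gidToken, gdriveUrl, forceUpload = false, fetchImpl = fetch }) {
  const form = new FormData();
  form.append('gdrive_url', gdriveUrl);
  form.append('force_upload', String(forceUpload));
  const res = await fetchImpl(`${baseUrl}/api/events/${eventId}/upload`, {
    method: 'POST',
    headers: { Authorization: `Bearer ${jwt}`, 'X-GID-Token': gidToken },
    body: form,
  });
  if (res.status !== 202) throw new Error(`unexpected status ${res.status}`);
  const body = await res.json();
  // e.g. "/ws/events/EVENT_ID/upload-progress"
  return body.data.websocket_url;
}
```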
### 2. Job Processing with Watermill PGX

When you submit an upload request:

1. **Job Publication**: The server publishes an upload job message to the `watermill_upload_jobs` table using PGX
2. **Job Storage**: Watermill stores the job in PostgreSQL using your existing connection pool
3. **Worker Processing**: Background workers (consumer group: `upload_workers`) pick up jobs using PGX queries
4. **Progress Updates**: Workers send real-time progress via WebSocket as they process each file
5. **Completion**: The job is marked as processed when the upload completes or fails

### 3. Connect to WebSocket for Progress Updates

**WebSocket URL:** `wss://your-domain.com/ws/events/{event_id}/upload-progress`

#### Authentication Options

**Option 1: Authorization Header (Node.js only)**

The browser `WebSocket` API does not allow custom headers, so this option only works in environments that support them, such as Node.js with the `ws` package:

```javascript
const ws = new WebSocket('wss://your-domain.com/ws/events/EVENT_ID/upload-progress', [], {
  headers: {
    'Authorization': 'Bearer YOUR_JWT_TOKEN'
  }
});
```

**Option 2: Query Parameter (works in browsers)**

```javascript
const ws = new WebSocket('wss://your-domain.com/ws/events/EVENT_ID/upload-progress?token=YOUR_JWT_TOKEN');
```
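Because the right option depends on where the code runs, a small helper can pick between them. This is purely illustrative; `buildProgressWsUrl` is not part of the API:

```javascript
// Sketch: build the progress WebSocket URL, appending the token query
// parameter when headers are unavailable (i.e. in a browser).
function buildProgressWsUrl(baseUrl, eventId, token, { useQueryToken } = {}) {
  const url = new URL(`/ws/events/${eventId}/upload-progress`, baseUrl);
  // Browsers cannot set custom WebSocket headers, so default to the
  // query parameter there; in Node you may prefer the header instead.
  const inBrowser = typeof window !== 'undefined';
  if (useQueryToken ?? inBrowser) url.searchParams.set('token', token);
  return url.toString();
}
```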
### 4. Handle Progress Messages

```javascript
ws.onmessage = function(event) {
  const message = JSON.parse(event.data);
  switch (message.type) {
    case 'upload_started':
      console.log('Upload started:', message.data.message);
      console.log('Job ID:', message.data.job_id);
      console.log('Total files:', message.data.total_files);
      break;
    case 'upload_progress': {
      const progress = message.data;
      console.log(`Processing ${progress.current_file}/${progress.total_files}: ${progress.filename}`);
      console.log(`Status: ${progress.status}`);
      if (progress.status === 'failed') {
        console.error(`Failed: ${progress.error_reason}`);
      }
      break;
    }
    case 'upload_completed':
      console.log('Upload completed:', message.data.message);
      console.log('Processed files:', message.data.processed_files);
      console.log('Oversized files:', message.data.oversized_files);
      ws.close();
      break;
    case 'upload_failed':
      console.error('Upload failed:', message.data.message);
      console.error('Error:', message.data.error);
      console.error('Job ID:', message.data.job_id);
      ws.close();
      break;
  }
};
```

## Monitoring and Operations (PGX)

### View Pending Jobs

```sql
SELECT
    uuid as job_id,
    payload->>'event_id' as event_id,
    payload->>'uploader_id' as uploader_id,
    created_at
FROM watermill_upload_jobs
WHERE "offset" > (
    SELECT COALESCE(offset_consumed, 0)
    FROM watermill_offsets_upload_workers
    WHERE consumer_group = 'upload_workers'
);
```

### View Processing Progress

```sql
SELECT
    consumer_group,
    offset_acked,
    offset_consumed,
    (SELECT COUNT(*) FROM watermill_upload_jobs) as total_jobs
FROM watermill_offsets_upload_workers;
```
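From those counters you can estimate the worker backlog. This sketch is an interpretation of the offset model, and it assumes the jobs table has never been pruned, so `total_jobs` matches the highest `"offset"`:

```javascript
// Rough backlog estimate from the processing-progress query above.
// Assumes no rows have been deleted from watermill_upload_jobs.
function estimateBacklog({ total_jobs, offset_consumed, offset_acked }) {
  return {
    pending: Math.max(0, total_jobs - offset_consumed),    // not yet picked up
    inFlight: Math.max(0, offset_consumed - offset_acked), // picked up, not acked
  };
}
```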
### View Job Payload Details

```sql
SELECT
    "offset",
    uuid,
    payload->>'job_id' as job_id,
    payload->>'event_id' as event_id,
    payload->>'uploader_id' as uploader_id,
    payload->>'gdrive_url' as gdrive_url,
    payload->>'force_upload' as force_upload,
    created_at
FROM watermill_upload_jobs
ORDER BY "offset" DESC
LIMIT 10;
```

### Retry Failed Jobs

To retry failed jobs, reset the consumed offset:

```sql
UPDATE watermill_offsets_upload_workers
SET offset_consumed = offset_acked
WHERE consumer_group = 'upload_workers';
```

## Performance Benefits of PGX Integration

- **Connection Reuse**: Uses your existing PGX pool instead of creating separate SQL connections
- **Better Performance**: PGX is more efficient than database/sql for PostgreSQL
- **Consistent Configuration**: Inherits your existing database settings (timeouts, pool size, SSL)
- **Resource Efficiency**: No additional connection overhead
- **Transaction Support**: Leverages PGX's advanced PostgreSQL features

## Security Considerations

- **JWT Authentication**: Required for both HTTP upload requests and WebSocket connections
- **Database Security**: Uses the same PGX pool and security settings as your main application
- **Job Isolation**: Each job runs in its own context with proper error boundaries
- **Access Control**: Users can only view progress for events they have access to
- **Connection Security**: Inherits SSL and authentication from your existing PGX configuration

## Error Handling & Recovery

- **Server Restart**: Jobs persist in PostgreSQL and resume processing when workers restart
- **Network Issues**: PGX handles connection failures and retries automatically
- **Worker Failures**: Jobs are automatically retried if workers crash during processing
- **Database Failures**: Standard PostgreSQL backup/recovery procedures apply
- **File Failures**: Individual file failures are reported but don't stop the entire job
- **Connection Pool Health**: Benefits from PGX's connection health checking
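On the client side, a dropped WebSocket can be reopened with exponential backoff so progress updates resume after transient network issues. This is a sketch of one possible client policy, not part of the server contract; `makeSocket` is a caller-supplied factory:

```javascript
// Delay before reconnect attempt N: doubles each time, capped at maxMs.
function backoffDelayMs(attempt, { baseMs = 500, maxMs = 30000 } = {}) {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Reopen the progress socket after unexpected closes, up to maxAttempts.
// makeSocket() should return a fresh WebSocket for the progress endpoint.
function reconnectingSocket(makeSocket, { maxAttempts = 5 } = {}) {
  let attempt = 0;
  const open = () => {
    const ws = makeSocket();
    ws.onopen = () => { attempt = 0; };          // reset backoff once connected
    ws.onclose = () => {
      if (attempt < maxAttempts) {
        setTimeout(open, backoffDelayMs(attempt++));
      }
    };
  };
  open();
}
```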
## Frontend Implementation Example

```javascript
class PGXWatermillUploadProgressHandler {
  constructor(eventId, jwtToken) {
    this.eventId = eventId;
    this.jwtToken = jwtToken;
    this.ws = null;
    this.jobId = null;
  }

  async startUpload(gdriveUrl, forceUpload = false) {
    // 1. Submit upload request to queue the job via PGX Watermill
    const response = await fetch(`/api/events/${this.eventId}/upload`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.jwtToken}`
      },
      body: this.createFormData(gdriveUrl, forceUpload)
    });

    if (response.status === 202) {
      // 2. Connect to WebSocket for progress updates
      this.connectWebSocket();
    } else {
      throw new Error('Failed to queue upload job via PGX');
    }
  }

  createFormData(gdriveUrl, forceUpload) {
    const formData = new FormData();
    formData.append('gdrive_url', gdriveUrl);
    formData.append('force_upload', String(forceUpload));
    return formData;
  }

  connectWebSocket() {
    // Browsers cannot send custom WebSocket headers, so authenticate
    // with the token query parameter (Option 2 above).
    const wsUrl = `wss://your-domain.com/ws/events/${this.eventId}/upload-progress?token=${this.jwtToken}`;
    this.ws = new WebSocket(wsUrl);

    this.ws.onmessage = (event) => this.handleMessage(JSON.parse(event.data));
    this.ws.onerror = (error) => this.handleError(error);
    this.ws.onclose = () => this.handleClose();
  }

  handleMessage(message) {
    switch (message.type) {
      case 'upload_started':
        this.jobId = message.data.job_id;
        this.onUploadStarted(message.data);
        break;
      case 'upload_progress':
        this.onUploadProgress(message.data);
        break;
      case 'upload_completed':
        this.onUploadCompleted(message.data);
        break;
      case 'upload_failed':
        this.onUploadFailed(message.data);
        break;
    }
  }

  handleError(error) {
    console.error('WebSocket error:', error);
  }

  handleClose() {
    this.ws = null;
  }

  onUploadStarted(data) {
    console.log(`Upload job ${data.job_id} started: ${data.total_files} files to process`);
    // Update UI: show progress bar, display job ID
  }

  onUploadProgress(data) {
    const percentage = (data.current_file / data.total_files) * 100;
    console.log(`Progress: ${percentage.toFixed(1)}% - ${data.message}`);
    // Update UI: update progress bar, show current file
    if (data.status === 'failed') {
      console.warn(`File failed: ${data.filename} - ${data.error_reason}`);
      // Update UI: show file error in list
    }
  }

  onUploadCompleted(data) {
    console.log(`Upload job ${this.jobId} completed successfully`);
    // Update UI: show success message, hide progress bar
    this.disconnect();
  }

  onUploadFailed(data) {
    console.error(`Upload job ${data.job_id} failed: ${data.error}`);
    // Update UI: show error message, allow retry
    this.disconnect();
  }

  // Monitor job in database (optional)
  async getJobStatus() {
    if (!this.jobId) return null;
    const response = await fetch(`/api/admin/jobs/${this.jobId}`, {
      headers: { 'Authorization': `Bearer ${this.jwtToken}` }
    });
    return response.json();
  }

  disconnect() {
    if (this.ws) {
      this.ws.close();
      this.ws = null;
    }
  }
}

// Usage
const uploader = new PGXWatermillUploadProgressHandler('EVENT_ID', 'JWT_TOKEN');
uploader.startUpload('https://drive.google.com/drive/folders/FOLDER_ID', false);
```

This PGX-based implementation provides reliable handling of large Google Drive uploads while integrating with your existing PostgreSQL infrastructure and giving users real-time progress feedback.