@zuramai
Created July 30, 2025 07:45
ws.md
# WebSocket Upload Progress Guide
This guide explains how to use the new WebSocket-based upload flow for Google Drive uploads in Verispotter, powered by **Watermill with PGX** for reliable job processing.
## Overview
Google Drive uploads are now processed asynchronously using **Watermill with PGX** to avoid nginx timeouts and provide better reliability. When you submit a Google Drive URL, the server immediately returns a success response and queues the upload job in a PostgreSQL table using the existing PGX connection pool. Background workers process these jobs while sending real-time progress updates via WebSocket.
### Key Benefits
- **Persistence**: Upload jobs are stored in PostgreSQL and survive server restarts
- **PGX Integration**: Uses your existing PGX connection pool for consistency
- **Reliability**: Built-in retry mechanisms and error handling
- **Scalability**: Multiple workers can process jobs concurrently
- **Monitoring**: Job status visible in database tables
- **Real-time Updates**: WebSocket progress notifications for each file
- **Connection Efficiency**: Shares the same database connections as your main application
## Architecture
```mermaid
graph LR
A[Client] -->|1. POST /upload| B[REST API]
B -->|2. Publish Job| C[Watermill PGX]
C -->|3. Store Job| D[(PostgreSQL Pool)]
D -->|4. Consume Job| E[Background Worker]
E -->|5. Process Upload| F[Google Drive + GCS]
E -->|6. Send Progress| G[WebSocket Hub]
G -->|7. Real-time Updates| A
```
### Database Schema (PGX)
The system creates these tables using your existing PGX connection pool:
```sql
-- Jobs table (auto-created)
CREATE TABLE watermill_upload_jobs (
    "offset" SERIAL PRIMARY KEY,
    "uuid" UUID NOT NULL,
    "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    "payload" JSONB NOT NULL,
    "metadata" JSONB DEFAULT NULL
);

-- Worker offsets table (auto-created)
CREATE TABLE watermill_offsets_upload_workers (
    "consumer_group" VARCHAR(255) NOT NULL,
    "offset_acked" BIGINT NOT NULL DEFAULT 0,
    "offset_consumed" BIGINT NOT NULL DEFAULT 0,
    "last_processed_transaction_id" BIGINT,
    PRIMARY KEY ("consumer_group")
);
```
## Upload Flow
### 1. Submit Upload Request
**POST** `/api/events/{event_id}/upload`
```bash
curl -X POST \
  "https://your-domain.com/api/events/EVENT_ID/upload" \
  -H "Authorization: Bearer YOUR_JWT_TOKEN" \
  -H "X-GID-Token: YOUR_GOOGLE_DRIVE_TOKEN" \
  -F "gdrive_url=https://drive.google.com/drive/folders/FOLDER_ID" \
  -F "force_upload=false"
```
**Response (HTTP 202):**
```json
{
  "message": "Upload request received and will be processed in background. Connect to WebSocket for progress updates.",
  "data": {
    "websocket_url": "/ws/events/EVENT_ID/upload-progress",
    "message": "Connect to the WebSocket endpoint to receive real-time upload progress updates"
  }
}
```
### 2. Job Processing with Watermill PGX
When you submit an upload request:
1. **Job Publication**: The server publishes an upload job message to the `watermill_upload_jobs` table using PGX
2. **Job Storage**: Watermill stores the job in PostgreSQL using your existing connection pool
3. **Worker Processing**: Background workers (consumer group: `upload_workers`) pick up jobs using PGX queries
4. **Progress Updates**: Workers send real-time progress via WebSocket as they process each file
5. **Completion**: Job is marked as processed when upload completes or fails
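Putting the payload fields together from the monitoring queries later in this guide, a queued job's `payload` column plausibly looks like the following (all values are illustrative placeholders, not real identifiers):

```json
{
  "job_id": "JOB_UUID",
  "event_id": "EVENT_ID",
  "uploader_id": "UPLOADER_ID",
  "gdrive_url": "https://drive.google.com/drive/folders/FOLDER_ID",
  "force_upload": false
}
```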
### 3. Connect to WebSocket for Progress Updates
**WebSocket URL:** `wss://your-domain.com/ws/events/{event_id}/upload-progress`
#### Authentication Options:
**Option 1: Authorization Header (non-browser clients)**

The browser `WebSocket` API does not support custom headers, so this option only works from non-browser clients, for example Node.js with the `ws` package:
```javascript
// Node.js only: the `ws` package accepts custom headers; browsers do not
const WebSocket = require('ws');
const ws = new WebSocket('wss://your-domain.com/ws/events/EVENT_ID/upload-progress', {
  headers: {
    'Authorization': 'Bearer YOUR_JWT_TOKEN'
  }
});
```
**Option 2: Query Parameter (works in browsers)**
```javascript
const ws = new WebSocket('wss://your-domain.com/ws/events/EVENT_ID/upload-progress?token=YOUR_JWT_TOKEN');
```
### 4. Handle Progress Messages
```javascript
ws.onmessage = function(event) {
  const message = JSON.parse(event.data);
  switch (message.type) {
    case 'upload_started':
      console.log('Upload started:', message.data.message);
      console.log('Job ID:', message.data.job_id);
      console.log('Total files:', message.data.total_files);
      break;
    case 'upload_progress': {
      const progress = message.data;
      console.log(`Processing ${progress.current_file}/${progress.total_files}: ${progress.filename}`);
      console.log(`Status: ${progress.status}`);
      if (progress.status === 'failed') {
        console.error(`Failed: ${progress.error_reason}`);
      }
      break;
    }
    case 'upload_completed':
      console.log('Upload completed:', message.data.message);
      console.log('Processed files:', message.data.processed_files);
      console.log('Oversized files:', message.data.oversized_files);
      ws.close();
      break;
    case 'upload_failed':
      console.error('Upload failed:', message.data.message);
      console.error('Error:', message.data.error);
      console.error('Job ID:', message.data.job_id);
      ws.close();
      break;
  }
};
```
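Because jobs keep running server-side even if the page reloads or the socket drops, the client can simply reconnect to the same progress endpoint. A minimal reconnect sketch with exponential backoff (the close-code handling and retry limits are assumptions, not part of the documented API):

```javascript
// Capped exponential backoff: 1s, 2s, 4s, ... up to 30s.
function backoffDelay(attempt, baseMs = 1000, maxMs = 30000) {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

// Reconnect to the progress WebSocket unless it closed normally (code 1000),
// which we assume means the upload finished.
function connectWithRetry(url, onMessage, maxAttempts = 5) {
  let attempt = 0;
  function open() {
    const ws = new WebSocket(url);
    ws.onmessage = (event) => onMessage(JSON.parse(event.data));
    ws.onclose = (event) => {
      if (event.code !== 1000 && attempt < maxAttempts) {
        setTimeout(open, backoffDelay(attempt++));
      }
    };
  }
  open();
}
```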
## Monitoring and Operations (PGX)
### View Pending Jobs
```sql
SELECT
    uuid AS job_id,
    payload->>'event_id' AS event_id,
    payload->>'uploader_id' AS uploader_id,
    created_at
FROM watermill_upload_jobs
WHERE "offset" > (
    SELECT COALESCE(offset_consumed, 0)
    FROM watermill_offsets_upload_workers
    WHERE consumer_group = 'upload_workers'
);
```
### View Processing Progress
```sql
SELECT
    consumer_group,
    offset_acked,
    offset_consumed,
    (SELECT COUNT(*) FROM watermill_upload_jobs) AS total_jobs
FROM watermill_offsets_upload_workers;
```
### View Job Payload Details
```sql
SELECT
    "offset",
    uuid,
    payload->>'job_id' AS job_id,
    payload->>'event_id' AS event_id,
    payload->>'uploader_id' AS uploader_id,
    payload->>'gdrive_url' AS gdrive_url,
    payload->>'force_upload' AS force_upload,
    created_at
FROM watermill_upload_jobs
ORDER BY "offset" DESC
LIMIT 10;
```
### Retry Failed Jobs
To retry failed jobs, reset the consumed offset:
```sql
UPDATE watermill_offsets_upload_workers
SET offset_consumed = offset_acked
WHERE consumer_group = 'upload_workers';
```
## Performance Benefits of PGX Integration
- **Connection Reuse**: Uses your existing PGX pool instead of creating separate SQL connections
- **Better Performance**: PGX is generally more efficient than `database/sql` for PostgreSQL workloads
- **Consistent Configuration**: Inherits your existing database settings (timeouts, pool size, SSL)
- **Resource Efficiency**: No additional connection overhead
- **Transaction Support**: Leverages PGX's advanced PostgreSQL features
## Security Considerations
- **JWT Authentication**: Required for both HTTP upload requests and WebSocket connections
- **Database Security**: Uses the same PGX pool and security as your main application
- **Job Isolation**: Each job runs in its own context with proper error boundaries
- **Access Control**: Users can only view progress for events they have access to
- **Connection Security**: Inherits SSL and authentication from your existing PGX configuration
## Error Handling & Recovery
- **Server Restart**: Jobs persist in PostgreSQL and resume processing when workers restart
- **Network Issues**: PGX handles connection failures and retries automatically
- **Worker Failures**: Jobs are automatically retried if workers crash during processing
- **Database Failures**: Standard PostgreSQL backup/recovery procedures apply
- **File Failures**: Individual file failures are reported but don't stop the entire job
- **Connection Pool Health**: Benefits from PGX's connection health checking
## Frontend Implementation Example
```javascript
class PGXWatermillUploadProgressHandler {
  constructor(eventId, jwtToken) {
    this.eventId = eventId;
    this.jwtToken = jwtToken;
    this.ws = null;
    this.jobId = null;
  }

  async startUpload(gdriveUrl, forceUpload = false) {
    // 1. Submit the upload request to queue a job via Watermill PGX.
    // Note: the endpoint may also expect an X-GID-Token header (see the curl example above).
    const response = await fetch(`/api/events/${this.eventId}/upload`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.jwtToken}`
      },
      body: this.createFormData(gdriveUrl, forceUpload)
    });
    if (response.status === 202) {
      // 2. Connect to the WebSocket for progress updates
      this.connectWebSocket();
    } else {
      throw new Error('Failed to queue upload job');
    }
  }

  createFormData(gdriveUrl, forceUpload) {
    const formData = new FormData();
    formData.append('gdrive_url', gdriveUrl);
    formData.append('force_upload', String(forceUpload));
    return formData;
  }

  connectWebSocket() {
    // Browsers cannot set custom WebSocket headers, so the JWT is passed
    // as a query parameter (Option 2 above).
    const wsUrl = `wss://your-domain.com/ws/events/${this.eventId}/upload-progress` +
      `?token=${encodeURIComponent(this.jwtToken)}`;
    this.ws = new WebSocket(wsUrl);
    this.ws.onmessage = (event) => this.handleMessage(JSON.parse(event.data));
    this.ws.onerror = (error) => this.handleError(error);
    this.ws.onclose = () => this.handleClose();
  }

  handleMessage(message) {
    switch (message.type) {
      case 'upload_started':
        this.jobId = message.data.job_id;
        this.onUploadStarted(message.data);
        break;
      case 'upload_progress':
        this.onUploadProgress(message.data);
        break;
      case 'upload_completed':
        this.onUploadCompleted(message.data);
        break;
      case 'upload_failed':
        this.onUploadFailed(message.data);
        break;
    }
  }

  handleError(error) {
    console.error('WebSocket error:', error);
    // Update UI: show connection error, offer manual reconnect
  }

  handleClose() {
    this.ws = null;
  }

  onUploadStarted(data) {
    console.log(`Upload job ${data.job_id} started: ${data.total_files} files to process`);
    // Update UI: show progress bar, display job ID
  }

  onUploadProgress(data) {
    const percentage = (data.current_file / data.total_files) * 100;
    console.log(`Progress: ${percentage.toFixed(1)}% - ${data.message}`);
    // Update UI: update progress bar, show current file
    if (data.status === 'failed') {
      console.warn(`File failed: ${data.filename} - ${data.error_reason}`);
      // Update UI: show file error in list
    }
  }

  onUploadCompleted(data) {
    console.log(`Upload job ${this.jobId} completed successfully`);
    // Update UI: show success message, hide progress bar
    this.disconnect();
  }

  onUploadFailed(data) {
    console.error(`Upload job ${data.job_id} failed: ${data.error}`);
    // Update UI: show error message, allow retry
    this.disconnect();
  }

  // Monitor job in database (optional)
  async getJobStatus() {
    if (!this.jobId) return null;
    const response = await fetch(`/api/admin/jobs/${this.jobId}`, {
      headers: { 'Authorization': `Bearer ${this.jwtToken}` }
    });
    return response.json();
  }

  disconnect() {
    if (this.ws) {
      this.ws.close();
      this.ws = null;
    }
  }
}
// Usage
const uploader = new PGXWatermillUploadProgressHandler('EVENT_ID', 'JWT_TOKEN');
uploader.startUpload('https://drive.google.com/drive/folders/FOLDER_ID', false);
```
This PGX-based implementation provides reliable processing of large Google Drive uploads, integrates with your existing PostgreSQL infrastructure, and gives users real-time progress feedback.