import { InferenceJobArguments, InferenceProcessor, } from "../frontend/InferenceProcessor.ts";
import { ClientStream } from "../router/clientStream.ts";
import { delay } from "https://deno.land/std@0.224.0/async/delay.ts";
import { StreamingState } from "./types.ts";

/** Per-connection bookkeeping for one client socket registered with `Router`. */
interface StreamingJob {
  id: string;
  clientStream: ClientStream;
  state: StreamingState;
  jobArgs: InferenceJobArguments;
  /**
   * Incremented every time a streaming run starts for this job. A run only
   * keeps writing to the client while its own generation is still current, so
   * a superseded run can neither interleave output with a newer run nor end it.
   */
  runGeneration: number;
}

/**
 * Polls registered client streams for control messages and drives inference
 * streaming for each job.
 *
 * Each job toggles between `LISTENING` and `STREAMING`: a `"beginStreaming"`
 * control message received while listening starts a new inference run whose
 * text chunks are forwarded to the client; `"endStreaming"` received while
 * streaming stops the run at the next chunk boundary.
 */
export class Router {
  private processor: InferenceProcessor;
  // deno-lint-ignore no-explicit-any
  private tools: any[];
  private jobs = new Map<string, StreamingJob>();
  private isRunning = false;
  /** Bumped by `start()` so a monitor loop left over from an earlier start/stop cycle exits. */
  private monitorGeneration = 0;
  private readonly pollIntervalMs: number;

  /**
   * @param processor Inference backend used to create and stream jobs.
   * @param tools Tool definitions passed through to `processor.streamJob`.
   * @param pollIntervalMs Delay between control-message polling passes (default 10 ms).
   */
  // deno-lint-ignore no-explicit-any
  constructor(processor: InferenceProcessor, tools: any[], pollIntervalMs = 10) {
    this.processor = processor;
    this.tools = tools;
    this.pollIntervalMs = pollIntervalMs;
  }

  /** Starts the background polling loop. No-op if it is already running. */
  start(): void {
    if (this.isRunning) return;
    this.isRunning = true;
    const generation = ++this.monitorGeneration;
    // The loop is fire-and-forget; make sure a failure is logged instead of
    // becoming an unhandled rejection, and allow a later start() to restart it.
    this.monitorJobs(generation).catch((err: unknown) => {
      console.error("Job monitor stopped unexpectedly:", err);
      if (generation === this.monitorGeneration) {
        this.isRunning = false;
      }
    });
  }

  /**
   * Stops the polling loop after its current pass. Runs that are already
   * streaming are not cancelled by this call.
   */
  stop(): void {
    this.isRunning = false;
  }

  /**
   * Registers a new job for `socket` in the `LISTENING` state.
   * The job is removed automatically when the socket emits `close`.
   *
   * @returns The generated job id.
   */
  addJob(jobArgs: InferenceJobArguments, socket: WebSocket): string {
    const jobId = crypto.randomUUID();
    const clientStream = new ClientStream(socket);
    this.jobs.set(jobId, {
      id: jobId,
      clientStream,
      state: StreamingState.LISTENING,
      jobArgs,
      runGeneration: 0,
    });
    // Without this, jobs for disconnected clients would be polled forever.
    socket.addEventListener("close", () => {
      this.removeJob(jobId);
    });
    console.log("Job added:", jobId);
    return jobId;
  }

  /**
   * Unregisters a job and stops any run in progress at its next chunk.
   *
   * @returns `true` if the job existed and was removed.
   */
  removeJob(jobId: string): boolean {
    const job = this.jobs.get(jobId);
    if (!job) return false;
    // Leaving STREAMING makes an in-flight processJob loop break on its next chunk.
    job.state = StreamingState.LISTENING;
    return this.jobs.delete(jobId);
  }

  /** Polls every registered job until stopped or superseded by a newer `start()`. */
  private async monitorJobs(generation: number): Promise<void> {
    while (this.isRunning && generation === this.monitorGeneration) {
      for (const [jobId, job] of this.jobs) {
        try {
          this.checkJobState(jobId, job);
        } catch (err: unknown) {
          // One misbehaving stream must not stop polling for every other client.
          console.error(`Error checking job ${jobId}:`, err);
        }
      }
      await delay(this.pollIntervalMs);
    }
  }

  /** Applies a pending control message (if any) to the job's state machine. */
  private checkJobState(jobId: string, job: StreamingJob): void {
    const controlMessage = job.clientStream.checkAndClearControlMessage();

    if (controlMessage === "beginStreaming" && job.state === StreamingState.LISTENING) {
      job.state = StreamingState.STREAMING;
      const generation = ++job.runGeneration;
      this.processJob(jobId, generation).catch((err: unknown) => {
        console.error(`Error processing job ${jobId}:`, err);
        // Only reset state if no newer run has started since.
        if (job.runGeneration === generation) {
          job.state = StreamingState.LISTENING;
        }
      });
    } else if (controlMessage === "endStreaming" && job.state === StreamingState.STREAMING) {
      job.state = StreamingState.LISTENING;
    }
  }

  /**
   * Runs one inference pass for `jobId` and forwards text chunks to the client.
   *
   * The run stops as soon as it is no longer current. That happens when the
   * client sent `"endStreaming"`, the job was removed, or a newer run replaced
   * this one. `sendStreamEnd()` is only sent, and the job only returned to
   * `LISTENING`, if this run is still current when the stream finishes.
   */
  private async processJob(jobId: string, generation: number): Promise<void> {
    const job = this.jobs.get(jobId);
    if (!job || job.runGeneration !== generation) return;

    const isCurrentRun = (): boolean =>
      this.jobs.get(jobId) === job &&
      job.state === StreamingState.STREAMING &&
      job.runGeneration === generation;

    try {
      const inferenceJob = this.processor.newJob(job.jobArgs);
      for await (const chunk of this.processor.streamJob(inferenceJob, this.tools)) {
        if (!isCurrentRun()) {
          break;
        }
        if (chunk.type === "chunk" && chunk.text) {
          job.clientStream.sendStreamData(chunk.text);
          // NOTE(review): tool executors only run for chunks that also carry
          // non-empty text; confirm tool-call chunks always include text,
          // otherwise their executors are silently skipped.
          if (chunk.tool && chunk.toolExecutor) {
            await chunk.toolExecutor();
          }
        }
      }

      if (isCurrentRun()) {
        job.clientStream.sendStreamEnd();
        job.state = StreamingState.LISTENING;
      }
    } catch (error: unknown) {
      // NOTE(review): the client is not notified of the failure (no
      // sendStreamEnd); confirm that is intended.
      console.error(`Error in job ${jobId}:`, error);
      if (job.runGeneration === generation) {
        job.state = StreamingState.LISTENING;
      }
    }
  }
}