import 'dart:async';
import 'dart:collection';

import 'package:collection/collection.dart';
import 'package:tuple/tuple.dart';

/// A unit of work tracked by [MutexQueue].
///
/// The tuple holds, in order: the task id, the block to execute, and the
/// completer through which the block's outcome is reported.
typedef QueueTask<T> = Tuple3<int, FutureOr<T> Function(), Completer<T>>;

/// Named accessors and cancellation for a [QueueTask].
extension QueueTaskExtension<T> on QueueTask<T> {
  /// The id assigned to this task by the queue that created it.
  int get taskId => item1;

  /// The work to execute.
  FutureOr<T> Function() get block => item2;

  /// Completes with the result or error of [block], or with the
  /// cancellation error.
  Completer<T> get completer => item3;

  /// Completes this task's future with [error], or with a
  /// [QueueTaskCancelledError] when [error] is omitted.
  ///
  /// Throws a [StateError] if the task has already completed (finished,
  /// failed, timed out or been cancelled).
  void cancel([Object? error]) {
    if (completer.isCompleted) {
      throw StateError('The task($taskId) has ended');
    }
    completer.completeError(error ?? QueueTaskCancelledError());
  }
}

/// Runs queued blocks in FIFO order with at most [parallel] in flight.
class MutexQueue {
  /// Creates a queue that starts at most [parallel] blocks at a time and
  /// gives each one [timeout] to finish.
  ///
  /// If [parallel] is not positive, queued blocks are never started.
  MutexQueue(
    this.parallel, {
    this.timeout = const Duration(seconds: 45),
  });

  /// Maximum number of blocks in flight at once.
  final int parallel;

  /// Time limit applied to each block once it has been started.
  ///
  /// When it elapses the task's future completes with a [TimeoutException]
  /// and its slot is released; the block itself is not interrupted.
  final Duration timeout;

  final _tasks = Queue<QueueTask>();
  int _running = 0;
  int _id = 0;

  /// Queues [block] and returns a future that completes with its outcome.
  Future<T> withLock<T>(FutureOr<T> Function() block) {
    return enqueue(block).completer.future;
  }

  /// Queues [block] and returns its task handle.
  ///
  /// The handle exposes the task id (usable with [cancel]) and the
  /// completer whose future reports the outcome.
  QueueTask<T> enqueue<T>(FutureOr<T> Function() block) {
    final task = QueueTask<T>(_id++, block, Completer<T>());
    _tasks.add(task);
    _runTask();
    return task;
  }

  /// Starts queued tasks while free slots remain.
  void _runTask() {
    while (_tasks.isNotEmpty && _running < parallel) {
      final task = _tasks.removeFirst();
      // A task cancelled through its own handle is still in the queue with
      // an already-completed completer; running it would be wasted work.
      if (task.completer.isCompleted) {
        continue;
      }
      _running++;
      Future(task.block).timeout(timeout).then((value) {
        // The task may have been cancelled while the block was running;
        // completing twice would throw, so the late result is dropped.
        if (!task.completer.isCompleted) {
          task.completer.complete(value);
        }
      }).catchError((Object e, StackTrace s) {
        if (!task.completer.isCompleted) {
          task.completer.completeError(e, s);
        }
      }).whenComplete(() {
        _running--;
        _runTask();
      });
    }
  }

  /// Cancels every task that has not been started yet.
  ///
  /// Each pending task's future completes with [error], or with a
  /// [QueueTaskCancelledError] when [error] is omitted. Tasks that are
  /// already running are not affected.
  void cancelAll([Object? error]) {
    while (_tasks.isNotEmpty) {
      final task = _tasks.removeFirst();
      // Already cancelled through its handle: nothing left to do, and
      // calling cancel again would throw and abort the loop.
      if (!task.completer.isCompleted) {
        task.cancel(error);
      }
    }
  }

  /// Cancels the not-yet-started task whose id is [id].
  ///
  /// Returns `true` if a pending task was found and cancelled by this call.
  /// Returns `false` if no pending task has that id (unknown, already
  /// started, or finished) or if it had already been cancelled through its
  /// handle. The task is removed from the queue so its block never runs.
  bool cancel(int id, [Object? error]) {
    final task = _tasks.firstWhereOrNull((e) => e.taskId == id);
    if (task == null) {
      return false;
    }
    _tasks.remove(task);
    if (task.completer.isCompleted) {
      return false;
    }
    task.cancel(error);
    return true;
  }
}

/// The error a cancelled task's future completes with when no explicit
/// error is supplied.
class QueueTaskCancelledError extends Error {}