async Task Main() { // Initialize settings var transformFileSettings = new TransformFileSettings { BlobPath = "some/path", BusinessUnitId = 1 }; var uploadToBlobSettings = new UploadToBlobSettings { ContainerName = "some-container" }; var firstTask = new TransformFile(transformFileSettings, Guid.Empty); var secondTask = new UploadToBlob(uploadToBlobSettings, firstTask.TaskId); var tasks = new List { firstTask, secondTask }; var context = new Dictionary(); var jobExecution = new JobExecution(tasks, context); jobExecution.OnTaskStarted += (sender, args) => Console.WriteLine($"[Event] Task {args.TaskType} has started."); jobExecution.OnTaskCompleted += (sender, args) => Console.WriteLine($"[Event] Task {args.TaskType} has completed."); if (!jobExecution.ValidateTaskTree()) { Console.WriteLine("Invalid task tree. Exiting..."); return; } await jobExecution.Run(); } public class JobExecutionStateDto { public int Id { get; set; } public string ExecutionStatus { get; set; } public Guid Identifier { get; set; } public string ContextSnapshotJson { get; set; } } public interface IJobExecutionTask { Guid TaskId { get; } Guid ExecuteAfter { get; } Task ExecuteAsync(IJobExecution execution); } public class TransformFile : IJobExecutionTask { public Guid TaskId { get; } public Guid ExecuteAfter { get; } public TransformFileSettings Settings { get; } public TransformFile(TransformFileSettings settings, Guid executeAfter) { TaskId = Guid.NewGuid(); ExecuteAfter = executeAfter; Settings = settings; } public async Task ExecuteAsync(IJobExecution execution) { // Simulate real work Console.WriteLine("TransformFile. ExecuteAsync..."); await Task.Delay(1000); return TaskId; } } public class UploadToBlob : IJobExecutionTask { public Guid TaskId { get; } public Guid ExecuteAfter { get; } public UploadToBlobSettings Settings { get; } public UploadToBlob(UploadToBlobSettings settings, Guid executeAfter) { TaskId = Guid.NewGuid(); ExecuteAfter = executeAfter; Settings = settings; } public async Task ExecuteAsync(IJobExecution execution) { // Simulate real work Console.WriteLine("UploadToBlob. ExecuteAsync..."); await Task.Delay(1000); return TaskId; } } public class TransformFileSettings { public string BlobPath { get; set; } public int BusinessUnitId { get; set; } } public class UploadToBlobSettings { public string ContainerName { get; set; } } public interface IJobExecution { Dictionary Context { get; set; } string Status { get; } Task Run(); } public class JobExecution : IJobExecution { public event EventHandler OnTaskStarted; public event EventHandler OnTaskCompleted; // Simulated in-memory database private static readonly Dictionary _db = new Dictionary(); public Dictionary Context { get; set; } public string Status => ExecutionState.ExecutionStatus; private readonly List _tasks; public JobExecutionStateDto ExecutionState { get; private set; } public JobExecution(List tasks, Dictionary context) { ExecutionState = new JobExecutionStateDto { Identifier = Guid.NewGuid(), ExecutionStatus = "Initialized" }; _db[ExecutionState.Identifier] = ExecutionState; Context = context; _tasks = tasks; } public bool ValidateTaskTree() { // Storing all task IDs in a set for quick lookup var taskIds = new HashSet(_tasks.Select(t => t.TaskId)); // Checking for orphan tasks foreach (var task in _tasks) { if (task.ExecuteAfter != Guid.Empty && !taskIds.Contains(task.ExecuteAfter)) { Console.WriteLine($"Validation Failed: Task with ID {task.TaskId} has invalid ExecuteAfter value {task.ExecuteAfter}"); return false; } } return true; } public async Task Run() { if (!ValidateTaskTree()) { Console.WriteLine("Job aborted due to invalid task tree."); return; } Console.WriteLine($"Job started. Status: {Status}"); // Using a Dictionary to map task outcomes to the next task to be executed var taskOutcomeMap = new Dictionary(); foreach (var task in _tasks) { taskOutcomeMap[task.ExecuteAfter] = task.TaskId; } Guid currentTaskOutcome = Guid.Empty; while (taskOutcomeMap.ContainsKey(currentTaskOutcome)) { var nextTaskId = taskOutcomeMap[currentTaskOutcome]; var nextTask = _tasks.First(t => t.TaskId == nextTaskId); // Trigger the OnTaskStarted event OnTaskStarted?.Invoke(this, new TaskEventArgs(nextTask.GetType().Name)); currentTaskOutcome = await nextTask.ExecuteAsync(this); // Trigger the OnTaskCompleted event OnTaskCompleted?.Invoke(this, new TaskEventArgs(nextTask.GetType().Name)); // Update in-memory database ExecutionState.ContextSnapshotJson = JsonConvert.SerializeObject(Context); ExecutionState.ExecutionStatus = "In Progress"; _db[ExecutionState.Identifier] = ExecutionState; } ExecutionState.ExecutionStatus = "Completed"; _db[ExecutionState.Identifier] = ExecutionState; Console.WriteLine($"Job completed. Status: {Status}"); } } public class TaskEventArgs : EventArgs { public string TaskType { get; set; } public TaskEventArgs(string taskType) { TaskType = taskType; } }