Skip to content

Instantly share code, notes, and snippets.

@justinobney
Created June 22, 2024 17:07
Show Gist options
  • Save justinobney/13faab60f196e8a3088c57782238dcc2 to your computer and use it in GitHub Desktop.
Save justinobney/13faab60f196e8a3088c57782238dcc2 to your computer and use it in GitHub Desktop.

Revisions

  1. justinobney created this gist Jun 22, 2024.
    205 changes: 205 additions & 0 deletions JobSystemConcept.cs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,205 @@
    async Task Main()
    {
    // Initialize settings
    var transformFileSettings = new TransformFileSettings { BlobPath = "some/path", BusinessUnitId = 1 };
    var uploadToBlobSettings = new UploadToBlobSettings { ContainerName = "some-container" };

    var firstTask = new TransformFile(transformFileSettings, Guid.Empty);
    var secondTask = new UploadToBlob(uploadToBlobSettings, firstTask.TaskId);

    var tasks = new List<IJobExecutionTask> { firstTask, secondTask };

    var context = new Dictionary<string, string>();

    var jobExecution = new JobExecution(tasks, context);

    jobExecution.OnTaskStarted += (sender, args) =>
    Console.WriteLine($"[Event] Task {args.TaskType} has started.");

    jobExecution.OnTaskCompleted += (sender, args) =>
    Console.WriteLine($"[Event] Task {args.TaskType} has completed.");


    if (!jobExecution.ValidateTaskTree())
    {
    Console.WriteLine("Invalid task tree. Exiting...");
    return;
    }

    await jobExecution.Run();
    }

    public class JobExecutionStateDto
    {
    public int Id { get; set; }
    public string ExecutionStatus { get; set; }
    public Guid Identifier { get; set; }
    public string ContextSnapshotJson { get; set; }
    }

    public interface IJobExecutionTask
    {
    Guid TaskId { get; }
    Guid ExecuteAfter { get; }
    Task<Guid> ExecuteAsync(IJobExecution execution);
    }

    public class TransformFile : IJobExecutionTask
    {
    public Guid TaskId { get; }
    public Guid ExecuteAfter { get; }
    public TransformFileSettings Settings { get; }

    public TransformFile(TransformFileSettings settings, Guid executeAfter)
    {
    TaskId = Guid.NewGuid();
    ExecuteAfter = executeAfter;
    Settings = settings;
    }

    public async Task<Guid> ExecuteAsync(IJobExecution execution)
    {
    // Simulate real work
    Console.WriteLine("TransformFile. ExecuteAsync...");
    await Task.Delay(1000);
    return TaskId;
    }
    }

    public class UploadToBlob : IJobExecutionTask
    {
    public Guid TaskId { get; }
    public Guid ExecuteAfter { get; }
    public UploadToBlobSettings Settings { get; }

    public UploadToBlob(UploadToBlobSettings settings, Guid executeAfter)
    {
    TaskId = Guid.NewGuid();
    ExecuteAfter = executeAfter;
    Settings = settings;
    }

    public async Task<Guid> ExecuteAsync(IJobExecution execution)
    {
    // Simulate real work
    Console.WriteLine("UploadToBlob. ExecuteAsync...");
    await Task.Delay(1000);
    return TaskId;
    }
    }

    public class TransformFileSettings
    {
    public string BlobPath { get; set; }
    public int BusinessUnitId { get; set; }
    }

    public class UploadToBlobSettings
    {
    public string ContainerName { get; set; }
    }

    public interface IJobExecution
    {
    Dictionary<string, string> Context { get; set; }
    string Status { get; }
    Task Run();
    }

    public class JobExecution : IJobExecution
    {
    public event EventHandler<TaskEventArgs> OnTaskStarted;
    public event EventHandler<TaskEventArgs> OnTaskCompleted;

    // Simulated in-memory database
    private static readonly Dictionary<Guid, JobExecutionStateDto> _db = new Dictionary<Guid, JobExecutionStateDto>();

    public Dictionary<string, string> Context { get; set; }
    public string Status => ExecutionState.ExecutionStatus;
    private readonly List<IJobExecutionTask> _tasks;
    public JobExecutionStateDto ExecutionState { get; private set; }

    public JobExecution(List<IJobExecutionTask> tasks, Dictionary<string, string> context)
    {
    ExecutionState = new JobExecutionStateDto
    {
    Identifier = Guid.NewGuid(),
    ExecutionStatus = "Initialized"
    };

    _db[ExecutionState.Identifier] = ExecutionState;

    Context = context;
    _tasks = tasks;
    }

    public bool ValidateTaskTree()
    {
    // Storing all task IDs in a set for quick lookup
    var taskIds = new HashSet<Guid>(_tasks.Select(t => t.TaskId));

    // Checking for orphan tasks
    foreach (var task in _tasks)
    {
    if (task.ExecuteAfter != Guid.Empty && !taskIds.Contains(task.ExecuteAfter))
    {
    Console.WriteLine($"Validation Failed: Task with ID {task.TaskId} has invalid ExecuteAfter value {task.ExecuteAfter}");
    return false;
    }
    }

    return true;
    }

    public async Task Run()
    {
    if (!ValidateTaskTree())
    {
    Console.WriteLine("Job aborted due to invalid task tree.");
    return;
    }

    Console.WriteLine($"Job started. Status: {Status}");

    // Using a Dictionary to map task outcomes to the next task to be executed
    var taskOutcomeMap = new Dictionary<Guid, Guid>();
    foreach (var task in _tasks)
    {
    taskOutcomeMap[task.ExecuteAfter] = task.TaskId;
    }

    Guid currentTaskOutcome = Guid.Empty;
    while (taskOutcomeMap.ContainsKey(currentTaskOutcome))
    {
    var nextTaskId = taskOutcomeMap[currentTaskOutcome];
    var nextTask = _tasks.First(t => t.TaskId == nextTaskId);

    // Trigger the OnTaskStarted event
    OnTaskStarted?.Invoke(this, new TaskEventArgs(nextTask.GetType().Name));

    currentTaskOutcome = await nextTask.ExecuteAsync(this);

    // Trigger the OnTaskCompleted event
    OnTaskCompleted?.Invoke(this, new TaskEventArgs(nextTask.GetType().Name));

    // Update in-memory database
    ExecutionState.ContextSnapshotJson = JsonConvert.SerializeObject(Context);
    ExecutionState.ExecutionStatus = "In Progress";
    _db[ExecutionState.Identifier] = ExecutionState;
    }

    ExecutionState.ExecutionStatus = "Completed";
    _db[ExecutionState.Identifier] = ExecutionState;

    Console.WriteLine($"Job completed. Status: {Status}");
    }
    }

    public class TaskEventArgs : EventArgs
    {
    public string TaskType { get; set; }
    public TaskEventArgs(string taskType)
    {
    TaskType = taskType;
    }
    }