Skip to content

Instantly share code, notes, and snippets.

@vndee
Last active December 26, 2024 17:03
Show Gist options
  • Select an option

  • Save vndee/22c7ad22fa21cc5919d7ac2f285faa3f to your computer and use it in GitHub Desktop.

Select an option

Save vndee/22c7ad22fa21cc5919d7ac2f285faa3f to your computer and use it in GitHub Desktop.

Revisions

  1. vndee revised this gist Dec 26, 2024. 1 changed file with 4 additions and 6 deletions.
    10 changes: 4 additions & 6 deletions rds_06.py
    Original file line number Diff line number Diff line change
    @@ -9,23 +9,21 @@ class TaskPriority(IntEnum):

    class TaskScheduler:
    def __init__(self):
    self.pq = PriorityQueue()
    self.queue_key = "scheduled_tasks"
    self.pq = PriorityQueue("scheduled_tasks)

    def add_task(self, task_type: str, data: dict, priority: TaskPriority):
    task = {
    "type": task_type,
    "data": data
    }
    return self.pq.push(self.queue_key, task, priority=priority)
    return self.pq.push(task, priority=priority)

    def get_next_task(self):
    return self.pq.pop(self.queue_key)
    return self.pq.pop()

    # Usage
    scheduler = TaskScheduler()
    scheduler = TaskScheduler("security_update")
    scheduler.add_task(
    "security_update",
    {"server": "prod-1", "patch": "security-001"},
    TaskPriority.CRITICAL
    )
  2. vndee created this gist Dec 25, 2024.
    31 changes: 31 additions & 0 deletions rds_06.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,31 @@
    from redis_data_structures import PriorityQueue
    from enum import IntEnum

    class TaskPriority(IntEnum):
    CRITICAL = 1
    HIGH = 2
    MEDIUM = 3
    LOW = 4

    class TaskScheduler:
    def __init__(self):
    self.pq = PriorityQueue()
    self.queue_key = "scheduled_tasks"

    def add_task(self, task_type: str, data: dict, priority: TaskPriority):
    task = {
    "type": task_type,
    "data": data
    }
    return self.pq.push(self.queue_key, task, priority=priority)

    def get_next_task(self):
    return self.pq.pop(self.queue_key)

    # Usage
    scheduler = TaskScheduler()
    scheduler.add_task(
    "security_update",
    {"server": "prod-1", "patch": "security-001"},
    TaskPriority.CRITICAL
    )