Skip to content

Instantly share code, notes, and snippets.

@max-arnold
Created May 5, 2024 09:51
Show Gist options
  • Save max-arnold/d059a74bf48fb9be9741f60ea79c5208 to your computer and use it in GitHub Desktop.
Save max-arnold/d059a74bf48fb9be9741f60ea79c5208 to your computer and use it in GitHub Desktop.

Revisions

  1. max-arnold created this gist May 5, 2024.
    312 changes: 312 additions & 0 deletions ymq_tui.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,312 @@
    """
    1. Install the following dependencies into a virtualenv
    pip install textual==0.57.0 textual-dev==1.5.1 requests==2.31.0 requests-aws4auth==1.2.3 yandexcloud==0.267.0
    2. Create a env.json file
    {
    "YC_REGION": "ru-central1",
    "YC_FOLDER_ID": "FFF",
    "YC_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS": "yc-service-account-key.json",
    "YMQ_ACCESS_KEY": "XXX",
    "YMQ_SECRET_KEY": "YYY"
    }
    3. Run
    python ymq_tui.py --env env.json
    """
    import argparse
    import json
    import os
    from xml.etree import ElementTree

    from textual import on, work
    from textual.app import App
    from textual.app import ComposeResult
    from textual.containers import Grid, Container
    from textual.message import Message
    from textual.screen import ModalScreen
    from textual.widgets import Button, DataTable, Footer, Label

    import requests
    from requests_aws4auth import AWS4Auth

    import yandexcloud
    from yandex.cloud.serverless.triggers.v1.trigger_pb2 import _TRIGGER_STATUS
    from yandex.cloud.serverless.triggers.v1.trigger_service_pb2_grpc import TriggerServiceStub
    from yandex.cloud.serverless.triggers.v1.trigger_service_pb2 import ListTriggersRequest, PauseTriggerRequest, ResumeTriggerRequest

    YMQ_ENDPOINT_URL = "https://message-queue.api.cloud.yandex.net"

    session = None

    def get_session():
    global session
    if session is not None:
    return session
    session = requests.Session()
    return session


    def export_variables(env):
    with open(env) as json_file:
    env_vars = json.loads(json_file.read())
    for env_name, env_value in env_vars.items():
    os.environ[str(env_name)] = str(env_value)


    def check_response(res):
    if res.status_code != 200:
    tree = ElementTree.fromstring(res.content.decode("utf-8"))
    msg = tree.find("Error").find("Message").text
    raise requests.HTTPError(f"{res.status_code}: {msg}", response=res)


    def queue_list():
    aws_auth = AWS4Auth(
    os.environ["YMQ_ACCESS_KEY"], os.environ["YMQ_SECRET_KEY"], os.environ["YC_REGION"], "sqs"
    )
    data = {
    "Action": "ListQueues",
    }
    res = get_session().post(YMQ_ENDPOINT_URL, data=data, auth=aws_auth, timeout=5)
    check_response(res)
    tree = ElementTree.fromstring(res.content.decode("utf-8"))
    return [q.text for q in tree.find("ListQueuesResult").findall("QueueUrl")]


    def queue_purge(queue):
    aws_auth = AWS4Auth(
    os.environ["YMQ_ACCESS_KEY"], os.environ["YMQ_SECRET_KEY"], os.environ["YC_REGION"], "sqs"
    )
    data = {
    "Action": "PurgeQueue",
    "QueueUrl": queue,
    }
    res = get_session().post(YMQ_ENDPOINT_URL, data=data, auth=aws_auth, timeout=5)
    check_response(res)


    def queue_get_attributes(queue):
    aws_auth = AWS4Auth(
    os.environ["YMQ_ACCESS_KEY"], os.environ["YMQ_SECRET_KEY"], os.environ["YC_REGION"], "sqs"
    )
    data = {
    "Action": "GetQueueAttributes",
    "AttributeName.1": "ApproximateNumberOfMessages",
    "AttributeName.2": "ApproximateNumberOfMessagesDelayed",
    "AttributeName.3": "ApproximateNumberOfMessagesNotVisible",
    "QueueUrl": queue,
    }
    res = get_session().post(YMQ_ENDPOINT_URL, data=data, auth=aws_auth, timeout=5)
    check_response(res)
    tree = ElementTree.fromstring(res.content.decode("utf-8"))
    attrlist = tree.find("GetQueueAttributesResult").findall("Attribute")
    return {attr.find("Name").text: attr.find("Value").text for attr in attrlist}


    class ConfirmModal(ModalScreen):
    BINDINGS = [
    ("escape", "app.pop_screen", "Cancel"),
    ]

    DEFAULT_CSS = """
    ConfirmModal {
    align: center middle;
    }
    #dialog {
    grid-size: 2;
    grid-gutter: 1 2;
    grid-rows: 1fr 3;
    padding: 0 1;
    width: 60;
    height: 11;
    border: thick $background 80%;
    background: $surface;
    }
    #question {
    column-span: 2;
    height: 1fr;
    width: 1fr;
    content-align: center middle;
    }
    Button {
    width: 100%;
    }
    Container {
    align: center middle;
    }
    """

    def __init__(self, question, name=None, id=None, classes=None):
    self.question = question
    super().__init__(name, id, classes)

    def compose(self):
    yield Grid(
    Label(self.question, id="question"),
    Container(Button("Cancel", id="cancel")),
    Container(Button("Confirm", variant="primary", id="confirm")),
    id="dialog",
    )

    def on_button_pressed(self, event):
    if event.button.id == "confirm":
    self.dismiss(True)
    else:
    self.dismiss(False)


    class QueueManagerApp(App):
    BINDINGS = [
    ("escape", "quit()", "Quit"),
    ("ctrl+r", "refresh", "Refresh"),
    ("e", "enable_trigger", "Enable"),
    ("d", "disable_trigger", "Disable"),
    ("p", "purge_queue", "Purge"),
    ("f12", "take_screenshot()", "Screenshot"),
    ]

    def __init__(self):
    super().__init__()
    parser = argparse.ArgumentParser(description="QueueManager")
    parser.add_argument("--env", required=True, help="Select env")
    options = parser.parse_args()
    export_variables(options.env)
    with open(os.environ["YC_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"]) as json_file:
    sa_key = json.loads(json_file.read())
    self.yc_sdk = yandexcloud.SDK(service_account_key=sa_key)
    self.yc_triggers = self.yc_sdk.client(TriggerServiceStub)
    self.queues = {}

    def compose(self):
    yield DataTable(cursor_type="row")
    yield Footer()

    def on_mount(self):
    table = self.query_one(DataTable)
    table.add_columns("Name", "Type", "Messages", "Delayed", "Invisible", "Trigger")
    self.action_refresh()

    def action_refresh(self):
    table = self.query_one(DataTable)
    # table.loading = True
    self.get_queues()

    class ReturnData(Message):
    def __init__(self, data):
    self.data = data
    super().__init__()

    @work(exclusive=True, thread=True)
    def get_queues(self):
    tlist = self.yc_triggers.List(ListTriggersRequest(folder_id=os.environ["YC_FOLDER_ID"]))
    triggers = {}
    for trigger in tlist.triggers:
    qname = trigger.rule.message_queue.queue_id.rsplit(":", 1)[-1]
    triggers[qname] = {
    "id": trigger.id,
    "status": _TRIGGER_STATUS.values_by_number[trigger.status].name
    }

    qlist = queue_list()
    queues = {}
    for queue in qlist:
    name = queue.rsplit("/", 1)[-1]
    attrs = queue_get_attributes(queue)
    queues[queue] = {
    "name": self.QMAP.get(name, name),
    "type": "FIFO" if name.endswith(".fifo") else "Standard",
    "attrs": attrs,
    "trigger": triggers.get(name, {"id": None, "status": "N/A"}),
    }
    self.post_message(self.ReturnData(queues))

    QMAP = {
    }

    @on(ReturnData)
    async def update_table(self, return_data):
    table = self.query_one(DataTable)
    coordinate = table.cursor_coordinate
    table.clear()
    self.queues = return_data.data
    for qk, qv in sorted(return_data.data.items(), key=lambda v: v[1]['name']):
    table.add_row(
    qv["name"],
    qv["type"],
    qv["attrs"]["ApproximateNumberOfMessages"],
    qv["attrs"]["ApproximateNumberOfMessagesDelayed"],
    qv["attrs"]["ApproximateNumberOfMessagesNotVisible"],
    f'[red]{qv["trigger"]["status"]}[/red]'
    if qv["trigger"]["status"] == "PAUSED"
    else f'[green]{qv["trigger"]["status"]}[/green]'
    if qv["trigger"]["status"] == "ACTIVE"
    else qv["trigger"]["status"],
    key=qk
    )
    table.cursor_coordinate = coordinate
    table.loading = False
    table.focus()

    def action_purge_queue(self):
    table = self.query_one(DataTable)
    if not table.is_valid_coordinate(table.cursor_coordinate):
    return
    cell_key = table.coordinate_to_cell_key(table.cursor_coordinate)
    row_key = cell_key.row_key.value
    if row_key is None:
    return

    def check_confirm(confirm):
    if confirm:
    queue_purge(row_key)
    self.notify(self.queues[row_key]["name"], title="Purged")
    self.action_refresh()

    self.app.push_screen(
    ConfirmModal(f'Purge queue {self.queues[row_key]["name"]}?'),
    check_confirm,
    )

    def action_enable_trigger(self):
    table = self.query_one(DataTable)
    if not table.is_valid_coordinate(table.cursor_coordinate):
    return
    cell_key = table.coordinate_to_cell_key(table.cursor_coordinate)
    row_key = cell_key.row_key.value
    if row_key is None:
    return
    if self.queues[row_key]["trigger"]["id"] is None:
    return
    self.yc_triggers.Resume(ResumeTriggerRequest(trigger_id=self.queues[row_key]["trigger"]["id"]))
    self.notify(self.queues[row_key]["name"], title="Enabled")
    self.action_refresh()

    def action_disable_trigger(self):
    table = self.query_one(DataTable)
    if not table.is_valid_coordinate(table.cursor_coordinate):
    return
    cell_key = table.coordinate_to_cell_key(table.cursor_coordinate)
    row_key = cell_key.row_key.value
    if row_key is None:
    return
    if self.queues[row_key]["trigger"]["id"] is None:
    return
    self.yc_triggers.Pause(PauseTriggerRequest(trigger_id=self.queues[row_key]["trigger"]["id"]))
    self.notify(self.queues[row_key]["name"], title="Disabled")
    self.action_refresh()

    def action_take_screenshot(self):
    filename = self.save_screenshot()
    self.notify(f"Saved in {filename}", title="Screenshot saved")

    app = QueueManagerApp()

    if __name__ == "__main__":
    app.run()