Created
May 5, 2024 09:51
-
-
Save max-arnold/d059a74bf48fb9be9741f60ea79c5208 to your computer and use it in GitHub Desktop.
Revisions
-
max-arnold created this gist
May 5, 2024 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,312 @@ """ 1. Install the following dependencies into a virtualenv pip install textual==0.57.0 textual-dev==1.5.1 requests==2.31.0 requests-aws4auth==1.2.3 yandexcloud==0.267.0 2. Create a env.json file { "YC_REGION": "ru-central1", "YC_FOLDER_ID": "FFF", "YC_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS": "yc-service-account-key.json", "YMQ_ACCESS_KEY": "XXX", "YMQ_SECRET_KEY": "YYY" } 3. Run python ymq_tui.py --env env.json """ import argparse import json import os from xml.etree import ElementTree from textual import on, work from textual.app import App from textual.app import ComposeResult from textual.containers import Grid, Container from textual.message import Message from textual.screen import ModalScreen from textual.widgets import Button, DataTable, Footer, Label import requests from requests_aws4auth import AWS4Auth import yandexcloud from yandex.cloud.serverless.triggers.v1.trigger_pb2 import _TRIGGER_STATUS from yandex.cloud.serverless.triggers.v1.trigger_service_pb2_grpc import TriggerServiceStub from yandex.cloud.serverless.triggers.v1.trigger_service_pb2 import ListTriggersRequest, PauseTriggerRequest, ResumeTriggerRequest YMQ_ENDPOINT_URL = "https://message-queue.api.cloud.yandex.net" session = None def get_session(): global session if session is not None: return session session = requests.Session() return session def export_variables(env): with open(env) as json_file: env_vars = json.loads(json_file.read()) for env_name, env_value in env_vars.items(): os.environ[str(env_name)] = str(env_value) def check_response(res): if res.status_code != 200: tree = ElementTree.fromstring(res.content.decode("utf-8")) msg = tree.find("Error").find("Message").text raise requests.HTTPError(f"{res.status_code}: {msg}", response=res) def queue_list(): aws_auth = AWS4Auth( os.environ["YMQ_ACCESS_KEY"], os.environ["YMQ_SECRET_KEY"], os.environ["YC_REGION"], "sqs" ) data = { "Action": "ListQueues", } res = get_session().post(YMQ_ENDPOINT_URL, data=data, auth=aws_auth, timeout=5) check_response(res) tree = ElementTree.fromstring(res.content.decode("utf-8")) return [q.text for q in tree.find("ListQueuesResult").findall("QueueUrl")] def queue_purge(queue): aws_auth = AWS4Auth( os.environ["YMQ_ACCESS_KEY"], os.environ["YMQ_SECRET_KEY"], os.environ["YC_REGION"], "sqs" ) data = { "Action": "PurgeQueue", "QueueUrl": queue, } res = get_session().post(YMQ_ENDPOINT_URL, data=data, auth=aws_auth, timeout=5) check_response(res) def queue_get_attributes(queue): aws_auth = AWS4Auth( os.environ["YMQ_ACCESS_KEY"], os.environ["YMQ_SECRET_KEY"], os.environ["YC_REGION"], "sqs" ) data = { "Action": "GetQueueAttributes", "AttributeName.1": "ApproximateNumberOfMessages", "AttributeName.2": "ApproximateNumberOfMessagesDelayed", "AttributeName.3": "ApproximateNumberOfMessagesNotVisible", "QueueUrl": queue, } res = get_session().post(YMQ_ENDPOINT_URL, data=data, auth=aws_auth, timeout=5) check_response(res) tree = ElementTree.fromstring(res.content.decode("utf-8")) attrlist = tree.find("GetQueueAttributesResult").findall("Attribute") return {attr.find("Name").text: attr.find("Value").text for attr in attrlist} class ConfirmModal(ModalScreen): BINDINGS = [ ("escape", "app.pop_screen", "Cancel"), ] DEFAULT_CSS = """ ConfirmModal { align: center middle; } #dialog { grid-size: 2; grid-gutter: 1 2; grid-rows: 1fr 3; padding: 0 1; width: 60; height: 11; border: thick $background 80%; background: $surface; } #question { column-span: 2; height: 1fr; width: 1fr; content-align: center middle; } Button { width: 100%; } Container { align: center middle; } """ def __init__(self, question, name=None, id=None, classes=None): self.question = question super().__init__(name, id, classes) def compose(self): yield Grid( Label(self.question, id="question"), Container(Button("Cancel", id="cancel")), Container(Button("Confirm", variant="primary", id="confirm")), id="dialog", ) def on_button_pressed(self, event): if event.button.id == "confirm": self.dismiss(True) else: self.dismiss(False) class QueueManagerApp(App): BINDINGS = [ ("escape", "quit()", "Quit"), ("ctrl+r", "refresh", "Refresh"), ("e", "enable_trigger", "Enable"), ("d", "disable_trigger", "Disable"), ("p", "purge_queue", "Purge"), ("f12", "take_screenshot()", "Screenshot"), ] def __init__(self): super().__init__() parser = argparse.ArgumentParser(description="QueueManager") parser.add_argument("--env", required=True, help="Select env") options = parser.parse_args() export_variables(options.env) with open(os.environ["YC_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"]) as json_file: sa_key = json.loads(json_file.read()) self.yc_sdk = yandexcloud.SDK(service_account_key=sa_key) self.yc_triggers = self.yc_sdk.client(TriggerServiceStub) self.queues = {} def compose(self): yield DataTable(cursor_type="row") yield Footer() def on_mount(self): table = self.query_one(DataTable) table.add_columns("Name", "Type", "Messages", "Delayed", "Invisible", "Trigger") self.action_refresh() def action_refresh(self): table = self.query_one(DataTable) # table.loading = True self.get_queues() class ReturnData(Message): def __init__(self, data): self.data = data super().__init__() @work(exclusive=True, thread=True) def get_queues(self): tlist = self.yc_triggers.List(ListTriggersRequest(folder_id=os.environ["YC_FOLDER_ID"])) triggers = {} for trigger in tlist.triggers: qname = trigger.rule.message_queue.queue_id.rsplit(":", 1)[-1] triggers[qname] = { "id": trigger.id, "status": _TRIGGER_STATUS.values_by_number[trigger.status].name } qlist = queue_list() queues = {} for queue in qlist: name = queue.rsplit("/", 1)[-1] attrs = queue_get_attributes(queue) queues[queue] = { "name": self.QMAP.get(name, name), "type": "FIFO" if name.endswith(".fifo") else "Standard", "attrs": attrs, "trigger": triggers.get(name, {"id": None, "status": "N/A"}), } self.post_message(self.ReturnData(queues)) QMAP = { } @on(ReturnData) async def update_table(self, return_data): table = self.query_one(DataTable) coordinate = table.cursor_coordinate table.clear() self.queues = return_data.data for qk, qv in sorted(return_data.data.items(), key=lambda v: v[1]['name']): table.add_row( qv["name"], qv["type"], qv["attrs"]["ApproximateNumberOfMessages"], qv["attrs"]["ApproximateNumberOfMessagesDelayed"], qv["attrs"]["ApproximateNumberOfMessagesNotVisible"], f'[red]{qv["trigger"]["status"]}[/red]' if qv["trigger"]["status"] == "PAUSED" else f'[green]{qv["trigger"]["status"]}[/green]' if qv["trigger"]["status"] == "ACTIVE" else qv["trigger"]["status"], key=qk ) table.cursor_coordinate = coordinate table.loading = False table.focus() def action_purge_queue(self): table = self.query_one(DataTable) if not table.is_valid_coordinate(table.cursor_coordinate): return cell_key = table.coordinate_to_cell_key(table.cursor_coordinate) row_key = cell_key.row_key.value if row_key is None: return def check_confirm(confirm): if confirm: queue_purge(row_key) self.notify(self.queues[row_key]["name"], title="Purged") self.action_refresh() self.app.push_screen( ConfirmModal(f'Purge queue {self.queues[row_key]["name"]}?'), check_confirm, ) def action_enable_trigger(self): table = self.query_one(DataTable) if not table.is_valid_coordinate(table.cursor_coordinate): return cell_key = table.coordinate_to_cell_key(table.cursor_coordinate) row_key = cell_key.row_key.value if row_key is None: return if self.queues[row_key]["trigger"]["id"] is None: return self.yc_triggers.Resume(ResumeTriggerRequest(trigger_id=self.queues[row_key]["trigger"]["id"])) self.notify(self.queues[row_key]["name"], title="Enabled") self.action_refresh() def action_disable_trigger(self): table = self.query_one(DataTable) if not table.is_valid_coordinate(table.cursor_coordinate): return cell_key = table.coordinate_to_cell_key(table.cursor_coordinate) row_key = cell_key.row_key.value if row_key is None: return if self.queues[row_key]["trigger"]["id"] is None: return self.yc_triggers.Pause(PauseTriggerRequest(trigger_id=self.queues[row_key]["trigger"]["id"])) self.notify(self.queues[row_key]["name"], title="Disabled") self.action_refresh() def action_take_screenshot(self): filename = self.save_screenshot() self.notify(f"Saved in {filename}", title="Screenshot saved") app = QueueManagerApp() if __name__ == "__main__": app.run()