-
-
Save WebLeash/a54351b8d7bf23c40757034f19619f34 to your computer and use it in GitHub Desktop.
Revisions
-
nikolaik revised this gist
Mar 31, 2023 . 1 changed file with 44 additions and 16 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -10,14 +10,24 @@ SLACK_RULES = { "slack-ops": { "name": "slack-ops", "filters": [ { "id": "sentry.rules.filters.level.LevelFilter", "match": "gte", "level": "40", } ], "channel_name": "ops", "environment": "production", }, "slack-ops-other": { "name": "slack-ops-other", "filters": [ { "id": "sentry.rules.filters.level.LevelFilter", "match": "gte", "level": "30", }, { "attribute": "environment", "match": "ne", @@ -31,9 +41,13 @@ } def rule_payload( rule: dict, workspace_id: str = SLACK_WORKSPACE_ID, frequency=720, owner=OWNER_TEAM ): return { "conditions": [ {"id": "sentry.rules.conditions.every_event.EveryEventCondition"} ], "filters": rule["filters"], "actions": [ { @@ -52,51 +66,63 @@ def rule_payload(rule: dict, workspace_id: str = SLACK_WORKSPACE_ID, frequency=7 } def _request(path, method="post", paginate=False, parse_json=True, **kwargs): AUTH_TOKEN = os.getenv( "SENTRY_TOKEN" ) # A token with scopes project:read,project:write fetched from assert AUTH_TOKEN headers = {"Authorization": f"Bearer {AUTH_TOKEN}"} url = f"https://sentry.io/api/0{path}" res = requests.request(method, url, headers=headers, **kwargs) res.raise_for_status() if paginate and res.links.get("next", {}).get("results", False): params = {"cursor": res.links["next"]["cursor"]} return res.json() + _request(path, method=method, params=params, **kwargs) return res.json() if parse_json else res def list_projects(): path = "/projects/" res = _request(path, method="get", paginate=True) return res def list_rules(app: str, org: str): path = f"/projects/{org}/{app}/rules/" res = _request(path, method="get", paginate=True) return res def fetch_rule(app: str, rule_id: int, org: str): path = f"/projects/{org}/{app}/rules/{rule_id}/" res = _request(path, method="get") return res def create_rule(app: str, data: dict, org: str): path = f"/projects/{org}/{app}/rules/" res = _request(path, json=data) return res def delete_rule(app: str, rule_id: int, org: str): path = f"/projects/{org}/{app}/rules/{rule_id}/" try: _request(path, method="delete", parse_json=False) except requests.exceptions.HTTPError: return False return True def main( apps: list[str], all_apps: bool, create_rules: bool, prune_rules: bool, list_apps: bool, org: str, ): if list_apps: print(json.dumps([project["slug"] for project in list_projects()], indent=2)) return @@ -128,7 +154,9 @@ def main(apps: list[str], all_apps: bool, create_rules: bool, prune_rules: bool, if __name__ == "__main__": parser = argparse.ArgumentParser( description="Update sentry project with a set of standard rules" ) parser.add_argument("--apps", nargs="*") parser.add_argument("--list-apps", action="store_true") parser.add_argument("--all", action="store_true") -
nikolaik created this gist
Jan 20, 2022 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,139 @@ import argparse import json import os import requests ORG_SLUG = "..." SLACK_WORKSPACE_ID = "..." OWNER_TEAM = "..." SLACK_RULES = { "slack-ops": { "name": "slack-ops", "filters": [{"id": "sentry.rules.filters.level.LevelFilter", "match": "gte", "level": "40"}], "channel_name": "ops", "environment": "production", }, "slack-ops-other": { "name": "slack-ops-other", "filters": [ {"id": "sentry.rules.filters.level.LevelFilter", "match": "gte", "level": "30"}, { "attribute": "environment", "match": "ne", "value": "production", "id": "sentry.rules.filters.event_attribute.EventAttributeFilter", }, ], "channel_name": "ops-other", "environment": None, }, } def rule_payload(rule: dict, workspace_id: str = SLACK_WORKSPACE_ID, frequency=720, owner=OWNER_TEAM): return { "conditions": [{"id": "sentry.rules.conditions.every_event.EveryEventCondition"}], "filters": rule["filters"], "actions": [ { "tags": "environment,url", "workspace": workspace_id, "id": "sentry.integrations.slack.notify_action.SlackNotifyServiceAction", "channel": rule["channel_name"], } ], "actionMatch": "all", "filterMatch": "all", "frequency": frequency, "name": rule["name"], "owner": owner, "environment": rule["environment"], } def _request(path, method="post", **kwargs): AUTH_TOKEN = os.getenv("SENTRY_TOKEN") # A token with scopes project:read,project:write fetched from assert AUTH_TOKEN headers = {"Authorization": f"Bearer {AUTH_TOKEN}"} url = f"https://sentry.io/api/0{path}" res = requests.request(method, url, headers=headers, **kwargs) res.raise_for_status() return res def list_projects(): path = "/projects/" res = _request(path, method="get") return res.json() def list_rules(app: str, org: str): path = f"/projects/{org}/{app}/rules/" res = _request(path, method="get") return res.json() def fetch_rule(app: str, rule_id: int, org: str): path = f"/projects/{org}/{app}/rules/{rule_id}/" res = _request(path, method="get") return res.json() def create_rule(app: str, data: dict, org: str): path = f"/projects/{org}/{app}/rules/" res = _request(path, json=data) return res.json() def delete_rule(app: str, rule_id: int, org: str): path = f"/projects/{org}/{app}/rules/{rule_id}/" try: _request(path, method="delete") except requests.exceptions.HTTPError: return False return True def main(apps: list[str], all_apps: bool, create_rules: bool, prune_rules: bool, list_apps: bool, org: str): if list_apps: print(json.dumps([project["slug"] for project in list_projects()], indent=2)) return if all_apps: apps = [project["slug"] for project in list_projects()] for app in apps: fresh_rules = list_rules(app, org) existing = {rule["name"] for rule in fresh_rules} if create_rules: for name, rule in SLACK_RULES.items(): if name in existing: print(f"Rule with {name=} already exists, skipping...") continue data = rule_payload(rule) create_rule(app, data, org) if prune_rules: to_delete = existing - set(SLACK_RULES.keys()) for rule in to_delete: rule_id = [r for r in fresh_rules if r["name"] == rule][0]["id"] delete_rule(app, rule_id, org) if not create_rules and not prune_rules: print(json.dumps(fresh_rules, indent=2)) if __name__ == "__main__": parser = argparse.ArgumentParser(description="Update sentry project with a set of standard rules") parser.add_argument("--apps", nargs="*") parser.add_argument("--list-apps", action="store_true") parser.add_argument("--all", action="store_true") parser.add_argument("--create", action="store_true") parser.add_argument("--prune", action="store_true") parser.add_argument("--org", default=ORG_SLUG) args = parser.parse_args() main(args.apps, args.all, args.create, args.prune, args.list_apps, args.org)