Skip to content

Instantly share code, notes, and snippets.

@WebLeash
Forked from nikolaik/sentry-alert.py
Created April 17, 2023 18:29
Show Gist options
  • Save WebLeash/a54351b8d7bf23c40757034f19619f34 to your computer and use it in GitHub Desktop.
Save WebLeash/a54351b8d7bf23c40757034f19619f34 to your computer and use it in GitHub Desktop.

Revisions

  1. @nikolaik nikolaik revised this gist Mar 31, 2023. 1 changed file with 44 additions and 16 deletions.
    60 changes: 44 additions & 16 deletions sentry-alert.py
    Original file line number Diff line number Diff line change
    @@ -10,14 +10,24 @@
    SLACK_RULES = {
    "slack-ops": {
    "name": "slack-ops",
    "filters": [{"id": "sentry.rules.filters.level.LevelFilter", "match": "gte", "level": "40"}],
    "filters": [
    {
    "id": "sentry.rules.filters.level.LevelFilter",
    "match": "gte",
    "level": "40",
    }
    ],
    "channel_name": "ops",
    "environment": "production",
    },
    "slack-ops-other": {
    "name": "slack-ops-other",
    "filters": [
    {"id": "sentry.rules.filters.level.LevelFilter", "match": "gte", "level": "30"},
    {
    "id": "sentry.rules.filters.level.LevelFilter",
    "match": "gte",
    "level": "30",
    },
    {
    "attribute": "environment",
    "match": "ne",
    @@ -31,9 +41,13 @@
    }


    def rule_payload(rule: dict, workspace_id: str = SLACK_WORKSPACE_ID, frequency=720, owner=OWNER_TEAM):
    def rule_payload(
    rule: dict, workspace_id: str = SLACK_WORKSPACE_ID, frequency=720, owner=OWNER_TEAM
    ):
    return {
    "conditions": [{"id": "sentry.rules.conditions.every_event.EveryEventCondition"}],
    "conditions": [
    {"id": "sentry.rules.conditions.every_event.EveryEventCondition"}
    ],
    "filters": rule["filters"],
    "actions": [
    {
    @@ -52,51 +66,63 @@ def rule_payload(rule: dict, workspace_id: str = SLACK_WORKSPACE_ID, frequency=7
    }


    def _request(path, method="post", **kwargs):
    AUTH_TOKEN = os.getenv("SENTRY_TOKEN") # A token with scopes project:read,project:write fetched from
    def _request(path, method="post", paginate=False, parse_json=True, **kwargs):
    AUTH_TOKEN = os.getenv(
    "SENTRY_TOKEN"
    ) # A token with scopes project:read,project:write fetched from
    assert AUTH_TOKEN
    headers = {"Authorization": f"Bearer {AUTH_TOKEN}"}

    url = f"https://sentry.io/api/0{path}"
    res = requests.request(method, url, headers=headers, **kwargs)
    res.raise_for_status()
    return res
    if paginate and res.links.get("next", {}).get("results", False):
    params = {"cursor": res.links["next"]["cursor"]}
    return res.json() + _request(path, method=method, params=params, **kwargs)
    return res.json() if parse_json else res


    def list_projects():
    path = "/projects/"
    res = _request(path, method="get")
    return res.json()
    res = _request(path, method="get", paginate=True)
    return res


    def list_rules(app: str, org: str):
    path = f"/projects/{org}/{app}/rules/"
    res = _request(path, method="get")
    return res.json()
    res = _request(path, method="get", paginate=True)
    return res


    def fetch_rule(app: str, rule_id: int, org: str):
    path = f"/projects/{org}/{app}/rules/{rule_id}/"
    res = _request(path, method="get")
    return res.json()
    return res


    def create_rule(app: str, data: dict, org: str):
    path = f"/projects/{org}/{app}/rules/"
    res = _request(path, json=data)
    return res.json()
    return res


    def delete_rule(app: str, rule_id: int, org: str):
    path = f"/projects/{org}/{app}/rules/{rule_id}/"
    try:
    _request(path, method="delete")
    _request(path, method="delete", parse_json=False)
    except requests.exceptions.HTTPError:
    return False
    return True


    def main(apps: list[str], all_apps: bool, create_rules: bool, prune_rules: bool, list_apps: bool, org: str):
    def main(
    apps: list[str],
    all_apps: bool,
    create_rules: bool,
    prune_rules: bool,
    list_apps: bool,
    org: str,
    ):
    if list_apps:
    print(json.dumps([project["slug"] for project in list_projects()], indent=2))
    return
    @@ -128,7 +154,9 @@ def main(apps: list[str], all_apps: bool, create_rules: bool, prune_rules: bool,


    if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Update sentry project with a set of standard rules")
    parser = argparse.ArgumentParser(
    description="Update sentry project with a set of standard rules"
    )
    parser.add_argument("--apps", nargs="*")
    parser.add_argument("--list-apps", action="store_true")
    parser.add_argument("--all", action="store_true")
  2. @nikolaik nikolaik created this gist Jan 20, 2022.
    139 changes: 139 additions & 0 deletions sentry-alert.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,139 @@
    import argparse
    import json
    import os
    import requests

    ORG_SLUG = "..."
    SLACK_WORKSPACE_ID = "..."
    OWNER_TEAM = "..."

    SLACK_RULES = {
    "slack-ops": {
    "name": "slack-ops",
    "filters": [{"id": "sentry.rules.filters.level.LevelFilter", "match": "gte", "level": "40"}],
    "channel_name": "ops",
    "environment": "production",
    },
    "slack-ops-other": {
    "name": "slack-ops-other",
    "filters": [
    {"id": "sentry.rules.filters.level.LevelFilter", "match": "gte", "level": "30"},
    {
    "attribute": "environment",
    "match": "ne",
    "value": "production",
    "id": "sentry.rules.filters.event_attribute.EventAttributeFilter",
    },
    ],
    "channel_name": "ops-other",
    "environment": None,
    },
    }


    def rule_payload(rule: dict, workspace_id: str = SLACK_WORKSPACE_ID, frequency=720, owner=OWNER_TEAM):
    return {
    "conditions": [{"id": "sentry.rules.conditions.every_event.EveryEventCondition"}],
    "filters": rule["filters"],
    "actions": [
    {
    "tags": "environment,url",
    "workspace": workspace_id,
    "id": "sentry.integrations.slack.notify_action.SlackNotifyServiceAction",
    "channel": rule["channel_name"],
    }
    ],
    "actionMatch": "all",
    "filterMatch": "all",
    "frequency": frequency,
    "name": rule["name"],
    "owner": owner,
    "environment": rule["environment"],
    }


    def _request(path, method="post", **kwargs):
    AUTH_TOKEN = os.getenv("SENTRY_TOKEN") # A token with scopes project:read,project:write fetched from
    assert AUTH_TOKEN
    headers = {"Authorization": f"Bearer {AUTH_TOKEN}"}

    url = f"https://sentry.io/api/0{path}"
    res = requests.request(method, url, headers=headers, **kwargs)
    res.raise_for_status()
    return res


    def list_projects():
    path = "/projects/"
    res = _request(path, method="get")
    return res.json()


    def list_rules(app: str, org: str):
    path = f"/projects/{org}/{app}/rules/"
    res = _request(path, method="get")
    return res.json()


    def fetch_rule(app: str, rule_id: int, org: str):
    path = f"/projects/{org}/{app}/rules/{rule_id}/"
    res = _request(path, method="get")
    return res.json()


    def create_rule(app: str, data: dict, org: str):
    path = f"/projects/{org}/{app}/rules/"
    res = _request(path, json=data)
    return res.json()


    def delete_rule(app: str, rule_id: int, org: str):
    path = f"/projects/{org}/{app}/rules/{rule_id}/"
    try:
    _request(path, method="delete")
    except requests.exceptions.HTTPError:
    return False
    return True


    def main(apps: list[str], all_apps: bool, create_rules: bool, prune_rules: bool, list_apps: bool, org: str):
    if list_apps:
    print(json.dumps([project["slug"] for project in list_projects()], indent=2))
    return

    if all_apps:
    apps = [project["slug"] for project in list_projects()]

    for app in apps:
    fresh_rules = list_rules(app, org)
    existing = {rule["name"] for rule in fresh_rules}

    if create_rules:
    for name, rule in SLACK_RULES.items():
    if name in existing:
    print(f"Rule with {name=} already exists, skipping...")
    continue

    data = rule_payload(rule)
    create_rule(app, data, org)

    if prune_rules:
    to_delete = existing - set(SLACK_RULES.keys())
    for rule in to_delete:
    rule_id = [r for r in fresh_rules if r["name"] == rule][0]["id"]
    delete_rule(app, rule_id, org)

    if not create_rules and not prune_rules:
    print(json.dumps(fresh_rules, indent=2))


    if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Update sentry project with a set of standard rules")
    parser.add_argument("--apps", nargs="*")
    parser.add_argument("--list-apps", action="store_true")
    parser.add_argument("--all", action="store_true")
    parser.add_argument("--create", action="store_true")
    parser.add_argument("--prune", action="store_true")
    parser.add_argument("--org", default=ORG_SLUG)
    args = parser.parse_args()
    main(args.apps, args.all, args.create, args.prune, args.list_apps, args.org)