Skip to content

Instantly share code, notes, and snippets.

@KGB33
Last active August 31, 2021 16:35
Show Gist options
  • Select an option

  • Save KGB33/5bb5e4675bbeb9a4025c991a16a1f58f to your computer and use it in GitHub Desktop.

Select an option

Save KGB33/5bb5e4675bbeb9a4025c991a16a1f58f to your computer and use it in GitHub Desktop.

Revisions

  1. KGB33 renamed this gist Aug 31, 2021. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. KGB33 created this gist Aug 31, 2021.
    184 changes: 184 additions & 0 deletions playground.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,184 @@
    from typing import Final
    import os
    import asyncio
    import json

    import requests
    import websockets

    BASE_URL: Final = f"https://discord.com/api/v9"

    sequence_number: str | None = None

    BOT_TOKEN: Final = os.getenv("DISCORD_TOKEN")
    BOT_NAME: Final = "RoboShpee"
    BOT_ID: Final = "541500270438514688"
    GUILD_ID: Final = "263452485639733249"

    ident = {
    "op": 2,
    "d": {
    "token": BOT_TOKEN,
    "properties": {"$os": "linux", "$browser": BOT_NAME, "$device": BOT_NAME},
    "compress": False,
    "intents": 513,
    "presence": {
    "activities": [{"name": "Discord.py depreciated :(", "type": 0}],
    "status": "online",
    "afk": False,
    },
    },
    }


    class DiscordError(Exception):
    pass


    class AuthenticationError(DiscordError):
    pass


    def register_command():
    """
    Tells discord to display the /command in the app,
    as well as what options the command has.
    """
    # Guild Scoped commands
    url = BASE_URL + f"/applications/{BOT_ID}/guilds/{GUILD_ID}/commands"

    # For global commands - Takes about an hour to replicate to every guild.
    # url = BASE_URL + f"/applications/{BOT_ID}/commands"
    echo = {
    "name": "echo",
    "type": 1,
    "application_id": BOT_ID,
    "description": "Echos the user's input",
    "options": [
    {
    "type": 3,
    "name": "content",
    "description": "What to echo back",
    "required": True,
    },
    ],
    }
    headers = {"Authorization": f"Bot {BOT_TOKEN}"}
    requests.post(url, headers=headers, json=echo)


    def connect_to_gateway():
    """
    The 'main' function, it gets the wss url from
    discord's HTTP API, then starts the websocket client.
    """
    url = get_gateway_url()["url"]
    asyncio.run(open_wss(url))


    def get_gateway_url():
    url = BASE_URL + "/gateway/bot"
    headers = {"Authorization": f"Bot {BOT_TOKEN}"}
    r = requests.get(url, headers=headers)
    match r.status_code:
    case 200:
    return r.json()
    case 401:
    raise AuthenticationError(r.json()["message"])
    case _:
    raise DiscordError(f"Unknown Error occurred, {r.status_code=}, {r.json()=}")


    async def open_wss(url: str):
    url_params = "?v=9&encoding=json"
    async with websockets.connect(url + url_params) as websocket:
    # Session ID should be used to reconnect when you receive an opcode 7
    # but reconnecting is not implemented here.
    session_id, hb_int = await init_connection(websocket)
    await asyncio.gather(
    lisiten_and_dispatch(websocket), heartbeat(websocket, hb_int)
    )


    async def init_connection(ws: websockets.WebSocketClientProtocol) -> tuple[str, int]:
    """
    Implements the connection handshake implemented
    [here](https://discord.com/developers/docs/topics/gateway#connecting-to-the-gateway)
    """
    # Hello
    msg = json.loads(await ws.recv())
    assert msg["op"] == 10
    hb_int = msg["d"]["heartbeat_interval"]

    # Ident
    await ws.send(json.dumps(ident))

    # Ready
    msg = json.loads(await ws.recv())
    assert (msg["op"] == 0) and (msg["t"] == "READY")
    session_id = msg["d"]["session_id"]

    return session_id, hb_int


    async def heartbeat(ws, hb_int):
    """
    Implements the heartbeat protocol described in the linked docs.
    Note: the websockets library will automatically send & receive PINGs & PONGs.
    These will keep the connection alive. Whereas the heartbeat will keep the
    connection authenticated.
    https://discord.com/developers/docs/topics/gateway#heartbeating
    """
    hb_int = hb_int / 1000 # ms -> s
    while True:
    await asyncio.sleep(hb_int)
    await ws.send(json.dumps({"op": 1, "d": sequence_number}))


    async def lisiten_and_dispatch(websocket):
    async for message in websocket:
    msg = json.loads(message)
    global sequence_number
    if msg["s"]: # heartbeat AWKs have no sequence_number
    sequence_number = msg["s"]
    match msg["op"]:
    case 10:
    print(f"Unexpected HELLO... {msg=}")
    case 0:
    if msg["t"] == "READY":
    print("Unexpected READY...")
    elif msg["t"] == "INTERACTION_CREATE":
    handle_interaction(msg)
    else:
    print(f"Received Dispatch: {msg['t']}")
    case 9:
    print("Invalid Session...")
    break
    case 11:
    print("Heartbeat AWK received...")
    case _:
    print(f"Got unknown msg OP:")
    print(json.dumps(msg, sort_keys=True, indent=4))


    def handle_interaction(msg):
    interaction = msg["d"]
    # always assuming the /command is /echo
    response = {"type": 4, "data": {"content": msg["d"]["data"]["options"][0]["value"]}}
    url = f"https://discord.com/api/v9/interactions/{interaction['id']}/{interaction['token']}/callback"
    requests.post(url, json=response)


    import logging

    logger = logging.getLogger("websockets")
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.StreamHandler())


    if __name__ == "__main__":
    # Commands only need to be registered once.
    # register_command()
    connect_to_gateway()