Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save unbracketed/aa72e5d314302e04cc49112279b5884f to your computer and use it in GitHub Desktop.
Save unbracketed/aa72e5d314302e04cc49112279b5884f to your computer and use it in GitHub Desktop.

Revisions

  1. unbracketed created this gist Mar 6, 2025.
    232 changes: 232 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,232 @@
    # /// script
    # requires-python = ">=3.12"
    # dependencies = [
    # "nanodjango",
    # "channels",
    # "daphne",
    # "htpy",
    # "markdown",
    # "markupsafe",
    # "llm"
    # ]
    # ///

    import json
    import uuid

    from channels.generic.websocket import WebsocketConsumer
    from channels.routing import ProtocolTypeRouter, URLRouter
    from channels.auth import AuthMiddlewareStack
    from django.http import HttpResponse
    from django.urls import path
    from markupsafe import Markup
    from markdown import markdown
    from htpy import (
    body,
    button,
    div,
    form,
    h1,
    head,
    html,
    input,
    meta,
    script,
    link,
    title,
    main,
    style,
    fieldset,
    article,
    )
    from nanodjango import Django
    import llm


    model = llm.get_model("4o")


    def html_template():
    return html[
    head[
    meta(charset="utf-8"),
    meta(name="viewport", content="width=device-width, initial-scale=1"),
    title["llm chat"],
    script(src="https://unpkg.com/[email protected]"),
    script(src="https://unpkg.com/[email protected]"),
    script(src="https://cdn.jsdelivr.net/npm/marked/marked.min.js"),
    link(
    rel="stylesheet",
    href="https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css",
    ),
    style[
    Markup("""
    .message { padding: .5rem; }
    .message.user-message {
    border: 1px solid #999;
    border-radius: 0.5rem;
    margin-bottom: 0.5rem;
    }
    .message.echo-message {
    font-weight: bold;
    border: 1px solid green;
    border-radius: 0.5rem;
    margin-bottom: 1rem;
    }
    """)
    ],
    script[
    Markup("""
    // Create a MutationObserver to watch for content changes
    const observer = new MutationObserver((mutations) => {
    mutations.forEach((mutation) => {
    if (mutation.type === 'childList' && mutation.target.classList.contains('echo-message')) {
    // Only parse if the content isn't already HTML
    if (!mutation.target.innerHTML.includes('<')) {
    mutation.target.innerHTML = marked.parse(mutation.target.innerHTML);
    }
    }
    });
    });

    // Start observing the message list for changes
    document.addEventListener('DOMContentLoaded', () => {
    const messageList = document.getElementById('message-list');
    if (messageList) {
    observer.observe(messageList, {
    childList: true,
    subtree: true,
    characterData: true
    });
    }
    });
    """)
    ],
    ],
    body[
    main(class_="container")[
    article[
    h1["Chat with GPT-4o"],
    div(hx_ext="ws", ws_connect="/ws/echo/")[
    div("#message-list"),
    form(ws_send=True)[
    fieldset(role="group")[
    input(
    name="message",
    type="text",
    placeholder="Type your message...",
    autocomplete="off",
    ),
    button(
    class_="primary outline",
    type="submit",
    onclick="setTimeout(() => this.closest('form').querySelector('input[name=message]').value = '', 0)",
    )["↩"],
    ]
    ],
    ],
    ],
    ]
    ],
    ]


    def response_message(message_text, id):
    return div("#message-list", hx_swap_oob=f"beforeend:{id}")[message_text]


    def formatted_response_message(message_text, id):
    return div(id, hx_swap_oob="outerHTML")[
    Markup(markdown(message_text, extensions=['fenced_code']))
    ]


    def response_markdown_trigger(id):
    return json.dumps({
    "type": "parse-markdown",
    "elementId": id
    })


    def response_container(id):
    return div("#message-list", hx_swap_oob="beforeend")[
    div(id, class_=["message", "echo-message"])
    ]


    def user_message(message_text):
    return div("#message-list", hx_swap_oob="beforeend")[
    div(class_=["message", "user-message"])[
    message_text
    ]
    ]


    app = Django(
    # EXTRA_APPS=[
    # "channels",
    # ],
    #
    # Nanodjango doesn't yet support prepending "priority" apps to INSTALLED_APPS,
    # and `daphne` must be the first app in INSTALLED_APPS.
    INSTALLED_APPS=[
    "daphne",
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "whitenoise.runserver_nostatic",
    "django.contrib.staticfiles",
    "channels",
    ],
    CHANNEL_LAYERS={
    "default": {
    "BACKEND": "channels.layers.InMemoryChannelLayer",
    },
    },
    ASGI_APPLICATION="__main__.htmx_websocket_app",
    )


    @app.route("/")
    def index(request):
    return HttpResponse(html_template())


    class EchoConsumer(WebsocketConsumer):
    def receive(self, text_data):
    text_data_json = json.loads(text_data)
    message_text = text_data_json.get("message", "")
    if not message_text.strip():
    return
    user_message_html = user_message(message_text)
    self.send(text_data=user_message_html)

    response = model.prompt(message_text)
    response_container_id = f"#echo-message-{str(uuid.uuid4())}"
    response_container_html = response_container(response_container_id)
    self.send(text_data=response_container_html)
    full_response = ""
    for chunk in response:
    full_response += chunk
    echo_message_html = response_message(chunk, response_container_id)
    self.send(text_data=echo_message_html)
    self.send(text_data=formatted_response_message(full_response, response_container_id))


    websocket_urlpatterns = [
    path("ws/echo/", EchoConsumer.as_asgi()),
    ]

    # Configure ASGI application. Channels
    htmx_websocket_app = ProtocolTypeRouter(
    {
    "http": app.asgi,
    "websocket": AuthMiddlewareStack(URLRouter(websocket_urlpatterns)),
    }
    )


    if __name__ == "__main__":
    app.run()