@python273
Last active December 29, 2024 23:37

Revisions

  1. python273 revised this gist Sep 13, 2023. 1 changed file with 36 additions and 10 deletions.
    46 changes: 36 additions & 10 deletions app.py
    @@ -1,29 +1,50 @@
     import os
     os.environ["OPENAI_API_KEY"] = ""

    -from flask import Flask, Response
    +from flask import Flask, Response, request
     import threading
     import queue

    -from langchain.llms import OpenAI
    -from langchain.callbacks.base import CallbackManager
    +from langchain.chat_models import ChatOpenAI
     from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
    +from langchain.schema import AIMessage, HumanMessage, SystemMessage

     app = Flask(__name__)

     @app.route('/')
     def index():
    +    # just for the example, html is included directly, move to .html file
         return Response('''
     <!DOCTYPE html>
     <html>
     <head><title>Flask Streaming Langchain Example</title></head>
     <body>
    +<form id="form">
    +<input name="prompt" value="write a short koan story about seeing beyond"/>
    +<input type="submit"/>
    +</form>
     <div id="output"></div>
     <script>
    +const formEl = document.getElementById('form');
     const outputEl = document.getElementById('output');
    -(async function() { // wrap in async function to use await
    +let aborter = new AbortController();
    +async function run() {
    +  aborter.abort(); // cancel previous request
    +  outputEl.innerText = '';
    +  aborter = new AbortController();
    +  const prompt = new FormData(formEl).get('prompt');
       try {
    -    const response = await fetch('/chain', {method: 'POST'});
    +    const response = await fetch(
    +      '/chain', {
    +        signal: aborter.signal,
    +        method: 'POST',
    +        headers: {'Content-Type': 'application/json'},
    +        body: JSON.stringify({
    +          prompt
    +        }),
    +      }
    +    );
         const reader = response.body.getReader();
         const decoder = new TextDecoder();
         while (true) {
    @@ -35,7 +56,12 @@ def index():
       } catch (err) {
         console.error(err);
       }
    -})();
    +}
    +run(); // run on initial prompt
    +formEl.addEventListener('submit', function(event) {
    +  event.preventDefault();
    +  run();
    +});
     </script>
     </body>
     </html>
    @@ -69,13 +95,13 @@ def on_llm_new_token(self, token: str, **kwargs):

     def llm_thread(g, prompt):
         try:
    -        llm = OpenAI(
    +        chat = ChatOpenAI(
                 verbose=True,
                 streaming=True,
    -            callback_manager=CallbackManager([ChainStreamHandler(g)]),
    +            callbacks=[ChainStreamHandler(g)],
                 temperature=0.7,
             )
    -        llm(prompt)
    +        chat([HumanMessage(content=prompt)])
         finally:
             g.close()

    @@ -88,7 +114,7 @@ def chain(prompt):

     @app.route('/chain', methods=['POST'])
     def _chain():
    -    return Response(chain("# A koan story about AGI\n\n"), mimetype='text/plain')
    +    return Response(chain(request.json['prompt']), mimetype='text/plain')

     if __name__ == '__main__':
         app.run(threaded=True, debug=True)
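
    This revision swaps the completion-style OpenAI wrapper for ChatOpenAI and reads the prompt from the POST body. To exercise the updated endpoint outside the browser, a small streaming client works; the sketch below is not part of the gist, and it assumes the app is running on Flask's default http://127.0.0.1:5000 with the `requests` library installed.

    # client.py -- hypothetical streaming client for the /chain endpoint above
    import requests

    resp = requests.post(
        'http://127.0.0.1:5000/chain',
        json={'prompt': 'write a short koan story about seeing beyond'},
        stream=True,  # iterate over the body as it arrives instead of buffering it
    )
    resp.raise_for_status()
    for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
        print(chunk, end='', flush=True)  # tokens print as the model emits them
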
  2. python273 revised this gist Apr 23, 2023. 1 changed file with 16 additions and 10 deletions.
    26 changes: 16 additions & 10 deletions app.py
    @@ -16,20 +16,26 @@ def index():
         return Response('''
     <!DOCTYPE html>
     <html>
    -<head>
    -    <title>Flask Streaming Langchain Example</title>
    -</head>
    +<head><title>Flask Streaming Langchain Example</title></head>
     <body>
     <div id="output"></div>
     <script>
    -var outputEl = document.getElementById('output');
    -fetch('/chain', {method: 'POST'}).then(async (response) => {
    -    const decoder = new TextDecoder();
    -    for await (const chunk of response.body) {
    -        const decoded = decoder.decode(chunk, {stream: true});
    -        outputEl.innerText += decoded;
    +const outputEl = document.getElementById('output');
    +(async function() { // wrap in async function to use await
    +  try {
    +    const response = await fetch('/chain', {method: 'POST'});
    +    const reader = response.body.getReader();
    +    const decoder = new TextDecoder();
    +    while (true) {
    +      const { done, value } = await reader.read();
    +      if (done) { break; }
    +      const decoded = decoder.decode(value, {stream: true});
    +      outputEl.innerText += decoded;
    +    }
    +  } catch (err) {
    +    console.error(err);
    +  }
    -}).catch((err) => console.error(err));
    +})();
     </script>
     </body>
     </html>
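
    The move from `for await ... of response.body` to an explicit `getReader()` loop is presumably for browser compatibility, since async iteration over response.body was not available everywhere at the time. The `{stream: true}` decoder option matters too: a multi-byte UTF-8 character can arrive split across two chunks, and only a stateful decoder buffers the partial bytes. A Python illustration of that pitfall, not part of the gist:

    # Why streaming decode matters: a stateful incremental decoder is the
    # Python analogue of TextDecoder's {stream: true} option.
    import codecs

    data = 'kenshō 見性'.encode('utf-8')
    chunks = [data[:9], data[9:]]  # deliberately split inside a 3-byte character

    decoder = codecs.getincrementaldecoder('utf-8')()
    text = ''.join(decoder.decode(c) for c in chunks)
    assert text == 'kenshō 見性'
    # A plain chunks[0].decode('utf-8') would raise UnicodeDecodeError here.
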
  3. python273 revised this gist Mar 25, 2023. No changes.
  4. python273 revised this gist Mar 25, 2023. 1 changed file with 24 additions and 33 deletions.
    57 changes: 24 additions & 33 deletions app.py
    @@ -4,7 +4,6 @@
     from flask import Flask, Response
     import threading
     import queue
    -import json

     from langchain.llms import OpenAI
     from langchain.callbacks.base import CallbackManager
    @@ -18,18 +17,19 @@ def index():
     <!DOCTYPE html>
     <html>
     <head>
    -    <title>Flask SSE Example</title>
    +    <title>Flask Streaming Langchain Example</title>
     </head>
     <body>
     <div id="output"></div>
     <script>
     var outputEl = document.getElementById('output');
    -var source = new EventSource('/sse');
    -source.onmessage = function(event) {
    -    outputEl.innerText += JSON.parse(event.data).chunk;
    -};
    -// don't auto restart connection
    -source.onerror = function(event) { source.close(); };
    +fetch('/chain', {method: 'POST'}).then(async (response) => {
    +    const decoder = new TextDecoder();
    +    for await (const chunk of response.body) {
    +        const decoded = decoder.decode(chunk, {stream: true});
    +        outputEl.innerText += decoded;
    +    }
    +}).catch((err) => console.error(err));
     </script>
     </body>
     </html>
    @@ -44,54 +44,45 @@ def __iter__(self):

         def __next__(self):
             item = self.queue.get()
    -        if item is StopIteration:
    -            raise StopIteration
    +        if item is StopIteration: raise item
             return item

         def __call__(self):
             return self

         def send(self, data):
             self.queue.put(data)

         def close(self):
             self.queue.put(StopIteration)

    -class SSEStreamHandler(StreamingStdOutCallbackHandler):
    +class ChainStreamHandler(StreamingStdOutCallbackHandler):
         def __init__(self, gen):
             super().__init__()
             self.gen = gen

         def on_llm_new_token(self, token: str, **kwargs):
             # sys.stdout.write(token)
             # sys.stdout.flush()
             self.gen.send(token)

     def llm_thread(g, prompt):
    -    llm = OpenAI(
    -        streaming=True,
    -        callback_manager=CallbackManager([SSEStreamHandler(g)]),
    -        verbose=True,
    -        temperature=0.7
    -    )
    -    llm(prompt)
    -    g.close()
    +    try:
    +        llm = OpenAI(
    +            verbose=True,
    +            streaming=True,
    +            callback_manager=CallbackManager([ChainStreamHandler(g)]),
    +            temperature=0.7,
    +        )
    +        llm(prompt)
    +    finally:
    +        g.close()


     def chain(prompt):
         g = ThreadedGenerator()
    -    t = threading.Thread(target=llm_thread, args=(g, prompt))
    -    t.start()
    +    threading.Thread(target=llm_thread, args=(g, prompt)).start()
         return g


    -@app.route('/sse')
    -def sse():
    -    def _stream():
    -        for i in chain("Write me a two paragraph koan."):
    -            data = {'chunk': i}
    -            yield 'data: {}\n\n'.format(json.dumps(data))
    -    return Response(_stream(), mimetype='text/event-stream')
    +@app.route('/chain', methods=['POST'])
    +def _chain():
    +    return Response(chain("# A koan story about AGI\n\n"), mimetype='text/plain')

     if __name__ == '__main__':
         app.run(threaded=True, debug=True)
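
    The key piece in this revision is the ThreadedGenerator: the LLM callback pushes tokens into a queue from a worker thread, while Flask iterates the same object on the request thread until the StopIteration sentinel arrives. Below is a stand-alone sketch of that pattern with a dummy producer in place of the LLM, just to show the moving parts; it is not part of the gist.

    import threading, queue, time

    class ThreadedGenerator:
        def __init__(self):
            self.queue = queue.Queue()

        def __iter__(self):
            return self

        def __next__(self):
            item = self.queue.get()  # blocks until the producer sends something
            if item is StopIteration:
                raise StopIteration
            return item

        def send(self, data):
            self.queue.put(data)

        def close(self):
            self.queue.put(StopIteration)  # sentinel ends the iteration

    def producer(g):
        try:
            for token in ['tokens ', 'arrive ', 'one ', 'by ', 'one']:
                g.send(token)
                time.sleep(0.1)  # stand-in for model latency
        finally:
            g.close()  # always unblock the consumer, even on error

    g = ThreadedGenerator()
    threading.Thread(target=producer, args=(g,)).start()
    print(''.join(g))  # consumes until close(); prints "tokens arrive one by one"
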
  5. python273 created this gist Mar 25, 2023.
    97 changes: 97 additions & 0 deletions app.py
    @@ -0,0 +1,97 @@
    +import os
    +os.environ["OPENAI_API_KEY"] = ""
    +
    +from flask import Flask, Response
    +import threading
    +import queue
    +import json
    +
    +from langchain.llms import OpenAI
    +from langchain.callbacks.base import CallbackManager
    +from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
    +
    +app = Flask(__name__)
    +
    +@app.route('/')
    +def index():
    +    return Response('''
    +<!DOCTYPE html>
    +<html>
    +<head>
    +    <title>Flask SSE Example</title>
    +</head>
    +<body>
    +<div id="output"></div>
    +<script>
    +var outputEl = document.getElementById('output');
    +var source = new EventSource('/sse');
    +source.onmessage = function(event) {
    +    outputEl.innerText += JSON.parse(event.data).chunk;
    +};
    +// don't auto restart connection
    +source.onerror = function(event) { source.close(); };
    +</script>
    +</body>
    +</html>
    +''', mimetype='text/html')
    +
    +class ThreadedGenerator:
    +    def __init__(self):
    +        self.queue = queue.Queue()
    +
    +    def __iter__(self):
    +        return self
    +
    +    def __next__(self):
    +        item = self.queue.get()
    +        if item is StopIteration:
    +            raise StopIteration
    +        return item
    +
    +    def __call__(self):
    +        return self
    +
    +    def send(self, data):
    +        self.queue.put(data)
    +
    +    def close(self):
    +        self.queue.put(StopIteration)
    +
    +class SSEStreamHandler(StreamingStdOutCallbackHandler):
    +    def __init__(self, gen):
    +        super().__init__()
    +        self.gen = gen
    +
    +    def on_llm_new_token(self, token: str, **kwargs):
    +        # sys.stdout.write(token)
    +        # sys.stdout.flush()
    +        self.gen.send(token)
    +
    +def llm_thread(g, prompt):
    +    llm = OpenAI(
    +        streaming=True,
    +        callback_manager=CallbackManager([SSEStreamHandler(g)]),
    +        verbose=True,
    +        temperature=0.7
    +    )
    +    llm(prompt)
    +    g.close()
    +
    +
    +def chain(prompt):
    +    g = ThreadedGenerator()
    +    t = threading.Thread(target=llm_thread, args=(g, prompt))
    +    t.start()
    +    return g
    +
    +
    +@app.route('/sse')
    +def sse():
    +    def _stream():
    +        for i in chain("Write me a two paragraph koan."):
    +            data = {'chunk': i}
    +            yield 'data: {}\n\n'.format(json.dumps(data))
    +    return Response(_stream(), mimetype='text/event-stream')
    +
    +if __name__ == '__main__':
    +    app.run(threaded=True, debug=True)
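
    For reference, this original /sse endpoint frames each token as a Server-Sent Events message: a `data: <json>` line followed by a blank line. A rough Python consumer for that wire format might look like the sketch below, which assumes the default local address and the `requests` library, neither of which is part of the gist.

    import json
    import requests

    resp = requests.get('http://127.0.0.1:5000/sse', stream=True)
    for line in resp.iter_lines(decode_unicode=True):
        if line and line.startswith('data: '):
            payload = json.loads(line[len('data: '):])  # e.g. {"chunk": "..."}
            print(payload['chunk'], end='', flush=True)
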