Streamlit tips for talking to structured data
import streamlit as st
import json
import _snowflake
from snowflake.snowpark.context import get_active_session
from snowflake.cortex import complete

# -- Snowflake session --
session = get_active_session()

# -- Page Title --
st.title("Chat with your Structured Data :balloon:")

API_ENDPOINT = "/api/v2/cortex/analyst/message"
API_TIMEOUT = 50000  # in milliseconds
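# NOTE: This app assumes it runs inside Streamlit in Snowflake, where the
# internal _snowflake module and get_active_session() are available. It will
# not run as a standalone Streamlit app outside Snowflake.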
# ------------------------------------------------------------------
# Initialize st.session_state if not present
if "messages" not in st.session_state:
    st.session_state.messages = []  # For our UI chat display
if "analyst_conversation" not in st.session_state:
    st.session_state.analyst_conversation = []  # For sending to Cortex Analyst


def get_sql_from_cortex_analyst():
    """
    Call the Cortex Analyst REST API, sending the entire analyst_conversation
    in the request body so the model keeps the conversational context.
    """
    request_body = {
        "messages": st.session_state.analyst_conversation,  # full conversation so far
        "semantic_model_file": '@"CORTEX_ANALYST_DEMO"."REVENUE_TIMESERIES"."RAW_DATA"/revenue_timeseries.yaml',
    }
    resp = _snowflake.send_snow_api_request(
        "POST",        # method
        API_ENDPOINT,  # path
        {},            # headers
        {},            # query params
        request_body,  # body
        None,          # request GUID
        API_TIMEOUT,   # timeout in milliseconds
    )
    return json.loads(resp["content"])
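

# For reference, a sketch of the envelope this helper returns on success,
# inferred from the parsing logic in parse_analyst_messages() below (the exact
# field set is an assumption, not the full API contract):
#
# {
#     "message": {
#         "role": "analyst",
#         "content": [
#             {"type": "text", "text": "This question asks for ..."},
#             {"type": "sql", "statement": "SELECT ..."},
#             {"type": "suggestions", "suggestions": ["...", "..."]},
#         ]
#     }
# }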


def parse_analyst_messages(analyst_output):
    """
    Extract the text, sql, and suggestions items from a valid "analyst" output,
    or return an error dict if the shape is not as expected.
    """
    if isinstance(analyst_output, dict) and "message" in analyst_output:
        contents = analyst_output["message"].get("content", [])
        texts = []
        statements = []
        suggestions = []
        for item in contents:
            if item.get("type") == "text":
                texts.append(item.get("text", "").strip())
            elif item.get("type") == "sql":
                statements.append(item.get("statement", "").strip())
            elif item.get("type") == "suggestions":
                suggestions.extend(item.get("suggestions", []))
        return {"status": "ok", "texts": texts, "statements": statements, "suggestions": suggestions}
    # A non-empty list is treated as an error payload whose first item carries the message.
    if isinstance(analyst_output, list) and analyst_output:
        raw_content = analyst_output[0].get("content", "").strip()
        return {"status": "error", "error_message": raw_content}
    return {"status": "error", "error_message": "Could not parse analyst output"}


def answer_question_using_analyst(query: str):
    """
    1) Append the user query to st.session_state.analyst_conversation.
    2) Call Cortex Analyst and parse the response.
    3) Append the analyst response (texts, SQL, and suggestions) to the conversation.
    4) If a SQL statement is returned, display the question interpretation (texts)
       and any suggestions via st.info.
    5) If no SQL is returned, yield the texts immediately as the final answer.
    6) Otherwise, execute the SQL, display the statement and results in expanders,
       then generate the final answer from the SQL results.
    7) Yield tokens from the streaming LLM response.

    This function is always a generator, so the final answer can be streamed.
    """
    # STEP 1: Append the user query in the message format the Analyst API expects
    st.session_state.analyst_conversation.append({
        "role": "user",
        "content": [{"type": "text", "text": query}]
    })

    # STEP 2: Call Cortex Analyst
    with st.spinner("Calling Cortex Analyst to generate SQL..."):
        analyst_output = get_sql_from_cortex_analyst()
        parsed = parse_analyst_messages(analyst_output)
    if parsed["status"] == "error":
        yield f"**Error**: {parsed['error_message']}"
        return

    # Set flag indicating if SQL was returned
    has_sql = bool(parsed.get("statements"))
    st.session_state.has_sql = has_sql

    # STEP 3: Append analyst response to conversation
    analyst_message_content = []
    for txt in parsed.get("texts", []):
        analyst_message_content.append({"type": "text", "text": txt})
    for stmt in parsed.get("statements", []):
        analyst_message_content.append({"type": "sql", "statement": stmt})
    if parsed.get("suggestions"):
        analyst_message_content.append({"type": "suggestions", "suggestions": parsed["suggestions"]})
    st.session_state.analyst_conversation.append({
        "role": "analyst",
        "content": analyst_message_content
    })
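
    # After one round trip, analyst_conversation looks roughly like this
    # (values are illustrative only):
    #
    # [
    #     {"role": "user", "content": [{"type": "text", "text": "Highest revenue per region?"}]},
    #     {"role": "analyst", "content": [
    #         {"type": "text", "text": "Interpreting as ..."},
    #         {"type": "sql", "statement": "SELECT ..."},
    #     ]},
    # ]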

    # STEP 4: Display interpretation and suggestions (only if SQL is present)
    if has_sql:
        for text in parsed.get("texts", []):
            st.info(text)
        if parsed.get("suggestions"):
            st.info("Suggestions:")
            for sug in parsed["suggestions"]:
                st.info(f"- {sug}")

    # STEP 5: If no SQL, yield the analyst texts as the final answer and finish
    if not has_sql:
        final_text = "\n".join(parsed.get("texts", []))
        yield final_text
        return

    # STEP 6: Process SQL - display the statement and execute it
    sql_statement = parsed["statements"][0]
    with st.expander("Generated SQL Statement"):
        st.code(sql_statement, language="sql")
    try:
        with st.spinner("Executing SQL..."):
            # Strip any trailing semicolon before execution. Note that
            # session.sql() is lazy: the query actually runs when the results
            # are rendered or converted below.
            sql_output = session.sql(sql_statement.strip().rstrip(";"))
    except Exception as e:
        yield f"**Error**: Failed to execute SQL. {e}. Please try again."
        return

    with st.expander("See SQL Results"):
        try:
            st.write(sql_output)
        except Exception as e:
            yield f"**Error**: Couldn't display SQL results. {e}"
            return

    # STEP 7: Generate final answer from SQL results using LLM streaming
    try:
        with st.spinner("Generating final answer from SQL results..."):
            df = sql_output.to_pandas()
            # to_markdown() requires the tabulate package to be available
            markdown_sql_output = df.to_markdown(index=False)
            messages = [
                {
                    "role": "system",
                    "content": "You are a helpful assistant that uses SQL output to answer questions."
                },
                {
                    "role": "user",
                    "content": (
                        f"The user asked: {query}\n\n"
                        f"The SQL results are:\n{markdown_sql_output}\n\n"
                        "Please answer the question concisely, without extra details."
                    )
                }
            ]
            options = {"guardrails": True}
            response_generator = complete("claude-3-5-sonnet", messages, options=options, stream=True)
        # Yield tokens from the streaming LLM response as the final answer.
        for token in response_generator:
            yield token
    except Exception as e:
        yield f"**Error**: Unable to generate final answer. {e}"
        return
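

# If streaming is not needed, the same call works without stream=True and
# snowflake.cortex.complete() returns the full response as a single string
# (a minimal sketch, assuming the same messages/options built above):
#
# answer = complete("claude-3-5-sonnet", messages, options=options)
# st.chat_message("assistant").write(answer)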


def display_messages():
    """
    Renders the entire UI chat from st.session_state.messages.
    """
    for message in st.session_state.messages:
        role = message["role"]
        content = message["content"]
        if role == "user":
            st.chat_message("user").write(content)
        elif role == "assistant":
            st.chat_message("assistant", avatar="🤖").write(content)


if st.button("Clear Conversation"):
    st.session_state.messages.clear()
    st.session_state.analyst_conversation.clear()
    if "has_sql" in st.session_state:
        del st.session_state["has_sql"]

# Display previous conversation
display_messages()

# Prompt user for input
user_input = st.chat_input("What is the highest revenue in each sales region?")
if user_input:
    # Display the user's message
    st.chat_message("user").write(user_input)
    st.session_state.messages.append({"role": "user", "content": user_input})

    # Generate streaming tokens from the LLM answer generator.
    ai_answer_generator = answer_question_using_analyst(user_input)

    # Always display the final answer as a chat message (below st.info and expanders).
    final_text = st.chat_message("assistant", avatar="🤖").write_stream(ai_answer_generator)

    # Save the completed answer.
    st.session_state.messages.append({"role": "assistant", "content": final_text})