""" Ollama Chat This is a simple chatbot that uses Ollama to generate responses. Tech Stack: - Ollama - Streamlit - Python # Run using streamlit ollama pull gemma3:12b streamlit run ollama_chat.py """ import ollama import streamlit as st from PIL import Image import pytesseract import io import fitz # PyMuPDF for PDF processing import base64 import json import os from datetime import datetime # Constants HISTORY_DIR = "chat_history" MAX_HISTORY_FILES = 10 # Maximum number of history files to keep MAX_TITLE_LENGTH = 50 # Maximum length for chat titles def ensure_history_dir(): """Ensure the chat history directory exists""" if not os.path.exists(HISTORY_DIR): os.makedirs(HISTORY_DIR) def generate_chat_title(messages): """Generate a title from the first user message""" for message in messages: if message["role"] == "user": title = message["content"][:MAX_TITLE_LENGTH] # Truncate at the last complete word if necessary if len(message["content"]) > MAX_TITLE_LENGTH: title = title.rsplit(' ', 1)[0] + "..." return title return "New Chat" def save_chat_history(messages, filename=None): """Save chat history to a JSON file""" ensure_history_dir() if filename is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") title = generate_chat_title(messages) filename = f"chat_history_{timestamp}.json" filepath = os.path.join(HISTORY_DIR, filename) # Clean up old history files if we exceed the maximum cleanup_old_history_files() # Save messages along with the title data = { "messages": messages, "title": generate_chat_title(messages) } with open(filepath, 'w') as f: json.dump(data, f) return filename def load_chat_history(filename=None): """Load chat history from a specific file or the most recent one""" ensure_history_dir() try: if filename: filepath = os.path.join(HISTORY_DIR, filename) if os.path.exists(filepath): with open(filepath, 'r') as f: data = json.load(f) # Handle both old and new format files if isinstance(data, dict) and "messages" in data: return data["messages"] return data else: history_files = get_chat_history_files() if history_files: latest_file = os.path.join(HISTORY_DIR, history_files[0]["filename"]) with open(latest_file, 'r') as f: data = json.load(f) if isinstance(data, dict) and "messages" in data: return data["messages"] return data except Exception as e: st.error(f"Error loading chat history: {str(e)}") return [] def delete_chat_history(filename): """Delete a specific chat history file""" try: filepath = os.path.join(HISTORY_DIR, filename) if os.path.exists(filepath): os.remove(filepath) return True except Exception as e: st.error(f"Error deleting chat history: {str(e)}") return False def get_chat_history_files(): """Get list of chat history files with their timestamps and titles""" ensure_history_dir() try: history_files = [] for filename in os.listdir(HISTORY_DIR): if filename.startswith("chat_history_"): filepath = os.path.join(HISTORY_DIR, filename) # Extract timestamp from filename timestamp_str = filename.replace("chat_history_", "").replace(".json", "") try: timestamp = datetime.strptime(timestamp_str, "%Y%m%d_%H%M%S") formatted_time = timestamp.strftime("%Y-%m-%d %H:%M:%S") # Load the title from the file with open(filepath, 'r') as f: data = json.load(f) title = data.get("title", "Untitled Chat") if isinstance(data, dict) else "Untitled Chat" history_files.append({ "filename": filename, "timestamp": timestamp, "display_name": formatted_time, "title": title }) except (ValueError, json.JSONDecodeError, IOError): continue # Sort by timestamp, newest first return sorted(history_files, key=lambda x: x["timestamp"], reverse=True) except Exception as e: st.error(f"Error listing chat histories: {str(e)}") return [] def cleanup_old_history_files(): """Remove old history files if we exceed the maximum""" try: history_files = get_chat_history_files() if len(history_files) > MAX_HISTORY_FILES: for old_file in history_files[MAX_HISTORY_FILES:]: os.remove(os.path.join(HISTORY_DIR, old_file["filename"])) except Exception as e: st.error(f"Error cleaning up history files: {str(e)}") system_prompt = """ You are a helpful assistant that can answer questions and help with tasks. You can also analyze images and PDFs that users share with you. """ def extract_text_from_pdf(pdf_file): """Extract text from uploaded PDF file""" text = "" try: pdf_bytes = pdf_file.read() doc = fitz.open(stream=pdf_bytes, filetype="pdf") for page in doc: text += page.get_text() return text except Exception as e: return f"Error extracting text from PDF: {str(e)}" def process_image(image_file): """Process uploaded image and extract text using OCR""" try: # Read the image file image_bytes = image_file.read() image = Image.open(io.BytesIO(image_bytes)) # Perform OCR text = pytesseract.image_to_string(image) # Convert image to base64 for display buffered = io.BytesIO() image.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode() return text.strip(), img_str except Exception as e: return f"Error processing image: {str(e)}", None def chat(user_prompt, model="gemma3:12b", files_content="", message_history=None): """ Chat with the model using conversation history for context """ # Initialize messages with system prompt messages = [{'role': 'assistant', 'content': system_prompt}] # Add conversation history if provided if message_history: for msg in message_history: # Convert our message format to Ollama's format messages.append({ 'role': msg['role'], 'content': msg['content'] }) # Add current prompt full_prompt = user_prompt if files_content: full_prompt = f"{files_content}\n\nUser question: {user_prompt}" messages.append({ 'role': 'user', 'content': f"Model being used is {model}. {full_prompt}" }) stream = ollama.chat( model=model, messages=messages, stream=True, ) return stream def stream_parser(stream): for chunk in stream: yield chunk["message"]["content"] # Set the title of the chatbot st.title("Ollama Chatbot") # Initialize session state for deletion confirmations if "show_delete_all_dialog" not in st.session_state: st.session_state.show_delete_all_dialog = False if "delete_chat_id" not in st.session_state: st.session_state.delete_chat_id = None # Sidebar for chat management with st.sidebar: st.title("Chat Management") # New Chat button if st.button("New Chat", key="new_chat"): st.session_state.messages = [] st.session_state.current_chat = None st.rerun() # Clear History button with confirmation if st.button("Clear All History"): st.session_state.show_delete_all_dialog = True st.rerun() # Delete all confirmation dialog if st.session_state.show_delete_all_dialog: st.warning("Are you sure you want to delete all chat histories?") col1, col2 = st.columns(2) with col1: if st.button("Yes, delete all"): try: for file in os.listdir(HISTORY_DIR): os.remove(os.path.join(HISTORY_DIR, file)) st.session_state.messages = [] st.session_state.current_chat = None st.session_state.show_delete_all_dialog = False st.rerun() except Exception as e: st.error(f"Error clearing history: {str(e)}") with col2: if st.button("Cancel"): st.session_state.show_delete_all_dialog = False st.rerun() # Chat history selection st.subheader("Chat History") chat_histories = get_chat_history_files() # Individual chat deletion confirmation dialog if st.session_state.delete_chat_id: history_to_delete = next((h for h in chat_histories if h["filename"] == st.session_state.delete_chat_id), None) if history_to_delete: st.warning(f"Delete chat '{history_to_delete['title']}'?") col1, col2 = st.columns(2) with col1: if st.button("Yes, delete"): if delete_chat_history(st.session_state.delete_chat_id): if st.session_state.current_chat == st.session_state.delete_chat_id: st.session_state.messages = [] st.session_state.current_chat = None st.session_state.delete_chat_id = None st.rerun() with col2: if st.button("Cancel"): st.session_state.delete_chat_id = None st.rerun() # Display chat histories with delete buttons for history in chat_histories: col1, col2 = st.columns([4, 1]) with col1: if st.button(f"📝 {history['title']}\n{history['display_name']}", key=history['filename']): st.session_state.messages = load_chat_history(history['filename']) st.session_state.current_chat = history['filename'] st.rerun() with col2: if st.button("🗑️", key=f"delete_{history['filename']}"): st.session_state.delete_chat_id = history['filename'] st.rerun() # Initialize the chat history and current chat if "messages" not in st.session_state: st.session_state.messages = [] if "current_chat" not in st.session_state: st.session_state.current_chat = None # Display chat history on app re-run for message in st.session_state.messages: with st.chat_message(message["role"]): if "image" in message: st.image(f"data:image/png;base64,{message['image']}") st.markdown(message["content"]) # File upload section uploaded_files = st.file_uploader("Upload PDF or Image files", type=['pdf', 'png', 'jpg', 'jpeg'], accept_multiple_files=True) # Process uploaded files files_content = "" uploaded_images = [] if uploaded_files: for file in uploaded_files: if file.type == "application/pdf": pdf_text = extract_text_from_pdf(file) files_content += f"\nContent from PDF '{file.name}':\n{pdf_text}\n" elif file.type.startswith('image/'): image_text, img_str = process_image(file) if img_str: uploaded_images.append(img_str) files_content += f"\nContent from Image '{file.name}':\n{image_text}\n" # React to user input if user_prompt := st.chat_input("What would you like to ask?"): # Display user prompt and any uploaded images in chat message widget with st.chat_message("user"): st.markdown(user_prompt) for img_str in uploaded_images: st.image(f"data:image/png;base64,{img_str}") # Add user's prompt and images to session state message_data = {"role": "user", "content": user_prompt} if uploaded_images: message_data["image"] = uploaded_images[0] # Store the first image for history st.session_state.messages.append(message_data) with st.spinner('Generating response...'): # Pass the existing messages as context llm_stream = chat( user_prompt, files_content=files_content, message_history=st.session_state.messages # Add conversation history ) # streams the response back to the screen stream_output = st.write_stream(stream_parser(llm_stream)) # appends response to the message list st.session_state.messages.append({"role": "assistant", "content": stream_output}) # Save chat history after each message if st.session_state.current_chat: save_chat_history(st.session_state.messages, st.session_state.current_chat) else: new_filename = save_chat_history(st.session_state.messages) st.session_state.current_chat = new_filename