Last active
July 9, 2025 00:55
-
-
Save twobob/afac3eeb069bedc63bcaa4ac64373ae2 to your computer and use it in GitHub Desktop.
Revisions
-
twobob revised this gist
Jul 9, 2025 . 1 changed file with 396 additions and 157 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -14,10 +14,7 @@ - Granular status updates at every stage - Inline copy-icon buttons for each message - Selectable model list with automatic, intelligent fallback - Debug menu to test all models and refresh list from API. """ from dotenv import load_dotenv @@ -36,6 +33,24 @@ from pygments.lexers import get_lexer_by_name from pygments.token import Token import base64 from PIL import Image, ImageTk import io import cairosvg try: import pyperclip except ImportError: pyperclip = None try: from tkinterdnd2 import DND_FILES, TkinterDnD except ImportError: print("FATAL ERROR: tkinterdnd2 library not found.") print("Please install it to enable drag-and-drop functionality: pip install tkinterdnd2") exit(1) # ---------------------------------------- # Environment & Mode Configuration # ---------------------------------------- @@ -49,8 +64,7 @@ exit(1) else: API_KEY = os.environ.get("OPENROUTER_KEY") #print(API_KEY) API_URL = "https://openrouter.ai/api/v1/chat/completions" @@ -65,7 +79,6 @@ {"display": "DeepSeek R1 0528", "api": "deepseek/deepseek-r1-0528:free"}, {"display": "Sarvam-M", "api": "sarvamai/sarvam-m:free"}, {"display": "Devstral Small", "api": "mistralai/devstral-small:free"}, {"display": "Qwen3 30B A3B", "api": "qwen/qwen3-30b-a3b:free"}, {"display": "Qwen3 8B", "api": "qwen/qwen3-8b:free"}, {"display": "Qwen3 14B", "api": "qwen/qwen3-14b:free"}, @@ -74,20 +87,12 @@ {"display": "DeepSeek R1T Chimera", "api": "tngtech/deepseek-r1t-chimera:free"}, {"display": "GLM-Z1-32B-0414", "api": "thudm/glm-z1-32b:free"}, {"display": "GLM-4-32B-0414", "api": "thudm/glm-4-32b:free"}, {"display": "DeepCoder-14B-Preview", "api": "agentica-org/deepcoder-14b-preview:free"}, {"display": "Kimi-VL-A3B-Thinking", "api": "moonshotai/kimi-vl-a3b-thinking:free"}, {"display": "Llama 3.3 Nemotron Super 49B v1", "api": "nvidia/llama-3.3-nemotron-super-49b-v1:free"}, {"display": "Qwen2.5 VL 32B Instruct", "api": "qwen/qwen2.5-vl-32b-instruct:free"}, {"display": "Qwerky 72B", "api": "featherless/qwerky-72b:free"}, {"display": "Mistral Small 3.1 24B", "api": "mistralai/mistral-small-3.1-24b-instruct:free"}, {"display": "Llama 3.3 70B Instruct", "api": "meta-llama/llama-3.3-70b-instruct:free"}, {"display": "Qwen2.5 Coder 32B Instruct", "api": "qwen/qwen2.5-coder-32b-instruct:free"}, {"display": "Llama 3.2 11B Vision Instruct", "api": "meta-llama/llama-3.2-11b-vision-instruct:free"}, @@ -97,6 +102,12 @@ {"display": "Mistral 7B Instruct", "api": "mistralai/mistral-7b-instruct:free"} ] VISION_MODELS = { "moonshotai/kimi-vl-a3b-thinking:free", "qwen/qwen2.5-vl-32b-instruct:free", "meta-llama/llama-3.2-11b-vision-instruct:free", } # ---------------------------------------- # UI Theming Definitions # ---------------------------------------- @@ -106,21 +117,23 @@ "btn_bg": "#4a4d4f", "btn_fg": "#dcdcdc", "user_fg": "#87ceeb", "ai_fg": "#98fb98", "menu_bg": "#2b2b2b", "menu_fg": "#dcdcdc", "status_fg": "#a9a9a9", "system_fg": "#ff6347", "vision_fg_ok": "#32CD32", "vision_fg_no": "#FF6347" }, "light": { "bg": "#ffffff", "fg": "#000000", "input_bg": "#f0f0f0", "btn_bg": "#e1e1e1", "btn_fg": "#000000", "user_fg": "#00008b", "ai_fg": "#006400", "menu_bg": "#ffffff", "menu_fg": "#000000", "status_fg": "#555555", "system_fg": "#dc143c", "vision_fg_ok": "#228B22", "vision_fg_no": "#B22222" } } # ---------------------------------------- # Core Chat API Function # ---------------------------------------- def chat_with_cypher_alpha(messages, model_name, timeout=30): """ Streamed chat completion generator. Yields content chunks or structured errors. """ @@ -136,14 +149,18 @@ def chat_with_cypher_alpha(messages, model_name, timeout=90): try: chunk = json.loads(data) delta = chunk["choices"][0]["delta"] if "content" in delta and delta["content"] is not None: yield delta["content"] except (json.JSONDecodeError, KeyError, IndexError): yield {"type": "error", "subtype": "parse", "message": f"Invalid data from {model_name}: {data.decode('utf-8', 'ignore')}"} continue yield None except requests.exceptions.Timeout: yield {"type": "error", "subtype": "network", "message": f"Request timed out for {model_name}."} yield None except requests.HTTPError as e: if e.response.status_code == 401: yield {"type": "error", "subtype": "auth_error", "message": "Authentication failed. The API key is invalid or has been revoked."} elif e.response.status_code == 429: yield {"type": "error", "subtype": "rate_limit", "message": "Rate limit reached. Please wait.", "cooldown": 20} elif "not found" in e.response.text.lower() or e.response.status_code == 404: @@ -165,25 +182,42 @@ def chat_with_cypher_alpha(messages, model_name, timeout=90): # ---------------------------------------- class ChatUI: def __init__(self, root): self.root = root self.root.title("Cypher Alpha Chat") self.root.geometry("900x800") self.current_theme = "dark" self.messages = [] self.stream_queue = queue.Queue() self.current_stream_content = [] self.is_streaming = False self.available_models = AVAILABLE_MODELS[:] self.staged_images = [] self.model_var = tk.StringVar(self.root) if self.available_models: self.model_var.set(self.available_models[0]["display"]) svg_base64 = ("PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoPSIyMDAiIGhlaWdodD0iMjAwIiB2aWV3Qm94PSIwIDAgMjQgMjQiPgogIDxwYXRoIGZpbGw9ImN1cnJlbnRDb2xvciIgZD0iTTIwIDJIMTBjLTEuMTAzIDAtMiAuODk3LTIgMnY0SDRjLTEuMTAzIDAtMiAuODk3LTIgMnYxMGMwIDEuMTAzLjg5NyAyIDIgMmgxMGMxLjEwMyAwIDItLjg5NyAyLTJ2LTRoNGMxLjEwMyAwIDItLjg5NyAyLTJWNGMwLTEuMTAzLS44OTctMi0yLTJ6TTQgMjBWMTBoMTBsLjAwMiAxMEg0em0xNi02aC00di00YzAtMS4xMDMtLjg5Ny0yLTItMmgtNFY0aDEwdjEweiIvPgo8L3N2Zz4=") png_bytes = cairosvg.svg2png(bytestring=base64.b64decode(svg_base64), output_width=16, output_height=16) self.copy_icon = PhotoImage(data=base64.b64encode(png_bytes).decode('ascii')) if not pyperclip: print("---") print("WARNING: 'pyperclip' library not found. Clipboard functions may not work reliably.") print(" Please install it for a better experience: pip install pyperclip") print("---") self._create_widgets() self.apply_theme() self.root.drop_target_register(DND_FILES) self.root.dnd_bind('<<Drop>>', self.handle_drop) greeting = "Hello! How can I help you today? Drag and drop images to chat with vision models." self.display_message("assistant", greeting, highlight=False) self.messages.append({"role": "assistant", "content": greeting}) def _create_widgets(self): """Create menus, chat area, input box, buttons, status bar.""" @@ -200,20 +234,32 @@ def _create_widgets(self): view_menu.add_command(label="Toggle Theme", command=self.toggle_theme) self.menu_bar.add_cascade(label="View", menu=view_menu) debug_menu = Menu(self.menu_bar, tearoff=0) debug_menu.add_command(label="Test All Models", command=self.start_model_test) debug_menu.add_command(label="Refresh Models from API", command=self.start_model_fetch) self.menu_bar.add_cascade(label="Debug", menu=debug_menu) model_frame = tk.Frame(self.root) model_frame.pack(padx=10, pady=(10, 0), fill=tk.X) model_label = tk.Label(model_frame, text="Model:") model_label.pack(side=tk.LEFT, padx=(0, 5)) model_names = [m["display"] for m in self.available_models] self.model_menu = tk.OptionMenu(model_frame, self.model_var, *model_names if model_names else ["No models available"]) self.model_menu.pack(side=tk.LEFT, fill=tk.X, expand=True) self.vision_status_label = tk.Label(model_frame, text="", font=("Segoe UI", 9, "italic")) self.vision_status_label.pack(side=tk.LEFT, padx=(10, 0)) self.model_var.trace_add("write", self.update_vision_status) self.update_vision_status() self.chat_history = scrolledtext.ScrolledText(self.root, wrap=tk.WORD, state=tk.DISABLED, font=("Segoe UI", 11), relief=tk.FLAT) self.chat_history.pack(padx=10, pady=10, fill=tk.BOTH, expand=True) self._configure_tags() self.chat_history.bind("<Button-3>", self.handle_right_click) self.staging_area = tk.Frame(self.root) self.staging_area.pack(padx=10, pady=5, fill=tk.X) input_frame = tk.Frame(self.root) input_frame.pack(padx=10, pady=(0, 10), fill=tk.X) @@ -227,7 +273,6 @@ def _create_widgets(self): self.status_bar.pack(side=tk.BOTTOM, fill=tk.X) def _configure_tags(self): self.chat_history.tag_configure("user", justify="left", spacing3=10, lmargin1=10, rmargin=80) self.chat_history.tag_configure("assistant", justify="left", spacing3=10, lmargin1=10, rmargin=10) self.chat_history.tag_configure("system_warning", font=("Segoe UI", 10, "italic")) @@ -236,66 +281,92 @@ def _configure_tags(self): self.chat_history.tag_configure(tok.replace(".", "_"), foreground=color) def apply_theme(self): t = THEMES[self.current_theme] self.root.config(bg=t["bg"]) self.chat_history.config(bg=t["bg"], fg=t["fg"], insertbackground=t["fg"]) self.input_text.config(bg=t["input_bg"], fg=t["fg"], insertbackground=t["fg"]) self.send_button.config(bg=t["btn_bg"], fg=t["btn_fg"], activebackground=t["input_bg"], activeforeground=t["btn_fg"]) self.status_bar.config(bg=t["bg"], fg=t["status_fg"]) self.menu_bar.config(bg=t["menu_bg"], fg=t["menu_fg"], activebackground=t["input_bg"], activeforeground=t["btn_fg"]) self.staging_area.config(bg=t["bg"]) for widget in self.root.winfo_children(): if isinstance(widget, tk.Frame) and any(isinstance(child, tk.OptionMenu) for child in widget.winfo_children()): widget.config(bg=t["bg"]) for child in widget.winfo_children(): if isinstance(child, tk.Label): child.config(bg=t["bg"], fg=t["fg"]) elif isinstance(child, tk.OptionMenu): child.config(bg=t["btn_bg"], fg=t["btn_fg"], activebackground=t["input_bg"], activeforeground=t["btn_fg"], highlightthickness=0) child["menu"].config(bg=t["menu_bg"], fg=t["menu_fg"]) self.chat_history.tag_configure("user", foreground=t["user_fg"]) self.chat_history.tag_configure("assistant", foreground=t["ai_fg"]) self.chat_history.tag_configure("system_warning", foreground=t["system_fg"]) self.chat_history.tag_configure("code_block", background=t["input_bg"], foreground=t["fg"]) self.update_vision_status() def display_message(self, role, content, highlight=True, model_name=None): self.chat_history.config(state=tk.NORMAL) if role == "user": label = "You" elif role == "assistant": label = model_name or "Assistant" else: label = "System" text_content = "" image_list = [] if isinstance(content, list): for part in content: if part["type"] == "text": text_content += part["text"] elif part["type"] == "image_url": image_list.append(part["image_url"]["url"]) content_body = text_content.strip() else: content_body = content.strip() self.chat_history.insert(tk.END, f"{label}\n", (role,)) content_start_index = self.chat_history.index(tk.END) self.chat_history.insert(tk.END, content_body) if image_list: self.chat_history.insert(tk.END, " ") self.chat_history.insert(tk.END, f"[{len(image_list)} image(s)]", "system_warning") content_end_index = self.chat_history.index("end-1c") if role in ["user", "assistant"]: self._add_copy_button(content_end_index, content_body) self.chat_history.insert(tk.END, "\n\n") if role=="assistant" and highlight: self._highlight_code_in_range(content_start_index, content_end_index) self.chat_history.config(state=tk.DISABLED) self.chat_history.see(tk.END) def _add_copy_button(self, insert_pos, content_to_copy): theme = THEMES[self.current_theme] copy_btn = tk.Button(self.chat_history, image=self.copy_icon, command=lambda c=content_to_copy: self.copy_message_content(c), relief=tk.FLAT, borderwidth=0, cursor="hand2", bg=theme["bg"], activebackground=theme["input_bg"]) self.chat_history.window_create(insert_pos, window=copy_btn, padx=5, align="top") def copy_message_content(self, content): if not content: self.status_bar.config(text="Nothing to copy.") self.root.after(2000, lambda: self.status_bar.config(text="Ready (Ctrl+Enter to send)")) return if pyperclip: try: pyperclip.copy(content) self.status_bar.config(text="Copied to clipboard!") except Exception as e: self.status_bar.config(text=f"Clipboard error: {e}") else: self.root.clipboard_clear() self.root.clipboard_append(content) self.root.update() self.status_bar.config(text="Copied (fallback). Paste may not work.") self.root.after(2500, lambda: self.status_bar.config(text="Ready (Ctrl+Enter to send)")) def _highlight_code_in_range(self, start, end): seg = self.chat_history.get(start, end) for m in re.finditer(r"```(\w+)?\n(.*?)```", seg, re.DOTALL): bs=f"{start}+{m.start()}c"; be=f"{start}+{m.end()}c" @@ -309,153 +380,241 @@ def _highlight_code_in_range(self, start, end): except: continue def send_message(self, event=None): prompt=self.input_text.get("1.0",tk.END).strip() if not prompt and not self.staged_images: return model_display_name = self.model_var.get() model_api_name = next((m["api"] for m in self.available_models if m["display"] == model_display_name), None) content_parts = [] if prompt: content_parts.append({"type": "text", "text": prompt}) if self.staged_images: if not model_api_name or model_api_name not in VISION_MODELS: messagebox.showerror("Model Error", f"The selected model '{model_display_name}' is not vision-capable. Please choose a vision model to send images.") return for img_path in self.staged_images: try: with Image.open(img_path) as img: output_buffer = io.BytesIO() img.convert("RGB").save(output_buffer, format="JPEG") base64_image = base64.b64encode(output_buffer.getvalue()).decode('utf-8') content_parts.append({ "type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"} }) except Exception as e: messagebox.showerror("Image Error", f"Failed to process image {os.path.basename(img_path)}:\n{e}") return final_content = content_parts if self.staged_images else prompt self.messages.append({"role": "user", "content": final_content}) self.display_message("user", final_content) self.input_text.delete("1.0",tk.END) self.staged_images.clear() for widget in self.staging_area.winfo_children(): widget.destroy() self.send_button.config(state=tk.DISABLED) self.input_text.config(state=tk.DISABLED) self.is_streaming = False threading.Thread(target=self._stream_worker_with_fallback, daemon=True).start() self.process_stream_queue() def _stream_worker_with_fallback(self): selected_display_name = self.model_var.get() try: start_index = next(i for i, model in enumerate(self.available_models) if model["display"] == selected_display_name) except StopIteration: start_index = 0 fallback_order = self.available_models[start_index:] + self.available_models[:start_index] has_succeeded = False for model in fallback_order: self.stream_queue.put({"type": "status", "message": f"Trying model: {model['display']}..."}) stream_had_content = False last_item_was_error = False for item in chat_with_cypher_alpha(self.messages, model["api"]): if item is None: break if isinstance(item, str): if not self.is_streaming: self.is_streaming = True self.stream_queue.put({"type": "content", "data": item, "model_name": model["display"]}) stream_had_content = True last_item_was_error = False elif isinstance(item, dict) and item.get("type") == "error": self.stream_queue.put(item) last_item_was_error = True if item.get("subtype") in ["auth_error", "rate_limit", "network"]: has_succeeded = True break if stream_had_content and not last_item_was_error: has_succeeded = True break if has_succeeded: break self.stream_queue.put(None) def process_stream_queue(self): try: item = self.stream_queue.get_nowait() if item is None: if self.is_streaming: self._on_stream_complete() else: self._reset_ui() return item_type = item.get("type") if item_type == "content": if not self.is_streaming: self.is_streaming = True self.current_stream_content = [] model_name = item.get("model_name", "Assistant") self.chat_history.config(state=tk.NORMAL) self.ai_header_start_index = self.chat_history.index(tk.END) self.chat_history.insert(tk.END, f"{model_name}\n", ("assistant",)) self.ai_start_index = self.chat_history.index(tk.END) self.chat_history.config(state=tk.DISABLED) self.chat_history.config(state=tk.NORMAL) self.current_stream_content.append(item["data"]) self.chat_history.insert(tk.END, item["data"]) self.chat_history.config(state=tk.DISABLED) self.chat_history.see(tk.END) elif item_type == "status": self.status_bar.config(text=item["message"]) elif item_type == "error": if item.get("subtype") == "rate_limit": self._handle_rate_limit(item) else: self._handle_generic_error(item) elif item_type == "models_updated": self._repopulate_model_menu(item.get("models", [])) elif item_type == "test_complete": failed_models = item.get("failed", []) if failed_models: self._remove_failed_models(failed_models) self.status_bar.config(text="Model testing complete.") self.menu_bar.entryconfig("Debug", state=tk.NORMAL) self.stream_queue.task_done() self.root.after(50, self.process_stream_queue) except queue.Empty: self.root.after(50, self.process_stream_queue) except Exception as e: print(f"Error in process_stream_queue: {e}") self._reset_ui() def _on_stream_complete(self): if not self.is_streaming: self._reset_ui() return self.status_bar.config(text="Stream finished. Finalizing...") self.chat_history.config(state=tk.NORMAL) full_streamed_content = "".join(self.current_stream_content) self.messages.append({"role": "assistant", "content": full_streamed_content}) content_end_index = self.chat_history.index("end-1c") self._add_copy_button(content_end_index, full_streamed_content) self.chat_history.insert(tk.END, "\n\n") if hasattr(self, 'ai_start_index') and self.ai_start_index: self._highlight_code_in_range(self.ai_start_index, content_end_index) self.chat_history.config(state=tk.DISABLED) self.chat_history.see(tk.END) self._reset_ui() def start_model_fetch(self): self.status_bar.config(text="Fetching models from OpenRouter API...") threading.Thread(target=self._fetch_models_worker, daemon=True).start() self.process_stream_queue() def _fetch_models_worker(self): self.stream_queue.put({"type": "status", "message": "Fetching model list from API..."}) try: url = "https://openrouter.ai/api/v1/models" resp = requests.get(url, timeout=20) resp.raise_for_status() all_models_data = resp.json().get("data", []) free_models = [] for model_data in all_models_data: model_id = model_data.get('id', '') if model_id.endswith(':free'): display_name = model_data.get('name', model_id).replace(" (free)", "").strip() free_models.append({"display": display_name, "api": model_id}) self.stream_queue.put({"type": "models_updated", "models": sorted(free_models, key=lambda x: x['display'])}) except Exception as e: print(f"Model Fetch Error: {e}") self.stream_queue.put({"type": "status", "message": "Failed to fetch models."}) self.stream_queue.put({"type": "models_updated", "models": []}) def _repopulate_model_menu(self, models): self.available_models = models menu = self.model_menu["menu"] menu.delete(0, "end") if not self.available_models: self.model_var.set("No models found") self.model_menu.config(state=tk.DISABLED) self.status_bar.config(text="API fetch failed. No models loaded.") messagebox.showerror("API Error", "Could not fetch the model list from OpenRouter API.") return new_model_names = [m["display"] for m in self.available_models] for name in new_model_names: menu.add_command(label=name, command=tk._setit(self.model_var, name)) self.model_var.set(new_model_names[0]) self.model_menu.config(state=tk.NORMAL) self.status_bar.config(text="Ready") self.update_vision_status() def start_model_test(self): if messagebox.askyesno("Confirm Model Test", "This will send a 'TEST' message to every model sequentially and may take a long time.\n\nModels that fail will be removed from the dropdown list for this session.\n\nContinue?"): self.menu_bar.entryconfig("Debug", state=tk.DISABLED) threading.Thread(target=self._test_all_models_worker, daemon=True).start() self.process_stream_queue() def _test_all_models_worker(self): failed_models = [] models_to_test = self.available_models[:] for model in models_to_test: self.stream_queue.put({"type": "status", "message": f"Testing: {model['display']}..."}) is_successful = False has_failed = False messages = [{"role": "user", "content": "TEST"}] for item in chat_with_cypher_alpha(messages, model["api"]): if item is None: continue if isinstance(item, str) and item.strip(): is_successful = True break elif isinstance(item, dict) and item.get("type") == "error": subtype = item.get("subtype") if subtype not in ["auth_error", "rate_limit", "network"]: print(f"Model Failure: {model['display']:<40} | Reason: {item.get('message', 'Unknown error')}") failed_models.append(model) else: self.stream_queue.put({"type": "status", "message": f"System Error during test. Stopping."}) self.stream_queue.put({"type": "test_complete", "failed": failed_models}) return has_failed = True break if not is_successful and not has_failed: print(f"Model Failure: {model['display']:<40} | Reason: No valid content in response.") failed_models.append(model) self.stream_queue.put({"type": "test_complete", "failed": failed_models}) def _remove_failed_models(self, failed_models): current_selection = self.model_var.get() failed_apis = {m["api"] for m in failed_models} self.available_models = [m for m in self.available_models if m["api"] not in failed_apis] menu = self.model_menu["menu"] menu.delete(0, "end") new_model_names = [m["display"] for m in self.available_models] if not new_model_names: self.model_var.set("No models available") self.model_menu.config(state=tk.DISABLED) messagebox.showinfo("Model Test Complete", "All models failed the test.") return for name in new_model_names: menu.add_command(label=name, command=tk._setit(self.model_var, name)) if current_selection not in new_model_names: self.model_var.set(new_model_names[0]) else: self.model_var.set(current_selection) messagebox.showinfo("Model Test Complete", f"{len(failed_models)} model(s) failed and have been removed from the list.") def _handle_rate_limit(self, error_data): self._cleanup_failed_attempt() self.display_message("system_warning", error_data["message"], highlight=False) cooldown = error_data.get("cooldown", 15) @@ -464,99 +623,179 @@ def _handle_rate_limit(self, error_data): self._update_cooldown_timer(cooldown) def _update_cooldown_timer(self, seconds_left): if seconds_left > 0: self.status_bar.config(text=f"Please wait... {seconds_left}s") self.root.after(1000, self._update_cooldown_timer, seconds_left - 1) else: self._reset_ui() def _handle_generic_error(self, error_data): self._cleanup_failed_attempt() self.display_message("system_warning", error_data["message"], highlight=False) self._reset_ui() def _cleanup_failed_attempt(self): if self.is_streaming and hasattr(self, 'ai_header_start_index') and self.ai_header_start_index: self.chat_history.config(state=tk.NORMAL) self.chat_history.delete(self.ai_header_start_index, "end-1c") self.chat_history.config(state=tk.DISABLED) def _reset_ui(self): self.send_button.config(state=tk.NORMAL) self.input_text.config(state=tk.NORMAL) self.status_bar.config(text="Ready (Ctrl+Enter to send)") self.is_streaming = False if hasattr(self, 'ai_header_start_index'): self.ai_header_start_index = None if hasattr(self, 'ai_start_index'): self.ai_start_index = None self.current_stream_content = [] def export_chat(self): path=filedialog.asksaveasfilename(defaultextension=".json", filetypes=[("JSON files","*.json"),("All files","*.*")]) if not path: return with open(path,"w",encoding="utf-8") as f: json.dump(self.messages,f,indent=4) messagebox.showinfo("Success","Chat history saved.") def import_chat(self): if self.messages and not messagebox.askyesno("Load Chat","This will replace current conversation. Continue?"): return path=filedialog.askopenfilename(filetypes=[("JSON files","*.json"),("All files","*.*")]) if not path: return with open(path,"r",encoding="utf-8") as f: data=json.load(f) self.new_chat(confirm=False) self.messages=data for m in self.messages: self.display_message(m["role"], m["content"]) def new_chat(self, confirm=True): if confirm and not messagebox.askyesno("New Chat","Clear current conversation?"): return self.messages.clear() self.chat_history.config(state=tk.NORMAL) self.chat_history.delete("1.0",tk.END) self.chat_history.config(state=tk.DISABLED) greeting = "Hello! How can I help you today? Drag and drop images to chat with vision models." self.display_message("assistant", greeting, highlight=False) self.messages.append({"role": "assistant", "content": greeting}) def toggle_theme(self): self.current_theme = "light" if self.current_theme=="dark" else "dark" self.apply_theme() def handle_right_click(self, event): """Checks if a right-click is on a code block and triggers a copy.""" tags = self.chat_history.tag_names(f"@{event.x},{event.y}") if "code_block" in tags: self.copy_code_block(event) def copy_code_block(self, event): """Copy the fenced code block you right-clicked on.""" idx = self.chat_history.index(f"@{event.x},{event.y}") ranges = self.chat_history.tag_ranges("code_block") for start,end in zip(ranges[0::2],ranges[1::2]): if (self.chat_history.compare(idx,">=",start) and self.chat_history.compare(idx,"<=",end)): raw = self.chat_history.get(start,end) cleaned=re.sub(r"^```[a-zA-Z]*\n|```$", "", raw).strip() if pyperclip: try: pyperclip.copy(cleaned) self.status_bar.config(text="Code copied!") except Exception as e: self.status_bar.config(text=f"Clipboard error: {e}") else: self.root.clipboard_clear() self.root.clipboard_append(cleaned) self.root.update() self.status_bar.config(text="Code copied (fallback).") self.root.after(2000, lambda: self.status_bar.config(text="Ready (Ctrl+Enter to send)")) return def handle_drop(self, event): """Handle dropped files.""" files = self.root.tk.splitlist(event.data) for f in files: if os.path.exists(f): if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')): if f not in self.staged_images: self.staged_images.append(f) self._display_staged_thumbnail(f) else: self.status_bar.config(text=f"Unsupported file type: {os.path.basename(f)}") self.root.after(3000, lambda: self.status_bar.config(text="Ready")) def _display_staged_thumbnail(self, file_path): """Creates and displays a thumbnail for a staged image.""" thumb_frame = tk.Frame(self.staging_area, bd=1, relief=tk.RAISED) thumb_frame.pack(side=tk.LEFT, padx=5, pady=2) try: img = Image.open(file_path) img.thumbnail((64, 64)) photo = ImageTk.PhotoImage(img) img_label = tk.Label(thumb_frame, image=photo) img_label.image = photo img_label.pack(side=tk.TOP) close_btn = tk.Button(thumb_frame, text="X", command=lambda p=file_path, f=thumb_frame: self._remove_staged_image(p, f), relief=tk.FLAT, font=("Segoe UI", 7)) close_btn.pack(side=tk.BOTTOM, fill=tk.X, ipady=1, ipadx=1) except Exception as e: thumb_frame.destroy() messagebox.showerror("Image Error", f"Could not open image:\n{os.path.basename(file_path)}\n\nError: {e}") def _remove_staged_image(self, file_path, thumb_frame): """Removes an image from the staging area.""" if file_path in self.staged_images: self.staged_images.remove(file_path) thumb_frame.destroy() def update_vision_status(self, *args): """Updates the label indicating if the selected model supports vision.""" model_display_name = self.model_var.get() model_api_name = next((m["api"] for m in self.available_models if m["display"] == model_display_name), None) theme = THEMES[self.current_theme] if model_api_name in VISION_MODELS: self.vision_status_label.config(text="Vision Ready ✓", fg=theme["vision_fg_ok"]) else: self.vision_status_label.config(text="Text Only ✗", fg=theme["vision_fg_no"]) # ---------------------------------------- # Startup Self-Tests # ---------------------------------------- def run_startup_tests(): errs=[] for theme,cfg in THEMES.items(): for key in ["bg","fg","input_bg","btn_bg","btn_fg", "user_fg","ai_fg","menu_bg","menu_fg", "status_fg","system_fg", "vision_fg_ok", "vision_fg_no"]: if key not in cfg: errs.append(f"Theme '{theme}' missing '{key}'") methods_to_check = [ "_create_widgets", "_configure_tags", "apply_theme", "display_message", "_add_copy_button", "copy_message_content", "_highlight_code_in_range", "send_message", "_stream_worker_with_fallback", "process_stream_queue", "_on_stream_complete", "_fetch_models_worker", "_repopulate_model_menu", "_handle_rate_limit", "_update_cooldown_timer", "_handle_generic_error", "_cleanup_failed_attempt", "_reset_ui", "export_chat", "import_chat", "new_chat", "toggle_theme", "handle_right_click", "copy_code_block", "handle_drop", "_display_staged_thumbnail", "_remove_staged_image", "update_vision_status", "start_model_test", "_test_all_models_worker", "_remove_failed_models", "start_model_fetch" ] for m in methods_to_check: if not hasattr(ChatUI,m): errs.append(f"ChatUI missing method '{m}'") if errs: for e in errs: print("Startup test error:",e) print("Exiting due to startup test failures."); exit(1) return not errs # ---------------------------------------- # Application Entry Point # ---------------------------------------- if __name__ == "__main__": if not run_startup_tests(): exit(1) root = TkinterDnD.Tk() app = ChatUI(root) root.mainloop() -
twobob created this gist
Jul 7, 2025 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,562 @@ #!/usr/bin/env python3 """ Enterprise-grade Chat UI for OpenRouter's Cypher Alpha model using Tkinter. Features: - Full conversation history with scrollback - Dark/light theming toggle - Syntax-highlighted code blocks with copy-on-right-click - Streaming AI responses in background thread - Export/import chat history as JSON - Production/development mode with environment-based API key management - Startup self-tests to ensure no regression of core functionality - Multi-line input box for natural typing (Ctrl+Enter to send) - Granular status updates at every stage - Inline copy-icon buttons for each message - Selectable model list with automatic, intelligent fallback HOWTO: create a .env file in the same directory as this file and add an entry in ith with your key like OPENROUTER_KEY=sk-or-v1-86ce84f117cfce62a1detc """ from dotenv import load_dotenv import os load_dotenv() # Load .env file import re import json import queue import threading import requests import tkinter as tk from tkinter import scrolledtext, filedialog, messagebox, Menu, PhotoImage from pygments import lex from pygments.lexers import get_lexer_by_name from pygments.token import Token # ---------------------------------------- # Environment & Mode Configuration # ---------------------------------------- PRODUCTION = False # os.environ.get("ENV", "development").lower() == "production" if PRODUCTION: try: API_KEY = os.environ["OPENROUTER_API_KEY"] except KeyError: print("Error: OPENROUTER_API_KEY not set. Exiting.") exit(1) else: API_KEY = os.environ.get("OPENROUTER_KEY") print(API_KEY) API_URL = "https://openrouter.ai/api/v1/chat/completions" # ---------------------------------------- # Model Definitions # ---------------------------------------- AVAILABLE_MODELS = [ {"display": "Cypher Alpha", "api": "openrouter/cypher-alpha:free"}, {"display": "Mistral Small 3.2 24B", "api": "mistralai/mistral-small-3.2-24b-instruct:free"}, {"display": "Kimi Dev 72b", "api": "moonshotai/kimi-dev-72b:free"}, {"display": "DeepSeek R1 0528 Qwen3 8B", "api": "deepseek/deepseek-r1-0528-qwen3-8b:free"}, {"display": "DeepSeek R1 0528", "api": "deepseek/deepseek-r1-0528:free"}, {"display": "Sarvam-M", "api": "sarvamai/sarvam-m:free"}, {"display": "Devstral Small", "api": "mistralai/devstral-small:free"}, {"display": "Gemma 3n 4B", "api": "google/gemma-3n-4b:free"}, {"display": "Qwen3 30B A3B", "api": "qwen/qwen3-30b-a3b:free"}, {"display": "Qwen3 8B", "api": "qwen/qwen3-8b:free"}, {"display": "Qwen3 14B", "api": "qwen/qwen3-14b:free"}, {"display": "Qwen3 32B", "api": "qwen/qwen3-32b:free"}, {"display": "Qwen3 235B A22B", "api": "qwen/qwen3-235b-a22b:free"}, {"display": "DeepSeek R1T Chimera", "api": "tngtech/deepseek-r1t-chimera:free"}, {"display": "GLM-Z1-32B-0414", "api": "thudm/glm-z1-32b:free"}, {"display": "GLM-4-32B-0414", "api": "thudm/glm-4-32b:free"}, {"display": "Shisa V2 Llama 3.3 70B", "api": "shisa-ai/shisa-v2-llama-3.3-70b:free"}, {"display": "QwQ 32B RpR v1", "api": "arliai/qwq-32b-rpr-v1:free"}, {"display": "DeepCoder-14B-Preview", "api": "agentica-org/deepcoder-14b-preview:free"}, {"display": "Kimi-VL-A3B-Thinking", "api": "moonshotai/kimi-vl-a3b-thinking:free"}, {"display": "Llama 3.3 Nemotron Super 49B v1", "api": "nvidia/llama-3.3-nemotron-super-49b-v1:free"}, {"display": "Llama 3.1 Nemotron Ultra 253B v1", "api": "nvidia/llama-3.1-nemotron-ultra-253b-v1:free"}, {"display": "Qwen2.5 VL 32B Instruct", "api": "qwen/qwen2.5-vl-32b-instruct:free"}, {"display": "DeepSeek V3 0324", "api": "deepseek/deepseek-v3-0324:free"}, {"display": "Qwerky 72B", "api": "featherless/qwerky-72b:free"}, {"display": "Mistral Small 3.1 24B", "api": "mistralai/mistral-small-3.1-24b-instruct:free"}, {"display": "Gemma 3 4B", "api": "google/gemma-3-4b:free"}, {"display": "Gemma 3 12B", "api": "google/gemma-3-12b:free"}, {"display": "Reka Flash 3", "api": "reka/flash-3:free"}, {"display": "Gemini 2.0 Flash Experimental", "api": "google/gemini-2.0-flash-experimental:free"}, {"display": "Llama 3.3 70B Instruct", "api": "meta-llama/llama-3.3-70b-instruct:free"}, {"display": "Qwen2.5 Coder 32B Instruct", "api": "qwen/qwen2.5-coder-32b-instruct:free"}, {"display": "Llama 3.2 11B Vision Instruct", "api": "meta-llama/llama-3.2-11b-vision-instruct:free"}, {"display": "Qwen2.5 72B Instruct", "api": "qwen/qwen2.5-72b-instruct:free"}, {"display": "Mistral Nemo", "api": "mistralai/mistral-nemo:free"}, {"display": "Gemma 2 9B", "api": "google/gemma-2-9b:free"}, {"display": "Mistral 7B Instruct", "api": "mistralai/mistral-7b-instruct:free"} ] # ---------------------------------------- # UI Theming Definitions # ---------------------------------------- THEMES = { "dark": { "bg": "#2b2b2b", "fg": "#dcdcdc", "input_bg": "#3c3f41", "btn_bg": "#4a4d4f", "btn_fg": "#dcdcdc", "user_fg": "#87ceeb", "ai_fg": "#98fb98", "menu_bg": "#2b2b2b", "menu_fg": "#dcdcdc", "status_fg": "#a9a9a9", "system_fg": "#ff6347" }, "light": { "bg": "#ffffff", "fg": "#000000", "input_bg": "#f0f0f0", "btn_bg": "#e1e1e1", "btn_fg": "#000000", "user_fg": "#00008b", "ai_fg": "#006400", "menu_bg": "#ffffff", "menu_fg": "#000000", "status_fg": "#555555", "system_fg": "#dc143c" } } # ---------------------------------------- # Core Chat API Function # ---------------------------------------- def chat_with_cypher_alpha(messages, model_name, timeout=90): """ Streamed chat completion generator. Yields content chunks or structured errors. """ headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"} payload = {"model": model_name, "messages": messages, "stream": True} try: with requests.post(API_URL, json=payload, headers=headers, stream=True, timeout=timeout) as resp: resp.raise_for_status() for line in resp.iter_lines(): if not line.startswith(b"data: "): continue data = line[6:].strip() if data == b"[DONE]": break try: chunk = json.loads(data) delta = chunk["choices"][0]["delta"] if "content" in delta: yield delta["content"] except (json.JSONDecodeError, KeyError, IndexError): yield {"type": "error", "subtype": "parse", "message": f"Invalid data from {model_name}: {data.decode('utf-8', 'ignore')}"} continue yield None except requests.HTTPError as e: if e.response.status_code == 401: yield {"type": "error", "subtype": "auth_error", "message": "Authentication failed. The API key is invalid or has been revoked."} elif e.response.status_code == 429: yield {"type": "error", "subtype": "rate_limit", "message": "Rate limit reached. Please wait.", "cooldown": 20} elif "not found" in e.response.text.lower() or e.response.status_code == 404: yield {"type": "error", "subtype": "model_not_found", "message": f"Model not found: {model_name}"} else: details = f"Server error {e.response.status_code} on {model_name}." try: msg = e.response.json().get("error", {}).get("message", e.response.text) details += f" Details: {msg}" except: pass yield {"type": "error", "subtype": "http", "message": details} yield None except requests.RequestException as e: yield {"type": "error", "subtype": "network", "message": str(e)} yield None # ---------------------------------------- # Chat UI Class Definition # ---------------------------------------- class ChatUI: def __init__(self, root): self.root = root self.root.title("Cypher Alpha Chat") self.root.geometry("900x800") self.current_theme = "dark" self.messages = [] self.stream_queue = queue.Queue() self.ai_start_index = None self.ai_header_start_index = None self.is_streaming = False self.model_var = tk.StringVar(self.root) self.model_var.set(AVAILABLE_MODELS[0]["display"]) self.copy_icon = PhotoImage(data="R0lGODlhEAAQAMQAAORHVO5wW+5xX/B6X/KAb/ODc/SEePWFevqLg/qPhfyfiv+jkv+wmf+0oP/Crf/Js//ax//z1f/77f/++f///wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACH5BAEAAB8ALAAAAAAQABAAAAVS4CeOZGmeaKqubOu+cByDrEdg7AsLCwHn9QYABFJ4dD6zUIA4pVIIhGWAoFCYV1A4pZISyYg4p8wZAQEB8EjC0QzL53tAgYQRAQebe3xRAQAh+QQBAAAdACwAAAAAEAAQAAAFUeAnjmRpnmiqrmzrvnAcg6xHYOwLCwsB5/UGAARSOHQ+s1agOKVSiIRlgKCQmFdQOKWSEsmIOKfMGQEBAfBIwtEMy+d7QIGEERAUG3t8UQEAIflEAAAh+QQBAAAfACwAAAAAEAAQAAAFUuAnjmRpnmiqrmzrvnAcg6xHYOwLCwsB5/UGAARSOHQ+s1agOKVSiIRlgKCQmFdQOKWSEsmIOKfMGQEBAfBIwtEMy+d7QIGEERAUG3t8UQEAIpAAAh+QQBAAAfACwAAAAAEAAQAAAFUuAnjmRpnmiqrmzrvnAcg6xHYOwLCwsB5/UGAARSOHQ+s1agOKVSiIRlgKCQmFdQOKWSEsmIOKfMGQEBAfBIwtEMy+d7QIGEERAUG3t8UQEAIpAAAh+QQBAAAfACwAAAAAEAAQAAAFUuAnjmRpnmiqrmzrvnAcg6xHYOwLCwsB5/UGAARSOHQ+s1agOKVSiIRlgKCQmFdQOKWSEsmIOKfMGQEBAfBIwtEMy+d7QIGEERAUG3t8UQEAIpAAAh+QQBAAAfACwAAAAAEAAQAAAFUuAnjmRpnmiqrmzrvnAcg6xHYOwLCwsB5/UGAARSOHQ+s1agOKVSiIRlgKCQmFdQOKWSEsmIOKfMGQEBAfBIwtEMy+d7QIGEERAUG3t8UQEAIpAAAgA7") self._create_widgets() self.apply_theme() self.display_message("assistant", "Hello! How can I help you today?", highlight=False) def _create_widgets(self): """Create menus, chat area, input box, buttons, status bar.""" self.menu_bar = Menu(self.root) self.root.config(menu=self.menu_bar) file_menu = Menu(self.menu_bar, tearoff=0) file_menu.add_command(label="New Chat", command=self.new_chat) file_menu.add_command(label="Save Chat...", command=self.export_chat) file_menu.add_command(label="Load Chat...", command=self.import_chat) file_menu.add_separator() file_menu.add_command(label="Exit", command=self.root.quit) self.menu_bar.add_cascade(label="File", menu=file_menu) view_menu = Menu(self.menu_bar, tearoff=0) view_menu.add_command(label="Toggle Theme", command=self.toggle_theme) self.menu_bar.add_cascade(label="View", menu=view_menu) model_frame = tk.Frame(self.root) model_frame.pack(padx=10, pady=(10, 0), fill=tk.X) model_label = tk.Label(model_frame, text="Model:") model_label.pack(side=tk.LEFT, padx=(0, 5)) model_names = [m["display"] for m in AVAILABLE_MODELS] self.model_menu = tk.OptionMenu(model_frame, self.model_var, *model_names) self.model_menu.pack(side=tk.LEFT, fill=tk.X, expand=True) self.chat_history = scrolledtext.ScrolledText(self.root, wrap=tk.WORD, state=tk.DISABLED, font=("Segoe UI", 11), relief=tk.FLAT) self.chat_history.pack(padx=10, pady=10, fill=tk.BOTH, expand=True) self._configure_tags() self.context_menu = Menu(self.root, tearoff=0) self.context_menu.add_command(label="Copy Code Block", command=self.copy_code_block) self.chat_history.bind("<Button-3>", lambda e: self.show_context_menu(e)) input_frame = tk.Frame(self.root) input_frame.pack(padx=10, pady=(0, 10), fill=tk.X) self.input_text = tk.Text(input_frame, height=4, font=("Segoe UI", 11), relief=tk.FLAT, undo=True) self.input_text.pack(side=tk.LEFT, fill=tk.X, expand=True, ipady=5, padx=(0, 10)) self.input_text.bind("<Control-Return>", self.send_message) self.send_button = tk.Button(input_frame, text="Send", command=self.send_message, relief=tk.FLAT, borderwidth=2) self.send_button.pack(side=tk.RIGHT) self.status_bar = tk.Label(self.root, text="Ready (Ctrl+Enter to send)", bd=1, relief=tk.SUNKEN, anchor=tk.W) self.status_bar.pack(side=tk.BOTTOM, fill=tk.X) def _configure_tags(self): """Set up text tags for roles, code blocks, syntax, warnings.""" self.chat_history.tag_configure("user", justify="left", spacing3=10, lmargin1=10, rmargin=80) self.chat_history.tag_configure("assistant", justify="left", spacing3=10, lmargin1=10, rmargin=10) self.chat_history.tag_configure("system_warning", font=("Segoe UI", 10, "italic")) self.chat_history.tag_configure("code_block", font=("Consolas", 10), lmargin1=20, lmargin2=20, rmargin=20, spacing1=5, spacing3=5, relief=tk.RAISED, borderwidth=1) for tok, color in {"Token.Keyword":"#CC7832", "Token.Name.Function":"#FFC66D", "Token.Literal.String.Single":"#A5C261", "Token.Literal.String.Double":"#A5C261", "Token.Comment.Single":"#808080", "Token.Operator":"#DA70D6", "Token.Number.Integer":"#6897BB", "Token.Punctuation": "#dcdcdc", "Token.Name.Builtin": "#93C763", "Token.Name.Class": "#DA70D6"}.items(): self.chat_history.tag_configure(tok.replace(".", "_"), foreground=color) def apply_theme(self): """Apply colors from current theme.""" t = THEMES[self.current_theme] self.root.config(bg=t["bg"]) self.chat_history.config(bg=t["bg"], fg=t["fg"], insertbackground=t["fg"]) self.input_text.config(bg=t["input_bg"], fg=t["fg"], insertbackground=t["fg"]) self.send_button.config(bg=t["btn_bg"], fg=t["btn_fg"], activebackground=t["input_bg"], activeforeground=t["btn_fg"]) self.status_bar.config(bg=t["bg"], fg=t["status_fg"]) self.menu_bar.config(bg=t["menu_bg"], fg=t["menu_fg"], activebackground=t["input_bg"], activeforeground=t["btn_fg"]) for widget in self.root.winfo_children(): if isinstance(widget, tk.Frame) and any(isinstance(child, tk.OptionMenu) for child in widget.winfo_children()): widget.config(bg=t["bg"]) for child in widget.winfo_children(): if isinstance(child, tk.Label): child.config(bg=t["bg"], fg=t["fg"]) elif isinstance(child, tk.OptionMenu): child.config(bg=t["btn_bg"], fg=t["btn_fg"], activebackground=t["input_bg"], activeforeground=t["btn_fg"], highlightthickness=0) child["menu"].config(bg=t["menu_bg"], fg=t["menu_fg"]) self.chat_history.tag_configure("user", foreground=t["user_fg"]) self.chat_history.tag_configure("assistant", foreground=t["ai_fg"]) self.chat_history.tag_configure("system_warning", foreground=t["system_fg"]) self.chat_history.tag_configure("code_block", background=t["input_bg"], foreground=t["fg"]) def display_message(self, role, content, highlight=True, model_name=None): """Insert a message; highlight code if assistant.""" self.chat_history.config(state=tk.NORMAL) if role == "user": label = "You" elif role == "assistant": label = model_name or "Assistant" else: label = "System" self.chat_history.insert(tk.END, f"{label}\n", (role,)) start = self.chat_history.index(tk.END) self.chat_history.insert(tk.END, content.strip()) end = self.chat_history.index(tk.END) if role in ["user", "assistant"]: self._add_copy_button(start, end) self.chat_history.insert(tk.END, "\n\n") if role=="assistant" and highlight: self._highlight_code_in_range(start, end) self.chat_history.config(state=tk.DISABLED) self.chat_history.see(tk.END) def _add_copy_button(self, start_index, end_index): """Inserts a clickable copy icon button into the text widget.""" theme = THEMES[self.current_theme] copy_btn = tk.Button(self.chat_history, image=self.copy_icon, command=lambda s=start_index, e=end_index: self.copy_message_content(s, e), relief=tk.FLAT, borderwidth=0, cursor="hand2", bg=theme["bg"], activebackground=theme["input_bg"]) self.chat_history.window_create(end_index, window=copy_btn, padx=5, align="top") def copy_message_content(self, start_index, end_index): """Callback to copy text from a specific range to the clipboard.""" content = self.chat_history.get(start_index, end_index).strip() self.root.clipboard_clear() self.root.clipboard_append(content) self.status_bar.config(text="Copied to clipboard!") self.root.after(2500, lambda: self.status_bar.config(text="Ready (Ctrl+Enter to send)")) def _highlight_code_in_range(self, start, end): """Syntax-highlight fenced code between start/end.""" seg = self.chat_history.get(start, end) for m in re.finditer(r"```(\w+)?\n(.*?)```", seg, re.DOTALL): bs=f"{start}+{m.start()}c"; be=f"{start}+{m.end()}c" self.chat_history.tag_add("code_block", bs, be) try: lexer=get_lexer_by_name(m.group(1) or "text", stripall=True) idx=f"{start}+{m.start(2)}c" for tok,val in lex(m.group(2), lexer): tag=str(tok).replace(".","_"); ln=len(val) self.chat_history.tag_add(tag, idx, f"{idx}+{ln}c"); idx=f"{idx}+{ln}c" except: continue def send_message(self, event=None): """Queue user text, disable inputs, and start the stream worker with fallback logic.""" prompt=self.input_text.get("1.0",tk.END).strip() if not prompt: return self.input_text.delete("1.0",tk.END) self.display_message("user", prompt) self.messages.append({"role":"user","content":prompt}) self.send_button.config(state=tk.DISABLED) self.input_text.config(state=tk.DISABLED) self.is_streaming = False threading.Thread(target=self._stream_worker_with_fallback, daemon=True).start() self.process_stream_queue() def _stream_worker_with_fallback(self): """Orchestrates trying the selected model and then falling back through the list.""" selected_display_name = self.model_var.get() try: start_index = next(i for i, model in enumerate(AVAILABLE_MODELS) if model["display"] == selected_display_name) except StopIteration: start_index = 0 fallback_order = AVAILABLE_MODELS[start_index:] + AVAILABLE_MODELS[:start_index] has_succeeded = False for model in fallback_order: self.stream_queue.put({"type": "status", "message": f"Trying model: {model['display']}..."}) stream_had_content = False last_item_was_error = False for item in chat_with_cypher_alpha(self.messages, model["api"]): if item is None: # End of a specific model's stream break if isinstance(item, str): # This is a content chunk self.stream_queue.put({"type": "content", "data": item, "model_name": model["display"]}) stream_had_content = True last_item_was_error = False elif isinstance(item, dict) and item.get("type") == "error": # This is a structured error self.stream_queue.put(item) last_item_was_error = True # Break for critical errors, continue for model-specific ones if item.get("subtype") in ["auth_error", "rate_limit", "network"]: has_succeeded = True # Prevent further fallbacks break # If the loop finished and the last item wasn't an error, it was a success. if stream_had_content and not last_item_was_error: has_succeeded = True break # Stop trying other models # If a critical error broke the inner loop, break the outer one too. if has_succeeded: break # After all attempts, signal the end of the entire process. self.stream_queue.put(None) def process_stream_queue(self): """ Processes items from the stream queue to update the UI. This function runs on the main UI thread. """ try: item = self.stream_queue.get_nowait() if item is None: # End of stream signal received if self.is_streaming: self._on_stream_complete() else: # Stream failed before any content was received self._reset_ui() return # Stop polling item_type = item.get("type") if item_type == "content": if not self.is_streaming: # This is the first chunk of a new response self.is_streaming = True model_name = item.get("model_name", "Assistant") self.chat_history.config(state=tk.NORMAL) # Store header and content start indices for later self.ai_header_start_index = self.chat_history.index(tk.END) self.chat_history.insert(tk.END, f"{model_name}\n", ("assistant",)) self.ai_start_index = self.chat_history.index(tk.END) self.chat_history.config(state=tk.DISABLED) # Append the content data self.chat_history.config(state=tk.NORMAL) self.chat_history.insert(tk.END, item["data"]) self.chat_history.config(state=tk.DISABLED) self.chat_history.see(tk.END) elif item_type == "status": self.status_bar.config(text=item["message"]) elif item_type == "error": subtype = item.get("subtype") if subtype == "rate_limit": self._handle_rate_limit(item) return # Stop polling, cooldown timer will take over elif subtype in ["model_not_found", "parse"]: # These are non-fatal, fallback will continue. Show in status. self.status_bar.config(text=item["message"]) else: # These are fatal errors for this attempt. self._handle_generic_error(item) # Don't return, let the queue processing end naturally # Mark the task as done and schedule the next check self.stream_queue.task_done() self.root.after(50, self.process_stream_queue) except queue.Empty: # If the queue is empty, check again shortly. self.root.after(50, self.process_stream_queue) except Exception as e: print(f"Error in process_stream_queue: {e}") self._reset_ui() def _on_stream_complete(self): """Finalize the successful response.""" if not self.is_streaming: self._reset_ui() return self.status_bar.config(text="Stream finished. Finalizing...") content = self.chat_history.get(self.ai_start_index, "end-1c").strip() self.chat_history.config(state=tk.NORMAL) self.messages.append({"role": "assistant", "content": content}) self.chat_history.insert(tk.END, "\n\n") content_end_index = self.chat_history.index("end-1c") self._highlight_code_in_range(self.ai_start_index, content_end_index) self._add_copy_button(self.ai_start_index, content_end_index) self.chat_history.config(state=tk.DISABLED) self.chat_history.see(tk.END) self._reset_ui() def _handle_rate_limit(self, error_data): """Handle 429 errors with a visible UI cooldown.""" self._cleanup_failed_attempt() self.display_message("system_warning", error_data["message"], highlight=False) cooldown = error_data.get("cooldown", 15) self.send_button.config(state=tk.DISABLED) self.input_text.config(state=tk.DISABLED) self._update_cooldown_timer(cooldown) def _update_cooldown_timer(self, seconds_left): """Visibly count down in the status bar.""" if seconds_left > 0: self.status_bar.config(text=f"Please wait... {seconds_left}s") self.root.after(1000, self._update_cooldown_timer, seconds_left - 1) else: self._reset_ui() def _handle_generic_error(self, error_data): """Handle other non-fallback errors from the API.""" self._cleanup_failed_attempt() self.display_message("system_warning", error_data["message"], highlight=False) self._reset_ui() def _cleanup_failed_attempt(self): """Removes the optimistic 'Model Name' header if streaming fails.""" if self.is_streaming and self.ai_header_start_index: self.chat_history.config(state=tk.NORMAL) self.chat_history.delete(self.ai_header_start_index, "end-1c") self.chat_history.config(state=tk.DISABLED) def _reset_ui(self): """Resets input widgets and status bar to the ready state.""" self.send_button.config(state=tk.NORMAL) self.input_text.config(state=tk.NORMAL) self.status_bar.config(text="Ready (Ctrl+Enter to send)") self.is_streaming = False self.ai_header_start_index = None self.ai_start_index = None def export_chat(self): """Save conversation JSON to disk.""" path=filedialog.asksaveasfilename(defaultextension=".json", filetypes=[("JSON files","*.json"),("All files","*.*")]) if not path: return with open(path,"w",encoding="utf-8") as f: json.dump(self.messages,f,indent=4) messagebox.showinfo("Success","Chat history saved.") def import_chat(self): """Load conversation from JSON file.""" if self.messages and not messagebox.askyesno("Load Chat","This will replace current conversation. Continue?"): return path=filedialog.askopenfilename(filetypes=[("JSON files","*.json"),("All files","*.*")]) if not path: return with open(path,"r",encoding="utf-8") as f: data=json.load(f) self.new_chat(confirm=False) self.messages=data for m in self.messages: self.display_message(m["role"],m["content"], model_name=self.model_var.get()) def new_chat(self, confirm=True): """Clear conversation; optional confirmation.""" if confirm and not messagebox.askyesno("New Chat","Clear current conversation?"): return self.messages.clear() self.chat_history.config(state=tk.NORMAL) self.chat_history.delete("1.0",tk.END) self.chat_history.config(state=tk.DISABLED) if confirm: self.display_message("assistant", "Hello! How can I help you today?", highlight=False) def toggle_theme(self): self.current_theme = "light" if self.current_theme=="dark" else "dark" self.apply_theme() def show_context_menu(self, event): """Right-click → copy code block if over code.""" if "code_block" in self.chat_history.tag_names(f"@{event.x},{event.y}"): self.context_menu.tk_popup(event.x_root,event.y_root) def copy_code_block(self): """Copy the fenced code block you right-clicked on.""" idx=self.chat_history.index(f"@{self.root.winfo_pointerx()-self.root.winfo_rootx()},{self.root.winfo_pointery()-self.root.winfo_rooty()}") ranges=self.chat_history.tag_ranges("code_block") for start,end in zip(ranges[0::2],ranges[1::2]): if (self.chat_history.compare(idx,">=",start) and self.chat_history.compare(idx,"<=",end)): raw=self.chat_history.get(start,end); cleaned=re.sub(r"```(?:\w+)?\n|```","",raw).strip() self.root.clipboard_clear(); self.root.clipboard_append(cleaned) self.status_bar.config(text="Code copied!"); self.root.after(2000, lambda: self.status_bar.config(text="Ready (Ctrl+Enter to send)")) return # ---------------------------------------- # Startup Self-Tests # ---------------------------------------- def run_startup_tests(): errs=[] for theme,cfg in THEMES.items(): for key in ["bg","fg","input_bg","btn_bg","btn_fg", "user_fg","ai_fg","menu_bg","menu_fg", "status_fg","system_fg"]: if key not in cfg: errs.append(f"Theme '{theme}' missing '{key}'") for m in ["_create_widgets","apply_theme","display_message", "send_message","export_chat","import_chat","new_chat", "_stream_worker_with_fallback", "_add_copy_button", "copy_message_content", "_handle_rate_limit", "_update_cooldown_timer", "_reset_ui"]: if not hasattr(ChatUI,m): errs.append(f"ChatUI missing method '{m}'") if errs: for e in errs: print("Startup test error:",e) print("Exiting due to startup test failures."); exit(1) # ---------------------------------------- # Application Entry Point # ---------------------------------------- if __name__ == "__main__": run_startup_tests() root=tk.Tk() app=ChatUI(root) root.mainloop()