From 65d72d3066bd0e8595e1ef4bbdc4d08297cb04c9 Mon Sep 17 00:00:00 2001 From: Neha Dhumal Date: Mon, 1 Jun 2026 12:52:05 +0000 Subject: [PATCH 1/4] Restore removed chatbot support modules --- .gitignore | 1 + src/chatbot/chatbot_core.py | 701 +++++++++++++++++++++++++++++++++ src/chatbot/error_solutions.py | 106 +++++ src/chatbot/image_handler.py | 247 ++++++++++++ src/chatbot/knowledge_base.py | 144 +++++++ src/chatbot/ollama_runner.py | 192 +++++++++ src/chatbot/stt_handler.py | 92 +++++ 7 files changed, 1483 insertions(+) create mode 100644 src/chatbot/chatbot_core.py create mode 100644 src/chatbot/error_solutions.py create mode 100644 src/chatbot/image_handler.py create mode 100644 src/chatbot/knowledge_base.py create mode 100644 src/chatbot/ollama_runner.py create mode 100644 src/chatbot/stt_handler.py diff --git a/.gitignore b/.gitignore index 78c7f16d0..74adb247b 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ dist/ # Local deploy script (user-specific VM config) scripts/deploy_to_vm.ps1 +.venv/ diff --git a/src/chatbot/chatbot_core.py b/src/chatbot/chatbot_core.py new file mode 100644 index 000000000..24b8eef09 --- /dev/null +++ b/src/chatbot/chatbot_core.py @@ -0,0 +1,701 @@ +# chatbot_core.py + +import os +import re +import json +from typing import Dict, Any, Tuple, List +from .error_solutions import get_error_solution +from .image_handler import analyze_and_extract +from .ollama_runner import run_ollama +from .knowledge_base import search_knowledge +from .ollama_runner import get_embedding + +# ==================== ESIM WORKFLOW KNOWLEDGE ==================== + +ESIM_WORKFLOWS = """ +=== COMMON ESIM WORKFLOWS === + +HOW TO ADD GROUND: +1. In KiCad schematic, press 'A' key (Add Component) +2. Type "GND" in the search box +3. Select ground symbol from "power" library +4. Click to place it on schematic +5. Press 'W' to add wire and connect to circuit +6. Save (Ctrl+S) → eSim: Simulation → Convert KiCad to NgSpice + +HOW TO ADD ANY COMPONENT: +1. In KiCad schematic, press 'A' key +2. Type component name (e.g., "Q2N3904", "1N4148", "uA741") +3. Select from appropriate library (eSim_Devices, eSim_Subckt, etc.) +4. Place on schematic and connect with wires +5. Save → Convert KiCad to NgSpice + +HOW TO FIX MISSING SPICE MODELS (3 Methods): + +Method 1 - Direct Netlist Edit (FASTEST, but temporary): +1. eSim: Tools → Spice Editor (or Ctrl+E) +2. Open your_project.cir.out file +3. Scroll to bottom (before .end line) +4. Add model definition: + BJT: .model Q2N3904 NPN(Bf=200 Is=1e-14 Vaf=100) + Diode: .model 1N4148 D(Is=1e-14 Rs=1) + Zener: .model DZ5V1 D(Is=1e-14 Bv=5.1 Ibv=5m) +5. Save (Ctrl+S) → Run Simulation +NOTE: This gets overwritten when you "Convert KiCad to NgSpice" again + +Method 2 - Component Properties (PERMANENT): +1. Open KiCad schematic (double-click .proj in Project Explorer) +2. Find the component that uses the missing model (e.g., transistor Q1) +3. Right-click on it → Properties (or press E when hovering over it) +4. Click "Edit Spice Model" button in the Properties dialog +5. In the Spice Model field, paste the model definition: + .model Q2N3904 NPN(Bf=200 Is=1e-14 Vaf=100) +6. Click OK → Save schematic (Ctrl+S) +7. eSim: Simulation → Convert KiCad to NgSpice +NOTE: This permanently associates the model with the component + +Method 3 - Include Library: +1. Spice Editor → Open .cir.out +2. Add at top: .include /usr/share/ngspice/models/bjt.lib +3. Save → Simulate + +HOW TO FIX MISSING SUBCIRCUITS: +1. Spice Editor → Open .cir.out +2. Add before .end: + .subckt OPAMP_IDEAL inp inn out vdd vss + Rin inp inn 1Meg + E1 out 0 inp inn 100000 + Rout out 0 75 + .ends +3. Save → Simulate +OR: Replace with eSim library opamp (uA741, LM324) + +HOW TO FIX FLOATING NODES: +1. Open KiCad schematic +2. Find the unconnected pin/node +3. Either connect it with wire (press W) or delete component +4. For sense points: Add Rleak node 0 1Meg +5. Save → Convert to NgSpice + +KICAD SHORTCUTS: +A = Add component +W = Add wire +M = Move item +R = Rotate item +C = Copy item +Delete = Remove item +Ctrl+S = Save + +ESIM MENU PATHS: +Convert to NgSpice: Simulation → Convert KiCad to NgSpice +Run Simulation: Simulation → Simulate +Spice Editor: Tools → Spice Editor (Ctrl+E) +Model Editor: Tools → Model Editor +Open KiCad: Double-click .proj file in Project Explorer + +FILE LOCATIONS: +Project folder: ~/eSim-Workspace// +Netlist: .cir.out +Schematic: .proj +""" + +LAST_BOT_REPLY: str = "" +LAST_IMAGE_CONTEXT: Dict[str, Any] = {} +LAST_NETLIST_ISSUES: Dict[str, Any] = {} + + +def get_history() -> Dict[str, Any]: + return LAST_IMAGE_CONTEXT + + +def clear_history() -> None: + global LAST_IMAGE_CONTEXT, LAST_NETLIST_ISSUES + LAST_IMAGE_CONTEXT = {} + LAST_NETLIST_ISSUES = {} + +# ==================== ESIM ERROR LOGIC ==================== + +def answer_with_rag_fallback(user_input: str) -> str: + """ + Try to answer using eSim manuals (RAG). + If nothing relevant is found, fallback to Ollama. + """ + + rag_context = search_knowledge(user_input) + + if rag_context.strip(): + prompt = f""" +You are eSim Copilot. + +Use ONLY the following official eSim documentation +to answer the question. Do NOT invent information. + +{rag_context} + +Question: +{user_input} + +Answer clearly and step-by-step. +""" + return run_ollama(prompt) + + # Fallback: general LLM answer + prompt = f""" +Answer the following question clearly: + +{user_input} +""" + return run_ollama(prompt) + +def detect_esim_errors(image_context: Dict[str, Any], user_input: str) -> str: + """ + Display errors from hybrid analysis with SMART FILTERING to remove hallucinations. + """ + if not image_context: + return "" + + analysis = image_context.get("circuit_analysis", {}) + raw_errors = analysis.get("design_errors", []) + warnings = analysis.get("design_warnings", []) + + # === SMART FILTERING === + components_str = str(image_context.get("components", [])).lower() + summary_str = str(image_context.get("vision_summary", "")).lower() + context_text = components_str + summary_str + + filtered_errors: List[str] = [] + for err in raw_errors: + err_lower = err.lower() + + if "ground" in err_lower and ( + "gnd" in context_text or "ground" in context_text or " 0 " in context_text + ): + continue + + if "floating" in err_lower and ( + "vin" in err_lower or "vout" in err_lower or "label" in err_lower + ): + continue + + filtered_errors.append(err) + + output: List[str] = [] + + if filtered_errors: + output.append("**🚨 CRITICAL ERRORS:**") + for err in filtered_errors: + output.append(f"❌ {err}") + + if warnings: + output.append("\n**⚠️ WARNINGS:**") + for warn in warnings: + output.append(f"⚠️ {warn}") + + text = user_input.lower() + if "singular matrix" in text: + output.append("\n**🔧 FIX:** Add 1GΩ resistors to all nodes → GND") + if "timestep" in text: + output.append("\n**🔧 FIX:** Reduce timestep or add 0.1Ω series R") + + if not output: + return "**✅ No errors detected**" + + return "\n".join(output) + + +# ==================== UTILITIES ==================== + +VALID_EXTS = (".png", ".jpg", ".jpeg", ".bmp", ".tiff", ".gif") + + +def _is_image_file(path: str) -> bool: + if not path: + return False + clean = re.sub(r"\[Image:\s*(.*?)\]", r"\1", path).strip() + return clean.lower().endswith(VALID_EXTS) + + +def _is_image_query(user_input: str) -> bool: + if not user_input: + return False + if "[Image:" in user_input: + return True + if "|" in user_input: + parts = user_input.split("|", 1) + if len(parts) == 2 and _is_image_file(parts[1]): + return True + return _is_image_file(user_input) + + +def _parse_image_query(user_input: str) -> Tuple[str, str]: + user_input = user_input.strip() + + match = re.search(r"\[Image:\s*(.*?)\]", user_input) + if match: + return user_input.replace(match.group(0), "").strip(), match.group(1).strip() + + if "|" in user_input: + q, p = [x.strip() for x in user_input.split("|", 1)] + if _is_image_file(p): + return q, p + if _is_image_file(q): + return p, q + + if _is_image_file(user_input): + return "", user_input + + return user_input, "" + + +def clean_response_raw(raw: str) -> str: + cleaned = re.sub(r"<\|.*?\|>", "", raw.strip()) + cleaned = re.sub(r"\[Context:.*?\]", "", cleaned, flags=re.DOTALL) + cleaned = re.sub(r"\[FACT .*?\]", "", cleaned, flags=re.MULTILINE) + cleaned = re.sub( + r"\[ESIM_NETLIST_START\].*?\[ESIM_NETLIST_END\]", "", cleaned, flags=re.DOTALL + ) + return cleaned.strip() + + +def _history_to_text(history: List[Dict[str, str]] | None, max_turns: int = 6) -> str: + """Convert history to readable text with MORE context (6 turns).""" + if not history: + return "" + recent = history[-max_turns:] + lines: List[str] = [] + for i, t in enumerate(recent, 1): + u = (t.get("user") or "").strip() + b = (t.get("bot") or "").strip() + if u: + lines.append(f"[Turn {i}] User: {u}") + if b: + if len(b) > 300: + b = b[:300] + "..." + lines.append(f"[Turn {i}] Assistant: {b}") + return "\n".join(lines).strip() + + +def _is_follow_up_question(user_input: str, history: List[Dict[str, str]] | None) -> bool: + """ + Detect if this is a follow-up question that needs history context. + Returns True if question lacks standalone context. + """ + if not history: + return False + + user_lower = user_input.lower().strip() + words = user_lower.split() + + + if len(words) <= 7: + return True + + pronouns = ["it", "that", "this", "those", "these", "they", "them"] + if any(pronoun in words for pronoun in pronouns): + return True + + continuations = [ + "what next", "next step", "after that", "and then", "then what", + "what about", "how about", "what if", "but why", "why not" + ] + if any(phrase in user_lower for phrase in continuations): + return True + + question_starters = ["why", "how", "where", "when", "what", "which"] + if words[0] in question_starters and len(words) <= 5: + return True + + return False +import numpy as np + +def is_semantic_topic_switch( + user_input: str, + history: list, + threshold: float = 0.30 +) -> bool: + """ + Detect topic switch using embedding similarity. + Returns True if new question is unrelated to previous assistant reply. + """ + + if not history: + return False + + last_assistant_msg = None + for item in reversed(history): + if item.get("role") == "assistant": + last_assistant_msg = item.get("content") + break + + if not last_assistant_msg: + return False + + try: + emb_new = get_embedding(user_input) + emb_prev = get_embedding(last_assistant_msg) + + if not emb_new or not emb_prev: + return False + + emb_new = np.array(emb_new) + emb_prev = np.array(emb_prev) + + similarity = np.dot(emb_new, emb_prev) / ( + np.linalg.norm(emb_new) * np.linalg.norm(emb_prev) + ) + + print(f"[COPILOT] Semantic similarity = {similarity:.3f}") + + return similarity < threshold + + except Exception as e: + print(f"[COPILOT] Topic switch check failed: {e}") + return False + +# ==================== QUESTION CLASSIFICATION ==================== + +def classify_question_type(user_input: str, has_image_context: bool, + history: List[Dict[str, str]] | None = None) -> str: + """ + Classify question type for smart routing. + Returns: 'greeting', 'simple', 'esim', 'image_query', 'follow_up_image', + 'follow_up', 'netlist' + """ + user_lower = user_input.lower() + + if "[ESIM_NETLIST_START]" in user_input: + return "netlist" + + if _is_image_query(user_input): + return "image_query" + + if has_image_context: + follow_phrases = [ + "this circuit", "that circuit", "in this schematic", + "components here", "what is the value", "how many", + "the circuit", "this schematic","what","can","how" + ] + if any(p in user_lower for p in follow_phrases): + return "follow_up_image" + + greetings = ["hello", "hi", "hey", "howdy", "greetings"] + user_words = user_lower.strip().split() + if len(user_words) <= 3 and any(g in user_words for g in greetings): + return "greeting" + + is_followup = _is_follow_up_question(user_input, history) + if is_semantic_topic_switch(user_input, history): + print("[COPILOT] Topic switch detected (semantic)") + is_followup = False + + if not is_followup: + history.clear() + LAST_IMAGE_CONTEXT = None + + esim_keywords = [ + "esim", "kicad", "ngspice", "spice", "simulation", "netlist", + "schematic", "convert", "gnd", "ground", ".model", ".subckt", + "singular matrix", "floating", "timestep", "convergence" + ] + if any(keyword in user_lower for keyword in esim_keywords): + return "esim" + + error_keywords = [ + "error", "fix", "problem", "issue", "warning", "missing", + "not working", "failed", "crash" + ] + if any(keyword in user_lower for keyword in error_keywords): + return "esim" + + return "simple" + + +# ==================== HANDLERS ==================== + +def handle_greeting() -> str: + return ( + "Hello! I'm eSim Copilot. I can help you with:\n" + "• Circuit analysis and netlist debugging\n" + "• Electronics concepts and SPICE simulation\n" + "• Component selection and circuit design\n\n" + "What would you like to know?" + ) + + +def handle_simple_question(user_input: str) -> str: + """ + Handles standalone questions. + Uses RAG first, then falls back to Ollama. + keep in mind that your a copilot of eSim an EDA tool + """ + return answer_with_rag_fallback(user_input) + + +def handle_follow_up(user_input: str, + image_context: Dict[str, Any], + history: List[Dict[str, str]] | None = None) -> str: + """ + Handle follow-up questions that depend on conversation history. + This handler PRIORITIZES history over RAG. + """ + history_text = _history_to_text(history, max_turns=6) + + if not history_text: + return "I need more context. Could you provide more details about your question?" + + rag_context = "" + user_lower = user_input.lower() + if any(kw in user_lower for kw in ["model", "spice", "ground", "error", "netlist"]): + rag_context = search_knowledge(user_input, n_results=2) + + prompt = ( + "You are an eSim expert assistant. The user is asking a follow-up question.\n\n" + "=== CONVERSATION HISTORY (MOST IMPORTANT) ===\n" + f"{history_text}\n" + "=============================================\n\n" + f"=== CURRENT USER QUESTION (FOLLOW-UP) ===\n{user_input}\n\n" + ) + + if rag_context: + prompt += f"=== REFERENCE MANUAL (if needed) ===\n{rag_context}\n\n" + + if image_context: + prompt += ( + f"=== CURRENT CIRCUIT CONTEXT ===\n" + f"Type: {image_context.get('circuit_analysis', {}).get('circuit_type', 'Unknown')}\n" + f"Components: {image_context.get('components', [])}\n\n" + ) + + prompt += ( + "CRITICAL INSTRUCTIONS:\n" + "1. The user's question refers to the CONVERSATION HISTORY above.\n" + "2. Identify what 'it', 'that', 'this', or 'next step' refers to by reading the history.\n" + "3. Answer based on the conversation context first, then use manual/workflows if needed.\n" + "4. If the user asks 'why', explain based on what was just discussed.\n" + "5. If the user asks 'what next' or 'next step', continue from the last instruction.\n" + "6. Be specific and reference what you're talking about (e.g., 'In the previous step, I mentioned...').\n" + "7. Keep answer concise (max 150 words).\n\n" + "Answer:" + ) + + return run_ollama(prompt, mode="default") + + +def handle_esim_question(user_input: str, + image_context: Dict[str, Any], + history: List[Dict[str, str]] | None = None) -> str: + """ + Handle eSim-specific questions with RAG + conversation history. + """ + user_lower = user_input.lower() + + sol = get_error_solution(user_input) + if sol and sol.get("description") != "General schematic error": + fixes = "\n".join(f"- {f}" for f in sol.get("fixes", [])) + cmd = sol.get("eSim_command", "") + answer = ( + f"**Detected issue:** {sol['description']}\n" + f"**Severity:** {sol.get('severity', 'unknown')}\n\n" + f"**Recommended fixes:**\n{fixes}\n\n" + ) + if cmd: + answer += f"**eSim action:** {cmd}\n" + return answer_with_rag_fallback(user_input) + + history_text = _history_to_text(history, max_turns=6) + + rag_context = search_knowledge(user_input, n_results=5) + + image_context_str = "" + if image_context: + image_context_str = ( + f"\n=== CURRENT CIRCUIT ===\n" + f"Type: {image_context.get('circuit_analysis', {}).get('circuit_type', 'Unknown')}\n" + f"Components: {image_context.get('components', [])}\n" + f"Values: {image_context.get('values', {})}\n" + ) + + prompt = ( + "You are an eSim expert. Answer using the workflows, manual, and conversation history.\n\n" + f"{ESIM_WORKFLOWS}\n\n" + f"=== MANUAL CONTEXT ===\n{rag_context}\n" + f"{image_context_str}\n" + ) + + if history_text: + prompt += f"=== CONVERSATION HISTORY ===\n{history_text}\n\n" + + prompt += ( + f"USER QUESTION: {user_input}\n\n" + "INSTRUCTIONS:\n" + "1. If the question refers to previous conversation, use the history.\n" + "2. Use exact menu paths and shortcuts from the workflows when relevant.\n" + "3. If the manual context does not contain the answer, say you need to check the manual.\n" + "4. Keep the answer concise (max 150 words).\n\n" + "Answer:" + ) + + return run_ollama(prompt, mode="default") + + +def handle_image_query(user_input: str) -> Tuple[str, Dict[str, Any]]: + """ + Handle image analysis queries. + Returns: (response_text, image_context_dict) + """ + question, image_path = _parse_image_query(user_input) + image_path = image_path.strip("'\"").strip() + + if not image_path or not os.path.exists(image_path): + return f"Error: Image not found: {image_path}", {} + + extraction = analyze_and_extract(image_path) + + if extraction.get("error"): + return f"Analysis Failed: {extraction['error']}", {} + + if not question: + error_report = detect_esim_errors(extraction, "") + + summary = ( + "**Image Analysis Complete**\n" + f"**Type:** {extraction.get('circuit_analysis', {}).get('circuit_type', 'Unknown')}\n" + f"**Components:** {extraction.get('component_counts', {})}\n" + f"**Description:** {extraction.get('vision_summary', '')}\n\n" + ) + + if extraction.get("components"): + summary += f"**Detected Components:** {', '.join(extraction['components'])}\n" + + if extraction.get("values"): + summary += "**Component Values:**\n" + for comp, val in extraction["values"].items(): + summary += f" • {comp}: {val}\n" + + summary += ( + "\n**Note:** Vision analysis may have errors. Use 'Analyze netlist' for precise results.\n" + ) + + if "🚨" in error_report or "⚠️" in error_report: + summary += f"\n{error_report}" + + return summary, extraction + + return handle_follow_up_image_question(question, extraction), extraction + + +def handle_follow_up_image_question(user_input: str, + image_context: Dict[str, Any]) -> str: + """ + Answer questions about an analyzed image using ONLY extracted data. + """ + image_context_str = ( + f"**Circuit Type:** {image_context.get('circuit_analysis', {}).get('circuit_type', 'Unknown')}\n" + f"**Components Detected:** {image_context.get('components', [])}\n" + f"**Component Values:** {image_context.get('values', {})}\n" + f"**Component Counts:** {image_context.get('component_counts', {})}\n" + f"**Description:** {image_context.get('vision_summary', '')}\n" + ) + + prompt = ( + "You are analyzing a circuit schematic. Answer using ONLY the circuit data below.\n\n" + "=== ANALYZED CIRCUIT DATA ===\n" + f"{image_context_str}\n" + "==============================\n\n" + f"USER QUESTION: {user_input}\n\n" + "STRICT INSTRUCTIONS:\n" + "1. Answer ONLY using the circuit data above - DO NOT use external knowledge.\n" + "2. For counts: use 'Component Counts'.\n" + "3. For values: use 'Component Values'.\n" + "4. For lists: use 'Components Detected'.\n" + "5. If data is missing, answer: 'The image analysis did not detect that information.'\n" + "6. Keep answer brief (2-3 sentences).\n\n" + "Answer:" + ) + + return run_ollama(prompt, mode="default") + + +def handle_netlist_analysis(user_input: str) -> str: + """ + Handle netlist analysis prompts (FACT-based prompt from GUI). + """ + raw_reply = run_ollama(user_input) + return clean_response_raw(raw_reply) + + +# ==================== MAIN ROUTER ==================== + +def handle_input(user_input: str, + history: List[Dict[str, str]] | None = None) -> str: + """ + Main router. Accepts optional conversation history for follow-up understanding. + """ + global LAST_IMAGE_CONTEXT, LAST_BOT_REPLY + + user_input = (user_input or "").strip() + if not user_input: + return "Please enter a query." + + if "[ESIM_NETLIST_START]" in user_input: + raw_reply = run_ollama(user_input) + cleaned = clean_response_raw(raw_reply) + LAST_BOT_REPLY = cleaned + return cleaned + + question_type = classify_question_type( + user_input, bool(LAST_IMAGE_CONTEXT), history + ) + print(f"[COPILOT] Question type: {question_type}") + + try: + if question_type == "netlist": + response = handle_netlist_analysis(user_input) + + elif question_type == "greeting": + response = handle_greeting() + + elif question_type == "image_query": + response, LAST_IMAGE_CONTEXT = handle_image_query(user_input) + + elif question_type == "follow_up_image": + response = handle_follow_up_image_question(user_input, LAST_IMAGE_CONTEXT) + + elif question_type == "simple": + response = handle_simple_question(user_input) + + elif question_type == "follow_up" and history: + response = handle_follow_up(user_input, LAST_IMAGE_CONTEXT, history) + else: + response = handle_simple_question(user_input) + + LAST_BOT_REPLY = response + return response + + except Exception as e: + error_msg = f"Error processing question: {str(e)}" + print(f"[COPILOT ERROR] {error_msg}") + return error_msg + + +# ==================== WRAPPER ==================== + +class ESIMCopilotWrapper: + def __init__(self) -> None: + self.history: List[Dict[str, str]] = [] + + def handle_input(self, user_input: str) -> str: + reply = handle_input(user_input, self.history) + self.history.append({"user": user_input, "bot": reply}) + if len(self.history) > 12: + self.history = self.history[-12:] + return reply + + def analyze_schematic(self, query: str) -> str: + return self.handle_input(query) + +_GLOBAL_WRAPPER = ESIMCopilotWrapper() + + +def analyze_schematic(query: str) -> str: + return _GLOBAL_WRAPPER.handle_input(query) diff --git a/src/chatbot/error_solutions.py b/src/chatbot/error_solutions.py new file mode 100644 index 000000000..615a3d63c --- /dev/null +++ b/src/chatbot/error_solutions.py @@ -0,0 +1,106 @@ +# error_solutions.py +from typing import Dict,Any + +ERROR_SOLUTIONS = { + "no ground": { + "description": "Missing ground reference (Node 0)", + "severity": "critical", + "fixes": [ + "Add GND symbol (0) to schematic", + "Ensure all nodes have DC path to ground", + "Add 1GΩ resistors from floating nodes to GND for simulation stability", + "Use GND symbol from eSim power library" + ], + "eSim_command": "Add 'GND' symbol from 'power' library" + }, + + "floating pins": { + "description": "Unconnected component pins", + "severity": "moderate", + "fixes": [ + "Connect all unused pins to appropriate nets", + "For unused inputs: tie to VCC or GND through resistors", + "For unused outputs: leave unconnected but label properly" + ], + "eSim_command": "Use 'Place Wire' tool to connect pins" + }, + + "disconnected wires": { + "description": "Wires not properly connected to pins", + "severity": "critical", + "fixes": [ + "Zoom in and check wire endpoints touch pins", + "Use junction dots at wire intersections", + "Re-route wires to ensure proper connections" + ], + "eSim_command": "Press 'J' to add junction dots" + }, + + "missing spice model": { + "description": "Component lacks SPICE model definition", + "severity": "critical", + "fixes": [ + "Add .lib statement: .lib /usr/share/esim/models.lib", + "Check IC availability in Components/ICs.pdf", + "Use eSim library components only", + "Create custom model using Model Editor" + ], + "eSim_command": "Add '.lib /usr/share/esim/models.lib' in schematic" + }, + + "singular matrix": { + "description": "Simulation convergence error", + "severity": "critical", + "fixes": [ + "Add 1GΩ resistors from ALL nodes → GND", + "Add .options gmin=1e-12 reltol=0.01", + "Use .nodeset for initial voltages", + "Add 0.1Ω series resistors to voltage sources" + ], + "eSim_command": "Add '.options gmin=1e-12 reltol=0.01' in .cir file" + }, + + "missing component values": { + "description": "Components without specified values", + "severity": "moderate", + "fixes": [ + "Double-click components to edit values", + "Set R, C, L values before simulation", + "For ICs: specify model number", + "For sources: set voltage/current values" + ], + "eSim_command": "Double-click component → Edit Properties → Set Value" + }, + + "no load after rectifier": { + "description": "Rectifier output has no load capacitor", + "severity": "warning", + "fixes": [ + "Add filter capacitor after rectifier (100-1000μF)", + "Add load resistor to establish DC operating point", + "Add voltage regulator for stable output" + ], + "eSim_command": "Add capacitor between rectifier output and GND" + } +} + +def get_error_solution(error_message: str) -> Dict[str, Any]: + """Get detailed solution for specific error.""" + error_lower = error_message.lower() + + for error_key, solution in ERROR_SOLUTIONS.items(): + if error_key in error_lower: + return solution + + # Default solution for unknown errors + return { + "description": "General schematic error", + "severity": "unknown", + "fixes": [ + "Check all connections are proper", + "Verify component values are set", + "Ensure ground symbol is present", + "Check for duplicate component IDs" + ], + "eSim_command": "Run Design Rule Check (DRC) in KiCad" + } diff --git a/src/chatbot/image_handler.py b/src/chatbot/image_handler.py new file mode 100644 index 000000000..cd8744791 --- /dev/null +++ b/src/chatbot/image_handler.py @@ -0,0 +1,247 @@ +import os +import json +import base64 +import io +import time +from typing import Dict, Any +from PIL import Image +MAX_IMAGE_BYTES = int(0.5*1024 * 1024) +from .ollama_runner import run_ollama_vision + +# === IMPORT PADDLE OCR === +try: + from paddleocr import PaddleOCR + import logging + logging.getLogger("ppocr").setLevel(logging.ERROR) + + # CRITICAL FIX: Disabled MKLDNN and Angle Classification to prevent VM Crashes + ocr_engine = PaddleOCR( + use_angle_cls=False, # <--- MUST BE FALSE TO STOP SIGABRT + lang='en', + use_gpu=False, # Force CPU + enable_mkldnn=False, # <--- MUST BE FALSE FOR PADDLE v3 COMPATIBILITY + use_mp=False, # Disable multiprocessing + show_log=False + ) + HAS_PADDLE = True + print("[INIT] PaddleOCR initialized (Safe Mode).") +except Exception as e: + HAS_PADDLE = False + print(f"[INIT] PaddleOCR init failed: {e}") + print("[INIT] Vision analysis unavailable. Text and netlist analysis still work.") + + +def encode_image(image_path: str) -> str: + """Convert image to base64 string.""" + with open(image_path, "rb") as image_file: + return base64.b64encode(image_file.read()).decode("utf-8") + + +def optimize_image_for_vision(image_path: str) -> bytes: + """ + Resize large images to reduce vision model processing time. + Target: Max 1920x1080 while maintaining aspect ratio. + """ + try: + img = Image.open(image_path) + + if img.mode not in ('RGB', 'L'): + img = img.convert('RGB') + + max_width = 1920 + max_height = 1080 + + if img.width > max_width or img.height > max_height: + # Calculate scaling factor + scale = min(max_width / img.width, max_height / img.height) + new_size = (int(img.width * scale), int(img.height * scale)) + img = img.resize(new_size, Image.Resampling.LANCZOS) + print(f"[IMAGE] Resized from {img.width}x{img.height} to {new_size[0]}x{new_size[1]}") + + # Convert to bytes (PNG format prevents compression artifacts on text) + buffer = io.BytesIO() + img.save(buffer, format='PNG', optimize=True, quality=85) + return buffer.getvalue() + + except Exception as e: + print(f"[IMAGE] Optimization failed: {e}, using original") + with open(image_path, 'rb') as f: + return f.read() + + +def extract_text_with_paddle(image_path: str) -> str: + """Extract text using PaddleOCR (Handles rotated/vertical text excellently).""" + if not HAS_PADDLE: + return "" + try: + result = ocr_engine.ocr(image_path, cls=True) + detected_texts = [] + if result and result[0]: + for line in result[0]: + text = line[1][0] + conf = line[1][1] + + if conf > 0.6: + detected_texts.append(text) + + full_text = " ".join(detected_texts) + return full_text + + except Exception as e: + print(f"[OCR] PaddleOCR Failed: {e}") + return "" + +def analyze_and_extract(image_path: str) -> Dict[str, Any]: + """ + Analyze schematic with image optimization, PaddleOCR text injection, and timeout handling. + Rejects images larger than 0.5 MB. + """ + if not os.path.exists(image_path): + return { + "error": "Image file not found", + "vision_summary": "", + "component_counts": {}, + "circuit_analysis": { + "circuit_type": "Unknown", + "design_errors": [], + "design_warnings": [] + }, + "components": [], + "values": {} + } + + try: + file_size = os.path.getsize(image_path) + except OSError as e: + return { + "error": f"Could not read image size: {e}", + "vision_summary": "", + "component_counts": {}, + "circuit_analysis": { + "circuit_type": "Unknown", + "design_errors": [], + "design_warnings": [] + }, + "components": [], + "values": {} + } + + if file_size > MAX_IMAGE_BYTES: + size_mb = round(file_size / (1024 * 1024), 2) + return { + "error": f"Image too large ({size_mb} MB). Max allowed size is 0.5 MB.", + "vision_summary": "", + "component_counts": {}, + "circuit_analysis": { + "circuit_type": "Unknown", + "design_errors": ["Image file size exceeded 0.5 MB limit"], + "design_warnings": [] + }, + "components": [], + "values": {} + } + + # === OPTIMIZE IMAGE BEFORE SENDING === + print(f"[VISION] Processing image: {os.path.basename(image_path)}") + image_bytes = optimize_image_for_vision(image_path) + + # === EXTRACT OCR TEXT (CRITICAL STEP) === + ocr_text = extract_text_with_paddle(image_path) + + if ocr_text: + clean_ocr = ocr_text.strip() + print(f"[VISION] PaddleOCR Hints injected: {clean_ocr[:100]}...") + else: + clean_ocr = "No readable text detected." + + # === PROMPT WITH CONTEXT === + prompt = f""" +ANALYZE THIS ELECTRONICS SCHEMATIC IMAGE. + +CONTEXT FROM OCR SCAN (Text detected in image): +"{clean_ocr}" + +INSTRUCTIONS: +1. Use the OCR text to identify component labels (e.g., if you see "D1" text, there is a Diode, R1,R2,R3... for resistor). +2. Look for rotated text labels near symbols. +3. Identify the circuit topology. + +VERY IMPORTANT INSTRUCTIONS: +1. DON'T OVERCALCULATE MODEL COUNT LIKE MODEL COUNT + OCR COUNT +2. IF THERE IS ANY VALUE NOT PRESENT FOR ANY COMPONENT JUST ADD A QUESTION MARK IN FRONT OF IT + +OUTPUT RULES: +1. Return ONLY valid JSON. +2. Structure: + + +RESPOND WITH JSON ONLY. +""" + + max_retries = 2 + for attempt in range(max_retries): + try: + print(f"[VISION] Attempt {attempt + 1}/{max_retries}...") + + response_text = run_ollama_vision(prompt, image_bytes) + + cleaned_json = response_text.replace("```json", "").replace("```", "").strip() + + if "{" in cleaned_json and "}" in cleaned_json: + start = cleaned_json.index("{") + end = cleaned_json.rindex("}") + 1 + cleaned_json = cleaned_json[start:end] + + data = json.loads(cleaned_json) + + required_keys = ["vision_summary", "component_counts", "circuit_analysis", "components", "values"] + for key in required_keys: + if key not in data: + raise ValueError(f"Missing required key: {key}") + + if not isinstance(data.get("circuit_analysis"), dict): + data["circuit_analysis"] = {"circuit_type": "Unknown", "design_errors": [], "design_warnings": []} + + if "design_errors" not in data["circuit_analysis"]: + data["circuit_analysis"]["design_errors"] = [] + + if not data.get("component_counts") or all(v == 0 for v in data.get("component_counts", {}).values()): + counts = {"R": 0, "C": 0, "U": 0, "Q": 0, "D": 0, "L": 0, "Misc": 0} + for comp in data.get("components", []): + if isinstance(comp, str) and len(comp) > 0: + comp_type = comp[0].upper() + if comp_type in counts: + counts[comp_type] += 1 + elif "DIODE" in comp.upper() or comp.startswith("D"): + counts["D"] = counts.get("D", 0) + 1 + data["component_counts"] = counts + + if data.get("components"): + data["components"] = list(dict.fromkeys(data["components"])) + + print(f"[VISION] Success: {data.get('circuit_analysis', {}).get('circuit_type', 'Unknown')}") + return data + + except Exception as e: + print(f"[VISION] Attempt {attempt + 1} failed: {str(e)}") + if attempt == max_retries - 1: + return { + "error": f"Vision analysis failed: {str(e)}", + "vision_summary": "Unable to analyze circuit image", + "component_counts": {}, + "circuit_analysis": { + "circuit_type": "Unknown", + "design_errors": ["Analysis timed out or failed"], + "design_warnings": [] + }, + "components": [], + "values": {} + } + else: + import time + time.sleep(2) + + +def analyze_image(image_path: str, question: str | None = None, preprocess: bool = True) -> str: + """Helper for manual testing.""" + return str(analyze_and_extract(image_path)) \ No newline at end of file diff --git a/src/chatbot/knowledge_base.py b/src/chatbot/knowledge_base.py new file mode 100644 index 000000000..14ea4cc17 --- /dev/null +++ b/src/chatbot/knowledge_base.py @@ -0,0 +1,144 @@ +import os +import chromadb +from .ollama_runner import get_embedding + +# ==================== DATABASE SETUP ==================== + +def _default_db_path() -> str: + xdg_data_home = os.environ.get("XDG_DATA_HOME", "").strip() + if not xdg_data_home: + xdg_data_home = os.path.join(os.path.expanduser("~"), ".local", "share") + return os.path.join(xdg_data_home, "esim-copilot", "chroma") + +db_path = os.environ.get("ESIM_COPILOT_DB_PATH", "").strip() or _default_db_path() +os.makedirs(db_path, exist_ok=True) +chroma_client = chromadb.PersistentClient(path=db_path) + +collection = chroma_client.get_or_create_collection(name="esim_manuals") + +# ==================== INGESTION ==================== +def ingest_pdfs(manuals_directory: str) -> None: + """ + Read the single master text file and index it. + Call this once from src/ingest.py. + """ + if not os.path.exists(manuals_directory): + print("Directory not found.") + return + + # Clear existing DB to ensure no duplicates from old files + print("Clearing old database...") + try: + chroma_client.delete_collection("esim_manuals") + global collection + collection = chroma_client.get_or_create_collection(name="esim_manuals") + except Exception as e: + print(f"Warning clearing DB: {e}") + + # Look for .txt files only + files = [f for f in os.listdir(manuals_directory) if f.lower().endswith(".txt")] + + if not files: + print("❌ No .txt files found to ingest!") + return + + for filename in files: + path = os.path.join(manuals_directory, filename) + print(f"\n📄 Processing Master File: {filename}") + + try: + with open(path, "r", encoding="utf-8") as f: + text = f.read() + + raw_sections = text.split("\n\n") + + documents, embeddings, metadatas, ids = [], [], [], [] + + chunk_counter = 0 + for section in raw_sections: + section = section.strip() + if len(section) < 50: + continue + + # Further split large sections by double newlines if needed + sub_chunks = [c.strip() for c in section.split("\n\n") if len(c) > 50] + + for chunk in sub_chunks: + embed = get_embedding(chunk) + if embed: + documents.append(chunk) + embeddings.append(embed) + metadatas.append({"source": filename, "type": "master_ref"}) + ids.append(f"{filename}_{chunk_counter}") + chunk_counter += 1 + + if documents: + collection.add( + documents=documents, + embeddings=embeddings, + metadatas=metadatas, + ids=ids, + ) + print(f" ✅ Indexed {len(documents)} chunks from {filename}") + else: + print(f" ⚠️ No valid chunks found in {filename}") + + except Exception as e: + print(f" ❌ Failed to process {filename}: {e}") + + +# ==================== SEARCH ==================== + +# Relevance threshold: ChromaDB returns distances (L2 or cosine). +# Lower distance = more similar. Filter out chunks with distance > threshold. +RELEVANCE_THRESHOLD = float(os.environ.get("ESIM_RAG_RELEVANCE_THRESHOLD", "1.0")) + + +def search_knowledge(query: str, n_results: int = 4) -> str: + """ + Semantic search with relevance threshold to reduce hallucination. + Filters out chunks with distance > RELEVANCE_THRESHOLD. + """ + try: + query_embed = get_embedding(query) + if not query_embed: + return "" + + results = collection.query( + query_embeddings=[query_embed], + n_results=n_results, + include=["documents", "distances"], + ) + + docs_list = results.get("documents", [[]]) + distances_list = results.get("distances", [[]]) + + if not docs_list or not docs_list[0]: + return "" + + docs = docs_list[0] + distances = distances_list[0] if distances_list else [] + + # Filter by relevance threshold (lower distance = more similar) + if distances and len(distances) == len(docs): + filtered = [ + (doc, d) for doc, d in zip(docs, distances) + if d <= RELEVANCE_THRESHOLD + ] + if filtered: + selected_chunks = [doc for doc, _ in filtered] + else: + return "" + else: + selected_chunks = docs + + context_text = "\n\n...\n\n".join(selected_chunks) + if len(context_text) > 3500: + context_text = context_text[:3500] + + header = "=== ESIM OFFICIAL DOCUMENTATION ===\n" + return f"{header}{context_text}\n===================================\n" + + except Exception as e: + print(f"RAG Error: {e}") + return "" diff --git a/src/chatbot/ollama_runner.py b/src/chatbot/ollama_runner.py new file mode 100644 index 000000000..ae754bd0b --- /dev/null +++ b/src/chatbot/ollama_runner.py @@ -0,0 +1,192 @@ +import os +import ollama +import json +import time + +# ==================== CLIENT ==================== + +ollama_client = ollama.Client( + host="http://localhost:11434", + timeout=300.0, +) + +# ==================== SETTINGS ==================== + +_SETTINGS_DIR = os.path.join( + os.path.expanduser("~"), ".local", "share", "esim-copilot" +) +_SETTINGS_PATH = os.path.join(_SETTINGS_DIR, "settings.json") + +_DEFAULT_TEXT_MODEL = "qwen2.5:3b" +_DEFAULT_VISION_MODEL = "minicpm-v:latest" +EMBED_MODEL = "nomic-embed-text" + + +def load_model_settings() -> dict: + """Load persisted model preferences from disk.""" + try: + with open(_SETTINGS_PATH, "r", encoding="utf-8") as f: + return json.load(f) + except Exception: + return {} + + +def save_model_settings(text_model: str, vision_model: str) -> None: + """Persist model preferences to disk.""" + os.makedirs(_SETTINGS_DIR, exist_ok=True) + try: + with open(_SETTINGS_PATH, "w", encoding="utf-8") as f: + json.dump({"text_model": text_model, "vision_model": vision_model}, f, indent=2) + except Exception as e: + print(f"[SETTINGS] Failed to save: {e}") + + +def list_available_models() -> list: + """Query Ollama for installed models. Returns list of model name strings.""" + try: + resp = ollama_client.list() + names = [m["name"] for m in resp.get("models", [])] + return names if names else [_DEFAULT_TEXT_MODEL, _DEFAULT_VISION_MODEL] + except Exception: + return [_DEFAULT_TEXT_MODEL, _DEFAULT_VISION_MODEL, EMBED_MODEL] + + +# Load settings and initialise model dicts +_settings = load_model_settings() + +VISION_MODELS = {"primary": _settings.get("vision_model", _DEFAULT_VISION_MODEL)} +TEXT_MODELS = {"default": _settings.get("text_model", _DEFAULT_TEXT_MODEL)} + + +def reload_model_settings() -> None: + """Re-read settings from disk and update running dicts (called after save).""" + s = load_model_settings() + VISION_MODELS["primary"] = s.get("vision_model", _DEFAULT_VISION_MODEL) + TEXT_MODELS["default"] = s.get("text_model", _DEFAULT_TEXT_MODEL) + + +# ==================== VISION ==================== + +def run_ollama_vision(prompt: str, image_input) -> str: + """Call vision model with Chain-of-Thought for better accuracy.""" + model = VISION_MODELS["primary"] + + try: + import base64 + + image_b64 = "" + if isinstance(image_input, bytes): + image_b64 = base64.b64encode(image_input).decode("utf-8") + elif isinstance(image_input, str) and os.path.isfile(image_input): + with open(image_input, "rb") as f: + image_b64 = base64.b64encode(f.read()).decode("utf-8") + elif isinstance(image_input, str) and len(image_input) > 100: + image_b64 = image_input + else: + raise ValueError("Invalid image input format") + + system_prompt = ( + "You are an expert Electronics Engineer using eSim.\n" + "Analyze the schematic image carefully.\n\n" + "STEP 1: THINKING PROCESS\n" + "- List visible components (e.g., 'I see 4 diodes in a bridge...').\n" + "- Trace connections (e.g., 'Resistor R1 is in series...').\n" + "- Check against the OCR text provided.\n\n" + "STEP 2: JSON OUTPUT\n" + "After your analysis, output a SINGLE JSON object wrapped in ```json ... ```.\n" + "Structure:\n" + "{\n" + ' "vision_summary": "Summary string",\n' + ' "component_counts": {"R": 0, "C": 0, "D": 0, "Q": 0, "U": 0},\n' + ' "circuit_analysis": {\n' + ' "circuit_type": "Rectifier/Amplifier/etc",\n' + ' "design_errors": [],\n' + ' "design_warnings": []\n' + ' },\n' + ' "components": ["R1", "D1"],\n' + ' "values": {"R1": "1k"}\n' + "}\n" + ) + + resp = ollama_client.chat( + model=model, + messages=[ + {"role": "system", "content": system_prompt}, + { + "role": "user", + "content": prompt, + "images": [image_b64], + }, + ], + options={ + "temperature": 0.0, + "num_ctx": 8192, + "num_predict": 1024, + }, + ) + + content = resp["message"]["content"] + + import re + json_match = re.search(r'```json\s*(\{.*?\})\s*```', content, re.DOTALL) + if json_match: + return json_match.group(1) + + start = content.find('{') + end = content.rfind('}') + 1 + if start != -1 and end != -1: + return content[start:end] + + return "{}" + + except Exception as e: + print(f"[VISION ERROR] {e}") + return json.dumps({ + "vision_summary": f"Vision failed: {str(e)[:50]}", + "component_counts": {}, + "circuit_analysis": {"circuit_type": "Error", "design_errors": [], "design_warnings": []}, + "components": [], + "values": {}, + }) + + +# ==================== TEXT ==================== + +def run_ollama(prompt: str, mode: str = "default") -> str: + """Run text model with focused parameters.""" + model = TEXT_MODELS.get(mode, TEXT_MODELS["default"]) + + try: + resp = ollama_client.chat( + model=model, + messages=[ + { + "role": "system", + "content": "You are an eSim and electronics expert. Be concise, accurate, and practical.", + }, + {"role": "user", "content": prompt}, + ], + options={ + "temperature": 0.05, + "num_ctx": 2048, + "num_predict": 400, + "top_p": 0.9, + "repeat_penalty": 1.1, + }, + ) + return resp["message"]["content"].strip() + + except Exception as e: + return f"[Error] {str(e)}" + + +# ==================== EMBEDDINGS ==================== + +def get_embedding(text: str): + """Get text embeddings for RAG.""" + try: + r = ollama_client.embeddings(model=EMBED_MODEL, prompt=text) + return r["embedding"] + except Exception as e: + print(f"[EMBED ERROR] {e}") + return None diff --git a/src/chatbot/stt_handler.py b/src/chatbot/stt_handler.py new file mode 100644 index 000000000..f2d536066 --- /dev/null +++ b/src/chatbot/stt_handler.py @@ -0,0 +1,92 @@ +import os +import json +import queue +import time + +try: + import sounddevice as sd + from vosk import Model, KaldiRecognizer + _HAS_STT = True +except Exception: + sd = None + Model = None + KaldiRecognizer = None + _HAS_STT = False + +_MODEL = None + +DEFAULT_VOSK_DIR = os.path.join( + os.path.expanduser("~"), ".local", "share", + "esim-copilot", "vosk-model-small-en-us-0.15", +) + +def _get_model(): + global _MODEL + if not _HAS_STT: + raise RuntimeError( + "Speech-to-text is not available (missing vosk/sounddevice)." + ) + model_path = os.environ.get("VOSK_MODEL_PATH", "").strip() + if not model_path: + model_path = DEFAULT_VOSK_DIR + if not os.path.isdir(model_path): + raise RuntimeError( + f"Vosk model path not found. Set VOSK_MODEL_PATH or install at: {model_path}" + ) + if _MODEL is None: + _MODEL = Model(model_path) + return _MODEL + +def listen_to_mic(should_stop=lambda: False, max_silence_sec=3, samplerate=16000, phrase_limit_sec=8) -> str: + """ + Offline STT using Vosk. + Returns recognized text, or "" if cancelled / timed out. + """ + if not _HAS_STT: + raise RuntimeError("Speech-to-text is not installed or failed to load.") + q = queue.Queue() + rec = KaldiRecognizer(_get_model(), samplerate) + + started = False + t0 = time.time() + t_speech = None + + def callback(indata, frames, time_info, status): + q.put(bytes(indata)) + + with sd.RawInputStream( + samplerate=samplerate, + channels=1, + dtype="int16", + blocksize=8000, + callback=callback, + ): + while True: + if should_stop(): + return "" + + now = time.time() + + # Stop after silence + if not started and (now - t0) >= max_silence_sec: + return "" + + if started and t_speech and (now - t_speech) >= phrase_limit_sec: + break + + try: + data = q.get(timeout=0.2) + except queue.Empty: + continue + + if rec.AcceptWaveform(data): + text = json.loads(rec.Result()).get("text", "").strip() + if text: + return text + else: + partial = json.loads(rec.PartialResult()).get("partial", "").strip() + if partial and not started: + started = True + t_speech = now + + return json.loads(rec.FinalResult()).get("text", "").strip() From dc6018d28ddde6a1b32e429e0f5d2c2847a71d55 Mon Sep 17 00:00:00 2001 From: Neha Dhumal Date: Mon, 1 Jun 2026 12:53:54 +0000 Subject: [PATCH 2/4] Add RAG retrieval to chatbot thread --- src/chatbot/chatbot_thread.py | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/src/chatbot/chatbot_thread.py b/src/chatbot/chatbot_thread.py index c8cce620b..0a6b4546f 100644 --- a/src/chatbot/chatbot_thread.py +++ b/src/chatbot/chatbot_thread.py @@ -6,7 +6,7 @@ import threading import ollama from PyQt5.QtCore import QThread, pyqtSignal - +from chatbot.knowledge_base import search_knowledge # ── Optional imports ────────────────────────────────────────────────────────── try: @@ -307,11 +307,38 @@ def run(self): if not _ensure_ollama_running(self): return self.status_signal.emit("Ollama is ready! Getting response…") + latest_user = "" + + for line in reversed(self.chat_history): + if line.startswith("User:"): + latest_user = line[5:].strip() + break + + rag_context = "" + if latest_user: + try: + rag_context = search_knowledge(latest_user, n_results=5) + except Exception as e: + print(f"RAG Error: {e}") + rag_context = "" # Keep last 10 history lines (5 turns). # Sending 20 lines fills most of the context window before the # question is even added, forcing the model to load more tokens. - messages = [{"role": "system", "content": _SYSTEM_PROMPT}] + system_prompt = _SYSTEM_PROMPT + + if rag_context.strip(): + system_prompt += f""" + + === ESIM REFERENCE MANUAL === + + {rag_context} + + Use this documentation when answering eSim-related questions. + Prefer the documentation over assumptions whenever applicable. + """ + + messages = [{"role": "system", "content": system_prompt}] for line in self.chat_history[-10:]: if line.startswith("User:"): messages.append({"role": "user", "content": line[5:].strip()}) From d6f683eba1f79404fc1234054fc88103dbb32165 Mon Sep 17 00:00:00 2001 From: Neha Dhumal Date: Tue, 2 Jun 2026 06:12:48 +0000 Subject: [PATCH 3/4] Fix RAG relevance threshold for retrieval --- src/chatbot/knowledge_base.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/chatbot/knowledge_base.py b/src/chatbot/knowledge_base.py index 14ea4cc17..ae12ab6aa 100644 --- a/src/chatbot/knowledge_base.py +++ b/src/chatbot/knowledge_base.py @@ -91,7 +91,13 @@ def ingest_pdfs(manuals_directory: str) -> None: # Relevance threshold: ChromaDB returns distances (L2 or cosine). # Lower distance = more similar. Filter out chunks with distance > threshold. -RELEVANCE_THRESHOLD = float(os.environ.get("ESIM_RAG_RELEVANCE_THRESHOLD", "1.0")) +# ChromaDB distances for nomic-embed-text embeddings are typically +# in the hundreds. Results above 500 are considered insufficiently +# relevant and are filtered out. +RELEVANCE_THRESHOLD = float( + os.environ.get("ESIM_RAG_RELEVANCE_THRESHOLD", "500") +) +RELEVANCE_THRESHOLD = float(os.environ.get("ESIM_RAG_RELEVANCE_THRESHOLD", "500")) def search_knowledge(query: str, n_results: int = 4) -> str: From a030898956114c4eb929e8643578d6e6ed6ab58c Mon Sep 17 00:00:00 2001 From: Neha Dhumal Date: Tue, 2 Jun 2026 07:41:23 +0000 Subject: [PATCH 4/4] Improve chatbot setup script and automate RAG ingestion --- scripts/setup_copilot_ubuntu.sh | 57 +++++++++++++++++++++++++-------- 1 file changed, 43 insertions(+), 14 deletions(-) diff --git a/scripts/setup_copilot_ubuntu.sh b/scripts/setup_copilot_ubuntu.sh index 3e3af20d5..93659ef82 100644 --- a/scripts/setup_copilot_ubuntu.sh +++ b/scripts/setup_copilot_ubuntu.sh @@ -3,7 +3,7 @@ set -euo pipefail ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" -echo "[1/7] Installing system packages (Ubuntu/Debian)…" +echo "[1/8] Installing system packages (Ubuntu/Debian)…" sudo apt-get update sudo apt-get install -y \ python3.10 python3.10-venv python3-pip \ @@ -15,35 +15,44 @@ sudo apt-get install -y \ libxcb-icccm4 libxcb-image0 libxcb-keysyms1 libxcb-render-util0 \ libxcb-xinput0 libxcb-shape0 libxcb-randr0 libxcb-util1 -echo "[2/7] Creating Python virtualenv…" +echo "[2/8] Creating Python virtualenv…" cd "$ROOT_DIR" python3.10 -m venv .venv source .venv/bin/activate -echo "[3/7] Installing Python dependencies…" +echo "[3/8] Installing Python dependencies…" python -m pip install --upgrade pip wheel # hdlparse needs setuptools<58 (use_2to3 removed in setuptools 58+) python -m pip install setuptools==57.5.0 -python -m pip install hdlparse==1.0.4 --no-build-isolation +python -m pip install hdlparse==1.0.4 --no-build-isolation || { + echo + echo "ERROR: hdlparse installation failed." + echo "The original hdlparse package may fail on some systems." + echo + echo "Alternative:" + echo "pip install 'hdlparse @ git+https://github.com/nehadhumal-dev/hdlparse.git'" + echo + exit 1 +} echo "setuptools<58" > /tmp/pip-constraints.txt python -m pip install -c /tmp/pip-constraints.txt -r requirements.txt python -m pip install -c /tmp/pip-constraints.txt -r requirements-copilot.txt -echo "[4/7] Installing PaddlePaddle (CPU, AVX, MKL)…" -python -m pip install "paddlepaddle==2.5.2" \ +echo "[4/8] Installing PaddlePaddle (CPU, AVX, MKL)…" +python -m pip install "paddlepaddle==2.6.2" \ -f https://www.paddlepaddle.org.cn/whl/linux/mkl/avx/stable.html -echo "[5/7] Installing Ollama if missing…" +echo "[5/8] Installing Ollama if missing…" if ! command -v ollama >/dev/null 2>&1; then curl -fsSL https://ollama.com/install.sh | sh fi -echo "[6/7] Pulling required Ollama models…" +echo "[6/8] Pulling required Ollama models…" ollama pull qwen2.5:3b ollama pull minicpm-v ollama pull nomic-embed-text -echo "[7/7] Installing Vosk small English model…" +echo "[7/8] Installing Vosk small English model…" VOSK_BASE="${XDG_DATA_HOME:-$HOME/.local/share}/esim-copilot" mkdir -p "$VOSK_BASE" cd "$VOSK_BASE" @@ -52,14 +61,34 @@ if [ ! -d "vosk-model-small-en-us-0.15" ]; then unzip -q vosk-model-small-en-us-0.15.zip rm -f vosk-model-small-en-us-0.15.zip fi +echo "[8/8] Running RAG ingestion..." +CHROMA_DB="$ROOT_DIR/src/chroma_db" + +if [ ! -d "$CHROMA_DB" ] || [ -z "$(ls -A "$CHROMA_DB" 2>/dev/null)" ]; then + echo "Creating knowledge base..." + ( + cd "$ROOT_DIR/src" + python ingest.py + ) || { + echo "ERROR: RAG ingestion failed." + exit 1 + } +else + echo "Knowledge base already exists, skipping ingestion." +fi echo echo "Done." -echo "- Activate venv: source \"$ROOT_DIR/.venv/bin/activate\"" -echo "- Run ingestion (optional for RAG): (cd \"$ROOT_DIR/src\" && python ingest.py)" -echo "- Run eSim: (cd \"$ROOT_DIR/src/frontEnd\" && QT_QPA_PLATFORM=xcb python Application.py)" +echo +echo "To start a new session:" +echo " cd \"$ROOT_DIR\"" +echo " source .venv/bin/activate" +echo " cd src/frontEnd" +echo " python Application.py" +echo +echo "RAG knowledge base:" +echo " Stored in: $ROOT_DIR/src/chroma_db" echo echo "Optional env vars:" echo "- export VOSK_MODEL_PATH=\"$VOSK_BASE/vosk-model-small-en-us-0.15\"" -echo "- export ESIM_COPILOT_DB_PATH=\"${XDG_DATA_HOME:-$HOME/.local/share}/esim-copilot/chroma\"" - +echo "- export ESIM_COPILOT_DB_PATH=\"${XDG_DATA_HOME:-$HOME/.local/share}/esim-copilot/chroma\"" \ No newline at end of file