diff --git a/app/jira_agent/__init__.py b/app/jira_agent/__init__.py new file mode 100644 index 0000000..c58d752 --- /dev/null +++ b/app/jira_agent/__init__.py @@ -0,0 +1,24 @@ +"""JIRA agent for automating interaction with JIRA tickets. + +This package provides tools for: +1. Authenticating with JIRA (including SSO) +2. Navigating to and extracting data from JIRA tickets +3. Performing actions on tickets (commenting, status updates, etc.) +4. Integrating with external APIs and parsing API documentation +""" + +from app.jira_agent.jira import JiraAgent +from app.jira_agent.api_utils import ( + select_api_with_llm, + extract_endpoints_rule_based, + get_api_documentation, + determine_headers +) + +__all__ = [ + "JiraAgent", + "select_api_with_llm", + "extract_endpoints_rule_based", + "get_api_documentation", + "determine_headers" +] \ No newline at end of file diff --git a/app/jira_agent/api_utils.py b/app/jira_agent/api_utils.py new file mode 100644 index 0000000..0d08fdb --- /dev/null +++ b/app/jira_agent/api_utils.py @@ -0,0 +1,349 @@ +"""Utilities for interacting with APIs and parsing documentation. + +This module provides functions for: +1. Parsing API documentation using LLMs +2. Extracting endpoints and authentication requirements +3. Fallback methods using rule-based parsing +""" + +import json +import re +import requests +import logging +import os +from typing import Dict, List, Optional, Any, Union + +# Set up logging +logger = logging.getLogger(__name__) + +# Default LLM settings +DEFAULT_LLM_MODEL = "gpt-4" # For OpenAI +DEFAULT_ANTHROPIC_MODEL = "claude-3-opus-20240229" # For Anthropic + + +def select_api_with_llm( + ticket_data: Dict[str, Any], + documentation: Union[Dict, str], + llm_api_key: Optional[str] = None, + llm_api_url: Optional[str] = None +) -> Dict[str, Any]: + """ + Use an LLM to analyze API documentation and ticket data to select the best endpoint. + + Args: + ticket_data: Dictionary containing ticket information + documentation: API documentation as text or JSON + llm_api_key: API key for the LLM service (OpenAI, Anthropic, etc.) + llm_api_url: URL for the LLM API endpoint + + Returns: + Dictionary with selected endpoint, parameters, and relevant ticket information + """ + if not documentation: + logger.warning("No documentation provided for parsing") + return {"endpoint": None, "relevant_info": None, "endpoint_params": {}} + + # Convert documentation to string if it's a dictionary + doc_text = json.dumps(documentation) if isinstance(documentation, dict) else str(documentation) + + # Extract key ticket information for the prompt + ticket_id = ticket_data.get("id", "") + summary = ticket_data.get("summary", "") + description = ticket_data.get("description", "") + assignee = ticket_data.get("assignee", {}) + assignee_email = "" + if isinstance(assignee, dict): + assignee_email = assignee.get("emailAddress", "") + elif isinstance(assignee, str): + assignee_email = assignee + + # Check if we have an LLM API key (from args or environment) + llm_api_key = llm_api_key or os.getenv("LLM_API_KEY", "") + llm_api_url = llm_api_url or os.getenv("LLM_API_URL", "https://api.openai.com/v1/chat/completions") + + if not llm_api_key: + logger.warning("No LLM API key provided. Using rule-based parsing as fallback.") + endpoints = extract_endpoints_rule_based(documentation) + return { + "endpoint": endpoints.get("analysis_endpoint"), + "relevant_info": None, + "endpoint_params": {} + } + + try: + logger.info("Using LLM to intelligently select API endpoint based on ticket data") + + # Prepare prompt for the LLM + prompt = f""" + You are an AI assistant helping to analyze a JIRA ticket and determine the appropriate API endpoint to call. + + JIRA Ticket Information: + ID: {ticket_id} + Summary: {summary} + Description: {description} + Assignee Email: {assignee_email} + + API Documentation: + {doc_text[:4000]} # Truncate if too large + + Task: + 1. Analyze the JIRA ticket to identify what action needs to be performed + 2. Determine the most appropriate API endpoint to call based on the documentation + 3. Extract specific information from the ticket that would be needed as parameters for the API call + 4. Identify the required parameters for the selected endpoint + + Response format: + {{ + "endpoint": "The most appropriate endpoint path", + "relevant_info": {{ + "extracted_values_from_ticket": "that_are_relevant", + "can_include_multiple": "key_value_pairs" + }}, + "endpoint_params": {{ + "param_name": "description of what this parameter requires", + "another_param": "another description" + }} + }} + + Please return ONLY valid JSON in exactly the format specified. No additional text. + """ + + # Make request to LLM API + headers = { + "Authorization": f"Bearer {llm_api_key}", + "Content-Type": "application/json" + } + + # Determine which model to use based on the API URL + is_openai = "openai" in llm_api_url.lower() + model = DEFAULT_LLM_MODEL if is_openai else DEFAULT_ANTHROPIC_MODEL + + payload = { + "model": model, + "messages": [{"role": "user", "content": prompt}], + "temperature": 0.2, # Low temperature for more deterministic results + "max_tokens": 2000 + } + + # Make the actual API call to the LLM + response = requests.post(llm_api_url, headers=headers, json=payload) + + if response.status_code == 200: + llm_response = response.json() + + # Extract content based on API format (different for OpenAI vs Anthropic vs others) + content = "" + if is_openai and "choices" in llm_response: # OpenAI format + content = llm_response["choices"][0]["message"]["content"] + elif not is_openai and "content" in llm_response: # Anthropic format + content = llm_response["content"][0]["text"] + else: + content = str(llm_response) + logger.warning(f"Unexpected LLM response format: {content[:100]}...") + + try: + # Parse the JSON response + extracted_data = json.loads(content) + logger.info(f"LLM successfully analyzed ticket and selected endpoint: {extracted_data.get('endpoint')}") + + # Ensure the response has the expected structure + if "endpoint" not in extracted_data: + extracted_data["endpoint"] = None + if "relevant_info" not in extracted_data: + extracted_data["relevant_info"] = {} + if "endpoint_params" not in extracted_data: + extracted_data["endpoint_params"] = {} + + return extracted_data + + except json.JSONDecodeError: + logger.error(f"Failed to parse LLM response as JSON: {content[:100]}...") + # Try to extract JSON from the content if it's embedded in other text + json_match = re.search(r'({.*})', content.replace('\n', '')) + if json_match: + try: + extracted_data = json.loads(json_match.group(1)) + logger.info(f"Extracted JSON from LLM response with endpoint: {extracted_data.get('endpoint')}") + + # Ensure the response has the expected structure + if "endpoint" not in extracted_data: + extracted_data["endpoint"] = None + if "relevant_info" not in extracted_data: + extracted_data["relevant_info"] = {} + if "endpoint_params" not in extracted_data: + extracted_data["endpoint_params"] = {} + + return extracted_data + except: + pass + else: + logger.error(f"LLM API call failed with status {response.status_code}: {response.text}") + + # If we get here, something went wrong with the LLM parsing + logger.warning("Falling back to rule-based parsing") + endpoints = extract_endpoints_rule_based(documentation) + return { + "endpoint": endpoints.get("analysis_endpoint"), + "relevant_info": extract_ticket_info(ticket_data), + "endpoint_params": {} + } + + except Exception as e: + logger.exception(f"Error using LLM to select API endpoint: {e}") + logger.warning("Falling back to rule-based parsing") + endpoints = extract_endpoints_rule_based(documentation) + return { + "endpoint": endpoints.get("analysis_endpoint"), + "relevant_info": extract_ticket_info(ticket_data), + "endpoint_params": {} + } + + +def extract_ticket_info(ticket_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Extract relevant information from a ticket for API calls. + Used as a fallback when LLM parsing fails. + + Args: + ticket_data: Dictionary containing ticket information + + Returns: + Dictionary with extracted information + """ + info = {} + + # Extract email from description or summary + description = ticket_data.get("description", "") + summary = ticket_data.get("summary", "") + + # Try to find an email in the description or summary + email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}' + email_matches = [] + + if isinstance(description, str): + email_matches = re.findall(email_pattern, description) + if not email_matches and isinstance(summary, str): + email_matches = re.findall(email_pattern, summary) + + if email_matches: + info["email"] = email_matches[0] + + # Add ticket ID and assignee + info["ticket_id"] = ticket_data.get("id", "") + + assignee = ticket_data.get("assignee", {}) + if isinstance(assignee, dict): + info["assignee"] = assignee.get("displayName", "") + info["assignee_email"] = assignee.get("emailAddress", "") + elif isinstance(assignee, str): + info["assignee"] = assignee + + return info + + +def extract_endpoints_rule_based(documentation: Union[Dict, str]) -> Dict[str, Any]: + """ + Extract endpoints from documentation using simple rules. + Used as a fallback when LLM parsing fails. + + Args: + documentation: Dictionary or string of API documentation + + Returns: + Dictionary with extracted endpoints and analysis endpoint + """ + endpoints = [] + + if isinstance(documentation, dict): + # Look for endpoints in various common documentation formats + if "paths" in documentation: # OpenAPI/Swagger format + endpoints = list(documentation["paths"].keys()) + elif "endpoints" in documentation: + endpoints = documentation["endpoints"] + elif "resources" in documentation: + endpoints = documentation["resources"] + else: + # Try to find endpoints by looking at all keys + for key, value in documentation.items(): + if isinstance(value, dict) and ("url" in value or "path" in value or "endpoint" in value): + endpoints.append(key) + else: + # For text documentation, try to extract paths using regex + doc_text = str(documentation) + endpoints = re.findall(r'["\']?(/[a-zA-Z0-9/_-]+)', doc_text) + endpoints = list(set(endpoints)) # Remove duplicates + + # Determine which endpoint to use for analysis + analysis_endpoint = None + for endpoint in endpoints: + if "analy" in endpoint.lower(): + analysis_endpoint = endpoint + break + + logger.info(f"Rule-based parsing found {len(endpoints)} endpoints") + return { + "endpoints": endpoints, + "analysis_endpoint": analysis_endpoint + } + + +def get_api_documentation(api_url: str, guide_path: str = "/docs/guide") -> Dict[str, Any]: + """ + Fetch API documentation from the given URL. + + Args: + api_url: Base URL of the API + guide_path: Path to the API documentation or guide + + Returns: + Tuple of (documentation_content, content_type) + """ + guide_url = f"{api_url.rstrip('/')}{guide_path}" + logger.info(f"Fetching API documentation from {guide_url}") + + try: + response = requests.get(guide_url, timeout=5) + + if response.status_code == 200: + content_type = response.headers.get('Content-Type', '') + + # Return the appropriate format based on content type + if 'application/json' in content_type: + return response.json(), 'json' + else: + return response.text, 'text' + else: + logger.warning(f"Failed to get documentation: HTTP {response.status_code}") + return None, None + + except Exception as e: + logger.exception(f"Error fetching API documentation: {e}") + return None, None + + +def determine_headers(parsed_docs: Dict[str, Any], api_key: Optional[str] = None) -> Dict[str, str]: + """ + Determine the headers to use for API requests based on parsed documentation. + + Args: + parsed_docs: Documentation parsed by LLM or rule-based methods + api_key: API key to use for authentication (if required) + + Returns: + Dictionary of headers to use for requests + """ + headers = {} + + # If we have information about required headers from the documentation + if "required_headers" in parsed_docs: + for header, _ in parsed_docs.get("required_headers", {}).items(): + if header.lower() == "authorization" and api_key: + auth_method = parsed_docs.get("auth_method", "Bearer").split()[0] + headers["Authorization"] = f"{auth_method} {api_key}" + logger.info(f"Added authentication header: {header}") + # Default to Bearer token if no specific auth method is specified + elif api_key: + headers["Authorization"] = f"Bearer {api_key}" + logger.info("Added default Bearer token authentication header") + + return headers \ No newline at end of file diff --git a/app/jira_agent/auth.py b/app/jira_agent/auth.py new file mode 100644 index 0000000..bce7682 --- /dev/null +++ b/app/jira_agent/auth.py @@ -0,0 +1,844 @@ +"""Authentication utilities for JIRA. + +This module provides functions for authenticating with JIRA using various methods: +- Username/password +- SSO providers (Google, Microsoft, Okta, etc.) +- API tokens +""" + +import logging +from typing import Optional, Tuple +from app.browser_agent.browser import Browser +from app.jira_agent.selectors import DEFAULT_SELECTORS, SSO_SELECTORS + +logger = logging.getLogger(__name__) + + +def is_login_page(browser: Browser) -> bool: + """Check if we're on a login page. + + Args: + browser: Browser instance + + Returns: + True if this appears to be a login page + """ + try: + # First wait to ensure page is fully loaded + browser.wait(2000) + + login_selectors = [ + DEFAULT_SELECTORS["login_page"]["username_field"], + DEFAULT_SELECTORS["login_page"]["password_field"], + DEFAULT_SELECTORS["login_page"]["login_button"] + ] + + # Add SSO selectors + for provider_selectors in SSO_SELECTORS.values(): + login_selectors.extend(provider_selectors) + + for selector in login_selectors: + try: + # Use get_element_info for better detection + element_info = browser.get_element_info(selector) + if element_info: + logger.info(f"Login element detected: {element_info['tag']}") + return True + except Exception as e: + logger.debug(f"Error checking selector {selector}: {e}") + continue + + # Fallback to wait_for_selector for complex selectors + try: + if browser.wait_for_selector(selector, timeout=1000): + return True + except Exception as e: + logger.debug(f"Error waiting for selector {selector}: {e}") + continue + + return False + except Exception as e: + logger.warning(f"Error checking if on login page: {e}") + # If we can't determine, assume we're not on a login page + return False + + +def login(browser: Browser, username: str, password: str, use_sso: bool = True, + prefer_google: bool = True) -> bool: + """Handle JIRA login with support for SSO. + + Args: + browser: Browser instance + username: JIRA username/email + password: JIRA password/API token + use_sso: Whether to attempt SSO login (default: True) + prefer_google: Whether to prefer Google SSO if available (default: True) + + Returns: + True if login appears successful + """ + try: + # First check for SSO options if requested + if use_sso: + logger.info("Attempting SSO login") + + # First check what SSO options are available + available_sso = [] + + # Check for Google + google_present = browser.execute_script(""" + const googleTexts = ['google', 'continue with google', 'sign in with google']; + const elements = Array.from(document.querySelectorAll('button, a, div[role="button"]')); + + for (const el of elements) { + const text = el.innerText ? el.innerText.toLowerCase() : ''; + if (googleTexts.some(gt => text.includes(gt))) { + return true; + } + + // Check for Google images + const images = el.querySelectorAll('img'); + for (const img of images) { + if ((img.alt && img.alt.toLowerCase().includes('google')) || + (img.src && img.src.toLowerCase().includes('google'))) { + return true; + } + } + } + return false; + """) + + # Check for Microsoft + microsoft_present = browser.execute_script(""" + const microsoftTexts = ['microsoft', 'continue with microsoft', 'sign in with microsoft', 'azure', 'office 365']; + const elements = Array.from(document.querySelectorAll('button, a, div[role="button"]')); + + for (const el of elements) { + const text = el.innerText ? el.innerText.toLowerCase() : ''; + if (microsoftTexts.some(mt => text.includes(mt))) { + return true; + } + + // Check for Microsoft images + const images = el.querySelectorAll('img'); + for (const img of images) { + if ((img.alt && img.alt.toLowerCase().includes('microsoft')) || + (img.src && img.src.toLowerCase().includes('microsoft'))) { + return true; + } + } + } + return false; + """) + + if google_present: + available_sso.append("google") + logger.info("Google SSO option detected") + + if microsoft_present: + available_sso.append("microsoft") + logger.info("Microsoft SSO option detected") + + # Prioritize Google SSO if preferred and available + if prefer_google and "google" in available_sso: + logger.info("Attempting Google SSO login (preferred)") + if _try_sso_provider(browser, username, password, provider="google", avoid_providers=["microsoft"]): + logger.info("Google SSO login successful") + + # Wait to make sure we're fully logged in + browser.wait(10000) + + # Check if we're still on a login page + if not is_login_page(browser): + return True + + # Otherwise try each SSO option in order + for provider in ["generic_sso", "google", "microsoft", "okta"]: + if provider in available_sso or provider == "generic_sso": + logger.info(f"Trying {provider} SSO") + avoid = [] + if provider != "microsoft": + avoid = ["microsoft"] # Avoid clicking Microsoft when trying other providers + + if _try_sso_provider(browser, username, password, provider=provider, avoid_providers=avoid): + logger.info(f"{provider} SSO login successful") + + # Wait to make sure we're fully logged in + browser.wait(10000) + + # Check if we're still on a login page + if not is_login_page(browser): + return True + + # Regular username/password flow if no SSO or SSO not requested + logger.info("Attempting standard login") + if _standard_login(browser, username, password): + # Wait to make sure we're fully logged in + browser.wait(10000) + + # Check if we're still on a login page + if not is_login_page(browser): + logger.info("Standard login successful") + return True + else: + logger.warning("Still on login page after login attempt") + return False + + except Exception as e: + logger.error(f"Login error: {e}") + return False + + logger.warning("Login process completed but result unclear") + return True # Default to True as we want to continue trying + + +def _try_sso_provider(browser: Browser, username: str, password: str, provider: str, avoid_providers: list = None) -> bool: + """Try to log in using a specific SSO provider. + + Args: + browser: Browser instance + username: SSO username/email + password: SSO password + provider: Provider name ("google", "microsoft", "okta", or "generic_sso") + avoid_providers: List of providers to avoid clicking (e.g., ["microsoft"]) + + Returns: + True if login appears successful + """ + if avoid_providers is None: + avoid_providers = [] + + if provider not in SSO_SELECTORS: + logger.warning(f"Unknown SSO provider: {provider}") + return False + + # Get selectors for the provider + provider_selectors = SSO_SELECTORS[provider] + + # Output debug info about the current page + try: + html = browser.get_page_html() + logger.info(f"Current page HTML length: {len(html)}") + logger.info(f"Current URL: {browser.get_current_url()}") + + # Look for provider-related content in the HTML + if provider.lower() in html.lower(): + logger.info(f"Page contains '{provider}' references") + except Exception as e: + logger.warning(f"Error getting page debug info: {e}") + + # JavaScript fallback for finding and clicking login buttons + try: + logger.info(f"Trying JavaScript fallback to find {provider} button") + found = browser.execute_script(f""" + // Look for buttons/links containing the text {provider} + function findElement() {{ + // Convert provider to lowercase for case-insensitive matching + const providerLower = "{provider}".toLowerCase(); + + // List of providers to avoid + const avoidProviders = {avoid_providers}; + + // First look for buttons/links with text containing the provider name + const elements = Array.from(document.querySelectorAll('button, a, div[role="button"], span[role="button"]')); + + for (const el of elements) {{ + // Skip any privacy policy links + if (el.href && ( + el.href.includes("policies.google.com/privacy") || + el.href.includes("privacy") || + el.href.includes("terms") + )) {{ + console.log("Skipping privacy/terms link:", el); + continue; + }} + + // Skip elements that are likely not login buttons + if (el.innerText && ( + el.innerText.toLowerCase().includes("privacy") || + el.innerText.toLowerCase().includes("policy") || + el.innerText.toLowerCase().includes("terms") || + el.innerText.toLowerCase().includes("cookie") + )) {{ + console.log("Skipping policy/terms text element:", el); + continue; + }} + + // Skip elements for providers we want to avoid + let shouldAvoid = false; + for (const avoidProvider of avoidProviders) {{ + if (el.innerText && el.innerText.toLowerCase().includes(avoidProvider.toLowerCase())) {{ + console.log(`Skipping element containing avoided provider '${avoidProvider}':`, el); + shouldAvoid = true; + break; + }} + + // Check if any images contain the avoid provider + const images = el.querySelectorAll('img'); + for (const img of images) {{ + if ((img.alt && img.alt.toLowerCase().includes(avoidProvider.toLowerCase())) || + (img.src && img.src.toLowerCase().includes(avoidProvider.toLowerCase()))) {{ + console.log(`Skipping element with image of avoided provider '${avoidProvider}':`, el); + shouldAvoid = true; + break; + }} + }} + + if (shouldAvoid) {{ + break; + }} + }} + + if (shouldAvoid) {{ + continue; + }} + + // Check the text content for login-related text + if (el.innerText && el.innerText.toLowerCase().includes(providerLower)) {{ + // Make sure it's a login button by checking for login-related text + if ( + el.innerText.toLowerCase().includes("sign in") || + el.innerText.toLowerCase().includes("login") || + el.innerText.toLowerCase().includes("log in") || + el.innerText.toLowerCase().includes("continue with") + ) {{ + console.log("Found login button by text content:", el); + return el; + }} + + // If it mentions the provider prominently, it's likely a login button + if (el.tagName === "BUTTON" || el.role === "button") {{ + console.log("Found button with provider mention:", el); + return el; + }} + }} + + // For Google specific detection + if (providerLower === "google" && el.innerText && + (el.innerText.toLowerCase().includes("google") || + el.innerText.toLowerCase() === "g")) {{ + console.log("Found Google-specific button:", el); + return el; + }} + + // For Microsoft specific detection + if (providerLower === "microsoft" && el.innerText && + (el.innerText.toLowerCase().includes("microsoft") || + el.innerText.toLowerCase().includes("azure") || + el.innerText.toLowerCase().includes("office 365"))) {{ + console.log("Found Microsoft-specific button:", el); + return el; + }} + + // Check aria-label for login-related text + if (el.getAttribute('aria-label') && + el.getAttribute('aria-label').toLowerCase().includes(providerLower)) {{ + if ( + el.getAttribute('aria-label').toLowerCase().includes("sign in") || + el.getAttribute('aria-label').toLowerCase().includes("login") || + el.getAttribute('aria-label').toLowerCase().includes("log in") + ) {{ + console.log("Found by aria-label:", el); + return el; + }} + }} + + // Check for nested images with alt text or src containing provider + const images = el.querySelectorAll('img'); + for (const img of images) {{ + if ((img.alt && img.alt.toLowerCase().includes(providerLower)) || + (img.src && img.src.toLowerCase().includes(providerLower))) {{ + // Skip if the parent has href to privacy + if (el.href && ( + el.href.includes("policies.google.com") || + el.href.includes("privacy") || + el.href.includes("terms") + )) {{ + console.log("Skipping privacy link with provider image:", el); + continue; + }} + console.log("Found via nested image:", el); + return el; + }} + }} + + // Check for class or id containing provider and looks like a login button + if ((el.id && el.id.toLowerCase().includes(providerLower)) || + (el.className && el.className.toLowerCase().includes(providerLower))) {{ + // Skip if looks like a privacy element + if ( + (el.id && (el.id.toLowerCase().includes("privacy") || el.id.toLowerCase().includes("term"))) || + (el.className && (el.className.toLowerCase().includes("privacy") || el.className.toLowerCase().includes("term"))) + ) {{ + console.log("Skipping privacy/terms element:", el); + continue; + }} + console.log("Found by class/id:", el); + return el; + }} + }} + + return null; + }} + + const element = findElement(); + if (element) {{ + // Get element details for logging + const details = {{ + tag: element.tagName, + text: element.innerText, + className: element.className, + id: element.id, + href: element.href || null + }}; + + console.log("Clicking element:", details); + element.click(); + return details; + }} + return null; + """) + + if found: + logger.info(f"JavaScript found and clicked {provider} button: {found}") + browser.wait(10000) # Wait for redirect + return _handle_sso_flow(browser, username, password, provider) + except Exception as e: + logger.error(f"Error using JavaScript fallback: {e}") + + # Now try the regular selectors as fallback + logger.info(f"Trying {len(provider_selectors)} {provider} selectors: {provider_selectors}") + + # Look for any SSO buttons for this provider + for selector in provider_selectors: + try: + # Check if element exists and is visible + element_info = browser.get_element_info(selector) + if element_info: + # Skip if it looks like a privacy policy link + href = element_info.get('attributes', {}).get('href', '') + if href and ('privacy' in href or 'policies.google.com' in href or 'terms' in href): + logger.info(f"Skipping privacy/terms link: {href}") + continue + + # Check if this element contains text of a provider we want to avoid + should_avoid = False + for avoid_provider in avoid_providers: + text = element_info.get('text', '').lower() + if avoid_provider.lower() in text: + logger.info(f"Skipping element containing avoided provider '{avoid_provider}': {text}") + should_avoid = True + break + + if should_avoid: + continue + + logger.info(f"Found {provider} element with selector '{selector}': {element_info}") + if element_info.get('isVisible', False): + logger.info(f"VISIBLE {provider} SSO option: {element_info['tag']} with selector '{selector}'") + browser.click_selector(selector) + browser.wait(8000) # Increased wait for redirect + + # Handle provider-specific login + return _handle_sso_flow(browser, username, password, provider) + else: + logger.info(f"Element found but NOT VISIBLE: {element_info}") + else: + logger.debug(f"No element found with selector: {selector}") + + # Fallback to simple selector check + if browser.wait_for_selector(selector, timeout=2000): # Increased timeout + # Check if it's a privacy policy link before clicking + avoid_check = browser.execute_script(f""" + const el = document.querySelector("{selector}"); + if (!el) return null; + + // Check for privacy policy + if (el.href && ( + el.href.includes("privacy") || + el.href.includes("policies.google.com") || + el.href.includes("terms") + )) {{ + return "privacy"; + }} + + // Check for avoided providers + const avoidProviders = {avoid_providers}; + for (const avoid of avoidProviders) {{ + if (el.innerText && el.innerText.toLowerCase().includes(avoid.toLowerCase())) {{ + return avoid; + }} + }} + + return null; + """) + + if avoid_check: + logger.info(f"Skipping element with '{avoid_check}' content: {selector}") + continue + + logger.info(f"Successfully waited for {provider} SSO option: {selector}") + browser.click_selector(selector) + browser.wait(8000) # Increased wait for redirect + + # Handle provider-specific login + return _handle_sso_flow(browser, username, password, provider) + except Exception as e: + logger.debug(f"Error trying SSO selector {selector}: {e}") + continue + + logger.warning(f"Could not find any {provider} SSO options on the page") + return False + + +def _handle_sso_flow(browser: Browser, username: str, password: str, provider: str) -> bool: + """Handle SSO provider authentication flow. + + Args: + browser: Browser instance + username: SSO username/email + password: SSO password + provider: Provider name ("google", "microsoft", "okta", or "generic_sso") + + Returns: + True if login appears successful + """ + # Wait for SSO page to load + browser.wait(5000) # Increased wait time + + # Get current URL to determine provider if not explicitly specified + if provider == "generic_sso": + current_url = browser.get_current_url() + logger.info(f"SSO redirect URL: {current_url}") + + # Detect provider from URL + if "google" in current_url.lower(): + provider = "google" + elif any(p in current_url.lower() for p in ["microsoft", "azure", "live"]): + provider = "microsoft" + elif "okta" in current_url.lower(): + provider = "okta" + + # Handle different providers + if provider == "google": + return _handle_google_sso(browser, username, password) + elif provider == "microsoft": + return _handle_microsoft_sso(browser, username, password) + elif provider == "okta": + return _handle_okta_sso(browser, username, password) + else: + return _handle_generic_sso(browser, username, password) + + +def _standard_login(browser: Browser, username: str, password: str) -> bool: + """Handle standard username/password login flow. + + Args: + browser: Browser instance + username: JIRA username + password: JIRA password + + Returns: + True if login appears successful + """ + # Enter username + username_selector = DEFAULT_SELECTORS["login_page"]["username_field"] + if browser.wait_for_selector(username_selector, timeout=3000): + browser.click_selector(username_selector) + browser.type(username) + + # Look for Continue or Submit button + continue_selector = "button[id='login-submit'], button[type='submit'], button:contains('Continue')" + if browser.wait_for_selector(continue_selector, timeout=2000): + browser.click_selector(continue_selector) + browser.wait(3000) + + # Enter password (might be on a second screen after username) + password_selector = DEFAULT_SELECTORS["login_page"]["password_field"] + if browser.wait_for_selector(password_selector, timeout=3000): + browser.click_selector(password_selector) + browser.type(password) + + # Click login button + login_selector = DEFAULT_SELECTORS["login_page"]["login_button"] + if browser.wait_for_selector(login_selector, timeout=2000): + browser.click_selector(login_selector) + browser.wait(5000) # Wait longer for login to complete + + return True + + +def _handle_google_sso(browser: Browser, username: str, password: str) -> bool: + """Handle Google SSO login flow. + + Args: + browser: Browser instance + username: Google email + password: Google password + + Returns: + True if login appears successful + """ + try: + # Enter email + email_selector = "input[type='email']" + if browser.wait_for_selector(email_selector, timeout=10000): # Increased timeout + # Check element state + element_info = browser.get_element_info(email_selector) + if element_info: + logger.info(f"Found email input: {element_info.get('attributes', {})}") + + browser.click_selector(email_selector) + browser.type(username) + + # Click next + next_selector = "button:contains('Next'), button[id='identifierNext']" + if browser.wait_for_selector(next_selector, timeout=5000): # Increased timeout + browser.click_selector(next_selector) + browser.wait(5000) # Increased wait time + + # Enter password + password_selector = "input[type='password']" + if browser.wait_for_selector(password_selector, timeout=10000): # Increased timeout + browser.click_selector(password_selector) + browser.type(password) + + # Click sign in + signin_selector = "button:contains('Sign in'), button[id='passwordNext']" + if browser.wait_for_selector(signin_selector, timeout=5000): # Increased timeout + browser.click_selector(signin_selector) + + # Wait longer for Google authentication to complete and redirect back + browser.wait(15000) # Substantially increased wait time + + # Check if we're redirected back to JIRA + current_url = browser.get_current_url() + logger.info(f"Current URL after Google auth: {current_url}") + + # Check if we successfully returned to JIRA + if "atlassian" in current_url or "jira" in current_url: + # Wait for JIRA UI to fully load + browser.wait(5000) + return True + + # Handle potential 2FA challenge or other authentication steps + browser.wait(5000) + verify_selectors = [ + "input[id='totpPin']", # TOTP verification code + "button:contains('Try another way')", + "button:contains('Verify')" + ] + + for selector in verify_selectors: + if browser.wait_for_selector(selector, timeout=2000): + logger.warning("Additional verification detected - may require manual intervention") + # Wait longer for manual intervention + browser.wait(30000) + return True # Hope the user has completed manual verification + + return True # Assume success if we got this far + except Exception as e: + logger.error(f"Google SSO error: {e}") + return False + + +def _handle_microsoft_sso(browser: Browser, username: str, password: str) -> bool: + """Handle Microsoft/Azure SSO login flow. + + Args: + browser: Browser instance + username: Microsoft email + password: Microsoft password + + Returns: + True if login appears successful + """ + try: + # Enter email + email_selector = "input[type='email'], input[name='loginfmt']" + if browser.wait_for_selector(email_selector, timeout=5000): + element_info = browser.get_element_info(email_selector) + if element_info: + logger.info(f"Found Microsoft email input: {element_info.get('attributes', {})}") + + browser.click_selector(email_selector) + browser.type(username) + + # Click next + next_selector = "input[type='submit'], button:contains('Next')" + if browser.wait_for_selector(next_selector, timeout=2000): + browser.click_selector(next_selector) + browser.wait(3000) + + # Enter password + password_selector = "input[type='password'], input[name='passwd']" + if browser.wait_for_selector(password_selector, timeout=5000): + browser.click_selector(password_selector) + browser.type(password) + + # Click sign in + signin_selector = "input[type='submit'], button:contains('Sign in')" + if browser.wait_for_selector(signin_selector, timeout=2000): + browser.click_selector(signin_selector) + browser.wait(3000) + + # Handle "Stay signed in?" if it appears + stay_selector = "input[type='submit'][value='Yes'], button:contains('Yes')" + if browser.wait_for_selector(stay_selector, timeout=3000): + browser.click_selector(stay_selector) + browser.wait(3000) + + return True + except Exception as e: + logger.error(f"Microsoft SSO error: {e}") + return False + + +def _handle_okta_sso(browser: Browser, username: str, password: str) -> bool: + """Handle Okta SSO login flow. + + Args: + browser: Browser instance + username: Okta username/email + password: Okta password + + Returns: + True if login appears successful + """ + try: + # Enter username/email + username_selector = "input[name='username'], input[id='okta-signin-username']" + if browser.wait_for_selector(username_selector, timeout=5000): + element_info = browser.get_element_info(username_selector) + if element_info: + logger.info(f"Found Okta username input: {element_info.get('attributes', {})}") + + browser.click_selector(username_selector) + browser.type(username) + + # Enter password + password_selector = "input[name='password'], input[id='okta-signin-password']" + if browser.wait_for_selector(password_selector, timeout=3000): + browser.click_selector(password_selector) + browser.type(password) + + # Click sign in + submit_selector = "input[type='submit'], button[type='submit'], button:contains('Sign in')" + if browser.wait_for_selector(submit_selector, timeout=2000): + browser.click_selector(submit_selector) + browser.wait(5000) + + # Handle MFA if present (this is very org-specific) + # This is a simplified example - real MFA handling would need to be customized + push_selector = "button:contains('Send Push'), a:contains('Send Push')" + if browser.wait_for_selector(push_selector, timeout=3000): + browser.click_selector(push_selector) + logger.info("MFA push notification sent - please approve on your device") + browser.wait(20000) # Wait longer for MFA approval + + return True + except Exception as e: + logger.error(f"Okta SSO error: {e}") + return False + + +def _handle_generic_sso(browser: Browser, username: str, password: str) -> bool: + """Handle a generic SSO login flow for unknown providers. + + Args: + browser: Browser instance + username: SSO username/email + password: SSO password + + Returns: + True if login appears successful + """ + try: + # Look for common username/email fields + username_selectors = [ + "input[type='email']", + "input[name='username']", + "input[id='username']", + "input[name='email']" + ] + + for selector in username_selectors: + element_info = browser.get_element_info(selector) + if element_info and element_info.get('isVisible', False): + logger.info(f"Found username input: {element_info['tag']} with attributes: {element_info.get('attributes', {})}") + browser.click_selector(selector) + browser.type(username) + browser.wait(1000) + + # Look for a next/continue button + next_selectors = [ + "button:contains('Next')", + "button:contains('Continue')", + "input[type='submit']", + "button[type='submit']" + ] + + for next_selector in next_selectors: + if browser.wait_for_selector(next_selector, timeout=1000): + browser.click_selector(next_selector) + browser.wait(3000) + break + break + elif browser.wait_for_selector(selector, timeout=1000): + browser.click_selector(selector) + browser.type(username) + browser.wait(1000) + + # Look for a next/continue button + for next_selector in ["button:contains('Next')", "button:contains('Continue')", "input[type='submit']"]: + if browser.wait_for_selector(next_selector, timeout=1000): + browser.click_selector(next_selector) + browser.wait(3000) + break + break + + # Look for common password fields + password_selectors = [ + "input[type='password']", + "input[name='password']", + "input[id='password']" + ] + + for selector in password_selectors: + element_info = browser.get_element_info(selector) + if element_info and element_info.get('isVisible', False): + browser.click_selector(selector) + browser.type(password) + browser.wait(1000) + + # Look for a login/submit button + submit_selectors = [ + "button:contains('Sign in')", + "button:contains('Log in')", + "input[type='submit']", + "button[type='submit']" + ] + + for submit_selector in submit_selectors: + if browser.wait_for_selector(submit_selector, timeout=1000): + browser.click_selector(submit_selector) + browser.wait(5000) + break + break + elif browser.wait_for_selector(selector, timeout=3000): + browser.click_selector(selector) + browser.type(password) + browser.wait(1000) + + # Look for a login/submit button + for submit_selector in ["button:contains('Sign in')", "button:contains('Log in')", "input[type='submit']"]: + if browser.wait_for_selector(submit_selector, timeout=1000): + browser.click_selector(submit_selector) + browser.wait(5000) + break + break + + return True + except Exception as e: + logger.error(f"Generic SSO error: {e}") + return False \ No newline at end of file diff --git a/app/jira_agent/jira.py b/app/jira_agent/jira.py new file mode 100644 index 0000000..c4eb862 --- /dev/null +++ b/app/jira_agent/jira.py @@ -0,0 +1,429 @@ +"""JIRA agent for interacting with JIRA tickets. + +This module provides the JiraAgent class for interacting with JIRA: +- Reading ticket information +- Adding comments +- Changing ticket status +- Extracting data for analysis +""" + +import os +import json +import time +import logging +from typing import Dict, List, Optional, Any +from pathlib import Path +from dotenv import load_dotenv + +# Conditionally import jira for API mode +try: + from jira import JIRA + JIRA_API_AVAILABLE = True +except ImportError: + JIRA_API_AVAILABLE = False + logging.warning("JIRA API package not installed. To use API mode, run: pip install jira") + +# Load environment variables, first trying .env.local +load_dotenv(dotenv_path=".env.local", override=True) +# If .env.local doesn't exist, fall back to .env +if os.getenv("JIRA_URL") is None and os.path.exists(".env"): + load_dotenv(dotenv_path=".env", override=True) +logger = logging.getLogger(__name__) + + +class JiraAgent: + """Agent for interacting with JIRA to read and manipulate tickets.""" + + def __init__(self, + jira_url: str = None, + username: Optional[str] = None, + password: Optional[str] = None, + cache_dir: str = "./cache"): + """Initialize the JIRA agent. + + Args: + jira_url: URL of the JIRA instance + username: JIRA username (if None, reads from JIRA_USERNAME env var) + password: JIRA password (if None, reads from JIRA_PASSWORD env var) + cache_dir: Directory for caching memory + """ + self.jira_url = jira_url or os.getenv("JIRA_URL", "https://mydomain.atlassian.net") + self.username = username or os.getenv("JIRA_USERNAME") + self.password = password or os.getenv("JIRA_PASSWORD") + + if not self.username or not self.password: + raise ValueError("JIRA credentials are required. Set them in .env file or pass them to the constructor.") + + if not JIRA_API_AVAILABLE: + logger.warning("JIRA API package not installed. Install with: pip install jira") + + def get_ticket(self, ticket_id: str, extract_fields: Optional[List[str]] = None) -> Dict[str, Any]: + """Get information about a JIRA ticket. + + Args: + ticket_id: The JIRA ticket ID (e.g., "PROJ-123") + extract_fields: List of additional fields to extract (defaults to common fields) + + Returns: + Dictionary with ticket information + """ + # Default fields to extract if not specified + if extract_fields is None: + extract_fields = ["summary", "description"] + + logger.info(f"Getting information for ticket {ticket_id}") + + # Initialize with basic info in case of errors + ticket_info = { + "id": ticket_id, + "url": f"{self.jira_url}/browse/{ticket_id}", + } + + return self._get_ticket_api(ticket_id, extract_fields) + + def _get_ticket_api(self, ticket_id: str, extract_fields: List[str]) -> Dict[str, Any]: + """Get ticket information using JIRA API. + + Args: + ticket_id: The JIRA ticket ID + extract_fields: List of fields to extract + + Returns: + Dictionary with ticket information + """ + if not JIRA_API_AVAILABLE: + logger.warning("JIRA API not available. Install with: pip install jira") + return { + "id": ticket_id, + "url": f"{self.jira_url}/browse/{ticket_id}", + "error": "JIRA API package not installed. Run: pip install jira" + } + + logger.info(f"Using API to get ticket {ticket_id}") + + # Initialize with basic info + ticket_info = { + "id": ticket_id, + "url": f"{self.jira_url}/browse/{ticket_id}" + } + + try: + # Connect to JIRA + jira = JIRA( + server=self.jira_url, + basic_auth=(self.username, self.password) + ) + + # Get issue + issue = jira.issue(ticket_id) + + # Extract common fields + if "summary" in extract_fields or not extract_fields: + ticket_info["summary"] = issue.fields.summary + + if "description" in extract_fields or not extract_fields: + ticket_info["description"] = issue.fields.description or "" + + if "status" in extract_fields: + ticket_info["status"] = issue.fields.status.name + + if "assignee" in extract_fields: + if issue.fields.assignee: + ticket_info["assignee"] = issue.fields.assignee.displayName + else: + ticket_info["assignee"] = "Unassigned" + + if "priority" in extract_fields: + if issue.fields.priority: + ticket_info["priority"] = issue.fields.priority.name + else: + ticket_info["priority"] = "None" + + if "type" in extract_fields: + if issue.fields.issuetype: + ticket_info["type"] = issue.fields.issuetype.name + else: + ticket_info["type"] = "Unknown" + + if "reporter" in extract_fields: + if issue.fields.reporter: + ticket_info["reporter"] = issue.fields.reporter.displayName + else: + ticket_info["reporter"] = "Unknown" + + if "created" in extract_fields: + ticket_info["created"] = issue.fields.created + + if "updated" in extract_fields: + ticket_info["updated"] = issue.fields.updated + + if "comments" in extract_fields: + comments = [] + for comment in issue.fields.comment.comments: + comments.append({ + "author": comment.author.displayName, + "text": comment.body, + "created": comment.created + }) + ticket_info["comments"] = comments + + if "labels" in extract_fields: + ticket_info["labels"] = issue.fields.labels if issue.fields.labels else [] + + # Extract any custom fields that were requested + field_map = {field['name'].lower(): field['id'] for field in jira.fields()} + for field in extract_fields: + if field not in ticket_info and field.lower() in field_map: + field_id = field_map[field.lower()] + if hasattr(issue.fields, field_id): + value = getattr(issue.fields, field_id) + if value is not None: + ticket_info[field] = value + + return ticket_info + + except Exception as e: + logger.error(f"JIRA API error: {e}") + ticket_info["error"] = f"JIRA API error: {str(e)}" + return ticket_info + + def add_comment(self, ticket_id: str, comment_text: str) -> bool: + """Add a comment to a JIRA ticket. + + Args: + ticket_id: The JIRA ticket ID (e.g., "PROJ-123") + comment_text: The text of the comment to add + + Returns: + True if comment was added successfully + """ + logger.info(f"Adding comment to ticket {ticket_id}") + + if not JIRA_API_AVAILABLE: + logger.warning("JIRA API not available. Install with: pip install jira") + return False + + try: + # Connect to JIRA + jira = JIRA( + server=self.jira_url, + basic_auth=(self.username, self.password) + ) + + # Add comment to the issue + jira.add_comment(ticket_id, comment_text) + + logger.info(f"Comment added to {ticket_id} via API") + return True + + except Exception as e: + logger.error(f"Error adding comment via API: {e}") + return False + + def change_status(self, ticket_id: str, new_status: str) -> bool: + """Change the status of a JIRA ticket. + + Args: + ticket_id: The JIRA ticket ID (e.g., "PROJ-123") + new_status: The new status to set (e.g., "In Progress", "Done") + + Returns: + True if status was changed successfully + """ + logger.info(f"Changing status of ticket {ticket_id} to {new_status}") + + if not JIRA_API_AVAILABLE: + logger.warning("JIRA API not available. Install with: pip install jira") + return False + + try: + # Connect to JIRA + jira = JIRA( + server=self.jira_url, + basic_auth=(self.username, self.password) + ) + + # Get the issue + issue = jira.issue(ticket_id) + + # Get available transitions + transitions = jira.transitions(issue) + + # Find the transition ID for the requested status + transition_id = None + for t in transitions: + if t['name'].lower() == new_status.lower() or t['to']['name'].lower() == new_status.lower(): + transition_id = t['id'] + break + + if not transition_id: + logger.error(f"No transition found for status: {new_status}") + available_statuses = [t['to']['name'] for t in transitions] + logger.info(f"Available statuses: {available_statuses}") + return False + + # Perform the transition + jira.transition_issue(issue, transition_id) + + logger.info(f"Changed status of {ticket_id} to {new_status} via API") + return True + + except Exception as e: + logger.error(f"Error changing status via API: {e}") + return False + + def save_ticket_data(self, ticket_info: Dict[str, Any], output_dir: Optional[str] = None) -> str: + """Save ticket data to a JSON file. + + Args: + ticket_info: Dictionary of ticket data + output_dir: Directory to save file (defaults to ./ticket_data) + + Returns: + Path to saved file + """ + if output_dir is None: + output_dir = os.path.join(os.getcwd(), "ticket_data") + + # Create directory if it doesn't exist + os.makedirs(output_dir, exist_ok=True) + + # Filename from ticket ID + filename = f"{ticket_info['id'].replace('-', '_').lower()}.json" + filepath = os.path.join(output_dir, filename) + + # Save data + with open(filepath, 'w') as f: + json.dump(ticket_info, f, indent=2) + + logger.info(f"Ticket data saved to: {filepath}") + return filepath + + def analyze_ticket(self, ticket_id: str, analysis_endpoint: Optional[str] = None, + analysis_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + """Analyze a JIRA ticket using an external service or local processing. + + Args: + ticket_id: The JIRA ticket ID (e.g., "PROJ-123") + analysis_endpoint: Optional endpoint for analysis service + analysis_params: Additional parameters for analysis + + Returns: + Analysis results as a dictionary + """ + if not JIRA_API_AVAILABLE: + logger.warning("JIRA API not available. Install with: pip install jira") + return { + "ticket_id": ticket_id, + "error": "JIRA API package not installed. Run: pip install jira" + } + + logger.info(f"Analyzing ticket {ticket_id}") + + try: + # Connect to JIRA + jira = JIRA( + server=self.jira_url, + basic_auth=(self.username, self.password) + ) + + # Get the issue + issue = jira.issue(ticket_id) + + # Get ticket data + ticket_data = { + "id": ticket_id, + "summary": issue.fields.summary, + "description": issue.fields.description or "", + "status": issue.fields.status.name, + "assignee": issue.fields.assignee.displayName if issue.fields.assignee else "Unassigned", + "reporter": issue.fields.reporter.displayName if issue.fields.reporter else "Unknown", + "type": issue.fields.issuetype.name if issue.fields.issuetype else "Unknown", + "priority": issue.fields.priority.name if issue.fields.priority else "None", + "created": issue.fields.created, + "updated": issue.fields.updated + } + + # Get comments + comments = [] + for comment in issue.fields.comment.comments: + comments.append({ + "author": comment.author.displayName, + "text": comment.body, + "created": comment.created + }) + ticket_data["comments"] = comments + + # Basic analysis + analysis_results = { + "ticket_id": ticket_id, + "data": ticket_data, + "analysis": { + "word_count": len(ticket_data["description"].split()), + "comment_count": len(comments), + "age_days": self._days_since(ticket_data["created"]), + "last_updated_days": self._days_since(ticket_data["updated"]) + } + } + + # If there's an external analysis endpoint, use it + if analysis_endpoint: + try: + import requests + + # Prepare payload + payload = { + "ticket_id": ticket_id, + "ticket_data": ticket_data + } + + # Add any additional parameters + if analysis_params: + payload.update(analysis_params) + + # Call the analysis service + response = requests.post(analysis_endpoint, json=payload) + + if response.status_code == 200: + external_analysis = response.json() + analysis_results["external_analysis"] = external_analysis + logger.info("External analysis completed successfully") + else: + analysis_results["external_analysis_error"] = f"Error: {response.status_code}" + logger.error(f"External analysis failed: {response.status_code}") + except Exception as e: + analysis_results["external_analysis_error"] = str(e) + logger.error(f"Error calling external analysis service: {e}") + + return analysis_results + + except Exception as e: + logger.error(f"Error analyzing ticket via API: {e}") + return { + "ticket_id": ticket_id, + "error": f"JIRA API error: {str(e)}" + } + + def _days_since(self, date_string: str) -> int: + """Calculate days between a date string and now. + + Args: + date_string: Date string in JIRA format + + Returns: + Number of days + """ + from datetime import datetime + import dateutil.parser + + try: + # Parse the date string + issue_date = dateutil.parser.parse(date_string) + + # Calculate difference from now + now = datetime.now(issue_date.tzinfo) + delta = now - issue_date + + return delta.days + except Exception: + return 0 \ No newline at end of file diff --git a/app/jira_agent/selectors.py b/app/jira_agent/selectors.py new file mode 100644 index 0000000..96a7ded --- /dev/null +++ b/app/jira_agent/selectors.py @@ -0,0 +1,84 @@ +"""Selectors for JIRA UI elements. + +This module provides selector dictionaries for various parts of the JIRA UI, +allowing the agent to locate and interact with UI elements consistently. +""" + +# Default selectors for common elements in JIRA UI +DEFAULT_SELECTORS = { + "login_page": { + "username_field": "input[name='username'], input[id='username'], input[name='email'], input[type='email']", + "password_field": "input[name='password'], input[id='password'], input[type='password']", + "login_button": "button[id='login-submit'], button[type='submit'], button:contains('Log in')", + "sso_options": { + "google": [ + "button:contains('Continue with Google')", + "button:contains('Sign in with Google')", + "a:contains('Google')", + "button:contains('Google')", + "button[data-provider='google']", + "div:contains('Google') button", + "a[data-provider='google']", + "a.google", + "a[href*='google']", + ".google-button", + "[id*='google']", + "[class*='google']", + "button[data-testid*='google']", + "img[alt*='Google']", + "img[src*='google']", + "*[aria-label*='Google']" + ], + "microsoft": [ + "button:contains('Continue with Microsoft')", + "button:contains('Sign in with Microsoft')", + "a:contains('Microsoft')", + "button:contains('Microsoft')" + ], + "okta": [ + "button:contains('Continue with Okta')", + "button:contains('Sign in with Okta')", + "a:contains('Okta')" + ], + "generic_sso": [ + "button:contains('Log in with SSO')", + "a:contains('Single Sign-on')", + "button:contains('SSO')" + ] + } + }, + "ticket_page": { + "summary": "[data-test-id='issue.views.issue-base.foundation.summary.heading']", + "description": "[data-test-id='issue.views.field.rich-text.description']", + "status": "[data-test-id='issue.views.issue-base.foundation.status.status-field-wrapper']", + "assignee": "[data-test-id='issue.views.field.user.assignee']", + "priority": "[data-test-id='issue.views.field.select.priority']", + "type": "[data-test-id='issue.views.issue-base.foundation.issue-type.issue-type-field-wrapper']", + "reporter": "[data-test-id='issue.views.field.user.reporter']", + "created": "[data-test-id='issue.views.field.date.created']", + "updated": "[data-test-id='issue.views.field.date.updated']", + "comments": "[data-test-id='issue.views.comments.comment-container']", + "labels": "[data-test-id='issue.views.field.labels']", + "components": "[data-test-id='issue.views.field.select.components']", + "fix_versions": "[data-test-id='issue.views.field.select.fixversions']", + "add_comment": { + "button": "[data-testid='comment-button'], button:contains('Comment')", + "field": "[data-testid='comment-field'], div[role='textbox']", + "submit": "button[type='submit'], button:contains('Save')" + }, + "status_transition": { + "dropdown": "[data-testid='status-dropdown'], button:contains('In Progress')", + "options": { + "in_progress": "button:contains('In Progress')", + "done": "button:contains('Done')", + "to_do": "button:contains('To Do')" + } + } + } +} + +# Common field selectors for extracting ticket data +FIELD_SELECTORS = DEFAULT_SELECTORS["ticket_page"] + +# Authentication providers and their selectors +SSO_SELECTORS = DEFAULT_SELECTORS["login_page"]["sso_options"] \ No newline at end of file diff --git a/examples/jira_api_integration.py b/examples/jira_api_integration.py new file mode 100644 index 0000000..f94c72e --- /dev/null +++ b/examples/jira_api_integration.py @@ -0,0 +1,349 @@ +""" +Example demonstrating how to use the JIRA agent with an external API. + +This example shows how to: +1. Initialize the JIRA agent +2. Extract data from JIRA tickets +3. Send ticket data to an external API for analysis +4. Display the API response +""" + +import os +import json +import requests +import tkinter as tk +from tkinter import scrolledtext +import threading +from dotenv import load_dotenv +from app.jira_agent import ( + JiraAgent, + select_api_with_llm, + extract_endpoints_rule_based, + get_api_documentation +) +import sys + +# Load environment variables from .env.local file +load_dotenv(dotenv_path=".env.local", override=True) + +# Define your API endpoint +# This can be any API that takes JIRA ticket data and returns an analysis +API_ENDPOINT = os.getenv("PROD_SUPPORT_API_URL", "http://localhost:8080/") +API_KEY = os.getenv("ANALYSIS_API_KEY", "") +# Optional: Add LLM API key for documentation parsing +LLM_API_KEY = os.getenv("LLM_API_KEY", "") +LLM_API_URL = os.getenv("LLM_API_URL", "https://api.openai.com/v1/chat/completions") + + +class RedirectText: + """ + A class that redirects print statements to both the console and a tkinter text widget. + """ + def __init__(self, text_widget): + self.text_widget = text_widget + self.buffer = "" + self.original_stdout = sys.__stdout__ + + def write(self, string): + self.buffer += string + self.text_widget.config(state=tk.NORMAL) + self.text_widget.insert(tk.END, string) + self.text_widget.see(tk.END) + self.text_widget.config(state=tk.DISABLED) + # Write to original stdout to avoid recursion + self.original_stdout.write(string) + + def flush(self): + pass + + +def analyze_with_api(ticket_data): + """ + Send ticket data to an external API for analysis. + + Args: + ticket_data: Dictionary containing ticket information + + Returns: + API response text + """ + print(f"Connecting to API at: {API_ENDPOINT}") + + # Get API documentation using the refactored utility + api_documentation, content_type = get_api_documentation(API_ENDPOINT, "/docs/guide") + + if not api_documentation: + print("Could not retrieve API documentation") + api_analysis_result = {"endpoint": None, "relevant_info": None, "endpoint_params": {}} + else: + try: + # Use the refactored LLM parser + api_analysis_result = select_api_with_llm( + ticket_data, + api_documentation, + llm_api_key=LLM_API_KEY, + llm_api_url=LLM_API_URL + ) + + # Make sure api_analysis_result is a dictionary + if not isinstance(api_analysis_result, dict): + print(f"Warning: Expected dictionary from select_api_with_llm but got {type(api_analysis_result)}") + api_analysis_result = {"endpoint": None, "relevant_info": str(api_analysis_result), "endpoint_params": {}} + except Exception as e: + print(f"Error during API documentation analysis: {e}") + api_analysis_result = {"endpoint": None, "relevant_info": None, "endpoint_params": {}} + + # Safely get values from api_analysis_result + selected_endpoint = None + relevant_info = None + endpoint_params = {} + + if isinstance(api_analysis_result, dict): + selected_endpoint = api_analysis_result.get("endpoint") + relevant_info = api_analysis_result.get("relevant_info") + endpoint_params = api_analysis_result.get("endpoint_params", {}) + + # Display the endpoints found + print(f"Selected endpoint: {selected_endpoint}") + + # Display additional information if provided by the LLM + if relevant_info: + print(f"Relevant information from ticket: {relevant_info}") + + if endpoint_params: + print(f"Required parameters for endpoint: {endpoint_params}") + + # Now proceed with the actual analysis + print(f"\nSending ticket data to API for analysis...") + + try: + # Use default endpoint if none was selected + if not selected_endpoint: + selected_endpoint = "/api/v1/analyze" + print(f"No endpoint selected, using default: {selected_endpoint}") + + # Build the request URL + base_url = f"{API_ENDPOINT.rstrip('/')}{selected_endpoint}" + + # Create headers for API request + headers = {} + if API_KEY: + headers["Authorization"] = f"Bearer {API_KEY}" + + # Prepare request parameters based on relevant_info and endpoint_params + request_params = {} + if isinstance(relevant_info, dict): + # For each parameter required by the endpoint, try to find it in relevant_info + if endpoint_params: + for param_name in endpoint_params.keys(): + if param_name in relevant_info: + request_params[param_name] = relevant_info[param_name] + else: + # If we don't have endpoint_params, just use all relevant_info + request_params = relevant_info + + # Include some ticket information if not already in request_params + if "ticket_id" not in request_params and "id" in ticket_data: + request_params["ticket_id"] = ticket_data["id"] + + if "email" not in request_params and isinstance(relevant_info, dict) and "email" in relevant_info: + request_params["email"] = relevant_info["email"] + + # Build the final URL with parameters + url_params = "&".join([f"{k}={v}" for k, v in request_params.items()]) + if "?" not in base_url: + full_url = f"{base_url}?{url_params}" if url_params else base_url + else: + full_url = f"{base_url}&{url_params}" if url_params else base_url + + print(f"Making GET request to: {full_url}") + + # Make the actual API call (GET method only) + response = requests.get(full_url, headers=headers) + + if response.status_code == 200: + return response.text + else: + error_message = f"API call failed with status {response.status_code}: {response.text[:100]}" + print(error_message) + return error_message + + except Exception as e: + error_message = f"Error during API analysis: {e}" + print(error_message) + return error_message + + +def run_analysis(ticket_id, submit_button, status_label, comment_frame, yes_button, no_button): + """Process JIRA ticket and display results in GUI""" + + # Update status + status_label.config(text="Working on your request...") + submit_button.config(state=tk.DISABLED) + + # Hide comment buttons initially + comment_frame.pack_forget() + + # Initialize the JIRA agent + agent = JiraAgent( + jira_url=os.getenv("JIRA_URL"), + username=os.getenv("JIRA_USERNAME"), + password=os.getenv("JIRA_PASSWORD") + ) + + # Get ticket information with additional fields + print(f"\nGetting information for ticket {ticket_id}...") + import time + time.sleep(2) # Add a 2-second delay + + ticket_info = agent.get_ticket( + ticket_id, + extract_fields=["summary", "description", "assignee"] + ) + + # Display basic ticket information + print(f"\nTicket: {ticket_info.get('id')}") + time.sleep(2) # Add a 2-second delay + + print(f"Summary: {ticket_info.get('summary', 'Unknown')}") + time.sleep(2) # Add a 2-second delay + + print(f"Assignee: {ticket_info.get('assignee', 'Unknown')}") + time.sleep(2) # Add a 2-second delay + + # Analyze with external API + print("\nSending to API for analysis...") + time.sleep(2) # Add a 2-second delay + + api_response = analyze_with_api(ticket_info) + + # Display raw API response + print("\nAPI Response:") + time.sleep(2) # Add a 2-second delay + + print(api_response) + + # Store the current ticket_id and api_response for the comment buttons + yes_button.config(command=lambda: post_comment_to_jira(agent, ticket_id, api_response, status_label, comment_frame)) + no_button.config(command=lambda: skip_comment(status_label, comment_frame)) + + # Show the comment option in the UI + comment_frame.pack(fill=tk.X, pady=10) + + # Update status + status_label.config(text="Analysis completed! Post as comment?") + + +def post_comment_to_jira(agent, ticket_id, api_response, status_label, comment_frame): + """Post the API response as a comment to the JIRA ticket""" + status_label.config(text="Posting comment...") + + # Hide comment buttons + comment_frame.pack_forget() + + print(f"\nAdding API response as comment to ticket {ticket_id}...") + result = agent.add_comment(ticket_id, api_response) + if result: + print("API response comment added successfully!") + status_label.config(text="Comment added successfully!") + else: + print("Failed to add comment.") + status_label.config(text="Failed to add comment.") + + print("\nJIRA API integration example completed!") + + +def skip_comment(status_label, comment_frame): + """Skip posting the comment""" + # Hide comment buttons + comment_frame.pack_forget() + + print("\nSkipped posting comment.") + print("\nJIRA API integration example completed!") + status_label.config(text="Analysis completed!") + + +def create_gui(): + """Create a GUI window for JIRA ticket analysis""" + root = tk.Tk() + root.title("JIRA API Integration") + + # Set window size + window_width = 800 + window_height = 600 + root.geometry(f"{window_width}x{window_height}") + + # Center the window on the screen + screen_width = root.winfo_screenwidth() + screen_height = root.winfo_screenheight() + x_position = int((screen_width - window_width) / 2) + y_position = int((screen_height - window_height) / 2) + root.geometry(f"{window_width}x{window_height}+{x_position}+{y_position}") + + # Create frames + input_frame = tk.Frame(root, padx=10, pady=10) + input_frame.pack(fill=tk.X) + + output_frame = tk.Frame(root, padx=10, pady=10) + output_frame.pack(fill=tk.BOTH, expand=True) + + # Create comment frame (initially hidden) + comment_frame = tk.Frame(root, padx=10, pady=10) + + # Add comment question and buttons + tk.Label(comment_frame, text="Post API response as comment to JIRA ticket?").pack(side=tk.LEFT) + yes_button = tk.Button(comment_frame, text="Yes", width=8) + yes_button.pack(side=tk.LEFT, padx=5) + no_button = tk.Button(comment_frame, text="No", width=8) + no_button.pack(side=tk.LEFT, padx=5) + + # Ticket ID input + tk.Label(input_frame, text="Enter JIRA Ticket ID:").pack(side=tk.LEFT) + ticket_entry = tk.Entry(input_frame, width=20) + ticket_entry.pack(side=tk.LEFT, padx=5) + + # Status label + status_label = tk.Label(input_frame, text="Ready") + status_label.pack(side=tk.RIGHT) + + # Output text area + output_text = scrolledtext.ScrolledText(output_frame, wrap=tk.WORD, state=tk.DISABLED) + output_text.pack(fill=tk.BOTH, expand=True) + + # Submit button + def on_submit(): + ticket_id = ticket_entry.get().strip() + if not ticket_id: + status_label.config(text="Please enter a ticket ID") + return + + # Clear output + output_text.config(state=tk.NORMAL) + output_text.delete(1.0, tk.END) + output_text.config(state=tk.DISABLED) + + # Run analysis in a separate thread to keep UI responsive + thread = threading.Thread( + target=run_analysis, + args=(ticket_id, submit_button, status_label, comment_frame, yes_button, no_button) + ) + thread.daemon = True + thread.start() + + submit_button = tk.Button(input_frame, text="Analyze Ticket", command=on_submit) + submit_button.pack(side=tk.LEFT, padx=5) + + # Redirect stdout to the text widget + redirect = RedirectText(output_text) + sys.stdout = redirect + + return root + + +def main(): + """Run the JIRA API integration example with GUI.""" + root = create_gui() + root.mainloop() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/jira_example.py b/examples/jira_example.py new file mode 100644 index 0000000..b3835bc --- /dev/null +++ b/examples/jira_example.py @@ -0,0 +1,132 @@ +""" +Example demonstrating how to use the JIRA agent for interacting with JIRA tickets. + +This example shows how to: +1. Initialize the JIRA agent +2. Get information about a ticket +3. Add comments to tickets +4. Change ticket status +5. Save ticket data +""" + +import os +from dotenv import load_dotenv +from app.jira_agent import JiraAgent + +# Load environment variables from .env.local file +load_dotenv(dotenv_path=".env.local", override=True) + +def main(): + """Run the JIRA example.""" + # Get ticket ID from user + ticket_id = input("Enter JIRA ticket ID (e.g., PROJ-123): ") + + # Initialize the JIRA agent + # Load environment variables, first trying .env.local + load_dotenv(dotenv_path='.env.local') # Try local env first + if not os.getenv("JIRA_URL"): # If not found, try .env + load_dotenv() + + # Get JIRA configuration from environment + jira_url = os.getenv("JIRA_URL") + jira_username = os.getenv("JIRA_USERNAME") + jira_password = os.getenv("JIRA_PASSWORD") + + print("\nUsing JIRA API to interact with tickets") + + # Initialize the JIRA agent + jira_agent = JiraAgent( + jira_url=jira_url, + username=jira_username, + password=jira_password + ) + + # Display menu of options + print("\nJIRA Agent Example") + print("1. Get ticket information") + print("2. Add a comment to the ticket") + print("3. Change ticket status") + print("4. Analyze ticket") + print("5. Do all of the above") + choice = input("\nSelect an option (1-5): ") + + if choice in ("1", "5"): + # Get ticket information + print(f"\nGetting information for ticket {ticket_id}...") + ticket_info = jira_agent.get_ticket(ticket_id) + + # Display ticket information + print("\nTicket Information:") + for key, value in ticket_info.items(): + if key == "comments" and isinstance(value, list): + print(f" {key}: {len(value)} comments") + if value and len(value) > 0: + print(f" First comment: {value[0]['text'][:100]}..." if len(value[0]['text']) > 100 else f" First comment: {value[0]['text']}") + elif isinstance(value, str) and len(value) > 150: + print(f" {key}: {value[:150]}...") + else: + print(f" {key}: {value}") + + # Save ticket data + save_option = input("\nSave ticket data to file? (y/n): ").lower() + if save_option == 'y': + file_path = jira_agent.save_ticket_data(ticket_info) + print(f"Ticket data saved to: {file_path}") + + if choice in ("2", "5"): + # Add a comment + comment_text = input("\nEnter comment text (leave empty to skip): ") + if comment_text: + print(f"Adding comment to ticket {ticket_id}...") + result = jira_agent.add_comment(ticket_id, comment_text) + if result: + print("Comment added successfully!") + else: + print("Failed to add comment.") + + if choice in ("3", "5"): + # Change ticket status + status_options = ["To Do", "In Progress", "Done"] + print("\nStatus options:") + for i, status in enumerate(status_options, 1): + print(f"{i}. {status}") + + status_choice = input("Select new status (1-3, or enter custom status): ") + try: + new_status = status_options[int(status_choice) - 1] + except (ValueError, IndexError): + new_status = status_choice + + if new_status: + print(f"Changing status of ticket {ticket_id} to '{new_status}'...") + result = jira_agent.change_status(ticket_id, new_status) + if result: + print("Status changed successfully!") + else: + print("Failed to change status.") + + if choice in ("4", "5"): + # Analyze ticket + print(f"\nAnalyzing ticket {ticket_id}...") + + # You can specify an API endpoint here if you have one + analysis_endpoint = input("Enter API endpoint for analysis (leave empty to skip): ") + + if analysis_endpoint: + analysis_results = jira_agent.analyze_ticket(ticket_id, analysis_endpoint=analysis_endpoint) + else: + analysis_results = jira_agent.analyze_ticket(ticket_id) + + print("\nAnalysis Results:") + for key, value in analysis_results.items(): + if isinstance(value, dict): + print(f" {key}:") + for k, v in value.items(): + print(f" {k}: {v}") + else: + print(f" {key}: {value}") + + print("\nJIRA example completed!") + +if __name__ == "__main__": + main() \ No newline at end of file