diff --git a/asa_api_cli/optimize.py b/asa_api_cli/optimize.py index 0f2990c..cb66e65 100644 --- a/asa_api_cli/optimize.py +++ b/asa_api_cli/optimize.py @@ -6,7 +6,8 @@ from dataclasses import dataclass, field from datetime import date, timedelta from decimal import Decimal -from typing import Annotated, Any, TypeVar +from pathlib import Path +from typing import Annotated, Any, Literal, TypeVar import typer from asa_api_client.exceptions import AppleSearchAdsError, NotFoundError @@ -24,6 +25,7 @@ NegativeKeywordCreate, Selector, ) +from pydantic import BaseModel, Field from rich.table import Table from asa_api_cli.utils import ( @@ -1120,7 +1122,7 @@ def review_keyword_bids( impressions = row.total.impressions or 0 taps = row.total.taps or 0 - conversions = row.total.installs or 0 + conversions = row.total.tap_installs or 0 spend_amount = row.total.local_spend.amount if row.total.local_spend else "0" spend = Decimal(str(spend_amount)) currency = row.total.local_spend.currency if row.total.local_spend else "USD" @@ -1650,7 +1652,7 @@ def aggregate_report(report: Any) -> dict[int, dict[str, Any]]: } perf[ag_id]["impressions"] += row.total.impressions or 0 perf[ag_id]["taps"] += row.total.taps or 0 - perf[ag_id]["installs"] += row.total.installs or 0 + perf[ag_id]["installs"] += row.total.tap_installs or 0 if row.total.local_spend: perf[ag_id]["spend"] += Decimal(str(row.total.local_spend.amount)) perf[ag_id]["currency"] = row.total.local_spend.currency @@ -1984,3 +1986,471 @@ def aggregate_report(report: Any) -> dict[int, dict[str, Any]]: except AppleSearchAdsError as e: handle_api_error(e) raise typer.Exit(EXIT_ERROR) from None + + +# ============================================================================ +# Negative Keyword Suggestions (AI-Powered) +# ============================================================================ + + +@dataclass +class SearchTermAnalysis: + """A search term with performance data for negative keyword analysis.""" + + search_term: str + campaign_id: int + campaign_name: str + ad_group_id: int + ad_group_name: str + impressions: int + taps: int + installs: int + spend: Decimal + currency: str + matched_keyword: str | None = None + + +class NegativeSuggestion(BaseModel): # type: ignore[misc] + """AI suggestion for a single search term.""" + + search_term: str = Field(description="The search term being evaluated") + action: Literal["negative", "keep", "review"] = Field( + description="Whether to add as negative keyword, keep, or flag for manual review" + ) + reason: str = Field(description="Brief explanation for the recommendation") + + +class NegativeKeywordSuggestions(BaseModel): # type: ignore[misc] + """Batch of negative keyword suggestions from AI analysis.""" + + suggestions: list[NegativeSuggestion] = Field(description="List of suggestions for each search term") + + +NEGATIVE_KEYWORD_SYSTEM_PROMPT = ( + "You are an expert Apple Search Ads optimiser specialising in " + "negative keyword strategy.\n\n" + "You will receive a list of search terms from App Store Search Ads " + "campaigns along with their performance metrics and the app description.\n\n" + "Your job is to classify each search term as:\n" + '- "negative": Irrelevant to the app — add as a negative keyword to stop wasting spend.\n' + '- "keep": Relevant and performing well or worth testing further.\n' + '- "review": Ambiguous — may be relevant but performance is poor, needs human judgement.\n\n' + "Guidelines:\n" + "1. A search term is irrelevant if it clearly targets a different app, product, or intent.\n" + '2. High spend with zero installs over a meaningful period is a strong signal for "negative".\n' + "3. Low-volume terms (few impressions/taps) with no installs may just need more data " + '— prefer "review" over "negative" unless clearly irrelevant.\n' + "4. Brand names of competitors are usually worth keeping unless spend is excessive.\n" + '5. Generic terms tangentially related to the app deserve "review", not automatic "negative".\n' + "6. Consider the app context provided — what the app does and who it serves.\n" + '7. Be conservative: only recommend "negative" when you are confident the term is irrelevant.' +) + + +def _get_negative_analysis_agent( + settings: Any, + provider: str | None = None, +) -> Any: + """Create a PydanticAI agent for negative keyword analysis.""" + from pydantic_ai import Agent + from pydantic_ai.models import Model + + active_provider = provider or settings.translate_provider + + model: Model + if active_provider == "anthropic": + if not settings.anthropic_api_key: + raise RuntimeError( + "ANTHROPIC_API_KEY is required for AI analysis.\n" + "Add it to your .env file or set the environment variable." + ) + from pydantic_ai.models.anthropic import AnthropicModel + from pydantic_ai.providers.anthropic import AnthropicProvider + + model = AnthropicModel("claude-sonnet-4-5", provider=AnthropicProvider(api_key=settings.anthropic_api_key)) + elif active_provider == "gemini": + if not settings.gemini_api_key: + raise RuntimeError( + "GEMINI_API_KEY is required for AI analysis.\nAdd it to your .env file or set the environment variable." + ) + from pydantic_ai.models.google import GoogleModel + from pydantic_ai.providers.google import GoogleProvider + + model = GoogleModel("gemini-2.0-flash", provider=GoogleProvider(api_key=settings.gemini_api_key)) + else: + raise ValueError(f"Unknown provider: {active_provider}") + + return Agent( + model, + output_type=NegativeKeywordSuggestions, + system_prompt=NEGATIVE_KEYWORD_SYSTEM_PROMPT, + ) + + +async def _analyse_search_terms( + terms: list[SearchTermAnalysis], + app_description: str, + settings: Any, + provider: str | None = None, +) -> NegativeKeywordSuggestions: + """Send search terms to AI for negative keyword classification.""" + agent = _get_negative_analysis_agent(settings, provider) + + lines = [] + for t in terms: + cpt = t.spend / t.taps if t.taps > 0 else Decimal("0") + lines.append( + f'- "{t.search_term}" | matched: "{t.matched_keyword or "?"}" ' + f"| impr: {t.impressions} | taps: {t.taps} | installs: {t.installs} " + f"| spend: {t.spend:.2f} {t.currency} | CPT: {cpt:.2f}" + ) + + prompt = f"""App context: {app_description} + +Analyse these search terms and recommend which should be added as negative keywords: + +{chr(10).join(lines)} + +Classify each search term as "negative", "keep", or "review" with a brief reason.""" + + result = await agent.run(prompt) + return result.output # type: ignore[no-any-return] + + +@app.command("negatives") +def negatives( + campaign_id: Annotated[ + int | None, + typer.Option("--campaign-id", "-c", help="Analyse a single campaign (default: all campaigns)"), + ] = None, + days: Annotated[ + int, + typer.Option("--days", "-d", help="Days of search term data to analyse"), + ] = 30, + min_spend: Annotated[ + float, + typer.Option("--min-spend", help="Minimum spend to include a search term"), + ] = 0.0, + min_impressions: Annotated[ + int, + typer.Option("--min-impressions", help="Minimum impressions to include a search term"), + ] = 0, + country: Annotated[ + str | None, + typer.Option("--country", help="Filter campaigns by country code (e.g. US, GB)"), + ] = None, + provider: Annotated[ + str | None, + typer.Option("--provider", "-p", help="AI provider: anthropic or gemini"), + ] = None, + env_file: Annotated[ + Path | None, + typer.Option("--env-file", "-e", help="Path to .env file with API keys"), + ] = None, + apply: Annotated[ + bool, + typer.Option("--apply", help="Apply suggested negatives (campaign-level exact match)"), + ] = False, + dry_run: Annotated[ + bool, + typer.Option("--dry-run", "-n", help="Show suggestions without applying changes"), + ] = False, + output: Annotated[ + Path | None, + typer.Option("--output", "-o", help="Export suggestions to CSV file"), + ] = None, +) -> None: + """Analyse search terms with AI and suggest negative keywords. + + Pulls search term reports across all campaigns (or a single campaign), + sends performance data to an AI model for relevance classification, + and optionally applies the suggestions as campaign-level negative keywords. + + Requires ANTHROPIC_API_KEY or GEMINI_API_KEY in environment or .env file. + """ + import asyncio + + from asa_api_cli.translate import get_translate_settings + + try: + settings = get_translate_settings(env_file) + except Exception as e: + print_error("Configuration Error", str(e)) + raise typer.Exit(EXIT_ERROR) from None + + end_date = date.today() + start_date = end_date - timedelta(days=days) + + client = get_client() + + try: + with client: + # Collect campaigns to analyse + if campaign_id: + with spinner("Fetching campaign..."): + campaign = client.campaigns.get(campaign_id) + target_campaigns = [campaign] + else: + with spinner("Loading campaigns..."): + target_campaigns = list(client.campaigns.find(Selector().where("status", "==", "ENABLED"))) + + if country: + country = country.upper() + target_campaigns = [c for c in target_campaigns if country in (c.countries_or_regions or [])] + + if not target_campaigns: + print_warning("No campaigns found matching filters") + return + + print_info(f"Analysing search terms from {len(target_campaigns)} campaign(s) ({start_date} to {end_date})") + + # Collect search term data across all campaigns + all_terms: list[SearchTermAnalysis] = [] + + with spinner("Pulling search term reports..."): + for camp in target_campaigns: + try: + report = client.reports.search_terms( + campaign_id=camp.id, + start_date=start_date, + end_date=end_date, + granularity=GranularityType.DAILY, + ) + + if not report.row: + continue + + # Aggregate by (search_term, ad_group_id) across daily rows + agg: dict[tuple[str, int], SearchTermAnalysis] = {} + for row in report.row: + if not row.total or not row.metadata.search_term_text: + continue + + key = (row.metadata.search_term_text, row.metadata.ad_group_id or 0) + + if key not in agg: + spend_amt = ( + Decimal(str(row.total.local_spend.amount)) + if row.total.local_spend + else Decimal("0") + ) + currency = row.total.local_spend.currency if row.total.local_spend else "USD" + agg[key] = SearchTermAnalysis( + search_term=row.metadata.search_term_text, + campaign_id=camp.id, + campaign_name=camp.name, + ad_group_id=row.metadata.ad_group_id or 0, + ad_group_name=row.metadata.ad_group_name or "", + impressions=row.total.impressions or 0, + taps=row.total.taps or 0, + installs=row.total.tap_installs or 0, + spend=spend_amt, + currency=currency, + matched_keyword=row.metadata.keyword, + ) + else: + existing = agg[key] + existing.impressions += row.total.impressions or 0 + existing.taps += row.total.taps or 0 + existing.installs += row.total.tap_installs or 0 + if row.total.local_spend: + existing.spend += Decimal(str(row.total.local_spend.amount)) + + all_terms.extend(agg.values()) + except AppleSearchAdsError: + continue + + if not all_terms: + print_warning("No search term data found for the specified period") + return + + # Filter by thresholds + if min_spend > 0: + all_terms = [t for t in all_terms if t.spend >= Decimal(str(min_spend))] + if min_impressions > 0: + all_terms = [t for t in all_terms if t.impressions >= min_impressions] + + if not all_terms: + print_warning("No search terms match the specified thresholds") + return + + # Sort by spend descending to prioritise highest waste + all_terms.sort(key=lambda t: t.spend, reverse=True) + + print_info(f"Found {len(all_terms)} search terms to analyse") + + # Build app context from campaign names + app_names = {c.name.split(" - ")[0].strip() for c in target_campaigns} + app_description = f"App: {', '.join(app_names)}" if app_names else "Unknown app" + + # Process in batches (AI context limits) + batch_size = 100 + all_suggestions: list[tuple[SearchTermAnalysis, NegativeSuggestion]] = [] + + for i in range(0, len(all_terms), batch_size): + batch = all_terms[i : i + batch_size] + batch_num = i // batch_size + 1 + total_batches = (len(all_terms) + batch_size - 1) // batch_size + + with spinner(f"AI analysis batch {batch_num}/{total_batches}..."): + try: + result = asyncio.run(_analyse_search_terms(batch, app_description, settings, provider)) + except RuntimeError as e: + print_error("AI Error", str(e)) + raise typer.Exit(EXIT_ERROR) from None + + # Match suggestions back to term data + suggestion_map = {s.search_term: s for s in result.suggestions} + for term in batch: + suggestion = suggestion_map.get(term.search_term) + if suggestion: + all_suggestions.append((term, suggestion)) + + if not all_suggestions: + print_warning("AI returned no suggestions") + return + + # Separate by action + negatives_list = [(t, s) for t, s in all_suggestions if s.action == "negative"] + review_list = [(t, s) for t, s in all_suggestions if s.action == "review"] + keep_list = [(t, s) for t, s in all_suggestions if s.action == "keep"] + + # Display results + console.print() + + if negatives_list: + table = Table(title=f"Suggested Negatives ({len(negatives_list)})") + table.add_column("Search Term", style="red", max_width=30) + table.add_column("Campaign", style="cyan", max_width=20) + table.add_column("Impr", justify="right") + table.add_column("Taps", justify="right") + table.add_column("Installs", justify="right") + table.add_column("Spend", justify="right") + table.add_column("Reason", max_width=35) + + for term, suggestion in negatives_list: + table.add_row( + term.search_term, + term.campaign_name[:20], + f"{term.impressions:,}", + f"{term.taps:,}", + f"{term.installs:,}", + f"{term.spend:.2f} {term.currency}", + suggestion.reason, + ) + + console.print(table) + console.print() + + if review_list: + table = Table(title=f"Needs Review ({len(review_list)})") + table.add_column("Search Term", style="yellow", max_width=30) + table.add_column("Campaign", style="cyan", max_width=20) + table.add_column("Impr", justify="right") + table.add_column("Taps", justify="right") + table.add_column("Installs", justify="right") + table.add_column("Spend", justify="right") + table.add_column("Reason", max_width=35) + + for term, suggestion in review_list: + table.add_row( + term.search_term, + term.campaign_name[:20], + f"{term.impressions:,}", + f"{term.taps:,}", + f"{term.installs:,}", + f"{term.spend:.2f} {term.currency}", + suggestion.reason, + ) + + console.print(table) + console.print() + + # Summary + total_negative_spend = sum(t.spend for t, _ in negatives_list) + print_result_panel( + "Analysis Summary", + { + "Search terms analysed": str(len(all_suggestions)), + "Suggested negatives": str(len(negatives_list)), + "Needs review": str(len(review_list)), + "Keep": str(len(keep_list)), + "Wasted spend (negatives)": f"{total_negative_spend:.2f}", + }, + ) + + # Export to CSV + if output: + try: + import csv + + with open(output, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow( + [ + "search_term", + "action", + "reason", + "campaign_name", + "ad_group_name", + "matched_keyword", + "impressions", + "taps", + "installs", + "spend", + "currency", + ] + ) + for term, suggestion in all_suggestions: + writer.writerow( + [ + term.search_term, + suggestion.action, + suggestion.reason, + term.campaign_name, + term.ad_group_name, + term.matched_keyword or "", + term.impressions, + term.taps, + term.installs, + f"{term.spend:.2f}", + term.currency, + ] + ) + print_success(f"Exported {len(all_suggestions)} suggestions to {output}") + except Exception as e: + print_error("Export failed", str(e)) + + # Apply negatives + if apply and not dry_run and negatives_list: + by_campaign: dict[int, list[tuple[SearchTermAnalysis, NegativeSuggestion]]] = defaultdict(list) + for term, suggestion in negatives_list: + by_campaign[term.campaign_id].append((term, suggestion)) + + total_added = 0 + for camp_id, items in by_campaign.items(): + camp_name = items[0][0].campaign_name + neg_creates = [ + NegativeKeywordCreate( + text=term.search_term, + match_type=KeywordMatchType.EXACT, + ) + for term, _ in items + ] + + try: + with spinner(f"Adding {len(neg_creates)} negatives to '{camp_name}'..."): + client.campaigns(camp_id).negative_keywords.create_bulk(neg_creates) + total_added += len(neg_creates) + except AppleSearchAdsError as e: + print_error(f"Failed to add negatives to '{camp_name}'", e.message) + + if total_added > 0: + print_success(f"Added {total_added} negative keywords across {len(by_campaign)} campaign(s)") + + elif apply and dry_run: + print_info("Dry run — no changes applied. Remove --dry-run to apply negatives.") + + except AppleSearchAdsError as e: + handle_api_error(e) + raise typer.Exit(EXIT_ERROR) from None diff --git a/asa_api_cli/reports.py b/asa_api_cli/reports.py index 4353201..f7484dd 100644 --- a/asa_api_cli/reports.py +++ b/asa_api_cli/reports.py @@ -59,12 +59,12 @@ def report_row_to_dict(row: object) -> dict[str, Any]: total = row.total # type: ignore result["impressions"] = total.impressions result["taps"] = total.taps - result["installs"] = total.installs + result["installs"] = total.tap_installs result["ttr"] = total.ttr - result["conv_rate"] = total.conversion_rate + result["conv_rate"] = total.tap_install_rate result["spend"] = total.local_spend.amount if total.local_spend else None result["avg_cpt"] = total.avg_cpt.amount if total.avg_cpt else None - result["avg_cpa"] = total.avg_cpa.amount if total.avg_cpa else None + result["avg_cpa"] = total.total_avg_cpi.amount if total.total_avg_cpi else None return result @@ -130,12 +130,12 @@ def print_grand_totals(report: object) -> None: lines = [] lines.append(f"[label]Impressions:[/label] [value]{format_number(total.impressions)}[/value]") lines.append(f"[label]Taps:[/label] [value]{format_number(total.taps)}[/value]") - lines.append(f"[label]Installs:[/label] [value]{format_number(total.installs)}[/value]") + lines.append(f"[label]Installs:[/label] [value]{format_number(total.tap_installs)}[/value]") if total.ttr is not None: lines.append(f"[label]TTR:[/label] [value]{format_percent(total.ttr)}[/value]") - if total.conversion_rate is not None: - lines.append(f"[label]Conv Rate:[/label] [value]{format_percent(total.conversion_rate)}[/value]") + if total.tap_install_rate is not None: + lines.append(f"[label]Conv Rate:[/label] [value]{format_percent(total.tap_install_rate)}[/value]") if total.local_spend: spend = total.local_spend lines.append(f"[label]Total Spend:[/label] [value]{spend.amount} {spend.currency}[/value]") diff --git a/pyproject.toml b/pyproject.toml index 333d3a2..e72d61f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ classifiers = [ "Typing :: Typed", ] dependencies = [ - "asa-api-client>=0.1.5", + "asa-api-client>=0.2.1", "typer>=0.13.0", "rich>=13.9.0", "pydantic-ai>=0.1.0", diff --git a/uv.lock b/uv.lock index 14c0835..1d6a8f7 100644 --- a/uv.lock +++ b/uv.lock @@ -201,7 +201,7 @@ dev = [ [package.metadata] requires-dist = [ - { name = "asa-api-client", specifier = ">=0.1.5" }, + { name = "asa-api-client", specifier = ">=0.2.1" }, { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.13.0" }, { name = "pre-commit", marker = "extra == 'dev'", specifier = ">=4.0.0" }, { name = "pydantic-ai", specifier = ">=0.1.0" }, @@ -217,7 +217,7 @@ provides-extras = ["dev"] [[package]] name = "asa-api-client" -version = "0.1.6" +version = "0.2.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "httpx" }, @@ -225,9 +225,9 @@ dependencies = [ { name = "pydantic-settings" }, { name = "pyjwt", extra = ["crypto"] }, ] -sdist = { url = "https://files.pythonhosted.org/packages/cb/3c/6e4746ab9b9ae67b8bdc62247fa794f59721915550db3c284d45968ec506/asa_api_client-0.1.6.tar.gz", hash = "sha256:c0b27d5d91d1ea4b775afb7291a5d974e53de3f9a02343864d7909430a5bface", size = 104081, upload-time = "2025-12-11T23:06:42.809Z" } +sdist = { url = "https://files.pythonhosted.org/packages/f7/e8/6c0f77eabb48d28e1239e65bea3e118d756d49313817b311b2023b64d272/asa_api_client-0.2.1.tar.gz", hash = "sha256:5069e5e613b924d4cd1efca9601c43d35e61df998775913c328c99ddeac9cbaf", size = 113157, upload-time = "2026-06-09T00:43:03.74Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/fb/32/33715fe700870397fb196699e4b06334c6ead28608f9822bb77abd56a7be/asa_api_client-0.1.6-py3-none-any.whl", hash = "sha256:0a478048247ebaca1d7439a6bff577943e02f0b525e9226c4736f6dcba7ea35f", size = 49703, upload-time = "2025-12-11T23:06:41.318Z" }, + { url = "https://files.pythonhosted.org/packages/6f/4b/9ebc458fc0030f562ee5dcc00cffb7d673f7489d49ae3043969b1e9bae3d/asa_api_client-0.2.1-py3-none-any.whl", hash = "sha256:1e474bce3db750f2cb70cfad7f948132a7236ff42b1bc2a8bc071578d3338ef0", size = 65543, upload-time = "2026-06-09T00:43:02.259Z" }, ] [[package]]