From c09c58f4d490893e72b5d08d2becb98a9a102e18 Mon Sep 17 00:00:00 2001 From: Sam Petherbridge Date: Tue, 9 Jun 2026 11:40:54 +1000 Subject: [PATCH 1/2] =?UTF-8?q?=E2=AC=86=EF=B8=8F=20Adopt=20asa-api-client?= =?UTF-8?q?=200.2.1=20and=20adapt=20report=20fields?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bump the dependency floor to >=0.2.1 and refresh the lockfile to pick up the v0.2.0 API v5 coverage release (and v0.2.1 maintenance fixes). Adapt to renamed report metric fields: - installs -> tap_installs - conversion_rate -> tap_install_rate - avg_cpa -> total_avg_cpi Closes #30 --- asa_api_cli/optimize.py | 4 ++-- asa_api_cli/reports.py | 12 ++++++------ pyproject.toml | 2 +- uv.lock | 8 ++++---- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/asa_api_cli/optimize.py b/asa_api_cli/optimize.py index 0f2990c..716d9e6 100644 --- a/asa_api_cli/optimize.py +++ b/asa_api_cli/optimize.py @@ -1120,7 +1120,7 @@ def review_keyword_bids( impressions = row.total.impressions or 0 taps = row.total.taps or 0 - conversions = row.total.installs or 0 + conversions = row.total.tap_installs or 0 spend_amount = row.total.local_spend.amount if row.total.local_spend else "0" spend = Decimal(str(spend_amount)) currency = row.total.local_spend.currency if row.total.local_spend else "USD" @@ -1650,7 +1650,7 @@ def aggregate_report(report: Any) -> dict[int, dict[str, Any]]: } perf[ag_id]["impressions"] += row.total.impressions or 0 perf[ag_id]["taps"] += row.total.taps or 0 - perf[ag_id]["installs"] += row.total.installs or 0 + perf[ag_id]["installs"] += row.total.tap_installs or 0 if row.total.local_spend: perf[ag_id]["spend"] += Decimal(str(row.total.local_spend.amount)) perf[ag_id]["currency"] = row.total.local_spend.currency diff --git a/asa_api_cli/reports.py b/asa_api_cli/reports.py index 4353201..f7484dd 100644 --- a/asa_api_cli/reports.py +++ b/asa_api_cli/reports.py @@ -59,12 +59,12 @@ def report_row_to_dict(row: object) -> dict[str, Any]: total = row.total # type: ignore result["impressions"] = total.impressions result["taps"] = total.taps - result["installs"] = total.installs + result["installs"] = total.tap_installs result["ttr"] = total.ttr - result["conv_rate"] = total.conversion_rate + result["conv_rate"] = total.tap_install_rate result["spend"] = total.local_spend.amount if total.local_spend else None result["avg_cpt"] = total.avg_cpt.amount if total.avg_cpt else None - result["avg_cpa"] = total.avg_cpa.amount if total.avg_cpa else None + result["avg_cpa"] = total.total_avg_cpi.amount if total.total_avg_cpi else None return result @@ -130,12 +130,12 @@ def print_grand_totals(report: object) -> None: lines = [] lines.append(f"[label]Impressions:[/label] [value]{format_number(total.impressions)}[/value]") lines.append(f"[label]Taps:[/label] [value]{format_number(total.taps)}[/value]") - lines.append(f"[label]Installs:[/label] [value]{format_number(total.installs)}[/value]") + lines.append(f"[label]Installs:[/label] [value]{format_number(total.tap_installs)}[/value]") if total.ttr is not None: lines.append(f"[label]TTR:[/label] [value]{format_percent(total.ttr)}[/value]") - if total.conversion_rate is not None: - lines.append(f"[label]Conv Rate:[/label] [value]{format_percent(total.conversion_rate)}[/value]") + if total.tap_install_rate is not None: + lines.append(f"[label]Conv Rate:[/label] [value]{format_percent(total.tap_install_rate)}[/value]") if total.local_spend: spend = total.local_spend lines.append(f"[label]Total Spend:[/label] [value]{spend.amount} {spend.currency}[/value]") diff --git a/pyproject.toml b/pyproject.toml index 333d3a2..e72d61f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ classifiers = [ "Typing :: Typed", ] dependencies = [ - "asa-api-client>=0.1.5", + "asa-api-client>=0.2.1", "typer>=0.13.0", "rich>=13.9.0", "pydantic-ai>=0.1.0", diff --git a/uv.lock b/uv.lock index 14c0835..1d6a8f7 100644 --- a/uv.lock +++ b/uv.lock @@ -201,7 +201,7 @@ dev = [ [package.metadata] requires-dist = [ - { name = "asa-api-client", specifier = ">=0.1.5" }, + { name = "asa-api-client", specifier = ">=0.2.1" }, { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.13.0" }, { name = "pre-commit", marker = "extra == 'dev'", specifier = ">=4.0.0" }, { name = "pydantic-ai", specifier = ">=0.1.0" }, @@ -217,7 +217,7 @@ provides-extras = ["dev"] [[package]] name = "asa-api-client" -version = "0.1.6" +version = "0.2.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "httpx" }, @@ -225,9 +225,9 @@ dependencies = [ { name = "pydantic-settings" }, { name = "pyjwt", extra = ["crypto"] }, ] -sdist = { url = "https://files.pythonhosted.org/packages/cb/3c/6e4746ab9b9ae67b8bdc62247fa794f59721915550db3c284d45968ec506/asa_api_client-0.1.6.tar.gz", hash = "sha256:c0b27d5d91d1ea4b775afb7291a5d974e53de3f9a02343864d7909430a5bface", size = 104081, upload-time = "2025-12-11T23:06:42.809Z" } +sdist = { url = "https://files.pythonhosted.org/packages/f7/e8/6c0f77eabb48d28e1239e65bea3e118d756d49313817b311b2023b64d272/asa_api_client-0.2.1.tar.gz", hash = "sha256:5069e5e613b924d4cd1efca9601c43d35e61df998775913c328c99ddeac9cbaf", size = 113157, upload-time = "2026-06-09T00:43:03.74Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/fb/32/33715fe700870397fb196699e4b06334c6ead28608f9822bb77abd56a7be/asa_api_client-0.1.6-py3-none-any.whl", hash = "sha256:0a478048247ebaca1d7439a6bff577943e02f0b525e9226c4736f6dcba7ea35f", size = 49703, upload-time = "2025-12-11T23:06:41.318Z" }, + { url = "https://files.pythonhosted.org/packages/6f/4b/9ebc458fc0030f562ee5dcc00cffb7d673f7489d49ae3043969b1e9bae3d/asa_api_client-0.2.1-py3-none-any.whl", hash = "sha256:1e474bce3db750f2cb70cfad7f948132a7236ff42b1bc2a8bc071578d3338ef0", size = 65543, upload-time = "2026-06-09T00:43:02.259Z" }, ] [[package]] From 6c5c0a8d887fed0aa5c9daaf5e5ffb8ef1d507d1 Mon Sep 17 00:00:00 2001 From: Sam Petherbridge Date: Tue, 9 Jun 2026 11:41:13 +1000 Subject: [PATCH 2/2] =?UTF-8?q?=E2=9C=A8=20Add=20AI-powered=20negative=20k?= =?UTF-8?q?eyword=20suggestions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add 'asa optimize negatives' command that pulls search term reports across campaigns, sends performance data to an AI model (Anthropic or Gemini via PydanticAI) for relevance classification, and optionally applies suggestions as campaign-level negative keywords. Supports filtering by spend/impressions/country, CSV export, dry-run, and batched analysis for large term sets. --- asa_api_cli/optimize.py | 472 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 471 insertions(+), 1 deletion(-) diff --git a/asa_api_cli/optimize.py b/asa_api_cli/optimize.py index 716d9e6..cb66e65 100644 --- a/asa_api_cli/optimize.py +++ b/asa_api_cli/optimize.py @@ -6,7 +6,8 @@ from dataclasses import dataclass, field from datetime import date, timedelta from decimal import Decimal -from typing import Annotated, Any, TypeVar +from pathlib import Path +from typing import Annotated, Any, Literal, TypeVar import typer from asa_api_client.exceptions import AppleSearchAdsError, NotFoundError @@ -24,6 +25,7 @@ NegativeKeywordCreate, Selector, ) +from pydantic import BaseModel, Field from rich.table import Table from asa_api_cli.utils import ( @@ -1984,3 +1986,471 @@ def aggregate_report(report: Any) -> dict[int, dict[str, Any]]: except AppleSearchAdsError as e: handle_api_error(e) raise typer.Exit(EXIT_ERROR) from None + + +# ============================================================================ +# Negative Keyword Suggestions (AI-Powered) +# ============================================================================ + + +@dataclass +class SearchTermAnalysis: + """A search term with performance data for negative keyword analysis.""" + + search_term: str + campaign_id: int + campaign_name: str + ad_group_id: int + ad_group_name: str + impressions: int + taps: int + installs: int + spend: Decimal + currency: str + matched_keyword: str | None = None + + +class NegativeSuggestion(BaseModel): # type: ignore[misc] + """AI suggestion for a single search term.""" + + search_term: str = Field(description="The search term being evaluated") + action: Literal["negative", "keep", "review"] = Field( + description="Whether to add as negative keyword, keep, or flag for manual review" + ) + reason: str = Field(description="Brief explanation for the recommendation") + + +class NegativeKeywordSuggestions(BaseModel): # type: ignore[misc] + """Batch of negative keyword suggestions from AI analysis.""" + + suggestions: list[NegativeSuggestion] = Field(description="List of suggestions for each search term") + + +NEGATIVE_KEYWORD_SYSTEM_PROMPT = ( + "You are an expert Apple Search Ads optimiser specialising in " + "negative keyword strategy.\n\n" + "You will receive a list of search terms from App Store Search Ads " + "campaigns along with their performance metrics and the app description.\n\n" + "Your job is to classify each search term as:\n" + '- "negative": Irrelevant to the app — add as a negative keyword to stop wasting spend.\n' + '- "keep": Relevant and performing well or worth testing further.\n' + '- "review": Ambiguous — may be relevant but performance is poor, needs human judgement.\n\n' + "Guidelines:\n" + "1. A search term is irrelevant if it clearly targets a different app, product, or intent.\n" + '2. High spend with zero installs over a meaningful period is a strong signal for "negative".\n' + "3. Low-volume terms (few impressions/taps) with no installs may just need more data " + '— prefer "review" over "negative" unless clearly irrelevant.\n' + "4. Brand names of competitors are usually worth keeping unless spend is excessive.\n" + '5. Generic terms tangentially related to the app deserve "review", not automatic "negative".\n' + "6. Consider the app context provided — what the app does and who it serves.\n" + '7. Be conservative: only recommend "negative" when you are confident the term is irrelevant.' +) + + +def _get_negative_analysis_agent( + settings: Any, + provider: str | None = None, +) -> Any: + """Create a PydanticAI agent for negative keyword analysis.""" + from pydantic_ai import Agent + from pydantic_ai.models import Model + + active_provider = provider or settings.translate_provider + + model: Model + if active_provider == "anthropic": + if not settings.anthropic_api_key: + raise RuntimeError( + "ANTHROPIC_API_KEY is required for AI analysis.\n" + "Add it to your .env file or set the environment variable." + ) + from pydantic_ai.models.anthropic import AnthropicModel + from pydantic_ai.providers.anthropic import AnthropicProvider + + model = AnthropicModel("claude-sonnet-4-5", provider=AnthropicProvider(api_key=settings.anthropic_api_key)) + elif active_provider == "gemini": + if not settings.gemini_api_key: + raise RuntimeError( + "GEMINI_API_KEY is required for AI analysis.\nAdd it to your .env file or set the environment variable." + ) + from pydantic_ai.models.google import GoogleModel + from pydantic_ai.providers.google import GoogleProvider + + model = GoogleModel("gemini-2.0-flash", provider=GoogleProvider(api_key=settings.gemini_api_key)) + else: + raise ValueError(f"Unknown provider: {active_provider}") + + return Agent( + model, + output_type=NegativeKeywordSuggestions, + system_prompt=NEGATIVE_KEYWORD_SYSTEM_PROMPT, + ) + + +async def _analyse_search_terms( + terms: list[SearchTermAnalysis], + app_description: str, + settings: Any, + provider: str | None = None, +) -> NegativeKeywordSuggestions: + """Send search terms to AI for negative keyword classification.""" + agent = _get_negative_analysis_agent(settings, provider) + + lines = [] + for t in terms: + cpt = t.spend / t.taps if t.taps > 0 else Decimal("0") + lines.append( + f'- "{t.search_term}" | matched: "{t.matched_keyword or "?"}" ' + f"| impr: {t.impressions} | taps: {t.taps} | installs: {t.installs} " + f"| spend: {t.spend:.2f} {t.currency} | CPT: {cpt:.2f}" + ) + + prompt = f"""App context: {app_description} + +Analyse these search terms and recommend which should be added as negative keywords: + +{chr(10).join(lines)} + +Classify each search term as "negative", "keep", or "review" with a brief reason.""" + + result = await agent.run(prompt) + return result.output # type: ignore[no-any-return] + + +@app.command("negatives") +def negatives( + campaign_id: Annotated[ + int | None, + typer.Option("--campaign-id", "-c", help="Analyse a single campaign (default: all campaigns)"), + ] = None, + days: Annotated[ + int, + typer.Option("--days", "-d", help="Days of search term data to analyse"), + ] = 30, + min_spend: Annotated[ + float, + typer.Option("--min-spend", help="Minimum spend to include a search term"), + ] = 0.0, + min_impressions: Annotated[ + int, + typer.Option("--min-impressions", help="Minimum impressions to include a search term"), + ] = 0, + country: Annotated[ + str | None, + typer.Option("--country", help="Filter campaigns by country code (e.g. US, GB)"), + ] = None, + provider: Annotated[ + str | None, + typer.Option("--provider", "-p", help="AI provider: anthropic or gemini"), + ] = None, + env_file: Annotated[ + Path | None, + typer.Option("--env-file", "-e", help="Path to .env file with API keys"), + ] = None, + apply: Annotated[ + bool, + typer.Option("--apply", help="Apply suggested negatives (campaign-level exact match)"), + ] = False, + dry_run: Annotated[ + bool, + typer.Option("--dry-run", "-n", help="Show suggestions without applying changes"), + ] = False, + output: Annotated[ + Path | None, + typer.Option("--output", "-o", help="Export suggestions to CSV file"), + ] = None, +) -> None: + """Analyse search terms with AI and suggest negative keywords. + + Pulls search term reports across all campaigns (or a single campaign), + sends performance data to an AI model for relevance classification, + and optionally applies the suggestions as campaign-level negative keywords. + + Requires ANTHROPIC_API_KEY or GEMINI_API_KEY in environment or .env file. + """ + import asyncio + + from asa_api_cli.translate import get_translate_settings + + try: + settings = get_translate_settings(env_file) + except Exception as e: + print_error("Configuration Error", str(e)) + raise typer.Exit(EXIT_ERROR) from None + + end_date = date.today() + start_date = end_date - timedelta(days=days) + + client = get_client() + + try: + with client: + # Collect campaigns to analyse + if campaign_id: + with spinner("Fetching campaign..."): + campaign = client.campaigns.get(campaign_id) + target_campaigns = [campaign] + else: + with spinner("Loading campaigns..."): + target_campaigns = list(client.campaigns.find(Selector().where("status", "==", "ENABLED"))) + + if country: + country = country.upper() + target_campaigns = [c for c in target_campaigns if country in (c.countries_or_regions or [])] + + if not target_campaigns: + print_warning("No campaigns found matching filters") + return + + print_info(f"Analysing search terms from {len(target_campaigns)} campaign(s) ({start_date} to {end_date})") + + # Collect search term data across all campaigns + all_terms: list[SearchTermAnalysis] = [] + + with spinner("Pulling search term reports..."): + for camp in target_campaigns: + try: + report = client.reports.search_terms( + campaign_id=camp.id, + start_date=start_date, + end_date=end_date, + granularity=GranularityType.DAILY, + ) + + if not report.row: + continue + + # Aggregate by (search_term, ad_group_id) across daily rows + agg: dict[tuple[str, int], SearchTermAnalysis] = {} + for row in report.row: + if not row.total or not row.metadata.search_term_text: + continue + + key = (row.metadata.search_term_text, row.metadata.ad_group_id or 0) + + if key not in agg: + spend_amt = ( + Decimal(str(row.total.local_spend.amount)) + if row.total.local_spend + else Decimal("0") + ) + currency = row.total.local_spend.currency if row.total.local_spend else "USD" + agg[key] = SearchTermAnalysis( + search_term=row.metadata.search_term_text, + campaign_id=camp.id, + campaign_name=camp.name, + ad_group_id=row.metadata.ad_group_id or 0, + ad_group_name=row.metadata.ad_group_name or "", + impressions=row.total.impressions or 0, + taps=row.total.taps or 0, + installs=row.total.tap_installs or 0, + spend=spend_amt, + currency=currency, + matched_keyword=row.metadata.keyword, + ) + else: + existing = agg[key] + existing.impressions += row.total.impressions or 0 + existing.taps += row.total.taps or 0 + existing.installs += row.total.tap_installs or 0 + if row.total.local_spend: + existing.spend += Decimal(str(row.total.local_spend.amount)) + + all_terms.extend(agg.values()) + except AppleSearchAdsError: + continue + + if not all_terms: + print_warning("No search term data found for the specified period") + return + + # Filter by thresholds + if min_spend > 0: + all_terms = [t for t in all_terms if t.spend >= Decimal(str(min_spend))] + if min_impressions > 0: + all_terms = [t for t in all_terms if t.impressions >= min_impressions] + + if not all_terms: + print_warning("No search terms match the specified thresholds") + return + + # Sort by spend descending to prioritise highest waste + all_terms.sort(key=lambda t: t.spend, reverse=True) + + print_info(f"Found {len(all_terms)} search terms to analyse") + + # Build app context from campaign names + app_names = {c.name.split(" - ")[0].strip() for c in target_campaigns} + app_description = f"App: {', '.join(app_names)}" if app_names else "Unknown app" + + # Process in batches (AI context limits) + batch_size = 100 + all_suggestions: list[tuple[SearchTermAnalysis, NegativeSuggestion]] = [] + + for i in range(0, len(all_terms), batch_size): + batch = all_terms[i : i + batch_size] + batch_num = i // batch_size + 1 + total_batches = (len(all_terms) + batch_size - 1) // batch_size + + with spinner(f"AI analysis batch {batch_num}/{total_batches}..."): + try: + result = asyncio.run(_analyse_search_terms(batch, app_description, settings, provider)) + except RuntimeError as e: + print_error("AI Error", str(e)) + raise typer.Exit(EXIT_ERROR) from None + + # Match suggestions back to term data + suggestion_map = {s.search_term: s for s in result.suggestions} + for term in batch: + suggestion = suggestion_map.get(term.search_term) + if suggestion: + all_suggestions.append((term, suggestion)) + + if not all_suggestions: + print_warning("AI returned no suggestions") + return + + # Separate by action + negatives_list = [(t, s) for t, s in all_suggestions if s.action == "negative"] + review_list = [(t, s) for t, s in all_suggestions if s.action == "review"] + keep_list = [(t, s) for t, s in all_suggestions if s.action == "keep"] + + # Display results + console.print() + + if negatives_list: + table = Table(title=f"Suggested Negatives ({len(negatives_list)})") + table.add_column("Search Term", style="red", max_width=30) + table.add_column("Campaign", style="cyan", max_width=20) + table.add_column("Impr", justify="right") + table.add_column("Taps", justify="right") + table.add_column("Installs", justify="right") + table.add_column("Spend", justify="right") + table.add_column("Reason", max_width=35) + + for term, suggestion in negatives_list: + table.add_row( + term.search_term, + term.campaign_name[:20], + f"{term.impressions:,}", + f"{term.taps:,}", + f"{term.installs:,}", + f"{term.spend:.2f} {term.currency}", + suggestion.reason, + ) + + console.print(table) + console.print() + + if review_list: + table = Table(title=f"Needs Review ({len(review_list)})") + table.add_column("Search Term", style="yellow", max_width=30) + table.add_column("Campaign", style="cyan", max_width=20) + table.add_column("Impr", justify="right") + table.add_column("Taps", justify="right") + table.add_column("Installs", justify="right") + table.add_column("Spend", justify="right") + table.add_column("Reason", max_width=35) + + for term, suggestion in review_list: + table.add_row( + term.search_term, + term.campaign_name[:20], + f"{term.impressions:,}", + f"{term.taps:,}", + f"{term.installs:,}", + f"{term.spend:.2f} {term.currency}", + suggestion.reason, + ) + + console.print(table) + console.print() + + # Summary + total_negative_spend = sum(t.spend for t, _ in negatives_list) + print_result_panel( + "Analysis Summary", + { + "Search terms analysed": str(len(all_suggestions)), + "Suggested negatives": str(len(negatives_list)), + "Needs review": str(len(review_list)), + "Keep": str(len(keep_list)), + "Wasted spend (negatives)": f"{total_negative_spend:.2f}", + }, + ) + + # Export to CSV + if output: + try: + import csv + + with open(output, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow( + [ + "search_term", + "action", + "reason", + "campaign_name", + "ad_group_name", + "matched_keyword", + "impressions", + "taps", + "installs", + "spend", + "currency", + ] + ) + for term, suggestion in all_suggestions: + writer.writerow( + [ + term.search_term, + suggestion.action, + suggestion.reason, + term.campaign_name, + term.ad_group_name, + term.matched_keyword or "", + term.impressions, + term.taps, + term.installs, + f"{term.spend:.2f}", + term.currency, + ] + ) + print_success(f"Exported {len(all_suggestions)} suggestions to {output}") + except Exception as e: + print_error("Export failed", str(e)) + + # Apply negatives + if apply and not dry_run and negatives_list: + by_campaign: dict[int, list[tuple[SearchTermAnalysis, NegativeSuggestion]]] = defaultdict(list) + for term, suggestion in negatives_list: + by_campaign[term.campaign_id].append((term, suggestion)) + + total_added = 0 + for camp_id, items in by_campaign.items(): + camp_name = items[0][0].campaign_name + neg_creates = [ + NegativeKeywordCreate( + text=term.search_term, + match_type=KeywordMatchType.EXACT, + ) + for term, _ in items + ] + + try: + with spinner(f"Adding {len(neg_creates)} negatives to '{camp_name}'..."): + client.campaigns(camp_id).negative_keywords.create_bulk(neg_creates) + total_added += len(neg_creates) + except AppleSearchAdsError as e: + print_error(f"Failed to add negatives to '{camp_name}'", e.message) + + if total_added > 0: + print_success(f"Added {total_added} negative keywords across {len(by_campaign)} campaign(s)") + + elif apply and dry_run: + print_info("Dry run — no changes applied. Remove --dry-run to apply negatives.") + + except AppleSearchAdsError as e: + handle_api_error(e) + raise typer.Exit(EXIT_ERROR) from None