diff --git a/AGENTS.md b/AGENTS.md index 20db87b..805a5bd 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,4 +1,4 @@ -# AGENTS.md — Dados Financeiros Abertos +# AGENTS.md: Dados Financeiros Abertos This file is for coding agents working in this repository. Keep it practical: follow the project conventions, avoid speculative dependencies, and produce diff --git a/docs/MCP_SURFACE.md b/docs/MCP_SURFACE.md new file mode 100644 index 0000000..db9d067 --- /dev/null +++ b/docs/MCP_SURFACE.md @@ -0,0 +1,115 @@ +# MCP surface: curated tools over the REST API + +> Status: prototype / design proposal (alpha 0.3.x). Non-breaking: the REST API +> is untouched. Implemented in [`src/findata/api/mcp_app.py`](../src/findata/api/mcp_app.py). + +## Problem + +The MCP server used to be auto-generated **1:1 from the FastAPI app**: +`FastApiMCP(app)` turns every route into a tool, so the catalog was **95 tools**, +one per dataset/endpoint. From a client/agent's point of view that means: + +- **~21k tokens of `tools/list`** loaded at the start of every session, before a + single call. +- **Worse tool selection**, a model picks worse among 95 near-duplicate names + (one tool per SGS series, per CVM fund facet…) than among ~two dozen + well-described tools. + +## Approach (A + B + C) + +A separate FastAPI app, `mcp_app`, is the **only** source of the tool catalog. +It exposes a small, hand-curated set of tools that dispatch to the same +`findata.sources.*` functions the REST routers already use. + +```python +# app.py: tools come from mcp_app; transport is served on the public app +_mcp = FastApiMCP(mcp_app, name=..., description=...) +_mcp.mount_http(router=app) # /mcp on the public app; REST routes untouched +``` + +`FastApiMCP(mcp_app)` builds the catalog from `mcp_app`'s OpenAPI and executes +each tool via `httpx.ASGITransport(app=mcp_app)`. Because the routers carry no +app-state/rate-limiter coupling, reusing the source functions in a second app is +safe. **The 95 REST routes that back the CLI and HTTP consumers never change.** + +- **A, curation.** Each tool has an explicit `operation_id`, an agent-oriented + one-line `summary`, and a docstring written *for an agent deciding whether to + call it*, not the raw route docstring. `response_model=None` + `-> Any` keeps + response schemas out of the catalog (they would re-inflate it). +- **B, consolidation.** Sprawly clusters collapse behind a `dataset`/`kind` + selector (see table). The work moves from "many thin tools" to "few tools with + good docs". +- **C, code mode.** One optional tool, `findata_run_code`, runs a Python + snippet against the `findata` library in an isolated child interpreter. It + replaces dozens of fine-grained calls for filter/join/aggregate flows that + would otherwise stream every intermediate result through the model's context. + **Gated off by default** (`FINDATA_MCP_CODE_MODE=1` to enable). + +## Result + +| | 1:1 (old) | curated (new) | +|---|---:|---:| +| MCP tools | 95 | **24** (25 with code mode) | +| `tools/list` size | ~85k chars (~21k tok) | **~29k chars (~7k tok)** | +| REST operations | 95 | **95 (unchanged)** | + +## The 24 curated tools + +``` +registry_lookup ← start here: CNPJ / ticker / code / name → entities + +bcb_series bcb_ptax bcb_focus (BCB: 12 → 3) +cvm_company cvm_financials cvm_fund cvm_structured_fund (CVM: 22 → 4) +b3_quote b3_cotahist b3_index (B3: 9 → 3) +tesouro_bonds tesouro_siconfi (Tesouro: 6 → 2) +ibge_indicator ibge_ipca_breakdown (IBGE: 4 → 2) +ipea_series ipea_search (IPEA: 4 → 2) +anbima (ANBIMA: 3 → 1) +openfinance_directory (Open Finance: 15 → 1) +basedosdados_search basedosdados_sql (BdD: 7 → 2) +receita_arrecadacao aneel_leiloes susep_empresas +findata_run_code (code mode, opt-in) +``` + +### Consolidation map + +| Tool | Folds in | Selector | +|---|---|---| +| `bcb_series` | `/series`, `/series/code/{code}`, `/series/name/{name}` | `code` / `name` / none=catalog | +| `bcb_ptax` | `/ptax/usd`, `/ptax/usd/period`, `/ptax/{currency}` | `start`+`end` → period | +| `bcb_focus` | `/focus/{indicators,annual,monthly,selic,top5}` | `horizon`, `panel`, `indicator` | +| `cvm_company` | companies search/list, `fca/*`, `ipe` | `dataset=search\|list\|fca_*\|filings` | +| `cvm_fund` | `funds`, `funds/{daily,holdings,lamina,profile,periods}`, returns | `dataset` | +| `cvm_structured_fund` | `funds/{fii,fidc,fip}/*` | `kind` + `dataset` | +| `b3_index` | index portfolio + monthly + list | `dataset`, omit `symbol` to list | +| `tesouro_bonds` | bonds list/search/history | `dataset` | +| `tesouro_siconfi` | `rreo`, `rgf`, `entes` | `report` | +| `openfinance_directory` | participants/endpoints/resources/roles | `dataset` | + +## Tradeoffs + +- **Fewer but "fatter" tools.** Each carries a `dataset` enum and more doc. The + whole bet is that good descriptions beat tool count, so the docstrings are the + deliverable, not an afterthought. +- **Consolidation can hide endpoint-specific params behind an enum.** Mitigated + by documenting each `dataset`/`kind` value and validating bad combinations with + a `400` (e.g. `cvm_fund dataset=holdings` requires `cnpj`+`month`), matching the + REST API's `ValueError → 400` behaviour. +- **Discoverability of rare endpoints.** A handful of niche REST routes are not + individually surfaced as tools. They remain fully reachable over REST and via + `findata_run_code`. + +## Code mode: security + +`findata_run_code` is a **prototype, not a hardened sandbox**. The snippet runs +in a child `python -I` (isolated mode, cwd in a tempdir) with a wall-clock +timeout and a 20k-char output cap, but it has full library and network access. +It is **disabled unless `FINDATA_MCP_CODE_MODE=1`** and is intended for trusted, +local/agent use. A production deployment should run it in a real sandbox +(container/seccomp/network egress controls) before enabling. + +## Example flows (verified through the curated MCP) + +- `registry_lookup(q="PETR4")` → PETROBRAS, CNPJ `33.000.167/0001-01`, `[PETR3, PETR4]` (offline). +- `bcb_ptax(start=2024-01-02, end=2024-01-05)` → daily PTAX USD series (the handoff's headline flow). +- `findata_run_code("import findata; ...")` → runs in the sandbox, returns captured stdout. diff --git a/pyproject.toml b/pyproject.toml index 7c47c40..edbe9aa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -134,6 +134,9 @@ max-statements = 50 ] # FastAPI idiom: Query() / Depends() calls in argument defaults. "src/findata/api/routers/**" = ["B008", "PLR0913"] +# Curated MCP layer: FastAPI Query() defaults (B008), wide consolidated tools +# (PLR0913), and intentional flat dataset-dispatch switches (C901/PLR0912/PLR0911). +"src/findata/api/mcp_app.py" = ["B008", "PLR0913", "C901", "PLR0912", "PLR0911"] # CLI commands are naturally wide (many typer.Option flags). "src/findata/cli.py" = ["PLR0913"] # Banner uses rich + sys.stdout directly — not a print-statement debug. diff --git a/src/findata/api/app.py b/src/findata/api/app.py index a570afe..741683b 100644 --- a/src/findata/api/app.py +++ b/src/findata/api/app.py @@ -148,15 +148,23 @@ async def _value_error_handler(_: Request, exc: ValueError) -> JSONResponse: try: from fastapi_mcp import FastApiMCP + from findata.api.mcp_app import mcp_app + + # The MCP tool catalog is built from the *curated* `mcp_app` (a separate + # FastAPI app, ~24 well-described tools), not from the public `app`, which + # would expose one near-duplicate tool per REST route (~95) and bloat every + # agent's context. `mount_http(router=app)` serves the /mcp transport on the + # public app, while the tools are generated from and executed against + # `mcp_app` (via its ASGI transport). The 95 REST routes stay untouched. _mcp = FastApiMCP( - app, + mcp_app, name=_PROJECT_SLUG, description=( f"{_PROJECT_STATEMENT} MCP para BCB, CVM, B3, IBGE, IPEA, " "Tesouro, Base dos Dados, Open Finance e gráficos experimentais." ), ) - _mcp.mount_http() # Serves MCP at /mcp (fastapi-mcp >=0.4) + _mcp.mount_http(router=app) # Serves MCP at /mcp (fastapi-mcp >=0.4) _MCP_ENABLED = True except Exception: # optional subsystem must never break core API _MCP_ENABLED = False diff --git a/src/findata/api/mcp_app.py b/src/findata/api/mcp_app.py new file mode 100644 index 0000000..d43a48d --- /dev/null +++ b/src/findata/api/mcp_app.py @@ -0,0 +1,925 @@ +"""Curated MCP surface for the findata-br server. + +The public REST API (``findata.api.app``) exposes ~95 fine-grained routes, one +per upstream dataset/endpoint. Mapping those 1:1 to MCP tools floods an agent's +context with ~95 near-duplicate tool schemas before it makes a single call, and +hurts tool-selection accuracy. + +This module is a *separate* FastAPI app whose only purpose is to be the source +of the MCP tool catalog. It exposes a small, hand-curated set of tools, each +with an agent-oriented description, that dispatch to the same +``findata.sources.*`` functions the REST routers use. Consolidated tools collapse +sprawly clusters (e.g. the 12 BCB and 14 CVM-fund endpoints) behind a few +``dataset``/``kind`` selectors. + +Wiring lives in ``app.py``: ``FastApiMCP(mcp_app).mount_http(router=app)`` builds +the tool catalog from *this* app while serving ``/mcp`` on the public app. The +95 REST routes are never touched. + + A, curation: only the headline tools are exposed, with real descriptions. + B, consolidation: ``bcb_*``/``cvm_*``/``tesouro_*``… fold many routes into one. + C, code mode: optional ``findata_run_code`` runs a Python snippet against the + library (gated by ``FINDATA_MCP_CODE_MODE=1``; off by default). +""" + +from __future__ import annotations + +import asyncio +import contextlib +import os +import signal +import sys +import tempfile +from datetime import date +from typing import Any, Literal + +from fastapi import APIRouter, FastAPI, HTTPException, Query, Request +from fastapi.responses import JSONResponse +from pydantic import BaseModel, Field + +from findata.registry import lookup +from findata.sources.anbima import indices as anbima_src +from findata.sources.aneel import leiloes +from findata.sources.b3 import cotahist, indices +from findata.sources.basedosdados import catalog +from findata.sources.bcb import focus, ptax, sgs +from findata.sources.cvm import ( + companies, + fca, + fidc, + fii, + financials, + fip, + funds, + holdings, + ipe, + lamina, + list_periods, + profile, +) +from findata.sources.ibge import indicators +from findata.sources.ipea import series as ipea_series +from findata.sources.openfinance import directory as of_dir +from findata.sources.receita import arrecadacao +from findata.sources.susep import empresas +from findata.sources.tesouro import bonds, siconfi + +router = APIRouter() + +_MAX_TICKERS = 20 +_MIN_YEAR_B3_COTAHIST = 1986 # B3 publishes COTAHIST since 1986 +_RGF_MAX_PERIOD = 3 # RGF quadrimestre runs 1..3 + + +# ── Registry: the entry point ───────────────────────────────────── + + +@router.get( + "/registry/lookup", + operation_id="registry_lookup", + response_model=None, + summary="Resolve a CNPJ, B3 ticker, CVM/SUSEP code, or company name to canonical entities", +) +async def registry_lookup( + q: str = Query( + ..., + min_length=2, + description="CNPJ (masked or not), ticker (PETR4), CVM/SUSEP code, or name fragment", + ), + limit: int = Query(20, ge=1, le=100), +) -> Any: + """Offline cross-source resolver backed by an embedded FTS5 catalog. + + Start here to turn a fuzzy identifier into a CNPJ + tickers + source codes + before calling the source-specific tools. The BM25 ``rank`` indicates match + strength (very negative = strong exact hit; near zero = fuzzy name match). + """ + return await lookup(q, limit=limit) + + +# ── BCB: Banco Central ──────────────────────────────────────────── + + +@router.get( + "/bcb/series", + operation_id="bcb_series", + response_model=None, + summary="BCB time series (Selic, IPCA, câmbio…): list the catalog or fetch by code/name", +) +async def bcb_series( + code: int | None = Query(None, description="SGS numeric code, e.g. 432=Selic meta, 433=IPCA"), + name: str | None = Query(None, description="Catalog alias, e.g. selic, ipca, dolar_ptax"), + start: date | None = Query(None, description="Start date YYYY-MM-DD (code mode only)"), + end: date | None = Query(None, description="End date YYYY-MM-DD (code mode only)"), + last_n: int | None = Query(None, ge=1, le=1000, description="Return only the last N values"), +) -> Any: + """Three modes in one tool. Pass nothing to list the curated catalog; pass + ``code`` for a series by SGS code (optionally ``start``/``end`` or ``last_n``); + or pass ``name`` for the most recent values of a named series. + """ + if code is not None: + if last_n is not None: + return await sgs.get_series_last(code, last_n) + return await sgs.get_series(code, start, end) + if name is not None: + return await sgs.get_series_by_name(name, last_n or 10) + return sgs.SERIES_CATALOG + + +@router.get( + "/bcb/ptax", + operation_id="bcb_ptax", + response_model=None, + summary="PTAX official exchange rate for any currency, single date or a date range", +) +async def bcb_ptax( + currency: str = Query("USD", description="ISO currency code, e.g. USD, EUR, GBP"), + date_: date | None = Query(None, alias="date", description="Single date (default: latest)"), + start: date | None = Query(None, description="Range start (use with end; USD only)"), + end: date | None = Query(None, description="Range end (use with start; USD only)"), +) -> Any: + """Official PTAX from BCB. Pass ``start``+``end`` for a daily series over a + range (USD only), or ``date`` (or nothing) for a single day. ``currency=USD`` + is the common case; other currencies support single-date queries only. + """ + if start is not None and end is not None: + if currency.upper() != "USD": + raise HTTPException(400, "Range queries are USD-only; use `date` for other currencies") + return await ptax.get_ptax_usd_period(start, end) + if currency.upper() == "USD": + return await ptax.get_ptax_usd(date_) + return await ptax.get_ptax_currency(currency, date_) + + +@router.get( + "/bcb/focus", + operation_id="bcb_focus", + response_model=None, + summary="Boletim Focus expectations, annual/monthly, market or Top-5, or Selic per COPOM", +) +async def bcb_focus( + indicator: str = Query( + "IPCA", + description="Indicator, e.g. IPCA, 'PIB Total', Câmbio. Use 'Selic' for COPOM path, " + "'list' to see available indicators.", + ), + horizon: Literal["annual", "monthly"] = Query("annual"), + panel: Literal["market", "top5"] = Query( + "market", description="market = all forecasters; top5 = Top-5 ranked (annual only)" + ), + top: int = Query(20, ge=1, le=100, description="Max rows to return"), +) -> Any: + """Consolidates the Focus endpoints. ``indicator='list'`` returns the available + indicators; ``indicator='Selic'`` returns the Selic expectation per COPOM + meeting (horizon/panel ignored). Otherwise pick ``horizon`` and ``panel``. + """ + key = indicator.strip().lower() + if key == "list": + return focus.FOCUS_INDICATORS + if key == "selic": + return await focus.get_focus_selic(top) + if panel == "top5" and horizon == "monthly": + raise HTTPException(400, "panel=top5 is annual-only; use horizon=annual") + if panel == "top5": + return await focus.get_focus_top5_annual(indicator, top) + if horizon == "monthly": + return await focus.get_focus_monthly(indicator, top) + return await focus.get_focus_annual(indicator, top) + + +# ── CVM: companies & funds ──────────────────────────────────────── + + +@router.get( + "/cvm/company", + operation_id="cvm_company", + response_model=None, + summary="CVM-listed companies: search/list, registration facts (FCA), and filings (IPE)", +) +async def cvm_company( + dataset: Literal[ + "search", "list", "fca_general", "fca_securities", "fca_dri", "filings" + ] = Query("search"), + query: str | None = Query(None, min_length=2, description="Name search (dataset=search)"), + cnpj: str | None = Query( + None, description="Company CNPJ filter (recommended for fca_*/filings)" + ), + year: int | None = Query( + None, ge=2003, description="Reference year (required for fca_*/filings)" + ), + ticker: str | None = Query(None, description="B3 ticker filter (dataset=fca_securities)"), + categoria: str | None = Query( + None, description="Filing category (dataset=filings), e.g. 'Fato Relevante'" + ), + limit: int = Query(100, ge=1, le=2000), +) -> Any: + """The company side of CVM. ``search`` needs ``query``; ``list`` is the full + registry. ``fca_general|fca_securities|fca_dri`` are cadastral facets needing + ``year`` (+ optional ``cnpj``/``ticker``). ``filings`` (IPE, fatos relevantes, + comunicados) needs ``year`` (+ optional ``cnpj``/``categoria``). + """ + if dataset == "search": + if not query: + raise HTTPException(400, "dataset=search requires `query`") + return (await companies.search_company(query, True))[:limit] + if dataset == "list": + return (await companies.get_companies(True))[:limit] + if dataset == "filings": + if year is None: + raise HTTPException(400, "dataset=filings requires `year`") + return (await ipe.get_ipe(year, cnpj=cnpj, categoria=categoria))[:limit] + if year is None: + raise HTTPException(400, f"dataset={dataset} requires `year`") + if dataset == "fca_general": + return await fca.get_fca_geral(year, cnpj) + if dataset == "fca_securities": + return await fca.get_fca_valores_mobiliarios(year, cnpj=cnpj, ticker=ticker) + return await fca.get_fca_dri(year, cnpj) + + +@router.get( + "/cvm/financials", + operation_id="cvm_financials", + response_model=None, + summary="CVM financial statements, annual (DFP) or quarterly (ITR) for a company", +) +async def cvm_financials( + year: int = Query(..., ge=2010, description="Fiscal year"), + period: Literal["annual", "quarterly"] = Query( + "annual", description="annual=DFP, quarterly=ITR" + ), + statement: financials.StatementType = Query( + financials.StatementType.DRE_CON, + description="Statement type: BPA/BPP/DRE/DFC_MI/DMPL/DVA, _con (consolidated) or _ind", + ), + cnpj: str | None = Query( + None, description="Company CNPJ, strongly recommended (avoids the full dataset)" + ), + limit: int = Query(500, ge=1, le=5000), +) -> Any: + """Annual DFP or quarterly ITR statements. Statement types: BPA_con, BPP_con, + DRE_con, DFC_MI_con, DMPL_con, DVA_con (+ ``_ind`` variants). Always pass ``cnpj``. + """ + if period == "quarterly": + return (await financials.get_itr(year, statement, cnpj))[:limit] + return (await financials.get_dfp(year, statement, cnpj))[:limit] + + +@router.get( + "/cvm/fund", + operation_id="cvm_fund", + response_model=None, + summary="Open-ended CVM funds (FI): catalog, daily NAV, holdings, factsheet, returns, profile", +) +async def cvm_fund( + dataset: Literal[ + "catalog", "daily", "holdings", "lamina", "returns", "profile", "periods" + ] = Query("catalog"), + cnpj: str | None = Query( + None, description="Fund CNPJ (required for holdings; recommended elsewhere)" + ), + year: int | None = Query(None, description="Reference year (required except catalog/periods)"), + month: int | None = Query(None, ge=1, le=12, description="Reference month (monthly datasets)"), + horizon: Literal["monthly", "yearly"] = Query( + "monthly", description="returns granularity (dataset=returns)" + ), + blocks: str | None = Query( + None, + description="holdings: block whitelist, e.g. BLC_1,BLC_4 (of BLC_1..BLC_8,CONFID,PL,FIE)", + ), + product: Literal[ + "INF_DIARIO", + "CDA", + "LAMINA", + "PERFIL_MENSAL", + "BALANCETE", + "EVENTUAL", + "EXTRATO", + ] = Query("INF_DIARIO", description="periods: which CVM document set to list stamps for"), + limit: int = Query(500, ge=1, le=5000), +) -> Any: + """Open funds in one tool. ``catalog`` lists registered funds; ``periods`` lists + the YYYYMM stamps available upstream for ``product``. The rest need ``year``; + ``daily``/``holdings``/``lamina``/``returns``/``profile`` need ``month`` too, and + ``holdings`` requires ``cnpj`` (the monthly CDA file is huge). + """ + if dataset == "catalog": + return (await funds.get_fund_catalog(True, None))[:limit] + if dataset == "periods": + return await list_periods("FI", f"DOC/{product}") + if year is None: + raise HTTPException(400, f"dataset={dataset} requires `year`") + if dataset == "holdings": + if not cnpj or month is None: + raise HTTPException(400, "dataset=holdings requires `cnpj` and `month`") + block_list = [b.strip() for b in blocks.split(",") if b.strip()] if blocks else None + return await holdings.get_fund_holdings(cnpj, year, month, block_list) + if month is None: + raise HTTPException(400, f"dataset={dataset} requires `month`") + if dataset == "daily": + return (await funds.get_fund_daily(year, month, cnpj))[:limit] + if dataset == "lamina": + return (await lamina.get_fund_lamina(year, month, cnpj))[:limit] + if dataset == "profile": + return (await profile.get_fund_profile(year, month, cnpj))[:limit] + if horizon == "yearly": + return (await lamina.get_fund_yearly_returns(year, month, cnpj))[:limit] + return (await lamina.get_fund_monthly_returns(year, month, cnpj))[:limit] + + +async def _structured_fii( + dataset: str | None, cnpj: str | None, year: int, month: int | None +) -> Any: + if dataset in (None, "geral"): + return await fii.get_fii_geral(year, cnpj=cnpj, month=month) + if dataset == "complemento": + return await fii.get_fii_complemento(year, cnpj=cnpj, month=month) + raise HTTPException(400, f"unknown FII dataset {dataset!r} (use geral|complemento)") + + +async def _structured_fidc( + dataset: str | None, cnpj: str | None, year: int, month: int | None +) -> Any: + if month is None: + raise HTTPException(400, "FIDC datasets require `month`") + if dataset in (None, "geral"): + return await fidc.get_fidc_geral(year, month, cnpj=cnpj) + if dataset == "pl": + return await fidc.get_fidc_pl(year, month, cnpj=cnpj) + if dataset in ("direitos", "direitos-creditorios"): + return await fidc.get_fidc_direitos_creditorios(year, month, cnpj=cnpj) + raise HTTPException(400, f"unknown FIDC dataset {dataset!r} (use geral|pl|direitos)") + + +@router.get( + "/cvm/structured-fund", + operation_id="cvm_structured_fund", + response_model=None, + summary="Structured CVM funds, FII (real estate), FIDC (receivables), FIP (private equity)", +) +async def cvm_structured_fund( + kind: Literal["fii", "fidc", "fip"] = Query(...), + dataset: str | None = Query( + None, description="fii: geral|complemento; fidc: geral|pl|direitos; fip: (n/a)" + ), + cnpj: str | None = Query(None, description="Fund CNPJ filter"), + year: int = Query(..., description="Reference year"), + month: int | None = Query(None, ge=1, le=12, description="Required for FIDC; optional for FII"), + quarter: int | None = Query(None, ge=1, le=4, description="FIP only, informe quarter"), + limit: int = Query(500, ge=1, le=5000), +) -> Any: + """Structured funds by ``kind``. FII has ``geral`` (cadastral) and ``complemento`` + (cotistas/PL/taxa) facets. FIDC has ``geral``/``pl``/``direitos`` (needs ``month``). + FIP returns the quarterly informe (optional ``quarter``). + """ + if kind == "fii": + return await _structured_fii(dataset, cnpj, year, month) + if kind == "fidc": + return await _structured_fidc(dataset, cnpj, year, month) + if dataset is not None: + raise HTTPException(400, "kind=fip takes no `dataset` (use `quarter`)") + return (await fip.get_fip(year, cnpj=cnpj, quarter=quarter))[:limit] + + +# ── B3: Bolsa ───────────────────────────────────────────────────── + + +def _b3_quotes() -> Any: + try: + from findata.sources.b3 import quotes + except ImportError as exc: # pragma: no cover, only without the [b3] extra + raise HTTPException( + 503, "Live quotes need the optional extra: pip install 'openfindata[b3]'" + ) from exc + return quotes + + +@router.get( + "/b3/quote", + operation_id="b3_quote", + response_model=None, + summary="Live B3 stock quote(s) (optional [b3] extra), prefer b3_cotahist for official EOD", +) +async def b3_quote( + tickers: str = Query( + ..., description="One ticker or comma-separated list (max 20), e.g. PETR4,VALE3" + ), +) -> Any: + """Current quote(s) from the optional yfinance-backed source. For canonical, + official end-of-day history use ``b3_cotahist`` instead. + """ + quotes = _b3_quotes() + ticker_list = [t.strip() for t in tickers.split(",") if t.strip()] + if not ticker_list: + raise HTTPException(400, "at least one ticker is required") + if len(ticker_list) > _MAX_TICKERS: + raise HTTPException(400, f"max {_MAX_TICKERS} tickers per request") + if len(ticker_list) == 1: + return await quotes.get_quote(ticker_list[0]) + return await quotes.get_multiple_quotes(ticker_list) + + +@router.get( + "/b3/cotahist", + operation_id="b3_cotahist", + response_model=None, + summary="Official B3 COTAHIST daily quotes, by year, month, or single day", +) +async def b3_cotahist( + year: int = Query(..., ge=_MIN_YEAR_B3_COTAHIST, description="Year (B3 publishes since 1986)"), + month: int | None = Query(None, ge=1, le=12), + day: int | None = Query(None, ge=1, le=31), + ticker: str | None = Query( + None, description="CODNEG filter, e.g. PETR4, recommended (annual files are ~85 MB)" + ), + market_codes: str | None = Query( + None, description="CODBDI whitelist, comma-separated, e.g. 02,96" + ), +) -> Any: + """Granularity follows the args: ``day`` (needs ``month``) → one trading day, + ``month`` → one month, otherwise the whole ``year``. Pass ``ticker`` for + single-issuer queries. + """ + codes = [c.strip() for c in market_codes.split(",") if c.strip()] if market_codes else None + if day is not None: + if month is None: + raise HTTPException(400, "`day` requires `month`") + return await cotahist.get_cotahist_day(year, month, day, ticker, codes) + if month is not None: + return await cotahist.get_cotahist_month(year, month, ticker, codes) + return await cotahist.get_cotahist_year(year, ticker, codes) + + +@router.get( + "/b3/index", + operation_id="b3_index", + response_model=None, + summary="B3 index theoretical portfolio & monthly history (IBOV, IBrX, SMLL, IDIV, IFIX…)", +) +async def b3_index( + symbol: str | None = Query( + None, description="Index symbol, e.g. IBOV; omit to list known indices" + ), + dataset: Literal["portfolio", "monthly"] = Query( + "portfolio", description="portfolio=current composição; monthly=closing levels" + ), + start: date | None = Query(None, description="monthly: start date YYYY-MM-DD"), + end: date | None = Query(None, description="monthly: end date YYYY-MM-DD"), + months: int = Query(120, ge=1, le=360, description="monthly window when start omitted"), +) -> Any: + """Omit ``symbol`` to list the indices we can fetch. With ``symbol``, + ``portfolio`` returns the current composição (constituents + weights); + ``monthly`` returns closing levels for charting. + """ + if symbol is None: + return await indices.list_known_indices() + if dataset == "monthly": + return await indices.get_index_monthly_evolution( + symbol, start=start, end=end, months=months + ) + return await indices.get_index_portfolio(symbol) + + +# ── Tesouro / SICONFI ────────────────────────────────────────────── + + +@router.get( + "/tesouro/bonds", + operation_id="tesouro_bonds", + response_model=None, + summary="Tesouro Direto bonds, list/filter, search names, or price+rate history", +) +async def tesouro_bonds( + dataset: Literal["list", "search", "history"] = Query("list"), + titulo: str | None = Query( + None, description="Bond name for history, e.g. 'Tesouro IPCA+ 2035'" + ), + q: str | None = Query(None, min_length=2, description="Search query (dataset=search)"), + tipo: str | None = Query(None, description="Type filter (dataset=list), e.g. 'Tesouro IPCA+'"), + start: date | None = Query(None), + end: date | None = Query(None), + limit: int = Query(500, ge=1, le=5000), +) -> Any: + """``list`` returns current bond prices/rates (filter by ``tipo``/date); + ``search`` finds bond names from ``q``; ``history`` returns the series for a + single ``titulo``. + """ + if dataset == "search": + if not q: + raise HTTPException(400, "dataset=search requires `q`") + return await bonds.search_bonds(q) + if dataset == "history": + if not titulo: + raise HTTPException(400, "dataset=history requires `titulo`") + return await bonds.get_bond_history(titulo, start, end) + return await bonds.get_treasury_bonds(tipo, start, end, limit) + + +@router.get( + "/tesouro/siconfi", + operation_id="tesouro_siconfi", + response_model=None, + summary="SICONFI public-finance reports, RREO, RGF, or the federation-entity list", +) +async def tesouro_siconfi( + report: Literal["rreo", "rgf", "entes"] = Query("entes"), + year: int | None = Query(None, ge=2013), + period: int | None = Query( + None, ge=1, le=6, description="RREO: bimestre 1-6; RGF: quadrimestre 1-3" + ), + cod_ibge: int | None = Query( + None, description="IBGE entity code (1=União); discover via report=entes" + ), + poder: str = Query("E", description="RGF only: E/L/J/M/D power branch"), + anexo: str | None = Query(None, description='e.g. "RREO-Anexo 01"'), +) -> Any: + """``entes`` lists every federation entity with its IBGE code (start here). + ``rreo`` (bimestral) and ``rgf`` (quadrimestral) need ``year``, ``period``, and + ``cod_ibge``. + """ + if report == "entes": + return await siconfi.get_entes() + if year is None or period is None or cod_ibge is None: + raise HTTPException(400, f"report={report} requires year, period, and cod_ibge") + if report == "rgf": + if not 1 <= period <= _RGF_MAX_PERIOD: + raise HTTPException(400, "RGF period is the quadrimestre 1-3") + return await siconfi.get_rgf(year, period, cod_ibge, poder=poder) # type: ignore[arg-type] + return await siconfi.get_rreo(year, period, cod_ibge, anexo=anexo) + + +# ── IBGE ─────────────────────────────────────────────────────────── + + +@router.get( + "/ibge/indicator", + operation_id="ibge_indicator", + response_model=None, + summary="IBGE economic indicators, list the catalog or fetch one by name (e.g. ipca_mensal)", +) +async def ibge_indicator( + name: str | None = Query(None, description="Indicator name; omit to list all available"), + periods: int = Query(12, ge=1, le=120, description="Recent periods to return"), +) -> Any: + """Omit ``name`` to list every IBGE indicator we expose; pass ``name`` to fetch + its recent values. + """ + if name is None: + return indicators.IBGE_INDICATORS + return await indicators.get_indicator(name, periods) + + +@router.get( + "/ibge/ipca-breakdown", + operation_id="ibge_ipca_breakdown", + response_model=None, + summary="IPCA monthly variation broken down by the major groups (not available from BCB SGS)", +) +async def ibge_ipca_breakdown( + periods: int = Query(6, ge=1, le=60, description="Recent months to return"), +) -> Any: + """IPCA monthly variation for all major groups (food, housing, transport, + health, …), granularity BCB SGS does not provide. + """ + return await indicators.get_ipca_breakdown(periods) + + +# ── IPEA ─────────────────────────────────────────────────────────── + + +@router.get( + "/ipea/series", + operation_id="ipea_series", + response_model=None, + summary="IPEA series, curated catalog, series values, or metadata by SERCODIGO", +) +async def ipea_series_tool( + sercodigo: str | None = Query( + None, description="Series code, e.g. BM12_TJOVER12; omit to list the curated catalog" + ), + dataset: Literal["values", "metadata"] = Query("values"), + top: int | None = Query(None, ge=1, le=5000, description="Most recent N values"), +) -> Any: + """Omit ``sercodigo`` to list the curated catalog. With it, ``values`` returns + the observations and ``metadata`` returns name/unit/periodicity/source. For + discovery across the full ~8k-series catalog use ``ipea_search``. + """ + if sercodigo is None: + return ipea_series.IPEA_CATALOG + if dataset == "metadata": + meta = await ipea_series.get_metadata(sercodigo) + if meta is None: + raise HTTPException(404, f"unknown SERCODIGO: {sercodigo}") + return meta + return await ipea_series.get_series_values(sercodigo, top) + + +@router.get( + "/ipea/search", + operation_id="ipea_search", + response_model=None, + summary="Full-text search across the ~8k-series IPEA catalog", +) +async def ipea_search( + q: str = Query(..., min_length=2, description="Search query"), + top: int = Query(25, ge=1, le=200), +) -> Any: + """Find IPEA series by free-text query; returns metadata you can feed back to + ``ipea_series`` as ``sercodigo``. + """ + return await ipea_series.search_series(q, top) + + +# ── ANBIMA ───────────────────────────────────────────────────────── + + +@router.get( + "/anbima", + operation_id="anbima", + response_model=None, + summary="ANBIMA public data, IMA index family, ETTJ yield curve, or debenture quotes", +) +async def anbima_tool( + dataset: Literal["ima", "ettj", "debentures"] = Query("ima"), + family: str | None = Query( + None, description="ima: filter to one IMA family, e.g. IRF-M, IMA-B" + ), + data: date | None = Query(None, description="Reference date (ettj/debentures; default latest)"), + emissor: str | None = Query(None, description="debentures: issuer-name substring filter"), + limit: int = Query(500, ge=1, le=5000), +) -> Any: + """``ima`` returns the latest IMA snapshot (optionally one ``family``); ``ettj`` + returns the zero-coupon yield curve for ``data``; ``debentures`` returns daily + secondary-market quotes (optionally filtered by ``emissor``). + """ + if dataset == "ettj": + return await anbima_src.get_ettj(data) + if dataset == "debentures": + rows = await anbima_src.get_debentures(data) + if emissor: + needle = emissor.upper() + rows = [r for r in rows if needle in r.emissor.upper()] + return rows[:limit] + fam = anbima_src.IMAFamily(family) if family else None + return (await anbima_src.get_ima(fam))[:limit] + + +# ── Open Finance Brasil ──────────────────────────────────────────── + + +@router.get( + "/openfinance/directory", + operation_id="openfinance_directory", + response_model=None, + summary="Open Finance Brasil Directory, participants, API endpoints, resources, or roles", +) +async def openfinance_directory( + dataset: Literal["participants", "endpoints", "resources", "roles"] = Query("participants"), + role: str | None = Query(None, description="participants: Directory role filter, e.g. DADOS"), + status: str | None = Query( + "Active", description="participants/endpoints: status; empty for all" + ), + api_family: str | None = Query( + None, description="participants/endpoints: API family substring" + ), + q: str | None = Query(None, min_length=2, description="participants: name/CNPJ substring"), + limit: int = Query(100, ge=1, le=1000), +) -> Any: + """``participants`` lists ecosystem participants (summarised); ``endpoints`` + flattens their advertised API endpoints; ``resources`` lists supported public + resources; ``roles`` lists Directory roles. + """ + env: of_dir.Environment = "production" + if dataset == "resources": + return of_dir.public_resources(env) + if dataset == "roles": + return (await of_dir.get_roles(env))[:limit] + raw = await of_dir.get_participants(env) + if dataset == "endpoints": + return of_dir.flatten_api_endpoints(raw, api_family=api_family, status=status or None)[ + :limit + ] + filtered = of_dir.filter_participants( + raw, role=role, status=status or None, api_family=api_family, query=q + ) + return of_dir.summarise_participants(filtered[:limit]) + + +# ── Base dos Dados ───────────────────────────────────────────────── + + +@router.get( + "/basedosdados/search", + operation_id="basedosdados_search", + response_model=None, + summary="Search the Base dos Dados catalog (free BigQuery datasets)", +) +async def basedosdados_search( + q: str | None = Query(None, min_length=2, description="Free-text query"), + theme: str | None = Query(None, description="Theme filter, e.g. economics"), + only_free_download: bool = Query( + False, description="Restrict to datasets marked free direct-download" + ), + page: int = Query(1, ge=1), +) -> Any: + """Search the public catalog. Set ``only_free_download=true`` to restrict to + datasets you can download without BigQuery. Use ``basedosdados_sql`` to get a + starter query for a chosen table. + """ + if only_free_download: + return await catalog.search_direct_download_free(theme=theme, page=page) + return await catalog.search_datasets(q=q, theme=theme, page=page) + + +@router.get( + "/basedosdados/sql", + operation_id="basedosdados_sql", + response_model=None, + summary="Generate a starter BigQuery SQL snippet for a Base dos Dados table", +) +async def basedosdados_sql( + dataset_id: str = Query(..., min_length=1), + table_id: str = Query(..., min_length=1), + limit: int = Query(100, ge=1, le=10_000), +) -> Any: + """Returns a ready-to-run BigQuery reference (project.dataset.table + a LIMITed + SELECT) for the given Base dos Dados table. + """ + return catalog.table_ref(dataset_id, table_id, limit) + + +# ── Receita Federal ──────────────────────────────────────────────── + + +@router.get( + "/receita/arrecadacao", + operation_id="receita_arrecadacao", + response_model=None, + summary="Receita Federal monthly tax revenue (arrecadação) by period, UF, and tributo", +) +async def receita_arrecadacao( + year: int | None = Query(None, ge=2000), + month: int | None = Query(None, ge=1, le=12), + uf: str | None = Query(None, description="State UF, e.g. SP, RJ"), + tributo: str | None = Query(None, description="Tax-category substring, e.g. IRPF, COFINS"), + limit: int = Query(500, ge=1, le=5000), +) -> Any: + """Federal-tax revenue in long form (one row per period × UF × tributo). + Filter by any combination of ``year``/``month``/``uf``/``tributo``. + """ + rows = await arrecadacao.get_arrecadacao(year, month, uf, tributo) + return rows[:limit] + + +# ── ANEEL ────────────────────────────────────────────────────────── + + +@router.get( + "/aneel/leiloes", + operation_id="aneel_leiloes", + response_model=None, + summary="ANEEL energy-auction results, generation or transmission", +) +async def aneel_leiloes( + kind: Literal["geracao", "transmissao"] = Query("geracao"), + year: int | None = Query(None), + fonte: str | None = Query( + None, description="geracao: energy-source substring, e.g. Eólica, Solar" + ), + uf: str | None = Query(None), + limit: int = Query(500, ge=1, le=5000), +) -> Any: + """Winning bids per auction. ``geracao`` (since 2005) supports a ``fonte`` filter; + ``transmissao`` (since 1999) does not. Both support ``year``/``uf``. + """ + if kind == "transmissao": + return (await leiloes.get_aneel_leiloes_transmissao(year=year, uf=uf))[:limit] + return (await leiloes.get_aneel_leiloes_geracao(year=year, fonte=fonte, uf=uf))[:limit] + + +# ── SUSEP ────────────────────────────────────────────────────────── + + +@router.get( + "/susep/empresas", + operation_id="susep_empresas", + response_model=None, + summary="SUSEP-supervised entities (insurance, previdência, capitalização), list or search", +) +async def susep_empresas( + q: str | None = Query(None, min_length=2, description="Name substring; omit to list all"), + limit: int = Query(500, ge=1, le=5000), +) -> Any: + """Pass ``q`` to search SUSEP entities by name; omit it to list all (paginated).""" + if q: + return await empresas.search_susep_empresa(q) + return (await empresas.get_susep_empresas())[:limit] + + +# ── C: Code mode (optional, gated) ──────────────────────────────── + +_CODE_MODE_ENABLED = os.getenv("FINDATA_MCP_CODE_MODE", "").strip().lower() in { + "1", + "true", + "yes", + "on", +} +_CODE_OUTPUT_CAP = 20_000 +_CODE_TIMEOUT_MAX = 120 + + +class RunCodeRequest(BaseModel): + """Input for the code-mode tool.""" + + code: str = Field( + ..., + description="Python source to execute. The `findata` library is importable. " + "Source functions are async, wrap calls in asyncio.run(). Print results to stdout.", + ) + timeout_s: int = Field( + 30, ge=1, le=_CODE_TIMEOUT_MAX, description="Wall-clock timeout in seconds" + ) + + +async def _execute_code(code: str, timeout_s: int) -> dict[str, Any]: + """Run ``code`` in an isolated child interpreter, capturing combined output. + + PROTOTYPE, this is NOT a security sandbox: the child runs arbitrary Python + with full library and network access. It is gated off by default and intended + for trusted, local/agent use only. The child runs in its own process group so + a timeout kills the whole tree, not just the direct child. Output is read in + full before the cap is applied, so a deployment that enables this should add + OS-level memory/output limits for the child. + """ + timeout = max(1, min(timeout_s, _CODE_TIMEOUT_MAX)) + proc = await asyncio.create_subprocess_exec( + sys.executable, + "-I", # isolated mode: ignore env vars and user site, don't add cwd to path + "-c", + code, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + cwd=tempfile.gettempdir(), + start_new_session=True, # own process group so a timeout can kill the tree + ) + try: + stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout) + except TimeoutError: + # Kill the whole process GROUP: a snippet that spawned its own subprocesses + # must not outlive the timeout as an orphan. + with contextlib.suppress(ProcessLookupError, PermissionError): + os.killpg(os.getpgid(proc.pid), signal.SIGKILL) + await proc.wait() + return {"timed_out": True, "exit_code": None, "output": f"(killed: exceeded {timeout}s)"} + text = stdout.decode("utf-8", errors="replace") + return { + "timed_out": False, + "exit_code": proc.returncode, + "truncated": len(text) > _CODE_OUTPUT_CAP, + "output": text[:_CODE_OUTPUT_CAP], + } + + +if _CODE_MODE_ENABLED: + + @router.post( + "/run-code", + operation_id="findata_run_code", + response_model=None, + summary="Run a Python snippet against the findata library and return its stdout", + ) + async def findata_run_code(payload: RunCodeRequest) -> Any: + """Execute arbitrary Python with the ``findata`` library available, returning + captured stdout/stderr. This replaces dozens of fine-grained calls: filter, + join, and aggregate across sources in one round-trip instead of streaming + every intermediate result through the model's context. + + Example:: + + import asyncio + from findata.sources.bcb import ptax + + print(asyncio.run(ptax.get_ptax_usd())) + + Security: runs in an isolated child interpreter with a timeout and output + cap, but is NOT a hardened sandbox. Enabled only when the server sets + FINDATA_MCP_CODE_MODE=1. + """ + return await _execute_code(payload.code, payload.timeout_s) + + +# ── The MCP-only FastAPI app ─────────────────────────────────────── + +mcp_app = FastAPI( + title="findata-br (MCP)", + description="Curated MCP tool surface for findata-br.", + version="1", +) + + +@mcp_app.exception_handler(ValueError) +async def _value_error_handler(_: Request, exc: ValueError) -> JSONResponse: + return JSONResponse(status_code=400, content={"detail": str(exc)}) + + +mcp_app.include_router(router) diff --git a/tests/test_mcp_surface.py b/tests/test_mcp_surface.py new file mode 100644 index 0000000..ebffc61 --- /dev/null +++ b/tests/test_mcp_surface.py @@ -0,0 +1,152 @@ +"""Tests for the curated MCP surface (findata.api.mcp_app). + +Guards the three promises of the MCP curation: + 1. the tool catalog is small and curated, not 1:1 with the 95 REST routes, + 2. the public REST API still exposes all 95 routes (curation is MCP-only), + 3. consolidated tools dispatch by their ``dataset``/``kind`` selector and + validate bad combinations with a 400. + +All assertions are offline, no live gov-API calls. +""" + +from __future__ import annotations + +import importlib + +import pytest +from fastapi.testclient import TestClient + +from findata.api.app import app +from findata.api.mcp_app import mcp_app + +EXPECTED_TOOLS = 24 # curated tools with code mode OFF (the default) +EXPECTED_REST_OPERATIONS = 95 # all REST routes (unconditional); bump when the surface changes + +_HTTP_METHODS = {"get", "post", "put", "delete", "patch"} + + +def _operation_ids(fastapi_app: object) -> set[str]: + ids: set[str] = set() + for path, methods in fastapi_app.openapi()["paths"].items(): # type: ignore[attr-defined] + for method, spec in methods.items(): + if method in _HTTP_METHODS: + ids.add(spec.get("operationId") or f"{method} {path}") + return ids + + +# ── catalog size & REST integrity ────────────────────────────────── + + +def test_curated_mcp_is_a_small_fraction_of_the_rest_surface() -> None: + mcp_ids = _operation_ids(mcp_app) + rest_ids = _operation_ids(app) + assert len(mcp_ids) == EXPECTED_TOOLS + assert len(rest_ids) == EXPECTED_REST_OPERATIONS + # the whole point of curation: catalog << REST surface + assert len(mcp_ids) < len(rest_ids) // 3 + + +def test_rest_api_untouched_by_curation() -> None: + # the consolidated REST routes that MCP tools fold together must still exist, + # they back the CLI and HTTP consumers. + paths = set(app.openapi()["paths"]) + for p in ( + "/bcb/ptax/usd/period", + "/bcb/focus/selic", + "/cvm/funds/holdings", + "/cvm/funds/fidc/direitos-creditorios", + ): + assert p in paths, f"REST route {p} disappeared" + + +def test_mcp_transport_mounted_on_public_app() -> None: + paths = {getattr(r, "path", None) for r in app.routes} + assert "/mcp" in paths + + +def test_every_tool_has_an_agent_oriented_summary() -> None: + for _path, methods in mcp_app.openapi()["paths"].items(): + for method, spec in methods.items(): + if method not in _HTTP_METHODS: + continue + summary = spec.get("summary", "") + # a real description, not the auto-generated "GET /path" + assert summary and not summary.startswith(("GET ", "POST ")) + assert len(summary) > 20 + + +# ── consolidated-tool dispatch (offline) ─────────────────────────── + + +def test_bcb_series_lists_catalog_with_no_args() -> None: + r = TestClient(mcp_app).get("/bcb/series") + assert r.status_code == 200 + assert len(r.json()) > 10 # the curated SGS catalog + + +def test_registry_lookup_resolves_ticker_offline() -> None: + r = TestClient(mcp_app).get("/registry/lookup", params={"q": "PETR4", "limit": 3}) + assert r.status_code == 200 + body = r.json() + assert body["entities"], "expected at least one match for PETR4" + assert body["entities"][0]["cnpj"].startswith("33.000.167") + + +def test_consolidated_tool_validates_missing_selector_args() -> None: + # cvm_company dataset=filings requires `year` -> 400 (not a 500) + r = TestClient(mcp_app).get("/cvm/company", params={"dataset": "filings"}) + assert r.status_code == 400 + assert "year" in r.json()["detail"] + + +def test_cvm_fund_holdings_requires_cnpj_and_month() -> None: + r = TestClient(mcp_app).get("/cvm/fund", params={"dataset": "holdings", "year": 2024}) + assert r.status_code == 400 + assert "cnpj" in r.json()["detail"] + + +# ── code-mode gating ─────────────────────────────────────────────── + + +def test_code_mode_is_off_by_default() -> None: + assert "findata_run_code" not in _operation_ids(mcp_app) + + +def test_code_mode_registers_tool_when_enabled(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("FINDATA_MCP_CODE_MODE", "1") + import findata.api.mcp_app as fresh + + reloaded = importlib.reload(fresh) + try: + assert "findata_run_code" in _operation_ids(reloaded.mcp_app) + finally: + # restore the canonical (code-mode off) module for any later imports + monkeypatch.delenv("FINDATA_MCP_CODE_MODE", raising=False) + importlib.reload(fresh) + + +# -- added validations (offline) ------------------------------------ + + +def test_siconfi_rgf_rejects_out_of_range_period() -> None: + # RGF is the quadrimestre 1-3; period 6 is valid only for RREO bimestre. + r = TestClient(mcp_app).get( + "/tesouro/siconfi", + params={"report": "rgf", "year": 2024, "period": 6, "cod_ibge": 1}, + ) + assert r.status_code == 400 + assert "1-3" in r.json()["detail"] + + +def test_focus_rejects_top5_monthly() -> None: + # Top-5 panel exists only for the annual horizon. + r = TestClient(mcp_app).get("/bcb/focus", params={"panel": "top5", "horizon": "monthly"}) + assert r.status_code == 400 + + +def test_structured_fund_fip_rejects_dataset() -> None: + # FIP has no dataset facet; passing one is a client error, not silently ignored. + r = TestClient(mcp_app).get( + "/cvm/structured-fund", params={"kind": "fip", "year": 2024, "dataset": "geral"} + ) + assert r.status_code == 400