Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 60 additions & 4 deletions agents/mcp/src/hyperforge_mcp/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,14 @@ async def process_tool(
"; ".join(error_texts) if error_texts else str(tool_result.meta)
)
logger.error(f"Tool {tool_name} encountered an error: {error_message}")
context.chunks.append(
Chunk(
chunk_id=f"mcp_{self.config.id}_{tool_name}_error",
title=f"MCP tool error: {tool_name}",
text=(f"Tool: {tool_name}\nError: {error_message}"),
origin_agent=self.config.module,
)
)
await memory.add_step(
step_module=self.config.module,
step_title=self.step_title("Tool error"),
Expand All @@ -429,10 +437,32 @@ async def process_tool(
timeit=time() - t0,
)
return
# TODO: extract better information from the tool result
context.structured.append(
json.dumps(tool_result.structuredContent, indent=2, default=str)
text_blocks = sum(
1 for block in tool_result.content if isinstance(block, types.TextContent)
)
image_blocks = sum(
1 for block in tool_result.content if isinstance(block, types.ImageContent)
)
resource_blocks = sum(
1 for block in tool_result.content if isinstance(block, types.ResourceLink)
)

trace_lines = [
f"Tool: {tool_name}",
f"is_error: {tool_result.isError}",
f"text_blocks: {text_blocks}",
f"image_blocks: {image_blocks}",
f"resource_links: {resource_blocks}",
]
if tool_result.structuredContent is not None:
structured = json.dumps(
tool_result.structuredContent, indent=2, default=str
)
trace_lines.append("Structured content (truncated):")
trace_lines.append(
structured[:2000] + ("...(truncated)" if len(structured) > 2000 else "")
)
context.structured.append(structured)
messages.append(
Message(author=Author.NUCLIA, text=f"Tool {tool_name} executed")
)
Expand Down Expand Up @@ -527,8 +557,10 @@ async def process_tool(
if block.resource.mimeType is not None
else "application/octet-stream",
b64encoded=block.resource.blob,
)
)
)

messages.append(Message(author=Author.NUCLIA, text="\n".join(trace_lines)))

step_value = (
f"Used tool: {tool_name} with arguments: {tool_arguments}"
Expand Down Expand Up @@ -1127,6 +1159,7 @@ async def _get_question_context(
loaded_tools = False
max_retries = 2
for attempt in range(max_retries):
interaction_completed = False
try:
async with self.http_streaming_session_ctx(
manager=manager, memory=memory
Expand All @@ -1142,6 +1175,19 @@ async def _get_question_context(
f"Failed to preload tools from MCP server: {e}"
)
self.tools = []
if self.tools and loaded_tools and attempt == 0:
tools_text = "\n".join(
f"- {t.name}: {t.description or '(no description)'}"
for t in self.tools
)
context.chunks.append(
Chunk(
chunk_id=f"mcp_{self.config.id}_tools_list",
title="Available MCP tools",
text=f"The following tools are available:\n{tools_text}",
origin_agent=self.config.module,
)
)
if attempt == 0:
# Only preload prompts and resources on the first attempt
try:
Expand All @@ -1166,14 +1212,24 @@ async def _get_question_context(
) = await self.mcp_interaction(
memory, manager, question, context
)
interaction_completed = True
break # Success, exit retry loop
except Exception as e:
if interaction_completed:
logger.warning(
"Ignoring MCP HTTP teardown error after successful interaction: %s",
repr(e),
)
break

logger.exception(
f"Error during MCP HTTP interaction (attempt {attempt + 1}/{max_retries})"
)

if attempt + 1 == max_retries:
raise e
finally:
self.session = None
elif self.driver_context_manager is not None:
async with self.driver_context_manager as (read_stream, write_stream): # type: ignore
if read_stream is None or write_stream is None:
Expand Down
71 changes: 67 additions & 4 deletions agents/mcp/src/hyperforge_mcp/config_driver.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
from enum import Enum
from typing import ClassVar, Dict, Literal, Optional
from typing import Any, ClassVar, Dict, Literal, Optional

from hyperforge.driver import DriverConfig, EncryptedPayload
from hyperforge.settings import OAuthSettings
from hyperforge.utils import WidgetType
from pydantic import Field
from pydantic import Field, model_validator
from pydantic.config import ConfigDict


def _redirect_uris_schema_default(schema: Dict[str, Any]) -> None:
try:
callback_url = OAuthSettings().mcp_callback_url
if callback_url:
schema["default"] = [callback_url]
except Exception:
pass


class MCPHTTPInnerConfig(EncryptedPayload):
encrypted_fields: ClassVar[list[str]] = []
encrypted_fields: ClassVar[list[str]] = ["client_secret"]

uri: str
timeout: float = 60 * 5
headers: Dict[str, str] = Field(default_factory=dict)
sse_read_timeout: float = Field(default=300, title="SSE read timeout in seconds")
ca_certificate: Optional[str] = Field(
default=None,
title="CA certificate for HTTPS",
Expand All @@ -27,7 +38,12 @@ class MCPHTTPInnerConfig(EncryptedPayload):
server_url: Optional[str] = Field(
default=None, title="OAuth Authorization Server URL"
)
redirect_uris: list[str] = Field(default_factory=list, title="OAuth Redirect URIs")
redirect_uris: list[str] = Field(
default_factory=list,
title="OAuth Redirect URI",
description="The callback URL registered in your OAuth Connected App. Auto-filled from the server configuration - do not change.",
json_schema_extra=_redirect_uris_schema_default,
)
auth_server_url: Optional[str] = Field(
default=None, title="OAuth Authorization Server URL"
)
Expand All @@ -44,6 +60,53 @@ class MCPHTTPInnerConfig(EncryptedPayload):
scope: str = Field(
default="user", title="OAuth Scopes", description="Default: 'user'"
)
client_id: Optional[str] = Field(
default=None,
title="OAuth Client ID",
description="Pre-registered client ID. If set, skips Dynamic Client Registration.",
)
client_secret: Optional[str] = Field(
default=None,
title="OAuth Client Secret",
description="Pre-registered client secret. Required when the AS is not a public client.",
)
authorization_endpoint: Optional[str] = Field(
default=None,
title="OAuth Authorization Endpoint Override",
description=(
"Override the authorization endpoint discovered via RFC 8414 metadata. "
"Use when the AS advertises a non-functional /authorize path."
),
)
token_endpoint: Optional[str] = Field(
default=None,
title="OAuth Token Endpoint Override",
description=(
"Override the token endpoint discovered via RFC 8414 metadata. "
"Use when the AS uses a non-standard token path (e.g. uses "
"/services/oauth2/token instead of /token)."
),
)
pkce: bool = Field(
default=True,
title="Enable PKCE",
description=(
"Whether to use PKCE (Proof Key for Code Exchange) in the OAuth 2.0 flow. "
"Set to false for Authorization Servers that do not support PKCE "
"(e.g. Connected Apps without PKCE enabled)."
),
)

@model_validator(mode="after")
def _force_redirect_uris(self) -> "MCPHTTPInnerConfig":
"""Always override redirect_uris with the zone callback URL."""
try:
callback_url = OAuthSettings().mcp_callback_url
if callback_url:
self.redirect_uris = [callback_url]
except Exception:
pass
return self


class MCPHTTPDriverConfig(DriverConfig[MCPHTTPInnerConfig]):
Expand Down
Loading
Loading