diff --git a/.github/workflows/itk-nightly.yaml b/.github/workflows/itk-nightly.yaml new file mode 100644 index 000000000..e0d15748f --- /dev/null +++ b/.github/workflows/itk-nightly.yaml @@ -0,0 +1,42 @@ +name: Nightly ITK + +on: + schedule: + - cron: '0 2 * * *' + workflow_dispatch: + +permissions: + contents: write + +jobs: + nightly: + name: Nightly ITK Run + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Set up JDK 17 + uses: actions/setup-java@v5 + with: + java-version: '17' + distribution: 'temurin' + cache: maven + + - name: Run Nightly ITK Tests + run: bash run_itk.sh + working-directory: itk + env: + A2A_ITK_REVISION: main + ITK_NIGHTLY_RUN: "True" + + - name: Upload Results to Rolling Release + uses: softprops/action-gh-release@v3 + with: + tag_name: "nightly-metrics" + prerelease: true + files: | + itk/itk_java.json + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/itk.yaml b/.github/workflows/itk.yaml new file mode 100644 index 000000000..41f81cbae --- /dev/null +++ b/.github/workflows/itk.yaml @@ -0,0 +1,44 @@ +name: ITK + +on: + push: + branches: [main] + pull_request: + paths: + - 'client/**' + - 'common/**' + - 'http-client/**' + - 'itk/**' + - 'jsonrpc-common/**' + - 'reference/**' + - 'server-common/**' + - 'spec/**' + - 'spec-grpc/**' + - 'transport/**' + - 'pom.xml' + - '.github/workflows/itk.yaml' + +permissions: + contents: read + +jobs: + itk: + name: ITK + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Set up JDK 17 + uses: actions/setup-java@v5 + with: + java-version: '17' + distribution: 'temurin' + cache: maven + + - name: Run ITK Tests + run: bash run_itk.sh + working-directory: itk + env: + A2A_ITK_REVISION: main diff --git a/boms/extras/src/it/extras-usage-test/src/main/java/org/a2aproject/sdk/test/ExtrasBomVerifier.java b/boms/extras/src/it/extras-usage-test/src/main/java/org/a2aproject/sdk/test/ExtrasBomVerifier.java index c5b727633..dacae1e9a 100644 --- a/boms/extras/src/it/extras-usage-test/src/main/java/org/a2aproject/sdk/test/ExtrasBomVerifier.java +++ b/boms/extras/src/it/extras-usage-test/src/main/java/org/a2aproject/sdk/test/ExtrasBomVerifier.java @@ -14,6 +14,7 @@ public class ExtrasBomVerifier extends DynamicBomVerifier { private static final Set EXTRAS_EXCLUSIONS = Set.of( "boms/", // BOM test modules themselves "examples/", // Example applications + "itk/", // Integration Test Kit agent "tck/", // TCK test suite "tests/", // Integration tests "test-utils-docker/", // Test utilities for Docker-based tests diff --git a/boms/reference/src/it/reference-usage-test/src/main/java/org/a2aproject/sdk/test/ReferenceBomVerifier.java b/boms/reference/src/it/reference-usage-test/src/main/java/org/a2aproject/sdk/test/ReferenceBomVerifier.java index acf1143a4..d521ecf3e 100644 --- a/boms/reference/src/it/reference-usage-test/src/main/java/org/a2aproject/sdk/test/ReferenceBomVerifier.java +++ b/boms/reference/src/it/reference-usage-test/src/main/java/org/a2aproject/sdk/test/ReferenceBomVerifier.java @@ -14,6 +14,7 @@ public class ReferenceBomVerifier extends DynamicBomVerifier { private static final Set REFERENCE_EXCLUSIONS = Set.of( "boms/", // BOM test modules themselves "examples/", // Example applications + "itk/", // Integration Test Kit agent "tck/", // TCK test suite "tests/", // Integration tests "test-utils-docker/", // Test utilities for Docker-based tests diff --git a/boms/sdk/src/it/sdk-usage-test/src/main/java/org/a2aproject/sdk/test/SdkBomVerifier.java b/boms/sdk/src/it/sdk-usage-test/src/main/java/org/a2aproject/sdk/test/SdkBomVerifier.java index 0d5631c7d..551c51c9a 100644 --- a/boms/sdk/src/it/sdk-usage-test/src/main/java/org/a2aproject/sdk/test/SdkBomVerifier.java +++ b/boms/sdk/src/it/sdk-usage-test/src/main/java/org/a2aproject/sdk/test/SdkBomVerifier.java @@ -14,6 +14,7 @@ public class SdkBomVerifier extends DynamicBomVerifier { private static final Set SDK_EXCLUSIONS = Set.of( "boms/", // BOM test modules themselves "examples/", // Example applications + "itk/", // Integration Test Kit agent "tck/", // TCK test suite "compat-0.3/tck/", // Compat 0.3 TCK (not yet enabled) "compat-0.3/reference/", // Compat 0.3 reference implementations (in reference BOM) diff --git a/itk/.gitignore b/itk/.gitignore new file mode 100644 index 000000000..ce15aadf0 --- /dev/null +++ b/itk/.gitignore @@ -0,0 +1,4 @@ +a2a-itk/ +logs/ +raw_results.json +itk_java.json diff --git a/itk/pom.xml b/itk/pom.xml new file mode 100644 index 000000000..f4a298fbb --- /dev/null +++ b/itk/pom.xml @@ -0,0 +1,101 @@ + + + 4.0.0 + + + org.a2aproject.sdk + a2a-java-sdk-parent + 1.0.0.CR2-SNAPSHOT + + + a2a-java-sdk-itk + + Java SDK A2A ITK Agent + Integration Test Kit agent for cross-SDK interoperability testing + + + 4.33.2 + + + + + + io.quarkus + quarkus-bom + ${quarkus.platform.version} + pom + import + + + + + + + org.a2aproject.sdk + a2a-java-sdk-client + + + org.a2aproject.sdk + a2a-java-sdk-reference-jsonrpc + + + io.quarkus + quarkus-resteasy + provided + + + org.a2aproject.sdk + a2a-java-sdk-reference-grpc + + + org.a2aproject.sdk + a2a-java-sdk-reference-rest + + + jakarta.enterprise + jakarta.enterprise.cdi-api + provided + + + jakarta.ws.rs + jakarta.ws.rs-api + + + com.google.protobuf + protobuf-java + ${protobuf-java.version} + + + org.a2aproject.sdk + a2a-java-sdk-client-transport-grpc + + + org.a2aproject.sdk + a2a-java-sdk-client-transport-rest + + + + + + + io.quarkus + quarkus-maven-plugin + true + + + + build + generate-code + generate-code-tests + + + + + --add-opens=java.base/java.lang=ALL-UNNAMED + + + + + diff --git a/itk/run_itk.sh b/itk/run_itk.sh new file mode 100755 index 000000000..ad00fcde2 --- /dev/null +++ b/itk/run_itk.sh @@ -0,0 +1,157 @@ +#!/bin/bash +set -ex + +# Set default log level +export ITK_LOG_LEVEL="${ITK_LOG_LEVEL:-INFO}" + +# Detect container runtime (docker or podman) +if command -v docker &> /dev/null; then + CONTAINER_RT=docker +elif command -v podman &> /dev/null; then + CONTAINER_RT=podman +else + echo "Error: neither docker nor podman found" + exit 1 +fi + +# Initialize default exit code +RESULT=1 + +# Cleanup function to be called on exit +cleanup() { + set +x + echo "Cleaning up artifacts..." + $CONTAINER_RT stop itk-service > /dev/null 2>&1 || true + $CONTAINER_RT rm itk-service > /dev/null 2>&1 || true + $CONTAINER_RT rmi itk_service > /dev/null 2>&1 || true + rm -rf a2a-itk > /dev/null 2>&1 || true + echo "Done. Final exit code: $RESULT" +} + +# Register cleanup function to run on script exit +trap cleanup EXIT + +# 1. Pull a2a-itk and checkout revision +: "${A2A_ITK_REVISION:?A2A_ITK_REVISION environment variable must be set}" + +if [ ! -d "a2a-itk" ]; then + git clone https://github.com/a2aproject/a2a-itk.git a2a-itk +fi +cd a2a-itk +git fetch origin +git checkout "$A2A_ITK_REVISION" + +# Only pull if it's a branch (not a detached HEAD) +if git symbolic-ref -q HEAD > /dev/null; then + git pull origin "$A2A_ITK_REVISION" +fi +cd .. + +# 2. Copy latest instruction.proto from a2a-itk +cp a2a-itk/protos/instruction.proto src/main/proto/instruction.proto + +# 3. Build itk_service container image from root of a2a-itk +CONTAINER_BUILD_ARGS="" +if [ "$CONTAINER_RT" = "podman" ]; then + CONTAINER_BUILD_ARGS="--format docker" +fi +$CONTAINER_RT build $CONTAINER_BUILD_ARGS -t itk_service a2a-itk + +# 4. Start container service with a single mount: the a2a-java repo +A2A_JAVA_ROOT=$(cd .. && pwd) + +# Stop existing container if any +$CONTAINER_RT rm -f itk-service || true + +# Create logs directory if debug +if [ "${ITK_LOG_LEVEL^^}" = "DEBUG" ]; then + mkdir -p logs +fi + +DOCKER_MOUNT_LOGS="" +if [ "${ITK_LOG_LEVEL^^}" = "DEBUG" ]; then + DOCKER_MOUNT_LOGS="-v $(pwd)/logs:/app/logs" +fi + +$CONTAINER_RT run -d --name itk-service \ + -v "$A2A_JAVA_ROOT:/app/agents/repo" \ + $DOCKER_MOUNT_LOGS \ + -e ITK_LOG_LEVEL="$ITK_LOG_LEVEL" \ + -p 8000:8000 \ + itk_service + +# 5. Verify service is up and send post request +MAX_RETRIES=30 +echo "Waiting for ITK service to start on 127.0.0.1:8000..." +set +e +for i in $(seq 1 $MAX_RETRIES); do + if curl -s http://127.0.0.1:8000/ > /dev/null; then + echo "Service is up!" + break + fi + echo "Still waiting... ($i/$MAX_RETRIES)" + sleep 2 +done + +# If we reached the end of the loop without success +if ! curl -s http://127.0.0.1:8000/ > /dev/null; then + echo "Error: ITK service failed to start on port 8000" + $CONTAINER_RT logs itk-service + exit 1 +fi + +# Workaround: java_v10 agent targets Java 21 but the ITK image has JDK 17. +# Patch until https://github.com/a2aproject/a2a-itk/pull/XX merges. +$CONTAINER_RT exec itk-service sed -i 's|21|17|' /app/agents/java/v10/pom.xml 2>/dev/null || true + +SCENARIO_FILE="scenarios.json" +if [ "${ITK_NIGHTLY_RUN^^}" = "TRUE" ]; then + SCENARIO_FILE="scenarios_full.json" +fi + +echo "ITK Service is up! Sending compatibility test request using $SCENARIO_FILE..." +RESPONSE=$(curl -s -X POST http://127.0.0.1:8000/run \ + -H "Content-Type: application/json" \ + -d "@$SCENARIO_FILE") + +if [ "${ITK_NIGHTLY_RUN^^}" = "TRUE" ]; then + echo "Nightly run detected. Saving raw results and running process_results.py..." + echo "$RESPONSE" > raw_results.json + python3 a2a-itk/scripts/process_results.py \ + --history_output_file itk_java.json \ + --history_url https://github.com/a2aproject/a2a-java/releases/download/nightly-metrics/itk_java.json + RESULT=$? +else + echo "--------------------------------------------------------" + echo "ITK TEST RESULTS:" + echo "--------------------------------------------------------" + echo "$RESPONSE" | python3 -c " +import sys, json +try: + data = json.load(sys.stdin) + all_passed = data.get('all_passed', False) + results = data.get('results', {}) + for test, passed in results.items(): + status = 'PASSED' if passed else 'FAILED' + print(f'{test}: {status}') + print('--------------------------------------------------------') + print(f'OVERALL STATUS: {\"PASSED\" if all_passed else \"FAILED\"}') + if not all_passed: + sys.exit(1) +except Exception as e: + print(f'Error parsing results: {e}') + print(f'Raw response: {data if \"data\" in locals() else \"no data\"}') + sys.exit(1) +" + RESULT=$? +fi +set -e + +if [ $RESULT -ne 0 ]; then + echo "Tests failed. Container logs:" + $CONTAINER_RT logs itk-service +fi +echo "--------------------------------------------------------" + +# Final exit result will be captured by trap cleanup +exit $RESULT diff --git a/itk/scenarios.json b/itk/scenarios.json new file mode 100644 index 000000000..cb0c92f74 --- /dev/null +++ b/itk/scenarios.json @@ -0,0 +1,64 @@ +{ + "tests": [ + { + "name": "Star Topology (Full) - JSONRPC & GRPC", + "sdks": ["current", "java_v10", "python_v10", "go_v10", "go_v03"], + "edges": ["0->1", "0->2", "0->3", "0->4", "1->0", "2->0", "3->0", "4->0"], + "protocols": ["jsonrpc", "grpc"], + "behavior": "send_message" + }, + { + "name": "Star Topology (No Go v03) - HTTP_JSON", + "sdks": ["current", "java_v10", "python_v10", "go_v10"], + "edges": ["0->1", "0->2", "0->3", "1->0", "2->0", "3->0"], + "protocols": ["http_json"], + "behavior": "send_message" + }, + { + "name": "Star Topology (Full) - JSONRPC & GRPC (Streaming)", + "sdks": ["current", "java_v10", "python_v10", "go_v10", "go_v03"], + "edges": ["0->1", "0->2", "0->3", "0->4", "1->0", "2->0", "3->0", "4->0"], + "protocols": ["jsonrpc", "grpc"], + "streaming": true, + "behavior": "send_message" + }, + { + "name": "Star Topology (No Go v03) - HTTP_JSON (Streaming)", + "sdks": ["current", "java_v10", "python_v10", "go_v10"], + "edges": ["0->1", "0->2", "0->3", "1->0", "2->0", "3->0"], + "protocols": ["http_json"], + "streaming": true, + "behavior": "send_message" + }, + { + "name": "Push Notification Test - JSONRPC & GRPC", + "sdks": ["current", "java_v10", "python_v10", "go_v03"], + "edges": ["0->1", "0->2", "0->3", "1->0", "2->0", "3->0"], + "protocols": ["jsonrpc", "grpc"], + "behavior": "push_notification" + }, + { + "name": "Push Notification Test - HTTP_JSON", + "sdks": ["current", "java_v10", "python_v10"], + "edges": ["0->1", "0->2", "1->0", "2->0"], + "protocols": ["http_json"], + "behavior": "push_notification" + }, + { + "name": "Resubscribe Test - JSONRPC", + "sdks": ["current", "java_v10", "python_v10", "go_v10", "go_v03"], + "edges": ["0->1", "0->2", "0->3", "0->4", "1->0", "2->0", "3->0", "4->0"], + "protocols": ["jsonrpc"], + "streaming": true, + "behavior": "resubscribe" + }, + { + "name": "Resubscribe Test - Non-JSONRPC Protocols", + "sdks": ["current", "java_v10", "python_v10", "go_v10"], + "edges": ["0->1", "0->2", "0->3", "1->0", "2->0", "3->0"], + "protocols": ["grpc", "http_json"], + "streaming": true, + "behavior": "resubscribe" + } + ] +} diff --git a/itk/scenarios_full.json b/itk/scenarios_full.json new file mode 100644 index 000000000..96960f3e6 --- /dev/null +++ b/itk/scenarios_full.json @@ -0,0 +1,106 @@ +{ + "tests": [ + { + "name": "Nightly - JSONRPC - Send Message", + "sdks": ["current", "java_v10", "python_v10", "go_v10", "go_v03"], + "edges": ["0->1", "0->2", "0->3", "0->4", "1->0", "2->0", "3->0", "4->0"], + "protocols": ["jsonrpc"], + "behavior": "send_message", + "build_subtests": true + }, + { + "name": "Nightly - JSONRPC - Send Message (Streaming)", + "sdks": ["current", "java_v10", "python_v10", "go_v10", "go_v03"], + "edges": ["0->1", "0->2", "0->3", "0->4", "1->0", "2->0", "3->0", "4->0"], + "protocols": ["jsonrpc"], + "streaming": true, + "behavior": "send_message", + "build_subtests": true + }, + { + "name": "Nightly - JSONRPC - Push Notification", + "sdks": ["current", "java_v10", "python_v10", "go_v10", "go_v03"], + "edges": ["0->1", "0->2", "0->3", "0->4", "1->0", "2->0", "3->0", "4->0"], + "protocols": ["jsonrpc"], + "behavior": "push_notification", + "build_subtests": true + }, + { + "name": "Nightly - JSONRPC - Resubscribe", + "sdks": ["current", "java_v10", "python_v10", "go_v10", "go_v03"], + "edges": ["0->1", "0->2", "0->3", "0->4", "1->0", "2->0", "3->0", "4->0"], + "protocols": ["jsonrpc"], + "streaming": true, + "behavior": "resubscribe", + "build_subtests": true + }, + { + "name": "Nightly - GRPC - Send Message", + "sdks": ["current", "java_v10", "python_v10", "go_v10", "go_v03"], + "edges": ["0->1", "0->2", "0->3", "0->4", "1->0", "2->0", "3->0", "4->0"], + "protocols": ["grpc"], + "behavior": "send_message", + "build_subtests": true + }, + { + "name": "Nightly - GRPC - Send Message (Streaming)", + "sdks": ["current", "java_v10", "python_v10", "go_v10", "go_v03"], + "edges": ["0->1", "0->2", "0->3", "0->4", "1->0", "2->0", "3->0", "4->0"], + "protocols": ["grpc"], + "streaming": true, + "behavior": "send_message", + "build_subtests": true + }, + { + "name": "Nightly - GRPC - Push Notification", + "sdks": ["current", "java_v10", "python_v10", "go_v10", "go_v03"], + "edges": ["0->1", "0->2", "0->3", "0->4", "1->0", "2->0", "3->0", "4->0"], + "protocols": ["grpc"], + "behavior": "push_notification", + "build_subtests": true + }, + { + "name": "Nightly - GRPC - Resubscribe", + "sdks": ["current", "java_v10", "python_v10", "go_v10", "go_v03"], + "edges": ["0->1", "0->2", "0->3", "0->4", "1->0", "2->0", "3->0", "4->0"], + "protocols": ["grpc"], + "streaming": true, + "behavior": "resubscribe", + "build_subtests": true + }, + { + "name": "Nightly - HTTP_JSON - Send Message", + "sdks": ["current", "java_v10", "python_v10", "go_v10", "go_v03"], + "edges": ["0->1", "0->2", "0->3", "0->4", "1->0", "2->0", "3->0", "4->0"], + "protocols": ["http_json"], + "behavior": "send_message", + "build_subtests": true + }, + { + "name": "Nightly - HTTP_JSON - Send Message (Streaming)", + "sdks": ["current", "java_v10", "python_v10", "go_v10", "go_v03"], + "edges": ["0->1", "0->2", "0->3", "0->4", "1->0", "2->0", "3->0", "4->0"], + "protocols": ["http_json"], + "streaming": true, + "behavior": "send_message", + "build_subtests": true + }, + { + "name": "Nightly - HTTP_JSON - Push Notification", + "sdks": ["current", "java_v10", "python_v10", "go_v10", "go_v03"], + "edges": ["0->1", "0->2", "0->3", "0->4", "1->0", "2->0", "3->0", "4->0"], + "protocols": ["http_json"], + "behavior": "push_notification", + "build_subtests": true + }, + { + "name": "Nightly - HTTP_JSON - Resubscribe", + "sdks": ["current", "java_v10", "python_v10", "go_v10", "go_v03"], + "edges": ["0->1", "0->2", "0->3", "0->4", "1->0", "2->0", "3->0", "4->0"], + "protocols": ["http_json"], + "streaming": true, + "behavior": "resubscribe", + "build_subtests": true + } + ] +} diff --git a/itk/src/main/java/org/a2aproject/sdk/itk/AdditionalRoutes.java b/itk/src/main/java/org/a2aproject/sdk/itk/AdditionalRoutes.java new file mode 100644 index 000000000..7c6479e1d --- /dev/null +++ b/itk/src/main/java/org/a2aproject/sdk/itk/AdditionalRoutes.java @@ -0,0 +1,46 @@ +package org.a2aproject.sdk.itk; + +import io.vertx.ext.web.Router; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ApplicationScoped +public class AdditionalRoutes { + + private static final Logger log = LoggerFactory.getLogger(AdditionalRoutes.class); + + void addPrefixedRoutes(@Observes Router router) { + router.get("/jsonrpc/.well-known/agent-card.json") + .handler(ctx -> { + log.info("Rerouting GET /jsonrpc/.well-known/agent-card.json -> /.well-known/agent-card.json"); + ctx.reroute("/.well-known/agent-card.json"); + }); + + router.route("/jsonrpc/*").handler(ctx -> { + log.info("Rerouting /jsonrpc -> /"); + ctx.reroute("/"); + }); + router.route("/rest/*").handler(ctx -> { + log.info("Rerouting POST /rest -> /"); + ctx.reroute("/"); + }); + + router.route().order(Integer.MIN_VALUE) + .handler(ctx -> { + String method = ctx.request().method().name(); + String path = ctx.request().path(); + String contentType = ctx.request().getHeader("Content-Type"); + log.info("Incoming {} {} Content-Type={}", method, path, contentType); + if ("POST".equals(method)) { + ctx.request().body().onSuccess(buffer -> { + if (buffer != null) { + log.debug("POST {} payload: {}", path, buffer.toString()); + } + }); + } + ctx.next(); + }); + } +} diff --git a/itk/src/main/java/org/a2aproject/sdk/itk/AgentCardProducer.java b/itk/src/main/java/org/a2aproject/sdk/itk/AgentCardProducer.java new file mode 100644 index 000000000..390ad8be2 --- /dev/null +++ b/itk/src/main/java/org/a2aproject/sdk/itk/AgentCardProducer.java @@ -0,0 +1,51 @@ +package org.a2aproject.sdk.itk; + +import java.util.List; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; + +import org.a2aproject.sdk.server.PublicAgentCard; +import org.a2aproject.sdk.spec.AgentCapabilities; +import org.a2aproject.sdk.spec.AgentCard; +import org.a2aproject.sdk.spec.AgentInterface; +import org.a2aproject.sdk.spec.AgentSkill; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +@ApplicationScoped +public class AgentCardProducer { + + @ConfigProperty(name = "quarkus.http.port", defaultValue = "10102") + int httpPort; + + @ConfigProperty(name = "quarkus.grpc.server.port", defaultValue = "11002") + int grpcPort; + + @Produces + @PublicAgentCard + public AgentCard agentCard() { + return AgentCard.builder() + .name("ITK Current Agent") + .description("Java agent using A2A SDK (current source).") + .version("1.0.0") + .supportedInterfaces(List.of( + new AgentInterface("JSONRPC", "http://127.0.0.1:" + httpPort), + new AgentInterface("HTTP+JSON", "http://127.0.0.1:" + httpPort), + new AgentInterface("GRPC", "127.0.0.1:" + grpcPort) + )) + .capabilities(AgentCapabilities.builder() + .streaming(true) + .pushNotifications(true) + .build()) + .defaultInputModes(List.of("text")) + .defaultOutputModes(List.of("text")) + .skills(List.of(AgentSkill.builder() + .id("itk_current") + .name("ITK Current") + .description("Processes ITK instruction traversals") + .tags(List.of("itk")) + .examples(List.of()) + .build())) + .build(); + } +} diff --git a/itk/src/main/java/org/a2aproject/sdk/itk/AgentExecutorProducer.java b/itk/src/main/java/org/a2aproject/sdk/itk/AgentExecutorProducer.java new file mode 100644 index 000000000..f1d0de14a --- /dev/null +++ b/itk/src/main/java/org/a2aproject/sdk/itk/AgentExecutorProducer.java @@ -0,0 +1,339 @@ +package org.a2aproject.sdk.itk; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; + +import org.a2aproject.sdk.A2A; +import org.a2aproject.sdk.client.Client; +import org.a2aproject.sdk.client.ClientEvent; +import org.a2aproject.sdk.client.MessageEvent; +import org.a2aproject.sdk.client.TaskEvent; +import org.a2aproject.sdk.client.TaskUpdateEvent; +import org.a2aproject.sdk.client.config.ClientConfig; +import org.a2aproject.sdk.client.transport.grpc.GrpcTransport; +import org.a2aproject.sdk.client.transport.grpc.GrpcTransportConfigBuilder; +import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransport; +import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransportConfigBuilder; +import org.a2aproject.sdk.client.transport.rest.RestTransport; +import org.a2aproject.sdk.client.transport.rest.RestTransportConfigBuilder; +import org.a2aproject.sdk.server.agentexecution.AgentExecutor; +import org.a2aproject.sdk.server.agentexecution.RequestContext; +import org.a2aproject.sdk.server.tasks.AgentEmitter; +import org.a2aproject.sdk.spec.A2AError; +import org.a2aproject.sdk.spec.AgentCard; +import org.a2aproject.sdk.spec.FilePart; +import org.a2aproject.sdk.spec.FileWithBytes; +import org.a2aproject.sdk.spec.Message; +import org.a2aproject.sdk.spec.Part; +import org.a2aproject.sdk.spec.TaskPushNotificationConfig; +import org.a2aproject.sdk.spec.TaskState; +import org.a2aproject.sdk.spec.TextPart; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.grpc.ManagedChannelBuilder; + +import itk.InstructionOuterClass.CallAgent; +import itk.InstructionOuterClass.Instruction; + +@ApplicationScoped +public class AgentExecutorProducer { + + private static final Logger log = LoggerFactory.getLogger(AgentExecutorProducer.class); + + @Produces + public AgentExecutor agentExecutor() { + return new ItkAgentExecutor(); + } + + static class ItkAgentExecutor implements AgentExecutor { + + private static final int HOLD_ITERATIONS = 5; + private static final long HOLD_INTERVAL_MS = 2000; + private static final long TASK_TIMEOUT_SECONDS = 60; + + @Override + public void execute(RequestContext context, AgentEmitter emitter) throws A2AError { + log.info("Executing task {}", emitter.getTaskId()); + + emitter.startWork(); + + Instruction instruction = extractInstruction(context.getMessage()); + if (instruction == null) { + log.error("No valid instruction found in request"); + emitter.sendMessage("No valid instruction found in request"); + emitter.fail(); + return; + } + + try { + List results = handleInstruction(instruction); + String response = String.join("\n", results); + log.info("Response: {}", response); + + if (shouldHold(instruction)) { + log.info("Holding task {} as requested", emitter.getTaskId()); + + Message holdMsg = emitter.newAgentMessage( + List.of(new TextPart(response + "\ntask-finished")), null); + emitter.updateStatus(TaskState.TASK_STATE_WORKING, holdMsg); + + for (int i = 0; i < HOLD_ITERATIONS; i++) { + log.info("Emitting periodic status update for held task {}", emitter.getTaskId()); + try { + Thread.sleep(HOLD_INTERVAL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.info("Task {} interrupted during hold", emitter.getTaskId()); + return; + } + } + log.info("Held task {} timed out, auto-completing", emitter.getTaskId()); + Message completeMsg = emitter.newAgentMessage( + List.of(new TextPart(response + "\ntask-finished")), null); + emitter.complete(completeMsg); + } else { + Message completeMsg = emitter.newAgentMessage( + List.of(new TextPart(response)), null); + emitter.complete(completeMsg); + } + } catch (TimeoutException e) { + log.error("Timed out waiting for remote agent response", e); + emitter.fail(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Task {} interrupted while calling remote agent", emitter.getTaskId(), e); + emitter.fail(); + } catch (ExecutionException e) { + log.error("Remote agent call failed for task {}: {}", + emitter.getTaskId(), e.getCause().getMessage(), e.getCause()); + emitter.fail(); + } catch (Exception e) { + log.error("Unexpected error handling instruction for task {}", + emitter.getTaskId(), e); + emitter.fail(); + } + } + + @Override + public void cancel(RequestContext context, AgentEmitter emitter) throws A2AError { + log.info("Cancel requested for task {}", emitter.getTaskId()); + emitter.cancel(); + } + + private Instruction extractInstruction(Message message) { + if (message == null || message.parts() == null) { + return null; + } + + for (Part part : message.parts()) { + if (part instanceof FilePart filePart) { + var file = filePart.file(); + if ("application/x-protobuf".equals(file.mimeType()) + || "instruction.bin".equals(file.name())) { + try { + if (file instanceof FileWithBytes fwb) { + byte[] raw = Base64.getDecoder().decode(fwb.bytes()); + return Instruction.parseFrom(raw); + } + } catch (Exception e) { + log.debug("Failed to parse instruction from file part", e); + } + } + } + + if (part instanceof TextPart textPart) { + try { + byte[] raw = Base64.getDecoder().decode(textPart.text()); + return Instruction.parseFrom(raw); + } catch (Exception e) { + log.debug("Failed to parse instruction from text part", e); + } + } + } + return null; + } + + private List handleInstruction(Instruction inst) throws Exception { + if (inst.hasCallAgent()) { + return handleCallAgent(inst.getCallAgent()); + } + if (inst.hasReturnResponse()) { + return List.of(inst.getReturnResponse().getResponse()); + } + if (inst.hasSteps()) { + List allResults = new ArrayList<>(); + for (Instruction step : inst.getSteps().getInstructionsList()) { + allResults.addAll(handleInstruction(step)); + } + return allResults; + } + throw new IllegalStateException("Unknown instruction type"); + } + + private List handleCallAgent(CallAgent call) throws Exception { + log.info("Calling agent {} via {}", call.getAgentCardUri(), call.getTransport()); + AgentCard remoteCard = A2A.getAgentCard(call.getAgentCardUri()); + + ClientConfig.Builder configBuilder = new ClientConfig.Builder() + .setStreaming(call.getStreaming() || isGrpc(call.getTransport())) + .setUseClientPreference(true); + + if (call.hasPushNotification()) { + String url = call.getPushNotification().getUrl(); + if (url.isEmpty()) { + throw new IllegalArgumentException("URL not specified in push_notification behavior"); + } + if (!url.startsWith("http://") && !url.startsWith("https://")) { + url = "http://" + url; + } + configBuilder.setTaskPushNotificationConfig( + TaskPushNotificationConfig.builder() + .id(UUID.randomUUID().toString()) + .url(url + "/notifications") + .token("itk-token") + .build()); + } + + var clientBuilder = Client.builder(remoteCard) + .clientConfig(configBuilder.build()); + + addTransport(clientBuilder, call.getTransport()); + + byte[] instBytes = call.getInstruction().toByteArray(); + Message wrappedMsg = Message.builder() + .role(Message.Role.ROLE_USER) + .parts(List.of(new FilePart( + new FileWithBytes("application/x-protobuf", "instruction.bin", instBytes)))) + .build(); + + CompletableFuture> resultFuture = new CompletableFuture<>(); + List responses = Collections.synchronizedList(new ArrayList<>()); + + clientBuilder.addConsumer((event, card) -> { + List texts = extractTextFromEvent(event); + if (call.hasResubscribe()) { + String finished = findTaskFinishedText(texts); + if (finished != null) { + responses.add(finished); + if (!resultFuture.isDone()) { + resultFuture.complete(responses); + } + return; + } + } + responses.addAll(texts); + if (!resultFuture.isDone() && isTerminalEvent(event)) { + resultFuture.complete(responses); + } + }); + clientBuilder.streamingErrorHandler(error -> { + log.error("Streaming error calling {}", call.getAgentCardUri(), error); + if (!resultFuture.isDone()) { + resultFuture.completeExceptionally(error); + } + }); + + try (Client client = clientBuilder.build()) { + client.sendMessage(wrappedMsg, (TaskPushNotificationConfig) null, null, null); + log.info("Received responses from {}", call.getAgentCardUri()); + return resultFuture.get(TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + } + + private List extractTextFromMessage(Message message) { + List texts = new ArrayList<>(); + if (message != null && message.parts() != null) { + for (Part part : message.parts()) { + if (part instanceof TextPart tp && tp.text() != null && !tp.text().isEmpty()) { + texts.add(tp.text()); + } + } + } + return texts; + } + + private List extractTextFromEvent(ClientEvent event) { + Message message = null; + + if (event instanceof MessageEvent me) { + message = me.getMessage(); + } else if (event instanceof TaskEvent te) { + if (te.getTask().status() != null && te.getTask().status().message() != null) { + message = te.getTask().status().message(); + } + } else if (event instanceof TaskUpdateEvent tue) { + if (tue.getTask().status() != null && tue.getTask().status().message() != null) { + message = tue.getTask().status().message(); + } + } + + return extractTextFromMessage(message); + } + + private String findTaskFinishedText(List texts) { + for (String text : texts) { + if (text.contains("task-finished")) { + return text.replace("task-finished", ""); + } + } + return null; + } + + private boolean shouldHold(Instruction inst) { + if (inst.hasReturnResponse() && inst.getReturnResponse().getHoldTask()) { + return true; + } + if (inst.hasSteps()) { + for (Instruction step : inst.getSteps().getInstructionsList()) { + if (shouldHold(step)) { + return true; + } + } + } + return false; + } + + private boolean isTerminalEvent(ClientEvent event) { + if (event instanceof MessageEvent) { + return true; + } + if (event instanceof TaskEvent te) { + return te.getTask().status() != null && te.getTask().status().state().isFinal(); + } + if (event instanceof TaskUpdateEvent tue) { + return tue.getTask().status() != null && tue.getTask().status().state().isFinal(); + } + return false; + } + + private boolean isGrpc(String transport) { + return "GRPC".equalsIgnoreCase(transport); + } + + @SuppressWarnings("unchecked") + private void addTransport(org.a2aproject.sdk.client.ClientBuilder builder, String transport) { + switch (transport.toUpperCase()) { + case "GRPC" -> builder.withTransport(GrpcTransport.class, + new GrpcTransportConfigBuilder() + .channelFactory(url -> ManagedChannelBuilder.forTarget(url) + .usePlaintext() + .build())); + case "REST", "HTTP_JSON", "HTTP+JSON" -> builder.withTransport( + RestTransport.class, new RestTransportConfigBuilder()); + default -> builder.withTransport(JSONRPCTransport.class, + new JSONRPCTransportConfigBuilder()); + } + } + } +} diff --git a/itk/src/main/java/org/a2aproject/sdk/itk/Main.java b/itk/src/main/java/org/a2aproject/sdk/itk/Main.java new file mode 100644 index 000000000..67b677431 --- /dev/null +++ b/itk/src/main/java/org/a2aproject/sdk/itk/Main.java @@ -0,0 +1,45 @@ +package org.a2aproject.sdk.itk; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +public class Main { + + public static void main(String[] args) throws IOException, InterruptedException { + String httpPort = "10102"; + String grpcPort = "11002"; + + for (int i = 0; i < args.length; i++) { + if ("--httpPort".equals(args[i]) && i + 1 < args.length) { + httpPort = args[++i]; + } else if ("--grpcPort".equals(args[i]) && i + 1 < args.length) { + grpcPort = args[++i]; + } + } + + Path jarPath = Path.of("target", "quarkus-app", "quarkus-run.jar"); + if (!jarPath.toFile().exists()) { + System.err.println("quarkus-run.jar not found at " + jarPath.toAbsolutePath()); + System.exit(1); + } + + List command = new ArrayList<>(); + command.add(ProcessHandle.current().info().command().orElse("java")); + command.add("-Dquarkus.http.port=" + httpPort); + command.add("-Dquarkus.grpc.server.port=" + grpcPort); + command.add("-jar"); + command.add(jarPath.toString()); + + Process process = new ProcessBuilder(command) + .inheritIO() + .directory(new File(".")) + .start(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> process.destroyForcibly())); + + System.exit(process.waitFor()); + } +} diff --git a/itk/src/main/proto/instruction.proto b/itk/src/main/proto/instruction.proto new file mode 100644 index 000000000..c607567aa --- /dev/null +++ b/itk/src/main/proto/instruction.proto @@ -0,0 +1,60 @@ +syntax = "proto3"; + +package itk; + + +message Instruction { + // what should the "agent" do + oneof step { + CallAgent call_agent = 1; + ReturnResponse return_response = 2; + SeriesOfSteps steps = 3; + } +} + +// Behavior messages +message SendMessageBehavior {} +message PushNotificationBehavior { + string url = 1; +} +message ResubscribeBehavior {} + +// gets the agent card from a remote agent, calls it and returns the response +message CallAgent { + // which transport use to get agent card and later call + string transport = 1; + // where to get the agent card from + string agent_card_uri = 2; + // instruction for the called agent + Instruction instruction = 3; + // whether to use streaming + bool streaming = 4; + + // behavior mode + oneof behavior { + SendMessageBehavior send_message = 5; + PushNotificationBehavior push_notification = 6; + ResubscribeBehavior resubscribe = 7; + } +} + +// this option just returns a response +message ReturnResponse { + // what to return to the calling agent + string response = 1; + // whether to hold the task in WORKING state instead of completing it + bool hold_task = 2; +} + +// executes step series and returns response based on response_generator +message SeriesOfSteps { + repeated Instruction instructions = 1; + + enum ResponseGenerator { + // default + RESPONSE_GENERATOR_UNSPECIFIED = 0; + RESPONSE_GENERATOR_CONCAT = 1; + } + + ResponseGenerator response_generator = 2; +} diff --git a/itk/src/main/resources/application.properties b/itk/src/main/resources/application.properties new file mode 100644 index 000000000..c13757d9c --- /dev/null +++ b/itk/src/main/resources/application.properties @@ -0,0 +1,8 @@ +quarkus.http.port=10102 +quarkus.grpc.server.port=11002 + +# File logging +quarkus.log.file.enable=true +quarkus.log.file.path=/tmp/java-current.log +quarkus.log.file.level=DEBUG +quarkus.log.file.format=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%c{3.}] (%t) %s%e%n diff --git a/pom.xml b/pom.xml index 062f8cfa1..42fe8b103 100644 --- a/pom.xml +++ b/pom.xml @@ -599,6 +599,9 @@ transport/grpc transport/rest + + itk + compat-0.3