diff --git a/packages/eep-compliance-cli-python/eep_compliance_cli/__init__.py b/packages/eep-compliance-cli-python/eep_compliance_cli/__init__.py index 655383d..0e7c04b 100644 --- a/packages/eep-compliance-cli-python/eep_compliance_cli/__init__.py +++ b/packages/eep-compliance-cli-python/eep_compliance_cli/__init__.py @@ -13,9 +13,6 @@ from __future__ import annotations import argparse -import hashlib -import hmac as hmac_mod -import base64 import json import sys import time @@ -32,6 +29,7 @@ validate_cloudevents_envelope, validate_eep_extensions, check_webhook_headers, + verify_webhook_signature, ) @@ -39,6 +37,10 @@ _received_webhook: Optional[Dict[str, Any]] = None _received_headers: Optional[Dict[str, str]] = None +# The exact request-body bytes (decoded as UTF-8) the sender hashed. Kept +# separately from the parsed JSON: HMAC verification must use these bytes, not +# a re-serialization of the parse (json.dumps reorders keys / changes spacing). +_received_raw_body: Optional[str] = None class _WebhookHandler(BaseHTTPRequestHandler): @@ -57,10 +59,11 @@ def do_GET(self) -> None: self.wfile.write(b"OK") def do_POST(self) -> None: - global _received_webhook, _received_headers + global _received_webhook, _received_headers, _received_raw_body length = int(self.headers.get("Content-Length", 0)) body = self.rfile.read(length).decode("utf-8") _received_headers = {k.lower(): v for k, v in self.headers.items()} + _received_raw_body = body try: _received_webhook = json.loads(body) except json.JSONDecodeError: @@ -177,10 +180,11 @@ def main() -> None: print(" ⚪ Subscription creation (skipped)") # 4. Webhook delivery + verification - global _received_webhook, _received_headers + global _received_webhook, _received_headers, _received_raw_body if subscription_id and webhook_secret: _received_webhook = None _received_headers = None + _received_raw_body = None try: client.post( @@ -206,23 +210,23 @@ def main() -> None: runner.fail("Standard Webhooks headers present", f"missing: {', '.join(wh['missing'])}") print(f" ❌ Standard Webhooks headers: missing {wh['missing']}") - # Verify HMAC + # Verify HMAC over the exact received bytes (never a re-serialized + # parse — that is the anti-pattern the TS CLI warns against and the + # reason this check used to fail against compliant publishers). if wh["hasSignature"]: - wid = _received_headers.get("webhook-id", "") - wts = _received_headers.get("webhook-timestamp", "") - wsig = _received_headers.get("webhook-signature", "") - raw = json.dumps(_received_webhook) - signed = f"{wid}.{wts}.{raw}" - expected = base64.b64encode( - hmac_mod.new(webhook_secret.encode(), signed.encode(), hashlib.sha256).digest() - ).decode() - sig_b64 = wsig.replace("v1,", "") - if hmac_mod.compare_digest(expected, sig_b64): + result = verify_webhook_signature( + webhook_id=_received_headers.get("webhook-id", ""), + timestamp=_received_headers.get("webhook-timestamp", ""), + raw_body=_received_raw_body or "", + secret=webhook_secret, + signature_header=_received_headers.get("webhook-signature", ""), + ) + if result["valid"]: runner.pass_("HMAC-SHA256 signature is valid") print(" ✅ HMAC-SHA256 signature is valid") else: - runner.fail("HMAC-SHA256 signature is valid", "signature mismatch") - print(" ❌ HMAC-SHA256 signature mismatch") + runner.fail("HMAC-SHA256 signature is valid", result["reason"]) + print(f" ❌ HMAC-SHA256 signature: {result['reason']}") # CloudEvents ce_missing = validate_cloudevents_envelope(_received_webhook) diff --git a/packages/eep-compliance-cli-python/eep_compliance_cli/helpers.py b/packages/eep-compliance-cli-python/eep_compliance_cli/helpers.py index ed1bd5e..889bace 100644 --- a/packages/eep-compliance-cli-python/eep_compliance_cli/helpers.py +++ b/packages/eep-compliance-cli-python/eep_compliance_cli/helpers.py @@ -7,6 +7,10 @@ from __future__ import annotations +import base64 +import binascii +import hashlib +import hmac import re from dataclasses import dataclass, field from typing import Any, Dict, List, Optional @@ -98,6 +102,61 @@ def validate_eep_extensions(event: Dict[str, Any]) -> List[str]: return missing +def verify_webhook_signature( + webhook_id: str, + timestamp: str, + raw_body: str, + secret: str, + signature_header: str, +) -> Dict[str, Any]: + """Verify a Standard Webhooks v1 HMAC-SHA256 signature. + + Python port of ``verifyWebhookSignature`` in + ``@eep-dev/compliance-cli/src/helpers.ts`` (EEP SPECIFICATION.md §5.3). + + The signed content is ``f"{webhook_id}.{timestamp}.{raw_body}"`` where + ``raw_body`` MUST be the exact bytes the sender hashed — never a + re-serialized parse of the JSON. Re-serializing (``json.dumps`` of a parsed + body) changes key order, whitespace, and unicode escaping, so every + signature would falsely fail. + + The header MAY carry space-separated tokens (secret rotation), e.g. + ``v1,sigA v1,sigB``; non-``v1`` schemes are ignored. Comparison is + timing-safe on the raw digest bytes. + + Returns ``{"valid": bool, "reason": str}`` where ``reason`` is one of: + ``ok``, ``ok_via_multi_signature``, ``missing_secret``, + ``malformed_header``, ``no_v1_token``, ``signature_mismatch``. + """ + if not secret: + return {"valid": False, "reason": "missing_secret"} + if not isinstance(signature_header, str) or signature_header == "": + return {"valid": False, "reason": "malformed_header"} + + signed_content = f"{webhook_id}.{timestamp}.{raw_body}" + expected = hmac.new(secret.encode("utf-8"), signed_content.encode("utf-8"), hashlib.sha256).digest() + + tokens = [t for t in signature_header.split(" ") if t] + v1_tokens = [t for t in tokens if t.startswith("v1,")] + if not v1_tokens: + return {"valid": False, "reason": "no_v1_token"} + + for token in v1_tokens: + try: + incoming = base64.b64decode(token[len("v1,"):], validate=True) + except (binascii.Error, ValueError): + continue + if len(incoming) != len(expected): + continue + if hmac.compare_digest(incoming, expected): + return { + "valid": True, + "reason": "ok_via_multi_signature" if len(v1_tokens) > 1 else "ok", + } + + return {"valid": False, "reason": "signature_mismatch"} + + def check_webhook_headers(headers: Dict[str, Optional[str]]) -> Dict[str, Any]: """Check if Standard Webhooks headers are present.""" has_id = bool(headers.get("webhook-id")) diff --git a/packages/eep-compliance-cli-python/tests/test_helpers.py b/packages/eep-compliance-cli-python/tests/test_helpers.py index 119c9f4..94a7679 100644 --- a/packages/eep-compliance-cli-python/tests/test_helpers.py +++ b/packages/eep-compliance-cli-python/tests/test_helpers.py @@ -1,6 +1,10 @@ # Copyright 2026 EEP Contributors — Apache-2.0 """Tests for eep_compliance_cli — Python port of @eep-dev/compliance-cli.""" +import base64 +import hashlib +import hmac + from eep_compliance_cli.helpers import ( TestRunner, normalize_target, @@ -8,9 +12,17 @@ validate_cloudevents_envelope, validate_eep_extensions, check_webhook_headers, + verify_webhook_signature, ) +def _sign(secret: str, webhook_id: str, timestamp: str, raw_body: str) -> str: + """Produce a Standard Webhooks v1 token over the exact raw bytes.""" + signed = f"{webhook_id}.{timestamp}.{raw_body}" + digest = hmac.new(secret.encode(), signed.encode(), hashlib.sha256).digest() + return "v1," + base64.b64encode(digest).decode() + + class TestTestRunner: def test_pass_fail_skip(self): r = TestRunner() @@ -104,6 +116,58 @@ def test_missing(self): assert "eep_version" in validate_eep_extensions({}) +class TestVerifyWebhookSignature: + SECRET = "whsec_test" + WID = "msg_abc" + TS = "1700000000" + # Raw body whose re-serialization would differ (spacing + key order), + # which is exactly why the verifier must hash the raw bytes, not a parse. + RAW = '{"type":"com.example.entity.updated", "id":"evt_1"}' + + def test_valid_signature_over_raw_bytes(self): + sig = _sign(self.SECRET, self.WID, self.TS, self.RAW) + result = verify_webhook_signature(self.WID, self.TS, self.RAW, self.SECRET, sig) + assert result == {"valid": True, "reason": "ok"} + + def test_reserialized_body_would_fail(self): + # Demonstrates the original bug: signing the raw bytes but verifying a + # re-serialized parse no longer matches. + import json + + sig = _sign(self.SECRET, self.WID, self.TS, self.RAW) + reserialized = json.dumps(json.loads(self.RAW)) + assert reserialized != self.RAW + result = verify_webhook_signature(self.WID, self.TS, reserialized, self.SECRET, sig) + assert result["valid"] is False + assert result["reason"] == "signature_mismatch" + + def test_multi_signature_rotation(self): + good = _sign(self.SECRET, self.WID, self.TS, self.RAW) + header = f"v1,AAAA {good}" # first token is wrong length / bogus + result = verify_webhook_signature(self.WID, self.TS, self.RAW, self.SECRET, header) + assert result == {"valid": True, "reason": "ok_via_multi_signature"} + + def test_missing_secret(self): + assert verify_webhook_signature(self.WID, self.TS, self.RAW, "", "v1,x") == { + "valid": False, + "reason": "missing_secret", + } + + def test_malformed_header(self): + assert verify_webhook_signature(self.WID, self.TS, self.RAW, self.SECRET, "")[ + "reason" + ] == "malformed_header" + + def test_no_v1_token(self): + assert verify_webhook_signature(self.WID, self.TS, self.RAW, self.SECRET, "v2,abc")[ + "reason" + ] == "no_v1_token" + + def test_signature_mismatch(self): + result = verify_webhook_signature(self.WID, self.TS, self.RAW, self.SECRET, "v1,AAAA") + assert result == {"valid": False, "reason": "signature_mismatch"} + + class TestWebhookHeaders: def test_all_present(self): headers = {