Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 22 additions & 18 deletions packages/eep-compliance-cli-python/eep_compliance_cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@
from __future__ import annotations

import argparse
import hashlib
import hmac as hmac_mod
import base64
import json
import sys
import time
Expand All @@ -32,13 +29,18 @@
validate_cloudevents_envelope,
validate_eep_extensions,
check_webhook_headers,
verify_webhook_signature,
)


# ── Webhook Receiver ───────────────────────────────────────────────────────────

_received_webhook: Optional[Dict[str, Any]] = None
_received_headers: Optional[Dict[str, str]] = None
# The exact request-body bytes (decoded as UTF-8) the sender hashed. Kept
# separately from the parsed JSON: HMAC verification must use these bytes, not
# a re-serialization of the parse (json.dumps reorders keys / changes spacing).
_received_raw_body: Optional[str] = None


class _WebhookHandler(BaseHTTPRequestHandler):
Expand All @@ -57,10 +59,11 @@ def do_GET(self) -> None:
self.wfile.write(b"OK")

def do_POST(self) -> None:
global _received_webhook, _received_headers
global _received_webhook, _received_headers, _received_raw_body
length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(length).decode("utf-8")
_received_headers = {k.lower(): v for k, v in self.headers.items()}
_received_raw_body = body
try:
_received_webhook = json.loads(body)
except json.JSONDecodeError:
Expand Down Expand Up @@ -177,10 +180,11 @@ def main() -> None:
print(" ⚪ Subscription creation (skipped)")

# 4. Webhook delivery + verification
global _received_webhook, _received_headers
global _received_webhook, _received_headers, _received_raw_body
if subscription_id and webhook_secret:
_received_webhook = None
_received_headers = None
_received_raw_body = None

try:
client.post(
Expand All @@ -206,23 +210,23 @@ def main() -> None:
runner.fail("Standard Webhooks headers present", f"missing: {', '.join(wh['missing'])}")
print(f" ❌ Standard Webhooks headers: missing {wh['missing']}")

# Verify HMAC
# Verify HMAC over the exact received bytes (never a re-serialized
# parse — that is the anti-pattern the TS CLI warns against and the
# reason this check used to fail against compliant publishers).
if wh["hasSignature"]:
wid = _received_headers.get("webhook-id", "")
wts = _received_headers.get("webhook-timestamp", "")
wsig = _received_headers.get("webhook-signature", "")
raw = json.dumps(_received_webhook)
signed = f"{wid}.{wts}.{raw}"
expected = base64.b64encode(
hmac_mod.new(webhook_secret.encode(), signed.encode(), hashlib.sha256).digest()
).decode()
sig_b64 = wsig.replace("v1,", "")
if hmac_mod.compare_digest(expected, sig_b64):
result = verify_webhook_signature(
webhook_id=_received_headers.get("webhook-id", ""),
timestamp=_received_headers.get("webhook-timestamp", ""),
raw_body=_received_raw_body or "",
secret=webhook_secret,
signature_header=_received_headers.get("webhook-signature", ""),
)
if result["valid"]:
runner.pass_("HMAC-SHA256 signature is valid")
print(" ✅ HMAC-SHA256 signature is valid")
else:
runner.fail("HMAC-SHA256 signature is valid", "signature mismatch")
print(" ❌ HMAC-SHA256 signature mismatch")
runner.fail("HMAC-SHA256 signature is valid", result["reason"])
print(f" ❌ HMAC-SHA256 signature: {result['reason']}")

# CloudEvents
ce_missing = validate_cloudevents_envelope(_received_webhook)
Expand Down
59 changes: 59 additions & 0 deletions packages/eep-compliance-cli-python/eep_compliance_cli/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

from __future__ import annotations

import base64
import binascii
import hashlib
import hmac
import re
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
Expand Down Expand Up @@ -98,6 +102,61 @@ def validate_eep_extensions(event: Dict[str, Any]) -> List[str]:
return missing


def verify_webhook_signature(
webhook_id: str,
timestamp: str,
raw_body: str,
secret: str,
signature_header: str,
) -> Dict[str, Any]:
"""Verify a Standard Webhooks v1 HMAC-SHA256 signature.

Python port of ``verifyWebhookSignature`` in
``@eep-dev/compliance-cli/src/helpers.ts`` (EEP SPECIFICATION.md §5.3).

The signed content is ``f"{webhook_id}.{timestamp}.{raw_body}"`` where
``raw_body`` MUST be the exact bytes the sender hashed — never a
re-serialized parse of the JSON. Re-serializing (``json.dumps`` of a parsed
body) changes key order, whitespace, and unicode escaping, so every
signature would falsely fail.

The header MAY carry space-separated tokens (secret rotation), e.g.
``v1,sigA v1,sigB``; non-``v1`` schemes are ignored. Comparison is
timing-safe on the raw digest bytes.

Returns ``{"valid": bool, "reason": str}`` where ``reason`` is one of:
``ok``, ``ok_via_multi_signature``, ``missing_secret``,
``malformed_header``, ``no_v1_token``, ``signature_mismatch``.
"""
if not secret:
return {"valid": False, "reason": "missing_secret"}
if not isinstance(signature_header, str) or signature_header == "":
return {"valid": False, "reason": "malformed_header"}

signed_content = f"{webhook_id}.{timestamp}.{raw_body}"
expected = hmac.new(secret.encode("utf-8"), signed_content.encode("utf-8"), hashlib.sha256).digest()

tokens = [t for t in signature_header.split(" ") if t]
v1_tokens = [t for t in tokens if t.startswith("v1,")]
if not v1_tokens:
return {"valid": False, "reason": "no_v1_token"}

for token in v1_tokens:
try:
incoming = base64.b64decode(token[len("v1,"):], validate=True)
except (binascii.Error, ValueError):
continue
if len(incoming) != len(expected):
continue
if hmac.compare_digest(incoming, expected):
return {
"valid": True,
"reason": "ok_via_multi_signature" if len(v1_tokens) > 1 else "ok",
}

return {"valid": False, "reason": "signature_mismatch"}


def check_webhook_headers(headers: Dict[str, Optional[str]]) -> Dict[str, Any]:
"""Check if Standard Webhooks headers are present."""
has_id = bool(headers.get("webhook-id"))
Expand Down
64 changes: 64 additions & 0 deletions packages/eep-compliance-cli-python/tests/test_helpers.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
# Copyright 2026 EEP Contributors — Apache-2.0
"""Tests for eep_compliance_cli — Python port of @eep-dev/compliance-cli."""

import base64
import hashlib
import hmac

from eep_compliance_cli.helpers import (
TestRunner,
normalize_target,
validate_args,
validate_cloudevents_envelope,
validate_eep_extensions,
check_webhook_headers,
verify_webhook_signature,
)


def _sign(secret: str, webhook_id: str, timestamp: str, raw_body: str) -> str:
"""Produce a Standard Webhooks v1 token over the exact raw bytes."""
signed = f"{webhook_id}.{timestamp}.{raw_body}"
digest = hmac.new(secret.encode(), signed.encode(), hashlib.sha256).digest()
return "v1," + base64.b64encode(digest).decode()


class TestTestRunner:
def test_pass_fail_skip(self):
r = TestRunner()
Expand Down Expand Up @@ -104,6 +116,58 @@ def test_missing(self):
assert "eep_version" in validate_eep_extensions({})


class TestVerifyWebhookSignature:
SECRET = "whsec_test"
WID = "msg_abc"
TS = "1700000000"
# Raw body whose re-serialization would differ (spacing + key order),
# which is exactly why the verifier must hash the raw bytes, not a parse.
RAW = '{"type":"com.example.entity.updated", "id":"evt_1"}'

def test_valid_signature_over_raw_bytes(self):
sig = _sign(self.SECRET, self.WID, self.TS, self.RAW)
result = verify_webhook_signature(self.WID, self.TS, self.RAW, self.SECRET, sig)
assert result == {"valid": True, "reason": "ok"}

def test_reserialized_body_would_fail(self):
# Demonstrates the original bug: signing the raw bytes but verifying a
# re-serialized parse no longer matches.
import json

sig = _sign(self.SECRET, self.WID, self.TS, self.RAW)
reserialized = json.dumps(json.loads(self.RAW))
assert reserialized != self.RAW
result = verify_webhook_signature(self.WID, self.TS, reserialized, self.SECRET, sig)
assert result["valid"] is False
assert result["reason"] == "signature_mismatch"

def test_multi_signature_rotation(self):
good = _sign(self.SECRET, self.WID, self.TS, self.RAW)
header = f"v1,AAAA {good}" # first token is wrong length / bogus
result = verify_webhook_signature(self.WID, self.TS, self.RAW, self.SECRET, header)
assert result == {"valid": True, "reason": "ok_via_multi_signature"}

def test_missing_secret(self):
assert verify_webhook_signature(self.WID, self.TS, self.RAW, "", "v1,x") == {
"valid": False,
"reason": "missing_secret",
}

def test_malformed_header(self):
assert verify_webhook_signature(self.WID, self.TS, self.RAW, self.SECRET, "")[
"reason"
] == "malformed_header"

def test_no_v1_token(self):
assert verify_webhook_signature(self.WID, self.TS, self.RAW, self.SECRET, "v2,abc")[
"reason"
] == "no_v1_token"

def test_signature_mismatch(self):
result = verify_webhook_signature(self.WID, self.TS, self.RAW, self.SECRET, "v1,AAAA")
assert result == {"valid": False, "reason": "signature_mismatch"}


class TestWebhookHeaders:
def test_all_present(self):
headers = {
Expand Down