diff --git a/Cargo.lock b/Cargo.lock index b65c3ffb..0948562d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5649,6 +5649,7 @@ dependencies = [ "prost-types 0.14.3", "rand 0.8.6", "regex", + "reqwest 0.13.3", "serde", "serde_json", "test-case", @@ -5658,6 +5659,7 @@ dependencies = [ "tower", "tracing", "tree_hash", + "url", "vise", "wiremock", ] diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 741f1482..2d71650c 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -23,6 +23,7 @@ pluto-featureset.workspace = true prost.workspace = true prost-types.workspace = true regex.workspace = true +reqwest = { workspace = true, features = ["stream"] } serde.workspace = true serde_json.workspace = true base64.workspace = true @@ -34,6 +35,7 @@ pluto-eth2util.workspace = true pluto-ssz.workspace = true ssz.workspace = true tree_hash.workspace = true +url.workspace = true [dev-dependencies] anyhow.workspace = true diff --git a/crates/core/src/ssz_codec.rs b/crates/core/src/ssz_codec.rs index 2ae992d9..0a20939b 100644 --- a/crates/core/src/ssz_codec.rs +++ b/crates/core/src/ssz_codec.rs @@ -449,6 +449,57 @@ fn encode_proposal_block(block: &versioned::SignedProposalBlock) -> Result Result { + decode_proposal_block(version, false, bytes) +} + +/// Decodes a bare per-fork blinded signed proposal block body from SSZ binary, +/// selecting the variant by `version`. +/// +/// The raw beacon-API SSZ block body posted to +/// `/eth/v{1,2}/beacon/blinded_blocks`; the fork is taken from the +/// `Eth-Consensus-Version` request header. +pub fn decode_signed_blinded_proposal_block_body( + version: DataVersion, + bytes: &[u8], +) -> Result { + use versioned::SignedBlindedProposalBlock; + Ok(match version { + DataVersion::Bellatrix => SignedBlindedProposalBlock::Bellatrix( + bellatrix::SignedBlindedBeaconBlock::from_ssz_bytes(bytes)?, + ), + DataVersion::Capella => SignedBlindedProposalBlock::Capella( + capella::SignedBlindedBeaconBlock::from_ssz_bytes(bytes)?, + ), + DataVersion::Deneb => SignedBlindedProposalBlock::Deneb( + deneb::SignedBlindedBeaconBlock::from_ssz_bytes(bytes)?, + ), + DataVersion::Electra => SignedBlindedProposalBlock::Electra( + electra::SignedBlindedBeaconBlock::from_ssz_bytes(bytes)?, + ), + // Fulu blinded blocks share the Electra layout. + DataVersion::Fulu => SignedBlindedProposalBlock::Fulu( + electra::SignedBlindedBeaconBlock::from_ssz_bytes(bytes)?, + ), + DataVersion::Phase0 | DataVersion::Altair | DataVersion::Unknown => { + return Err(SszCodecError::UnknownVersion( + version.to_legacy_u64().unwrap_or(u64::MAX), + )); + } + }) +} + fn decode_proposal_block( version: DataVersion, blinded: bool, diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index 1f9c099e..5230c629 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -10,8 +10,9 @@ use async_trait::async_trait; use axum::http::StatusCode; use pluto_eth2api::{ EthBeaconNodeApiClient, GetAttesterDutiesRequest, GetAttesterDutiesResponse, - GetProposerDutiesRequest, GetProposerDutiesResponse, GetSyncCommitteeDutiesRequest, - GetSyncCommitteeDutiesResponse, + GetProposerDutiesRequest, GetProposerDutiesResponse, GetStateValidatorsResponseResponse, + GetSyncCommitteeDutiesRequest, GetSyncCommitteeDutiesResponse, PostStateValidatorsRequest, + PostStateValidatorsRequestPath, PostStateValidatorsResponse, ValidatorRequestBody, spec::phase0::{BLSPubKey, Epoch, Root, ValidatorIndex}, valcache::{ActiveValidators, CachedValidatorsProvider}, versioned::{DataVersion, SignedBlindedProposalBlock, SignedProposalBlock}, @@ -38,7 +39,9 @@ use super::{ use crate::{ dutydb::{Error as DutyDbError, MemDB}, signeddata::{ - SignedDataError, SignedRandao, SyncContribution, VersionedAggregatedAttestation, + SignedDataError, SignedRandao, SignedSyncContributionAndProof, SignedSyncMessage, + SyncCommitteeSelection as SyncCommitteeSelectionData, SyncContribution, + SyncContributionAndProof, VersionedAggregatedAttestation, VersionedProposal as UnsignedVersionedProposal, }, types::{Duty, DutyDefinitionSet, ParSignedDataSet, PubKey, Signature, SignedData, SlotNumber}, @@ -133,6 +136,13 @@ const ATTESTATION_DATA_TIMEOUT: Duration = Duration::from_secs(24); /// cannot park a tokio task indefinitely. const PROPOSAL_TIMEOUT: Duration = Duration::from_secs(24); +/// Hard deadline for the sync-committee submit / selection handler bodies. +/// Bounds the active-validators lookup, per-item partial-signature +/// verification (each calling upstream `signing::verify`), the synchronous +/// subscriber fan-out, and — for selections — the aggsigdb await, so a hung +/// upstream or slow subscriber cannot park a tokio task indefinitely. +const SYNC_TIMEOUT: Duration = Duration::from_secs(24); + /// Validator API [`Handler`] implementation. /// /// Holds the upstream beacon-node client and the cluster's public-key / @@ -421,12 +431,11 @@ impl Component { /// against this node's public share for the given DV root pubkey. /// /// The BLS domain / epoch / message-root are passed directly rather - /// than projected through a signed-data trait — each submit handler in - /// later PRs derives the triple from the concrete signed-data wrapper - /// it is processing, then invokes this helper. + /// than projected through a signed-data trait — each submit handler + /// derives the triple from the concrete signed-data wrapper it is + /// processing, then invokes this helper. /// /// Skipped entirely when [`Self::insecure_test`] is set. - #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] #[instrument(skip_all, fields(domain = ?domain_name, epoch))] pub async fn verify_partial_sig( &self, @@ -459,6 +468,49 @@ impl Component { Ok(()) } + + /// Verifies the BLS signature carried by `par_sig` against `pubkey` + /// directly (not a share), for the given domain and epoch. + /// + /// Used by `submit_sync_committee_contributions` to check the inner + /// selection-proof against the aggregator's full validator public key. + /// Skipped entirely when [`Self::insecure_test`] is set. + async fn verify_full_pubkey_sig( + &self, + pubkey: &BLSPubKey, + domain_name: DomainName, + epoch: Epoch, + message_root: Root, + signature: &Signature, + ) -> Result<(), SigningError> { + if self.insecure_test { + return Ok(()); + } + + signing::verify( + &self.eth2_cl, + domain_name, + epoch, + message_root, + signature, + pubkey, + ) + .await + } + + /// Fans the validated partial-signed-data `set` out to every registered + /// subscriber for `duty`. Each subscriber receives its own clone of the + /// set (the registered wrapper clones once). A subscriber failure aborts + /// the fan-out and surfaces as `500`. + async fn broadcast(&self, duty: &Duty, set: &ParSignedDataSet) -> Result<(), ApiError> { + for sub in &self.subs { + sub(duty, set).await.map_err(|err| { + ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, "subscriber failed") + .with_boxed_source(err) + })?; + } + Ok(()) + } } /// Errors returned by [`Component::verify_partial_sig`]. @@ -873,17 +925,188 @@ impl Handler for Component { #[instrument(skip_all)] async fn sync_committee_selections( &self, - _selections: Vec, + selections: Vec, ) -> Result>, ApiError> { - unimplemented!("sync_committee_selections not yet ported") + tokio::time::timeout(SYNC_TIMEOUT, async { + let vals = self.fetch_active_validators().await?; + + let await_agg_sig_db = self.await_agg_sig_db_fn.as_ref().ok_or_else(|| { + ApiError::new( + StatusCode::SERVICE_UNAVAILABLE, + "await aggsigdb hook not registered", + ) + })?; + + let mut psigs_by_slot: HashMap = HashMap::new(); + for selection in selections { + let slot = selection.slot; + + let (pubkey, root_bls) = + resolve_active_validator(&vals, selection.validator_index)?; + + let epoch = pluto_eth2util::helpers::epoch_from_slot(&self.eth2_cl, slot) + .await + .map_err(epoch_lookup_error)?; + + let par_sig = SyncCommitteeSelectionData::new_partial(selection, self.share_idx); + let signature = par_sig + .signed_data + .signature() + .map_err(message_root_error)?; + let message_root = par_sig + .signed_data + .message_root() + .map_err(message_root_error)?; + self.verify_partial_sig( + &root_bls, + DomainName::SyncCommitteeSelectionProof, + epoch, + message_root, + &signature, + ) + .await + .map_err(verify_partial_sig_error)?; + + psigs_by_slot + .entry(slot) + .or_default() + .insert(pubkey, par_sig); + } + + for (slot, set) in &psigs_by_slot { + let duty = Duty::new_prepare_sync_contribution_duty(SlotNumber::new(*slot)); + self.broadcast(&duty, set).await?; + } + + // Collect the aggregated selection for each (duty, pubkey) from the + // aggsigdb. Each await blocks until the threshold aggregation lands. + let mut resp: Vec = Vec::new(); + for (slot, set) in &psigs_by_slot { + let duty = Duty::new_prepare_sync_contribution_duty(SlotNumber::new(*slot)); + for pubkey in set.inner().keys() { + let aggregated = + await_agg_sig_db(duty.clone(), *pubkey) + .await + .map_err(|err| { + ApiError::new( + StatusCode::SERVICE_UNAVAILABLE, + "await aggsigdb hook failed", + ) + .with_boxed_source(err) + })?; + + let selection = (&*aggregated as &dyn Any) + .downcast_ref::() + .ok_or_else(|| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "invalid sync committee selection", + ) + })?; + + resp.push(selection.0.clone()); + } + } + + Ok(EthResponse { + data: resp, + execution_optimistic: false, + finalized: false, + dependent_root: None, + }) + }) + .await + .map_err(|_: Elapsed| sync_timeout())? } #[instrument(skip_all)] async fn validators( &self, - _opts: ValidatorsOpts, + opts: ValidatorsOpts, ) -> Result>, ApiError> { - unimplemented!("validators not yet ported") + // The VC sends share pubkeys (one per DV root). Translate each share + // back to the cluster's root pubkey before forwarding upstream, since + // the beacon node only knows the root keys. An empty `pubkeys` is + // forwarded as `None` so the upstream is not artificially narrowed. + // + // Port of `Validators` in + // `core/validatorapi/validatorapi.go` (lines 1218–1296). + let pubkey_by_share = invert_pub_share_map(&self.pub_share_by_pubkey); + + let mut root_pubkeys: Vec = Vec::with_capacity(opts.pubkeys.len()); + for share in &opts.pubkeys { + let root = pubkey_by_share.get(share).ok_or_else(|| { + // Mirrors the Go `getPubKeyFunc` "unknown public key" branch. + ApiError::new( + StatusCode::BAD_REQUEST, + "unknown validator public key in request", + ) + })?; + root_pubkeys.push(format_bls_pubkey(root)); + } + + // Upstream's `id` field accepts either a pubkey hex string or a + // decimal validator-index string — both go in the same `ids` array. + let mut ids: Vec = root_pubkeys; + ids.extend(opts.indices.iter().map(|idx| idx.to_string())); + + let request = PostStateValidatorsRequest { + path: PostStateValidatorsRequestPath { + state_id: opts.state.clone(), + }, + body: ValidatorRequestBody { + ids: if ids.is_empty() { None } else { Some(ids) }, + // Status filter is not exposed by Pluto's `ValidatorsOpts`; the + // Go reference also omits it from the upstream call. + statuses: None, + }, + }; + + let response = tokio::time::timeout( + UPSTREAM_REQUEST_TIMEOUT, + self.eth2_cl.post_state_validators(request), + ) + .await + .map_err(|_| upstream_timeout("validators"))? + .map_err(|err| upstream_call_failed("validators", err.into()))?; + + let payload: GetStateValidatorsResponseResponse = match response { + PostStateValidatorsResponse::Ok(payload) => payload, + PostStateValidatorsResponse::BadRequest(body) => { + return Err(upstream_status_error( + StatusCode::BAD_REQUEST, + "validators", + body, + )); + } + PostStateValidatorsResponse::NotFound(body) => { + return Err(upstream_status_error( + StatusCode::NOT_FOUND, + "validators", + body, + )); + } + other @ (PostStateValidatorsResponse::InternalServerError(_) + | PostStateValidatorsResponse::Unknown) => { + return Err(upstream_unexpected("validators", other)); + } + }; + + // The `ignoreNotFound` flag mirrors the Go contract: when the caller + // filtered by indices only (no pubkeys), the upstream may return + // validators that are not part of this cluster's share map, and + // those validators should pass through with their root pubkey + // untouched. Otherwise an unknown pubkey indicates a configuration + // error and is surfaced to the caller. + let ignore_not_found = opts.indices.is_empty(); + let data = convert_validators(payload.data, &self.pub_share_by_pubkey, ignore_not_found)?; + + Ok(EthResponse { + data, + execution_optimistic: payload.execution_optimistic, + finalized: payload.finalized, + dependent_root: None, + }) } #[instrument(skip_all)] @@ -899,28 +1122,169 @@ impl Handler for Component { unimplemented!("submit_voluntary_exit not yet ported") } - #[instrument(skip_all)] + #[instrument(skip_all, fields(slot = opts.slot, subcommittee_index = opts.subcommittee_index))] async fn sync_committee_contribution( &self, - _opts: SyncCommitteeContributionOpts, + opts: SyncCommitteeContributionOpts, ) -> Result, ApiError> { - unimplemented!("sync_committee_contribution not yet ported") + let f = self.await_sync_contribution_fn.as_ref().ok_or_else(|| { + ApiError::new( + StatusCode::SERVICE_UNAVAILABLE, + "await sync contribution hook not registered", + ) + })?; + + let contribution = tokio::time::timeout( + SYNC_TIMEOUT, + f(opts.slot, opts.subcommittee_index, opts.beacon_block_root), + ) + .await + .map_err(|_: Elapsed| sync_timeout())? + .map_err(|err| { + ApiError::new( + StatusCode::SERVICE_UNAVAILABLE, + "await sync contribution hook failed", + ) + .with_boxed_source(err) + })?; + + Ok(EthResponse { + data: contribution.0, + execution_optimistic: false, + finalized: false, + dependent_root: None, + }) } #[instrument(skip_all)] async fn submit_sync_committee_contributions( &self, - _contributions: Vec, + contributions: Vec, ) -> Result<(), ApiError> { - unimplemented!("submit_sync_committee_contributions not yet ported") + tokio::time::timeout(SYNC_TIMEOUT, async { + let vals = self.fetch_active_validators().await?; + + let mut psigs_by_slot: HashMap = HashMap::new(); + for contrib in contributions { + let slot = contrib.message.contribution.slot; + let v_idx = contrib.message.aggregator_index; + + let (pubkey, root_bls) = resolve_active_validator(&vals, v_idx)?; + + let epoch = pluto_eth2util::helpers::epoch_from_slot(&self.eth2_cl, slot) + .await + .map_err(epoch_lookup_error)?; + + // Verify the inner selection proof against the aggregator's + // full validator public key (not this node's share). + let inner = SyncContributionAndProof::new(contrib.message.clone()); + let inner_sig = inner.signature().map_err(message_root_error)?; + let inner_root = inner.message_root().map_err(message_root_error)?; + self.verify_full_pubkey_sig( + &root_bls, + DomainName::SyncCommitteeSelectionProof, + epoch, + inner_root, + &inner_sig, + ) + .await + .map_err(|err| { + ApiError::new( + StatusCode::BAD_REQUEST, + "sync contribution selection proof verification failed", + ) + .with_source(err) + })?; + + // Verify the outer partial signature against this node's share. + let par_sig = SignedSyncContributionAndProof::new_partial(contrib, self.share_idx); + let outer_sig = par_sig + .signed_data + .signature() + .map_err(message_root_error)?; + let outer_root = par_sig + .signed_data + .message_root() + .map_err(message_root_error)?; + self.verify_partial_sig( + &root_bls, + DomainName::ContributionAndProof, + epoch, + outer_root, + &outer_sig, + ) + .await + .map_err(verify_partial_sig_error)?; + + psigs_by_slot + .entry(slot) + .or_default() + .insert(pubkey, par_sig); + } + + for (slot, set) in psigs_by_slot { + let duty = Duty::new_sync_contribution_duty(SlotNumber::new(slot)); + self.broadcast(&duty, &set).await?; + } + + Ok(()) + }) + .await + .map_err(|_: Elapsed| sync_timeout())? } #[instrument(skip_all)] async fn submit_sync_committee_messages( &self, - _messages: Vec, + messages: Vec, ) -> Result<(), ApiError> { - unimplemented!("submit_sync_committee_messages not yet ported") + tokio::time::timeout(SYNC_TIMEOUT, async { + let vals = self.fetch_active_validators().await?; + + let mut psigs_by_slot: HashMap = HashMap::new(); + for msg in messages { + let slot = msg.slot; + + let (pubkey, root_bls) = resolve_active_validator(&vals, msg.validator_index)?; + + let epoch = pluto_eth2util::helpers::epoch_from_slot(&self.eth2_cl, slot) + .await + .map_err(epoch_lookup_error)?; + + let par_sig = SignedSyncMessage::new_partial(msg, self.share_idx); + let signature = par_sig + .signed_data + .signature() + .map_err(message_root_error)?; + let message_root = par_sig + .signed_data + .message_root() + .map_err(message_root_error)?; + self.verify_partial_sig( + &root_bls, + DomainName::SyncCommittee, + epoch, + message_root, + &signature, + ) + .await + .map_err(verify_partial_sig_error)?; + + psigs_by_slot + .entry(slot) + .or_default() + .insert(pubkey, par_sig); + } + + for (slot, set) in psigs_by_slot { + let duty = Duty::new_sync_message_duty(SlotNumber::new(slot)); + self.broadcast(&duty, &set).await?; + } + + Ok(()) + }) + .await + .map_err(|_: Elapsed| sync_timeout())? } } @@ -942,6 +1306,51 @@ fn proposal_timeout() -> ApiError { ) } +/// Builds the `ApiError` returned when a sync-committee handler elapses past +/// [`SYNC_TIMEOUT`]. +fn sync_timeout() -> ApiError { + ApiError::new( + StatusCode::REQUEST_TIMEOUT, + "sync committee request not completed before deadline", + ) +} + +/// Resolves a validator index against the active-validators map, returning the +/// DV root [`PubKey`] (the partial-signed-data set key) alongside the same key +/// as a [`BLSPubKey`] for signature verification. Mirrors Go's +/// `vals[idx]` lookup followed by `core.PubKeyFromBytes`; an unknown index +/// surfaces the exact Go error string "validator not found". +fn resolve_active_validator( + vals: &ActiveValidators, + validator_index: ValidatorIndex, +) -> Result<(PubKey, BLSPubKey), ApiError> { + let root_bls = *vals + .get(&validator_index) + .ok_or_else(|| ApiError::new(StatusCode::BAD_REQUEST, "validator not found"))?; + // `root_bls` is a fixed 48-byte `BLSPubKey`, so the length-checked + // conversion to the core `PubKey` cannot fail. + let pubkey = PubKey::try_from(root_bls.as_slice()) + .expect("BLSPubKey is always 48 bytes, the PubKey length"); + Ok((pubkey, root_bls)) +} + +/// Builds the `ApiError` returned when `epoch_from_slot` fails for a +/// sync-committee handler. +fn epoch_lookup_error(err: pluto_eth2util::helpers::HelperError) -> ApiError { + ApiError::new(StatusCode::BAD_GATEWAY, "could not resolve epoch from slot").with_source(err) +} + +/// Builds the `ApiError` returned when a signed-data projection +/// (signature / message root extraction) fails inside a sync-committee +/// handler. Such failures reflect a malformed VC-supplied payload. +fn message_root_error(err: SignedDataError) -> ApiError { + ApiError::new( + StatusCode::BAD_REQUEST, + "could not project sync committee signed data", + ) + .with_source(err) +} + /// Builds the `ApiError` returned when an upstream beacon-node call returns a /// transport-level error. Boxed so `anyhow::Error` (which doesn't itself /// implement `std::error::Error`) can be attached via `.into()`. @@ -1066,6 +1475,57 @@ fn swap_sync_committee_pubshares( Ok(()) } +/// Replaces the root public key on each upstream validator entry with this +/// node's public share. Port of `convertValidators` in +/// `core/validatorapi/validatorapi.go` (lines 1305–1332). +/// +/// When `ignore_not_found` is `true`, validators whose root pubkey is not +/// part of this cluster's share map are passed through with their original +/// root pubkey — the upstream filter was index-based, so the result may +/// include validators we do not own and those entries are not dropped. +/// When `false`, an unknown pubkey is rejected: the caller explicitly asked +/// for validators by pubkey, so every entry should be translatable. +fn convert_validators( + upstream: Vec, + pub_share_by_pubkey: &HashMap, + ignore_not_found: bool, +) -> Result, ApiError> { + let mut out = Vec::with_capacity(upstream.len()); + for mut validator in upstream { + let pubkey = parse_bls_pubkey(&validator.validator.pubkey)?; + match pub_share_by_pubkey.get(&pubkey) { + Some(share) => { + validator.validator.pubkey = format_bls_pubkey(share); + } + None if ignore_not_found => { + // Validator does not belong to this cluster — keep the + // entry with its root pubkey unchanged. Mirrors the Go + // `convertValidators` `else if ok` branch. + } + None => { + return Err(ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "pubshare not found for validator", + )); + } + } + out.push(validator); + } + Ok(out) +} + +/// Builds the share → root pubkey map by inverting `pub_share_by_pubkey`. +/// Used by the `validators` handler to translate VC-side share pubkeys back +/// into the cluster's root pubkeys before forwarding upstream. +fn invert_pub_share_map( + pub_share_by_pubkey: &HashMap, +) -> HashMap { + pub_share_by_pubkey + .iter() + .map(|(root, share)| (*share, *root)) + .collect() +} + fn parse_bls_pubkey(s: &str) -> Result { let trimmed = s.strip_prefix("0x").unwrap_or(s); let bytes = hex::decode(trimmed).map_err(|err| { @@ -3017,4 +3477,792 @@ mod tests { .await .unwrap(); } + + // ---------------------------------------------------------------------- + // `validators` tests + // ---------------------------------------------------------------------- + + use pluto_eth2api::{ValidatorResponseValidator, ValidatorStatus}; + use wiremock::{ + Mock, MockServer, ResponseTemplate, + matchers::{method, path}, + }; + + /// Builds a `Validator` (i.e. `GetStateValidatorsResponseResponseDatum`) + /// with the given index and pubkey. Other fields are filled with + /// placeholder values acceptable to the eth2api type. + fn make_validator_datum(index: u64, pubkey: &BLSPubKey) -> Validator { + Validator { + balance: "32000000000".to_owned(), + index: index.to_string(), + status: ValidatorStatus::ActiveOngoing, + validator: ValidatorResponseValidator { + pubkey: format_bls_pubkey(pubkey), + withdrawal_credentials: + "0x0000000000000000000000000000000000000000000000000000000000000000".to_owned(), + effective_balance: "32000000000".to_owned(), + slashed: false, + activation_eligibility_epoch: "0".to_owned(), + activation_epoch: "0".to_owned(), + exit_epoch: "18446744073709551615".to_owned(), + withdrawable_epoch: "18446744073709551615".to_owned(), + }, + } + } + + /// Builds a `Component` whose upstream client points at the given + /// `MockServer`, with the supplied root → share map. The dutydb is the + /// usual never-expiring stub since the `validators` handler does not + /// consult it. + fn make_component_with_upstream( + server: &MockServer, + pub_share_by_pubkey: HashMap, + ) -> Component { + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = + DeadlinerTask::start(cancel.clone(), "validatorapi-tests", FarFutureCalculator); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(server.uri()).unwrap()); + Component::new( + eth2_cl, + dutydb, + 1, + pub_share_by_pubkey, + false, + TestValidatorCache::empty(), + ) + } + + /// Happy path: every upstream entry has a known root pubkey, so each + /// inner `validator.pubkey` is rewritten to this node's share. Mirrors + /// the `else if ok` branch of `convertValidators`. + #[test] + fn convert_validators_rewrites_known_pubkeys() { + let root = [0xAA_u8; 48]; + let share = [0xBB_u8; 48]; + let map = HashMap::from([(root, share)]); + + let upstream = vec![make_validator_datum(7, &root)]; + let out = convert_validators(upstream, &map, false).unwrap(); + + assert_eq!(out.len(), 1); + assert_eq!(out[0].validator.pubkey, format_bls_pubkey(&share)); + assert_eq!(out[0].index, "7"); + } + + /// With `ignore_not_found = true`, an unknown pubkey is passed through + /// unchanged (Go: `else if ok` — the entry is still appended to `resp` + /// with the original root pubkey). + #[test] + fn convert_validators_ignore_not_found_keeps_entry_unchanged() { + let known_root = [0x11_u8; 48]; + let share = [0x22_u8; 48]; + let unknown = [0x33_u8; 48]; + let map = HashMap::from([(known_root, share)]); + + let upstream = vec![ + make_validator_datum(1, &known_root), + make_validator_datum(2, &unknown), + ]; + let out = convert_validators(upstream, &map, true).unwrap(); + + assert_eq!(out.len(), 2); + assert_eq!(out[0].validator.pubkey, format_bls_pubkey(&share)); + // Unknown entry is preserved verbatim. + assert_eq!(out[1].validator.pubkey, format_bls_pubkey(&unknown)); + assert_eq!(out[1].index, "2"); + } + + /// With `ignore_not_found = false`, an unknown pubkey is rejected. + /// Mirrors Go: `if !ok && !ignoreNotFound { return nil, errors.New(...) }`. + #[test] + fn convert_validators_rejects_unknown_when_not_ignoring() { + let known_root = [0x44_u8; 48]; + let share = [0x55_u8; 48]; + let unknown = [0x66_u8; 48]; + let map = HashMap::from([(known_root, share)]); + + let upstream = vec![make_validator_datum(3, &unknown)]; + let err = convert_validators(upstream, &map, false).unwrap_err(); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); + } + + /// A malformed pubkey from the upstream is surfaced as 502 — the + /// gateway returned data we cannot interpret. + #[test] + fn convert_validators_rejects_malformed_upstream_pubkey() { + let mut datum = make_validator_datum(0, &[0; 48]); + datum.validator.pubkey = "0xnothex".to_owned(); + let err = convert_validators(vec![datum], &HashMap::new(), true).unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); + } + + /// `invert_pub_share_map` is the share → root direction needed when + /// translating VC-supplied pubshares back into root pubkeys before the + /// upstream call. + #[test] + fn invert_pub_share_map_round_trips() { + let root = [0x77_u8; 48]; + let share = [0x88_u8; 48]; + let forward = HashMap::from([(root, share)]); + + let inverted = invert_pub_share_map(&forward); + assert_eq!(inverted.get(&share), Some(&root)); + assert_eq!(inverted.len(), 1); + } + + /// End-to-end happy path: the upstream returns one validator keyed by + /// the cluster's root pubkey; the handler rewrites it to the VC's + /// share pubkey before returning. + #[tokio::test] + async fn validators_rewrites_root_pubkeys_to_shares() { + let server = MockServer::start().await; + let root = [0xCA_u8; 48]; + let share = [0xFE_u8; 48]; + let body = GetStateValidatorsResponseResponse { + data: vec![make_validator_datum(42, &root)], + execution_optimistic: false, + finalized: true, + }; + Mock::given(method("POST")) + .and(path("/eth/v1/beacon/states/head/validators")) + .respond_with(ResponseTemplate::new(200).set_body_json(body)) + .expect(1) + .mount(&server) + .await; + + let component = make_component_with_upstream(&server, HashMap::from([(root, share)])); + let response = component + .validators(ValidatorsOpts { + state: "head".to_owned(), + // VC sends the share pubkey it knows. + pubkeys: vec![share], + indices: vec![], + }) + .await + .unwrap(); + + assert_eq!(response.data.len(), 1); + assert_eq!(response.data[0].validator.pubkey, format_bls_pubkey(&share)); + assert_eq!(response.data[0].index, "42"); + assert!(response.finalized); + assert!(!response.execution_optimistic); + assert!(response.dependent_root.is_none()); + } + + /// When the caller filters by pubkey only (no indices), `ignoreNotFound` + /// is `true` per the Go reference, so an upstream entry whose pubkey is + /// not part of this cluster's share map passes through with its root + /// pubkey unchanged. Mirrors `len(opts.Indices) == 0` in + /// `validatorapi.go:1288`. + #[tokio::test] + async fn validators_passes_through_unknown_when_filtering_by_pubkey_only() { + let server = MockServer::start().await; + let known_root = [0x10_u8; 48]; + let share = [0x20_u8; 48]; + let stranger = [0x30_u8; 48]; + let body = GetStateValidatorsResponseResponse { + data: vec![ + make_validator_datum(1, &known_root), + make_validator_datum(2, &stranger), + ], + execution_optimistic: false, + finalized: true, + }; + Mock::given(method("POST")) + .and(path("/eth/v1/beacon/states/head/validators")) + .respond_with(ResponseTemplate::new(200).set_body_json(body)) + .expect(1) + .mount(&server) + .await; + + let component = make_component_with_upstream(&server, HashMap::from([(known_root, share)])); + let response = component + .validators(ValidatorsOpts { + state: "head".to_owned(), + pubkeys: vec![share], + indices: vec![], + }) + .await + .unwrap(); + + assert_eq!(response.data.len(), 2); + assert_eq!(response.data[0].validator.pubkey, format_bls_pubkey(&share)); + // Stranger entry is preserved with the upstream's root pubkey. + assert_eq!( + response.data[1].validator.pubkey, + format_bls_pubkey(&stranger) + ); + } + + /// When the caller filters by index (any non-empty `Indices`), + /// `ignoreNotFound` is `false` per the Go reference, so an upstream + /// validator that does not belong to this cluster surfaces as + /// `INTERNAL_SERVER_ERROR`. Mirrors `len(opts.Indices) == 0 == false` in + /// `validatorapi.go:1288`. + #[tokio::test] + async fn validators_rejects_unknown_pubkey_when_index_filter_used() { + let server = MockServer::start().await; + let known_root = [0x40_u8; 48]; + let share = [0x50_u8; 48]; + let stranger = [0x60_u8; 48]; + let body = GetStateValidatorsResponseResponse { + // The upstream returned a validator we did not ask for — its + // pubkey is not in our share map. + data: vec![make_validator_datum(99, &stranger)], + execution_optimistic: false, + finalized: false, + }; + Mock::given(method("POST")) + .and(path("/eth/v1/beacon/states/head/validators")) + .respond_with(ResponseTemplate::new(200).set_body_json(body)) + .expect(1) + .mount(&server) + .await; + + let component = make_component_with_upstream(&server, HashMap::from([(known_root, share)])); + let err = component + .validators(ValidatorsOpts { + state: "head".to_owned(), + pubkeys: vec![], + indices: vec![99], + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); + } + + /// A pubkey from the VC that is not part of this cluster's share map is + /// rejected as `BAD_REQUEST` before any upstream call. Mirrors Go's + /// `getPubKeyFunc` "unknown public key" error. + #[tokio::test] + async fn validators_rejects_unknown_input_pubshare() { + let server = MockServer::start().await; + // No mock mounted — if the handler reaches the upstream, the call + // will surface as a different (non-400) error. + let root = [0x70_u8; 48]; + let share = [0x80_u8; 48]; + let unknown_share = [0x90_u8; 48]; + let component = make_component_with_upstream(&server, HashMap::from([(root, share)])); + let err = component + .validators(ValidatorsOpts { + state: "head".to_owned(), + pubkeys: vec![unknown_share], + indices: vec![], + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + } + + /// Upstream stall longer than [`UPSTREAM_REQUEST_TIMEOUT`] surfaces as + /// 504. + #[tokio::test(start_paused = true)] + async fn validators_upstream_timeout_returns_504() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/eth/v1/beacon/states/head/validators")) + .respond_with( + ResponseTemplate::new(200) + .set_delay(UPSTREAM_REQUEST_TIMEOUT * 2) + .set_body_json(GetStateValidatorsResponseResponse { + data: vec![], + execution_optimistic: false, + finalized: false, + }), + ) + .mount(&server) + .await; + + let component = make_component_with_upstream(&server, HashMap::new()); + let err = component + .validators(ValidatorsOpts { + state: "head".to_owned(), + pubkeys: vec![], + indices: vec![], + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::GATEWAY_TIMEOUT); + } + + /// A malformed pubkey from the upstream surfaces as 502. + #[tokio::test] + async fn validators_malformed_upstream_pubkey_returns_502() { + let server = MockServer::start().await; + let root = [0xA1_u8; 48]; + let share = [0xA2_u8; 48]; + let mut bad = make_validator_datum(1, &root); + bad.validator.pubkey = "not-a-hex-pubkey".to_owned(); + Mock::given(method("POST")) + .and(path("/eth/v1/beacon/states/head/validators")) + .respond_with(ResponseTemplate::new(200).set_body_json( + GetStateValidatorsResponseResponse { + data: vec![bad], + execution_optimistic: false, + finalized: false, + }, + )) + .mount(&server) + .await; + + let component = make_component_with_upstream(&server, HashMap::from([(root, share)])); + let err = component + .validators(ValidatorsOpts { + state: "head".to_owned(), + pubkeys: vec![], + indices: vec![1], + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); + } + + /// Upstream 400 propagates faithfully; the upstream body must not leak + /// into the client-visible message. + #[tokio::test] + async fn validators_propagates_upstream_400() { + use pluto_eth2api::BlindedBlock400Response; + + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/eth/v1/beacon/states/head/validators")) + .respond_with( + ResponseTemplate::new(400).set_body_json(BlindedBlock400Response { + code: 400.0, + message: "secret upstream message".to_owned(), + stacktraces: None, + }), + ) + .mount(&server) + .await; + + let component = make_component_with_upstream(&server, HashMap::new()); + let err = component + .validators(ValidatorsOpts { + state: "head".to_owned(), + pubkeys: vec![], + indices: vec![], + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + assert!(!err.message.contains("secret")); + } + + // ==================================================================== + // sync committee: submit_sync_committee_messages / + // submit_sync_committee_contributions / sync_committee_selections / + // sync_committee_contribution + // ==================================================================== + + use pluto_eth2api::spec::altair; + + /// Build an insecure-test component pinned to the proposal beacon mock + /// (so `epoch_from_slot` resolves) with the given active-validators cache. + /// BLS verification is skipped, so these tests exercise lookup, grouping, + /// and subscriber fan-out rather than signature crypto. + async fn make_sync_component( + validators: HashMap, + ) -> (Component, BeaconMock) { + let mock = mock_beacon_for_proposal().await; + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-sync-tests", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); + let component = Component::new_insecure( + eth2_cl, + Arc::clone(&dutydb), + 1, + TestValidatorCache::arc(validators), + ); + (component, mock) + } + + /// Records every `(duty, set)` a subscriber observes, so tests can assert + /// on duty type and per-slot grouping. + fn recording_sub(component: &mut Component) -> Arc>> { + let recorded: Arc>> = Arc::new(Mutex::new(Vec::new())); + let sink = Arc::clone(&recorded); + component.subscribe(move |duty, set| { + let sink = Arc::clone(&sink); + async move { + sink.lock().unwrap().push((duty, set)); + Ok(()) + } + }); + recorded + } + + fn sync_message(slot: u64, validator_index: u64) -> altair::SyncCommitteeMessage { + altair::SyncCommitteeMessage { + slot, + beacon_block_root: [0x11; 32], + validator_index, + signature: [0; 96], + } + } + + /// Messages for two validators in the same slot land in one duty/set + /// keyed by their DV root pubkeys; messages in distinct slots produce + /// distinct `SyncMessage` duties. + #[tokio::test] + async fn submit_sync_messages_groups_by_slot_and_broadcasts() { + let root_a = dv_pubkey(0xA1); + let root_b = dv_pubkey(0xB2); + let vals = HashMap::from([(10_u64, root_a), (20_u64, root_b)]); + + let (mut component, _mock) = make_sync_component(vals).await; + let recorded = recording_sub(&mut component); + + component + .submit_sync_committee_messages(vec![ + sync_message(5, 10), + sync_message(5, 20), + sync_message(6, 10), + ]) + .await + .expect("submit succeeds"); + + let mut events = recorded.lock().unwrap().clone(); + events.sort_by_key(|(duty, _)| duty.slot.inner()); + + assert_eq!(events.len(), 2, "two distinct slots -> two duties"); + for (duty, _) in &events { + assert_eq!(duty.duty_type, DutyType::SyncMessage); + } + // Slot 5 carries both validators; slot 6 carries one. + assert_eq!(events[0].0.slot.inner(), 5); + assert_eq!(events[0].1.inner().len(), 2); + assert_eq!(events[1].0.slot.inner(), 6); + assert_eq!(events[1].1.inner().len(), 1); + } + + /// An unknown validator index surfaces the Go-parity "validator not + /// found" error as `400`. + #[tokio::test] + async fn submit_sync_messages_rejects_unknown_validator() { + let (component, _mock) = make_sync_component(HashMap::new()).await; + let err = component + .submit_sync_committee_messages(vec![sync_message(5, 99)]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + assert_eq!(err.message, "validator not found"); + } + + fn signed_contribution(slot: u64, aggregator_index: u64) -> altair::SignedContributionAndProof { + altair::SignedContributionAndProof { + message: altair::ContributionAndProof { + aggregator_index, + contribution: altair::SyncCommitteeContribution { + slot, + beacon_block_root: [0x22; 32], + subcommittee_index: 3, + aggregation_bits: pluto_ssz::BitVector::new(), + signature: [0; 96], + }, + selection_proof: [0; 96], + }, + signature: [0; 96], + } + } + + /// Contributions group by `Contribution.Slot` into `SyncContribution` + /// duties keyed by the aggregator's DV root pubkey. + #[tokio::test] + async fn submit_sync_contributions_groups_by_slot_and_broadcasts() { + let root = dv_pubkey(0xC3); + let vals = HashMap::from([(7_u64, root)]); + + let (mut component, _mock) = make_sync_component(vals).await; + let recorded = recording_sub(&mut component); + + component + .submit_sync_committee_contributions(vec![signed_contribution(9, 7)]) + .await + .expect("submit succeeds"); + + let events = recorded.lock().unwrap().clone(); + assert_eq!(events.len(), 1); + assert_eq!(events[0].0.duty_type, DutyType::SyncContribution); + assert_eq!(events[0].0.slot.inner(), 9); + assert_eq!(events[0].1.inner().len(), 1); + } + + #[tokio::test] + async fn submit_sync_contributions_rejects_unknown_validator() { + let (component, _mock) = make_sync_component(HashMap::new()).await; + let err = component + .submit_sync_committee_contributions(vec![signed_contribution(9, 42)]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + assert_eq!(err.message, "validator not found"); + } + + fn selection(slot: u64, validator_index: u64) -> SyncCommitteeSelection { + SyncCommitteeSelection { + slot, + validator_index, + subcommittee_index: 2, + selection_proof: [0; 96], + } + } + + /// `sync_committee_selections` broadcasts a `PrepareSyncContribution` + /// duty, then collects the aggregated selection per `(duty, pubkey)` from + /// the aggsigdb hook. + #[tokio::test] + async fn sync_committee_selections_broadcasts_and_collects_aggregated() { + let root = dv_pubkey(0xD4); + let vals = HashMap::from([(3_u64, root)]); + + let (mut component, _mock) = make_sync_component(vals).await; + let recorded = recording_sub(&mut component); + + // The aggsigdb returns an aggregated selection (selection_proof set to + // a recognisable non-zero value) for the (duty, pubkey) await. + let aggregated = SyncCommitteeSelection { + slot: 1, + validator_index: 3, + subcommittee_index: 2, + selection_proof: [0x7E; 96], + }; + let agg_for_hook = aggregated.clone(); + component.register_await_agg_sig_db(move |_duty, _pk| { + let agg = agg_for_hook.clone(); + async move { Ok(Box::new(SyncCommitteeSelectionData::new(agg)) as Box) } + }); + + let response = component + .sync_committee_selections(vec![selection(1, 3)]) + .await + .expect("selections succeed"); + + assert_eq!(response.data.len(), 1); + assert_eq!(response.data[0].selection_proof, [0x7E; 96]); + + let events = recorded.lock().unwrap().clone(); + assert_eq!(events.len(), 1); + assert_eq!(events[0].0.duty_type, DutyType::PrepareSyncContribution); + assert_eq!(events[0].0.slot.inner(), 1); + } + + /// When the aggsigdb returns a non-selection signed-data, the handler + /// rejects with the Go-parity "invalid sync committee selection" message. + #[tokio::test] + async fn sync_committee_selections_rejects_wrong_aggregated_type() { + let root = dv_pubkey(0xE5); + let vals = HashMap::from([(4_u64, root)]); + + let (mut component, _mock) = make_sync_component(vals).await; + let _recorded = recording_sub(&mut component); + + // Return a SignedRandao instead of a SyncCommitteeSelection. + component.register_await_agg_sig_db(|_duty, _pk| async { + Ok(Box::new(SignedRandao::new(0, [0; 96])) as Box) + }); + + let err = component + .sync_committee_selections(vec![selection(2, 4)]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); + assert_eq!(err.message, "invalid sync committee selection"); + } + + /// `sync_committee_selections` without a registered aggsigdb hook is a + /// `503` (not wired up). + #[tokio::test] + async fn sync_committee_selections_without_hook_is_unavailable() { + let root = dv_pubkey(0xF6); + let vals = HashMap::from([(5_u64, root)]); + let (component, _mock) = make_sync_component(vals).await; + let err = component + .sync_committee_selections(vec![selection(3, 5)]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::SERVICE_UNAVAILABLE); + } + + /// `sync_committee_contribution` forwards the registered hook's result. + #[tokio::test] + async fn sync_committee_contribution_returns_hook_result() { + let (mut component, _mock) = make_sync_component(HashMap::new()).await; + + let contribution = altair::SyncCommitteeContribution { + slot: 12, + beacon_block_root: [0x33; 32], + subcommittee_index: 1, + aggregation_bits: pluto_ssz::BitVector::new(), + signature: [0xAB; 96], + }; + let hook_value = contribution.clone(); + component.register_await_sync_contribution(move |_slot, _subcomm, _root| { + let value = hook_value.clone(); + async move { Ok(SyncContribution(value)) } + }); + + let response = component + .sync_committee_contribution(SyncCommitteeContributionOpts { + slot: 12, + subcommittee_index: 1, + beacon_block_root: [0x33; 32], + }) + .await + .expect("contribution returned"); + + assert_eq!(response.data.slot, 12); + assert_eq!(response.data.signature, [0xAB; 96]); + } + + /// `sync_committee_contribution` without a registered hook is a `503`. + #[tokio::test] + async fn sync_committee_contribution_without_hook_is_unavailable() { + let (component, _mock) = make_sync_component(HashMap::new()).await; + let err = component + .sync_committee_contribution(SyncCommitteeContributionOpts { + slot: 1, + subcommittee_index: 0, + beacon_block_root: [0; 32], + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::SERVICE_UNAVAILABLE); + } + + /// Two aggregators in the same slot land in one `SyncContribution` duty + /// keyed by both DV root pubkeys — exercises the grouping map-merge path + /// for the contributions handler. + #[tokio::test] + async fn submit_sync_contributions_groups_two_validators_in_one_slot() { + let root_a = dv_pubkey(0xC1); + let root_b = dv_pubkey(0xC2); + let vals = HashMap::from([(7_u64, root_a), (8_u64, root_b)]); + + let (mut component, _mock) = make_sync_component(vals).await; + let recorded = recording_sub(&mut component); + + component + .submit_sync_committee_contributions(vec![ + signed_contribution(9, 7), + signed_contribution(9, 8), + ]) + .await + .expect("submit succeeds"); + + let events = recorded.lock().unwrap().clone(); + assert_eq!(events.len(), 1, "single slot -> single duty"); + assert_eq!(events[0].0.duty_type, DutyType::SyncContribution); + assert_eq!(events[0].0.slot.inner(), 9); + assert_eq!(events[0].1.inner().len(), 2, "both aggregators in the set"); + } + + /// A subscriber failure aborts the fan-out and surfaces as `500`, and no + /// further subscriber is invoked after the first error — matching Go's + /// return-on-first-error. + #[tokio::test] + async fn submit_sync_messages_subscriber_failure_is_500_and_aborts() { + let root = dv_pubkey(0xA9); + let vals = HashMap::from([(10_u64, root)]); + let (mut component, _mock) = make_sync_component(vals).await; + + let second_called = Arc::new(Mutex::new(false)); + component.subscribe(|_duty, _set| async { Err("boom".into()) }); + { + let flag = Arc::clone(&second_called); + component.subscribe(move |_duty, _set| { + let flag = Arc::clone(&flag); + async move { + *flag.lock().unwrap() = true; + Ok(()) + } + }); + } + + let err = component + .submit_sync_committee_messages(vec![sync_message(5, 10)]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); + assert!( + !*second_called.lock().unwrap(), + "fan-out aborts after the first subscriber error" + ); + } + + /// Empty input arrays are valid: no duty is broadcast, and selections + /// returns an empty `data` set. + #[tokio::test] + async fn empty_inputs_broadcast_nothing() { + let (mut component, _mock) = make_sync_component(HashMap::new()).await; + let recorded = recording_sub(&mut component); + component.register_await_agg_sig_db(|_duty, _pk| async { + Err::, _>("aggsigdb must not be queried".into()) + }); + + component + .submit_sync_committee_messages(vec![]) + .await + .expect("empty messages ok"); + component + .submit_sync_committee_contributions(vec![]) + .await + .expect("empty contributions ok"); + let response = component + .sync_committee_selections(vec![]) + .await + .expect("empty selections ok"); + + assert!(response.data.is_empty()); + assert!( + recorded.lock().unwrap().is_empty(), + "no duties broadcast for empty inputs" + ); + } + + /// Two validators across two slots: each `(duty, pubkey)` is collected + /// from the aggsigdb (keyed on pubkey here) and returned. Sorted before + /// asserting since the response order is HashMap-nondeterministic. + #[tokio::test] + async fn sync_committee_selections_collects_multiple_entries() { + let root_a = dv_pubkey(0xD1); + let root_b = dv_pubkey(0xD2); + let vals = HashMap::from([(3_u64, root_a), (4_u64, root_b)]); + + let (mut component, _mock) = make_sync_component(vals).await; + let _recorded = recording_sub(&mut component); + + // The aggsigdb echoes a selection whose validator_index encodes which + // pubkey was queried, so the test can assert both were collected. + component.register_await_agg_sig_db(move |_duty, pk| { + let v_idx = if pk == core_pubkey(0xD1) { 3 } else { 4 }; + async move { + Ok( + Box::new(SyncCommitteeSelectionData::new(SyncCommitteeSelection { + slot: 1, + validator_index: v_idx, + subcommittee_index: 2, + selection_proof: [0; 96], + })) as Box, + ) + } + }); + + let response = component + .sync_committee_selections(vec![selection(1, 3), selection(2, 4)]) + .await + .expect("selections succeed"); + + let mut indices: Vec = response.data.iter().map(|s| s.validator_index).collect(); + indices.sort_unstable(); + assert_eq!(indices, vec![3, 4]); + } } diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index a32ab9b9..ca21b919 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -7,32 +7,55 @@ use std::sync::Arc; use axum::{ Json, Router, + body::Bytes, extract::{ - DefaultBodyLimit, Path, Query, Request, State, + DefaultBodyLimit, Path, Query, RawQuery, Request, State, rejection::{JsonRejection, QueryRejection}, }, - http::{HeaderValue, StatusCode, header}, + http::{HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Uri, header}, middleware::{self, Next}, response::{IntoResponse, Response}, routing::{MethodRouter, get, post}, }; +use pluto_crypto::types::PublicKey as BlsPubKey; +use pluto_eth2api::{ + spec::DataVersion, + versioned::{ + SignedBlindedProposalBlock, SignedProposalBlock, VersionedSignedBlindedProposal, + VersionedSignedProposal as RawVersionedSignedProposal, + }, +}; use serde::Deserialize; - -/// Cap on the `POST /eth/v1/validator/duties/{attester,sync}/{epoch}` request -/// bodies. A realistic cluster ships at most a few thousand validator indices; -/// 64 KiB still allows ~10k indices in either numeric or string encoding, -/// well above any plausible workload. -const DUTIES_BODY_LIMIT: usize = 64 * 1024; +use serde_json::{Value, json}; use super::{ error::ApiError, handler::Handler, + metrics::{ApiLatencyTimer, ProxyLatencyTimer}, types::{ AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, AttesterDutiesResponse, - CommitteeIndex, NodeVersionResponse, ProposerDutiesOpts, ProposerDutiesResponse, - SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, ValIndexes, + CommitteeIndex, NodeVersionResponse, ProposalOpts, ProposerDutiesOpts, + ProposerDutiesResponse, SignedContributionAndProof, SyncCommitteeContributionOpts, + SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, SyncCommitteeMessage, + SyncCommitteeSelection, ValIndexes, ValidatorsOpts, }, }; +use crate::signeddata::{ProposalBlock, VersionedSignedProposal}; + +/// Cap on the `POST /eth/v1/validator/duties/{attester,sync}/{epoch}` request +/// bodies. A realistic cluster ships at most a few thousand validator indices; +/// 64 KiB still allows ~10k indices in either numeric or string encoding, +/// well above any plausible workload. +const DUTIES_BODY_LIMIT: usize = 64 * 1024; + +/// Response/request header carrying the consensus fork name (e.g. `deneb`). +const VERSION_HEADER: &str = "Eth-Consensus-Version"; +/// Response header signalling whether the returned proposal is blinded. +const EXECUTION_PAYLOAD_BLINDED_HEADER: &str = "Eth-Execution-Payload-Blinded"; +/// Response header carrying the execution payload value, in Wei. +const EXECUTION_PAYLOAD_VALUE_HEADER: &str = "Eth-Execution-Payload-Value"; +/// Response header carrying the consensus block value, in Wei. +const CONSENSUS_BLOCK_VALUE_HEADER: &str = "Eth-Consensus-Block-Value"; /// Query parameters for `GET /eth/v1/validator/attestation_data`. #[derive(Debug, Clone, Deserialize)] @@ -45,22 +68,36 @@ struct AttestationDataQuery { pub(super) struct AppState { /// Request handler invoked by each route. pub handler: Arc, - /// Whether builder mode is enabled. Read by `propose_block_v3`. - #[allow(dead_code, reason = "consumed by propose_block_v3 in a later PR")] + /// Whether builder mode is enabled. Read by `propose_block_v3` to maximise + /// the builder boost factor. pub builder_enabled: bool, + /// Upstream beacon-node base URL. The fallback reverse-proxies every + /// non-DV request here. A `userinfo` component (`user:pass@host`) is + /// applied as HTTP basic auth on the proxied request. + pub upstream_base_url: reqwest::Url, + /// HTTP client used by the reverse-proxy fallback. + pub proxy_client: reqwest::Client, } /// Builds the validator API HTTP router. /// /// Registers the distributed-validator-related endpoints and a fallback -/// that reverse-proxies everything else to the upstream beacon node. +/// that reverse-proxies everything else to `upstream_base_url`. /// /// `builder_enabled` is consumed by `propose_block_v3` to maximise the -/// builder boost factor. -pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { +/// builder boost factor. `upstream_base_url` is the beacon-node address the +/// fallback proxies to; a `user:pass@host` component is applied as HTTP basic +/// auth on each proxied request. +pub fn new_router( + handler: Arc, + builder_enabled: bool, + upstream_base_url: reqwest::Url, +) -> Router { let state = Arc::new(AppState { handler, builder_enabled, + upstream_base_url, + proxy_client: reqwest::Client::new(), }); Router::new() @@ -282,24 +319,173 @@ async fn submit_attestations() { todo!("vapi: submit_attestations"); } -async fn get_validators() { - todo!("vapi: get_validators"); +/// `GET,POST /eth/v1/beacon/states/{state_id}/validators`. +/// +/// Validator ids arrive as repeated/CSV `id` query parameters; when the query +/// carries none and the request has a JSON body, the body's `ids` array is +/// used instead. The whole id batch is dispatched on the first element's +/// `0x` prefix exactly as Charon's `getValidatorsByID` does: all-pubkeys if +/// `ids[0]` begins `0x`, otherwise all decimal indices. +async fn get_validators( + State(state): State>, + Path(state_id): Path, + RawQuery(query): RawQuery, + body: Bytes, +) -> Result, ApiError> { + let mut ids = validator_ids_from_query(query.as_deref()); + if ids.is_empty() && !body.is_empty() { + ids = validator_ids_from_json_body(&body)?; + } + + let opts = validators_opts(state_id, &ids)?; + let response = state.handler.validators(opts).await?; + + let data = serde_json::to_value(&response.data) + .map_err(|err| internal_error("could not serialize validators", err))?; + Ok(Json(json!({ + "execution_optimistic": response.execution_optimistic, + "finalized": response.finalized, + "data": data, + }))) } -async fn get_validator() { - todo!("vapi: get_validator"); +/// `GET /eth/v1/beacon/states/{state_id}/validators/{validator_id}`. +/// +/// Returns a single validator; `404` when the upstream has none and `500` +/// when it unexpectedly returns more than one. Mirrors `getValidator`. +async fn get_validator( + State(state): State>, + Path((state_id, validator_id)): Path<(String, String)>, +) -> Result, ApiError> { + let opts = validators_opts(state_id, std::slice::from_ref(&validator_id))?; + let response = state.handler.validators(opts).await?; + + let mut data = response.data; + match data.len() { + 0 => Err(ApiError::not_found()), + 1 => { + let validator = serde_json::to_value(data.remove(0)) + .map_err(|err| internal_error("could not serialize validator", err))?; + Ok(Json(json!({ + "execution_optimistic": response.execution_optimistic, + "finalized": response.finalized, + "data": validator, + }))) + } + _ => Err(ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "unexpected number of validators", + )), + } } -async fn propose_block_v3() { - todo!("vapi: propose_block_v3"); +/// `GET /eth/v3/validator/blocks/{slot}`. +/// +/// Produces an unsigned (possibly blinded) beacon block. `builder_enabled` +/// maximises the builder boost factor so builder payloads win. The block is +/// returned as JSON with the consensus-version / payload-blinded / value +/// headers Charon sets in `proposeBlockV3`. +async fn propose_block_v3( + State(state): State>, + Path(slot): Path, + RawQuery(query): RawQuery, +) -> Result { + let params = parse_query(query.as_deref()); + + let randao_reveal = hex_query_fixed::<96>(¶ms, "randao_reveal")?; + let graffiti = graffiti_query(¶ms, "graffiti")?; + + // Builder mode gives maximum priority to builder blocks (`u64::MAX`); + // otherwise the factor is `0`. Charon always sends the factor (it is never + // omitted), so use `Some` in both branches. + let builder_boost_factor = Some(if state.builder_enabled { u64::MAX } else { 0 }); + + let response = state + .handler + .proposal(ProposalOpts { + slot, + randao_reveal, + graffiti, + builder_boost_factor, + }) + .await?; + + let proposal = &response.data; + let version = proposal.version(); + let blinded = proposal.is_blinded(); + let execution_value = proposal.execution_payload_value.to_string(); + let consensus_value = proposal.consensus_block_value.to_string(); + + let body = json!({ + "version": version.as_str(), + "execution_payload_blinded": blinded, + "execution_payload_value": execution_value, + "consensus_block_value": consensus_value, + "data": serialize_proposal_block(&proposal.block)?, + }); + + let mut headers = HeaderMap::new(); + insert_header(&mut headers, VERSION_HEADER, version.as_str())?; + insert_header( + &mut headers, + EXECUTION_PAYLOAD_BLINDED_HEADER, + &blinded.to_string(), + )?; + insert_header( + &mut headers, + EXECUTION_PAYLOAD_VALUE_HEADER, + &execution_value, + )?; + insert_header(&mut headers, CONSENSUS_BLOCK_VALUE_HEADER, &consensus_value)?; + + Ok((headers, Json(body)).into_response()) } -async fn submit_proposal() { - todo!("vapi: submit_proposal"); +/// `POST /eth/v{1,2}/beacon/blocks`. +/// +/// Decodes the submitted full signed block, selecting the fork from the +/// `Eth-Consensus-Version` header (JSON or SSZ body per content type), then +/// forwards it to the handler. Mirrors `submitProposal`. +async fn submit_proposal( + State(state): State>, + headers: HeaderMap, + body: Bytes, +) -> Result { + let version = consensus_version_header(&headers)?; + let ssz = request_is_ssz(&headers)?; + + let block = decode_signed_proposal_block(version, &body, ssz)?; + let proposal = VersionedSignedProposal::new(RawVersionedSignedProposal { + version, + blinded: false, + block, + }) + .map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, "invalid submitted block").with_source(err) + })?; + + state.handler.submit_proposal(proposal).await?; + Ok(StatusCode::OK.into_response()) } -async fn submit_blinded_block() { - todo!("vapi: submit_blinded_block"); +/// `POST /eth/v{1,2}/beacon/blinded_blocks`. +/// +/// Decodes the submitted blinded signed block, selecting the fork from the +/// `Eth-Consensus-Version` header, then forwards it to the handler. +/// Mirrors `submitBlindedBlock`. +async fn submit_blinded_block( + State(state): State>, + headers: HeaderMap, + body: Bytes, +) -> Result { + let version = consensus_version_header(&headers)?; + let ssz = request_is_ssz(&headers)?; + + let block = decode_signed_blinded_proposal_block(version, &body, ssz)?; + let proposal = VersionedSignedBlindedProposal { version, block }; + + state.handler.submit_blinded_proposal(proposal).await?; + Ok(StatusCode::OK.into_response()) } async fn submit_validator_registrations() { @@ -322,24 +508,106 @@ async fn submit_aggregate_attestations() { todo!("vapi: submit_aggregate_attestations"); } -async fn submit_sync_committee_messages() { - todo!("vapi: submit_sync_committee_messages"); +/// `POST /eth/v1/beacon/pool/sync_committees`. +/// +/// Decodes the JSON array of sync-committee messages and forwards them to the +/// handler. JSON-only, matching Charon's `submitSyncCommitteeMessages`. +async fn submit_sync_committee_messages( + State(state): State>, + headers: HeaderMap, + body: Bytes, +) -> Result { + let messages: Vec = + parse_json_array(&headers, &body, "sync committee messages")?; + state + .handler + .submit_sync_committee_messages(messages) + .await?; + Ok(StatusCode::OK.into_response()) +} + +/// Query parameters for `GET /eth/v1/validator/sync_committee_contribution`. +struct SyncCommitteeContributionQuery { + slot: u64, + subcommittee_index: u64, + beacon_block_root: [u8; 32], } -async fn sync_committee_contribution() { - todo!("vapi: sync_committee_contribution"); +/// `GET /eth/v1/validator/sync_committee_contribution`. +/// +/// Reads `slot`, `subcommittee_index` and the `0x`-hex `beacon_block_root` +/// query parameters, then returns the aggregated contribution. Mirrors +/// `syncCommitteeContribution`. +async fn sync_committee_contribution( + State(state): State>, + RawQuery(query): RawQuery, +) -> Result { + let params = parse_query(query.as_deref()); + let query = SyncCommitteeContributionQuery { + slot: uint_query(¶ms, "slot")?, + subcommittee_index: uint_query(¶ms, "subcommittee_index")?, + beacon_block_root: hex_query_fixed::<32>(¶ms, "beacon_block_root")?, + }; + + let response = state + .handler + .sync_committee_contribution(SyncCommitteeContributionOpts { + slot: query.slot, + subcommittee_index: query.subcommittee_index, + beacon_block_root: query.beacon_block_root, + }) + .await?; + + let data = serde_json::to_value(&response.data) + .map_err(|err| internal_error("could not serialize sync committee contribution", err))?; + Ok(Json(json!({ "data": data })).into_response()) } -async fn submit_contribution_and_proofs() { - todo!("vapi: submit_contribution_and_proofs"); +/// `POST /eth/v1/validator/contribution_and_proofs`. +/// +/// Decodes the JSON array of signed contribution-and-proofs and forwards them +/// to the handler. JSON-only, matching Charon's `submitContributionAndProofs`. +async fn submit_contribution_and_proofs( + State(state): State>, + headers: HeaderMap, + body: Bytes, +) -> Result { + let contributions: Vec = + parse_json_array(&headers, &body, "signed contribution and proofs")?; + state + .handler + .submit_sync_committee_contributions(contributions) + .await?; + Ok(StatusCode::OK.into_response()) } -async fn submit_proposal_preparations() { - todo!("vapi: submit_proposal_preparations"); +/// `POST /eth/v1/validator/prepare_beacon_proposer`. +/// +/// Swallows the fee-recipient preparation: Charon derives the fee recipient +/// from `cluster-lock.json`, so the validator client need not be configured +/// with one. Returns `200` with no body. Mirrors `submitProposalPreparations`. +async fn submit_proposal_preparations() -> impl IntoResponse { + StatusCode::OK } -async fn sync_committee_selections() { - todo!("vapi: sync_committee_selections"); +/// `POST /eth/v1/validator/sync_committee_selections`. +/// +/// Decodes the JSON array of partial sync-committee selections, forwards them +/// to the handler, and returns the aggregated selections as `{ "data": [...] +/// }`. Mirrors `syncCommitteeSelections`. +async fn sync_committee_selections( + State(state): State>, + headers: HeaderMap, + body: Bytes, +) -> Result { + let selections: Vec = + parse_json_array(&headers, &body, "sync committee selections")?; + + let response = state.handler.sync_committee_selections(selections).await?; + + let data = serde_json::to_value(&response.data) + .map_err(|err| internal_error("could not serialize sync committee selections", err))?; + Ok(Json(json!({ "data": data })).into_response()) } async fn node_version( @@ -354,8 +622,561 @@ async fn respond_404() -> impl IntoResponse { ApiError::not_found() } -async fn proxy_handler() { - todo!("vapi: proxy_handler"); +/// Reverse-proxy fallback: forwards every request not handled by a registered +/// distributed-validator route to the upstream beacon node. Mirrors +/// `proxyHandler`. +/// +/// Basic-auth credentials in the upstream URL's `userinfo` are applied to the +/// proxied request and the `Host` header is rewritten to the upstream host, +/// matching Charon's reverse-proxy director. The upstream response body is +/// streamed straight through (not buffered), so long-lived endpoints such as +/// the SSE `/eth/v1/events` stream proxy incrementally. Charon clones the +/// request with the lifecycle context so in-flight proxied requests are +/// cancelled on soft shutdown; here the proxied request inherits the axum +/// request's own lifetime, which is cancelled when the connection/server is +/// torn down. +async fn proxy_handler( + State(state): State>, + method: Method, + uri: Uri, + headers: HeaderMap, + body: Bytes, +) -> Result { + let path = uri.path().to_owned(); + let _proxy_timer = ProxyLatencyTimer::start(&path); + let _api_timer = ApiLatencyTimer::start("proxy"); + + // Build the target URL: upstream base + request path (+ query). The + // userinfo is stripped from the URL and applied as a basic-auth header + // instead (below), mirroring Charon's reverse-proxy director and avoiding + // a duplicate Authorization header from URL-embedded credentials. + let mut target = state.upstream_base_url.clone(); + target.set_path(uri.path()); + target.set_query(uri.query()); + // These setters only fail on cannot-be-a-base URLs, which an HTTP(S) base + // URL never is; ignore the result to keep the proxy infallible here. + let _ = target.set_username(""); + let _ = target.set_password(None); + + let reqwest_method = reqwest::Method::from_bytes(method.as_str().as_bytes()) + .map_err(|err| internal_error("invalid proxy method", err))?; + + let mut request = state + .proxy_client + .request(reqwest_method, target.clone()) + .body(reqwest::Body::from(body)); + + // When the upstream URL carries credentials we own the auth, so the + // client's own Authorization header must not be relayed (it would produce + // a second, conflicting Authorization header on the proxied request). + let upstream_user = state.upstream_base_url.username(); + let has_upstream_auth = !upstream_user.is_empty(); + + // Forward request headers, skipping the Host (rewritten below), + // Content-Length (reqwest sets it from the body), the hop-by-hop headers a + // proxy must not relay, and — when we apply our own basic auth — the + // client Authorization header. + for (name, value) in &headers { + if name == header::HOST + || name == header::CONTENT_LENGTH + || is_hop_by_hop_header(name) + || (has_upstream_auth && name == header::AUTHORIZATION) + { + continue; + } + request = request.header(name.as_str(), value.as_bytes()); + } + if let Some(host) = target.host_str() { + let host_header = match target.port() { + Some(port) => format!("{host}:{port}"), + None => host.to_owned(), + }; + request = request.header(header::HOST, host_header); + } + + // Apply basic auth from the upstream URL's userinfo, if present. + if has_upstream_auth { + request = request.basic_auth(upstream_user, state.upstream_base_url.password()); + } + + let upstream = request.send().await.map_err(|err| { + ApiError::new( + StatusCode::BAD_GATEWAY, + "proxy request to beacon node failed", + ) + .with_source(err) + })?; + + let status = StatusCode::from_u16(upstream.status().as_u16()) + .map_err(|err| internal_error("invalid upstream status", err))?; + + // Re-emit upstream response headers, dropping Content-Length (axum derives + // it from the streamed body) and hop-by-hop headers. + let mut response_headers = HeaderMap::new(); + for (name, value) in upstream.headers() { + if let (Ok(name), Ok(value)) = ( + HeaderName::from_bytes(name.as_str().as_bytes()), + HeaderValue::from_bytes(value.as_bytes()), + ) { + if name == header::CONTENT_LENGTH || is_hop_by_hop_header(&name) { + continue; + } + response_headers.append(name, value); + } + } + + // Stream the body straight through rather than buffering it, so + // long-lived/streaming endpoints (e.g. the SSE `/eth/v1/events`) are + // proxied incrementally. Charon achieves the same with a flushing reverse + // proxy writer. + let body = axum::body::Body::from_stream(upstream.bytes_stream()); + + Ok((status, response_headers, body).into_response()) +} + +/// Reports whether `name` is an HTTP hop-by-hop header that a proxy must not +/// forward end to end (RFC 7230 §6.1). Comparison is case-insensitive via the +/// normalised [`HeaderName`]. +fn is_hop_by_hop_header(name: &HeaderName) -> bool { + matches!( + name.as_str(), + "connection" + | "keep-alive" + | "proxy-authenticate" + | "proxy-authorization" + | "te" + | "trailer" + | "transfer-encoding" + | "upgrade" + ) +} + +/// Parses a raw URL query string into decoded `(key, value)` pairs, preserving +/// order and duplicate keys. An absent query yields an empty list. +fn parse_query(query: Option<&str>) -> Vec<(String, String)> { + match query { + Some(q) => url::form_urlencoded::parse(q.as_bytes()) + .map(|(k, v)| (k.into_owned(), v.into_owned())) + .collect(), + None => Vec::new(), + } +} + +/// Collects validator ids from the `id` query parameter, splitting CSV values +/// and trimming each, mirroring Charon's `getQueryArrayParameter`. +fn validator_ids_from_query(query: Option<&str>) -> Vec { + parse_query(query) + .into_iter() + .filter(|(key, _)| key == "id") + .flat_map(|(_, value)| { + value + .split(',') + .map(|id| id.trim().to_owned()) + .collect::>() + }) + .collect() +} + +/// Validator-ids POST body: `{ "ids": [...] }`. Mirrors +/// `getValidatorIDsFromJSON`. +#[derive(Debug, Deserialize)] +struct ValidatorIdsBody { + #[serde(default)] + ids: Vec, +} + +/// Extracts validator ids from a JSON POST body. A parse failure surfaces as +/// `400`, matching Charon's wrapped "failed to parse request body" error. +fn validator_ids_from_json_body(body: &[u8]) -> Result, ApiError> { + let parsed: ValidatorIdsBody = serde_json::from_slice(body).map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, "failed to parse request body").with_source(err) + })?; + Ok(parsed.ids) +} + +/// Builds [`ValidatorsOpts`] from a state id and a batch of validator ids. +/// +/// The whole batch is dispatched on `ids[0]`'s `0x` prefix exactly as Charon's +/// `getValidatorsByID` does: if the first id is `0x`-prefixed every id is +/// parsed as a public key, otherwise every id is parsed as a decimal validator +/// index. An empty batch forwards no filter. +fn validators_opts(state: String, ids: &[String]) -> Result { + let mut pubkeys = Vec::new(); + let mut indices = Vec::new(); + + if ids.first().is_some_and(|id| id.starts_with("0x")) { + for id in ids { + pubkeys.push(parse_pubkey_id(id)?); + } + } else { + for id in ids { + let index = id.parse::().map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, "invalid validator index").with_source(err) + })?; + indices.push(index); + } + } + + Ok(ValidatorsOpts { + state, + pubkeys, + indices, + }) +} + +/// Parses a `0x`-prefixed 48-byte hex public key. +fn parse_pubkey_id(id: &str) -> Result { + let stripped = id.strip_prefix("0x").unwrap_or(id); + let bytes = hex::decode(stripped).map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, "invalid validator public key hex").with_source(err) + })?; + bytes.as_slice().try_into().map_err(|_| { + ApiError::new( + StatusCode::BAD_REQUEST, + "invalid validator public key length", + ) + }) +} + +/// Returns the value of the first query parameter named `name`, if present. +fn query_value<'a>(params: &'a [(String, String)], name: &str) -> Option<&'a str> { + params + .iter() + .find(|(key, _)| key == name) + .map(|(_, value)| value.as_str()) +} + +/// Decodes a required fixed-length `0x`-hex query parameter into an `N`-byte +/// array. Mirrors Charon's `hexQueryFixed`. +fn hex_query_fixed( + params: &[(String, String)], + name: &str, +) -> Result<[u8; N], ApiError> { + optional_hex_query_fixed::(params, name)?.ok_or_else(|| { + ApiError::new( + StatusCode::BAD_REQUEST, + format!("missing 0x-hex query parameter {name}"), + ) + }) +} + +/// Decodes an optional fixed-length `0x`-hex query parameter into an `N`-byte +/// array. Returns `None` when absent; rejects wrong lengths. Mirrors Charon's +/// `hexQuery` + `hexQueryFixed` length check. +fn optional_hex_query_fixed( + params: &[(String, String)], + name: &str, +) -> Result, ApiError> { + let Some(value) = query_value(params, name) else { + return Ok(None); + }; + let stripped = value.strip_prefix("0x").unwrap_or(value); + let bytes = hex::decode(stripped).map_err(|err| { + ApiError::new( + StatusCode::BAD_REQUEST, + format!("invalid 0x-hex query parameter {name} [{value}]"), + ) + .with_source(err) + })?; + let array: [u8; N] = bytes.as_slice().try_into().map_err(|_| { + ApiError::new( + StatusCode::BAD_REQUEST, + format!("invalid length for 0x-hex query parameter {name}, expect {N} bytes"), + ) + })?; + Ok(Some(array)) +} + +/// Decodes the optional `graffiti` query parameter into a 32-byte array. +/// +/// Graffiti is lenient on length, mirroring Charon's `getProposeBlockParams` +/// (`hexQuery` + `copy(graffiti[:], graffitiBytes)`): any-length hex is +/// accepted, then left-aligned into 32 bytes — longer input is truncated and +/// shorter input is zero-padded. An absent parameter yields all-zero graffiti. +fn graffiti_query(params: &[(String, String)], name: &str) -> Result<[u8; 32], ApiError> { + let Some(value) = query_value(params, name) else { + return Ok([0u8; 32]); + }; + let stripped = value.strip_prefix("0x").unwrap_or(value); + let bytes = hex::decode(stripped).map_err(|err| { + ApiError::new( + StatusCode::BAD_REQUEST, + format!("invalid 0x-hex query parameter {name} [{value}]"), + ) + .with_source(err) + })?; + let mut graffiti = [0u8; 32]; + let len = bytes.len().min(32); + graffiti[..len].copy_from_slice(&bytes[..len]); + Ok(graffiti) +} + +/// Decodes a required unsigned-integer query parameter. Mirrors Charon's +/// `uintQuery`: a missing parameter is a `400`, as is a non-numeric value. +fn uint_query(params: &[(String, String)], name: &str) -> Result { + let value = query_value(params, name).ok_or_else(|| { + ApiError::new( + StatusCode::BAD_REQUEST, + format!("missing query parameter {name}"), + ) + })?; + value.parse::().map_err(|err| { + ApiError::new( + StatusCode::BAD_REQUEST, + format!("invalid uint query parameter {name} [{value}]"), + ) + .with_source(err) + }) +} + +/// Decodes a JSON array request body, enforcing Charon's JSON-only content-type +/// policy for the sync-committee endpoints (`Encodings: [JSON]`). +/// +/// A missing `Content-Type` is treated as JSON (matching Charon's default); any +/// other content type is rejected with `415`. An empty body is a `400` +/// ("empty request body"), and a JSON parse failure is a `400`, both mirroring +/// Charon's `unmarshal`. +fn parse_json_array( + headers: &HeaderMap, + body: &[u8], + what: &'static str, +) -> Result, ApiError> { + if let Some(value) = headers.get(header::CONTENT_TYPE) { + // A present but non-ASCII header is unrecognised, not JSON: surface it + // as 415 rather than silently defaulting to JSON. + let unsupported = || { + ApiError::new( + StatusCode::UNSUPPORTED_MEDIA_TYPE, + format!("unsupported media type {value:?}"), + ) + }; + let s = value.to_str().map_err(|_| unsupported())?; + if !s.contains("application/json") { + return Err(unsupported()); + } + } + + if body.is_empty() { + return Err(ApiError::new(StatusCode::BAD_REQUEST, "empty request body")); + } + + serde_json::from_slice(body).map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, format!("failed to parse {what}")).with_source(err) + }) +} + +/// Parses the `Eth-Consensus-Version` request header into a [`DataVersion`]. +/// +/// The header is matched case-insensitively (lowercased before lookup) to +/// mirror go-eth2-client's `DataVersion.UnmarshalJSON`. A missing or +/// unrecognised value is a `400`, matching Charon's "missing consensus version +/// header". +fn consensus_version_header(headers: &HeaderMap) -> Result { + let missing = || ApiError::new(StatusCode::BAD_REQUEST, "missing consensus version header"); + let raw = headers.get(VERSION_HEADER).ok_or_else(missing)?; + let value = raw.to_str().map_err(|_| missing())?.to_ascii_lowercase(); + match value.as_str() { + "phase0" => Ok(DataVersion::Phase0), + "altair" => Ok(DataVersion::Altair), + "bellatrix" => Ok(DataVersion::Bellatrix), + "capella" => Ok(DataVersion::Capella), + "deneb" => Ok(DataVersion::Deneb), + "electra" => Ok(DataVersion::Electra), + "fulu" => Ok(DataVersion::Fulu), + _ => Err(missing()), + } +} + +/// Classifies the request body encoding from its `Content-Type`, mirroring +/// Charon's `wrap` content negotiation for JSON+SSZ endpoints: a missing or +/// `application/json` header is JSON, `application/octet-stream` is SSZ, and +/// anything else is rejected with `415 Unsupported Media Type` carrying the +/// offending content type. Returns `true` for SSZ. +fn request_is_ssz(headers: &HeaderMap) -> Result { + let Some(value) = headers.get(header::CONTENT_TYPE) else { + return Ok(false); + }; + // A present but non-ASCII header is unrecognised, not JSON: surface it as + // 415 like any other unsupported type rather than silently defaulting. + let unsupported = || { + ApiError::new( + StatusCode::UNSUPPORTED_MEDIA_TYPE, + format!("unsupported media type {value:?}"), + ) + }; + let value = value.to_str().map_err(|_| unsupported())?; + if value.is_empty() || value.contains("application/json") { + Ok(false) + } else if value.contains("application/octet-stream") { + Ok(true) + } else { + Err(ApiError::new( + StatusCode::UNSUPPORTED_MEDIA_TYPE, + format!("unsupported media type {value}"), + )) + } +} + +/// Decodes a submitted full signed proposal block (JSON or SSZ) for the given +/// fork. A decode failure surfaces as `400`, mirroring Charon's +/// "invalid submitted block". +fn decode_signed_proposal_block( + version: DataVersion, + body: &[u8], + ssz: bool, +) -> Result { + if body.is_empty() { + return Err(ApiError::new(StatusCode::BAD_REQUEST, "empty request body")); + } + let invalid = |source: Box| { + ApiError::new(StatusCode::BAD_REQUEST, "invalid submitted block").with_boxed_source(source) + }; + + if ssz { + return crate::ssz_codec::decode_signed_proposal_block_body(version, body) + .map_err(|err| invalid(Box::new(err))); + } + + let value: Value = serde_json::from_slice(body).map_err(|err| invalid(Box::new(err)))?; + decode_signed_proposal_block_json(version, value).map_err(invalid) +} + +/// Selects the per-fork (non-blinded) `SignedProposalBlock` variant and parses +/// the JSON block body into it. Mirrors the `submitProposal` version switch. +fn decode_signed_proposal_block_json( + version: DataVersion, + value: Value, +) -> Result> { + Ok(match version { + DataVersion::Phase0 => SignedProposalBlock::Phase0(serde_json::from_value(value)?), + DataVersion::Altair => SignedProposalBlock::Altair(serde_json::from_value(value)?), + DataVersion::Bellatrix => SignedProposalBlock::Bellatrix(serde_json::from_value(value)?), + DataVersion::Capella => SignedProposalBlock::Capella(serde_json::from_value(value)?), + DataVersion::Deneb => SignedProposalBlock::Deneb(serde_json::from_value(value)?), + DataVersion::Electra => SignedProposalBlock::Electra(serde_json::from_value(value)?), + DataVersion::Fulu => SignedProposalBlock::Fulu(serde_json::from_value(value)?), + DataVersion::Unknown => return Err("unknown consensus version".into()), + }) +} + +/// Decodes a submitted blinded signed proposal block (JSON or SSZ) for the +/// given fork. Mirrors `submitBlindedBlock`. +fn decode_signed_blinded_proposal_block( + version: DataVersion, + body: &[u8], + ssz: bool, +) -> Result { + if body.is_empty() { + return Err(ApiError::new(StatusCode::BAD_REQUEST, "empty request body")); + } + let invalid = |source: Box| { + ApiError::new(StatusCode::BAD_REQUEST, "invalid submitted blinded block") + .with_boxed_source(source) + }; + + if ssz { + return crate::ssz_codec::decode_signed_blinded_proposal_block_body(version, body) + .map_err(|err| invalid(Box::new(err))); + } + + let value: Value = serde_json::from_slice(body).map_err(|err| invalid(Box::new(err)))?; + decode_signed_blinded_proposal_block_json(version, value).map_err(invalid) +} + +/// Selects the per-fork blinded variant and parses the JSON block body into +/// it. Mirrors the `submitBlindedBlock` version switch; pre-Bellatrix forks +/// have no blinded form and are rejected. +fn decode_signed_blinded_proposal_block_json( + version: DataVersion, + value: Value, +) -> Result> { + Ok(match version { + DataVersion::Bellatrix => { + SignedBlindedProposalBlock::Bellatrix(serde_json::from_value(value)?) + } + DataVersion::Capella => SignedBlindedProposalBlock::Capella(serde_json::from_value(value)?), + DataVersion::Deneb => SignedBlindedProposalBlock::Deneb(serde_json::from_value(value)?), + DataVersion::Electra => SignedBlindedProposalBlock::Electra(serde_json::from_value(value)?), + // Fulu blinded blocks share the Electra layout. + DataVersion::Fulu => SignedBlindedProposalBlock::Fulu(serde_json::from_value(value)?), + DataVersion::Phase0 | DataVersion::Altair | DataVersion::Unknown => { + return Err("invalid blinded block version".into()); + } + }) +} + +/// Serializes an unsigned [`ProposalBlock`] to the JSON shape Charon's +/// `createProposeBlockResponse` puts in the `data` field: the bare block for +/// pre-Deneb forks (and all blinded forks), and the `BlockContents` object +/// (`{ block, kzg_proofs, blobs }`) for Deneb, Electra, and Fulu full blocks. +fn serialize_proposal_block(block: &ProposalBlock) -> Result { + let to_value = |value: Result| { + value.map_err(|err| internal_error("could not serialize proposal block", err)) + }; + match block { + ProposalBlock::Phase0(b) => to_value(serde_json::to_value(b)), + ProposalBlock::Altair(b) => to_value(serde_json::to_value(b)), + ProposalBlock::Bellatrix(b) => to_value(serde_json::to_value(b)), + ProposalBlock::BellatrixBlinded(b) => to_value(serde_json::to_value(b)), + ProposalBlock::Capella(b) => to_value(serde_json::to_value(b)), + ProposalBlock::CapellaBlinded(b) => to_value(serde_json::to_value(b)), + ProposalBlock::DenebBlinded(b) => to_value(serde_json::to_value(b)), + ProposalBlock::ElectraBlinded(b) => to_value(serde_json::to_value(b)), + ProposalBlock::FuluBlinded(b) => to_value(serde_json::to_value(b)), + ProposalBlock::Deneb { + block, + kzg_proofs, + blobs, + } => block_contents_value(block.as_ref(), kzg_proofs, blobs), + // Electra and Fulu full blocks both carry an `electra::BeaconBlock`. + ProposalBlock::Electra { + block, + kzg_proofs, + blobs, + } + | ProposalBlock::Fulu { + block, + kzg_proofs, + blobs, + } => block_contents_value(block.as_ref(), kzg_proofs, blobs), + } +} + +/// Builds the `BlockContents` JSON object (`{ block, kzg_proofs, blobs }`) for +/// a Deneb-or-later full proposal, matching go-eth2-client's +/// `apiv1.BlockContents` wire shape. +fn block_contents_value( + block: &B, + kzg_proofs: &[pluto_eth2api::spec::deneb::KZGProof], + blobs: &[pluto_eth2api::spec::deneb::Blob], +) -> Result { + Ok(json!({ + "block": serde_json::to_value(block) + .map_err(|err| internal_error("could not serialize block", err))?, + "kzg_proofs": serde_json::to_value(kzg_proofs) + .map_err(|err| internal_error("could not serialize kzg_proofs", err))?, + "blobs": serde_json::to_value(blobs) + .map_err(|err| internal_error("could not serialize blobs", err))?, + })) +} + +/// Inserts a header, mapping an invalid name or value into a `500` (both are +/// derived from internal data here, so a failure is a bug, not bad input). +fn insert_header(headers: &mut HeaderMap, name: &'static str, value: &str) -> Result<(), ApiError> { + let header_name = HeaderName::from_bytes(name.as_bytes()) + .map_err(|err| internal_error("invalid header name", err))?; + let header_value = + HeaderValue::from_str(value).map_err(|err| internal_error("invalid header value", err))?; + headers.insert(header_name, header_value); + Ok(()) +} + +/// Builds a `500 Internal Server Error` [`ApiError`] with an attached source. +fn internal_error(message: &'static str, source: E) -> ApiError +where + E: std::error::Error + Send + Sync + 'static, +{ + ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, message).with_source(source) } #[cfg(test)] @@ -367,16 +1188,36 @@ mod tests { testutils::TestHandler, types::{ AttestationDataResponse, AttesterDutiesResponse, AttesterDuty, ProposerDutiesResponse, - ProposerDuty, SyncCommitteeDutiesResponse, SyncCommitteeDuty, ValIndexes, + ProposerDuty, SyncCommitteeDutiesResponse, SyncCommitteeDuty, SyncCommitteeSelection, + ValIndexes, }, }; + /// Placeholder upstream URL for tests that never reach the proxy fallback. + fn test_upstream_url() -> reqwest::Url { + "http://127.0.0.1:0".parse().expect("valid test url") + } + + /// Builds an [`AppState`] around `handler` for direct-handler unit tests. + /// `builder_enabled` defaults off; the upstream URL is a placeholder. + fn test_state(handler: Arc) -> Arc { + Arc::new(AppState { + handler, + builder_enabled: false, + upstream_base_url: test_upstream_url(), + proxy_client: reqwest::Client::new(), + }) + } + + /// Builds a router for oneshot tests with the proxy disabled (placeholder + /// upstream) and the given builder mode. + fn test_router(handler: Arc, builder_enabled: bool) -> Router { + new_router(handler, builder_enabled, test_upstream_url()) + } + #[tokio::test] async fn node_version_wraps_handler_value() { - let state = Arc::new(AppState { - handler: Arc::new(TestHandler::with_version("pluto/test/v1.0")), - builder_enabled: false, - }); + let state = test_state(Arc::new(TestHandler::with_version("pluto/test/v1.0"))); let Json(body) = node_version(State(state)).await.unwrap(); @@ -399,10 +1240,7 @@ mod tests { dependent_root: "0xab".to_owned(), execution_optimistic: false, }); - let state = Arc::new(AppState { - handler: Arc::new(handler), - builder_enabled: false, - }); + let state = test_state(Arc::new(handler)); let Json(body) = attester_duties( State(state), @@ -432,10 +1270,7 @@ mod tests { data: vec![duty], execution_optimistic: true, }); - let state = Arc::new(AppState { - handler: Arc::new(handler), - builder_enabled: false, - }); + let state = test_state(Arc::new(handler)); let Json(body) = sync_committee_duties( State(state), @@ -468,10 +1303,7 @@ mod tests { }; let handler = TestHandler::default().with_attestation_data(AttestationDataResponse { data }); - let state = Arc::new(AppState { - handler: Arc::new(handler), - builder_enabled: false, - }); + let state = test_state(Arc::new(handler)); let Json(body) = attestation_data( State(state), @@ -513,10 +1345,7 @@ mod tests { dependent_root: "0xcd".to_owned(), execution_optimistic: true, }); - let state = Arc::new(AppState { - handler: Arc::new(handler), - builder_enabled: false, - }); + let state = test_state(Arc::new(handler)); let Json(body) = proposer_duties(State(state), Path(99u64)).await.unwrap(); @@ -548,7 +1377,7 @@ mod tests { target: phase0::Checkpoint::default(), }, }); - let app = new_router(Arc::new(handler), false); + let app = test_router(Arc::new(handler), false); // Missing `committee_index`. let req = Request::builder() @@ -586,7 +1415,7 @@ mod tests { use tower::ServiceExt; let handler = TestHandler::default(); - let app = new_router(Arc::new(handler), false); + let app = test_router(Arc::new(handler), false); // 128 KiB of zeros — well past the 64 KiB cap, valid JSON or not. let big = vec![b'0'; 128 * 1024]; @@ -614,7 +1443,7 @@ mod tests { }; use tower::ServiceExt; - let app = new_router(Arc::new(TestHandler::default()), false); + let app = test_router(Arc::new(TestHandler::default()), false); // Valid JSON, wrong shape (object, not an array) — axum's default // would surface this as a 422 type error. @@ -650,7 +1479,7 @@ mod tests { dependent_root: "0x00".to_owned(), execution_optimistic: false, }); - let app = new_router(Arc::new(handler), false); + let app = test_router(Arc::new(handler), false); // No Content-Type header at all. let req = Request::builder() @@ -673,7 +1502,7 @@ mod tests { }; use tower::ServiceExt; - let app = new_router(Arc::new(TestHandler::default()), false); + let app = test_router(Arc::new(TestHandler::default()), false); let req = Request::builder() .method(Method::POST) @@ -730,4 +1559,818 @@ mod tests { let bad = serde_json::from_str::("[-1]"); assert!(bad.is_err()); } + + // ----------------------------------------------------------------------- + // PR 1: proxy + proposal/validators handler tests + // ----------------------------------------------------------------------- + + use alloy::primitives::U256; + use axum::{ + body::{Body, to_bytes}, + http::{Method, Request}, + }; + use pluto_eth2api::{ + GetStateValidatorsResponseResponseDatum as ValidatorDatum, ValidatorResponseValidator, + ValidatorStatus, + spec::{bellatrix, phase0 as p0}, + }; + use tower::ServiceExt; + + // `ProposalBlock`, `SignedProposalBlock`, `SignedBlindedProposalBlock` and + // `DataVersion` come in via `super::*`. + use crate::{signeddata::VersionedProposal, validatorapi::types::EthResponse}; + + fn empty_sync_bits() -> pluto_ssz::BitVector<512> { + pluto_ssz::BitVector::new() + } + + /// Minimal phase0 unsigned beacon block at `slot`. + fn phase0_unsigned_block(slot: u64) -> p0::BeaconBlock { + p0::BeaconBlock { + slot, + proposer_index: 7, + parent_root: [0; 32], + state_root: [0; 32], + body: p0::BeaconBlockBody { + randao_reveal: [0; 96], + eth1_data: p0::ETH1Data { + deposit_root: [0; 32], + deposit_count: 0, + block_hash: [0; 32], + }, + graffiti: [0; 32], + proposer_slashings: vec![].into(), + attester_slashings: vec![].into(), + attestations: vec![].into(), + deposits: vec![].into(), + voluntary_exits: vec![].into(), + }, + } + } + + /// Phase0 unsigned `VersionedProposal` returned by the proposal handler. + fn phase0_proposal(slot: u64) -> VersionedProposal { + VersionedProposal { + block: ProposalBlock::Phase0(phase0_unsigned_block(slot)), + consensus_block_value: U256::from(1u8), + execution_payload_value: U256::from(1u8), + } + } + + /// Phase0 signed beacon block for submit tests. + fn phase0_signed_block(slot: u64) -> p0::SignedBeaconBlock { + p0::SignedBeaconBlock { + message: phase0_unsigned_block(slot), + signature: [0; 96], + } + } + + /// Bellatrix blinded signed block for blinded-submit tests. + fn bellatrix_blinded_signed_block(slot: u64) -> bellatrix::SignedBlindedBeaconBlock { + let header = bellatrix::ExecutionPayloadHeader { + parent_hash: [0; 32], + fee_recipient: [0; 20], + state_root: [0; 32], + receipts_root: [0; 32], + logs_bloom: [0; 256], + prev_randao: [0; 32], + block_number: 0, + gas_limit: 30_000_000, + gas_used: 0, + timestamp: 0, + extra_data: vec![].into(), + base_fee_per_gas: U256::ZERO, + block_hash: [0; 32], + transactions_root: [0; 32], + }; + let block = bellatrix::BlindedBeaconBlock { + slot, + proposer_index: 7, + parent_root: [0; 32], + state_root: [0; 32], + body: bellatrix::BlindedBeaconBlockBody { + randao_reveal: [0; 96], + eth1_data: p0::ETH1Data { + deposit_root: [0; 32], + deposit_count: 0, + block_hash: [0; 32], + }, + graffiti: [0; 32], + proposer_slashings: vec![].into(), + attester_slashings: vec![].into(), + attestations: vec![].into(), + deposits: vec![].into(), + voluntary_exits: vec![].into(), + sync_aggregate: pluto_eth2api::spec::altair::SyncAggregate { + sync_committee_bits: empty_sync_bits(), + sync_committee_signature: [0; 96], + }, + execution_payload_header: header, + }, + }; + bellatrix::SignedBlindedBeaconBlock { + message: block, + signature: [0; 96], + } + } + + fn sample_validator_datum(index: u64, pubkey_hex: &str) -> ValidatorDatum { + ValidatorDatum { + index: index.to_string(), + balance: "32000000000".to_owned(), + status: ValidatorStatus::ActiveOngoing, + validator: ValidatorResponseValidator { + pubkey: pubkey_hex.to_owned(), + withdrawal_credentials: format!("0x{}", "00".repeat(32)), + effective_balance: "32000000000".to_owned(), + slashed: false, + activation_eligibility_epoch: "0".to_owned(), + activation_epoch: "0".to_owned(), + exit_epoch: "18446744073709551615".to_owned(), + withdrawable_epoch: "18446744073709551615".to_owned(), + }, + } + } + + async fn body_json(response: Response) -> serde_json::Value { + let bytes = to_bytes(response.into_body(), 4 * 1024 * 1024) + .await + .unwrap(); + serde_json::from_slice(&bytes).unwrap() + } + + /// `submit_proposal_preparations` swallows the request and returns 200. + #[tokio::test] + async fn prepare_beacon_proposer_swallows_and_returns_200() { + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/prepare_beacon_proposer") + .header("content-type", "application/json") + .body(Body::from("[]")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + } + + /// `propose_block_v3` returns the versioned block plus the four + /// consensus/value response headers; builder mode maximises the boost. + #[tokio::test] + async fn propose_block_v3_returns_block_with_headers() { + let handler = TestHandler::default().with_proposal(EthResponse { + data: phase0_proposal(42), + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let opts_handle = handler.proposal_opts.clone(); + let app = test_router(Arc::new(handler), true); + + let randao = format!("0x{}", "ab".repeat(96)); + let req = Request::builder() + .uri(format!( + "/eth/v3/validator/blocks/42?randao_reveal={randao}" + )) + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let headers = resp.headers(); + assert_eq!(headers.get(VERSION_HEADER).unwrap(), "phase0"); + assert_eq!( + headers.get(EXECUTION_PAYLOAD_BLINDED_HEADER).unwrap(), + "false" + ); + assert_eq!(headers.get(EXECUTION_PAYLOAD_VALUE_HEADER).unwrap(), "1"); + assert_eq!(headers.get(CONSENSUS_BLOCK_VALUE_HEADER).unwrap(), "1"); + + let json = body_json(resp).await; + assert_eq!(json["version"], "phase0"); + assert_eq!(json["execution_payload_blinded"], false); + assert_eq!(json["data"]["slot"], "42"); + + // builder_enabled → boost factor maxed. + let opts = opts_handle.lock().unwrap().clone().unwrap(); + assert_eq!(opts.slot, 42); + assert_eq!(opts.builder_boost_factor, Some(u64::MAX)); + assert_eq!(opts.randao_reveal, [0xab; 96]); + } + + /// Graffiti is length-lenient: a short value is zero-padded into the + /// 32-byte array, matching Charon's `copy(graffiti[:], graffitiBytes)`. + #[tokio::test] + async fn propose_block_v3_pads_short_graffiti() { + let handler = TestHandler::default().with_proposal(EthResponse { + data: phase0_proposal(42), + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let opts_handle = handler.proposal_opts.clone(); + let app = test_router(Arc::new(handler), false); + + let randao = format!("0x{}", "ab".repeat(96)); + let req = Request::builder() + .uri(format!( + "/eth/v3/validator/blocks/42?randao_reveal={randao}&graffiti=0xdeadbeef" + )) + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let opts = opts_handle.lock().unwrap().clone().unwrap(); + let mut expected = [0u8; 32]; + expected[..4].copy_from_slice(&[0xde, 0xad, 0xbe, 0xef]); + assert_eq!(opts.graffiti, expected); + // builder disabled → boost factor 0 (always sent, never omitted). + assert_eq!(opts.builder_boost_factor, Some(0)); + } + + /// Missing `randao_reveal` is a 400. + #[tokio::test] + async fn propose_block_v3_rejects_missing_randao() { + let handler = TestHandler::default().with_proposal(EthResponse { + data: phase0_proposal(42), + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let app = test_router(Arc::new(handler), false); + let req = Request::builder() + .uri("/eth/v3/validator/blocks/42") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + /// `submit_proposal` decodes the JSON block keyed by the version header and + /// forwards it; the handler records the right version + blinded flag. + #[tokio::test] + async fn submit_proposal_decodes_and_forwards() { + let handler = TestHandler::default(); + let submitted = handler.submitted_proposal.clone(); + let app = test_router(Arc::new(handler), false); + + let block = SignedProposalBlock::Phase0(phase0_signed_block(9)); + let body = serde_json::to_vec(&block).unwrap(); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/blocks") + .header("content-type", "application/json") + .header(VERSION_HEADER, "phase0") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let got = submitted.lock().unwrap().clone().unwrap(); + assert_eq!(got.0.version, DataVersion::Phase0); + assert!(!got.0.blinded); + } + + /// `submit_proposal` decodes an SSZ (`application/octet-stream`) body via + /// the bare per-fork block codec keyed by the version header. + #[tokio::test] + async fn submit_proposal_decodes_ssz_body() { + use ssz::Encode; + + let handler = TestHandler::default(); + let submitted = handler.submitted_proposal.clone(); + let app = test_router(Arc::new(handler), false); + + // The SSZ body is the bare per-fork block, not the Charon versioned + // wire format. + let body = phase0_signed_block(9).as_ssz_bytes(); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/blocks") + .header("content-type", "application/octet-stream") + .header(VERSION_HEADER, "phase0") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let got = submitted.lock().unwrap().clone().unwrap(); + assert_eq!(got.0.version, DataVersion::Phase0); + assert!(matches!(got.0.block, SignedProposalBlock::Phase0(_))); + } + + /// A capitalised version header is accepted (case-insensitive, mirroring + /// go-eth2-client's UnmarshalJSON). + #[tokio::test] + async fn submit_proposal_accepts_capitalised_version_header() { + let handler = TestHandler::default(); + let app = test_router(Arc::new(handler), false); + + let block = SignedProposalBlock::Phase0(phase0_signed_block(9)); + let body = serde_json::to_vec(&block).unwrap(); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v2/beacon/blocks") + .header("content-type", "application/json") + .header(VERSION_HEADER, "Phase0") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + } + + /// Missing version header → 400. + #[tokio::test] + async fn submit_proposal_rejects_missing_version_header() { + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/blocks") + .header("content-type", "application/json") + .body(Body::from("{}")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + /// An unsupported content type → 415, mirroring Charon's `wrap`. + #[tokio::test] + async fn submit_proposal_rejects_unsupported_content_type() { + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/blocks") + .header("content-type", "text/plain") + .header(VERSION_HEADER, "phase0") + .body(Body::from("{}")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE); + } + + /// A body that does not match the declared fork → 400. + #[tokio::test] + async fn submit_proposal_rejects_bad_body() { + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/blocks") + .header("content-type", "application/json") + .header(VERSION_HEADER, "phase0") + .body(Body::from(r#"{"not":"a block"}"#)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + /// `submit_blinded_block` decodes the blinded JSON block and forwards it. + #[tokio::test] + async fn submit_blinded_block_decodes_and_forwards() { + let handler = TestHandler::default(); + let submitted = handler.submitted_blinded_proposal.clone(); + let app = test_router(Arc::new(handler), false); + + let block = SignedBlindedProposalBlock::Bellatrix(bellatrix_blinded_signed_block(9)); + let body = serde_json::to_vec(&block).unwrap(); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/blinded_blocks") + .header("content-type", "application/json") + .header(VERSION_HEADER, "bellatrix") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let got = submitted.lock().unwrap().clone().unwrap(); + assert_eq!(got.version, DataVersion::Bellatrix); + } + + /// A pre-Bellatrix fork has no blinded form → 400. + #[tokio::test] + async fn submit_blinded_block_rejects_phase0() { + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/blinded_blocks") + .header("content-type", "application/json") + .header(VERSION_HEADER, "phase0") + .body(Body::from("{}")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + /// `get_validators` via repeated/CSV `id` query and the JSON response + /// shape. + #[tokio::test] + async fn get_validators_by_query_id() { + let handler = TestHandler::default().with_validators(EthResponse { + data: vec![sample_validator_datum(7, &format!("0x{}", "11".repeat(48)))], + execution_optimistic: false, + finalized: true, + dependent_root: None, + }); + let opts_handle = handler.validators_opts.clone(); + let app = test_router(Arc::new(handler), false); + + let req = Request::builder() + .uri("/eth/v1/beacon/states/head/validators?id=7,8") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let json = body_json(resp).await; + assert_eq!(json["finalized"], true); + assert_eq!(json["data"][0]["index"], "7"); + + let opts = opts_handle.lock().unwrap().clone().unwrap(); + assert_eq!(opts.state, "head"); + assert_eq!(opts.indices, vec![7, 8]); + assert!(opts.pubkeys.is_empty()); + } + + /// A `0x`-prefixed first id routes the whole batch as pubkeys, per Go's + /// `getValidatorsByID` first-element dispatch. + #[tokio::test] + async fn get_validators_by_pubkey_dispatch_on_first_id() { + let pubkey_hex = format!("0x{}", "11".repeat(48)); + let handler = TestHandler::default().with_validators(EthResponse { + data: vec![], + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let opts_handle = handler.validators_opts.clone(); + let app = test_router(Arc::new(handler), false); + + let req = Request::builder() + .uri(format!( + "/eth/v1/beacon/states/head/validators?id={pubkey_hex}" + )) + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + // Empty result serializes to `[]`, not null. + let json = body_json(resp).await; + assert_eq!(json["data"], serde_json::json!([])); + + let opts = opts_handle.lock().unwrap().clone().unwrap(); + assert_eq!(opts.pubkeys.len(), 1); + assert_eq!(opts.pubkeys[0], [0x11; 48]); + assert!(opts.indices.is_empty()); + } + + /// POST with `{"ids":[...]}` body when the query carries no ids. + #[tokio::test] + async fn get_validators_by_json_body_ids() { + let handler = TestHandler::default().with_validators(EthResponse { + data: vec![], + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let opts_handle = handler.validators_opts.clone(); + let app = test_router(Arc::new(handler), false); + + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/states/head/validators") + .header("content-type", "application/json") + .body(Body::from(r#"{"ids":["3","4"]}"#)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let opts = opts_handle.lock().unwrap().clone().unwrap(); + assert_eq!(opts.indices, vec![3, 4]); + } + + /// `get_validator` returns a single object on exactly one result. + #[tokio::test] + async fn get_validator_single_result() { + let handler = TestHandler::default().with_validators(EthResponse { + data: vec![sample_validator_datum(7, &format!("0x{}", "11".repeat(48)))], + execution_optimistic: false, + finalized: true, + dependent_root: None, + }); + let app = test_router(Arc::new(handler), false); + + let req = Request::builder() + .uri("/eth/v1/beacon/states/head/validators/7") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let json = body_json(resp).await; + assert_eq!(json["data"]["index"], "7"); + assert!(json["data"].is_object()); + } + + /// `get_validator` returns 404 when the upstream has no match. + #[tokio::test] + async fn get_validator_not_found() { + let handler = TestHandler::default().with_validators(EthResponse { + data: vec![], + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let app = test_router(Arc::new(handler), false); + + let req = Request::builder() + .uri("/eth/v1/beacon/states/head/validators/7") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + /// `get_validator` returns 500 when the upstream returns more than one. + #[tokio::test] + async fn get_validator_multiple_results_is_500() { + let handler = TestHandler::default().with_validators(EthResponse { + data: vec![ + sample_validator_datum(7, &format!("0x{}", "11".repeat(48))), + sample_validator_datum(8, &format!("0x{}", "22".repeat(48))), + ], + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let app = test_router(Arc::new(handler), false); + + let req = Request::builder() + .uri("/eth/v1/beacon/states/head/validators/7") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR); + } + + /// The reverse-proxy fallback forwards method, path, query and body to the + /// upstream beacon node, applies basic auth from the URL userinfo, and + /// returns the upstream response. + #[tokio::test] + async fn proxy_forwards_to_upstream() { + use wiremock::{ + Mock, MockServer, ResponseTemplate, + matchers::{basic_auth, method, path, query_param}, + }; + + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/eth/v1/some/passthrough")) + .and(query_param("foo", "bar")) + .and(basic_auth("user", "pass")) + .respond_with(ResponseTemplate::new(200).set_body_string("upstream-ok")) + .mount(&server) + .await; + + // Inject basic-auth userinfo into the upstream URL. + let mut upstream: reqwest::Url = server.uri().parse().unwrap(); + upstream.set_username("user").unwrap(); + upstream.set_password(Some("pass")).unwrap(); + + let app = new_router(Arc::new(TestHandler::default()), false, upstream); + let req = Request::builder() + .uri("/eth/v1/some/passthrough?foo=bar") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let bytes = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + assert_eq!(&bytes[..], b"upstream-ok"); + } + + /// The proxy propagates a non-2xx upstream status to the client. + #[tokio::test] + async fn proxy_propagates_upstream_error_status() { + use wiremock::{ + Mock, MockServer, ResponseTemplate, + matchers::{method, path}, + }; + + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/eth/v1/missing")) + .respond_with(ResponseTemplate::new(404).set_body_string("nope")) + .mount(&server) + .await; + + let upstream: reqwest::Url = server.uri().parse().unwrap(); + let app = new_router(Arc::new(TestHandler::default()), false, upstream); + let req = Request::builder() + .uri("/eth/v1/missing") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + // ==================================================================== + // sync committee endpoints + // ==================================================================== + + /// A well-formed sync-committee messages array is parsed and forwarded to + /// the handler, which records it; the route returns `200`. + #[tokio::test] + async fn submit_sync_committee_messages_forwards_array() { + use axum::{ + body::Body, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let handler = Arc::new(TestHandler::default()); + let recorder = Arc::clone(&handler.submitted_sync_messages); + let app = test_router(handler, false); + + let body = r#"[{ + "slot": "5", + "beacon_block_root": "0x1111111111111111111111111111111111111111111111111111111111111111", + "validator_index": "9", + "signature": "0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" + }]"#; + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/pool/sync_committees") + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let recorded = recorder.lock().unwrap().clone().expect("messages recorded"); + assert_eq!(recorded.len(), 1); + assert_eq!(recorded[0].slot, 5); + assert_eq!(recorded[0].validator_index, 9); + } + + /// A non-JSON content type on a sync-committee submit is rejected with + /// `415`, matching the JSON-only encoding declared by Charon. + #[tokio::test] + async fn submit_sync_committee_messages_rejects_non_json() { + use axum::{ + body::Body, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/pool/sync_committees") + .header("content-type", "application/octet-stream") + .body(Body::from("[]")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE); + } + + /// An empty body on a sync-committee submit is a `400`, mirroring Charon's + /// `unmarshal` "empty request body". + #[tokio::test] + async fn submit_contribution_and_proofs_rejects_empty_body() { + use axum::{ + body::Body, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/contribution_and_proofs") + .header("content-type", "application/json") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + /// `GET /sync_committee_contribution` reads the query parameters and + /// returns the handler's contribution under a `data` envelope. + #[tokio::test] + async fn sync_committee_contribution_returns_data_envelope() { + use axum::{ + body::{Body, to_bytes}, + http::{Method, Request}, + }; + use pluto_eth2api::spec::altair; + use tower::ServiceExt; + + let contribution = altair::SyncCommitteeContribution { + slot: 12, + beacon_block_root: [0x33; 32], + subcommittee_index: 1, + aggregation_bits: pluto_ssz::BitVector::new(), + signature: [0xAB; 96], + }; + let handler = TestHandler::default().with_sync_committee_contribution(EthResponse { + data: contribution, + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let opts_recorder = Arc::clone(&handler.sync_committee_contribution_opts); + let app = test_router(Arc::new(handler), false); + + let req = Request::builder() + .method(Method::GET) + .uri("/eth/v1/validator/sync_committee_contribution?slot=12&subcommittee_index=1&beacon_block_root=0x3333333333333333333333333333333333333333333333333333333333333333") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["data"]["slot"], "12"); + assert_eq!(json["data"]["subcommittee_index"], "1"); + + let opts = opts_recorder + .lock() + .unwrap() + .clone() + .expect("opts recorded"); + assert_eq!(opts.slot, 12); + assert_eq!(opts.subcommittee_index, 1); + assert_eq!(opts.beacon_block_root, [0x33; 32]); + } + + /// A missing required query parameter on the contribution GET is a `400`. + #[tokio::test] + async fn sync_committee_contribution_rejects_missing_query() { + use axum::{ + body::Body, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::GET) + .uri("/eth/v1/validator/sync_committee_contribution?slot=12") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + /// `POST /sync_committee_selections` forwards the parsed array and returns + /// the aggregated selections under a `data` envelope. + #[tokio::test] + async fn sync_committee_selections_returns_data_envelope() { + use axum::{ + body::{Body, to_bytes}, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let aggregated = SyncCommitteeSelection { + slot: 1, + validator_index: 3, + subcommittee_index: 2, + selection_proof: [0x7E; 96], + }; + let handler = TestHandler::default().with_sync_committee_selections(EthResponse { + data: vec![aggregated], + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let recorder = Arc::clone(&handler.submitted_sync_selections); + let app = test_router(Arc::new(handler), false); + + let body = r#"[{ + "slot": "1", + "validator_index": "3", + "subcommittee_index": "2", + "selection_proof": "0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" + }]"#; + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/sync_committee_selections") + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let bytes = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + assert_eq!(json["data"][0]["validator_index"], "3"); + assert_eq!(json["data"][0]["subcommittee_index"], "2"); + + let recorded = recorder + .lock() + .unwrap() + .clone() + .expect("selections recorded"); + assert_eq!(recorded.len(), 1); + assert_eq!(recorded[0].validator_index, 3); + } } diff --git a/crates/core/src/validatorapi/testutils.rs b/crates/core/src/validatorapi/testutils.rs index 45980fe7..f6e09770 100644 --- a/crates/core/src/validatorapi/testutils.rs +++ b/crates/core/src/validatorapi/testutils.rs @@ -4,6 +4,8 @@ //! every method. As each router endpoint is ported, the relevant method is //! overridden here so the route's unit test can drive it. +use std::sync::{Arc, Mutex}; + use async_trait::async_trait; use super::{ @@ -34,6 +36,35 @@ pub struct TestHandler { pub sync_committee_duties_response: Option, /// Value returned by [`Handler::attestation_data`]. pub attestation_data_response: Option, + /// Value returned by [`Handler::proposal`]. + pub proposal_response: Option>, + /// Value returned by [`Handler::validators`]. + pub validators_response: Option>>, + /// Records the last [`ProposalOpts`] passed to [`Handler::proposal`]. + pub proposal_opts: Arc>>, + /// Records the last proposal submitted via [`Handler::submit_proposal`]. + pub submitted_proposal: Arc>>, + /// Records the last proposal submitted via + /// [`Handler::submit_blinded_proposal`]. + pub submitted_blinded_proposal: Arc>>, + /// Records the last [`ValidatorsOpts`] passed to [`Handler::validators`]. + pub validators_opts: Arc>>, + /// Value returned by [`Handler::sync_committee_contribution`]. + pub sync_committee_contribution_response: Option>, + /// Value returned by [`Handler::sync_committee_selections`]. + pub sync_committee_selections_response: Option>>, + /// Records the messages submitted via + /// [`Handler::submit_sync_committee_messages`]. + pub submitted_sync_messages: Arc>>>, + /// Records the contributions submitted via + /// [`Handler::submit_sync_committee_contributions`]. + pub submitted_sync_contributions: Arc>>>, + /// Records the selections passed to + /// [`Handler::sync_committee_selections`]. + pub submitted_sync_selections: Arc>>>, + /// Records the last [`SyncCommitteeContributionOpts`] passed to + /// [`Handler::sync_committee_contribution`]. + pub sync_committee_contribution_opts: Arc>>, } impl TestHandler { @@ -45,6 +76,18 @@ impl TestHandler { } } + /// Sets the response returned by [`Handler::proposal`]. + pub fn with_proposal(mut self, response: EthResponse) -> Self { + self.proposal_response = Some(response); + self + } + + /// Sets the response returned by [`Handler::validators`]. + pub fn with_validators(mut self, response: EthResponse>) -> Self { + self.validators_response = Some(response); + self + } + /// Sets the response returned by [`Handler::proposer_duties`]. pub fn with_proposer_duties(mut self, response: ProposerDutiesResponse) -> Self { self.proposer_duties_response = Some(response); @@ -68,6 +111,24 @@ impl TestHandler { self.attestation_data_response = Some(response); self } + + /// Sets the response returned by [`Handler::sync_committee_contribution`]. + pub fn with_sync_committee_contribution( + mut self, + response: EthResponse, + ) -> Self { + self.sync_committee_contribution_response = Some(response); + self + } + + /// Sets the response returned by [`Handler::sync_committee_selections`]. + pub fn with_sync_committee_selections( + mut self, + response: EthResponse>, + ) -> Self { + self.sync_committee_selections_response = Some(response); + self + } } #[async_trait] @@ -129,20 +190,32 @@ impl Handler for TestHandler { async fn proposal( &self, - _opts: ProposalOpts, + opts: ProposalOpts, ) -> Result, ApiError> { - unimplemented!("proposal not stubbed in TestHandler") + *self.proposal_opts.lock().expect("proposal_opts lock") = Some(opts); + Ok(self + .proposal_response + .clone() + .expect("proposal not stubbed in TestHandler")) } - async fn submit_proposal(&self, _proposal: VersionedSignedProposal) -> Result<(), ApiError> { - unimplemented!("submit_proposal not stubbed in TestHandler") + async fn submit_proposal(&self, proposal: VersionedSignedProposal) -> Result<(), ApiError> { + *self + .submitted_proposal + .lock() + .expect("submitted_proposal lock") = Some(proposal); + Ok(()) } async fn submit_blinded_proposal( &self, - _proposal: VersionedSignedBlindedProposal, + proposal: VersionedSignedBlindedProposal, ) -> Result<(), ApiError> { - unimplemented!("submit_blinded_proposal not stubbed in TestHandler") + *self + .submitted_blinded_proposal + .lock() + .expect("submitted_blinded_proposal lock") = Some(proposal); + Ok(()) } async fn aggregate_attestation( @@ -168,16 +241,27 @@ impl Handler for TestHandler { async fn sync_committee_selections( &self, - _selections: Vec, + selections: Vec, ) -> Result>, ApiError> { - unimplemented!("sync_committee_selections not stubbed in TestHandler") + *self + .submitted_sync_selections + .lock() + .expect("submitted_sync_selections lock") = Some(selections); + Ok(self + .sync_committee_selections_response + .clone() + .expect("sync_committee_selections not stubbed in TestHandler")) } async fn validators( &self, - _opts: ValidatorsOpts, + opts: ValidatorsOpts, ) -> Result>, ApiError> { - unimplemented!("validators not stubbed in TestHandler") + *self.validators_opts.lock().expect("validators_opts lock") = Some(opts); + Ok(self + .validators_response + .clone() + .expect("validators not stubbed in TestHandler")) } async fn submit_validator_registrations( @@ -193,22 +277,37 @@ impl Handler for TestHandler { async fn sync_committee_contribution( &self, - _opts: SyncCommitteeContributionOpts, + opts: SyncCommitteeContributionOpts, ) -> Result, ApiError> { - unimplemented!("sync_committee_contribution not stubbed in TestHandler") + *self + .sync_committee_contribution_opts + .lock() + .expect("sync_committee_contribution_opts lock") = Some(opts); + Ok(self + .sync_committee_contribution_response + .clone() + .expect("sync_committee_contribution not stubbed in TestHandler")) } async fn submit_sync_committee_contributions( &self, - _contributions: Vec, + contributions: Vec, ) -> Result<(), ApiError> { - unimplemented!("submit_sync_committee_contributions not stubbed in TestHandler") + *self + .submitted_sync_contributions + .lock() + .expect("submitted_sync_contributions lock") = Some(contributions); + Ok(()) } async fn submit_sync_committee_messages( &self, - _messages: Vec, + messages: Vec, ) -> Result<(), ApiError> { - unimplemented!("submit_sync_committee_messages not stubbed in TestHandler") + *self + .submitted_sync_messages + .lock() + .expect("submitted_sync_messages lock") = Some(messages); + Ok(()) } } diff --git a/crates/core/src/validatorapi/types.rs b/crates/core/src/validatorapi/types.rs index a3a9a68a..dcc74506 100644 --- a/crates/core/src/validatorapi/types.rs +++ b/crates/core/src/validatorapi/types.rs @@ -17,6 +17,7 @@ pub use pluto_eth2api::{ GetAttesterDutiesResponseResponseDatum as AttesterDuty, GetProposerDutiesResponseResponse as ProposerDutiesResponse, GetProposerDutiesResponseResponseDatum as ProposerDuty, + GetStateValidatorsResponseResponseDatum as Validator, GetSyncCommitteeDutiesResponseResponse as SyncCommitteeDutiesResponse, GetSyncCommitteeDutiesResponseResponseDatum as SyncCommitteeDuty, GetVersionResponseResponse as NodeVersionResponse, @@ -139,10 +140,6 @@ pub struct AttestationDataResponse { pub data: AttestationData, } -/// Validator payload. Placeholder. -#[derive(Debug, Clone)] -pub struct Validator {} - /// Versioned unsigned proposal payload — alias of the signeddata wrapper. pub use crate::signeddata::VersionedProposal; @@ -170,25 +167,22 @@ pub struct SignedValidatorRegistration {} #[derive(Debug, Clone)] pub struct SignedVoluntaryExit {} -/// Sync-committee message payload. Placeholder. -#[derive(Debug, Clone)] -pub struct SyncCommitteeMessage {} +/// Sync-committee message submitted by the validator client. The validator +/// signs the beacon block root with its sync-committee share. +pub type SyncCommitteeMessage = pluto_eth2api::spec::altair::SyncCommitteeMessage; -/// Sync-committee contribution payload. Placeholder. -#[derive(Debug, Clone)] -pub struct SyncCommitteeContribution {} +/// Aggregated sync-committee contribution returned to the validator client. +pub type SyncCommitteeContribution = pluto_eth2api::spec::altair::SyncCommitteeContribution; -/// Signed contribution-and-proof payload. Placeholder. -#[derive(Debug, Clone)] -pub struct SignedContributionAndProof {} +/// Signed contribution-and-proof submitted by the validator client. +pub type SignedContributionAndProof = pluto_eth2api::spec::altair::SignedContributionAndProof; /// Beacon-committee selection payload. Placeholder. #[derive(Debug, Clone)] pub struct BeaconCommitteeSelection {} -/// Sync-committee selection payload. Placeholder. -#[derive(Debug, Clone)] -pub struct SyncCommitteeSelection {} +/// Sync-committee selection proof exchanged with the validator client. +pub type SyncCommitteeSelection = pluto_eth2api::v1::SyncCommitteeSelection; /// Validator-index request body for the `attester_duties` and /// `sync_committee_duties` endpoints.