diff --git a/Cargo.lock b/Cargo.lock index b65c3ffb..0e63c84e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5649,6 +5649,7 @@ dependencies = [ "prost-types 0.14.3", "rand 0.8.6", "regex", + "reqwest 0.13.3", "serde", "serde_json", "test-case", @@ -5658,6 +5659,7 @@ dependencies = [ "tower", "tracing", "tree_hash", + "url", "vise", "wiremock", ] @@ -5793,6 +5795,7 @@ dependencies = [ "test-case", "thiserror 2.0.18", "tokio", + "tracing", "tree_hash", "tree_hash_derive", "unicode-normalization", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 741f1482..2d71650c 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -23,6 +23,7 @@ pluto-featureset.workspace = true prost.workspace = true prost-types.workspace = true regex.workspace = true +reqwest = { workspace = true, features = ["stream"] } serde.workspace = true serde_json.workspace = true base64.workspace = true @@ -34,6 +35,7 @@ pluto-eth2util.workspace = true pluto-ssz.workspace = true ssz.workspace = true tree_hash.workspace = true +url.workspace = true [dev-dependencies] anyhow.workspace = true diff --git a/crates/core/src/ssz_codec.rs b/crates/core/src/ssz_codec.rs index 2ae992d9..860e61d9 100644 --- a/crates/core/src/ssz_codec.rs +++ b/crates/core/src/ssz_codec.rs @@ -7,6 +7,7 @@ use pluto_eth2api::{ spec::{altair, bellatrix, capella, deneb, electra, fulu, phase0}, + v1, versioned::{self, AttestationPayload, DataVersion, SignedAggregateAndProofPayload}, }; use pluto_ssz::{ @@ -41,6 +42,10 @@ pub enum SszCodecError { /// Unknown or unsupported data version. #[error("ssz unknown version: {0}")] UnknownVersion(u64), + /// A fixed-size array body length is not a whole multiple of the element + /// size. + #[error("invalid buffer size")] + InvalidBufferSize, /// Inner SSZ binary decoding failed. #[error("ssz decode: {0}")] Decode(String), @@ -144,6 +149,35 @@ pub fn decode_signed_contribution_and_proof( Ok(altair::SignedContributionAndProof::from_ssz_bytes(bytes)?) } +/// SSZ-serialized byte length of a single `v1::SignedValidatorRegistration`: +/// `fee_recipient`(20) + `gas_limit`(8) + `timestamp`(8) + `pubkey`(48) + +/// `signature`(96). The register-validator endpoint accepts a bare +/// concatenation of these fixed-size objects (no length prefix), so the body +/// must be a whole multiple of this size. +const SIGNED_VALIDATOR_REGISTRATION_SSZ_SIZE: usize = 180; + +/// Decodes an array of `v1::SignedValidatorRegistration` from a bare SSZ +/// concatenation. The body must be a whole multiple of +/// [`SIGNED_VALIDATOR_REGISTRATION_SSZ_SIZE`]; otherwise +/// [`SszCodecError::InvalidBufferSize`] is returned. +pub fn decode_signed_validator_registrations( + bytes: &[u8], +) -> Result, SszCodecError> { + if !bytes + .len() + .is_multiple_of(SIGNED_VALIDATOR_REGISTRATION_SSZ_SIZE) + { + return Err(SszCodecError::InvalidBufferSize); + } + + let mut out = Vec::with_capacity(bytes.len() / SIGNED_VALIDATOR_REGISTRATION_SSZ_SIZE); + for chunk in bytes.chunks_exact(SIGNED_VALIDATOR_REGISTRATION_SSZ_SIZE) { + out.push(v1::SignedValidatorRegistration::from_ssz_bytes(chunk)?); + } + + Ok(out) +} + // =========================================================================== // Versioned type helpers // =========================================================================== @@ -449,6 +483,57 @@ fn encode_proposal_block(block: &versioned::SignedProposalBlock) -> Result Result { + decode_proposal_block(version, false, bytes) +} + +/// Decodes a bare per-fork blinded signed proposal block body from SSZ binary, +/// selecting the variant by `version`. +/// +/// The raw beacon-API SSZ block body posted to +/// `/eth/v{1,2}/beacon/blinded_blocks`; the fork is taken from the +/// `Eth-Consensus-Version` request header. +pub fn decode_signed_blinded_proposal_block_body( + version: DataVersion, + bytes: &[u8], +) -> Result { + use versioned::SignedBlindedProposalBlock; + Ok(match version { + DataVersion::Bellatrix => SignedBlindedProposalBlock::Bellatrix( + bellatrix::SignedBlindedBeaconBlock::from_ssz_bytes(bytes)?, + ), + DataVersion::Capella => SignedBlindedProposalBlock::Capella( + capella::SignedBlindedBeaconBlock::from_ssz_bytes(bytes)?, + ), + DataVersion::Deneb => SignedBlindedProposalBlock::Deneb( + deneb::SignedBlindedBeaconBlock::from_ssz_bytes(bytes)?, + ), + DataVersion::Electra => SignedBlindedProposalBlock::Electra( + electra::SignedBlindedBeaconBlock::from_ssz_bytes(bytes)?, + ), + // Fulu blinded blocks share the Electra layout. + DataVersion::Fulu => SignedBlindedProposalBlock::Fulu( + electra::SignedBlindedBeaconBlock::from_ssz_bytes(bytes)?, + ), + DataVersion::Phase0 | DataVersion::Altair | DataVersion::Unknown => { + return Err(SszCodecError::UnknownVersion( + version.to_legacy_u64().unwrap_or(u64::MAX), + )); + } + }) +} + fn decode_proposal_block( version: DataVersion, blinded: bool, @@ -521,6 +606,50 @@ mod tests { } } + fn sample_signed_registration(byte: u8) -> v1::SignedValidatorRegistration { + v1::SignedValidatorRegistration { + message: v1::ValidatorRegistration { + fee_recipient: [byte; 20], + gas_limit: 30_000_000, + timestamp: 1_700_000_000, + pubkey: [byte; 48], + }, + signature: [byte; 96], + } + } + + #[test] + fn roundtrip_signed_validator_registrations() { + use ssz::Encode; + + let regs = vec![ + sample_signed_registration(0x1A), + sample_signed_registration(0x2B), + ]; + let mut body = Vec::new(); + for reg in ®s { + body.extend_from_slice(®.as_ssz_bytes()); + } + // Each object is exactly 180 bytes. + assert_eq!(body.len(), 2 * SIGNED_VALIDATOR_REGISTRATION_SSZ_SIZE); + + let decoded = decode_signed_validator_registrations(&body).unwrap(); + assert_eq!(decoded, regs); + } + + #[test] + fn decode_signed_validator_registrations_rejects_misaligned_buffer() { + let err = decode_signed_validator_registrations(&[0u8; 181]).unwrap_err(); + assert!(matches!(err, SszCodecError::InvalidBufferSize)); + assert_eq!(err.to_string(), "invalid buffer size"); + } + + #[test] + fn decode_signed_validator_registrations_empty_is_empty() { + let decoded = decode_signed_validator_registrations(&[]).unwrap(); + assert!(decoded.is_empty()); + } + #[test] fn roundtrip_phase0_attestation() { let att = phase0::Attestation { diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index 1f9c099e..bc445b25 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -10,15 +10,19 @@ use async_trait::async_trait; use axum::http::StatusCode; use pluto_eth2api::{ EthBeaconNodeApiClient, GetAttesterDutiesRequest, GetAttesterDutiesResponse, - GetProposerDutiesRequest, GetProposerDutiesResponse, GetSyncCommitteeDutiesRequest, - GetSyncCommitteeDutiesResponse, + GetProposerDutiesRequest, GetProposerDutiesResponse, GetStateValidatorsResponseResponse, + GetSyncCommitteeDutiesRequest, GetSyncCommitteeDutiesResponse, PostStateValidatorsRequest, + PostStateValidatorsRequestPath, PostStateValidatorsResponse, ValidatorRequestBody, spec::phase0::{BLSPubKey, Epoch, Root, ValidatorIndex}, valcache::{ActiveValidators, CachedValidatorsProvider}, - versioned::{DataVersion, SignedBlindedProposalBlock, SignedProposalBlock}, + versioned::{ + BuilderVersion, DataVersion, SignedBlindedProposalBlock, SignedProposalBlock, + VersionedSignedValidatorRegistration as RawVersionedSignedValidatorRegistration, + }, }; use pluto_eth2util::signing::{self, DomainName, SigningError}; use tokio::time::error::Elapsed; -use tracing::{debug, instrument}; +use tracing::{debug, info, instrument}; use super::{ error::ApiError, @@ -38,8 +42,10 @@ use super::{ use crate::{ dutydb::{Error as DutyDbError, MemDB}, signeddata::{ - SignedDataError, SignedRandao, SyncContribution, VersionedAggregatedAttestation, + SignedDataError, SignedRandao, SignedVoluntaryExit as SignedVoluntaryExitData, + SyncContribution, VersionedAggregatedAttestation, VersionedProposal as UnsignedVersionedProposal, + VersionedSignedValidatorRegistration as VersionedSignedValidatorRegistrationData, }, types::{Duty, DutyDefinitionSet, ParSignedDataSet, PubKey, Signature, SignedData, SlotNumber}, version, @@ -147,13 +153,11 @@ pub struct Component { /// translate a validator-client-supplied `validator_index` into the /// cluster's DV root public key. Mirrors Go's `eth2Cl.ActiveValidators`, /// which is itself backed by the beacon-node validator cache. - #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] validator_cache: Arc, /// In-memory DutyDB used to await consensus output (e.g. attestation /// data) produced by the rest of the pipeline. dutydb: Arc, /// Threshold BLS share index assigned to this node (1-indexed). - #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] share_idx: u64, /// Maps DV root public keys to this node's public share. Used to rewrite /// validator-client-facing endpoints (proposer/attester duties, etc.) so @@ -161,17 +165,12 @@ pub struct Component { pub_share_by_pubkey: HashMap, /// Whether builder mode is enabled. Read by `propose_block_v3` and the /// validator-registration submitter. - #[allow( - dead_code, - reason = "consumed by propose_block_v3 / submit_validator_registrations" - )] builder_enabled: bool, /// Skip signature verification on partial-signed submissions. Test-only. insecure_test: bool, /// Subscribers invoked by submit endpoints once a partial-signed-data set /// has been validated. Each entry clones the set before invoking the /// user-provided callback. - #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] subs: Vec, /// Looks up an unsigned beacon proposal for a slot. #[allow(dead_code, reason = "consumed by proposal handler in later PRs")] @@ -259,7 +258,6 @@ impl Component { /// bounded by [`UPSTREAM_REQUEST_TIMEOUT`]. Mirrors Go's /// `c.eth2Cl.ActiveValidators(ctx)`, which is itself implemented via the /// beacon-node validator cache. - #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] async fn fetch_active_validators(&self) -> Result { tokio::time::timeout( UPSTREAM_REQUEST_TIMEOUT, @@ -426,7 +424,6 @@ impl Component { /// it is processing, then invokes this helper. /// /// Skipped entirely when [`Self::insecure_test`] is set. - #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] #[instrument(skip_all, fields(domain = ?domain_name, epoch))] pub async fn verify_partial_sig( &self, @@ -459,6 +456,139 @@ impl Component { Ok(()) } + + /// Processes a single builder validator registration: resolves the group + /// public key, skips non-distributed-validator keys, maps the registration + /// timestamp to a slot, verifies the partial signature, and broadcasts the + /// partial-signed registration to subscribers. + #[instrument(skip_all)] + async fn submit_registration( + &self, + registration: SignedValidatorRegistration, + ) -> Result<(), ApiError> { + // This is the group (DV root) public key, not a per-share key. + let eth2_pubkey = registration.message.pubkey; + let pubkey = pubkey_from_bls(ð2_pubkey); + + // Swallow non-DV registrations: many validator clients submit + // registrations for every key they know, including ones this cluster + // does not manage. + if !self.pub_share_by_pubkey.contains_key(ð2_pubkey) { + debug!( + pubkey = %pubkey, + "Swallowing non-dv registration, this is a known limitation for many validator clients", + ); + return Ok(()); + } + + let timestamp = chrono::DateTime::from_timestamp( + i64::try_from(registration.message.timestamp).map_err(|_| { + ApiError::new( + StatusCode::BAD_REQUEST, + "registration timestamp out of range", + ) + })?, + 0, + ) + .ok_or_else(|| ApiError::new(StatusCode::BAD_REQUEST, "invalid registration timestamp"))?; + + let slot = tokio::time::timeout( + UPSTREAM_REQUEST_TIMEOUT, + pluto_eth2util::helpers::slot_from_timestamp(&self.eth2_cl, timestamp), + ) + .await + .map_err(|_: Elapsed| upstream_timeout("slot from timestamp"))? + .map_err(|err| { + ApiError::new( + StatusCode::BAD_GATEWAY, + "could not resolve slot from timestamp", + ) + .with_source(err) + })?; + let duty = Duty::new_builder_registration_duty(SlotNumber::new(slot)); + + let versioned = RawVersionedSignedValidatorRegistration { + version: BuilderVersion::V1, + v1: Some(registration), + }; + let par_sig = + VersionedSignedValidatorRegistrationData::new_partial(versioned, self.share_idx) + .map_err(map_signed_data_error)?; + + // Verify registration signature. The application-builder domain always + // uses epoch 0. + self.verify_par_signed_registration(&pubkey, &par_sig) + .await?; + + debug!(pubkey = %pubkey, slot, "Builder registration submitted by validator client"); + + let mut set = ParSignedDataSet::new(); + set.insert(pubkey, par_sig); + for sub in &self.subs { + sub(&duty, &set).await.map_err(|err| { + ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, "subscriber failed") + .with_boxed_source(err) + })?; + } + + Ok(()) + } + + /// Verifies the partial signature of a wrapped voluntary exit against this + /// node's public share. The voluntary-exit domain uses the exit message's + /// own epoch. + async fn verify_par_signed_exit( + &self, + pubkey: &PubKey, + epoch: Epoch, + par_sig: &crate::types::ParSignedData, + ) -> Result<(), ApiError> { + self.verify_par_signed(pubkey, DomainName::VoluntaryExit, epoch, par_sig) + .await + } + + /// Verifies the partial signature of a wrapped validator registration + /// against this node's public share. The application-builder domain always + /// uses epoch 0. + async fn verify_par_signed_registration( + &self, + pubkey: &PubKey, + par_sig: &crate::types::ParSignedData, + ) -> Result<(), ApiError> { + self.verify_par_signed(pubkey, DomainName::ApplicationBuilder, 0, par_sig) + .await + } + + /// Extracts the signature and message root from a [`ParSignedData`] + /// wrapper and verifies them against this node's public share under the + /// given signing domain and epoch. + async fn verify_par_signed( + &self, + pubkey: &PubKey, + domain: DomainName, + epoch: Epoch, + par_sig: &crate::types::ParSignedData, + ) -> Result<(), ApiError> { + let signature = par_sig.signed_data.signature().map_err(|err| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "could not extract partial signature", + ) + .with_source(err) + })?; + let message_root = par_sig.signed_data.message_root().map_err(|err| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "could not derive message root", + ) + .with_source(err) + })?; + + let pubkey_bytes = pubkey_to_bls(pubkey); + self.verify_partial_sig(&pubkey_bytes, domain, epoch, message_root, &signature) + .await + .map_err(verify_partial_sig_error) + } } /// Errors returned by [`Component::verify_partial_sig`]. @@ -881,22 +1011,158 @@ impl Handler for Component { #[instrument(skip_all)] async fn validators( &self, - _opts: ValidatorsOpts, + opts: ValidatorsOpts, ) -> Result>, ApiError> { - unimplemented!("validators not yet ported") + // The VC sends share pubkeys (one per DV root). Translate each share + // back to the cluster's root pubkey before forwarding upstream, since + // the beacon node only knows the root keys. An empty `pubkeys` is + // forwarded as `None` so the upstream is not artificially narrowed. + // + // Port of `Validators` in + // `core/validatorapi/validatorapi.go` (lines 1218–1296). + let pubkey_by_share = invert_pub_share_map(&self.pub_share_by_pubkey); + + let mut root_pubkeys: Vec = Vec::with_capacity(opts.pubkeys.len()); + for share in &opts.pubkeys { + let root = pubkey_by_share.get(share).ok_or_else(|| { + // Mirrors the Go `getPubKeyFunc` "unknown public key" branch. + ApiError::new( + StatusCode::BAD_REQUEST, + "unknown validator public key in request", + ) + })?; + root_pubkeys.push(format_bls_pubkey(root)); + } + + // Upstream's `id` field accepts either a pubkey hex string or a + // decimal validator-index string — both go in the same `ids` array. + let mut ids: Vec = root_pubkeys; + ids.extend(opts.indices.iter().map(|idx| idx.to_string())); + + let request = PostStateValidatorsRequest { + path: PostStateValidatorsRequestPath { + state_id: opts.state.clone(), + }, + body: ValidatorRequestBody { + ids: if ids.is_empty() { None } else { Some(ids) }, + // Status filter is not exposed by Pluto's `ValidatorsOpts`; the + // Go reference also omits it from the upstream call. + statuses: None, + }, + }; + + let response = tokio::time::timeout( + UPSTREAM_REQUEST_TIMEOUT, + self.eth2_cl.post_state_validators(request), + ) + .await + .map_err(|_| upstream_timeout("validators"))? + .map_err(|err| upstream_call_failed("validators", err.into()))?; + + let payload: GetStateValidatorsResponseResponse = match response { + PostStateValidatorsResponse::Ok(payload) => payload, + PostStateValidatorsResponse::BadRequest(body) => { + return Err(upstream_status_error( + StatusCode::BAD_REQUEST, + "validators", + body, + )); + } + PostStateValidatorsResponse::NotFound(body) => { + return Err(upstream_status_error( + StatusCode::NOT_FOUND, + "validators", + body, + )); + } + other @ (PostStateValidatorsResponse::InternalServerError(_) + | PostStateValidatorsResponse::Unknown) => { + return Err(upstream_unexpected("validators", other)); + } + }; + + // The `ignoreNotFound` flag mirrors the Go contract: when the caller + // filtered by indices only (no pubkeys), the upstream may return + // validators that are not part of this cluster's share map, and + // those validators should pass through with their root pubkey + // untouched. Otherwise an unknown pubkey indicates a configuration + // error and is surfaced to the caller. + let ignore_not_found = opts.indices.is_empty(); + let data = convert_validators(payload.data, &self.pub_share_by_pubkey, ignore_not_found)?; + + Ok(EthResponse { + data, + execution_optimistic: payload.execution_optimistic, + finalized: payload.finalized, + dependent_root: None, + }) } #[instrument(skip_all)] async fn submit_validator_registrations( &self, - _registrations: Vec, + registrations: Vec, ) -> Result<(), ApiError> { - unimplemented!("submit_validator_registrations not yet ported") + if registrations.is_empty() { + return Ok(()); + } + + // Swallow unexpected validator registrations from VCs (e.g. vouch) + // when builder mode is disabled — they are not actionable. + if !self.builder_enabled { + return Ok(()); + } + + for registration in registrations { + self.submit_registration(registration).await?; + } + + Ok(()) } #[instrument(skip_all)] - async fn submit_voluntary_exit(&self, _exit: SignedVoluntaryExit) -> Result<(), ApiError> { - unimplemented!("submit_voluntary_exit not yet ported") + async fn submit_voluntary_exit(&self, exit: SignedVoluntaryExit) -> Result<(), ApiError> { + let validator_index = exit.message.validator_index; + let epoch = exit.message.epoch; + + let vals = self.fetch_active_validators().await?; + let eth2_pubkey = vals + .get(&validator_index) + .ok_or_else(|| ApiError::new(StatusCode::BAD_REQUEST, "validator not found"))?; + let pubkey = pubkey_from_bls(eth2_pubkey); + + let (_, slots_per_epoch) = + tokio::time::timeout(UPSTREAM_REQUEST_TIMEOUT, self.eth2_cl.fetch_slots_config()) + .await + .map_err(|_: Elapsed| upstream_timeout("slots config"))? + .map_err(|err| { + ApiError::new(StatusCode::BAD_GATEWAY, "could not fetch slots config") + .with_source(err) + })?; + + let exit_slot = slots_per_epoch.checked_mul(epoch).ok_or_else(|| { + ApiError::new(StatusCode::BAD_REQUEST, "voluntary exit slot overflow") + })?; + let duty = Duty::new_voluntary_exit_duty(SlotNumber::new(exit_slot)); + + let par_sig = SignedVoluntaryExitData::new_partial(exit, self.share_idx); + + // Verify voluntary exit signature against this node's public share. + self.verify_par_signed_exit(&pubkey, epoch, &par_sig) + .await?; + + info!("Voluntary exit submitted by validator client"); + + let mut set = ParSignedDataSet::new(); + set.insert(pubkey, par_sig); + for sub in &self.subs { + sub(&duty, &set).await.map_err(|err| { + ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, "subscriber failed") + .with_boxed_source(err) + })?; + } + + Ok(()) } #[instrument(skip_all)] @@ -1066,6 +1332,57 @@ fn swap_sync_committee_pubshares( Ok(()) } +/// Replaces the root public key on each upstream validator entry with this +/// node's public share. Port of `convertValidators` in +/// `core/validatorapi/validatorapi.go` (lines 1305–1332). +/// +/// When `ignore_not_found` is `true`, validators whose root pubkey is not +/// part of this cluster's share map are passed through with their original +/// root pubkey — the upstream filter was index-based, so the result may +/// include validators we do not own and those entries are not dropped. +/// When `false`, an unknown pubkey is rejected: the caller explicitly asked +/// for validators by pubkey, so every entry should be translatable. +fn convert_validators( + upstream: Vec, + pub_share_by_pubkey: &HashMap, + ignore_not_found: bool, +) -> Result, ApiError> { + let mut out = Vec::with_capacity(upstream.len()); + for mut validator in upstream { + let pubkey = parse_bls_pubkey(&validator.validator.pubkey)?; + match pub_share_by_pubkey.get(&pubkey) { + Some(share) => { + validator.validator.pubkey = format_bls_pubkey(share); + } + None if ignore_not_found => { + // Validator does not belong to this cluster — keep the + // entry with its root pubkey unchanged. Mirrors the Go + // `convertValidators` `else if ok` branch. + } + None => { + return Err(ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "pubshare not found for validator", + )); + } + } + out.push(validator); + } + Ok(out) +} + +/// Builds the share → root pubkey map by inverting `pub_share_by_pubkey`. +/// Used by the `validators` handler to translate VC-side share pubkeys back +/// into the cluster's root pubkeys before forwarding upstream. +fn invert_pub_share_map( + pub_share_by_pubkey: &HashMap, +) -> HashMap { + pub_share_by_pubkey + .iter() + .map(|(root, share)| (*share, *root)) + .collect() +} + fn parse_bls_pubkey(s: &str) -> Result { let trimmed = s.strip_prefix("0x").unwrap_or(s); let bytes = hex::decode(trimmed).map_err(|err| { @@ -1094,6 +1411,12 @@ fn pubkey_to_bls(pk: &PubKey) -> BLSPubKey { out } +/// Re-interprets a [`BLSPubKey`] byte-array as a Pluto [`PubKey`]. Both are +/// 48-byte arrays, so this is infallible. +fn pubkey_from_bls(pk: &BLSPubKey) -> PubKey { + PubKey::from(*pk) +} + /// Maps a [`VerifyPartialSigError`] back to an [`ApiError`]. `UnknownPubKey` /// signals a cluster/share-mapping misconfiguration. Signing-level failures /// (zero signature, bad BLS, beacon-node lookup) become 400 since they @@ -1132,21 +1455,6 @@ async fn verify_par_signed_proposal( slot: u64, par_sig: &crate::types::ParSignedData, ) -> Result<(), ApiError> { - let signature = par_sig.signed_data.signature().map_err(|err| { - ApiError::new( - StatusCode::INTERNAL_SERVER_ERROR, - "could not extract partial signature", - ) - .with_source(err) - })?; - let message_root = par_sig.signed_data.message_root().map_err(|err| { - ApiError::new( - StatusCode::INTERNAL_SERVER_ERROR, - "could not derive message root", - ) - .with_source(err) - })?; - let epoch = pluto_eth2util::helpers::epoch_from_slot(&component.eth2_cl, slot) .await .map_err(|err| { @@ -1154,17 +1462,9 @@ async fn verify_par_signed_proposal( .with_source(err) })?; - let pubkey_bytes = pubkey_to_bls(pubkey); component - .verify_partial_sig( - &pubkey_bytes, - DomainName::BeaconProposer, - epoch, - message_root, - &signature, - ) + .verify_par_signed(pubkey, DomainName::BeaconProposer, epoch, par_sig) .await - .map_err(verify_partial_sig_error) } /// Cross-checks a VC-submitted proposal against the consensus proposal that @@ -1353,7 +1653,12 @@ mod tests { unsigneddata::{UnsignedDataSet, UnsignedDutyData}, validatorapi::types::AttestationDataOpts, }; - use pluto_eth2api::valcache::{CompleteValidators, ValidatorCacheError}; + use pluto_crypto::types::PrivateKey; + use pluto_eth2api::{ + spec::phase0, + v1, + valcache::{CompleteValidators, ValidatorCacheError}, + }; /// In-memory [`CachedValidatorsProvider`] for tests. Holds a fixed /// `validator_index -> DV root pubkey` map. `complete_validators` is not @@ -1368,7 +1673,6 @@ mod tests { } /// A cache pre-populated with `validators`. - #[allow(dead_code, reason = "consumed by submit_* handler tests in later PRs")] pub(super) fn arc( validators: HashMap, ) -> Arc { @@ -3017,4 +3321,758 @@ mod tests { .await .unwrap(); } + + // ---------------------------------------------------------------------- + // `validators` tests + // ---------------------------------------------------------------------- + + use pluto_eth2api::{ValidatorResponseValidator, ValidatorStatus}; + use wiremock::{ + Mock, MockServer, ResponseTemplate, + matchers::{method, path}, + }; + + /// Builds a `Validator` (i.e. `GetStateValidatorsResponseResponseDatum`) + /// with the given index and pubkey. Other fields are filled with + /// placeholder values acceptable to the eth2api type. + fn make_validator_datum(index: u64, pubkey: &BLSPubKey) -> Validator { + Validator { + balance: "32000000000".to_owned(), + index: index.to_string(), + status: ValidatorStatus::ActiveOngoing, + validator: ValidatorResponseValidator { + pubkey: format_bls_pubkey(pubkey), + withdrawal_credentials: + "0x0000000000000000000000000000000000000000000000000000000000000000".to_owned(), + effective_balance: "32000000000".to_owned(), + slashed: false, + activation_eligibility_epoch: "0".to_owned(), + activation_epoch: "0".to_owned(), + exit_epoch: "18446744073709551615".to_owned(), + withdrawable_epoch: "18446744073709551615".to_owned(), + }, + } + } + + /// Builds a `Component` whose upstream client points at the given + /// `MockServer`, with the supplied root → share map. The dutydb is the + /// usual never-expiring stub since the `validators` handler does not + /// consult it. + fn make_component_with_upstream( + server: &MockServer, + pub_share_by_pubkey: HashMap, + ) -> Component { + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = + DeadlinerTask::start(cancel.clone(), "validatorapi-tests", FarFutureCalculator); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(server.uri()).unwrap()); + Component::new( + eth2_cl, + dutydb, + 1, + pub_share_by_pubkey, + false, + TestValidatorCache::empty(), + ) + } + + /// Happy path: every upstream entry has a known root pubkey, so each + /// inner `validator.pubkey` is rewritten to this node's share. Mirrors + /// the `else if ok` branch of `convertValidators`. + #[test] + fn convert_validators_rewrites_known_pubkeys() { + let root = [0xAA_u8; 48]; + let share = [0xBB_u8; 48]; + let map = HashMap::from([(root, share)]); + + let upstream = vec![make_validator_datum(7, &root)]; + let out = convert_validators(upstream, &map, false).unwrap(); + + assert_eq!(out.len(), 1); + assert_eq!(out[0].validator.pubkey, format_bls_pubkey(&share)); + assert_eq!(out[0].index, "7"); + } + + /// With `ignore_not_found = true`, an unknown pubkey is passed through + /// unchanged (Go: `else if ok` — the entry is still appended to `resp` + /// with the original root pubkey). + #[test] + fn convert_validators_ignore_not_found_keeps_entry_unchanged() { + let known_root = [0x11_u8; 48]; + let share = [0x22_u8; 48]; + let unknown = [0x33_u8; 48]; + let map = HashMap::from([(known_root, share)]); + + let upstream = vec![ + make_validator_datum(1, &known_root), + make_validator_datum(2, &unknown), + ]; + let out = convert_validators(upstream, &map, true).unwrap(); + + assert_eq!(out.len(), 2); + assert_eq!(out[0].validator.pubkey, format_bls_pubkey(&share)); + // Unknown entry is preserved verbatim. + assert_eq!(out[1].validator.pubkey, format_bls_pubkey(&unknown)); + assert_eq!(out[1].index, "2"); + } + + /// With `ignore_not_found = false`, an unknown pubkey is rejected. + /// Mirrors Go: `if !ok && !ignoreNotFound { return nil, errors.New(...) }`. + #[test] + fn convert_validators_rejects_unknown_when_not_ignoring() { + let known_root = [0x44_u8; 48]; + let share = [0x55_u8; 48]; + let unknown = [0x66_u8; 48]; + let map = HashMap::from([(known_root, share)]); + + let upstream = vec![make_validator_datum(3, &unknown)]; + let err = convert_validators(upstream, &map, false).unwrap_err(); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); + } + + /// A malformed pubkey from the upstream is surfaced as 502 — the + /// gateway returned data we cannot interpret. + #[test] + fn convert_validators_rejects_malformed_upstream_pubkey() { + let mut datum = make_validator_datum(0, &[0; 48]); + datum.validator.pubkey = "0xnothex".to_owned(); + let err = convert_validators(vec![datum], &HashMap::new(), true).unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); + } + + /// `invert_pub_share_map` is the share → root direction needed when + /// translating VC-supplied pubshares back into root pubkeys before the + /// upstream call. + #[test] + fn invert_pub_share_map_round_trips() { + let root = [0x77_u8; 48]; + let share = [0x88_u8; 48]; + let forward = HashMap::from([(root, share)]); + + let inverted = invert_pub_share_map(&forward); + assert_eq!(inverted.get(&share), Some(&root)); + assert_eq!(inverted.len(), 1); + } + + /// End-to-end happy path: the upstream returns one validator keyed by + /// the cluster's root pubkey; the handler rewrites it to the VC's + /// share pubkey before returning. + #[tokio::test] + async fn validators_rewrites_root_pubkeys_to_shares() { + let server = MockServer::start().await; + let root = [0xCA_u8; 48]; + let share = [0xFE_u8; 48]; + let body = GetStateValidatorsResponseResponse { + data: vec![make_validator_datum(42, &root)], + execution_optimistic: false, + finalized: true, + }; + Mock::given(method("POST")) + .and(path("/eth/v1/beacon/states/head/validators")) + .respond_with(ResponseTemplate::new(200).set_body_json(body)) + .expect(1) + .mount(&server) + .await; + + let component = make_component_with_upstream(&server, HashMap::from([(root, share)])); + let response = component + .validators(ValidatorsOpts { + state: "head".to_owned(), + // VC sends the share pubkey it knows. + pubkeys: vec![share], + indices: vec![], + }) + .await + .unwrap(); + + assert_eq!(response.data.len(), 1); + assert_eq!(response.data[0].validator.pubkey, format_bls_pubkey(&share)); + assert_eq!(response.data[0].index, "42"); + assert!(response.finalized); + assert!(!response.execution_optimistic); + assert!(response.dependent_root.is_none()); + } + + /// When the caller filters by pubkey only (no indices), `ignoreNotFound` + /// is `true` per the Go reference, so an upstream entry whose pubkey is + /// not part of this cluster's share map passes through with its root + /// pubkey unchanged. Mirrors `len(opts.Indices) == 0` in + /// `validatorapi.go:1288`. + #[tokio::test] + async fn validators_passes_through_unknown_when_filtering_by_pubkey_only() { + let server = MockServer::start().await; + let known_root = [0x10_u8; 48]; + let share = [0x20_u8; 48]; + let stranger = [0x30_u8; 48]; + let body = GetStateValidatorsResponseResponse { + data: vec![ + make_validator_datum(1, &known_root), + make_validator_datum(2, &stranger), + ], + execution_optimistic: false, + finalized: true, + }; + Mock::given(method("POST")) + .and(path("/eth/v1/beacon/states/head/validators")) + .respond_with(ResponseTemplate::new(200).set_body_json(body)) + .expect(1) + .mount(&server) + .await; + + let component = make_component_with_upstream(&server, HashMap::from([(known_root, share)])); + let response = component + .validators(ValidatorsOpts { + state: "head".to_owned(), + pubkeys: vec![share], + indices: vec![], + }) + .await + .unwrap(); + + assert_eq!(response.data.len(), 2); + assert_eq!(response.data[0].validator.pubkey, format_bls_pubkey(&share)); + // Stranger entry is preserved with the upstream's root pubkey. + assert_eq!( + response.data[1].validator.pubkey, + format_bls_pubkey(&stranger) + ); + } + + /// When the caller filters by index (any non-empty `Indices`), + /// `ignoreNotFound` is `false` per the Go reference, so an upstream + /// validator that does not belong to this cluster surfaces as + /// `INTERNAL_SERVER_ERROR`. Mirrors `len(opts.Indices) == 0 == false` in + /// `validatorapi.go:1288`. + #[tokio::test] + async fn validators_rejects_unknown_pubkey_when_index_filter_used() { + let server = MockServer::start().await; + let known_root = [0x40_u8; 48]; + let share = [0x50_u8; 48]; + let stranger = [0x60_u8; 48]; + let body = GetStateValidatorsResponseResponse { + // The upstream returned a validator we did not ask for — its + // pubkey is not in our share map. + data: vec![make_validator_datum(99, &stranger)], + execution_optimistic: false, + finalized: false, + }; + Mock::given(method("POST")) + .and(path("/eth/v1/beacon/states/head/validators")) + .respond_with(ResponseTemplate::new(200).set_body_json(body)) + .expect(1) + .mount(&server) + .await; + + let component = make_component_with_upstream(&server, HashMap::from([(known_root, share)])); + let err = component + .validators(ValidatorsOpts { + state: "head".to_owned(), + pubkeys: vec![], + indices: vec![99], + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); + } + + /// A pubkey from the VC that is not part of this cluster's share map is + /// rejected as `BAD_REQUEST` before any upstream call. Mirrors Go's + /// `getPubKeyFunc` "unknown public key" error. + #[tokio::test] + async fn validators_rejects_unknown_input_pubshare() { + let server = MockServer::start().await; + // No mock mounted — if the handler reaches the upstream, the call + // will surface as a different (non-400) error. + let root = [0x70_u8; 48]; + let share = [0x80_u8; 48]; + let unknown_share = [0x90_u8; 48]; + let component = make_component_with_upstream(&server, HashMap::from([(root, share)])); + let err = component + .validators(ValidatorsOpts { + state: "head".to_owned(), + pubkeys: vec![unknown_share], + indices: vec![], + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + } + + /// Upstream stall longer than [`UPSTREAM_REQUEST_TIMEOUT`] surfaces as + /// 504. + #[tokio::test(start_paused = true)] + async fn validators_upstream_timeout_returns_504() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/eth/v1/beacon/states/head/validators")) + .respond_with( + ResponseTemplate::new(200) + .set_delay(UPSTREAM_REQUEST_TIMEOUT * 2) + .set_body_json(GetStateValidatorsResponseResponse { + data: vec![], + execution_optimistic: false, + finalized: false, + }), + ) + .mount(&server) + .await; + + let component = make_component_with_upstream(&server, HashMap::new()); + let err = component + .validators(ValidatorsOpts { + state: "head".to_owned(), + pubkeys: vec![], + indices: vec![], + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::GATEWAY_TIMEOUT); + } + + /// A malformed pubkey from the upstream surfaces as 502. + #[tokio::test] + async fn validators_malformed_upstream_pubkey_returns_502() { + let server = MockServer::start().await; + let root = [0xA1_u8; 48]; + let share = [0xA2_u8; 48]; + let mut bad = make_validator_datum(1, &root); + bad.validator.pubkey = "not-a-hex-pubkey".to_owned(); + Mock::given(method("POST")) + .and(path("/eth/v1/beacon/states/head/validators")) + .respond_with(ResponseTemplate::new(200).set_body_json( + GetStateValidatorsResponseResponse { + data: vec![bad], + execution_optimistic: false, + finalized: false, + }, + )) + .mount(&server) + .await; + + let component = make_component_with_upstream(&server, HashMap::from([(root, share)])); + let err = component + .validators(ValidatorsOpts { + state: "head".to_owned(), + pubkeys: vec![], + indices: vec![1], + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); + } + + /// Upstream 400 propagates faithfully; the upstream body must not leak + /// into the client-visible message. + #[tokio::test] + async fn validators_propagates_upstream_400() { + use pluto_eth2api::BlindedBlock400Response; + + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/eth/v1/beacon/states/head/validators")) + .respond_with( + ResponseTemplate::new(400).set_body_json(BlindedBlock400Response { + code: 400.0, + message: "secret upstream message".to_owned(), + stacktraces: None, + }), + ) + .mount(&server) + .await; + + let component = make_component_with_upstream(&server, HashMap::new()); + let err = component + .validators(ValidatorsOpts { + state: "head".to_owned(), + pubkeys: vec![], + indices: vec![], + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + assert!(!err.message.contains("secret")); + } + + // ----------------------------------------------------------------------- + // Validator lifecycle: voluntary exit + validator registrations + // ----------------------------------------------------------------------- + + /// Records every `(duty, set)` pair a submit handler fans out to its + /// subscriber, so lifecycle tests can assert the broadcast contents. + type RecordedBroadcasts = Arc>>; + + /// Builds a fully-signing lifecycle component: a beacon mock that serves + /// the signing domains plus `SLOTS_PER_EPOCH`/`SECONDS_PER_SLOT`/genesis + /// time, a populated `validator_index -> root pubkey` cache, a + /// `root -> public-share` map, and a recording subscriber. Returns the + /// component, the held mock, and the broadcast recorder. + async fn make_lifecycle_component( + builder_enabled: bool, + validators: HashMap, + pub_share_by_pubkey: HashMap, + ) -> (Component, BeaconMock, RecordedBroadcasts) { + let mock = BeaconMock::builder() + .spec(signing_spec_fixture()) + .slots_per_epoch(16) + .slot_duration(std::time::Duration::from_secs(12)) + .genesis_time(DateTime::from_timestamp(0, 0).unwrap()) + .genesis_validators_root([0; 32]) + .build() + .await + .unwrap(); + + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-lifecycle-tests", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); + let mut component = Component::new( + eth2_cl, + dutydb, + 1, + pub_share_by_pubkey, + builder_enabled, + TestValidatorCache::arc(validators), + ); + + let recorder: RecordedBroadcasts = Arc::new(Mutex::new(Vec::new())); + { + let recorder = Arc::clone(&recorder); + component.subscribe(move |duty, set| { + let recorder = Arc::clone(&recorder); + async move { + recorder.lock().unwrap().push((duty, set)); + Ok(()) + } + }); + } + + (component, mock, recorder) + } + + /// Signs `message_root` under `domain`/`epoch` with `secret` exactly as + /// `signing::verify` reconstructs it, yielding a partial signature the + /// component will accept. + async fn sign_for( + mock: &BeaconMock, + secret: &PrivateKey, + domain: DomainName, + epoch: Epoch, + message_root: Root, + ) -> Signature { + let signing_root = + pluto_eth2util::signing::get_data_root(mock.client(), domain, epoch, message_root) + .await + .unwrap(); + BlstImpl.sign(secret, &signing_root).unwrap() + } + + /// Generates a BLS keypair to act as this node's public share. + fn new_share() -> (PrivateKey, BLSPubKey) { + let secret = BlstImpl + .generate_insecure_secret(rand::rngs::OsRng) + .unwrap(); + let pubshare = BlstImpl.secret_to_public_key(&secret).unwrap(); + (secret, pubshare) + } + + /// A correctly-signed voluntary exit is verified and broadcast as an + /// `Exit` duty at slot `slots_per_epoch * epoch`. + #[tokio::test] + async fn submit_voluntary_exit_verifies_and_broadcasts() { + let (secret, pubshare) = new_share(); + let dv_root = dv_pubkey(0xAA); + let validator_index: ValidatorIndex = 7; + let epoch: Epoch = 3; + + let validators = HashMap::from([(validator_index, dv_root)]); + let map = HashMap::from([(dv_root, pubshare)]); + let (component, mock, recorder) = make_lifecycle_component(false, validators, map).await; + + let mut exit = phase0::SignedVoluntaryExit { + message: phase0::VoluntaryExit { + epoch, + validator_index, + }, + signature: [0; 96], + }; + let message_root = exit.message_root(); + exit.signature = sign_for( + &mock, + &secret, + DomainName::VoluntaryExit, + epoch, + message_root, + ) + .await; + + component.submit_voluntary_exit(exit).await.unwrap(); + + let recorded = recorder.lock().unwrap().clone(); + assert_eq!(recorded.len(), 1); + let (duty, set) = &recorded[0]; + assert_eq!(duty.duty_type, DutyType::Exit); + // slot = slots_per_epoch (16) * epoch (3) + assert_eq!(duty.slot.inner(), 48); + assert!(set.inner().contains_key(&core_pubkey(0xAA))); + } + + /// An exit for an unknown validator index → 400 "validator not found". + #[tokio::test] + async fn submit_voluntary_exit_rejects_unknown_validator() { + let (_secret, _pubshare) = new_share(); + let (component, _mock, _recorder) = + make_lifecycle_component(false, HashMap::new(), HashMap::new()).await; + + let exit = phase0::SignedVoluntaryExit { + message: phase0::VoluntaryExit { + epoch: 1, + validator_index: 99, + }, + signature: [0; 96], + }; + let err = component.submit_voluntary_exit(exit).await.unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + assert_eq!(err.message, "validator not found"); + } + + /// A tampered exit signature is rejected at the verify step. + #[tokio::test] + async fn submit_voluntary_exit_rejects_bad_signature() { + let (secret, pubshare) = new_share(); + let dv_root = dv_pubkey(0xAB); + let validators = HashMap::from([(1, dv_root)]); + let map = HashMap::from([(dv_root, pubshare)]); + let (component, mock, _recorder) = make_lifecycle_component(false, validators, map).await; + + let mut exit = phase0::SignedVoluntaryExit { + message: phase0::VoluntaryExit { + epoch: 1, + validator_index: 1, + }, + signature: [0; 96], + }; + let message_root = exit.message_root(); + let mut sig = sign_for(&mock, &secret, DomainName::VoluntaryExit, 1, message_root).await; + sig[0] ^= 0xFF; + exit.signature = sig; + + let err = component.submit_voluntary_exit(exit).await.unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + } + + /// Builds an unsigned `v1::ValidatorRegistration` for `pubkey`. + fn registration_message(pubkey: BLSPubKey, timestamp: u64) -> v1::ValidatorRegistration { + v1::ValidatorRegistration { + fee_recipient: [0x11; 20], + gas_limit: 30_000_000, + timestamp, + pubkey, + } + } + + /// With builder mode disabled, registrations are swallowed: no broadcast, + /// no validator/spec lookups. + #[tokio::test] + async fn submit_validator_registrations_swallowed_when_builder_disabled() { + let (_secret, pubshare) = new_share(); + let dv_root = dv_pubkey(0xC0); + let map = HashMap::from([(dv_root, pubshare)]); + let (component, _mock, recorder) = + make_lifecycle_component(false, HashMap::new(), map).await; + + let reg = v1::SignedValidatorRegistration { + message: registration_message(dv_root, 600), + signature: [0; 96], + }; + component + .submit_validator_registrations(vec![reg]) + .await + .unwrap(); + + assert!(recorder.lock().unwrap().is_empty()); + } + + /// An empty registration list is a no-op even with builder mode enabled. + #[tokio::test] + async fn submit_validator_registrations_empty_is_noop() { + let (_secret, _pubshare) = new_share(); + let (component, _mock, recorder) = + make_lifecycle_component(true, HashMap::new(), HashMap::new()).await; + + component + .submit_validator_registrations(vec![]) + .await + .unwrap(); + assert!(recorder.lock().unwrap().is_empty()); + } + + /// A registration for a non-distributed-validator key is swallowed + /// (builder enabled, but the pubkey has no public share registered). + #[tokio::test] + async fn submit_validator_registrations_swallows_non_dv_key() { + let (_secret, _pubshare) = new_share(); + // pub_share map is empty, so any pubkey is "non-DV". + let (component, _mock, recorder) = + make_lifecycle_component(true, HashMap::new(), HashMap::new()).await; + + let reg = v1::SignedValidatorRegistration { + message: registration_message(dv_pubkey(0xC1), 600), + signature: [0; 96], + }; + component + .submit_validator_registrations(vec![reg]) + .await + .unwrap(); + assert!(recorder.lock().unwrap().is_empty()); + } + + /// With builder mode enabled and a known DV key, a correctly-signed + /// registration is verified and broadcast as a `BuilderRegistration` duty + /// whose slot derives from the registration timestamp. + #[tokio::test] + async fn submit_validator_registrations_verifies_and_broadcasts() { + let (secret, pubshare) = new_share(); + let dv_root = dv_pubkey(0xC2); + let map = HashMap::from([(dv_root, pubshare)]); + let (component, mock, recorder) = make_lifecycle_component(true, HashMap::new(), map).await; + + // genesis = 0, slot_duration = 12s → timestamp 600 ⇒ slot 50. + let message = registration_message(dv_root, 600); + let message_root = message.message_root(); + // Builder domain always uses epoch 0. + let signature = sign_for( + &mock, + &secret, + DomainName::ApplicationBuilder, + 0, + message_root, + ) + .await; + let reg = v1::SignedValidatorRegistration { message, signature }; + + component + .submit_validator_registrations(vec![reg]) + .await + .unwrap(); + + let recorded = recorder.lock().unwrap().clone(); + assert_eq!(recorded.len(), 1); + let (duty, set) = &recorded[0]; + assert_eq!(duty.duty_type, DutyType::BuilderRegistration); + assert_eq!(duty.slot.inner(), 50); + assert!(set.inner().contains_key(&core_pubkey(0xC2))); + } + + /// A tampered registration signature is rejected at the verify step. + #[tokio::test] + async fn submit_validator_registrations_rejects_bad_signature() { + let (secret, pubshare) = new_share(); + let dv_root = dv_pubkey(0xC3); + let map = HashMap::from([(dv_root, pubshare)]); + let (component, mock, _recorder) = + make_lifecycle_component(true, HashMap::new(), map).await; + + let message = registration_message(dv_root, 600); + let message_root = message.message_root(); + let mut signature = sign_for( + &mock, + &secret, + DomainName::ApplicationBuilder, + 0, + message_root, + ) + .await; + signature[0] ^= 0xFF; + let reg = v1::SignedValidatorRegistration { message, signature }; + + let err = component + .submit_validator_registrations(vec![reg]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + } + + /// A batch of two valid DV registrations is processed in order and each is + /// broadcast. + #[tokio::test] + async fn submit_validator_registrations_processes_batch_in_order() { + let (secret_a, pubshare_a) = new_share(); + let (secret_b, pubshare_b) = new_share(); + let dv_a = dv_pubkey(0xD0); + let dv_b = dv_pubkey(0xD1); + let map = HashMap::from([(dv_a, pubshare_a), (dv_b, pubshare_b)]); + let (component, mock, recorder) = make_lifecycle_component(true, HashMap::new(), map).await; + + let mut regs = Vec::new(); + for (dv, secret) in [(dv_a, &secret_a), (dv_b, &secret_b)] { + let message = registration_message(dv, 600); + let message_root = message.message_root(); + let signature = sign_for( + &mock, + secret, + DomainName::ApplicationBuilder, + 0, + message_root, + ) + .await; + regs.push(v1::SignedValidatorRegistration { message, signature }); + } + + component + .submit_validator_registrations(regs) + .await + .unwrap(); + + let recorded = recorder.lock().unwrap().clone(); + assert_eq!(recorded.len(), 2); + assert!(recorded[0].1.inner().contains_key(&core_pubkey(0xD0))); + assert!(recorded[1].1.inner().contains_key(&core_pubkey(0xD1))); + } + + /// A batch halts on the first failing registration: the valid first entry + /// is broadcast, then the bad-signature second entry aborts the loop. + #[tokio::test] + async fn submit_validator_registrations_batch_halts_on_error() { + let (secret_a, pubshare_a) = new_share(); + let (secret_b, pubshare_b) = new_share(); + let dv_a = dv_pubkey(0xD2); + let dv_b = dv_pubkey(0xD3); + let map = HashMap::from([(dv_a, pubshare_a), (dv_b, pubshare_b)]); + let (component, mock, recorder) = make_lifecycle_component(true, HashMap::new(), map).await; + + let message_a = registration_message(dv_a, 600); + let root_a = message_a.message_root(); + let sig_a = sign_for(&mock, &secret_a, DomainName::ApplicationBuilder, 0, root_a).await; + let reg_a = v1::SignedValidatorRegistration { + message: message_a, + signature: sig_a, + }; + + let message_b = registration_message(dv_b, 600); + let root_b = message_b.message_root(); + let mut sig_b = sign_for(&mock, &secret_b, DomainName::ApplicationBuilder, 0, root_b).await; + sig_b[0] ^= 0xFF; + let reg_b = v1::SignedValidatorRegistration { + message: message_b, + signature: sig_b, + }; + + let err = component + .submit_validator_registrations(vec![reg_a, reg_b]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + + // The first (valid) registration was broadcast before the second failed. + let recorded = recorder.lock().unwrap().clone(); + assert_eq!(recorded.len(), 1); + assert!(recorded[0].1.inner().contains_key(&core_pubkey(0xD2))); + } } diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index a32ab9b9..56dea7ec 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -7,32 +7,68 @@ use std::sync::Arc; use axum::{ Json, Router, + body::Bytes, extract::{ - DefaultBodyLimit, Path, Query, Request, State, + DefaultBodyLimit, Path, Query, RawQuery, Request, State, rejection::{JsonRejection, QueryRejection}, }, - http::{HeaderValue, StatusCode, header}, + http::{HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Uri, header}, middleware::{self, Next}, response::{IntoResponse, Response}, routing::{MethodRouter, get, post}, }; +use pluto_crypto::types::PublicKey as BlsPubKey; +use pluto_eth2api::{ + spec::DataVersion, + versioned::{ + SignedBlindedProposalBlock, SignedProposalBlock, VersionedSignedBlindedProposal, + VersionedSignedProposal as RawVersionedSignedProposal, + }, +}; use serde::Deserialize; - -/// Cap on the `POST /eth/v1/validator/duties/{attester,sync}/{epoch}` request -/// bodies. A realistic cluster ships at most a few thousand validator indices; -/// 64 KiB still allows ~10k indices in either numeric or string encoding, -/// well above any plausible workload. -const DUTIES_BODY_LIMIT: usize = 64 * 1024; +use serde_json::{Value, json}; use super::{ error::ApiError, handler::Handler, + metrics::{ApiLatencyTimer, ProxyLatencyTimer}, types::{ AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, AttesterDutiesResponse, - CommitteeIndex, NodeVersionResponse, ProposerDutiesOpts, ProposerDutiesResponse, - SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, ValIndexes, + CommitteeIndex, NodeVersionResponse, ProposalOpts, ProposerDutiesOpts, + ProposerDutiesResponse, SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, ValIndexes, + ValidatorsOpts, }, }; +use crate::signeddata::{ProposalBlock, VersionedSignedProposal}; + +/// Cap on the `POST /eth/v1/validator/duties/{attester,sync}/{epoch}` request +/// bodies. A realistic cluster ships at most a few thousand validator indices; +/// 64 KiB still allows ~10k indices in either numeric or string encoding, +/// well above any plausible workload. +const DUTIES_BODY_LIMIT: usize = 64 * 1024; + +/// Hard cap on the number of validator registrations accepted in a single +/// `register_validator` request. A realistic cluster manages at most a few +/// hundred validators; the cap is set generously above that. Each accepted +/// registration triggers upstream beacon-node calls and a BLS verification, +/// so bounding the count bounds the per-request fan-out a single caller can +/// induce. +const REGISTRATIONS_MAX_LEN: usize = 8192; + +/// Cap on the `POST /eth/v1/validator/register_validator` request body. Sized +/// at [`REGISTRATIONS_MAX_LEN`] SSZ objects (each 180 bytes) so the byte limit +/// and the count limit agree, plus headroom for the more verbose JSON +/// encoding of the same number of entries. +const REGISTRATIONS_BODY_LIMIT: usize = REGISTRATIONS_MAX_LEN * 512; + +/// Response/request header carrying the consensus fork name (e.g. `deneb`). +const VERSION_HEADER: &str = "Eth-Consensus-Version"; +/// Response header signalling whether the returned proposal is blinded. +const EXECUTION_PAYLOAD_BLINDED_HEADER: &str = "Eth-Execution-Payload-Blinded"; +/// Response header carrying the execution payload value, in Wei. +const EXECUTION_PAYLOAD_VALUE_HEADER: &str = "Eth-Execution-Payload-Value"; +/// Response header carrying the consensus block value, in Wei. +const CONSENSUS_BLOCK_VALUE_HEADER: &str = "Eth-Consensus-Block-Value"; /// Query parameters for `GET /eth/v1/validator/attestation_data`. #[derive(Debug, Clone, Deserialize)] @@ -45,22 +81,36 @@ struct AttestationDataQuery { pub(super) struct AppState { /// Request handler invoked by each route. pub handler: Arc, - /// Whether builder mode is enabled. Read by `propose_block_v3`. - #[allow(dead_code, reason = "consumed by propose_block_v3 in a later PR")] + /// Whether builder mode is enabled. Read by `propose_block_v3` to maximise + /// the builder boost factor. pub builder_enabled: bool, + /// Upstream beacon-node base URL. The fallback reverse-proxies every + /// non-DV request here. A `userinfo` component (`user:pass@host`) is + /// applied as HTTP basic auth on the proxied request. + pub upstream_base_url: reqwest::Url, + /// HTTP client used by the reverse-proxy fallback. + pub proxy_client: reqwest::Client, } /// Builds the validator API HTTP router. /// /// Registers the distributed-validator-related endpoints and a fallback -/// that reverse-proxies everything else to the upstream beacon node. +/// that reverse-proxies everything else to `upstream_base_url`. /// /// `builder_enabled` is consumed by `propose_block_v3` to maximise the -/// builder boost factor. -pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { +/// builder boost factor. `upstream_base_url` is the beacon-node address the +/// fallback proxies to; a `user:pass@host` component is applied as HTTP basic +/// auth on each proxied request. +pub fn new_router( + handler: Arc, + builder_enabled: bool, + upstream_base_url: reqwest::Url, +) -> Router { let state = Arc::new(AppState { handler, builder_enabled, + upstream_base_url, + proxy_client: reqwest::Client::new(), }); Router::new() @@ -99,7 +149,8 @@ pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { .route("/eth/v2/beacon/blinded_blocks", post(submit_blinded_block)) .route( "/eth/v1/validator/register_validator", - post(submit_validator_registrations), + post(submit_validator_registrations) + .route_layer(DefaultBodyLimit::max(REGISTRATIONS_BODY_LIMIT)), ) .route("/eth/v1/beacon/pool/voluntary_exits", post(submit_exit)) .route("/teku_proposer_config", get(respond_404)) @@ -282,32 +333,224 @@ async fn submit_attestations() { todo!("vapi: submit_attestations"); } -async fn get_validators() { - todo!("vapi: get_validators"); +/// `GET,POST /eth/v1/beacon/states/{state_id}/validators`. +/// +/// Validator ids arrive as repeated/CSV `id` query parameters; when the query +/// carries none and the request has a JSON body, the body's `ids` array is +/// used instead. The whole id batch is dispatched on the first element's +/// `0x` prefix exactly as Charon's `getValidatorsByID` does: all-pubkeys if +/// `ids[0]` begins `0x`, otherwise all decimal indices. +async fn get_validators( + State(state): State>, + Path(state_id): Path, + RawQuery(query): RawQuery, + body: Bytes, +) -> Result, ApiError> { + let mut ids = validator_ids_from_query(query.as_deref()); + if ids.is_empty() && !body.is_empty() { + ids = validator_ids_from_json_body(&body)?; + } + + let opts = validators_opts(state_id, &ids)?; + let response = state.handler.validators(opts).await?; + + let data = serde_json::to_value(&response.data) + .map_err(|err| internal_error("could not serialize validators", err))?; + Ok(Json(json!({ + "execution_optimistic": response.execution_optimistic, + "finalized": response.finalized, + "data": data, + }))) } -async fn get_validator() { - todo!("vapi: get_validator"); +/// `GET /eth/v1/beacon/states/{state_id}/validators/{validator_id}`. +/// +/// Returns a single validator; `404` when the upstream has none and `500` +/// when it unexpectedly returns more than one. Mirrors `getValidator`. +async fn get_validator( + State(state): State>, + Path((state_id, validator_id)): Path<(String, String)>, +) -> Result, ApiError> { + let opts = validators_opts(state_id, std::slice::from_ref(&validator_id))?; + let response = state.handler.validators(opts).await?; + + let mut data = response.data; + match data.len() { + 0 => Err(ApiError::not_found()), + 1 => { + let validator = serde_json::to_value(data.remove(0)) + .map_err(|err| internal_error("could not serialize validator", err))?; + Ok(Json(json!({ + "execution_optimistic": response.execution_optimistic, + "finalized": response.finalized, + "data": validator, + }))) + } + _ => Err(ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "unexpected number of validators", + )), + } } -async fn propose_block_v3() { - todo!("vapi: propose_block_v3"); +/// `GET /eth/v3/validator/blocks/{slot}`. +/// +/// Produces an unsigned (possibly blinded) beacon block. `builder_enabled` +/// maximises the builder boost factor so builder payloads win. The block is +/// returned as JSON with the consensus-version / payload-blinded / value +/// headers Charon sets in `proposeBlockV3`. +async fn propose_block_v3( + State(state): State>, + Path(slot): Path, + RawQuery(query): RawQuery, +) -> Result { + let params = parse_query(query.as_deref()); + + let randao_reveal = hex_query_fixed::<96>(¶ms, "randao_reveal")?; + let graffiti = graffiti_query(¶ms, "graffiti")?; + + // Builder mode gives maximum priority to builder blocks (`u64::MAX`); + // otherwise the factor is `0`. Charon always sends the factor (it is never + // omitted), so use `Some` in both branches. + let builder_boost_factor = Some(if state.builder_enabled { u64::MAX } else { 0 }); + + let response = state + .handler + .proposal(ProposalOpts { + slot, + randao_reveal, + graffiti, + builder_boost_factor, + }) + .await?; + + let proposal = &response.data; + let version = proposal.version(); + let blinded = proposal.is_blinded(); + let execution_value = proposal.execution_payload_value.to_string(); + let consensus_value = proposal.consensus_block_value.to_string(); + + let body = json!({ + "version": version.as_str(), + "execution_payload_blinded": blinded, + "execution_payload_value": execution_value, + "consensus_block_value": consensus_value, + "data": serialize_proposal_block(&proposal.block)?, + }); + + let mut headers = HeaderMap::new(); + insert_header(&mut headers, VERSION_HEADER, version.as_str())?; + insert_header( + &mut headers, + EXECUTION_PAYLOAD_BLINDED_HEADER, + &blinded.to_string(), + )?; + insert_header( + &mut headers, + EXECUTION_PAYLOAD_VALUE_HEADER, + &execution_value, + )?; + insert_header(&mut headers, CONSENSUS_BLOCK_VALUE_HEADER, &consensus_value)?; + + Ok((headers, Json(body)).into_response()) } -async fn submit_proposal() { - todo!("vapi: submit_proposal"); +/// `POST /eth/v{1,2}/beacon/blocks`. +/// +/// Decodes the submitted full signed block, selecting the fork from the +/// `Eth-Consensus-Version` header (JSON or SSZ body per content type), then +/// forwards it to the handler. Mirrors `submitProposal`. +async fn submit_proposal( + State(state): State>, + headers: HeaderMap, + body: Bytes, +) -> Result { + let version = consensus_version_header(&headers)?; + let ssz = request_is_ssz(&headers)?; + + let block = decode_signed_proposal_block(version, &body, ssz)?; + let proposal = VersionedSignedProposal::new(RawVersionedSignedProposal { + version, + blinded: false, + block, + }) + .map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, "invalid submitted block").with_source(err) + })?; + + state.handler.submit_proposal(proposal).await?; + Ok(StatusCode::OK.into_response()) } -async fn submit_blinded_block() { - todo!("vapi: submit_blinded_block"); +/// `POST /eth/v{1,2}/beacon/blinded_blocks`. +/// +/// Decodes the submitted blinded signed block, selecting the fork from the +/// `Eth-Consensus-Version` header, then forwards it to the handler. +/// Mirrors `submitBlindedBlock`. +async fn submit_blinded_block( + State(state): State>, + headers: HeaderMap, + body: Bytes, +) -> Result { + let version = consensus_version_header(&headers)?; + let ssz = request_is_ssz(&headers)?; + + let block = decode_signed_blinded_proposal_block(version, &body, ssz)?; + let proposal = VersionedSignedBlindedProposal { version, block }; + + state.handler.submit_blinded_proposal(proposal).await?; + Ok(StatusCode::OK.into_response()) } -async fn submit_validator_registrations() { - todo!("vapi: submit_validator_registrations"); +/// `POST /eth/v1/validator/register_validator`. +/// +/// Decodes an array of signed builder validator registrations (JSON or SSZ +/// per content type) and forwards them to the handler. The SSZ body is a bare +/// concatenation of fixed-size `SignedValidatorRegistration` objects; the JSON +/// body is a plain array. +async fn submit_validator_registrations( + State(state): State>, + headers: HeaderMap, + body: Bytes, +) -> Result { + let ssz = request_is_ssz(&headers)?; + let registrations = decode_signed_validator_registrations(&body, ssz)?; + + state + .handler + .submit_validator_registrations(registrations) + .await?; + Ok(StatusCode::OK.into_response()) } -async fn submit_exit() { - todo!("vapi: submit_exit"); +/// `POST /eth/v1/beacon/pool/voluntary_exits`. +/// +/// Decodes a single signed voluntary exit (JSON only) and forwards it to the +/// handler. +async fn submit_exit( + State(state): State>, + headers: HeaderMap, + body: Bytes, +) -> Result { + // JSON-only endpoint: the beacon API does not define an SSZ encoding for + // voluntary exits, so an SSZ or otherwise unrecognised content type is + // rejected with 415. + if request_is_ssz(&headers)? { + return Err(ApiError::new( + StatusCode::UNSUPPORTED_MEDIA_TYPE, + "Cannot read the supplied content type.", + )); + } + + if body.is_empty() { + return Err(ApiError::new(StatusCode::BAD_REQUEST, "empty request body")); + } + let exit = serde_json::from_slice(&body).map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, "failed parsing json request body").with_source(err) + })?; + + state.handler.submit_voluntary_exit(exit).await?; + Ok(StatusCode::OK.into_response()) } async fn beacon_committee_selections() { @@ -334,8 +577,13 @@ async fn submit_contribution_and_proofs() { todo!("vapi: submit_contribution_and_proofs"); } -async fn submit_proposal_preparations() { - todo!("vapi: submit_proposal_preparations"); +/// `POST /eth/v1/validator/prepare_beacon_proposer`. +/// +/// Swallows the fee-recipient preparation: Charon derives the fee recipient +/// from `cluster-lock.json`, so the validator client need not be configured +/// with one. Returns `200` with no body. Mirrors `submitProposalPreparations`. +async fn submit_proposal_preparations() -> impl IntoResponse { + StatusCode::OK } async fn sync_committee_selections() { @@ -354,8 +602,547 @@ async fn respond_404() -> impl IntoResponse { ApiError::not_found() } -async fn proxy_handler() { - todo!("vapi: proxy_handler"); +/// Reverse-proxy fallback: forwards every request not handled by a registered +/// distributed-validator route to the upstream beacon node. Mirrors +/// `proxyHandler`. +/// +/// Basic-auth credentials in the upstream URL's `userinfo` are applied to the +/// proxied request and the `Host` header is rewritten to the upstream host, +/// matching Charon's reverse-proxy director. The upstream response body is +/// streamed straight through (not buffered), so long-lived endpoints such as +/// the SSE `/eth/v1/events` stream proxy incrementally. Charon clones the +/// request with the lifecycle context so in-flight proxied requests are +/// cancelled on soft shutdown; here the proxied request inherits the axum +/// request's own lifetime, which is cancelled when the connection/server is +/// torn down. +async fn proxy_handler( + State(state): State>, + method: Method, + uri: Uri, + headers: HeaderMap, + body: Bytes, +) -> Result { + let path = uri.path().to_owned(); + let _proxy_timer = ProxyLatencyTimer::start(&path); + let _api_timer = ApiLatencyTimer::start("proxy"); + + // Build the target URL: upstream base + request path (+ query). The + // userinfo is stripped from the URL and applied as a basic-auth header + // instead (below), mirroring Charon's reverse-proxy director and avoiding + // a duplicate Authorization header from URL-embedded credentials. + let mut target = state.upstream_base_url.clone(); + target.set_path(uri.path()); + target.set_query(uri.query()); + // These setters only fail on cannot-be-a-base URLs, which an HTTP(S) base + // URL never is; ignore the result to keep the proxy infallible here. + let _ = target.set_username(""); + let _ = target.set_password(None); + + let reqwest_method = reqwest::Method::from_bytes(method.as_str().as_bytes()) + .map_err(|err| internal_error("invalid proxy method", err))?; + + let mut request = state + .proxy_client + .request(reqwest_method, target.clone()) + .body(reqwest::Body::from(body)); + + // When the upstream URL carries credentials we own the auth, so the + // client's own Authorization header must not be relayed (it would produce + // a second, conflicting Authorization header on the proxied request). + let upstream_user = state.upstream_base_url.username(); + let has_upstream_auth = !upstream_user.is_empty(); + + // Forward request headers, skipping the Host (rewritten below), + // Content-Length (reqwest sets it from the body), the hop-by-hop headers a + // proxy must not relay, and — when we apply our own basic auth — the + // client Authorization header. + for (name, value) in &headers { + if name == header::HOST + || name == header::CONTENT_LENGTH + || is_hop_by_hop_header(name) + || (has_upstream_auth && name == header::AUTHORIZATION) + { + continue; + } + request = request.header(name.as_str(), value.as_bytes()); + } + if let Some(host) = target.host_str() { + let host_header = match target.port() { + Some(port) => format!("{host}:{port}"), + None => host.to_owned(), + }; + request = request.header(header::HOST, host_header); + } + + // Apply basic auth from the upstream URL's userinfo, if present. + if has_upstream_auth { + request = request.basic_auth(upstream_user, state.upstream_base_url.password()); + } + + let upstream = request.send().await.map_err(|err| { + ApiError::new( + StatusCode::BAD_GATEWAY, + "proxy request to beacon node failed", + ) + .with_source(err) + })?; + + let status = StatusCode::from_u16(upstream.status().as_u16()) + .map_err(|err| internal_error("invalid upstream status", err))?; + + // Re-emit upstream response headers, dropping Content-Length (axum derives + // it from the streamed body) and hop-by-hop headers. + let mut response_headers = HeaderMap::new(); + for (name, value) in upstream.headers() { + if let (Ok(name), Ok(value)) = ( + HeaderName::from_bytes(name.as_str().as_bytes()), + HeaderValue::from_bytes(value.as_bytes()), + ) { + if name == header::CONTENT_LENGTH || is_hop_by_hop_header(&name) { + continue; + } + response_headers.append(name, value); + } + } + + // Stream the body straight through rather than buffering it, so + // long-lived/streaming endpoints (e.g. the SSE `/eth/v1/events`) are + // proxied incrementally. Charon achieves the same with a flushing reverse + // proxy writer. + let body = axum::body::Body::from_stream(upstream.bytes_stream()); + + Ok((status, response_headers, body).into_response()) +} + +/// Reports whether `name` is an HTTP hop-by-hop header that a proxy must not +/// forward end to end (RFC 7230 §6.1). Comparison is case-insensitive via the +/// normalised [`HeaderName`]. +fn is_hop_by_hop_header(name: &HeaderName) -> bool { + matches!( + name.as_str(), + "connection" + | "keep-alive" + | "proxy-authenticate" + | "proxy-authorization" + | "te" + | "trailer" + | "transfer-encoding" + | "upgrade" + ) +} + +/// Parses a raw URL query string into decoded `(key, value)` pairs, preserving +/// order and duplicate keys. An absent query yields an empty list. +fn parse_query(query: Option<&str>) -> Vec<(String, String)> { + match query { + Some(q) => url::form_urlencoded::parse(q.as_bytes()) + .map(|(k, v)| (k.into_owned(), v.into_owned())) + .collect(), + None => Vec::new(), + } +} + +/// Collects validator ids from the `id` query parameter, splitting CSV values +/// and trimming each, mirroring Charon's `getQueryArrayParameter`. +fn validator_ids_from_query(query: Option<&str>) -> Vec { + parse_query(query) + .into_iter() + .filter(|(key, _)| key == "id") + .flat_map(|(_, value)| { + value + .split(',') + .map(|id| id.trim().to_owned()) + .collect::>() + }) + .collect() +} + +/// Validator-ids POST body: `{ "ids": [...] }`. Mirrors +/// `getValidatorIDsFromJSON`. +#[derive(Debug, Deserialize)] +struct ValidatorIdsBody { + #[serde(default)] + ids: Vec, +} + +/// Extracts validator ids from a JSON POST body. A parse failure surfaces as +/// `400`, matching Charon's wrapped "failed to parse request body" error. +fn validator_ids_from_json_body(body: &[u8]) -> Result, ApiError> { + let parsed: ValidatorIdsBody = serde_json::from_slice(body).map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, "failed to parse request body").with_source(err) + })?; + Ok(parsed.ids) +} + +/// Builds [`ValidatorsOpts`] from a state id and a batch of validator ids. +/// +/// The whole batch is dispatched on `ids[0]`'s `0x` prefix exactly as Charon's +/// `getValidatorsByID` does: if the first id is `0x`-prefixed every id is +/// parsed as a public key, otherwise every id is parsed as a decimal validator +/// index. An empty batch forwards no filter. +fn validators_opts(state: String, ids: &[String]) -> Result { + let mut pubkeys = Vec::new(); + let mut indices = Vec::new(); + + if ids.first().is_some_and(|id| id.starts_with("0x")) { + for id in ids { + pubkeys.push(parse_pubkey_id(id)?); + } + } else { + for id in ids { + let index = id.parse::().map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, "invalid validator index").with_source(err) + })?; + indices.push(index); + } + } + + Ok(ValidatorsOpts { + state, + pubkeys, + indices, + }) +} + +/// Parses a `0x`-prefixed 48-byte hex public key. +fn parse_pubkey_id(id: &str) -> Result { + let stripped = id.strip_prefix("0x").unwrap_or(id); + let bytes = hex::decode(stripped).map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, "invalid validator public key hex").with_source(err) + })?; + bytes.as_slice().try_into().map_err(|_| { + ApiError::new( + StatusCode::BAD_REQUEST, + "invalid validator public key length", + ) + }) +} + +/// Returns the value of the first query parameter named `name`, if present. +fn query_value<'a>(params: &'a [(String, String)], name: &str) -> Option<&'a str> { + params + .iter() + .find(|(key, _)| key == name) + .map(|(_, value)| value.as_str()) +} + +/// Decodes a required fixed-length `0x`-hex query parameter into an `N`-byte +/// array. Mirrors Charon's `hexQueryFixed`. +fn hex_query_fixed( + params: &[(String, String)], + name: &str, +) -> Result<[u8; N], ApiError> { + optional_hex_query_fixed::(params, name)?.ok_or_else(|| { + ApiError::new( + StatusCode::BAD_REQUEST, + format!("missing 0x-hex query parameter {name}"), + ) + }) +} + +/// Decodes an optional fixed-length `0x`-hex query parameter into an `N`-byte +/// array. Returns `None` when absent; rejects wrong lengths. Mirrors Charon's +/// `hexQuery` + `hexQueryFixed` length check. +fn optional_hex_query_fixed( + params: &[(String, String)], + name: &str, +) -> Result, ApiError> { + let Some(value) = query_value(params, name) else { + return Ok(None); + }; + let stripped = value.strip_prefix("0x").unwrap_or(value); + let bytes = hex::decode(stripped).map_err(|err| { + ApiError::new( + StatusCode::BAD_REQUEST, + format!("invalid 0x-hex query parameter {name} [{value}]"), + ) + .with_source(err) + })?; + let array: [u8; N] = bytes.as_slice().try_into().map_err(|_| { + ApiError::new( + StatusCode::BAD_REQUEST, + format!("invalid length for 0x-hex query parameter {name}, expect {N} bytes"), + ) + })?; + Ok(Some(array)) +} + +/// Decodes the optional `graffiti` query parameter into a 32-byte array. +/// +/// Graffiti is lenient on length, mirroring Charon's `getProposeBlockParams` +/// (`hexQuery` + `copy(graffiti[:], graffitiBytes)`): any-length hex is +/// accepted, then left-aligned into 32 bytes — longer input is truncated and +/// shorter input is zero-padded. An absent parameter yields all-zero graffiti. +fn graffiti_query(params: &[(String, String)], name: &str) -> Result<[u8; 32], ApiError> { + let Some(value) = query_value(params, name) else { + return Ok([0u8; 32]); + }; + let stripped = value.strip_prefix("0x").unwrap_or(value); + let bytes = hex::decode(stripped).map_err(|err| { + ApiError::new( + StatusCode::BAD_REQUEST, + format!("invalid 0x-hex query parameter {name} [{value}]"), + ) + .with_source(err) + })?; + let mut graffiti = [0u8; 32]; + let len = bytes.len().min(32); + graffiti[..len].copy_from_slice(&bytes[..len]); + Ok(graffiti) +} + +/// Parses the `Eth-Consensus-Version` request header into a [`DataVersion`]. +/// +/// The header is matched case-insensitively (lowercased before lookup) to +/// mirror go-eth2-client's `DataVersion.UnmarshalJSON`. A missing or +/// unrecognised value is a `400`, matching Charon's "missing consensus version +/// header". +fn consensus_version_header(headers: &HeaderMap) -> Result { + let missing = || ApiError::new(StatusCode::BAD_REQUEST, "missing consensus version header"); + let raw = headers.get(VERSION_HEADER).ok_or_else(missing)?; + let value = raw.to_str().map_err(|_| missing())?.to_ascii_lowercase(); + match value.as_str() { + "phase0" => Ok(DataVersion::Phase0), + "altair" => Ok(DataVersion::Altair), + "bellatrix" => Ok(DataVersion::Bellatrix), + "capella" => Ok(DataVersion::Capella), + "deneb" => Ok(DataVersion::Deneb), + "electra" => Ok(DataVersion::Electra), + "fulu" => Ok(DataVersion::Fulu), + _ => Err(missing()), + } +} + +/// Classifies the request body encoding from its `Content-Type`, mirroring +/// Charon's `wrap` content negotiation for JSON+SSZ endpoints: a missing or +/// `application/json` header is JSON, `application/octet-stream` is SSZ, and +/// anything else is rejected with `415 Unsupported Media Type` carrying the +/// offending content type. Returns `true` for SSZ. +fn request_is_ssz(headers: &HeaderMap) -> Result { + let Some(value) = headers.get(header::CONTENT_TYPE) else { + return Ok(false); + }; + // A present but non-ASCII header is unrecognised, not JSON: surface it as + // 415 like any other unsupported type rather than silently defaulting. + let unsupported = || { + ApiError::new( + StatusCode::UNSUPPORTED_MEDIA_TYPE, + format!("unsupported media type {value:?}"), + ) + }; + let value = value.to_str().map_err(|_| unsupported())?; + if value.is_empty() || value.contains("application/json") { + Ok(false) + } else if value.contains("application/octet-stream") { + Ok(true) + } else { + Err(ApiError::new( + StatusCode::UNSUPPORTED_MEDIA_TYPE, + format!("unsupported media type {value}"), + )) + } +} + +/// Decodes the `register_validator` request body into a list of signed +/// validator registrations. JSON bodies are a plain array; SSZ bodies are a +/// bare concatenation of fixed-size objects. An empty body or a JSON parse +/// failure surfaces as `400`; an SSZ parse failure as `415`. The decoded list +/// is capped at [`REGISTRATIONS_MAX_LEN`] entries (`400` when exceeded) so a +/// single caller cannot drive an unbounded per-request fan-out of upstream +/// calls and BLS verifications. +fn decode_signed_validator_registrations( + body: &[u8], + ssz: bool, +) -> Result, ApiError> { + if body.is_empty() { + return Err(ApiError::new(StatusCode::BAD_REQUEST, "empty request body")); + } + + let registrations = if ssz { + crate::ssz_codec::decode_signed_validator_registrations(body).map_err(|err| { + ApiError::new( + StatusCode::UNSUPPORTED_MEDIA_TYPE, + "failed parsing ssz request body", + ) + .with_source(err) + })? + } else { + serde_json::from_slice(body).map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, "failed parsing json request body") + .with_source(err) + })? + }; + + if registrations.len() > REGISTRATIONS_MAX_LEN { + return Err(ApiError::new( + StatusCode::BAD_REQUEST, + format!("too many validator registrations (max {REGISTRATIONS_MAX_LEN})"), + )); + } + + Ok(registrations) +} + +/// Decodes a submitted full signed proposal block (JSON or SSZ) for the given +/// fork. A decode failure surfaces as `400`, mirroring Charon's +/// "invalid submitted block". +fn decode_signed_proposal_block( + version: DataVersion, + body: &[u8], + ssz: bool, +) -> Result { + if body.is_empty() { + return Err(ApiError::new(StatusCode::BAD_REQUEST, "empty request body")); + } + let invalid = |source: Box| { + ApiError::new(StatusCode::BAD_REQUEST, "invalid submitted block").with_boxed_source(source) + }; + + if ssz { + return crate::ssz_codec::decode_signed_proposal_block_body(version, body) + .map_err(|err| invalid(Box::new(err))); + } + + let value: Value = serde_json::from_slice(body).map_err(|err| invalid(Box::new(err)))?; + decode_signed_proposal_block_json(version, value).map_err(invalid) +} + +/// Selects the per-fork (non-blinded) `SignedProposalBlock` variant and parses +/// the JSON block body into it. Mirrors the `submitProposal` version switch. +fn decode_signed_proposal_block_json( + version: DataVersion, + value: Value, +) -> Result> { + Ok(match version { + DataVersion::Phase0 => SignedProposalBlock::Phase0(serde_json::from_value(value)?), + DataVersion::Altair => SignedProposalBlock::Altair(serde_json::from_value(value)?), + DataVersion::Bellatrix => SignedProposalBlock::Bellatrix(serde_json::from_value(value)?), + DataVersion::Capella => SignedProposalBlock::Capella(serde_json::from_value(value)?), + DataVersion::Deneb => SignedProposalBlock::Deneb(serde_json::from_value(value)?), + DataVersion::Electra => SignedProposalBlock::Electra(serde_json::from_value(value)?), + DataVersion::Fulu => SignedProposalBlock::Fulu(serde_json::from_value(value)?), + DataVersion::Unknown => return Err("unknown consensus version".into()), + }) +} + +/// Decodes a submitted blinded signed proposal block (JSON or SSZ) for the +/// given fork. Mirrors `submitBlindedBlock`. +fn decode_signed_blinded_proposal_block( + version: DataVersion, + body: &[u8], + ssz: bool, +) -> Result { + if body.is_empty() { + return Err(ApiError::new(StatusCode::BAD_REQUEST, "empty request body")); + } + let invalid = |source: Box| { + ApiError::new(StatusCode::BAD_REQUEST, "invalid submitted blinded block") + .with_boxed_source(source) + }; + + if ssz { + return crate::ssz_codec::decode_signed_blinded_proposal_block_body(version, body) + .map_err(|err| invalid(Box::new(err))); + } + + let value: Value = serde_json::from_slice(body).map_err(|err| invalid(Box::new(err)))?; + decode_signed_blinded_proposal_block_json(version, value).map_err(invalid) +} + +/// Selects the per-fork blinded variant and parses the JSON block body into +/// it. Mirrors the `submitBlindedBlock` version switch; pre-Bellatrix forks +/// have no blinded form and are rejected. +fn decode_signed_blinded_proposal_block_json( + version: DataVersion, + value: Value, +) -> Result> { + Ok(match version { + DataVersion::Bellatrix => { + SignedBlindedProposalBlock::Bellatrix(serde_json::from_value(value)?) + } + DataVersion::Capella => SignedBlindedProposalBlock::Capella(serde_json::from_value(value)?), + DataVersion::Deneb => SignedBlindedProposalBlock::Deneb(serde_json::from_value(value)?), + DataVersion::Electra => SignedBlindedProposalBlock::Electra(serde_json::from_value(value)?), + // Fulu blinded blocks share the Electra layout. + DataVersion::Fulu => SignedBlindedProposalBlock::Fulu(serde_json::from_value(value)?), + DataVersion::Phase0 | DataVersion::Altair | DataVersion::Unknown => { + return Err("invalid blinded block version".into()); + } + }) +} + +/// Serializes an unsigned [`ProposalBlock`] to the JSON shape Charon's +/// `createProposeBlockResponse` puts in the `data` field: the bare block for +/// pre-Deneb forks (and all blinded forks), and the `BlockContents` object +/// (`{ block, kzg_proofs, blobs }`) for Deneb, Electra, and Fulu full blocks. +fn serialize_proposal_block(block: &ProposalBlock) -> Result { + let to_value = |value: Result| { + value.map_err(|err| internal_error("could not serialize proposal block", err)) + }; + match block { + ProposalBlock::Phase0(b) => to_value(serde_json::to_value(b)), + ProposalBlock::Altair(b) => to_value(serde_json::to_value(b)), + ProposalBlock::Bellatrix(b) => to_value(serde_json::to_value(b)), + ProposalBlock::BellatrixBlinded(b) => to_value(serde_json::to_value(b)), + ProposalBlock::Capella(b) => to_value(serde_json::to_value(b)), + ProposalBlock::CapellaBlinded(b) => to_value(serde_json::to_value(b)), + ProposalBlock::DenebBlinded(b) => to_value(serde_json::to_value(b)), + ProposalBlock::ElectraBlinded(b) => to_value(serde_json::to_value(b)), + ProposalBlock::FuluBlinded(b) => to_value(serde_json::to_value(b)), + ProposalBlock::Deneb { + block, + kzg_proofs, + blobs, + } => block_contents_value(block.as_ref(), kzg_proofs, blobs), + // Electra and Fulu full blocks both carry an `electra::BeaconBlock`. + ProposalBlock::Electra { + block, + kzg_proofs, + blobs, + } + | ProposalBlock::Fulu { + block, + kzg_proofs, + blobs, + } => block_contents_value(block.as_ref(), kzg_proofs, blobs), + } +} + +/// Builds the `BlockContents` JSON object (`{ block, kzg_proofs, blobs }`) for +/// a Deneb-or-later full proposal, matching go-eth2-client's +/// `apiv1.BlockContents` wire shape. +fn block_contents_value( + block: &B, + kzg_proofs: &[pluto_eth2api::spec::deneb::KZGProof], + blobs: &[pluto_eth2api::spec::deneb::Blob], +) -> Result { + Ok(json!({ + "block": serde_json::to_value(block) + .map_err(|err| internal_error("could not serialize block", err))?, + "kzg_proofs": serde_json::to_value(kzg_proofs) + .map_err(|err| internal_error("could not serialize kzg_proofs", err))?, + "blobs": serde_json::to_value(blobs) + .map_err(|err| internal_error("could not serialize blobs", err))?, + })) +} + +/// Inserts a header, mapping an invalid name or value into a `500` (both are +/// derived from internal data here, so a failure is a bug, not bad input). +fn insert_header(headers: &mut HeaderMap, name: &'static str, value: &str) -> Result<(), ApiError> { + let header_name = HeaderName::from_bytes(name.as_bytes()) + .map_err(|err| internal_error("invalid header name", err))?; + let header_value = + HeaderValue::from_str(value).map_err(|err| internal_error("invalid header value", err))?; + headers.insert(header_name, header_value); + Ok(()) +} + +/// Builds a `500 Internal Server Error` [`ApiError`] with an attached source. +fn internal_error(message: &'static str, source: E) -> ApiError +where + E: std::error::Error + Send + Sync + 'static, +{ + ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, message).with_source(source) } #[cfg(test)] @@ -371,12 +1158,31 @@ mod tests { }, }; + /// Placeholder upstream URL for tests that never reach the proxy fallback. + fn test_upstream_url() -> reqwest::Url { + "http://127.0.0.1:0".parse().expect("valid test url") + } + + /// Builds an [`AppState`] around `handler` for direct-handler unit tests. + /// `builder_enabled` defaults off; the upstream URL is a placeholder. + fn test_state(handler: Arc) -> Arc { + Arc::new(AppState { + handler, + builder_enabled: false, + upstream_base_url: test_upstream_url(), + proxy_client: reqwest::Client::new(), + }) + } + + /// Builds a router for oneshot tests with the proxy disabled (placeholder + /// upstream) and the given builder mode. + fn test_router(handler: Arc, builder_enabled: bool) -> Router { + new_router(handler, builder_enabled, test_upstream_url()) + } + #[tokio::test] async fn node_version_wraps_handler_value() { - let state = Arc::new(AppState { - handler: Arc::new(TestHandler::with_version("pluto/test/v1.0")), - builder_enabled: false, - }); + let state = test_state(Arc::new(TestHandler::with_version("pluto/test/v1.0"))); let Json(body) = node_version(State(state)).await.unwrap(); @@ -399,10 +1205,7 @@ mod tests { dependent_root: "0xab".to_owned(), execution_optimistic: false, }); - let state = Arc::new(AppState { - handler: Arc::new(handler), - builder_enabled: false, - }); + let state = test_state(Arc::new(handler)); let Json(body) = attester_duties( State(state), @@ -432,10 +1235,7 @@ mod tests { data: vec![duty], execution_optimistic: true, }); - let state = Arc::new(AppState { - handler: Arc::new(handler), - builder_enabled: false, - }); + let state = test_state(Arc::new(handler)); let Json(body) = sync_committee_duties( State(state), @@ -468,10 +1268,7 @@ mod tests { }; let handler = TestHandler::default().with_attestation_data(AttestationDataResponse { data }); - let state = Arc::new(AppState { - handler: Arc::new(handler), - builder_enabled: false, - }); + let state = test_state(Arc::new(handler)); let Json(body) = attestation_data( State(state), @@ -513,10 +1310,7 @@ mod tests { dependent_root: "0xcd".to_owned(), execution_optimistic: true, }); - let state = Arc::new(AppState { - handler: Arc::new(handler), - builder_enabled: false, - }); + let state = test_state(Arc::new(handler)); let Json(body) = proposer_duties(State(state), Path(99u64)).await.unwrap(); @@ -548,7 +1342,7 @@ mod tests { target: phase0::Checkpoint::default(), }, }); - let app = new_router(Arc::new(handler), false); + let app = test_router(Arc::new(handler), false); // Missing `committee_index`. let req = Request::builder() @@ -586,7 +1380,7 @@ mod tests { use tower::ServiceExt; let handler = TestHandler::default(); - let app = new_router(Arc::new(handler), false); + let app = test_router(Arc::new(handler), false); // 128 KiB of zeros — well past the 64 KiB cap, valid JSON or not. let big = vec![b'0'; 128 * 1024]; @@ -614,7 +1408,7 @@ mod tests { }; use tower::ServiceExt; - let app = new_router(Arc::new(TestHandler::default()), false); + let app = test_router(Arc::new(TestHandler::default()), false); // Valid JSON, wrong shape (object, not an array) — axum's default // would surface this as a 422 type error. @@ -650,7 +1444,7 @@ mod tests { dependent_root: "0x00".to_owned(), execution_optimistic: false, }); - let app = new_router(Arc::new(handler), false); + let app = test_router(Arc::new(handler), false); // No Content-Type header at all. let req = Request::builder() @@ -673,7 +1467,7 @@ mod tests { }; use tower::ServiceExt; - let app = new_router(Arc::new(TestHandler::default()), false); + let app = test_router(Arc::new(TestHandler::default()), false); let req = Request::builder() .method(Method::POST) @@ -730,4 +1524,810 @@ mod tests { let bad = serde_json::from_str::("[-1]"); assert!(bad.is_err()); } + + // ----------------------------------------------------------------------- + // PR 1: proxy + proposal/validators handler tests + // ----------------------------------------------------------------------- + + use alloy::primitives::U256; + use axum::{ + body::{Body, to_bytes}, + http::{Method, Request}, + }; + use pluto_eth2api::{ + GetStateValidatorsResponseResponseDatum as ValidatorDatum, ValidatorResponseValidator, + ValidatorStatus, + spec::{bellatrix, phase0 as p0}, + }; + use tower::ServiceExt; + + // `ProposalBlock`, `SignedProposalBlock`, `SignedBlindedProposalBlock` and + // `DataVersion` come in via `super::*`. + use crate::{signeddata::VersionedProposal, validatorapi::types::EthResponse}; + + fn empty_sync_bits() -> pluto_ssz::BitVector<512> { + pluto_ssz::BitVector::new() + } + + /// Minimal phase0 unsigned beacon block at `slot`. + fn phase0_unsigned_block(slot: u64) -> p0::BeaconBlock { + p0::BeaconBlock { + slot, + proposer_index: 7, + parent_root: [0; 32], + state_root: [0; 32], + body: p0::BeaconBlockBody { + randao_reveal: [0; 96], + eth1_data: p0::ETH1Data { + deposit_root: [0; 32], + deposit_count: 0, + block_hash: [0; 32], + }, + graffiti: [0; 32], + proposer_slashings: vec![].into(), + attester_slashings: vec![].into(), + attestations: vec![].into(), + deposits: vec![].into(), + voluntary_exits: vec![].into(), + }, + } + } + + /// Phase0 unsigned `VersionedProposal` returned by the proposal handler. + fn phase0_proposal(slot: u64) -> VersionedProposal { + VersionedProposal { + block: ProposalBlock::Phase0(phase0_unsigned_block(slot)), + consensus_block_value: U256::from(1u8), + execution_payload_value: U256::from(1u8), + } + } + + /// Phase0 signed beacon block for submit tests. + fn phase0_signed_block(slot: u64) -> p0::SignedBeaconBlock { + p0::SignedBeaconBlock { + message: phase0_unsigned_block(slot), + signature: [0; 96], + } + } + + /// Bellatrix blinded signed block for blinded-submit tests. + fn bellatrix_blinded_signed_block(slot: u64) -> bellatrix::SignedBlindedBeaconBlock { + let header = bellatrix::ExecutionPayloadHeader { + parent_hash: [0; 32], + fee_recipient: [0; 20], + state_root: [0; 32], + receipts_root: [0; 32], + logs_bloom: [0; 256], + prev_randao: [0; 32], + block_number: 0, + gas_limit: 30_000_000, + gas_used: 0, + timestamp: 0, + extra_data: vec![].into(), + base_fee_per_gas: U256::ZERO, + block_hash: [0; 32], + transactions_root: [0; 32], + }; + let block = bellatrix::BlindedBeaconBlock { + slot, + proposer_index: 7, + parent_root: [0; 32], + state_root: [0; 32], + body: bellatrix::BlindedBeaconBlockBody { + randao_reveal: [0; 96], + eth1_data: p0::ETH1Data { + deposit_root: [0; 32], + deposit_count: 0, + block_hash: [0; 32], + }, + graffiti: [0; 32], + proposer_slashings: vec![].into(), + attester_slashings: vec![].into(), + attestations: vec![].into(), + deposits: vec![].into(), + voluntary_exits: vec![].into(), + sync_aggregate: pluto_eth2api::spec::altair::SyncAggregate { + sync_committee_bits: empty_sync_bits(), + sync_committee_signature: [0; 96], + }, + execution_payload_header: header, + }, + }; + bellatrix::SignedBlindedBeaconBlock { + message: block, + signature: [0; 96], + } + } + + fn sample_validator_datum(index: u64, pubkey_hex: &str) -> ValidatorDatum { + ValidatorDatum { + index: index.to_string(), + balance: "32000000000".to_owned(), + status: ValidatorStatus::ActiveOngoing, + validator: ValidatorResponseValidator { + pubkey: pubkey_hex.to_owned(), + withdrawal_credentials: format!("0x{}", "00".repeat(32)), + effective_balance: "32000000000".to_owned(), + slashed: false, + activation_eligibility_epoch: "0".to_owned(), + activation_epoch: "0".to_owned(), + exit_epoch: "18446744073709551615".to_owned(), + withdrawable_epoch: "18446744073709551615".to_owned(), + }, + } + } + + async fn body_json(response: Response) -> serde_json::Value { + let bytes = to_bytes(response.into_body(), 4 * 1024 * 1024) + .await + .unwrap(); + serde_json::from_slice(&bytes).unwrap() + } + + /// `submit_proposal_preparations` swallows the request and returns 200. + #[tokio::test] + async fn prepare_beacon_proposer_swallows_and_returns_200() { + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/prepare_beacon_proposer") + .header("content-type", "application/json") + .body(Body::from("[]")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + } + + /// `propose_block_v3` returns the versioned block plus the four + /// consensus/value response headers; builder mode maximises the boost. + #[tokio::test] + async fn propose_block_v3_returns_block_with_headers() { + let handler = TestHandler::default().with_proposal(EthResponse { + data: phase0_proposal(42), + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let opts_handle = handler.proposal_opts.clone(); + let app = test_router(Arc::new(handler), true); + + let randao = format!("0x{}", "ab".repeat(96)); + let req = Request::builder() + .uri(format!( + "/eth/v3/validator/blocks/42?randao_reveal={randao}" + )) + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let headers = resp.headers(); + assert_eq!(headers.get(VERSION_HEADER).unwrap(), "phase0"); + assert_eq!( + headers.get(EXECUTION_PAYLOAD_BLINDED_HEADER).unwrap(), + "false" + ); + assert_eq!(headers.get(EXECUTION_PAYLOAD_VALUE_HEADER).unwrap(), "1"); + assert_eq!(headers.get(CONSENSUS_BLOCK_VALUE_HEADER).unwrap(), "1"); + + let json = body_json(resp).await; + assert_eq!(json["version"], "phase0"); + assert_eq!(json["execution_payload_blinded"], false); + assert_eq!(json["data"]["slot"], "42"); + + // builder_enabled → boost factor maxed. + let opts = opts_handle.lock().unwrap().clone().unwrap(); + assert_eq!(opts.slot, 42); + assert_eq!(opts.builder_boost_factor, Some(u64::MAX)); + assert_eq!(opts.randao_reveal, [0xab; 96]); + } + + /// Graffiti is length-lenient: a short value is zero-padded into the + /// 32-byte array, matching Charon's `copy(graffiti[:], graffitiBytes)`. + #[tokio::test] + async fn propose_block_v3_pads_short_graffiti() { + let handler = TestHandler::default().with_proposal(EthResponse { + data: phase0_proposal(42), + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let opts_handle = handler.proposal_opts.clone(); + let app = test_router(Arc::new(handler), false); + + let randao = format!("0x{}", "ab".repeat(96)); + let req = Request::builder() + .uri(format!( + "/eth/v3/validator/blocks/42?randao_reveal={randao}&graffiti=0xdeadbeef" + )) + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let opts = opts_handle.lock().unwrap().clone().unwrap(); + let mut expected = [0u8; 32]; + expected[..4].copy_from_slice(&[0xde, 0xad, 0xbe, 0xef]); + assert_eq!(opts.graffiti, expected); + // builder disabled → boost factor 0 (always sent, never omitted). + assert_eq!(opts.builder_boost_factor, Some(0)); + } + + /// Missing `randao_reveal` is a 400. + #[tokio::test] + async fn propose_block_v3_rejects_missing_randao() { + let handler = TestHandler::default().with_proposal(EthResponse { + data: phase0_proposal(42), + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let app = test_router(Arc::new(handler), false); + let req = Request::builder() + .uri("/eth/v3/validator/blocks/42") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + /// `submit_proposal` decodes the JSON block keyed by the version header and + /// forwards it; the handler records the right version + blinded flag. + #[tokio::test] + async fn submit_proposal_decodes_and_forwards() { + let handler = TestHandler::default(); + let submitted = handler.submitted_proposal.clone(); + let app = test_router(Arc::new(handler), false); + + let block = SignedProposalBlock::Phase0(phase0_signed_block(9)); + let body = serde_json::to_vec(&block).unwrap(); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/blocks") + .header("content-type", "application/json") + .header(VERSION_HEADER, "phase0") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let got = submitted.lock().unwrap().clone().unwrap(); + assert_eq!(got.0.version, DataVersion::Phase0); + assert!(!got.0.blinded); + } + + /// `submit_proposal` decodes an SSZ (`application/octet-stream`) body via + /// the bare per-fork block codec keyed by the version header. + #[tokio::test] + async fn submit_proposal_decodes_ssz_body() { + use ssz::Encode; + + let handler = TestHandler::default(); + let submitted = handler.submitted_proposal.clone(); + let app = test_router(Arc::new(handler), false); + + // The SSZ body is the bare per-fork block, not the Charon versioned + // wire format. + let body = phase0_signed_block(9).as_ssz_bytes(); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/blocks") + .header("content-type", "application/octet-stream") + .header(VERSION_HEADER, "phase0") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let got = submitted.lock().unwrap().clone().unwrap(); + assert_eq!(got.0.version, DataVersion::Phase0); + assert!(matches!(got.0.block, SignedProposalBlock::Phase0(_))); + } + + /// A capitalised version header is accepted (case-insensitive, mirroring + /// go-eth2-client's UnmarshalJSON). + #[tokio::test] + async fn submit_proposal_accepts_capitalised_version_header() { + let handler = TestHandler::default(); + let app = test_router(Arc::new(handler), false); + + let block = SignedProposalBlock::Phase0(phase0_signed_block(9)); + let body = serde_json::to_vec(&block).unwrap(); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v2/beacon/blocks") + .header("content-type", "application/json") + .header(VERSION_HEADER, "Phase0") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + } + + /// Missing version header → 400. + #[tokio::test] + async fn submit_proposal_rejects_missing_version_header() { + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/blocks") + .header("content-type", "application/json") + .body(Body::from("{}")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + /// An unsupported content type → 415, mirroring Charon's `wrap`. + #[tokio::test] + async fn submit_proposal_rejects_unsupported_content_type() { + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/blocks") + .header("content-type", "text/plain") + .header(VERSION_HEADER, "phase0") + .body(Body::from("{}")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE); + } + + /// A body that does not match the declared fork → 400. + #[tokio::test] + async fn submit_proposal_rejects_bad_body() { + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/blocks") + .header("content-type", "application/json") + .header(VERSION_HEADER, "phase0") + .body(Body::from(r#"{"not":"a block"}"#)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + /// `submit_blinded_block` decodes the blinded JSON block and forwards it. + #[tokio::test] + async fn submit_blinded_block_decodes_and_forwards() { + let handler = TestHandler::default(); + let submitted = handler.submitted_blinded_proposal.clone(); + let app = test_router(Arc::new(handler), false); + + let block = SignedBlindedProposalBlock::Bellatrix(bellatrix_blinded_signed_block(9)); + let body = serde_json::to_vec(&block).unwrap(); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/blinded_blocks") + .header("content-type", "application/json") + .header(VERSION_HEADER, "bellatrix") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let got = submitted.lock().unwrap().clone().unwrap(); + assert_eq!(got.version, DataVersion::Bellatrix); + } + + /// A pre-Bellatrix fork has no blinded form → 400. + #[tokio::test] + async fn submit_blinded_block_rejects_phase0() { + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/blinded_blocks") + .header("content-type", "application/json") + .header(VERSION_HEADER, "phase0") + .body(Body::from("{}")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + /// `get_validators` via repeated/CSV `id` query and the JSON response + /// shape. + #[tokio::test] + async fn get_validators_by_query_id() { + let handler = TestHandler::default().with_validators(EthResponse { + data: vec![sample_validator_datum(7, &format!("0x{}", "11".repeat(48)))], + execution_optimistic: false, + finalized: true, + dependent_root: None, + }); + let opts_handle = handler.validators_opts.clone(); + let app = test_router(Arc::new(handler), false); + + let req = Request::builder() + .uri("/eth/v1/beacon/states/head/validators?id=7,8") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let json = body_json(resp).await; + assert_eq!(json["finalized"], true); + assert_eq!(json["data"][0]["index"], "7"); + + let opts = opts_handle.lock().unwrap().clone().unwrap(); + assert_eq!(opts.state, "head"); + assert_eq!(opts.indices, vec![7, 8]); + assert!(opts.pubkeys.is_empty()); + } + + /// A `0x`-prefixed first id routes the whole batch as pubkeys, per Go's + /// `getValidatorsByID` first-element dispatch. + #[tokio::test] + async fn get_validators_by_pubkey_dispatch_on_first_id() { + let pubkey_hex = format!("0x{}", "11".repeat(48)); + let handler = TestHandler::default().with_validators(EthResponse { + data: vec![], + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let opts_handle = handler.validators_opts.clone(); + let app = test_router(Arc::new(handler), false); + + let req = Request::builder() + .uri(format!( + "/eth/v1/beacon/states/head/validators?id={pubkey_hex}" + )) + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + // Empty result serializes to `[]`, not null. + let json = body_json(resp).await; + assert_eq!(json["data"], serde_json::json!([])); + + let opts = opts_handle.lock().unwrap().clone().unwrap(); + assert_eq!(opts.pubkeys.len(), 1); + assert_eq!(opts.pubkeys[0], [0x11; 48]); + assert!(opts.indices.is_empty()); + } + + /// POST with `{"ids":[...]}` body when the query carries no ids. + #[tokio::test] + async fn get_validators_by_json_body_ids() { + let handler = TestHandler::default().with_validators(EthResponse { + data: vec![], + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let opts_handle = handler.validators_opts.clone(); + let app = test_router(Arc::new(handler), false); + + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/states/head/validators") + .header("content-type", "application/json") + .body(Body::from(r#"{"ids":["3","4"]}"#)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let opts = opts_handle.lock().unwrap().clone().unwrap(); + assert_eq!(opts.indices, vec![3, 4]); + } + + /// `get_validator` returns a single object on exactly one result. + #[tokio::test] + async fn get_validator_single_result() { + let handler = TestHandler::default().with_validators(EthResponse { + data: vec![sample_validator_datum(7, &format!("0x{}", "11".repeat(48)))], + execution_optimistic: false, + finalized: true, + dependent_root: None, + }); + let app = test_router(Arc::new(handler), false); + + let req = Request::builder() + .uri("/eth/v1/beacon/states/head/validators/7") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let json = body_json(resp).await; + assert_eq!(json["data"]["index"], "7"); + assert!(json["data"].is_object()); + } + + /// `get_validator` returns 404 when the upstream has no match. + #[tokio::test] + async fn get_validator_not_found() { + let handler = TestHandler::default().with_validators(EthResponse { + data: vec![], + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let app = test_router(Arc::new(handler), false); + + let req = Request::builder() + .uri("/eth/v1/beacon/states/head/validators/7") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + /// `get_validator` returns 500 when the upstream returns more than one. + #[tokio::test] + async fn get_validator_multiple_results_is_500() { + let handler = TestHandler::default().with_validators(EthResponse { + data: vec![ + sample_validator_datum(7, &format!("0x{}", "11".repeat(48))), + sample_validator_datum(8, &format!("0x{}", "22".repeat(48))), + ], + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let app = test_router(Arc::new(handler), false); + + let req = Request::builder() + .uri("/eth/v1/beacon/states/head/validators/7") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR); + } + + /// The reverse-proxy fallback forwards method, path, query and body to the + /// upstream beacon node, applies basic auth from the URL userinfo, and + /// returns the upstream response. + #[tokio::test] + async fn proxy_forwards_to_upstream() { + use wiremock::{ + Mock, MockServer, ResponseTemplate, + matchers::{basic_auth, method, path, query_param}, + }; + + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/eth/v1/some/passthrough")) + .and(query_param("foo", "bar")) + .and(basic_auth("user", "pass")) + .respond_with(ResponseTemplate::new(200).set_body_string("upstream-ok")) + .mount(&server) + .await; + + // Inject basic-auth userinfo into the upstream URL. + let mut upstream: reqwest::Url = server.uri().parse().unwrap(); + upstream.set_username("user").unwrap(); + upstream.set_password(Some("pass")).unwrap(); + + let app = new_router(Arc::new(TestHandler::default()), false, upstream); + let req = Request::builder() + .uri("/eth/v1/some/passthrough?foo=bar") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let bytes = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + assert_eq!(&bytes[..], b"upstream-ok"); + } + + /// The proxy propagates a non-2xx upstream status to the client. + #[tokio::test] + async fn proxy_propagates_upstream_error_status() { + use wiremock::{ + Mock, MockServer, ResponseTemplate, + matchers::{method, path}, + }; + + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/eth/v1/missing")) + .respond_with(ResponseTemplate::new(404).set_body_string("nope")) + .mount(&server) + .await; + + let upstream: reqwest::Url = server.uri().parse().unwrap(); + let app = new_router(Arc::new(TestHandler::default()), false, upstream); + let req = Request::builder() + .uri("/eth/v1/missing") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + // ------------------------------------------------------------------- + // Validator lifecycle: register_validator + voluntary_exits + // ------------------------------------------------------------------- + + /// Builds a single signed builder validator registration with the given + /// pubkey first byte, for use as test JSON/SSZ array input. + fn signed_registration(byte: u8) -> pluto_eth2api::v1::SignedValidatorRegistration { + pluto_eth2api::v1::SignedValidatorRegistration { + message: pluto_eth2api::v1::ValidatorRegistration { + fee_recipient: [0x11; 20], + gas_limit: 30_000_000, + timestamp: 1_700_000_000, + pubkey: [byte; 48], + }, + signature: [byte; 96], + } + } + + /// A JSON array body to `register_validator` is decoded and forwarded to + /// the handler. + #[tokio::test] + async fn submit_validator_registrations_decodes_json_array() { + use axum::body::Body; + use tower::ServiceExt; + + let handler = TestHandler::default(); + let recorded = handler.submitted_registrations.clone(); + let app = test_router(Arc::new(handler), true); + + let regs = vec![signed_registration(0xA1), signed_registration(0xA2)]; + let body = serde_json::to_vec(®s).unwrap(); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/register_validator") + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let got = recorded.lock().unwrap().clone().unwrap(); + assert_eq!(got, regs); + } + + /// An SSZ body to `register_validator` is decoded from a bare + /// concatenation of fixed-size objects and forwarded to the handler. + #[tokio::test] + async fn submit_validator_registrations_decodes_ssz_array() { + use axum::body::Body; + use ssz::Encode; + use tower::ServiceExt; + + let handler = TestHandler::default(); + let recorded = handler.submitted_registrations.clone(); + let app = test_router(Arc::new(handler), true); + + let regs = vec![signed_registration(0xB1), signed_registration(0xB2)]; + let mut body = Vec::new(); + for reg in ®s { + body.extend_from_slice(®.as_ssz_bytes()); + } + // Each object is exactly 180 bytes. + assert_eq!(body.len(), 360); + + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/register_validator") + .header("content-type", "application/octet-stream") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let got = recorded.lock().unwrap().clone().unwrap(); + assert_eq!(got, regs); + } + + /// An SSZ body that is not a whole multiple of the object size → 415. + #[tokio::test] + async fn submit_validator_registrations_rejects_misaligned_ssz() { + use axum::body::Body; + use tower::ServiceExt; + + let app = test_router(Arc::new(TestHandler::default()), true); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/register_validator") + .header("content-type", "application/octet-stream") + .body(Body::from(vec![0u8; 181])) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE); + } + + /// An empty `register_validator` body → 400. + #[tokio::test] + async fn submit_validator_registrations_rejects_empty_body() { + use axum::body::Body; + use tower::ServiceExt; + + let app = test_router(Arc::new(TestHandler::default()), true); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/register_validator") + .header("content-type", "application/json") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + /// A JSON voluntary exit is decoded and forwarded to the handler. + #[tokio::test] + async fn submit_exit_decodes_json() { + use axum::body::Body; + use tower::ServiceExt; + + let handler = TestHandler::default(); + let recorded = handler.submitted_exit.clone(); + let app = test_router(Arc::new(handler), false); + + let exit = phase0::SignedVoluntaryExit { + message: phase0::VoluntaryExit { + epoch: 5, + validator_index: 42, + }, + signature: [0x33; 96], + }; + let body = serde_json::to_vec(&exit).unwrap(); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/pool/voluntary_exits") + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let got = recorded.lock().unwrap().clone().unwrap(); + assert_eq!(got, exit); + } + + /// An empty voluntary-exit body → 400. + #[tokio::test] + async fn submit_exit_rejects_empty_body() { + use axum::body::Body; + use tower::ServiceExt; + + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/pool/voluntary_exits") + .header("content-type", "application/json") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + /// A voluntary exit submitted with an SSZ content type → 415, since the + /// endpoint is JSON-only. + #[tokio::test] + async fn submit_exit_rejects_ssz_content_type() { + use axum::body::Body; + use tower::ServiceExt; + + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/pool/voluntary_exits") + .header("content-type", "application/octet-stream") + .body(Body::from(vec![0u8; 112])) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE); + } + + /// A malformed voluntary-exit JSON body → 400. + #[tokio::test] + async fn submit_exit_rejects_invalid_json() { + use axum::body::Body; + use tower::ServiceExt; + + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/pool/voluntary_exits") + .header("content-type", "application/json") + .body(Body::from("{not json")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } } diff --git a/crates/core/src/validatorapi/testutils.rs b/crates/core/src/validatorapi/testutils.rs index 45980fe7..9fcd3d48 100644 --- a/crates/core/src/validatorapi/testutils.rs +++ b/crates/core/src/validatorapi/testutils.rs @@ -4,6 +4,8 @@ //! every method. As each router endpoint is ported, the relevant method is //! overridden here so the route's unit test can drive it. +use std::sync::{Arc, Mutex}; + use async_trait::async_trait; use super::{ @@ -34,6 +36,24 @@ pub struct TestHandler { pub sync_committee_duties_response: Option, /// Value returned by [`Handler::attestation_data`]. pub attestation_data_response: Option, + /// Value returned by [`Handler::proposal`]. + pub proposal_response: Option>, + /// Value returned by [`Handler::validators`]. + pub validators_response: Option>>, + /// Records the last [`ProposalOpts`] passed to [`Handler::proposal`]. + pub proposal_opts: Arc>>, + /// Records the last proposal submitted via [`Handler::submit_proposal`]. + pub submitted_proposal: Arc>>, + /// Records the last proposal submitted via + /// [`Handler::submit_blinded_proposal`]. + pub submitted_blinded_proposal: Arc>>, + /// Records the last [`ValidatorsOpts`] passed to [`Handler::validators`]. + pub validators_opts: Arc>>, + /// Records the last registrations submitted via + /// [`Handler::submit_validator_registrations`]. + pub submitted_registrations: Arc>>>, + /// Records the last exit submitted via [`Handler::submit_voluntary_exit`]. + pub submitted_exit: Arc>>, } impl TestHandler { @@ -45,6 +65,18 @@ impl TestHandler { } } + /// Sets the response returned by [`Handler::proposal`]. + pub fn with_proposal(mut self, response: EthResponse) -> Self { + self.proposal_response = Some(response); + self + } + + /// Sets the response returned by [`Handler::validators`]. + pub fn with_validators(mut self, response: EthResponse>) -> Self { + self.validators_response = Some(response); + self + } + /// Sets the response returned by [`Handler::proposer_duties`]. pub fn with_proposer_duties(mut self, response: ProposerDutiesResponse) -> Self { self.proposer_duties_response = Some(response); @@ -129,20 +161,32 @@ impl Handler for TestHandler { async fn proposal( &self, - _opts: ProposalOpts, + opts: ProposalOpts, ) -> Result, ApiError> { - unimplemented!("proposal not stubbed in TestHandler") + *self.proposal_opts.lock().expect("proposal_opts lock") = Some(opts); + Ok(self + .proposal_response + .clone() + .expect("proposal not stubbed in TestHandler")) } - async fn submit_proposal(&self, _proposal: VersionedSignedProposal) -> Result<(), ApiError> { - unimplemented!("submit_proposal not stubbed in TestHandler") + async fn submit_proposal(&self, proposal: VersionedSignedProposal) -> Result<(), ApiError> { + *self + .submitted_proposal + .lock() + .expect("submitted_proposal lock") = Some(proposal); + Ok(()) } async fn submit_blinded_proposal( &self, - _proposal: VersionedSignedBlindedProposal, + proposal: VersionedSignedBlindedProposal, ) -> Result<(), ApiError> { - unimplemented!("submit_blinded_proposal not stubbed in TestHandler") + *self + .submitted_blinded_proposal + .lock() + .expect("submitted_blinded_proposal lock") = Some(proposal); + Ok(()) } async fn aggregate_attestation( @@ -175,20 +219,29 @@ impl Handler for TestHandler { async fn validators( &self, - _opts: ValidatorsOpts, + opts: ValidatorsOpts, ) -> Result>, ApiError> { - unimplemented!("validators not stubbed in TestHandler") + *self.validators_opts.lock().expect("validators_opts lock") = Some(opts); + Ok(self + .validators_response + .clone() + .expect("validators not stubbed in TestHandler")) } async fn submit_validator_registrations( &self, - _registrations: Vec, + registrations: Vec, ) -> Result<(), ApiError> { - unimplemented!("submit_validator_registrations not stubbed in TestHandler") + *self + .submitted_registrations + .lock() + .expect("submitted_registrations lock") = Some(registrations); + Ok(()) } - async fn submit_voluntary_exit(&self, _exit: SignedVoluntaryExit) -> Result<(), ApiError> { - unimplemented!("submit_voluntary_exit not stubbed in TestHandler") + async fn submit_voluntary_exit(&self, exit: SignedVoluntaryExit) -> Result<(), ApiError> { + *self.submitted_exit.lock().expect("submitted_exit lock") = Some(exit); + Ok(()) } async fn sync_committee_contribution( diff --git a/crates/core/src/validatorapi/types.rs b/crates/core/src/validatorapi/types.rs index a3a9a68a..5e36e5c2 100644 --- a/crates/core/src/validatorapi/types.rs +++ b/crates/core/src/validatorapi/types.rs @@ -17,6 +17,7 @@ pub use pluto_eth2api::{ GetAttesterDutiesResponseResponseDatum as AttesterDuty, GetProposerDutiesResponseResponse as ProposerDutiesResponse, GetProposerDutiesResponseResponseDatum as ProposerDuty, + GetStateValidatorsResponseResponseDatum as Validator, GetSyncCommitteeDutiesResponseResponse as SyncCommitteeDutiesResponse, GetSyncCommitteeDutiesResponseResponseDatum as SyncCommitteeDuty, GetVersionResponseResponse as NodeVersionResponse, @@ -139,10 +140,6 @@ pub struct AttestationDataResponse { pub data: AttestationData, } -/// Validator payload. Placeholder. -#[derive(Debug, Clone)] -pub struct Validator {} - /// Versioned unsigned proposal payload — alias of the signeddata wrapper. pub use crate::signeddata::VersionedProposal; @@ -162,13 +159,14 @@ pub struct VersionedAttestation {} #[derive(Debug, Clone)] pub struct VersionedSignedAggregateAndProof {} -/// Signed validator registration payload. Placeholder. -#[derive(Debug, Clone)] -pub struct SignedValidatorRegistration {} +/// Signed validator registration payload — the bare builder-API v1 object the +/// validator client submits to `register_validator`. The component wraps each +/// entry as a `BuilderVersion::V1` versioned registration before broadcasting. +pub use pluto_eth2api::v1::SignedValidatorRegistration; -/// Signed voluntary exit payload. Placeholder. -#[derive(Debug, Clone)] -pub struct SignedVoluntaryExit {} +/// Signed voluntary exit payload — the phase0 object the validator client +/// submits to `voluntary_exits`. +pub use pluto_eth2api::spec::phase0::SignedVoluntaryExit; /// Sync-committee message payload. Placeholder. #[derive(Debug, Clone)] diff --git a/crates/eth2api/src/v1.rs b/crates/eth2api/src/v1.rs index 19a77f2d..3f92fdba 100644 --- a/crates/eth2api/src/v1.rs +++ b/crates/eth2api/src/v1.rs @@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize}; use serde_with::serde_as; +use ssz_derive::{Decode, Encode}; use tree_hash::TreeHash; use tree_hash_derive::TreeHash; @@ -14,7 +15,7 @@ use crate::spec::{ /// /// Spec: #[serde_as] -#[derive(Debug, Clone, PartialEq, Eq, TreeHash, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode, TreeHash, Serialize, Deserialize)] pub struct ValidatorRegistration { /// Fee recipient address (20 bytes). #[serde(with = "crate::spec::bellatrix::execution_address_serde")] @@ -34,7 +35,7 @@ pub struct ValidatorRegistration { /// /// Spec: #[serde_as] -#[derive(Debug, Clone, PartialEq, Eq, TreeHash, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode, TreeHash, Serialize, Deserialize)] pub struct SignedValidatorRegistration { /// Unsigned validator registration message. pub message: ValidatorRegistration, diff --git a/crates/eth2util/Cargo.toml b/crates/eth2util/Cargo.toml index 9f367638..691bd3c8 100644 --- a/crates/eth2util/Cargo.toml +++ b/crates/eth2util/Cargo.toml @@ -38,6 +38,7 @@ uuid.workspace = true rand.workspace = true reqwest = { workspace = true, features = ["json"] } url.workspace = true +tracing.workspace = true [dev-dependencies] assert-json-diff.workspace = true diff --git a/crates/eth2util/src/helpers.rs b/crates/eth2util/src/helpers.rs index d694f38b..436527df 100644 --- a/crates/eth2util/src/helpers.rs +++ b/crates/eth2util/src/helpers.rs @@ -34,6 +34,18 @@ pub enum HelperError { /// Failed to fetch a required value from the spec #[error("fetch slots per epoch")] FetchSlotsPerEpoch, + + /// Failed to fetch the genesis time from the beacon node. + #[error("fetch genesis time: {0}")] + FetchGenesisTime(String), + + /// Failed to fetch the slots configuration from the beacon node. + #[error("fetch slots config: {0}")] + FetchSlotsConfig(String), + + /// Slot computation failed (overflow or a degenerate slot duration). + #[error("slot computation: {0}")] + SlotComputation(String), } type Result = std::result::Result; @@ -101,6 +113,59 @@ pub(crate) fn verify_address(address: &str) -> Result
{ .map_err(|_| HelperError::InvalidAddress(address.to_string())) } +/// Returns the slot a wall-clock `timestamp` falls in, computed from the +/// beacon-node genesis time and slot duration. +/// +/// When `timestamp` precedes genesis — which can happen in test scenarios +/// where there is no strict validation on the value — it falls back to the +/// current wall-clock time, matching the reference implementation. +pub async fn slot_from_timestamp( + client: &pluto_eth2api::client::EthBeaconNodeApiClient, + timestamp: chrono::DateTime, +) -> Result { + let genesis_time = client + .fetch_genesis_time() + .await + .map_err(|e| HelperError::FetchGenesisTime(e.to_string()))?; + + let (slot_duration, _) = client + .fetch_slots_config() + .await + .map_err(|e| HelperError::FetchSlotsConfig(e.to_string()))?; + + let timestamp = if timestamp < genesis_time { + let now = chrono::Utc::now(); + tracing::info!( + genesis_timestamp = genesis_time.timestamp(), + overridden_timestamp = timestamp.timestamp(), + new_timestamp = now.timestamp(), + "timestamp before genesis, defaulting to current timestamp", + ); + now + } else { + timestamp + }; + + // `timestamp >= genesis_time` holds here, so the signed delta is + // non-negative and the conversion to nanoseconds cannot overflow for any + // realistic chain timestamp. + let delta_nanos = timestamp + .signed_duration_since(genesis_time) + .num_nanoseconds() + .ok_or(HelperError::SlotComputation("delta overflow".to_owned()))?; + let delta_nanos = u128::try_from(delta_nanos) + .map_err(|_| HelperError::SlotComputation("negative delta".to_owned()))?; + + let slot_nanos = slot_duration.as_nanos(); + let slot = delta_nanos + .checked_div(slot_nanos) + .ok_or(HelperError::SlotComputation( + "zero slot duration".to_owned(), + ))?; + + u64::try_from(slot).map_err(|_| HelperError::SlotComputation("slot overflow".to_owned())) +} + /// Returns epoch calculated from given slot. pub async fn epoch_from_slot( client: &pluto_eth2api::client::EthBeaconNodeApiClient, @@ -118,7 +183,44 @@ pub async fn epoch_from_slot( #[cfg(test)] mod tests { use super::*; + use chrono::{DateTime, Utc}; use k256::SecretKey; + use pluto_testutil::BeaconMock; + + async fn slot_mock() -> BeaconMock { + BeaconMock::builder() + .genesis_time(DateTime::from_timestamp(0, 0).unwrap()) + .slot_duration(std::time::Duration::from_secs(12)) + .build() + .await + .unwrap() + } + + #[tokio::test] + async fn slot_from_timestamp_divides_delta_by_slot_duration() { + let mock = slot_mock().await; + // genesis = 0, slot_duration = 12s → timestamp 600 ⇒ slot 50. + let ts = DateTime::::from_timestamp(600, 0).unwrap(); + let slot = slot_from_timestamp(mock.client(), ts).await.unwrap(); + assert_eq!(slot, 50); + } + + #[tokio::test] + async fn slot_from_timestamp_before_genesis_falls_back_to_now() { + let mock = slot_mock().await; + // A timestamp before genesis (genesis = 0) falls back to the current + // wall clock. With genesis at epoch 0 and a 12s slot, the returned + // slot should approximate `now / 12`, well within a few slots. + let ts = DateTime::::from_timestamp(-100, 0).unwrap(); + let before = Utc::now().timestamp(); + let slot = slot_from_timestamp(mock.client(), ts).await.unwrap(); + let expected = u64::try_from(before).unwrap().checked_div(12).unwrap(); + let diff = slot.abs_diff(expected); + assert!( + diff <= 5, + "slot {slot} should be within 5 of expected {expected}" + ); + } #[test] fn checksummed_address() {