diff --git a/crates/core/src/scheduler.rs b/crates/core/src/scheduler.rs index 3893460b..f04e15c5 100644 --- a/crates/core/src/scheduler.rs +++ b/crates/core/src/scheduler.rs @@ -1161,7 +1161,10 @@ mod tests { pubkey: pubkey.to_string(), validator_index: v_idx.to_string(), slot: slot.to_string(), - ..Default::default() + committee_index: "0".to_owned(), + committee_length: "0".to_owned(), + committees_at_slot: "0".to_owned(), + validator_committee_index: "0".to_owned(), }; let def: types::AttesterDutyDefinition = datum.try_into().expect("valid attester datum"); types::DutyDefinition::Attester(def) diff --git a/crates/core/src/signeddata.rs b/crates/core/src/signeddata.rs index 2fc4ed95..0ea6635d 100644 --- a/crates/core/src/signeddata.rs +++ b/crates/core/src/signeddata.rs @@ -1176,7 +1176,7 @@ impl SignedSyncContributionAndProof { } /// Attester duty metadata associated with an attestation. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct AttesterDuty { /// Slot for the duty. pub slot: phase0::Slot, diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 2d1a1bfd..4d45a3eb 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -435,6 +435,15 @@ pub struct AttesterDutyDefinition { pub v_idx: u64, /// The slot at which the validator must attest. pub slot: SlotNumber, + /// Index of the beacon committee the validator attests in. + pub committee_index: u64, + /// Number of validators in the committee. + pub committee_length: u64, + /// Total number of committees at the slot. + pub committees_at_slot: u64, + /// Position of the validator within its committee. Used to match a + /// submitted attestation's single aggregation bit back to this validator. + pub validator_committee_index: u64, } impl TryInto @@ -452,11 +461,30 @@ impl TryInto SlotNumber::from(self.slot.parse::().map_err(|_| { pluto_eth2api::EthBeaconNodeApiClientError::ParseError("slot".into()) })?); + let committee_index = self.committee_index.parse::().map_err(|_| { + pluto_eth2api::EthBeaconNodeApiClientError::ParseError("committee_index".into()) + })?; + let committee_length = self.committee_length.parse::().map_err(|_| { + pluto_eth2api::EthBeaconNodeApiClientError::ParseError("committee_length".into()) + })?; + let committees_at_slot = self.committees_at_slot.parse::().map_err(|_| { + pluto_eth2api::EthBeaconNodeApiClientError::ParseError("committees_at_slot".into()) + })?; + let validator_committee_index = + self.validator_committee_index.parse::().map_err(|_| { + pluto_eth2api::EthBeaconNodeApiClientError::ParseError( + "validator_committee_index".into(), + ) + })?; Ok(AttesterDutyDefinition { pubkey, v_idx, slot, + committee_index, + committee_length, + committees_at_slot, + validator_committee_index, }) } } diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index 71ff8e12..29abef48 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -13,7 +13,7 @@ use pluto_eth2api::{ GetProposerDutiesRequest, GetProposerDutiesResponse, GetStateValidatorsResponseResponse, GetSyncCommitteeDutiesRequest, GetSyncCommitteeDutiesResponse, PostStateValidatorsRequest, PostStateValidatorsRequestPath, PostStateValidatorsResponse, ValidatorRequestBody, - spec::phase0::{BLSPubKey, Domain, Epoch, Root, Slot, ValidatorIndex}, + spec::phase0::{AttestationData, BLSPubKey, Domain, Epoch, Root, Slot, ValidatorIndex}, valcache::{ActiveValidators, CachedValidatorsProvider}, versioned::{DataVersion, SignedBlindedProposalBlock, SignedProposalBlock}, }; @@ -50,8 +50,8 @@ use crate::{ VersionedSignedValidatorRegistration as VersionedSignedValidatorRegistrationWrapper, }, types::{ - Duty, DutyDefinitionSet, ParSignedData, ParSignedDataSet, PubKey, Signature, SignedData, - SlotNumber, + Duty, DutyDefinition, DutyDefinitionSet, ParSignedData, ParSignedDataSet, PubKey, + Signature, SignedData, SlotNumber, }, version, }; @@ -161,8 +161,8 @@ pub struct Component { eth2_cl: Arc, /// Per-epoch active-validators cache. Submit handlers consult this to /// translate a validator-client-supplied `validator_index` into the - /// cluster's DV root public key. Backed by the beacon-node validator cache. - #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] + /// cluster's DV root public key. Mirrors Go's `eth2Cl.ActiveValidators`, + /// which is itself backed by the beacon-node validator cache. validator_cache: Arc, /// In-memory DutyDB used to await consensus output (e.g. attestation /// data) produced by the rest of the pipeline. @@ -186,19 +186,16 @@ pub struct Component { #[allow(dead_code, reason = "consumed by proposal handler in later PRs")] await_proposal_fn: Option, /// Looks up an aggregated attestation by `(slot, attestation_root)`. - #[allow(dead_code, reason = "consumed by aggregate_attestation in later PRs")] await_agg_attestation_fn: Option, /// Looks up a sync committee contribution. await_sync_contribution_fn: Option, /// Looks up aggregated signed data for a `(duty, pubkey)`. - #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] await_agg_sig_db_fn: Option, /// Looks up the duty-definition set for a duty. The proposal / /// submit_proposal / submit_blinded_proposal handlers consult this to /// resolve the proposer's DV root pubkey. duty_def_fn: Option, /// Looks up the root pubkey for an `(slot, commIdx, valIdx)` triple. - #[allow(dead_code, reason = "consumed by submit_attestations in later PRs")] pub_key_by_att_fn: Option, } @@ -261,8 +258,9 @@ impl Component { /// Returns the cluster's active validators (`validator_index -> DV root /// public key`) from the registered [`CachedValidatorsProvider`], - /// bounded by [`UPSTREAM_REQUEST_TIMEOUT`]. - #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] + /// bounded by [`UPSTREAM_REQUEST_TIMEOUT`]. Mirrors Go's + /// `c.eth2Cl.ActiveValidators(ctx)`, which is itself implemented via the + /// beacon-node validator cache. async fn fetch_active_validators(&self) -> Result { tokio::time::timeout( UPSTREAM_REQUEST_TIMEOUT, @@ -495,13 +493,132 @@ impl Component { .map_err(map_dutydb_error) } + /// Resolves the validator index for a VC-submitted attestation. + /// + /// For Electra and Fulu the validator index is carried explicitly in the + /// payload; a missing index is rejected. For all earlier forks the index is + /// not part of the attestation, so it is recovered by matching the + /// attestation's committee index and its single aggregation bit against the + /// scheduler's attester-duty definitions for the slot. + async fn resolve_attestation_validator_index( + &self, + att: &VersionedAttestation, + att_data: &AttestationData, + ) -> Result { + use pluto_eth2api::versioned::DataVersion; + + match att.0.version { + DataVersion::Phase0 + | DataVersion::Altair + | DataVersion::Bellatrix + | DataVersion::Capella + | DataVersion::Deneb => { + let def_set = self.lookup_attester_definitions(att_data.slot).await?; + + // Match the attestation to an attester duty by committee index + // and the single aggregation bit. When no duty matches, the + // validator index stays 0 and the subsequent pubkey lookup + // fails — matching Go, which does not error at this point. + let mut val_idx = 0; + for def in def_set.values() { + let DutyDefinition::Attester(duty) = def else { + continue; + }; + if duty.committee_index != att_data.index { + continue; + } + + let indices = attestation_aggregation_bit_indices(att)?; + let [single_bit] = indices.as_slice() else { + return Err(ApiError::new( + StatusCode::BAD_REQUEST, + "unexpected number of aggregation bits", + )); + }; + + if duty.validator_committee_index == *single_bit as u64 { + val_idx = duty.v_idx; + break; + } + } + + Ok(val_idx) + } + DataVersion::Electra | DataVersion::Fulu => att.0.validator_index.ok_or_else(|| { + ApiError::new( + StatusCode::BAD_REQUEST, + "missing attestation validator index", + ) + }), + DataVersion::Unknown => Err(ApiError::new( + StatusCode::BAD_REQUEST, + "invalid attestations version", + )), + } + } + + /// Looks up the attester-duty definition set for a slot via the registered + /// `duty_def_fn`, downcasting the type-erased result to the attester-duty + /// shape. Mirrors Go's `dutyDefFunc(ctx, Duty{Slot, DutyAttester})`. + async fn lookup_attester_definitions(&self, slot: u64) -> Result { + let f = self.duty_def_fn.as_ref().ok_or_else(|| { + ApiError::new( + StatusCode::SERVICE_UNAVAILABLE, + "duty definition lookup not registered", + ) + })?; + + let duty = Duty::new_attester_duty(SlotNumber::new(slot)); + let boxed = f(duty).await.map_err(|err| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "duty definition lookup failed", + ) + .with_boxed_source(err) + })?; + + boxed + .downcast::() + .map(|boxed| *boxed) + .map_err(|_| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "duty definition lookup returned unexpected type", + ) + }) + } + + /// Resolves the DV root pubkey responsible for an attestation `(slot, + /// committee_index, validator_index)` triple via the registered + /// `pub_key_by_att_fn`. + async fn pub_key_by_attestation( + &self, + slot: u64, + committee_index: u64, + validator_index: u64, + ) -> Result { + let f = self.pub_key_by_att_fn.as_ref().ok_or_else(|| { + ApiError::new( + StatusCode::SERVICE_UNAVAILABLE, + "pubkey-by-attestation lookup not registered", + ) + })?; + + f(slot, committee_index, validator_index) + .await + .map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, "failed to find pubkey") + .with_boxed_source(err) + }) + } + /// Verifies a partial BLS signature produced by the validator client /// against this node's public share for the given DV root pubkey. /// /// The BLS domain / epoch / message-root are passed directly rather - /// than projected through a signed-data trait — each submit handler in - /// later PRs derives the triple from the concrete signed-data wrapper - /// it is processing, then invokes this helper. + /// than projected through a signed-data trait — each submit handler + /// derives the triple from the concrete signed-data wrapper it is + /// processing, then invokes this helper. /// /// Skipped entirely when [`Self::insecure_test`] is set. #[instrument(skip_all, fields(domain = ?domain_name, epoch))] @@ -908,9 +1025,51 @@ impl Handler for Component { #[instrument(skip_all)] async fn submit_attestations( &self, - _attestations: Vec, + attestations: Vec, ) -> Result<(), ApiError> { - unimplemented!("submit_attestations not yet ported") + let mut sets_by_slot: HashMap = HashMap::new(); + + for att in attestations { + let att_data = attestation_data(&att)?; + let slot = att_data.slot; + let committee_index = attestation_committee_index(&att)?; + + let val_idx = self + .resolve_attestation_validator_index(&att, &att_data) + .await?; + + let pubkey = self + .pub_key_by_attestation(slot, committee_index, val_idx) + .await?; + + let par_sig_data = + crate::signeddata::VersionedAttestation::new_partial(att.0.clone(), self.share_idx) + .map_err(map_attestation_signed_data_error)?; + + // Verify attestation signature. Domain is DOMAIN_BEACON_ATTESTER and + // the epoch comes from the attestation's own target checkpoint + // (not derived from the slot). + verify_par_signed_attestation(self, &pubkey, att_data.target.epoch, &par_sig_data) + .await?; + + sets_by_slot + .entry(slot) + .or_default() + .insert(pubkey, par_sig_data); + } + + // Send sets to subscriptions. + for (slot, set) in sets_by_slot { + let duty = Duty::new_attester_duty(SlotNumber::new(slot)); + for sub in &self.subs { + sub(&duty, &set).await.map_err(|err| { + ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, "subscriber failed") + .with_boxed_source(err) + })?; + } + } + + Ok(()) } #[instrument(skip_all, fields(slot = opts.slot))] @@ -1088,20 +1247,109 @@ impl Handler for Component { .map_err(|_: Elapsed| proposal_timeout())? } - #[instrument(skip_all)] + #[instrument(skip_all, fields(slot = opts.slot))] async fn aggregate_attestation( &self, - _opts: AggregateAttestationOpts, + opts: AggregateAttestationOpts, ) -> Result, ApiError> { - unimplemented!("aggregate_attestation not yet ported") + let f = self.await_agg_attestation_fn.as_ref().ok_or_else(|| { + ApiError::new( + StatusCode::SERVICE_UNAVAILABLE, + "await aggregate attestation not registered", + ) + })?; + + let agg_att = f(opts.slot, opts.attestation_data_root) + .await + .map_err(|err| { + ApiError::new( + StatusCode::SERVICE_UNAVAILABLE, + "await aggregate attestation failed", + ) + .with_boxed_source(err) + })?; + + // The await hook yields an unsigned aggregated attestation; the response + // type is the same versioned-attestation wrapper. + let data = crate::signeddata::VersionedAttestation::new(agg_att.0) + .map_err(map_attestation_signed_data_error)?; + + Ok(EthResponse { + data, + execution_optimistic: false, + finalized: false, + dependent_root: None, + }) } #[instrument(skip_all)] async fn submit_aggregate_attestations( &self, - _aggregates: Vec, + aggregates: Vec, ) -> Result<(), ApiError> { - unimplemented!("submit_aggregate_attestations not yet ported") + let vals = self.fetch_active_validators().await?; + + let mut sets_by_slot: HashMap = HashMap::new(); + + for agg in aggregates { + let slot = agg.0.slot().ok_or_else(|| { + ApiError::new( + StatusCode::BAD_REQUEST, + "invalid aggregate-and-proof version", + ) + })?; + let aggregator_index = aggregate_aggregator_index(&agg)?; + + let eth2_pubkey = vals + .get(&aggregator_index) + .ok_or_else(|| ApiError::new(StatusCode::BAD_REQUEST, "validator not found"))?; + let pubkey = pubkey_from_bls(eth2_pubkey); + + // Verify the inner selection proof (the outcome of + // DutyPrepareAggregator). Skipped under insecure_test, matching Go. + if !self.insecure_test { + signing::verify_aggregate_and_proof_selection(&self.eth2_cl, eth2_pubkey, &agg.0) + .await + .map_err(|err| { + ApiError::new( + StatusCode::BAD_REQUEST, + "aggregate selection proof verification failed", + ) + .with_source(err) + })?; + } + + let par_sig_data = crate::signeddata::VersionedSignedAggregateAndProof::new_partial( + agg.0.clone(), + self.share_idx, + ); + + // Verify the outer partial signature over the aggregate-and-proof. + let epoch = pluto_eth2util::helpers::epoch_from_slot(&self.eth2_cl, slot) + .await + .map_err(|err| { + ApiError::new(StatusCode::BAD_GATEWAY, "could not resolve epoch from slot") + .with_source(err) + })?; + verify_par_signed_aggregate(self, &pubkey, epoch, &par_sig_data).await?; + + sets_by_slot + .entry(slot) + .or_default() + .insert(pubkey, par_sig_data); + } + + for (slot, set) in sets_by_slot { + let duty = Duty::new_aggregator_duty(SlotNumber::new(slot)); + for sub in &self.subs { + sub(&duty, &set).await.map_err(|err| { + ApiError::new(StatusCode::INTERNAL_SERVER_ERROR, "subscriber failed") + .with_boxed_source(err) + })?; + } + } + + Ok(()) } #[instrument(skip_all)] @@ -2066,6 +2314,197 @@ async fn verify_par_signed_proposal( .map_err(verify_partial_sig_error) } +/// Verifies the partial signature embedded in an attestation `ParSignedData` +/// against this node's public share for `pubkey`. The domain is +/// `DOMAIN_BEACON_ATTESTER` and the epoch is the attestation's own target +/// epoch (passed by the caller). +async fn verify_par_signed_attestation( + component: &Component, + pubkey: &PubKey, + epoch: Epoch, + par_sig: &crate::types::ParSignedData, +) -> Result<(), ApiError> { + verify_par_sig_with_domain( + component, + pubkey, + DomainName::BeaconAttester, + epoch, + par_sig, + ) + .await +} + +/// Verifies the outer partial signature embedded in an aggregate-and-proof +/// `ParSignedData`. The domain is `DOMAIN_AGGREGATE_AND_PROOF`. +async fn verify_par_signed_aggregate( + component: &Component, + pubkey: &PubKey, + epoch: Epoch, + par_sig: &crate::types::ParSignedData, +) -> Result<(), ApiError> { + verify_par_sig_with_domain( + component, + pubkey, + DomainName::AggregateAndProof, + epoch, + par_sig, + ) + .await +} + +/// Extracts the signature + message root from a `ParSignedData` and verifies it +/// against this node's public share for `pubkey` under the supplied BLS domain +/// and epoch. +async fn verify_par_sig_with_domain( + component: &Component, + pubkey: &PubKey, + domain: DomainName, + epoch: Epoch, + par_sig: &crate::types::ParSignedData, +) -> Result<(), ApiError> { + let signature = par_sig.signed_data.signature().map_err(|err| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "could not extract partial signature", + ) + .with_source(err) + })?; + let message_root = par_sig.signed_data.message_root().map_err(|err| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "could not derive message root", + ) + .with_source(err) + })?; + + let pubkey_bytes = pubkey_to_bls(pubkey); + component + .verify_partial_sig(&pubkey_bytes, domain, epoch, message_root, &signature) + .await + .map_err(verify_partial_sig_error) +} + +/// Builds a core [`PubKey`] from a 48-byte BLS public key. +fn pubkey_from_bls(pubkey: &BLSPubKey) -> PubKey { + PubKey::new(*pubkey) +} + +/// Returns the attestation data of a VC-submitted versioned attestation. +fn attestation_data(att: &VersionedAttestation) -> Result { + let payload = att + .0 + .attestation + .as_ref() + .ok_or_else(|| ApiError::new(StatusCode::BAD_REQUEST, "missing attestation payload"))?; + Ok(payload.data().clone()) +} + +/// Returns the committee index of a VC-submitted versioned attestation, +/// mirroring go-eth2-client's `VersionedAttestation.CommitteeIndex`: for +/// pre-Electra forks it is the attestation data's `index`; for Electra and Fulu +/// it is the single set bit of the committee bitfield. +fn attestation_committee_index(att: &VersionedAttestation) -> Result { + use pluto_eth2api::versioned::{AttestationPayload, DataVersion}; + + let payload = att + .0 + .attestation + .as_ref() + .ok_or_else(|| ApiError::new(StatusCode::BAD_REQUEST, "missing attestation payload"))?; + + match att.0.version { + DataVersion::Phase0 + | DataVersion::Altair + | DataVersion::Bellatrix + | DataVersion::Capella + | DataVersion::Deneb => Ok(payload.data().index), + DataVersion::Electra | DataVersion::Fulu => { + let (AttestationPayload::Electra(att) | AttestationPayload::Fulu(att)) = payload else { + return Err(ApiError::new( + StatusCode::BAD_REQUEST, + "electra attestation payload mismatch", + )); + }; + let bits = att.committee_bits.bit_indices(); + match bits.as_slice() { + [single] => Ok(*single as u64), + [] => Err(ApiError::new( + StatusCode::BAD_REQUEST, + "no committee index found in committee bits", + )), + _ => Err(ApiError::new( + StatusCode::BAD_REQUEST, + "multiple committee indices found in committee bits", + )), + } + } + DataVersion::Unknown => Err(ApiError::new( + StatusCode::BAD_REQUEST, + "invalid attestations version", + )), + } +} + +/// Returns the aggregator validator index of a VC-submitted versioned signed +/// aggregate-and-proof. +fn aggregate_aggregator_index(agg: &VersionedSignedAggregateAndProof) -> Result { + use pluto_eth2api::versioned::{DataVersion, SignedAggregateAndProofPayload}; + + if agg.0.version == DataVersion::Unknown { + return Err(ApiError::new( + StatusCode::BAD_REQUEST, + "invalid aggregate-and-proof version", + )); + } + + Ok(match &agg.0.aggregate_and_proof { + SignedAggregateAndProofPayload::Phase0(p) + | SignedAggregateAndProofPayload::Altair(p) + | SignedAggregateAndProofPayload::Bellatrix(p) + | SignedAggregateAndProofPayload::Capella(p) + | SignedAggregateAndProofPayload::Deneb(p) => p.message.aggregator_index, + SignedAggregateAndProofPayload::Electra(p) | SignedAggregateAndProofPayload::Fulu(p) => { + p.message.aggregator_index + } + }) +} + +/// Returns the set-bit indices of a pre-Electra attestation's aggregation +/// bitfield. Only the per-validator forks that reach the pre-Electra branch of +/// `submit_attestations` (Phase0/Altair/Bellatrix/Capella/Deneb) are handled; +/// they all carry a `phase0::Attestation`. +fn attestation_aggregation_bit_indices(att: &VersionedAttestation) -> Result, ApiError> { + use pluto_eth2api::versioned::AttestationPayload; + + let payload = att + .0 + .attestation + .as_ref() + .ok_or_else(|| ApiError::new(StatusCode::BAD_REQUEST, "missing attestation payload"))?; + + match payload { + AttestationPayload::Phase0(a) + | AttestationPayload::Altair(a) + | AttestationPayload::Bellatrix(a) + | AttestationPayload::Capella(a) + | AttestationPayload::Deneb(a) => Ok(a.aggregation_bits.bit_indices()), + AttestationPayload::Electra(a) | AttestationPayload::Fulu(a) => { + Ok(a.aggregation_bits.bit_indices()) + } + } +} + +/// Maps a [`SignedDataError`] from an attestation `new_partial`/`new` +/// constructor to the `ApiError` returned on submit. These fire only when the +/// VC-supplied attestation is malformed (missing payload, unknown version). +fn map_attestation_signed_data_error(err: SignedDataError) -> ApiError { + ApiError::new( + StatusCode::BAD_REQUEST, + "could not wrap VC attestation as signed data", + ) + .with_source(err) +} + /// Cross-checks a VC-submitted proposal against the consensus proposal that /// landed in the dutydb for the same slot. Version, blinded flag, proposer /// index, and the SSZ tree-hash root of the block must all match. @@ -5929,4 +6368,455 @@ mod tests { assert_eq!(err.status_code, StatusCode::BAD_REQUEST); assert!(!err.message.contains("secret")); } + + // ==================================================================== + // submit_attestations / aggregate_attestation / + // submit_aggregate_attestations / beacon_committee_selections + // ==================================================================== + + use pluto_eth2api::{ + spec::electra, + v1::BeaconCommitteeSelection as Eth2BeaconCommitteeSelection, + versioned::{ + AttestationPayload, SignedAggregateAndProofPayload, + VersionedAttestation as Eth2VersionedAttestation, + VersionedSignedAggregateAndProof as Eth2VersionedSignedAggregateAndProof, + }, + }; + + use crate::{ + signeddata::{ + VersionedAttestation as SignedVersionedAttestation, + VersionedSignedAggregateAndProof as SignedVersionedAggregateAndProof, + }, + types::AttesterDutyDefinition, + }; + + /// Build an insecure component (skips BLS verify) pinned to a proposal-spec + /// beacon mock (so `epoch_from_slot` resolves) with a populated validator + /// cache. Returns the component and the mock (held to keep it alive). + async fn make_attestation_component( + cache: HashMap, + ) -> (Component, BeaconMock) { + let mock = mock_beacon_for_proposal().await; + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-attestation-tests", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); + let component = Component { + eth2_cl, + dutydb, + share_idx: 1, + pub_share_by_pubkey: HashMap::new(), + builder_enabled: false, + validator_cache: TestValidatorCache::arc(cache), + insecure_test: true, + subs: Vec::new(), + await_proposal_fn: None, + await_agg_attestation_fn: None, + await_sync_contribution_fn: None, + await_agg_sig_db_fn: None, + duty_def_fn: None, + pub_key_by_att_fn: None, + }; + (component, mock) + } + + /// Records the `(duty, set)` pairs a submit handler broadcasts. + type RecordedSets = Arc>>; + + /// Registers a subscriber that records every broadcast `(duty, set)`. + fn record_subscriber(component: &mut Component) -> RecordedSets { + let recorded: RecordedSets = Arc::new(Mutex::new(Vec::new())); + let sink = Arc::clone(&recorded); + component.subscribe(move |duty, set| { + let sink = Arc::clone(&sink); + async move { + sink.lock().unwrap().push((duty, set)); + Ok(()) + } + }); + recorded + } + + fn phase0_attestation(slot: u64, index: u64, bit: usize) -> p0::Attestation { + p0::Attestation { + aggregation_bits: pluto_ssz::BitList::<2048>::with_bits(64, &[bit]), + data: p0::AttestationData { + slot, + index, + beacon_block_root: [0; 32], + source: p0::Checkpoint::default(), + target: p0::Checkpoint { + epoch: 1, + root: [0; 32], + }, + }, + signature: [0; 96], + } + } + + fn phase0_versioned_attestation(att: p0::Attestation) -> SignedVersionedAttestation { + SignedVersionedAttestation::new(Eth2VersionedAttestation { + version: DataVersion::Phase0, + validator_index: None, + attestation: Some(AttestationPayload::Phase0(att)), + }) + .unwrap() + } + + /// `submit_attestations` resolves a pre-Electra validator index from the + /// attester duty set, looks up the pubkey, and broadcasts under an attester + /// duty. + #[tokio::test] + async fn submit_attestations_pre_electra_resolves_and_broadcasts() { + const SLOT: u64 = 9; + const COMM_IDX: u64 = 3; + const VAL_IDX: u64 = 42; + const BIT: usize = 2; + + let (mut component, _mock) = make_attestation_component(HashMap::new()).await; + let recorded = record_subscriber(&mut component); + + // Attester duty set: one duty whose committee index and committee + // position match the submitted attestation. + component.register_get_duty_definition(move |_duty| async move { + let mut set = DutyDefinitionSet::new(); + set.insert( + core_pubkey(0x01), + DutyDefinition::Attester(AttesterDutyDefinition { + pubkey: core_pubkey(0x01), + slot: SlotNumber::new(SLOT), + v_idx: VAL_IDX, + committee_index: COMM_IDX, + committee_length: 64, + committees_at_slot: 1, + validator_committee_index: BIT as u64, + }), + ); + Ok(Box::new(set) as Box) + }); + + let returned = core_pubkey(0xAB); + component.register_pub_key_by_attestation(move |slot, comm, val| async move { + assert_eq!(slot, SLOT); + assert_eq!(comm, COMM_IDX); + assert_eq!(val, VAL_IDX); + Ok(returned) + }); + + let att = phase0_versioned_attestation(phase0_attestation(SLOT, COMM_IDX, BIT)); + component.submit_attestations(vec![att]).await.unwrap(); + + let sets = recorded.lock().unwrap(); + assert_eq!(sets.len(), 1); + assert_eq!(sets[0].0, Duty::new_attester_duty(SlotNumber::new(SLOT))); + assert!(sets[0].1.get(&core_pubkey(0xAB)).is_some()); + } + + /// `submit_attestations` rejects when a matching attester duty exists but + /// the aggregation bits do not select exactly one validator. + #[tokio::test] + async fn submit_attestations_rejects_multiple_aggregation_bits() { + const COMM_IDX: u64 = 3; + let (mut component, _mock) = make_attestation_component(HashMap::new()).await; + // A duty whose committee index matches the attestation, so the + // single-bit check is reached. + component.register_get_duty_definition(move |_duty| async move { + let mut set = DutyDefinitionSet::new(); + set.insert( + core_pubkey(0x01), + DutyDefinition::Attester(AttesterDutyDefinition { + pubkey: core_pubkey(0x01), + slot: SlotNumber::new(9), + v_idx: 1, + committee_index: COMM_IDX, + committee_length: 64, + committees_at_slot: 1, + validator_committee_index: 0, + }), + ); + Ok(Box::new(set) as Box) + }); + component.register_pub_key_by_attestation(|_, _, _| async { Ok(core_pubkey(0x01)) }); + + let mut att = phase0_attestation(9, COMM_IDX, 0); + att.aggregation_bits = pluto_ssz::BitList::<2048>::with_bits(64, &[0, 1]); + let err = component + .submit_attestations(vec![phase0_versioned_attestation(att)]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + } + + /// `submit_attestations` reads the validator index straight from an Electra + /// payload (no duty-set lookup) and broadcasts. + #[tokio::test] + async fn submit_attestations_electra_uses_explicit_validator_index() { + const SLOT: u64 = 9; + const COMM_IDX: u64 = 5; + const VAL_IDX: u64 = 77; + + let (mut component, _mock) = make_attestation_component(HashMap::new()).await; + let recorded = record_subscriber(&mut component); + + component.register_pub_key_by_attestation(move |slot, comm, val| async move { + assert_eq!(slot, SLOT); + assert_eq!(comm, COMM_IDX); + assert_eq!(val, VAL_IDX); + Ok(core_pubkey(0xCD)) + }); + + let committee_bits = + pluto_ssz::BitVector::<64>::with_bits(&[usize::try_from(COMM_IDX).unwrap()]); + let att = SignedVersionedAttestation::new(Eth2VersionedAttestation { + version: DataVersion::Electra, + validator_index: Some(VAL_IDX), + attestation: Some(AttestationPayload::Electra(electra::Attestation { + aggregation_bits: pluto_ssz::BitList::<131_072>::with_bits(0, &[]), + data: phase0_attestation(SLOT, COMM_IDX, 0).data, + signature: [0; 96], + committee_bits, + })), + }) + .unwrap(); + + component.submit_attestations(vec![att]).await.unwrap(); + assert_eq!(recorded.lock().unwrap().len(), 1); + } + + /// `aggregate_attestation` returns the attestation produced by the + /// registered await hook. + #[tokio::test] + async fn aggregate_attestation_returns_hook_result() { + let (mut component, _mock) = make_attestation_component(HashMap::new()).await; + + let att = Eth2VersionedAttestation { + version: DataVersion::Phase0, + validator_index: None, + attestation: Some(AttestationPayload::Phase0(phase0_attestation(9, 3, 1))), + }; + component.register_await_agg_attestation(move |slot, _root| { + let att = att.clone(); + async move { + assert_eq!(slot, 9); + Ok(VersionedAggregatedAttestation(att)) + } + }); + + let response = component + .aggregate_attestation(AggregateAttestationOpts { + slot: 9, + attestation_data_root: [0x11; 32], + committee_index: 3, + }) + .await + .unwrap(); + assert_eq!(response.data.0.version, DataVersion::Phase0); + } + + /// `aggregate_attestation` returns 503 when no await hook is registered. + #[tokio::test] + async fn aggregate_attestation_rejects_when_hook_missing() { + let (component, _mock) = make_attestation_component(HashMap::new()).await; + let err = component + .aggregate_attestation(AggregateAttestationOpts { + slot: 9, + attestation_data_root: [0; 32], + committee_index: 0, + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::SERVICE_UNAVAILABLE); + } + + /// `submit_aggregate_attestations` resolves the aggregator pubkey from the + /// validator cache and broadcasts under an aggregator duty. + #[tokio::test] + async fn submit_aggregate_attestations_resolves_and_broadcasts() { + const SLOT: u64 = 9; + const AGG_IDX: u64 = 7; + + let root = dv_pubkey(0xEE); + let cache = HashMap::from([(AGG_IDX, root)]); + let (mut component, _mock) = make_attestation_component(cache).await; + let recorded = record_subscriber(&mut component); + + let agg = Eth2VersionedSignedAggregateAndProof { + version: DataVersion::Phase0, + aggregate_and_proof: SignedAggregateAndProofPayload::Phase0( + p0::SignedAggregateAndProof { + message: p0::AggregateAndProof { + aggregator_index: AGG_IDX, + aggregate: phase0_attestation(SLOT, 0, 0), + selection_proof: [0; 96], + }, + signature: [0; 96], + }, + ), + }; + + component + .submit_aggregate_attestations(vec![SignedVersionedAggregateAndProof::new(agg)]) + .await + .unwrap(); + + let sets = recorded.lock().unwrap(); + assert_eq!(sets.len(), 1); + assert_eq!(sets[0].0, Duty::new_aggregator_duty(SlotNumber::new(SLOT))); + assert!(sets[0].1.get(&core_pubkey(0xEE)).is_some()); + } + + /// `submit_aggregate_attestations` rejects when the aggregator index is not + /// in the active-validator set. + #[tokio::test] + async fn submit_aggregate_attestations_rejects_unknown_validator() { + let (component, _mock) = make_attestation_component(HashMap::new()).await; + + let agg = Eth2VersionedSignedAggregateAndProof { + version: DataVersion::Phase0, + aggregate_and_proof: SignedAggregateAndProofPayload::Phase0( + p0::SignedAggregateAndProof { + message: p0::AggregateAndProof { + aggregator_index: 99, + aggregate: phase0_attestation(9, 0, 0), + selection_proof: [0; 96], + }, + signature: [0; 96], + }, + ), + }; + + let err = component + .submit_aggregate_attestations(vec![SignedVersionedAggregateAndProof::new(agg)]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + } + + /// `beacon_committee_selections` verifies + broadcasts each selection under + /// a prepare-aggregator duty, then returns the aggregated selection awaited + /// from the AggSigDB hook. + #[tokio::test] + async fn beacon_committee_selections_broadcasts_and_returns_aggregated() { + const SLOT: u64 = 9; + const VAL_IDX: u64 = 7; + + let root = dv_pubkey(0xEE); + let cache = HashMap::from([(VAL_IDX, root)]); + let (mut component, _mock) = make_attestation_component(cache).await; + let recorded = record_subscriber(&mut component); + + // The AggSigDB hook returns an aggregated beacon committee selection. + let aggregated = Eth2BeaconCommitteeSelection { + slot: SLOT, + validator_index: VAL_IDX, + selection_proof: [0xAB; 96], + }; + component.register_await_agg_sig_db(move |duty, _pk| { + let aggregated = aggregated.clone(); + async move { + assert_eq!( + duty, + Duty::new_prepare_aggregator_duty(SlotNumber::new(SLOT)) + ); + Ok(Box::new(SignedBeaconCommitteeSelection::new(aggregated)) + as Box) + } + }); + + let selection = Eth2BeaconCommitteeSelection { + slot: SLOT, + validator_index: VAL_IDX, + selection_proof: [0; 96], + }; + let response = component + .beacon_committee_selections(vec![selection]) + .await + .unwrap(); + + assert_eq!(response.data.len(), 1); + assert_eq!(response.data[0].selection_proof, [0xAB; 96]); + + let sets = recorded.lock().unwrap(); + assert_eq!(sets.len(), 1); + assert_eq!( + sets[0].0, + Duty::new_prepare_aggregator_duty(SlotNumber::new(SLOT)) + ); + } + + /// `beacon_committee_selections` rejects when the validator index is not in + /// the active-validator set. + #[tokio::test] + async fn beacon_committee_selections_rejects_unknown_validator() { + let (mut component, _mock) = make_attestation_component(HashMap::new()).await; + // The AggSigDB hook is checked first; register it so the test reaches + // the validator-not-found path. + component.register_await_agg_sig_db(|_duty, _pk| async { + Err::, _>("unused".into()) + }); + let selection = Eth2BeaconCommitteeSelection { + slot: 9, + validator_index: 1, + selection_proof: [0; 96], + }; + let err = component + .beacon_committee_selections(vec![selection]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + } + + /// `beacon_committee_selections` groups selections by slot and broadcasts a + /// distinct prepare-aggregator duty per slot. + #[tokio::test] + async fn beacon_committee_selections_broadcasts_per_slot() { + let cache = HashMap::from([(7, dv_pubkey(0xE1)), (8, dv_pubkey(0xE2))]); + let (mut component, _mock) = make_attestation_component(cache).await; + let recorded = record_subscriber(&mut component); + + component.register_await_agg_sig_db(|_duty, pk| async move { + // Echo back a selection keyed by the requested pubkey's slot is not + // available here; return a fixed aggregated selection. + let _ = pk; + Ok(Box::new(SignedBeaconCommitteeSelection::new( + Eth2BeaconCommitteeSelection { + slot: 0, + validator_index: 0, + selection_proof: [0xCD; 96], + }, + )) as Box) + }); + + let selections = vec![ + Eth2BeaconCommitteeSelection { + slot: 7, + validator_index: 7, + selection_proof: [0; 96], + }, + Eth2BeaconCommitteeSelection { + slot: 8, + validator_index: 8, + selection_proof: [0; 96], + }, + ]; + let response = component + .beacon_committee_selections(selections) + .await + .unwrap(); + assert_eq!(response.data.len(), 2); + + // Two distinct prepare-aggregator duties broadcast, one per slot. + let sets = recorded.lock().unwrap(); + assert_eq!(sets.len(), 2); + let mut slots: Vec = sets.iter().map(|(duty, _)| duty.slot.inner()).collect(); + slots.sort_unstable(); + assert_eq!(slots, vec![7, 8]); + } } diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index 1e80d15a..7be1ae3b 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -19,28 +19,35 @@ use axum::{ }; use pluto_crypto::types::PublicKey as BlsPubKey; use pluto_eth2api::{ - spec::DataVersion, + spec::{DataVersion, electra, phase0}, versioned::{ - SignedBlindedProposalBlock, SignedProposalBlock, VersionedSignedBlindedProposal, - VersionedSignedProposal as RawVersionedSignedProposal, + AttestationPayload, SignedAggregateAndProofPayload, SignedBlindedProposalBlock, + SignedProposalBlock, VersionedAttestation, + VersionedSignedAggregateAndProof as RawVersionedSignedAggregateAndProof, + VersionedSignedBlindedProposal, VersionedSignedProposal as RawVersionedSignedProposal, }, }; +use pluto_ssz::{BitList, BitVector}; use serde::Deserialize; use serde_json::{Value, json}; +use ssz::Decode; use super::{ error::ApiError, handler::Handler, metrics::{ApiLatencyTimer, ProxyLatencyTimer}, types::{ - AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, AttesterDutiesResponse, - BeaconCommitteeSelection, BeaconCommitteeSelectionsResponse, CommitteeIndex, - NodeVersionResponse, ProposalOpts, ProposerDutiesOpts, ProposerDutiesResponse, - SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, SyncCommitteeSelection, - SyncCommitteeSelectionsResponse, ValIndexes, ValidatorsOpts, + AggregateAttestationOpts, AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, + AttesterDutiesResponse, BeaconCommitteeSelection, BeaconCommitteeSelectionsResponse, + CommitteeIndex, NodeVersionResponse, ProposalOpts, ProposerDutiesOpts, + ProposerDutiesResponse, SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, + SyncCommitteeSelection, SyncCommitteeSelectionsResponse, ValIndexes, ValidatorsOpts, }, }; -use crate::signeddata::{ProposalBlock, VersionedSignedProposal}; +use crate::signeddata::{ + ProposalBlock, VersionedAttestation as SignedVersionedAttestation, + VersionedSignedAggregateAndProof as SignedVersionedAggregateAndProof, VersionedSignedProposal, +}; /// Cap on the `POST /eth/v1/validator/duties/{attester,sync}/{epoch}` request /// bodies. A realistic cluster ships at most a few thousand validator indices; @@ -67,7 +74,6 @@ const EXECUTION_PAYLOAD_BLINDED_HEADER: &str = "Eth-Execution-Payload-Blinded"; const EXECUTION_PAYLOAD_VALUE_HEADER: &str = "Eth-Execution-Payload-Value"; /// Response header carrying the consensus block value, in Wei. const CONSENSUS_BLOCK_VALUE_HEADER: &str = "Eth-Consensus-Block-Value"; - /// Cap on the `POST /eth/v1/validator/{beacon,sync}_committee_selections` /// request bodies. Each selection is ~210-250 bytes of JSON (slot, validator /// index, optional subcommittee index, 96-byte BLS proof in `0x` hex), so @@ -359,8 +365,132 @@ fn json_rejection_to_api_error(rejection: JsonRejection) -> ApiError { ApiError::new(status, message).with_source(std::io::Error::other(rejection.body_text())) } -async fn submit_attestations() { - todo!("vapi: submit_attestations"); +/// `POST /eth/v2/beacon/pool/attestations`. +/// +/// Decodes a versioned array of attestations (JSON or SSZ, fork selected by the +/// `Eth-Consensus-Version` header). Pre-Electra forks carry a bare +/// `phase0::Attestation`; Electra and Fulu carry a `SingleAttestation` that is +/// lifted into the versioned wrapper with its committee index encoded in the +/// committee bitfield and the attester index recorded as the validator index. +/// Mirrors `submitAttestations`. +async fn submit_attestations( + State(state): State>, + headers: HeaderMap, + body: Bytes, +) -> Result { + let version = consensus_version_header(&headers)?; + let ssz = request_is_ssz(&headers)?; + + let attestations = decode_versioned_attestations(version, &body, ssz)?; + + state.handler.submit_attestations(attestations).await?; + Ok(StatusCode::OK.into_response()) +} + +/// Decodes a submitted versioned attestation array for the given fork. +fn decode_versioned_attestations( + version: DataVersion, + body: &[u8], + ssz: bool, +) -> Result, ApiError> { + let invalid = |source: Box| { + ApiError::new(StatusCode::BAD_REQUEST, "invalid submitted attestations") + .with_boxed_source(source) + }; + + match version { + DataVersion::Phase0 + | DataVersion::Altair + | DataVersion::Bellatrix + | DataVersion::Capella + | DataVersion::Deneb => { + let atts: Vec = if ssz { + Vec::::from_ssz_bytes(body) + .map_err(|err| invalid(format!("ssz: {err:?}").into()))? + } else { + serde_json::from_slice(body).map_err(|err| invalid(Box::new(err)))? + }; + + atts.into_iter() + .map(|att| { + build_versioned_attestation(VersionedAttestation { + version, + validator_index: None, + attestation: Some(phase0_attestation_payload(version, att)), + }) + }) + .collect() + } + DataVersion::Electra | DataVersion::Fulu => { + let atts: Vec = if ssz { + Vec::::from_ssz_bytes(body) + .map_err(|err| invalid(format!("ssz: {err:?}").into()))? + } else { + serde_json::from_slice(body).map_err(|err| invalid(Box::new(err)))? + }; + + atts.into_iter() + .map(|single| { + // SingleAttestation disregards aggregation bits once it is + // converted back, so an empty bitlist is safe; the committee + // index is carried in the committee bitfield. + let committee_bit = usize::try_from(single.committee_index) + .ok() + .filter(|&idx| idx < 64) + .ok_or_else(|| { + ApiError::new( + StatusCode::BAD_REQUEST, + "committee index out of range for committee bits", + ) + })?; + let committee_bits = BitVector::<64>::with_bits(&[committee_bit]); + let attestation = electra::Attestation { + aggregation_bits: BitList::<131_072>::with_bits(0, &[]), + data: single.data, + signature: single.signature, + committee_bits, + }; + let payload = match version { + DataVersion::Electra => AttestationPayload::Electra(attestation), + _ => AttestationPayload::Fulu(attestation), + }; + build_versioned_attestation(VersionedAttestation { + version, + validator_index: Some(single.attester_index), + attestation: Some(payload), + }) + }) + .collect() + } + DataVersion::Unknown => Err(ApiError::new( + StatusCode::BAD_REQUEST, + "invalid attestations version", + )), + } +} + +/// Selects the per-fork pre-Electra `AttestationPayload` variant. +fn phase0_attestation_payload( + version: DataVersion, + att: phase0::Attestation, +) -> AttestationPayload { + match version { + DataVersion::Phase0 => AttestationPayload::Phase0(att), + DataVersion::Altair => AttestationPayload::Altair(att), + DataVersion::Bellatrix => AttestationPayload::Bellatrix(att), + DataVersion::Capella => AttestationPayload::Capella(att), + _ => AttestationPayload::Deneb(att), + } +} + +/// Wraps a raw versioned attestation as the validated signeddata type, mapping +/// a construction failure to `400`. +fn build_versioned_attestation( + att: VersionedAttestation, +) -> Result { + SignedVersionedAttestation::new(att).map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, "invalid submitted attestation").with_source(err) + }) } /// `GET,POST /eth/v1/beacon/states/{state_id}/validators`. @@ -540,6 +670,11 @@ async fn submit_exit() { todo!("vapi: submit_exit"); } +/// `POST /eth/v1/validator/beacon_committee_selections`. +/// +/// Decodes a JSON array of partially-signed beacon committee selections, +/// forwards them to the handler, and returns the aggregated selections in a +/// `{ "data": [...] }` envelope. Mirrors `beaconCommitteeSelections`. async fn beacon_committee_selections( State(state): State>, selections: Result>, JsonRejection>, @@ -555,12 +690,193 @@ async fn beacon_committee_selections( })) } -async fn aggregate_attestation() { - todo!("vapi: aggregate_attestation"); +/// `GET /eth/v2/validator/aggregate_attestation`. +/// +/// Reads the `slot`, `attestation_data_root`, and `committee_index` query +/// parameters, asks the handler for the aggregated attestation, and returns it +/// as a versioned response carrying the `Eth-Consensus-Version` header. +/// Mirrors `aggregateAttestation`. +async fn aggregate_attestation( + State(state): State>, + RawQuery(query): RawQuery, +) -> Result { + let params = parse_query(query.as_deref()); + + let slot = uint_query(¶ms, "slot")?; + let attestation_data_root = hex_query_fixed::<32>(¶ms, "attestation_data_root")?; + let committee_index = uint_query(¶ms, "committee_index")?; + + let response = state + .handler + .aggregate_attestation(AggregateAttestationOpts { + slot, + attestation_data_root, + committee_index, + }) + .await?; + + let version = response.data.0.version; + let data = serialize_aggregate_attestation(&response.data)?; + let body = json!({ + "version": version.as_str(), + "data": data, + }); + + let mut headers = HeaderMap::new(); + insert_header(&mut headers, VERSION_HEADER, version.as_str())?; + + Ok((headers, Json(body)).into_response()) +} + +/// `POST /eth/v2/validator/aggregate_and_proofs`. +/// +/// Decodes a versioned array of signed aggregate-and-proofs (JSON or SSZ, fork +/// selected by the `Eth-Consensus-Version` header) and forwards them to the +/// handler. Mirrors `submitAggregateAttestations`. +async fn submit_aggregate_attestations( + State(state): State>, + headers: HeaderMap, + body: Bytes, +) -> Result { + let version = consensus_version_header(&headers)?; + let ssz = request_is_ssz(&headers)?; + + let aggregates = decode_versioned_aggregate_and_proofs(version, &body, ssz)?; + + state + .handler + .submit_aggregate_attestations(aggregates) + .await?; + Ok(StatusCode::OK.into_response()) +} + +/// Decodes a submitted versioned signed aggregate-and-proof array for the given +/// fork. Pre-Electra forks carry a `phase0::SignedAggregateAndProof`; Electra +/// and Fulu carry an `electra::SignedAggregateAndProof`. +fn decode_versioned_aggregate_and_proofs( + version: DataVersion, + body: &[u8], + ssz: bool, +) -> Result, ApiError> { + let invalid = |source: Box| { + ApiError::new( + StatusCode::BAD_REQUEST, + "invalid submitted aggregate and proofs", + ) + .with_boxed_source(source) + }; + + let aggregate_and_proof = match version { + DataVersion::Phase0 + | DataVersion::Altair + | DataVersion::Bellatrix + | DataVersion::Capella + | DataVersion::Deneb => { + let aggs: Vec = if ssz { + Vec::::from_ssz_bytes(body) + .map_err(|err| invalid(format!("ssz: {err:?}").into()))? + } else { + serde_json::from_slice(body).map_err(|err| invalid(Box::new(err)))? + }; + aggs.into_iter() + .map(|agg| build_versioned_aggregate(version, phase0_agg_payload(version, agg))) + .collect::, _>>()? + } + DataVersion::Electra | DataVersion::Fulu => { + let aggs: Vec = if ssz { + Vec::::from_ssz_bytes(body) + .map_err(|err| invalid(format!("ssz: {err:?}").into()))? + } else { + serde_json::from_slice(body).map_err(|err| invalid(Box::new(err)))? + }; + aggs.into_iter() + .map(|agg| { + let payload = match version { + DataVersion::Electra => SignedAggregateAndProofPayload::Electra(agg), + _ => SignedAggregateAndProofPayload::Fulu(agg), + }; + build_versioned_aggregate(version, payload) + }) + .collect::, _>>()? + } + DataVersion::Unknown => { + return Err(ApiError::new( + StatusCode::BAD_REQUEST, + "invalid aggregate and proofs version", + )); + } + }; + + Ok(aggregate_and_proof) } -async fn submit_aggregate_attestations() { - todo!("vapi: submit_aggregate_attestations"); +/// Selects the per-fork pre-Electra signed aggregate-and-proof payload variant. +fn phase0_agg_payload( + version: DataVersion, + agg: phase0::SignedAggregateAndProof, +) -> SignedAggregateAndProofPayload { + match version { + DataVersion::Phase0 => SignedAggregateAndProofPayload::Phase0(agg), + DataVersion::Altair => SignedAggregateAndProofPayload::Altair(agg), + DataVersion::Bellatrix => SignedAggregateAndProofPayload::Bellatrix(agg), + DataVersion::Capella => SignedAggregateAndProofPayload::Capella(agg), + _ => SignedAggregateAndProofPayload::Deneb(agg), + } +} + +/// Wraps a raw versioned signed aggregate-and-proof as the signeddata type. +fn build_versioned_aggregate( + version: DataVersion, + payload: SignedAggregateAndProofPayload, +) -> Result { + Ok(SignedVersionedAggregateAndProof::new( + RawVersionedSignedAggregateAndProof { + version, + aggregate_and_proof: payload, + }, + )) +} + +/// Serializes an aggregated attestation to the bare per-fork payload Charon's +/// `createAggregateAttestation` puts in the `data` field. +fn serialize_aggregate_attestation(att: &SignedVersionedAttestation) -> Result { + let payload = att.0.attestation.as_ref().ok_or_else(|| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "no aggregate attestation", + ) + })?; + let to_value = |value: Result| { + value.map_err(|err| internal_error("could not serialize aggregate attestation", err)) + }; + match payload { + AttestationPayload::Phase0(a) + | AttestationPayload::Altair(a) + | AttestationPayload::Bellatrix(a) + | AttestationPayload::Capella(a) + | AttestationPayload::Deneb(a) => to_value(serde_json::to_value(a)), + AttestationPayload::Electra(a) | AttestationPayload::Fulu(a) => { + to_value(serde_json::to_value(a)) + } + } +} + +/// Decodes a required decimal `u64` query parameter. Mirrors Charon's +/// `uintQuery`. +fn uint_query(params: &[(String, String)], name: &str) -> Result { + let value = query_value(params, name).ok_or_else(|| { + ApiError::new( + StatusCode::BAD_REQUEST, + format!("missing query parameter {name}"), + ) + })?; + value.parse::().map_err(|err| { + ApiError::new( + StatusCode::BAD_REQUEST, + format!("invalid query parameter {name} [{value}]"), + ) + .with_source(err) + }) } async fn submit_sync_committee_messages() { @@ -1490,7 +1806,6 @@ mod tests { assert!(bad.is_err()); } - // ----------------------------------------------------------------------- // PR 1: proxy + proposal/validators handler tests // ----------------------------------------------------------------------- @@ -2144,6 +2459,386 @@ mod tests { assert_eq!(resp.status(), StatusCode::NOT_FOUND); } + // ----------------------------------------------------------------------- + // PR 2: attestation + aggregation handler tests + // ----------------------------------------------------------------------- + + /// Phase0 attestation at `slot` with committee `index` and a single + /// aggregation bit set at position `bit`. + fn phase0_attestation(slot: u64, index: u64, bit: usize) -> p0::Attestation { + p0::Attestation { + aggregation_bits: BitList::<2048>::with_bits(64, &[bit]), + data: p0::AttestationData { + slot, + index, + beacon_block_root: [0; 32], + source: p0::Checkpoint { + epoch: 0, + root: [0; 32], + }, + target: p0::Checkpoint { + epoch: 1, + root: [0; 32], + }, + }, + signature: [0; 96], + } + } + + /// Phase0 signed aggregate-and-proof at `slot` for `aggregator_index`. + fn phase0_aggregate(slot: u64, aggregator_index: u64) -> p0::SignedAggregateAndProof { + p0::SignedAggregateAndProof { + message: p0::AggregateAndProof { + aggregator_index, + aggregate: phase0_attestation(slot, 0, 0), + selection_proof: [0; 96], + }, + signature: [0; 96], + } + } + + /// `submit_attestations` decodes a phase0 JSON array and forwards it. + #[tokio::test] + async fn submit_attestations_decodes_phase0_json() { + let handler = TestHandler::default(); + let recorded = handler.submitted_attestations.clone(); + let app = test_router(Arc::new(handler), false); + + let body = serde_json::to_vec(&vec![phase0_attestation(9, 3, 1)]).unwrap(); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v2/beacon/pool/attestations") + .header("content-type", "application/json") + .header(VERSION_HEADER, "phase0") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let got = recorded.lock().unwrap().clone().unwrap(); + assert_eq!(got.len(), 1); + assert_eq!(got[0].0.version, DataVersion::Phase0); + } + + /// `submit_attestations` decodes a phase0 SSZ array body. + #[tokio::test] + async fn submit_attestations_decodes_phase0_ssz() { + use ssz::Encode; + + let handler = TestHandler::default(); + let recorded = handler.submitted_attestations.clone(); + let app = test_router(Arc::new(handler), false); + + let body = vec![phase0_attestation(9, 3, 1)].as_ssz_bytes(); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v2/beacon/pool/attestations") + .header("content-type", "application/octet-stream") + .header(VERSION_HEADER, "phase0") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let got = recorded.lock().unwrap().clone().unwrap(); + assert_eq!(got.len(), 1); + } + + /// `submit_attestations` lifts an Electra `SingleAttestation` into the + /// versioned wrapper: committee index in the committee bits, attester index + /// recorded as the validator index. + #[tokio::test] + async fn submit_attestations_decodes_electra_single_attestation() { + let handler = TestHandler::default(); + let recorded = handler.submitted_attestations.clone(); + let app = test_router(Arc::new(handler), false); + + let single = electra::SingleAttestation { + committee_index: 5, + attester_index: 42, + data: phase0_attestation(9, 0, 0).data, + signature: [0; 96], + }; + let body = serde_json::to_vec(&vec![single]).unwrap(); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v2/beacon/pool/attestations") + .header("content-type", "application/json") + .header(VERSION_HEADER, "electra") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let got = recorded.lock().unwrap().clone().unwrap(); + assert_eq!(got.len(), 1); + assert_eq!(got[0].0.version, DataVersion::Electra); + assert_eq!(got[0].0.validator_index, Some(42)); + } + + /// Missing version header → 400. + #[tokio::test] + async fn submit_attestations_rejects_missing_version_header() { + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v2/beacon/pool/attestations") + .header("content-type", "application/json") + .body(Body::from("[]")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + /// An unsupported content type → 415. + #[tokio::test] + async fn submit_attestations_rejects_unsupported_content_type() { + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v2/beacon/pool/attestations") + .header("content-type", "text/plain") + .header(VERSION_HEADER, "phase0") + .body(Body::from("[]")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE); + } + + /// A malformed body → 400. + #[tokio::test] + async fn submit_attestations_rejects_bad_body() { + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v2/beacon/pool/attestations") + .header("content-type", "application/json") + .header(VERSION_HEADER, "phase0") + .body(Body::from(r#"[{"not":"an attestation"}]"#)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + /// The legacy v1 attestation route is still a 404. + #[tokio::test] + async fn submit_attestations_v1_route_is_404() { + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/beacon/pool/attestations") + .header("content-type", "application/json") + .body(Body::from("[]")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + /// `submit_aggregate_attestations` decodes a phase0 JSON array and + /// forwards. + #[tokio::test] + async fn submit_aggregate_attestations_decodes_phase0_json() { + let handler = TestHandler::default(); + let recorded = handler.submitted_aggregates.clone(); + let app = test_router(Arc::new(handler), false); + + let body = serde_json::to_vec(&vec![phase0_aggregate(9, 7)]).unwrap(); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v2/validator/aggregate_and_proofs") + .header("content-type", "application/json") + .header(VERSION_HEADER, "phase0") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let got = recorded.lock().unwrap().clone().unwrap(); + assert_eq!(got.len(), 1); + assert_eq!(got[0].0.slot(), Some(9)); + } + + /// `submit_aggregate_attestations` decodes a phase0 SSZ array body. + #[tokio::test] + async fn submit_aggregate_attestations_decodes_phase0_ssz() { + use ssz::Encode; + + let handler = TestHandler::default(); + let recorded = handler.submitted_aggregates.clone(); + let app = test_router(Arc::new(handler), false); + + let body = vec![phase0_aggregate(9, 7)].as_ssz_bytes(); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v2/validator/aggregate_and_proofs") + .header("content-type", "application/octet-stream") + .header(VERSION_HEADER, "phase0") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let got = recorded.lock().unwrap().clone().unwrap(); + assert_eq!(got.len(), 1); + } + + /// `submit_aggregate_attestations` decodes an Electra + /// `SignedAggregateAndProof` array. + #[tokio::test] + async fn submit_aggregate_attestations_decodes_electra_json() { + let handler = TestHandler::default(); + let recorded = handler.submitted_aggregates.clone(); + let app = test_router(Arc::new(handler), false); + + let agg = electra::SignedAggregateAndProof { + message: electra::AggregateAndProof { + aggregator_index: 7, + aggregate: electra::Attestation { + aggregation_bits: BitList::<131_072>::with_bits(8, &[0]), + data: phase0_attestation(9, 0, 0).data, + signature: [0; 96], + committee_bits: BitVector::<64>::with_bits(&[0]), + }, + selection_proof: [0; 96], + }, + signature: [0; 96], + }; + let body = serde_json::to_vec(&vec![agg]).unwrap(); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v2/validator/aggregate_and_proofs") + .header("content-type", "application/json") + .header(VERSION_HEADER, "electra") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let got = recorded.lock().unwrap().clone().unwrap(); + assert_eq!(got.len(), 1); + assert_eq!(got[0].0.version, DataVersion::Electra); + assert_eq!(got[0].0.slot(), Some(9)); + } + + /// An Electra `SingleAttestation` with a committee index outside the + /// 64-committee bitfield → 400 (rather than a panic). + #[tokio::test] + async fn submit_attestations_rejects_out_of_range_committee_index() { + let app = test_router(Arc::new(TestHandler::default()), false); + let single = electra::SingleAttestation { + committee_index: 64, + attester_index: 1, + data: phase0_attestation(9, 0, 0).data, + signature: [0; 96], + }; + let body = serde_json::to_vec(&vec![single]).unwrap(); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v2/beacon/pool/attestations") + .header("content-type", "application/json") + .header(VERSION_HEADER, "electra") + .body(Body::from(body)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + /// Missing version header → 400. + #[tokio::test] + async fn submit_aggregate_attestations_rejects_missing_version_header() { + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v2/validator/aggregate_and_proofs") + .header("content-type", "application/json") + .body(Body::from("[]")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + /// The legacy v1 aggregate route is still a 404. + #[tokio::test] + async fn submit_aggregate_attestations_v1_route_is_404() { + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/aggregate_and_proofs") + .header("content-type", "application/json") + .body(Body::from("[]")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + /// `aggregate_attestation` reads the query params, calls the handler, and + /// returns the versioned aggregate with the consensus-version header. + #[tokio::test] + async fn aggregate_attestation_returns_versioned_response() { + let att = SignedVersionedAttestation::new(VersionedAttestation { + version: DataVersion::Phase0, + validator_index: None, + attestation: Some(AttestationPayload::Phase0(phase0_attestation(9, 3, 1))), + }) + .unwrap(); + let handler = TestHandler::default().with_aggregate_attestation(EthResponse { + data: att, + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let recorded = handler.aggregate_attestation_opts.clone(); + let app = test_router(Arc::new(handler), false); + + let root_hex = format!("0x{}", "11".repeat(32)); + let uri = format!( + "/eth/v2/validator/aggregate_attestation?slot=9&committee_index=3&attestation_data_root={root_hex}" + ); + let req = Request::builder() + .method(Method::GET) + .uri(uri) + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!(resp.headers().get(VERSION_HEADER).unwrap(), "phase0",); + + let opts = recorded.lock().unwrap().clone().unwrap(); + assert_eq!(opts.slot, 9); + assert_eq!(opts.committee_index, 3); + assert_eq!(opts.attestation_data_root, [0x11; 32]); + + let json = body_json(resp).await; + assert_eq!(json["version"], "phase0"); + assert!(json["data"].is_object()); + } + + /// A missing required query parameter → 400. + #[tokio::test] + async fn aggregate_attestation_rejects_missing_query() { + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::GET) + .uri("/eth/v2/validator/aggregate_attestation?slot=9") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + /// The legacy v1 aggregate_attestation route is still a 404. + #[tokio::test] + async fn aggregate_attestation_v1_route_is_404() { + let app = test_router(Arc::new(TestHandler::default()), false); + let req = Request::builder() + .method(Method::GET) + .uri("/eth/v1/validator/aggregate_attestation?slot=9") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + /// Verifies the router wraps the `Handler::beacon_committee_selections` /// payload into the `{ "data": [...] }` wire shape, dropping the /// `execution_optimistic` / `finalized` / `dependent_root` metadata that diff --git a/crates/core/src/validatorapi/testutils.rs b/crates/core/src/validatorapi/testutils.rs index ed5cf140..29bfa98e 100644 --- a/crates/core/src/validatorapi/testutils.rs +++ b/crates/core/src/validatorapi/testutils.rs @@ -49,6 +49,19 @@ pub struct TestHandler { pub submitted_blinded_proposal: Arc>>, /// Records the last [`ValidatorsOpts`] passed to [`Handler::validators`]. pub validators_opts: Arc>>, + /// Records the attestations submitted via [`Handler::submit_attestations`]. + pub submitted_attestations: Arc>>>, + /// Records the aggregate-and-proofs submitted via + /// [`Handler::submit_aggregate_attestations`]. + pub submitted_aggregates: Arc>>>, + /// Records the selections passed to + /// [`Handler::beacon_committee_selections`]. + pub beacon_committee_selections_opts: Arc>>>, + /// Value returned by [`Handler::aggregate_attestation`]. + pub aggregate_attestation_response: Option>, + /// Records the last [`AggregateAttestationOpts`] passed to + /// [`Handler::aggregate_attestation`]. + pub aggregate_attestation_opts: Arc>>, /// Value returned by [`Handler::beacon_committee_selections`]. pub beacon_committee_selections_response: Option>>, /// Value returned by [`Handler::sync_committee_selections`]. @@ -100,6 +113,15 @@ impl TestHandler { self } + /// Sets the response returned by [`Handler::aggregate_attestation`]. + pub fn with_aggregate_attestation( + mut self, + response: EthResponse, + ) -> Self { + self.aggregate_attestation_response = Some(response); + self + } + /// Sets the response returned by [`Handler::beacon_committee_selections`]. pub fn with_beacon_committee_selections( mut self, @@ -171,9 +193,13 @@ impl Handler for TestHandler { async fn submit_attestations( &self, - _attestations: Vec, + attestations: Vec, ) -> Result<(), ApiError> { - unimplemented!("submit_attestations not stubbed in TestHandler") + *self + .submitted_attestations + .lock() + .expect("submitted_attestations lock") = Some(attestations); + Ok(()) } async fn proposal( @@ -208,26 +234,41 @@ impl Handler for TestHandler { async fn aggregate_attestation( &self, - _opts: AggregateAttestationOpts, + opts: AggregateAttestationOpts, ) -> Result, ApiError> { - unimplemented!("aggregate_attestation not stubbed in TestHandler") + *self + .aggregate_attestation_opts + .lock() + .expect("aggregate_attestation_opts lock") = Some(opts); + Ok(self + .aggregate_attestation_response + .clone() + .expect("aggregate_attestation not stubbed in TestHandler")) } async fn submit_aggregate_attestations( &self, - _aggregates: Vec, + aggregates: Vec, ) -> Result<(), ApiError> { - unimplemented!("submit_aggregate_attestations not stubbed in TestHandler") + *self + .submitted_aggregates + .lock() + .expect("submitted_aggregates lock") = Some(aggregates); + Ok(()) } async fn beacon_committee_selections( &self, - _selections: Vec, + selections: Vec, ) -> Result>, ApiError> { - match self.beacon_committee_selections_response.as_ref() { - Some(r) => Ok(r.clone()), - None => unimplemented!("beacon_committee_selections not stubbed in TestHandler"), - } + *self + .beacon_committee_selections_opts + .lock() + .expect("beacon_committee_selections_opts lock") = Some(selections); + Ok(self + .beacon_committee_selections_response + .clone() + .expect("beacon_committee_selections not stubbed in TestHandler")) } async fn sync_committee_selections( diff --git a/crates/core/src/validatorapi/types.rs b/crates/core/src/validatorapi/types.rs index 411ccea1..9e8fad98 100644 --- a/crates/core/src/validatorapi/types.rs +++ b/crates/core/src/validatorapi/types.rs @@ -171,13 +171,13 @@ pub use crate::signeddata::VersionedSignedProposal; /// wrapper. pub use pluto_eth2api::versioned::VersionedSignedBlindedProposal; -/// Versioned attestation payload. Placeholder. -#[derive(Debug, Clone)] -pub struct VersionedAttestation {} +/// Versioned attestation payload — alias of the signeddata wrapper. Carries a +/// VC-submitted attestation across all supported forks. +pub use crate::signeddata::VersionedAttestation; -/// Versioned signed aggregate-and-proof payload. Placeholder. -#[derive(Debug, Clone)] -pub struct VersionedSignedAggregateAndProof {} +/// Versioned signed aggregate-and-proof payload — alias of the signeddata +/// wrapper. +pub use crate::signeddata::VersionedSignedAggregateAndProof; /// Signed validator (builder) registration payload. /// diff --git a/crates/eth2api/src/spec/electra.rs b/crates/eth2api/src/spec/electra.rs index ff98d337..2faa2380 100644 --- a/crates/eth2api/src/spec/electra.rs +++ b/crates/eth2api/src/spec/electra.rs @@ -64,6 +64,26 @@ pub struct Attestation { pub committee_bits: BitVector<64>, } +/// Electra single attestation, the wire form a validator client submits to +/// `POST /eth/v2/beacon/pool/attestations` for Electra and Fulu. +/// +/// Spec: +#[serde_as] +#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode, TreeHash, Serialize, Deserialize)] +pub struct SingleAttestation { + /// Committee index. + #[serde_as(as = "serde_with::DisplayFromStr")] + pub committee_index: u64, + /// Attesting validator index. + #[serde_as(as = "serde_with::DisplayFromStr")] + pub attester_index: phase0::ValidatorIndex, + /// Attestation data. + pub data: phase0::AttestationData, + /// Validator signature. + #[serde_as(as = "pluto_ssz::serde_utils::Hex0x")] + pub signature: phase0::BLSSignature, +} + /// Execution-layer deposit request. /// /// Spec: