diff --git a/src/lean_spec/node/sync/service.py b/src/lean_spec/node/sync/service.py index feca86cee..ffcfba381 100644 --- a/src/lean_spec/node/sync/service.py +++ b/src/lean_spec/node/sync/service.py @@ -204,10 +204,12 @@ def ancestors(start: Bytes32) -> set[Bytes32]: # We only count blocks that pass validation and update the store. self._blocks_processed += 1 - # Recover per-attestation proofs from every processed block. - # Queue them for publishing only when this node is an aggregator. - new_store, aggregates = self._deconstruct_block_into_store(new_store, block) - if self.is_aggregator: + # Aggregators recover per-attestation proofs from each processed block, + # then re-broadcast them. Non-aggregators rely on the gossip path instead. + # Skip while syncing: historical blocks flood this path and the justified + # anchor is still moving, so recovered votes would not match a live head. + if self.is_aggregator and self.state == SyncState.SYNCED: + new_store, aggregates = self._deconstruct_block_into_store(new_store, block) self._pending_block_aggregates.extend(aggregates) # Write-through persistence: synchronous and optional. @@ -587,11 +589,16 @@ def _deconstruct_block_into_store( } aggregates: list[SignedAggregatedAttestation] = [] + # Block building anchors every packed vote on the head state's justified + # checkpoint as its source, so only votes with that source are worth recovering. + # Read it from the passed store, which the head-sync drain advances per block. + head_state_justified_checkpoint = store.states[store.head].latest_justified + for attestation in block_attestations: attestation_data = attestation.data - # Skip targets at or behind justified, which can no longer advance justification. - if attestation_data.target.slot <= store.latest_justified.slot: + # A vote with any other source is never selected into a block. + if attestation_data.source != head_state_justified_checkpoint: continue data_root = hash_tree_root(attestation_data) diff --git a/tests/node/sync/test_service.py b/tests/node/sync/test_service.py index 6fbde2b6e..b9d44abee 100644 --- a/tests/node/sync/test_service.py +++ b/tests/node/sync/test_service.py @@ -857,12 +857,11 @@ def test_replay_plain_mixed_success_and_failure(self, sync_service: SyncService) # # Deconstruction only runs for an attestation when: # -# - its target is ahead of the store's justified checkpoint, so the proof -# can still help move justification, and +# - its source is the store's current justified checkpoint, so a future block +# build could anchor on it and pack the vote, and # - it adds at least one participant the node does not already hold. # -# Only the decision/gate paths are exercised here. -# These tests check when the split runs, not the cryptographic split itself. +# These tests check when the split runs, plus one positive path that runs it. # The cryptographic split and merge are covered by the aggregation consensus vectors. # Round-robin proposer is slot % num_validators with four validators. @@ -923,20 +922,29 @@ def _service(peer_id: PeerId): return create_mock_sync_service(peer_id) -def test_skips_when_target_not_ahead_of_justified( +def test_skips_when_source_not_current_justified( peer_id: PeerId, key_manager: XmssKeyManager ) -> None: """ - Target at or behind the justified checkpoint -> no aggregates. + Source other than the head state's justified checkpoint -> no aggregates. - The block's attestation cannot move justification, so the expensive - split is never attempted and the store is returned unchanged. + Block building anchors every packed vote on the head state's justified + checkpoint as its source, so a vote with a different source is never + selected. The expensive split is skipped and the store is unchanged. """ - chain_store, signed_block, attestation_data = _setup( + chain_store, signed_block, _ = _setup( key_manager, block_participants=[ValidatorIndex(1), ValidatorIndex(2)] ) - # Justified now sits at the attestation's target slot. - store = chain_store.model_copy(update={"latest_justified": attestation_data.target}) + # The attestation's source is the genesis justified checkpoint. + # Shift the head state's justified to the slot-1 block so the source no longer matches. + head_root = chain_store.head + head_state = chain_store.states[head_root] + shifted_state = head_state.model_copy( + update={"latest_justified": Checkpoint(root=head_root, slot=CHAIN_SLOT)} + ) + store = chain_store.model_copy( + update={"states": {**chain_store.states, head_root: shifted_state}} + ) service = _service(peer_id) new_store, aggregates = service._deconstruct_block_into_store(store, signed_block) @@ -945,6 +953,34 @@ def test_skips_when_target_not_ahead_of_justified( assert new_store is store +def test_splits_when_source_is_current_justified( + peer_id: PeerId, key_manager: XmssKeyManager +) -> None: + """ + Source is the head state's justified checkpoint and the block adds a voter -> split runs. + + The attestation sources at the genesis justified checkpoint, which the + head state still carries. With no locally held proof, the block adds new + participants, so the proof is split and folded into the pending pool. + """ + block_participants = [ValidatorIndex(1), ValidatorIndex(2)] + chain_store, signed_block, attestation_data = _setup( + key_manager, block_participants=block_participants + ) + service = _service(peer_id) + + new_store, aggregates = service._deconstruct_block_into_store(chain_store, signed_block) + + # One aggregate emerges, carrying the block's vote and exactly its voters. + assert len(aggregates) == 1 + assert aggregates[0].data == attestation_data + assert set(aggregates[0].proof.participants.to_validator_indices()) == set(block_participants) + + # The pending pool now holds that one combined proof under the vote. + pending_proofs = new_store.latest_new_aggregated_payloads[attestation_data] + assert pending_proofs == {aggregates[0].proof} + + def test_skips_when_block_adds_no_new_validators( peer_id: PeerId, key_manager: XmssKeyManager ) -> None: