diff --git a/CHANGELOG.md b/CHANGELOG.md index 2df38dfb3c46..43dfac1138f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. [7.0.5]: https://github.com/microsoft/CCF/releases/tag/ccf-7.0.5 +### Changed + +- Missing previous-identity endorsement ledger chunks no longer block node startup: the network identity subsystem now exposes a `Partial` fetch status, serves the validated chain prefix, and offers a `trigger_extension()` call so callers can request another fetch attempt. Chain-integrity violations still fail-hard (#7913). + ### Deprecated - Accessing ledger-signature names (table names, exception classes) via `ccf.ledger` now emits a `DeprecationWarning`; import them from `ccf.signatures` instead (#7904). diff --git a/CMakeLists.txt b/CMakeLists.txt index a0e61edf2636..a4c24a9a264d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -771,6 +771,15 @@ if(BUILD_TESTS) historical_queries_test PRIVATE http_parser ccf_kv ccf_endpoints ccf_tasks ) + + add_unit_test( + network_identity_subsystem_test + ${CMAKE_CURRENT_SOURCE_DIR}/src/node/test/network_identity_subsystem.cpp + ) + target_link_libraries( + network_identity_subsystem_test + PRIVATE ccf_kv ccf_endpoints ccf_tasks + ) add_unit_test( indexing_test ${CMAKE_CURRENT_SOURCE_DIR}/src/indexing/test/indexing.cpp diff --git a/include/ccf/network_identity_interface.h b/include/ccf/network_identity_interface.h index 6402eb55713a..446af896ca15 100644 --- a/include/ccf/network_identity_interface.h +++ b/include/ccf/network_identity_interface.h @@ -6,10 +6,8 @@ #include "ccf/node_subsystem_interface.h" #include "ccf/tx_id.h" -#include #include #include -#include #include namespace ccf @@ -24,29 +22,19 @@ namespace ccf /// Status of the network identity endorsement fetching process. 
enum class FetchStatus : uint8_t { - Retry, ///< Fetching should be retried - Done, ///< Fetching completed successfully - Failed ///< Fetching failed + Done, ///< Fetching trusted identities completed successfully + Partial, ///< Chain is still being built or fetching attempts were + ///< exhausted (e.g. ledger files missing). Readers see the + ///< validated subset; @ref + ///< NetworkIdentitySubsystemInterface::trigger_extension + ///< can request more. + Failed ///< Fetching failed with error and cannot be resumed }; /// Map from sequence number to EC public key, representing the trusted /// network identity keys over the history of the service. using TrustedKeys = std::map; - /// Exception thrown when identity data is requested before the - /// asynchronous identity-history-fetching process has completed. - struct IdentityHistoryNotFetched : public std::exception - { - std::string msg; - - IdentityHistoryNotFetched(std::string msg) : msg(std::move(msg)) {} - - [[nodiscard]] const char* what() const noexcept override - { - return msg.c_str(); - } - }; - /// Interface for accessing the network identity subsystem, which manages /// the service's cryptographic identity and its historical trusted keys. class NetworkIdentitySubsystemInterface : public ccf::AbstractNodeSubSystem @@ -62,34 +50,36 @@ namespace ccf /// Returns a reference to the current network identity. virtual const std::unique_ptr& get() = 0; - /// Returns the current status of endorsement fetching. + /// Returns the current status of endorsement fetching. Callers + /// should check this before acting on a nullopt/nullptr/empty + /// reader result: in @ref FetchStatus::Partial more data may + /// arrive via @ref NetworkIdentitySubsystemInterface::trigger_extension; + /// in @ref FetchStatus::Failed the fetch is unrecoverable. [[nodiscard]] virtual FetchStatus endorsements_fetching_status() const = 0; + /// Schedule a fresh attempt to fetch the next missing predecessor + /// endorsement. 
No-op outside @ref FetchStatus::Partial. Thread-safe + /// and idempotent: concurrent callers trigger at most one cycle. + virtual void trigger_extension() = 0; + /// Returns the COSE endorsements chain for the given sequence number, - /// or std::nullopt if the chain is not available for the given sequence - /// number. - /// - /// @throws IdentityHistoryNotFetched if identity history fetching has not - /// completed. + /// or std::nullopt if the chain does not yet reach back to the + /// requested seqno (see @ref + /// NetworkIdentitySubsystemInterface::trigger_extension). [[nodiscard]] virtual std::optional get_cose_endorsements_chain(ccf::SeqNo seqno) const = 0; /// Returns the trusted EC public key that was active at the given - /// sequence number, or nullptr if the sequence number precedes the - /// earliest known trusted key. - /// - /// @throws IdentityHistoryNotFetched if identity history fetching has not - /// completed. - /// @throws std::logic_error if no trusted keys have been fetched, or if - /// internal key resolution is inconsistent. + /// sequence number, or nullptr if the sequence number predates the + /// earliest known trusted key (see @ref + /// NetworkIdentitySubsystemInterface::trigger_extension). [[nodiscard]] virtual ccf::crypto::ECPublicKeyPtr get_trusted_identity_for( ccf::SeqNo seqno) const = 0; /// Returns all trusted network identity keys as a map from sequence - /// number to EC public key. - /// - /// @throws IdentityHistoryNotFetched if identity history fetching has not - /// completed. + /// number to EC public key. In @ref FetchStatus::Partial older epochs + /// may be missing -- see @ref + /// NetworkIdentitySubsystemInterface::trigger_extension. 
[[nodiscard]] virtual TrustedKeys get_trusted_keys() const = 0; }; } diff --git a/src/node/historical_queries_utils.cpp b/src/node/historical_queries_utils.cpp index 9d56583813d2..2cf7255f0c0a 100644 --- a/src/node/historical_queries_utils.cpp +++ b/src/node/historical_queries_utils.cpp @@ -222,10 +222,6 @@ namespace ccf const auto fetching = network_identity_subsystem->endorsements_fetching_status(); - if (fetching == FetchStatus::Retry) - { - return false; - } if (fetching == FetchStatus::Failed) { throw std::runtime_error(fmt::format( @@ -233,14 +229,23 @@ namespace ccf "cannot be fetched", state->transaction_id.seqno)); } - if (fetching != FetchStatus::Done) - { - throw std::logic_error("Unexpected endorsements fetching status"); - } auto cose_endorsements = network_identity_subsystem->get_cose_endorsements_chain( state->transaction_id.seqno); + if (!cose_endorsements.has_value()) + { + // The chain does not yet reach back to the requested seqno + // (either the initial walk hasn't gotten there or a previous + // extension cycle exhausted its retries). Trigger a fresh + // extension cycle (no-op if not in Partial or one is already + // running) and signal the caller to retry. + if (fetching == FetchStatus::Partial) + { + network_identity_subsystem->trigger_extension(); + } + return false; + } state->receipt->cose_endorsements = cose_endorsements; return true; } diff --git a/src/node/rpc/network_identity_accessors.h b/src/node/rpc/network_identity_accessors.h new file mode 100644 index 000000000000..492917c54e5a --- /dev/null +++ b/src/node/rpc/network_identity_accessors.h @@ -0,0 +1,47 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache 2.0 License. 
+#pragma once + +#include "ccf/tx_id.h" +#include "service/tables/previous_service_identity.h" +#include "tasks/task_system.h" + +#include +#include +#include + +namespace ccf +{ + struct INodeStateAccessor + { + virtual ~INodeStateAccessor() = default; + + [[nodiscard]] virtual bool is_part_of_network() const = 0; + + // Current service's create-txid, or nullopt if not yet available. + virtual std::optional read_current_service_from() = 0; + + // Topmost previous-identity endorsement entry, or nullopt if none. + virtual std::optional read_topmost_endorsement() = 0; + }; + + struct IHistoricalStateAccessor + { + virtual ~IHistoricalStateAccessor() = default; + + // Endorsement entry at the given historical kv version, or nullopt + // if the historical state is not yet loaded. Implementations may + // throw on hard errors. + virtual std::optional get_endorsement_at(SeqNo) = 0; + }; + + struct TaskScheduler + { + virtual ~TaskScheduler() = default; + + virtual void add_task(std::function fn) = 0; + + virtual void add_delayed_task( + std::function fn, std::chrono::milliseconds delay) = 0; + }; +} diff --git a/src/node/rpc/network_identity_accessors_impl.h b/src/node/rpc/network_identity_accessors_impl.h new file mode 100644 index 000000000000..69791bcb37b3 --- /dev/null +++ b/src/node/rpc/network_identity_accessors_impl.h @@ -0,0 +1,130 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache 2.0 License. 
+#pragma once + +#include "ccf/service/tables/service.h" +#include "node/historical_queries.h" +#include "node/rpc/network_identity_accessors.h" +#include "node/rpc/node_interface.h" +#include "service/internal_tables_access.h" +#include "tasks/basic_task.h" +#include "tasks/task_system.h" + +#include +#include +#include +#include + +namespace ccf +{ + class NodeStateAccessor : public INodeStateAccessor + { + protected: + AbstractNodeState& node_state; + + public: + NodeStateAccessor(AbstractNodeState& node_state_) : node_state(node_state_) + {} + + [[nodiscard]] bool is_part_of_network() const override + { + return node_state.is_part_of_network(); + } + + std::optional read_current_service_from() override + { + auto store = node_state.get_store(); + auto tx = store->create_read_only_tx(); + auto* service_info_handle = + tx.template ro(ccf::Tables::SERVICE); + auto service_info = service_info_handle->get(); + if ( + !service_info || !service_info->current_service_create_txid.has_value()) + { + return std::nullopt; + } + if (service_info->status != ServiceStatus::OPEN) + { + // It can happen that the node advances its internal state machine to + // part-of-network, but the service opening tx has not been + // replicated yet. This will cause the first fetched endorsement + // to be obsolete, but waiting for ServiceStatus::OPEN is + // sufficient, as it's supposed to arrive in the same TX as + // the previous identity endorsement.
+ return std::nullopt; + } + return service_info->current_service_create_txid; + } + + std::optional read_topmost_endorsement() override + { + auto store = node_state.get_store(); + auto tx = store->create_read_only_tx(); + return tx + .template ro( + ccf::Tables::PREVIOUS_SERVICE_IDENTITY_ENDORSEMENT) + ->get(); + } + }; + + class HistoricalStateAccessor : public IHistoricalStateAccessor + { + protected: + std::shared_ptr historical_cache; + + public: + HistoricalStateAccessor( + std::shared_ptr historical_cache_) : + historical_cache(std::move(historical_cache_)) + {} + + std::optional get_endorsement_at(SeqNo seq) override + { + auto state = historical_cache->get_state_at( + ccf::historical::CompoundHandle{ + ccf::historical::RequestNamespace::System, seq}, + seq); + if (!state) + { + return std::nullopt; + } + if (!state->store) + { + throw std::runtime_error(fmt::format( + "Historical state with seqno {} is loaded but its store is " + "missing", + seq)); + } + auto htx = state->store->create_read_only_tx(); + auto endorsement = + htx + .template ro( + ccf::Tables::PREVIOUS_SERVICE_IDENTITY_ENDORSEMENT) + ->get(); + if (!endorsement.has_value()) + { + throw std::runtime_error(fmt::format( + "COSE endorsement entry for seqno {} is missing from its " + "historical state", + seq)); + } + return endorsement; + } + }; + + class TaskSchedulerImpl : public TaskScheduler + { + public: + void add_task(std::function fn) override + { + ccf::tasks::add_task(ccf::tasks::make_basic_task(std::move(fn))); + } + + void add_delayed_task( + std::function fn, std::chrono::milliseconds delay) override + { + auto task = ccf::tasks::make_basic_task(std::move(fn)); + ccf::tasks::add_delayed_task(task, delay); + } + }; +} diff --git a/src/node/rpc/network_identity_chain_helpers.h b/src/node/rpc/network_identity_chain_helpers.h new file mode 100644 index 000000000000..49709c0ae2bf --- /dev/null +++ b/src/node/rpc/network_identity_chain_helpers.h @@ -0,0 +1,74 @@ +// Copyright (c) 
Microsoft Corporation. All rights reserved. +// Licensed under the Apache 2.0 License. +#pragma once + +#include "ccf/network_identity_interface.h" +#include "ccf/tx_id.h" +#include "consensus/aft/raft_types.h" +#include "service/tables/previous_service_identity.h" + +#include +#include + +namespace ccf +{ + inline bool is_self_endorsement(const ccf::CoseEndorsement& endorsement) + { + return !endorsement.previous_version.has_value(); + } + + inline bool is_ill_formed(const ccf::CoseEndorsement& endorsement) + { + return endorsement.endorsement_epoch_end.has_value() && + endorsement.endorsement_epoch_end->seqno < + endorsement.endorsement_epoch_begin.seqno; + } + + inline void verify_endorsements_connected( + const ccf::CoseEndorsement& newer, const ccf::CoseEndorsement& older) + { + if (!older.endorsement_epoch_end.has_value()) + { + throw std::logic_error(fmt::format( + "COSE endorsement chain integrity is violated, previous endorsement " + "from {} does not have an epoch end", + older.endorsement_epoch_begin.to_str())); + } + + if ( + newer.endorsement_epoch_begin.view - aft::starting_view_change != + older.endorsement_epoch_end->view || + newer.endorsement_epoch_begin.seqno - 1 != + older.endorsement_epoch_end->seqno) + { + throw std::logic_error(fmt::format( + "COSE endorsement chain integrity is violated, previous endorsement " + "epoch end {} is not chained with newer endorsement epoch begin {}", + older.endorsement_epoch_end->to_str(), + newer.endorsement_epoch_begin.to_str())); + } + } + + // Verify the newest endorsement immediately precedes the current service. 
+ inline void validate_chain_front_connection( + const ccf::CoseEndorsement& newest, const ccf::TxID& current_service_from) + { + if (!newest.endorsement_epoch_end.has_value()) + { + throw std::logic_error(fmt::format( + "The last fetched endorsement at {} has no epoch end", + newest.endorsement_epoch_begin.seqno)); + } + if ( + current_service_from.view - aft::starting_view_change != + newest.endorsement_epoch_end->view || + current_service_from.seqno - 1 != newest.endorsement_epoch_end->seqno) + { + throw std::logic_error(fmt::format( + "COSE endorsement chain integrity is violated, the current service " + "start at {} is not chained with previous endorsement ending at {}", + current_service_from.to_str(), + newest.endorsement_epoch_end->to_str())); + } + } +} diff --git a/src/node/rpc/network_identity_subsystem.h b/src/node/rpc/network_identity_subsystem.h index ed4b991c455d..152b023699a5 100644 --- a/src/node/rpc/network_identity_subsystem.h +++ b/src/node/rpc/network_identity_subsystem.h @@ -2,35 +2,35 @@ // Licensed under the Apache 2.0 License. #pragma once +#include "ccf/crypto/cose_verifier.h" +#include "ccf/crypto/ec_public_key.h" +#include "ccf/ds/hex.h" #include "ccf/network_identity_interface.h" #include "ccf/service/tables/service.h" +#include "node/cose_common.h" #include "node/historical_queries.h" #include "node/identity.h" +#include "node/rpc/network_identity_accessors.h" +#include "node/rpc/network_identity_accessors_impl.h" +#include "node/rpc/network_identity_chain_helpers.h" #include "node/rpc/node_interface.h" #include "service/internal_tables_access.h" -#include +#include +#include +#include +#include +#include +#include namespace ccf { - static std::string format_epoch(const std::optional& epoch_end) + inline std::string format_epoch(const std::optional& epoch_end) { return epoch_end.has_value() ? 
epoch_end->to_str() : "null"; } - static bool is_self_endorsement(const ccf::CoseEndorsement& endorsement) - { - return !endorsement.previous_version.has_value(); - } - - static bool is_ill_formed(const ccf::CoseEndorsement& endorsement) - { - return endorsement.endorsement_epoch_end.has_value() && - endorsement.endorsement_epoch_end->seqno < - endorsement.endorsement_epoch_begin.seqno; - } - - static void validate_fetched_endorsement( + inline void validate_fetched_endorsement( const ccf::CoseEndorsement& endorsement) { LOG_INFO_FMT( @@ -79,97 +79,192 @@ namespace ccf } } - static void validate_chain_integrity( - const ccf::CoseEndorsement& newer, const ccf::CoseEndorsement& older) - { - if (!older.endorsement_epoch_end.has_value()) - { - throw std::logic_error(fmt::format( - "COSE endorsement chain integrity is violated, previous endorsement " - "from {} does not have an epoch end", - older.endorsement_epoch_begin.to_str())); - } - - if ( - newer.endorsement_epoch_begin.view - aft::starting_view_change != - older.endorsement_epoch_end->view || - newer.endorsement_epoch_begin.seqno - 1 != - older.endorsement_epoch_end->seqno) - { - throw std::logic_error(fmt::format( - "COSE endorsement chain integrity is violated, previous endorsement " - "epoch end {} is not chained with newer endorsement epoch begin {}", - older.endorsement_epoch_end->to_str(), - newer.endorsement_epoch_begin.to_str())); - } - } - class NetworkIdentitySubsystem : public NetworkIdentitySubsystemInterface { + // Threading: every method is either a lock-taking wrapper that + // forwards to its `_unsafe` sibling, or an `_unsafe` method that + // assumes chain_mutex is held and may only call other `_unsafe` + // methods (calling a wrapper would re-enter the non-recursive + // chain_mutex and deadlock). + protected: - AbstractNodeState& node_state; + static constexpr std::chrono::milliseconds RETRY_INTERVAL{100}; + static constexpr int MAX_FETCH_ATTEMPTS = 30; + + // Immutable after construction. 
+ std::shared_ptr node_state_accessor; + std::shared_ptr historical_state_accessor; const std::unique_ptr& network_identity; - std::shared_ptr historical_cache; + std::shared_ptr scheduler; + + // All mutable state is guarded by chain_mutex. + mutable std::mutex chain_mutex; + std::map endorsements; std::map trusted_keys; std::optional current_service_from; SeqNo earliest_endorsed_seq{0}; - std::atomic fetch_status{FetchStatus::Retry}; bool has_predecessors{false}; + int fetch_attempts{0}; + FetchStatus fetch_status{FetchStatus::Partial}; + // True while a fetch cycle is in flight; cleared by complete_unsafe / + // fail_fetching_unsafe. Starts true because the constructor + // synchronously claims the initial bootstrap cycle. + bool fetch_active{true}; public: NetworkIdentitySubsystem( AbstractNodeState& node_state_, const std::unique_ptr& network_identity_, std::shared_ptr historical_cache_) : - node_state(node_state_), + NetworkIdentitySubsystem( + std::make_shared(node_state_), + std::make_shared(std::move(historical_cache_)), + network_identity_, + std::make_shared()) + {} + + NetworkIdentitySubsystem( + std::shared_ptr node_state_accessor_, + std::shared_ptr historical_state_accessor_, + const std::unique_ptr& network_identity_, + std::shared_ptr scheduler_) : + node_state_accessor(std::move(node_state_accessor_)), + historical_state_accessor(std::move(historical_state_accessor_)), network_identity(network_identity_), - historical_cache(std::move(historical_cache_)) + scheduler(std::move(scheduler_)) { fetch_first(); } + // --- Public wrappers ---------------------------------------------- + [[nodiscard]] FetchStatus endorsements_fetching_status() const override { - return fetch_status.load(); + std::lock_guard g(chain_mutex); + return endorsements_fetching_status_unsafe(); } + // Returns an immutable reference; no lock needed. 
const std::unique_ptr& get() override { return network_identity; } + void trigger_extension() override + { + std::lock_guard g(chain_mutex); + trigger_extension_unsafe(); + } + [[nodiscard]] std::optional get_cose_endorsements_chain(ccf::SeqNo seqno) const override { - if (fetch_status.load() != FetchStatus::Done) + std::lock_guard g(chain_mutex); + return get_cose_endorsements_chain_unsafe(seqno); + } + + [[nodiscard]] ccf::crypto::ECPublicKeyPtr get_trusted_identity_for( + ccf::SeqNo seqno) const override + { + std::lock_guard g(chain_mutex); + return get_trusted_identity_for_unsafe(seqno); + } + + [[nodiscard]] TrustedKeys get_trusted_keys() const override + { + std::lock_guard g(chain_mutex); + return get_trusted_keys_unsafe(); + } + + private: + // --- Private wrappers (scheduler-callback entry points) ---------- + + void fetch_first() + { + std::lock_guard g(chain_mutex); + fetch_first_unsafe(); + } + + void fetch_next_at(ccf::SeqNo seq) + { + std::lock_guard g(chain_mutex); + fetch_next_at_unsafe(seq); + } + + // --- `_unsafe` methods (chain_mutex held by caller) -------------- + + [[nodiscard]] FetchStatus endorsements_fetching_status_unsafe() const + { + return fetch_status; + } + + void trigger_extension_unsafe() + { + if (fetch_status != FetchStatus::Partial) + { + return; + } + if (fetch_active) + { + // A cycle is already in flight (initial bootstrap or a prior + // extension); folding this trigger in is a no-op. + return; + } + fetch_active = true; + fetch_attempts = 0; + + // Resume from the earliest validated link, or re-run fetch_first + // when nothing has been inserted yet (ill-formed or unfetched + // topmost). 
+ std::optional seq; + if (!endorsements.empty()) + { + seq = endorsements.begin()->second.previous_version; + } + + if (seq.has_value()) + { + scheduler->add_task([this, s = *seq]() { this->fetch_next_at(s); }); + } + else { - throw IdentityHistoryNotFetched(fmt::format( - "COSE endorsements chain requested for seqno {} but identity " - "history fetching has not been completed yet", - seqno)); + scheduler->add_task([this]() { this->fetch_first(); }); } + } + [[nodiscard]] std::optional + get_cose_endorsements_chain_unsafe(ccf::SeqNo seqno) const + { if (!current_service_from.has_value()) { - LOG_FAIL_FMT( - "Unset current_service_from when fetching endorsements chain"); return std::nullopt; } - if (!has_predecessors || seqno >= current_service_from->seqno) + if (seqno >= current_service_from->seqno) { return CoseEndorsementsChain{}; } + if (!has_predecessors) + { + // Done: confirmed self-only chain -> empty. Partial: topmost + // not yet read -> nullopt so caller can trigger_extension. + return fetch_status == FetchStatus::Done ? + std::optional(CoseEndorsementsChain{}) : + std::nullopt; + } + auto it = endorsements.upper_bound(seqno); if (it == endorsements.begin()) { - LOG_INFO_FMT( - "No endorsements found for seqno {}, earliest endorsed is {}", - seqno, - earliest_endorsed_seq); - return {}; + // Below the earliest validated link. Done: pre-history -> empty. + // Partial: caller may trigger_extension -> nullopt. 
+ if (fetch_status == FetchStatus::Done) + { + return CoseEndorsementsChain{}; + } + return std::nullopt; } CoseEndorsementsChain result; @@ -181,25 +276,14 @@ namespace ccf return result; } - [[nodiscard]] ccf::crypto::ECPublicKeyPtr get_trusted_identity_for( - ccf::SeqNo seqno) const override + [[nodiscard]] ccf::crypto::ECPublicKeyPtr get_trusted_identity_for_unsafe( + ccf::SeqNo seqno) const { - if (fetch_status.load() != FetchStatus::Done) - { - throw IdentityHistoryNotFetched(fmt::format( - "Trusted key requested for seqno {} but identity history " - "fetching has not been completed yet", - seqno)); - } - if (trusted_keys.empty()) - { - throw std::logic_error(fmt::format( - "No trusted keys fetched when requested one for seqno {}", seqno)); - } auto it = trusted_keys.upper_bound(seqno); if (it == trusted_keys.begin()) { - // The earliest known trusted seqno is greater than the requested one. + // Seqno predates the earliest known key (or trusted_keys is + // still empty); caller may trigger_extension. return nullptr; } const auto& [key_seqno, key_ptr] = *(--it); @@ -213,171 +297,113 @@ namespace ccf return key_ptr; } - [[nodiscard]] TrustedKeys get_trusted_keys() const override + [[nodiscard]] TrustedKeys get_trusted_keys_unsafe() const { - if (fetch_status.load() != FetchStatus::Done) - { - throw IdentityHistoryNotFetched( - "Trusted keys requested but identity history fetching has not " - "completed yet"); - } return trusted_keys; } - private: - void retry_first_fetch() + void retry_first_fetch_unsafe() { using namespace std::chrono_literals; static constexpr auto retry_after = 1s; - ccf::tasks::add_delayed_task( - ccf::tasks::make_basic_task([this]() { this->fetch_first(); }), - retry_after); + + ++fetch_attempts; + if (fetch_attempts >= MAX_FETCH_ATTEMPTS) + { + LOG_FAIL_FMT( + "Could not start fetching network identity after {} attempts at " + "{}ms intervals. 
Cycle ends.", + MAX_FETCH_ATTEMPTS, + std::chrono::duration_cast(retry_after) + .count()); + complete_unsafe(FetchStatus::Partial); + return; + } + + scheduler->add_delayed_task( + [this]() { this->fetch_first(); }, retry_after); } - void fail_fetching(const std::string& err = "") + void fail_fetching_unsafe(const std::string& err = "") { if (!err.empty()) { LOG_FAIL_FMT("Failed fetching network identity: {}", err); } - fetch_status.store(FetchStatus::Failed); + fetch_status = FetchStatus::Failed; + fetch_active = false; - // The caller may want to re-capture this, but by default it's supposed to - // fail the node startup early. This is purely reading, so there's no risk - // of corruption, but the endorsement chain is essential for the node to - // produce receipts for the past epochs, which is a must-have - // functionality. + // By default this fails node startup early. The throw unwinds + // out through the lock-taking wrapper, releasing chain_mutex + // cleanly via the lock_guard destructor. Readers are unaffected: + // they keep serving the already-validated prefix, and callers can + // detect the failure via endorsements_fetching_status().
throw std::runtime_error("Failed fetching network identity: " + err); } - void complete_fetching() + void complete_unsafe(FetchStatus target_status) { - if (!current_service_from.has_value()) - { - fail_fetching("Unset current_service_from when completing fetching"); - return; // to silence clang-tidy unchecked optional - } - - if (!endorsements.empty()) - { - auto next = endorsements.begin(); - auto prev = next++; - try - { - while (next != endorsements.end()) - { - validate_chain_integrity(next->second, prev->second); - ++prev; - ++next; - } - } - catch (const std::exception& e) - { - fail_fetching(e.what()); - } - - const auto& last = prev->second; - if (!last.endorsement_epoch_end.has_value()) - { - fail_fetching(fmt::format( - "The last fetched endorsement at {} has no epoch end", - last.endorsement_epoch_begin.seqno)); - return; // to silence clang-tidy unchecked optional - } - - if ( - current_service_from->view - aft::starting_view_change != - last.endorsement_epoch_end->view || - current_service_from->seqno - 1 != last.endorsement_epoch_end->seqno) - { - fail_fetching(fmt::format( - "COSE endorsement chain integrity is violated, the current " - "service start at {} is not chained with previous endorsement " - "ending at {}", - current_service_from->to_str(), - last.endorsement_epoch_end->to_str())); - } - } - - try - { - build_trusted_key_chain(); - } - catch (const std::exception& e) - { - fail_fetching(e.what()); - } - - fetch_status.store(FetchStatus::Done); + fetch_attempts = 0; + fetch_status = target_status; + fetch_active = false; } - void fetch_first() + void fetch_first_unsafe() { - if (!node_state.is_part_of_network()) + if (!node_state_accessor->is_part_of_network()) { LOG_INFO_FMT( "Retry fetching network identity as node is not part of the network " "yet"); - retry_first_fetch(); + retry_first_fetch_unsafe(); return; } - auto store = node_state.get_store(); - auto tx = store->create_read_only_tx(); - if (!current_service_from.has_value()) { - 
auto* service_info_handle = - tx.template ro(ccf::Tables::SERVICE); - auto service_info = service_info_handle->get(); - if ( - !service_info || - !service_info->current_service_create_txid.has_value()) + auto cs = node_state_accessor->read_current_service_from(); + if (!cs.has_value()) { LOG_INFO_FMT( - "Retrying fetching network identity as current service create txid " - "is not yet available"); - retry_first_fetch(); + "Retrying fetching network identity as current service create " + "txid is not yet available or service is not yet open"); + retry_first_fetch_unsafe(); return; } - - if (service_info->status != ServiceStatus::OPEN) - { - // It can happen that node advances its internal state machine to - // part-of-network, but the service opening tx has not been replicated - // yet. This will cause the first fetched endorsement to be obsolete, - // but waiting for ServiceStatus::OPEN is sufficient, as it's supposed - // to arrive in the same TX that the previous identity endorsement. - LOG_INFO_FMT( - "Retrying fetching network identity as service is not yet open"); - retry_first_fetch(); - return; - } - - current_service_from = service_info->current_service_create_txid; + current_service_from = cs; } - auto* previous_identity_endorsement = - tx.ro( - ccf::Tables::PREVIOUS_SERVICE_IDENTITY_ENDORSEMENT); + // Seed trusted_keys with the current-service key (idempotent). 
+ if (trusted_keys.find(current_service_from->seqno) == trusted_keys.end()) + { + trusted_keys.insert( + {current_service_from->seqno, + ccf::crypto::make_ec_public_key( + network_identity->get_key_pair()->public_key_der())}); + } - auto endorsement = previous_identity_endorsement->get(); + auto endorsement = node_state_accessor->read_topmost_endorsement(); if (!endorsement.has_value()) { LOG_INFO_FMT( "Retrying fetching network identity as there is no previous service " "identity endorsement yet"); - retry_first_fetch(); + retry_first_fetch_unsafe(); return; } + // Reset the per-step counter for the chain walk: retries spent + // waiting for the topmost entry must not eat into the per-chunk + // budget consumed below. + fetch_attempts = 0; + if (is_self_endorsement(endorsement.value())) { if ( current_service_from->seqno != endorsement->endorsement_epoch_begin.seqno) { - fail_fetching(fmt::format( + fail_fetching_unsafe(fmt::format( "The first fetched endorsement is a self-endorsement with seqno {} " "which is different from current_service_create_txid {}", endorsement->endorsement_epoch_begin.seqno, @@ -390,23 +416,32 @@ namespace ccf current_service_from->seqno); has_predecessors = false; - complete_fetching(); + complete_unsafe(FetchStatus::Done); return; } has_predecessors = true; earliest_endorsed_seq = current_service_from->seqno; - process_endorsement(endorsement.value()); + process_endorsement_unsafe(endorsement.value()); } - void process_endorsement(const ccf::CoseEndorsement& endorsement) + void process_endorsement_unsafe(const ccf::CoseEndorsement& endorsement) { + try + { + validate_fetched_endorsement(endorsement); + } + catch (const std::exception& e) + { + fail_fetching_unsafe(e.what()); + } + if (is_ill_formed(endorsement)) { - // For double-sealed cases, which could have happened in the past. We - // mark with failed logs, but skip intentionally if there are other - // endorsements that follow. 
The overall chain integrity will be checked - // at the end and will fail anyway if it's not intact. + // For double-sealed cases, which could have happened in the past. + // Skip intentionally if a predecessor exists; the next link's + // chain-integrity check will fail-hard if the resulting chain is + // inconsistent. if (endorsement.previous_version.has_value()) { LOG_INFO_FMT( @@ -414,80 +449,114 @@ namespace ccf "predecessor, so skipping this entry", endorsement.endorsement_epoch_begin.to_str(), format_epoch(endorsement.endorsement_epoch_end)); - fetch_next_at(endorsement.previous_version.value()); + fetch_next_at_unsafe(endorsement.previous_version.value()); return; } - fail_fetching(fmt::format( + fail_fetching_unsafe(fmt::format( "Found an ill-formed endorsement for {} - {} which has no " "predecessor", endorsement.endorsement_epoch_begin.to_str(), format_epoch(endorsement.endorsement_epoch_end))); } + process_link_unsafe(endorsement); + } + + // Verify and append a single endorsement to the chain. Used for + // both the initial walk and extension cycles; the two cases differ + // only in whether `endorsements` is empty when the link arrives. 
+ // + // Chain-link predicate (oldest->newest): + // adjacent (older A, newer B): B.endorsed_key == A.endorsing_key + // newest entry N: N.endorsing_key == current_service_pkey + // Extending backward with NEW becoming the new oldest: + // existing_earliest.endorsed_key == NEW.endorsing_key + void process_link_unsafe(const ccf::CoseEndorsement& endorsement) + { const auto from = endorsement.endorsement_epoch_begin.seqno; + if (is_self_endorsement(endorsement)) { if (endorsements.find(from) == endorsements.end()) { - fail_fetching(fmt::format( + fail_fetching_unsafe(fmt::format( "Fetched self-endorsement with seqno {} which has not been seen", from)); } - LOG_INFO_FMT("Got self-endorsement at {}, stopping fetching", from); - complete_fetching(); + LOG_INFO_FMT( + "COSE endorsement chain reached self-endorsement at {}", from); + complete_unsafe(FetchStatus::Done); return; } - if (from >= earliest_endorsed_seq) - { - fail_fetching(fmt::format( - "Fetched service endorsement with seqno {} which is greater than " - "the earliest known in the chain {}", - from, - earliest_endorsed_seq)); - } - if (!endorsement.endorsement_epoch_end.has_value()) { - fail_fetching( + fail_fetching_unsafe( fmt::format("Fetched endorsement at {} has no epoch end", from)); return; // to silence clang-tidy unchecked optional } - earliest_endorsed_seq = from; + if (!current_service_from.has_value()) + { + fail_fetching_unsafe("Unset current_service_from when extending chain"); + return; // to silence clang-tidy unchecked optional + } + if (from >= earliest_endorsed_seq) + { + fail_fetching_unsafe(fmt::format( + "Fetched service endorsement with seqno {} which is not earlier " + "than the current earliest known {}", + from, + earliest_endorsed_seq)); + } if (endorsements.find(from) != endorsements.end()) { - fail_fetching(fmt::format( + fail_fetching_unsafe(fmt::format( "Fetched service endorsement with seqno {} which already exists", from)); } - LOG_INFO_FMT( - "Fetched service endorsement 
from {} to {}", - from, - endorsement.endorsement_epoch_end->seqno); - endorsements.insert({from, endorsement}); - - if (endorsement.previous_version.has_value()) + std::vector expected_new_endorsing_key_der; + if (!endorsements.empty()) { - fetch_next_at(endorsement.previous_version.value()); - return; - } - - complete_fetching(); - } + const auto& existing_earliest = endorsements.begin()->second; + auto trusted_it = + trusted_keys.find(existing_earliest.endorsement_epoch_begin.seqno); + if (trusted_it == trusted_keys.end()) + { + fail_fetching_unsafe(fmt::format( + "Missing trusted key entry for existing earliest endorsement " + "at seqno {}", + existing_earliest.endorsement_epoch_begin.seqno)); + return; // to silence clang-tidy unchecked iterator + } + expected_new_endorsing_key_der = trusted_it->second->public_key_der(); - void build_trusted_key_chain() - { - if (!current_service_from.has_value()) + try + { + verify_endorsements_connected(existing_earliest, endorsement); + } + catch (const std::exception& e) + { + fail_fetching_unsafe(e.what()); + } + } + else { - throw std::logic_error( - "Attempting to build trusted key chain but no current service " - "created seqno fetched"); + expected_new_endorsing_key_der = + network_identity->get_key_pair()->public_key_der(); + try + { + validate_chain_front_connection(endorsement, *current_service_from); + } + catch (const std::exception& e) + { + fail_fetching_unsafe(e.what()); + } } - std::span previous_key_der{}; - for (const auto& [seqno, endorsement] : endorsements) + ccf::crypto::ECPublicKeyPtr new_trusted_key; + try { auto verifier = ccf::crypto::make_cose_verifier_from_key(endorsement.endorsing_key); @@ -495,117 +564,103 @@ namespace ccf if (!verifier->verify(endorsement.endorsement, endorsed_key)) { throw std::logic_error(fmt::format( - "COSE endorsement chain integrity is violated, endorsement from {} " - "to {} failed signature verification", + "Endorsement from {} to {} failed signature verification", 
endorsement.endorsement_epoch_begin.to_str(), format_epoch(endorsement.endorsement_epoch_end))); } - - LOG_INFO_FMT( - "Adding trusted seq {} key {}", - endorsement.endorsement_epoch_begin.seqno, - ccf::crypto::b64_from_raw(endorsed_key)); - trusted_keys.insert( - {endorsement.endorsement_epoch_begin.seqno, - ccf::crypto::make_ec_public_key(endorsed_key)}); - if ( - !previous_key_der.empty() && + endorsement.endorsing_key.size() != + expected_new_endorsing_key_der.size() || !std::equal( - previous_key_der.begin(), - previous_key_der.end(), - endorsed_key.begin(), - endorsed_key.end())) + endorsement.endorsing_key.begin(), + endorsement.endorsing_key.end(), + expected_new_endorsing_key_der.begin())) { throw std::logic_error(fmt::format( - "Endorsement from {} to {} over public key {} doesn't chain with " - "the previous endorsement with key {}", - endorsement.endorsement_epoch_begin.seqno, + "Endorsement from {} to {} signed by key {} does not chain with " + "the expected next key {}", + endorsement.endorsement_epoch_begin.to_str(), format_epoch(endorsement.endorsement_epoch_end), - ccf::ds::to_hex(endorsed_key), - ccf::ds::to_hex(previous_key_der))); + ccf::ds::to_hex(endorsement.endorsing_key), + ccf::ds::to_hex(expected_new_endorsing_key_der))); } - - previous_key_der = endorsement.endorsing_key; + new_trusted_key = ccf::crypto::make_ec_public_key(endorsed_key); } - - const auto& current_pkey = - network_identity->get_key_pair()->public_key_der(); - if ( - !previous_key_der.empty() && - !std::equal( - previous_key_der.begin(), - previous_key_der.end(), - current_pkey.begin(), - current_pkey.end())) + catch (const std::exception& e) { - throw std::logic_error(fmt::format( - "Current service identity public key {} does not match the last " - "endorsing key {}", - ccf::ds::to_hex(current_pkey), - ccf::ds::to_hex(previous_key_der))); + fail_fetching_unsafe(e.what()); } LOG_INFO_FMT( - "Adding trusted seq {} key {}", - current_service_from->seqno, - 
ccf::crypto::b64_from_raw(current_pkey)); - trusted_keys.insert( - {current_service_from->seqno, - ccf::crypto::make_ec_public_key(current_pkey)}); - } + "COSE endorsement chain extended to seqno {} (epoch {} - {})", + from, + endorsement.endorsement_epoch_begin.to_str(), + endorsement.endorsement_epoch_end->to_str()); - void fetch_next_at(ccf::SeqNo seq) - { - auto state = historical_cache->get_state_at( - ccf::historical::CompoundHandle{ - ccf::historical::RequestNamespace::System, seq}, - seq); - if (!state) - { - retry_fetch_next(seq); - return; - } + endorsements.insert({from, endorsement}); + trusted_keys.insert({from, std::move(new_trusted_key)}); + earliest_endorsed_seq = from; - if (!state->store) + if (!endorsement.previous_version.has_value()) { - fail_fetching(fmt::format( - "Fetched historical state with seqno {} with missing store", seq)); + fail_fetching_unsafe(fmt::format( + "Non-self-endorsement at seqno {} unexpectedly has no " + "previous_version", + from)); + return; // to silence clang-tidy unchecked optional } - auto htx = state->store->create_read_only_tx(); - const auto endorsement = - htx - .template ro( - ccf::Tables::PREVIOUS_SERVICE_IDENTITY_ENDORSEMENT) - ->get(); + fetch_next_at_unsafe(*endorsement.previous_version); + } - if (!endorsement.has_value()) + void fetch_next_at_unsafe(ccf::SeqNo seq) + { + // Bail out on stale callbacks from cycles that have already + // ended. Belt-and-braces: under the monitor discipline a single + // cycle has at most one outstanding scheduled task, so genuine + // stale callbacks should not occur. Cheap to keep as a guard. 
+ if (!fetch_active) { - fail_fetching( - fmt::format("Fetched COSE endorsement for {} is invalid", seq)); - return; // to silence clang-tidy unchecked optional + return; } + std::optional endorsement; try { - validate_fetched_endorsement(endorsement.value()); + endorsement = historical_state_accessor->get_endorsement_at(seq); } catch (const std::exception& e) { - fail_fetching(e.what()); + fail_fetching_unsafe(e.what()); + } + if (!endorsement.has_value()) + { + retry_fetch_next_unsafe(seq); + return; } - process_endorsement(endorsement.value()); + // Successful fetch: reset attempt counter. + fetch_attempts = 0; + process_endorsement_unsafe(endorsement.value()); } - void retry_fetch_next(ccf::SeqNo seq) + void retry_fetch_next_unsafe(ccf::SeqNo seq) { - using namespace std::chrono_literals; - static constexpr auto retry_after = 100ms; - ccf::tasks::add_delayed_task( - ccf::tasks::make_basic_task( - [this, seq]() { this->fetch_next_at(seq); }), - retry_after); + ++fetch_attempts; + if (fetch_attempts >= MAX_FETCH_ATTEMPTS) + { + LOG_FAIL_FMT( + "Could not fetch previous service identity endorsement at seqno {} " + "after {} attempts at {}ms intervals. Cycle ends; status is " + "Partial. Callers may invoke trigger_extension to retry.", + seq, + MAX_FETCH_ATTEMPTS, + RETRY_INTERVAL.count()); + complete_unsafe(FetchStatus::Partial); + return; + } + + scheduler->add_delayed_task( + [this, seq]() { this->fetch_next_at(seq); }, RETRY_INTERVAL); } }; } diff --git a/src/node/test/network_identity_subsystem.cpp b/src/node/test/network_identity_subsystem.cpp new file mode 100644 index 000000000000..ac9d5afd75a5 --- /dev/null +++ b/src/node/test/network_identity_subsystem.cpp @@ -0,0 +1,1494 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache 2.0 License. 
+ +#include "node/rpc/network_identity_subsystem.h" + +#include "cose/cose_rs_ffi.h" +#include "crypto/openssl/ec_key_pair.h" +#include "node/rpc/network_identity_accessors.h" +#include "node/rpc/network_identity_chain_helpers.h" +#include "tasks/basic_task.h" + +#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN +#include +#include +#include +#include +#include + +using namespace std::chrono_literals; + +namespace +{ + // Build a synthetic CoseEndorsement covering range [begin_view:begin_seqno, + // end_view:end_seqno]. Signature/key payloads are left empty: only the + // range fields are exercised by the pure-helper validators tested here. + ccf::CoseEndorsement make_range_endorsement( + ccf::View begin_view, + ccf::SeqNo begin_seqno, + ccf::View end_view, + ccf::SeqNo end_seqno, + std::optional previous_version = ccf::kv::Version{ + 1} /* default: not self-endorsement */) + { + ccf::CoseEndorsement e; + e.endorsement_epoch_begin = ccf::TxID{begin_view, begin_seqno}; + e.endorsement_epoch_end = ccf::TxID{end_view, end_seqno}; + e.previous_version = previous_version; + return e; + } + + ccf::CoseEndorsement make_self_endorsement( + ccf::View begin_view, ccf::SeqNo begin_seqno) + { + ccf::CoseEndorsement e; + e.endorsement_epoch_begin = ccf::TxID{begin_view, begin_seqno}; + // Self-endorsements have neither epoch_end nor previous_version + e.endorsement_epoch_end = std::nullopt; + e.previous_version = std::nullopt; + return e; + } + + // Test scaffolding ----------------------------------------------------- + + // Mocks -- in-memory canned responses. `unavailable` lets tests + // simulate "ledger chunk is not yet loadable" by making + // get_endorsement_at(seq) return nullopt. 
+ class MockNodeStateAccessor : public ccf::INodeStateAccessor + { + public: + bool part_of_network = true; + std::optional current_service_from; + std::optional topmost; + + [[nodiscard]] bool is_part_of_network() const override + { + return part_of_network; + } + std::optional read_current_service_from() override + { + return current_service_from; + } + std::optional read_topmost_endorsement() override + { + return topmost; + } + }; + + class MockHistoricalStateAccessor : public ccf::IHistoricalStateAccessor + { + public: + std::map entries; + std::set unavailable; + + std::optional get_endorsement_at( + ccf::SeqNo seq) override + { + if (unavailable.contains(seq)) + { + return std::nullopt; + } + auto it = entries.find(seq); + if (it == entries.end()) + { + return std::nullopt; + } + return it->second; + } + }; + + // Fake TaskScheduler -- queues immediate and delayed tasks for tests + // to fire deterministically. Thread-safe for concurrent add_task / + // add_delayed_task calls (the subsystem may schedule from multiple + // threads in trigger_extension scenarios); single-threaded for + // drain methods, which tests call from the main thread. + class FakeTaskScheduler : public ccf::TaskScheduler + { + public: + std::mutex mtx; + std::deque> immediate; + std::deque> delayed; + + void add_task(std::function fn) override + { + std::lock_guard g(mtx); + immediate.push_back(std::move(fn)); + } + + void add_delayed_task( + std::function fn, std::chrono::milliseconds /* delay */) override + { + std::lock_guard g(mtx); + delayed.push_back(std::move(fn)); + } + + [[nodiscard]] size_t pending_immediate_count() + { + std::lock_guard g(mtx); + return immediate.size(); + } + + [[nodiscard]] size_t pending_delayed_count() + { + std::lock_guard g(mtx); + return delayed.size(); + } + + // Drain all immediate tasks, calling each. May push new ones. 
+ void run_immediate() + { + while (true) + { + std::function fn; + { + std::lock_guard g(mtx); + if (immediate.empty()) + { + return; + } + fn = std::move(immediate.front()); + immediate.pop_front(); + } + fn(); + } + } + + // Fire all delayed tasks and drain any immediate tasks they enqueue. + // Returns the number of delayed tasks fired. + size_t fire_delayed_once() + { + std::deque> batch; + { + std::lock_guard g(mtx); + batch = std::move(delayed); + delayed.clear(); + } + size_t ran = 0; + for (auto& fn : batch) + { + fn(); + ++ran; + } + run_immediate(); + return ran; + } + + // Fire immediate + delayed until both are empty or safety_cap rounds ran. + void run_to_completion(size_t safety_cap = 100) + { + run_immediate(); + while (pending_delayed_count() > 0 && safety_cap-- > 0) + { + fire_delayed_once(); + } + } + }; + + // ChainBuilder -- mints real COSE-signed endorsements via the same + // cose-rs path production uses. Layout mirrors production: + // + // * Service S_0 self-endorses, producing entry e_0. e_0 has no + // epoch_end and no previous_version. Its epoch_begin is the create + // TxID of S_0. + // * Each subsequent service S_i (i>=1) creates entry e_i whose + // endorsing_key is S_i's public key and whose COSE payload is + // S_{i-1}'s public key. e_i.epoch_begin equals the previous entry's + // epoch_begin if the previous is a self-endorsement, else + // next_tx_if_recovery(prev.epoch_end). e_i.epoch_end is one txid + // before S_i's create TxID. e_i.previous_version is the kv::Version + // at which e_{i-1} was written. + // * In production, e_N (the topmost) is what + // `read_topmost_endorsement` returns. e_0..e_{N-1} are what + // `get_endorsement_at(write_version)` returns. The current + // service's public key (in DER) must match e_N's signing key, + // which is what `process_link_unsafe` checks against + // `network_identity->get_key_pair()->public_key_der()`. 
+ // + // For test purposes we ignore the "previous_root" semantics -- the + // subsystem doesn't validate it. + class ChainBuilder + { + public: + std::vector> service_keys; + std::vector entries; + std::vector write_versions; + ccf::kv::Version next_write_version = 1; + + // Add the deepest service in the chain, self-endorsing. + ChainBuilder& add_self(ccf::TxID begin) + { + auto kp = std::make_shared( + ccf::crypto::CurveID::SECP384R1); + ccf::CoseEndorsement e; + e.endorsement_epoch_begin = begin; + e.endorsing_key = kp->public_key_der(); + e.endorsement = sign(*kp, begin, std::nullopt, {}, kp->public_key_der()); + service_keys.push_back(kp); + entries.push_back(e); + write_versions.push_back(next_write_version++); + return *this; + } + + // Add the next service in the chain, endorsing the prior service's + // key. `begin` and `end` define this entry's epoch range. + ChainBuilder& add_next(ccf::TxID begin, ccf::TxID end) + { + REQUIRE(!service_keys.empty()); + auto kp = std::make_shared( + ccf::crypto::CurveID::SECP384R1); + const auto& prev_kp = service_keys.back(); + ccf::CoseEndorsement e; + e.endorsement_epoch_begin = begin; + e.endorsement_epoch_end = end; + e.previous_version = write_versions.back(); + e.endorsing_key = kp->public_key_der(); + e.endorsement = sign( + *kp, + begin, + end, + std::vector{0xaa, 0xbb, 0xcc, 0xdd}, + prev_kp->public_key_der()); + service_keys.push_back(kp); + entries.push_back(e); + write_versions.push_back(next_write_version++); + return *this; + } + + // The current service's public key (DER) -- matches the most-recent + // entry's signing key, which `process_link_unsafe` checks against + // `network_identity->get_key_pair()->public_key_der()`. 
+ [[nodiscard]] std::vector current_pkey_der() const + { + REQUIRE(!service_keys.empty()); + return service_keys.back()->public_key_der(); + } + + [[nodiscard]] std::shared_ptr + current_key_pair() const + { + REQUIRE(!service_keys.empty()); + return service_keys.back(); + } + + // TxID at which the current service was created. Synthesised by + // applying the recovery (view, seqno) increment to the most-recent + // entry's epoch_end. The subsystem checks the resulting TxID against + // the last entry's epoch_end via validate_chain_front_connection. + [[nodiscard]] ccf::TxID synthesised_current_service_from() const + { + REQUIRE(!entries.empty()); + const auto& last = entries.back(); + REQUIRE(last.endorsement_epoch_end.has_value()); + return ccf::TxID{ + last.endorsement_epoch_end->view + aft::starting_view_change, + last.endorsement_epoch_end->seqno + 1}; + } + + // For sources: the topmost entry, returned by copy (it is served via + // read_topmost_endorsement, not via `historical`). + ccf::CoseEndorsement topmost_entry() const + { + REQUIRE(!entries.empty()); + return entries.back(); + } + std::map historical_entries() const + { + std::map m; + // All but the topmost. + for (size_t i = 0; i + 1 < entries.size(); ++i) + { + m.emplace(write_versions[i], entries[i]); + } + return m; + } + + private: + static std::vector sign( + ccf::crypto::ECKeyPair_OpenSSL& key, + const ccf::TxID& begin, + const std::optional& end, + const std::vector& previous_root, + const std::vector& payload) + { + const auto begin_str = begin.to_str(); + const auto end_str = end.has_value() ? end->to_str() : std::string{}; + auto priv_der = key.private_key_der(); + CoseBuffer key_err; + auto cose_key = + CoseKey::from_private(priv_der.data(), priv_der.size(), key_err); + REQUIRE(cose_key.is_set()); + + CoseBuffer out; + CoseBuffer sign_err; + auto rc = cose_sign_endorsement( + cose_key, + /*iat=*/1700000000, + reinterpret_cast(begin_str.data()), + begin_str.size(), + end.has_value() ? 
reinterpret_cast(end_str.data()) : + nullptr, + end.has_value() ? end_str.size() : 0, + previous_root.data(), + previous_root.size(), + payload.data(), + payload.size(), + out, + sign_err); + REQUIRE(rc == 0); + REQUIRE(out.is_set()); + return out.to_vector(); + } + }; + + // Fixture that owns the mocks, scheduler, and identity so tests can + // build a subsystem with one call. + struct SubsystemFixture + { + std::shared_ptr node_state = + std::make_shared(); + std::shared_ptr historical = + std::make_shared(); + std::shared_ptr scheduler = + std::make_shared(); + std::unique_ptr identity; + + SubsystemFixture() + { + identity = std::make_unique( + "CN=Test Service", + ccf::crypto::CurveID::SECP384R1, + "20240101000000Z", + 365); + } + + // Replace the identity's key pair so its public key matches the chain + // tail. Tests must do this BEFORE constructing the subsystem. + void use_identity_key( + const std::shared_ptr& kp) + { + identity->priv_key = kp->private_key_pem(); + } + + std::unique_ptr make_subsystem() + { + return std::make_unique( + node_state, historical, identity, scheduler); + } + }; + + // Populate the mocks from `cb` to model a chain of length N already + // committed to the ledger and topmost in the current store. 
+ void wire_chain( + SubsystemFixture& f, + const ChainBuilder& cb, + std::optional current_service_from = std::nullopt) + { + f.node_state->current_service_from = + current_service_from.value_or(cb.synthesised_current_service_from()); + f.node_state->topmost = cb.topmost_entry(); + f.historical->entries = cb.historical_entries(); + } +} + +TEST_CASE("is_self_endorsement detects absence of previous_version") +{ + REQUIRE(ccf::is_self_endorsement(make_self_endorsement(2, 10))); + REQUIRE_FALSE(ccf::is_self_endorsement(make_range_endorsement(3, 11, 3, 20))); +} + +TEST_CASE("is_ill_formed detects inverted range") +{ + REQUIRE(ccf::is_ill_formed(make_range_endorsement(3, 20, 3, 10))); + REQUIRE_FALSE(ccf::is_ill_formed(make_range_endorsement(3, 10, 3, 20))); + // Self-endorsement has no epoch_end, so it is never ill-formed + REQUIRE_FALSE(ccf::is_ill_formed(make_self_endorsement(2, 10))); +} + +TEST_CASE("verify_endorsements_connected accepts adjacent endorsements") +{ + // older: [v=2, 10..20]; newer begins at (2 + aft::starting_view_change, 21). + // The view rule is: newer.begin.view - aft::starting_view_change == + // older.end.view, and newer.begin.seqno - 1 == older.end.seqno. 
+ auto older = make_range_endorsement(2, 10, 2, 20); + auto newer = make_range_endorsement(2 + aft::starting_view_change, 21, 3, 30); + + REQUIRE_NOTHROW(ccf::verify_endorsements_connected(newer, older)); +} + +TEST_CASE("verify_endorsements_connected rejects view discontinuity") +{ + auto older = make_range_endorsement(2, 10, 2, 20); + // newer.begin.view should be 2 + starting_view_change; use a wrong value + auto newer = + make_range_endorsement(2 + aft::starting_view_change + 1, 21, 3, 30); + + REQUIRE_THROWS_AS( + ccf::verify_endorsements_connected(newer, older), std::logic_error); +} + +TEST_CASE("verify_endorsements_connected rejects seqno gap") +{ + auto older = make_range_endorsement(2, 10, 2, 20); + // seqno gap: newer.begin.seqno should be 21 + auto newer = make_range_endorsement(2 + aft::starting_view_change, 22, 3, 30); + + REQUIRE_THROWS_AS( + ccf::verify_endorsements_connected(newer, older), std::logic_error); +} + +TEST_CASE("verify_endorsements_connected rejects older with no epoch_end") +{ + ccf::CoseEndorsement older = make_range_endorsement(2, 10, 2, 20); + older.endorsement_epoch_end = std::nullopt; + auto newer = make_range_endorsement(2 + aft::starting_view_change, 21, 3, 30); + + REQUIRE_THROWS_AS( + ccf::verify_endorsements_connected(newer, older), std::logic_error); +} + +TEST_CASE( + "validate_chain_front_connection requires current_service_from to " + "immediately follow the endorsement") +{ + ccf::CoseEndorsement e = make_range_endorsement(2, 10, 2, 20); + + // current_service_from must have view = 2 + starting_view_change and + // seqno = 21 to be adjacent. 
+ ccf::TxID good{2 + aft::starting_view_change, 21}; + REQUIRE_NOTHROW(ccf::validate_chain_front_connection(e, good)); + + ccf::TxID bad_seqno{2 + aft::starting_view_change, 22}; + REQUIRE_THROWS_AS( + ccf::validate_chain_front_connection(e, bad_seqno), std::logic_error); + + ccf::TxID bad_view{3 + aft::starting_view_change, 21}; + REQUIRE_THROWS_AS( + ccf::validate_chain_front_connection(e, bad_view), std::logic_error); +} + +TEST_CASE( + "validate_chain_front_connection rejects endorsement with no epoch_end") +{ + ccf::CoseEndorsement bad = make_range_endorsement(2, 10, 2, 20); + bad.endorsement_epoch_end = std::nullopt; + + ccf::TxID after{2 + aft::starting_view_change, 21}; + REQUIRE_THROWS_AS( + ccf::validate_chain_front_connection(bad, after), std::logic_error); +} + +// ------------------------------------------------------------------------ +// State-machine tests using Mock{NodeState,HistoricalState}Accessor + +// FakeTaskScheduler + ChainBuilder. Each test wires a synthetic ledger, +// constructs the subsystem, drives the scheduler, and asserts on the +// public state. +// ------------------------------------------------------------------------ + +TEST_CASE("Bootstrap with self-only chain transitions immediately to Done") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 100}); + f.use_identity_key(cb.current_key_pair()); + // Self-only: topmost IS the self-endorsement, current_service_from + // matches its epoch_begin. + f.node_state->current_service_from = {2, 100}; + f.node_state->topmost = cb.topmost_entry(); + + auto sub = f.make_subsystem(); + f.scheduler->run_to_completion(); + + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Done); + auto keys = sub->get_trusted_keys(); + REQUIRE(keys.size() == 1); + // Current-epoch seqno: empty chain (short-circuited by the + // seqno >= current_service_from branch). 
+ REQUIRE(sub->get_cose_endorsements_chain(100)->empty()); + // Older seqno: Done + !has_predecessors -> empty chain (pre-history, + // no historical endorsements will ever cover it). + REQUIRE(sub->get_cose_endorsements_chain(50)->empty()); +} + +TEST_CASE("Bootstrap with N-link chain reaches Done with full key map") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}).add_next({2, 1}, {4, 200}); + size_t expected_keys = 2; // topmost + current + + SUBCASE("3 links") + { + cb.add_next({6, 201}, {6, 400}); + expected_keys = 3; + } + SUBCASE("5 links") + { + cb.add_next({6, 201}, {6, 400}) + .add_next({8, 401}, {8, 600}) + .add_next({10, 601}, {10, 800}); + expected_keys = 5; + } + + f.use_identity_key(cb.current_key_pair()); + wire_chain(f, cb); + + auto sub = f.make_subsystem(); + f.scheduler->run_to_completion(); + + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Done); + REQUIRE(sub->get_trusted_keys().size() == expected_keys); +} + +TEST_CASE( + "Bootstrap exhausts -> Partial; trigger_extension when chunk reappears -> " + "Done") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}).add_next({2, 1}, {4, 200}); + f.use_identity_key(cb.current_key_pair()); + wire_chain(f, cb); + // Predecessor's chunk is missing during the initial fetch. + f.historical->unavailable.insert(cb.write_versions.at(0)); + + auto sub = f.make_subsystem(); + for (int i = 0; i < 30; ++i) + { + f.scheduler->fire_delayed_once(); + } + f.scheduler->run_to_completion(); + + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + // Topmost + seeded current-service key. + REQUIRE(sub->get_trusted_keys().size() == 2); + // Current-epoch reads succeed with an empty chain. + REQUIRE(sub->get_cose_endorsements_chain(401)->empty()); + + // Chunk reappears; caller triggers extension and the chain heals. 
+ f.historical->unavailable.clear(); + sub->trigger_extension(); + f.scheduler->run_to_completion(); + + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Done); + REQUIRE(sub->get_trusted_keys().size() == 2); +} + +TEST_CASE( + "trigger_extension in non-Partial states is a no-op (status unchanged, " + "no task scheduled)") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}); + f.use_identity_key(cb.current_key_pair()); + f.node_state->current_service_from = {2, 1}; + f.node_state->topmost = cb.topmost_entry(); + + auto sub = f.make_subsystem(); + f.scheduler->run_to_completion(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Done); + + REQUIRE(f.scheduler->pending_immediate_count() == 0); + REQUIRE(f.scheduler->pending_delayed_count() == 0); + + sub->trigger_extension(); + REQUIRE(f.scheduler->pending_immediate_count() == 0); + REQUIRE(f.scheduler->pending_delayed_count() == 0); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Done); +} + +TEST_CASE("Back-to-back trigger_extension calls schedule at most one new cycle") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}).add_next({2, 1}, {4, 200}); + f.use_identity_key(cb.current_key_pair()); + wire_chain(f, cb); + f.historical->unavailable.insert(cb.write_versions.at(0)); + + auto sub = f.make_subsystem(); + for (int i = 0; i < 30; ++i) + { + f.scheduler->fire_delayed_once(); + } + f.scheduler->run_to_completion(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + + REQUIRE(f.scheduler->pending_immediate_count() == 0); + REQUIRE(f.scheduler->pending_delayed_count() == 0); + + // Source still has the chunk unavailable, so the new cycle will not + // succeed on its first try -- it must schedule a retry. Multiple + // back-to-back trigger calls should fold into the SAME single cycle. 
+ sub->trigger_extension(); + sub->trigger_extension(); + sub->trigger_extension(); + + // Exactly one immediate task was added (the others saw fetch_active + // set under chain_mutex and bailed). + REQUIRE(f.scheduler->pending_immediate_count() == 1); + f.scheduler->run_immediate(); + // That task's failed fetch enqueues exactly one delayed retry. + REQUIRE(f.scheduler->pending_delayed_count() == 1); +} + +// ------------------------------------------------------------------------ +// Extended coverage: bootstrap waiting cases, multi-step extension, +// Failed transitions, reader semantics, concurrency, stale callbacks. +// ------------------------------------------------------------------------ + +TEST_CASE("Bootstrap waits for is_part_of_network then proceeds") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}); + f.use_identity_key(cb.current_key_pair()); + f.node_state->current_service_from = ccf::TxID{2, 1}; + f.node_state->topmost = cb.topmost_entry(); + f.node_state->part_of_network = false; + + auto sub = f.make_subsystem(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + // A delayed retry_first_fetch should have been scheduled. + REQUIRE(f.scheduler->pending_delayed_count() == 1); + + // Fire it: still not part of network -> another retry queued. + f.scheduler->fire_delayed_once(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + REQUIRE(f.scheduler->pending_delayed_count() == 1); + + // Become ready; next firing should bootstrap to Done. 
+ f.node_state->part_of_network = true; + f.scheduler->fire_delayed_once(); + f.scheduler->run_to_completion(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Done); +} + +// Readers must be safe to call before fetch_first has produced anything +// useful: no nullopt-deref of current_service_from, no iterator-deref of +// empty trusted_keys / endorsements, and trigger_extension is a no-op +// (the constructor's synchronous fetch_first has already claimed +// fetch_active). +TEST_CASE("Readers safe immediately after construction (nothing fetched yet)") +{ + SubsystemFixture f; + f.node_state->part_of_network = false; + auto sub = f.make_subsystem(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + + REQUIRE_FALSE(sub->get_cose_endorsements_chain(0).has_value()); + REQUIRE_FALSE(sub->get_cose_endorsements_chain(123).has_value()); + REQUIRE(sub->get_trusted_identity_for(0) == nullptr); + REQUIRE(sub->get_trusted_identity_for(999) == nullptr); + REQUIRE(sub->get_trusted_keys().empty()); + + // trigger_extension observes fetch_active = true (held by the + // in-flight initial cycle) and bails, so no new task is scheduled. 
+ const auto immediate_before = f.scheduler->pending_immediate_count(); + sub->trigger_extension(); + REQUIRE(f.scheduler->pending_immediate_count() == immediate_before); +} + +TEST_CASE("Bootstrap waits for current_service_from then proceeds") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}); + f.use_identity_key(cb.current_key_pair()); + // part_of_network=true (default) but no current_service_from yet + f.node_state->topmost = cb.topmost_entry(); + + auto sub = f.make_subsystem(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + REQUIRE(f.scheduler->pending_delayed_count() == 1); + + f.node_state->current_service_from = ccf::TxID{2, 1}; + f.scheduler->fire_delayed_once(); + f.scheduler->run_to_completion(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Done); +} + +TEST_CASE("Bootstrap waits for topmost endorsement entry then proceeds") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}); + f.use_identity_key(cb.current_key_pair()); + f.node_state->current_service_from = ccf::TxID{2, 1}; + // topmost is unset + + auto sub = f.make_subsystem(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + REQUIRE(f.scheduler->pending_delayed_count() == 1); + + f.node_state->topmost = cb.topmost_entry(); + f.scheduler->fire_delayed_once(); + f.scheduler->run_to_completion(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Done); +} + +TEST_CASE("retry_first_fetch respects MAX_FETCH_ATTEMPTS budget") +{ + SubsystemFixture f; + // Source never becomes ready: retry_first_fetch loops until it + // exhausts the budget, then ends the cycle in Partial without + // anything to serve. trigger_extension can then dispatch a fresh + // fetch_first to try again. 
+ f.node_state->part_of_network = false; + + auto sub = f.make_subsystem(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + // One delayed retry queued after the synchronous fetch_first in the ctor. + REQUIRE(f.scheduler->pending_delayed_count() == 1); + + // Fire the budget-minus-two times: 28 more retries; counter goes from + // 1 -> 29, still under MAX_FETCH_ATTEMPTS == 30. Each firing should + // re-queue another delayed task. + for (int i = 0; i < 28; ++i) + { + REQUIRE(f.scheduler->fire_delayed_once() == 1); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + REQUIRE(f.scheduler->pending_delayed_count() == 1); + } + + // 30th firing trips the budget. complete_partial ends the cycle in + // Partial; no further task is queued. + f.scheduler->fire_delayed_once(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + REQUIRE(f.scheduler->pending_delayed_count() == 0); + + // Readers return nothing meaningful -- current_service_from never + // resolved. + REQUIRE_FALSE(sub->get_cose_endorsements_chain(50).has_value()); + REQUIRE(sub->get_trusted_identity_for(50) == nullptr); + REQUIRE(sub->get_trusted_keys().empty()); + + // trigger_extension dispatches a fresh fetch_first. Source is still + // not ready, so a new delayed-retry cycle begins. + sub->trigger_extension(); + REQUIRE(f.scheduler->pending_immediate_count() == 1); + f.scheduler->run_immediate(); + REQUIRE(f.scheduler->pending_delayed_count() == 1); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); +} + +TEST_CASE("retry_first_fetch budget reset by successful progress") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}).add_next({2, 1}, {4, 200}); + f.use_identity_key(cb.current_key_pair()); + + // Source not ready initially. + f.node_state->part_of_network = false; + + auto sub = f.make_subsystem(); + // Burn 5 retries. 
+ for (int i = 0; i < 5; ++i) + { + f.scheduler->fire_delayed_once(); + } + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + + // Service comes up; wire the chain. + f.node_state->part_of_network = true; + wire_chain(f, cb); + f.scheduler->run_to_completion(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Done); +} + +TEST_CASE( + "retry_first_fetch exhausted with service info set but no topmost -> " + "Partial; readers return nullopt instead of false 'pre-history'") +{ + SubsystemFixture f; + // current_service_from is resolvable, but read_topmost_endorsement + // returns nullopt forever. Exhaustion -> complete_partial -> Partial + // (current_service_from is set). has_predecessors stays false (we + // never read the topmost), but the reader must distinguish this + // from a confirmed self-only chain and return nullopt for any older + // seqno. + f.node_state->current_service_from = ccf::TxID{2, 100}; + // No topmost wired. + auto sub = f.make_subsystem(); + for (int i = 0; i < 30; ++i) + { + f.scheduler->fire_delayed_once(); + } + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + + // Current-epoch seqno: empty chain (current service identity covers). + auto current_chain = sub->get_cose_endorsements_chain(100); + REQUIRE(current_chain.has_value()); + REQUIRE(current_chain->empty()); + + // Older seqno: nullopt (we don't know if there's a chain; caller + // should trigger_extension and retry). + auto older = sub->get_cose_endorsements_chain(50); + REQUIRE_FALSE(older.has_value()); + + // trusted_keys contains the seeded current-service key. 
+ auto keys = sub->get_trusted_keys(); + REQUIRE(keys.size() == 1); + REQUIRE(keys.count(100) == 1); +} + +TEST_CASE("Partial -> Partial: re-trigger that also exhausts") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}).add_next({2, 1}, {4, 200}); + f.use_identity_key(cb.current_key_pair()); + wire_chain(f, cb); + f.historical->unavailable.insert(cb.write_versions.at(0)); + + auto sub = f.make_subsystem(); + for (int i = 0; i < 30; ++i) + { + f.scheduler->fire_delayed_once(); + } + f.scheduler->run_to_completion(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + const auto trusted_before = sub->get_trusted_keys(); + + // Re-trigger while chunk is STILL unavailable. + sub->trigger_extension(); + for (int i = 0; i < 30; ++i) + { + f.scheduler->run_immediate(); + f.scheduler->fire_delayed_once(); + } + f.scheduler->run_to_completion(); + + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + REQUIRE(sub->get_trusted_keys().size() == trusted_before.size()); +} + +TEST_CASE("Multi-step extension: one trigger heals 3 missing predecessors") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}) + .add_next({2, 1}, {4, 200}) + .add_next({6, 201}, {6, 400}) + .add_next({8, 401}, {8, 600}); + f.use_identity_key(cb.current_key_pair()); + wire_chain(f, cb); + // Make ALL predecessors of the topmost unavailable. Only the topmost + // (in source.topmost) is initially reachable. + f.historical->unavailable = { + cb.write_versions.at(0), cb.write_versions.at(1), cb.write_versions.at(2)}; + + auto sub = f.make_subsystem(); + for (int i = 0; i < 30; ++i) + { + f.scheduler->fire_delayed_once(); + } + f.scheduler->run_to_completion(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + REQUIRE(sub->get_trusted_keys().size() == 2); // topmost + current + + // Make ALL predecessors available; one trigger should walk back through + // every one of them and finalise Done. 
+ f.historical->unavailable.clear(); + sub->trigger_extension(); + f.scheduler->run_to_completion(); + + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Done); + REQUIRE(sub->get_trusted_keys().size() == 4); +} + +TEST_CASE("Failed: bad signature on topmost detected during bootstrap") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}).add_next({2, 1}, {4, 200}); + f.use_identity_key(cb.current_key_pair()); + wire_chain(f, cb); + + // Tamper the topmost's signature. process_link verifies it during + // the synchronous initial fetch, throws, fail_fetching transitions + // to Failed and re-throws -- escaping the constructor since the + // all-available chain runs to completion synchronously. + REQUIRE(f.node_state->topmost.has_value()); + f.node_state->topmost->endorsement.back() ^= 0xFF; + + REQUIRE_THROWS_AS({ auto sub = f.make_subsystem(); }, std::exception); +} + +TEST_CASE( + "Failed: topmost COSE header range disagrees with table fields -> " + "Failed during bootstrap") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}).add_next({2, 1}, {4, 200}); + f.use_identity_key(cb.current_key_pair()); + wire_chain(f, cb); + + // Tamper the topmost's table-side epoch_end so it disagrees with the + // signed COSE header range. process_endorsement runs + // validate_fetched_endorsement on the topmost (just like on + // historically-fetched links), detects the mismatch, and fail-hards. 
+ REQUIRE(f.node_state->topmost.has_value()); + f.node_state->topmost->endorsement_epoch_end = ccf::TxID{99, 99999}; + + REQUIRE_THROWS_AS({ auto sub = f.make_subsystem(); }, std::exception); +} + +TEST_CASE("Failed: tampered epoch range detected during extension cycle") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}).add_next({2, 1}, {4, 200}).add_next({6, 201}, {6, 400}); + f.use_identity_key(cb.current_key_pair()); + wire_chain(f, cb); + auto mid_wv = cb.write_versions.at(1); + f.historical->unavailable.insert(mid_wv); + + auto sub = f.make_subsystem(); + for (int i = 0; i < 30; ++i) + { + f.scheduler->fire_delayed_once(); + } + f.scheduler->run_to_completion(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + + // Restore the chunk but tamper the epoch_end on the table side. + // validate_fetched_endorsement compares the COSE header range + // against the table fields; mismatch -> throw -> Failed. + f.historical->unavailable.clear(); + f.historical->entries.at(mid_wv).endorsement_epoch_end = ccf::TxID{99, 99999}; + + sub->trigger_extension(); + REQUIRE_THROWS_AS(f.scheduler->run_to_completion(), std::exception); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Failed); +} + +TEST_CASE("Failed: chain link broken (wrong endorsing_key) during bootstrap") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}).add_next({2, 1}, {4, 200}).add_next({6, 201}, {6, 400}); + f.use_identity_key(cb.current_key_pair()); + wire_chain(f, cb); + + // Replace the middle entry's endorsing_key with a stranger; the + // actual signature was made with the original key, so verification + // fails. Throw escapes the constructor. 
+ auto mid_wv = cb.write_versions.at(1); + auto stranger = std::make_shared( + ccf::crypto::CurveID::SECP384R1); + f.historical->entries.at(mid_wv).endorsing_key = stranger->public_key_der(); + + REQUIRE_THROWS_AS({ auto sub = f.make_subsystem(); }, std::exception); +} + +TEST_CASE( + "Failed: self-endorsement at unexpected seqno detected during bootstrap") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}); + f.use_identity_key(cb.current_key_pair()); + // Advertise current_service_from at a different seqno than the + // self-endorsement's epoch_begin. + f.node_state->current_service_from = ccf::TxID{2, 99}; + f.node_state->topmost = cb.topmost_entry(); + + REQUIRE_THROWS_AS({ auto sub = f.make_subsystem(); }, std::exception); +} + +TEST_CASE("Reader: get_cose_endorsements_chain returns nullopt while waiting") +{ + SubsystemFixture f; + // Source is left unconfigured: bootstrap loops in retry_first_fetch. + // The subsystem starts in Partial and stays there; reads succeed but + // can't resolve any seqno because no chain has been built yet. + f.node_state->part_of_network = false; + auto sub = f.make_subsystem(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + auto chain = sub->get_cose_endorsements_chain(123); + REQUIRE_FALSE(chain.has_value()); +} + +// In Failed, readers must not throw -- they serve whatever was validated +// before the failure (the validator rejected the next link without +// committing). Caller checks status to decide what to do. 
+TEST_CASE("Reader: all three readers serve validated prefix in Failed") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}).add_next({2, 1}, {4, 200}).add_next({6, 201}, {6, 400}); + f.use_identity_key(cb.current_key_pair()); + wire_chain(f, cb); + auto mid_wv = cb.write_versions.at(1); + f.historical->unavailable.insert(mid_wv); + + auto sub = f.make_subsystem(); + for (int i = 0; i < 30; ++i) + { + f.scheduler->fire_delayed_once(); + } + f.scheduler->run_to_completion(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + + // Snapshot the validated prefix before tampering. + const auto pre_keys = sub->get_trusted_keys(); + + f.historical->unavailable.clear(); + f.historical->entries.at(mid_wv).endorsement_epoch_end = ccf::TxID{99, 99999}; + sub->trigger_extension(); + REQUIRE_THROWS_AS(f.scheduler->run_to_completion(), std::exception); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Failed); + + // Readers still serve. trusted_keys preserved exactly as before; + // chain reader returns nullopt for older-seqno (would have needed + // the missing/tainted predecessor) or the validated prefix for a + // seqno already covered. 
+ REQUIRE(sub->get_trusted_keys() == pre_keys); + REQUIRE_FALSE(sub->get_cose_endorsements_chain(50).has_value()); +} + +TEST_CASE("Reader: empty chain for current-epoch seqno in Done") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}).add_next({2, 1}, {4, 200}); + f.use_identity_key(cb.current_key_pair()); + wire_chain(f, cb); + auto sub = f.make_subsystem(); + f.scheduler->run_to_completion(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Done); + // current_service_from = synthesised = view 6, seqno 201 + auto chain = sub->get_cose_endorsements_chain(500); + REQUIRE(chain.has_value()); + REQUIRE(chain->empty()); +} + +TEST_CASE("Reader: empty chain for pre-history seqno in Done") +{ + // Construct a chain whose earliest endorsement has begin.seqno > 1, then + // request seqno 0 (pre-history). chain_complete logic returns empty. + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 100}).add_next({2, 100}, {4, 500}); + f.use_identity_key(cb.current_key_pair()); + wire_chain(f, cb); + auto sub = f.make_subsystem(); + f.scheduler->run_to_completion(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Done); + auto chain = sub->get_cose_endorsements_chain(50); + REQUIRE(chain.has_value()); + REQUIRE(chain->empty()); +} + +TEST_CASE("Reader: nullopt for uncovered seqno in Partial") +{ + SubsystemFixture f; + ChainBuilder cb; + // Pick a self-endorsement at seqno 100 so the topmost ends up with + // earliest_endorsed_seq = 100 and a query at seqno 50 is genuinely + // below the partial chain. 
+ cb.add_self({2, 100}).add_next({2, 100}, {4, 200}); + f.use_identity_key(cb.current_key_pair()); + wire_chain(f, cb); + f.historical->unavailable.insert(cb.write_versions.at(0)); + + auto sub = f.make_subsystem(); + for (int i = 0; i < 30; ++i) + { + f.scheduler->fire_delayed_once(); + } + f.scheduler->run_to_completion(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + + // earliest_endorsed_seq is the topmost's begin.seqno = 100. + // A request strictly below that returns nullopt (caller may trigger + // extension). + REQUIRE_FALSE(sub->get_cose_endorsements_chain(50).has_value()); + // A request inside the topmost epoch returns a non-empty chain. + auto chain = sub->get_cose_endorsements_chain(150); + REQUIRE(chain.has_value()); + REQUIRE(chain->size() == 1); +} + +TEST_CASE("Reader: get_trusted_identity_for boundary semantics") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}).add_next({2, 1}, {4, 200}).add_next({6, 201}, {6, 400}); + f.use_identity_key(cb.current_key_pair()); + wire_chain(f, cb); + auto sub = f.make_subsystem(); + f.scheduler->run_to_completion(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Done); + + // trusted_keys are at seqnos {1, 201, 401} + // Below the earliest -> nullptr + REQUIRE(sub->get_trusted_identity_for(0) == nullptr); + // Exact boundary returns the key at that seqno + REQUIRE(sub->get_trusted_identity_for(1) != nullptr); + // Between boundaries: returns the most-recent <= seqno + REQUIRE(sub->get_trusted_identity_for(100) != nullptr); + REQUIRE(sub->get_trusted_identity_for(200) != nullptr); + REQUIRE(sub->get_trusted_identity_for(201) != nullptr); + // Far above last boundary -> returns the most-recent (current) + REQUIRE(sub->get_trusted_identity_for(1000000) != nullptr); +} + +TEST_CASE("Reader: get_trusted_keys returns partial map in Partial") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}) + .add_next({2, 1}, {4, 200}) + 
.add_next({6, 201}, {6, 400}) + .add_next({8, 401}, {8, 600}); + f.use_identity_key(cb.current_key_pair()); + wire_chain(f, cb); + // Make all but the topmost unavailable. + f.historical->unavailable = { + cb.write_versions.at(0), cb.write_versions.at(1), cb.write_versions.at(2)}; + + auto sub = f.make_subsystem(); + for (int i = 0; i < 30; ++i) + { + f.scheduler->fire_delayed_once(); + } + f.scheduler->run_to_completion(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + // Only topmost + current = 2 entries; the doxygen note on + // get_trusted_keys() promises this exact behaviour. + REQUIRE(sub->get_trusted_keys().size() == 2); +} + +TEST_CASE("Concurrent threads triggering all fold into one cycle") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}).add_next({2, 1}, {4, 200}); + f.use_identity_key(cb.current_key_pair()); + wire_chain(f, cb); + f.historical->unavailable.insert(cb.write_versions.at(0)); + + auto sub = f.make_subsystem(); + for (int i = 0; i < 30; ++i) + { + f.scheduler->fire_delayed_once(); + } + f.scheduler->run_to_completion(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + REQUIRE(f.scheduler->pending_immediate_count() == 0); + + std::vector threads; + threads.reserve(16); + for (int i = 0; i < 16; ++i) + { + threads.emplace_back([&] { sub->trigger_extension(); }); + } + for (auto& t : threads) + { + t.join(); + } + // fetch_active under chain_mutex guarantees exactly one cycle started. 
+ REQUIRE(f.scheduler->pending_immediate_count() == 1); +} + +TEST_CASE("Done is terminal: no tasks queued after reaching Done") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}).add_next({2, 1}, {4, 200}); + f.use_identity_key(cb.current_key_pair()); + wire_chain(f, cb); + + auto sub = f.make_subsystem(); + f.scheduler->run_to_completion(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Done); + REQUIRE(f.scheduler->pending_immediate_count() == 0); + REQUIRE(f.scheduler->pending_delayed_count() == 0); +} + +// ------------------------------------------------------------------------ +// Concurrency tests: readers running concurrently with the worker-thread +// fetch_first writing current_service_from / has_predecessors / +// earliest_endorsed_seq. Under the monitor discipline these writes +// happen inside fetch_first_unsafe with chain_mutex held, and the +// readers acquire the same mutex, so the access is serialised. The +// tests serve as defensive regressions against any future change that +// weakens that discipline (e.g. demoting a wrapper or moving a field +// write out of an _unsafe method). +// ------------------------------------------------------------------------ + +TEST_CASE("Concurrent readers vs fetch_first establishing current_service_from") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 100}); + f.use_identity_key(cb.current_key_pair()); + // Start in Partial with current_service_from STILL UNSET. The + // worker-thread fetch_first will assign it while readers concurrently + // call the get_* methods; chain_mutex serialises both sides. 
+ f.node_state->part_of_network = false; + + auto sub = f.make_subsystem(); + for (int i = 0; i < 30; ++i) + { + f.scheduler->fire_delayed_once(); + } + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + + std::atomic stop{false}; + std::vector readers; + for (int i = 0; i < 4; ++i) + { + readers.emplace_back([&] { + while (!stop.load(std::memory_order_relaxed)) + { + (void)sub->get_cose_endorsements_chain(50); + (void)sub->get_trusted_keys(); + } + }); + } + + f.node_state->part_of_network = true; + f.node_state->current_service_from = ccf::TxID{2, 100}; + f.node_state->topmost = cb.topmost_entry(); + + for (int round = 0; round < 8; ++round) + { + sub->trigger_extension(); + std::thread writer([&] { f.scheduler->run_immediate(); }); + writer.join(); + if (sub->endorsements_fetching_status() == ccf::FetchStatus::Done) + { + break; + } + } + + stop.store(true, std::memory_order_relaxed); + for (auto& t : readers) + { + t.join(); + } + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Done); +} + +TEST_CASE("Concurrent readers vs fetch_first self-only termination") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 100}); + f.use_identity_key(cb.current_key_pair()); + // Self-only chain: fetch_first takes the self-endorsement branch and + // writes has_predecessors = false. Force Partial first so the + // dispatched fetch_first runs on a worker concurrently with readers; + // chain_mutex serialises the write against the readers. 
+ f.node_state->current_service_from = ccf::TxID{2, 100}; + f.node_state->topmost = cb.topmost_entry(); + f.node_state->part_of_network = false; + + auto sub = f.make_subsystem(); + for (int i = 0; i < 30; ++i) + { + f.scheduler->fire_delayed_once(); + } + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + + std::atomic stop{false}; + std::vector readers; + for (int i = 0; i < 4; ++i) + { + readers.emplace_back([&] { + while (!stop.load(std::memory_order_relaxed)) + { + (void)sub->get_cose_endorsements_chain(50); + (void)sub->get_cose_endorsements_chain(150); + } + }); + } + + f.node_state->part_of_network = true; + sub->trigger_extension(); + std::thread writer([&] { f.scheduler->run_immediate(); }); + writer.join(); + + stop.store(true, std::memory_order_relaxed); + for (auto& t : readers) + { + t.join(); + } + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Done); +} + +TEST_CASE("Concurrent readers vs fetch_first non-self chain walk start") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}).add_next({2, 1}, {4, 200}); + f.use_identity_key(cb.current_key_pair()); + // Multi-link chain: fetch_first takes the non-self branch and writes + // has_predecessors = true + earliest_endorsed_seq = ... before + // delegating to process_endorsement. 
+ f.node_state->current_service_from = cb.synthesised_current_service_from(); + f.node_state->topmost = cb.topmost_entry(); + f.historical->entries = cb.historical_entries(); + f.node_state->part_of_network = false; + + auto sub = f.make_subsystem(); + for (int i = 0; i < 30; ++i) + { + f.scheduler->fire_delayed_once(); + } + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + + std::atomic stop{false}; + std::vector readers; + for (int i = 0; i < 4; ++i) + { + readers.emplace_back([&] { + while (!stop.load(std::memory_order_relaxed)) + { + (void)sub->get_cose_endorsements_chain(50); + (void)sub->get_cose_endorsements_chain(150); + (void)sub->get_trusted_keys(); + } + }); + } + + f.node_state->part_of_network = true; + sub->trigger_extension(); + std::thread writer([&] { f.scheduler->run_to_completion(); }); + writer.join(); + + stop.store(true, std::memory_order_relaxed); + for (auto& t : readers) + { + t.join(); + } + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Done); +} + +// ------------------------------------------------------------------------ +// Chain-walk retry budget must be independent of topmost-wait retries: +// retries spent in retry_first_fetch waiting for the topmost entry must +// not eat into the per-chunk retry budget consumed by retry_fetch_next. +// ------------------------------------------------------------------------ + +TEST_CASE("Chain-walk retry budget independent of topmost-wait retries") +{ + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}).add_next({2, 1}, {4, 200}); + f.use_identity_key(cb.current_key_pair()); + + // Service info ready immediately so the only thing fetch_first waits + // on in phase 1 is the topmost endorsement entry. + f.node_state->part_of_network = true; + f.node_state->current_service_from = cb.synthesised_current_service_from(); + // Topmost intentionally left unset. 
+ + static constexpr int K = 10; + static constexpr int MAX = 30; + + // Phase 1: ctor's synchronous fetch_first sees topmost==nullopt and + // schedules retry #1. Fire K-1 more times to reach K topmost-wait + // retries total; fetch_attempts is now K. + auto sub = f.make_subsystem(); + REQUIRE(f.scheduler->pending_delayed_count() == 1); + for (int i = 0; i < K - 1; ++i) + { + f.scheduler->fire_delayed_once(); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + REQUIRE(f.scheduler->pending_delayed_count() == 1); + } + + // Phase 2: topmost appears, first historical chunk is missing. Count + // every scheduled retry until the cycle ends in Partial. + f.node_state->topmost = cb.topmost_entry(); + f.historical->entries = cb.historical_entries(); + f.historical->unavailable.insert(cb.write_versions.at(0)); + + int fires_after_toggle = 0; + while (f.scheduler->pending_delayed_count() > 0) + { + f.scheduler->fire_delayed_once(); + ++fires_after_toggle; + } + + // With the bug: chain walk inherits the K-bumped counter, so it only + // gets MAX-K=20 retries before complete(Partial). With the fix: full + // MAX=30 retries dedicated to the chain walk. + REQUIRE(fires_after_toggle == MAX); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); +} + +// ------------------------------------------------------------------------ +// trigger_extension must not dispatch a new cycle after the prior cycle +// has reached a terminal state. Under the monitor discipline the +// fetch_status check and the fetch_active claim both happen under +// chain_mutex, so a trigger that observes Partial+!fetch_active and +// then schedules cannot interleave with a concurrent complete_unsafe. +// This stress test guards against future regressions that might split +// the check and the claim across two critical sections again. 
+// ------------------------------------------------------------------------ + +TEST_CASE("trigger_extension must not dispatch after complete() flips to Done") +{ + // The original (CAS-based) bug fired within ~1 iteration on x86, so + // this count gives a wide safety margin against any plausible + // regression that re-introduces a non-atomic check-and-claim while + // keeping the test under a few seconds. + static constexpr int ITERATIONS = 200; + for (int it = 0; it < ITERATIONS; ++it) + { + SubsystemFixture f; + ChainBuilder cb; + cb.add_self({2, 1}).add_next({2, 1}, {4, 200}); + f.use_identity_key(cb.current_key_pair()); + wire_chain(f, cb); + // Force initial cycle into Partial by withholding the chunk. + f.historical->unavailable.insert(cb.write_versions.at(0)); + + auto sub = f.make_subsystem(); + for (int i = 0; i < 30; ++i) + { + f.scheduler->fire_delayed_once(); + } + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Partial); + + // Restore the chunk and trigger an extension cycle. + f.historical->unavailable.clear(); + sub->trigger_extension(); + + // Thread A drives the cycle to Done. + std::thread completer([&] { f.scheduler->run_to_completion(); }); + + // Thread B races trigger_extension calls against the completion. + // Any task scheduled AFTER status moves to Done is a bug. 
+ std::atomic stop{false}; + int post_done_dispatches = 0; + std::thread racer([&] { + while (!stop.load(std::memory_order_relaxed)) + { + const auto before = f.scheduler->pending_immediate_count(); + sub->trigger_extension(); + const auto after = f.scheduler->pending_immediate_count(); + if ( + after > before && + sub->endorsements_fetching_status() == ccf::FetchStatus::Done) + { + ++post_done_dispatches; + } + } + }); + + completer.join(); + stop.store(true, std::memory_order_relaxed); + racer.join(); + + REQUIRE(post_done_dispatches == 0); + REQUIRE(sub->endorsements_fetching_status() == ccf::FetchStatus::Done); + } +} diff --git a/tests/recovery.py b/tests/recovery.py index 90485aaffc2a..97f028ed3ed2 100644 --- a/tests/recovery.py +++ b/tests/recovery.py @@ -1445,7 +1445,7 @@ def run_recovery_after_cose_upgrade(args): """Start Dual, upgrade to COSE-only via node replacement, then recover with allow-dual-joiners. Then live-upgrade the recovered network to strict COSE-only by replacing nodes again, and recover from ledger files. 
- Exercises the full upgrade path: Dual → COSE (allow dual) → COSE (strict), + Exercises the full upgrade path: Dual -> COSE (allow dual) -> COSE (strict), with recovery at each transition.""" cose_only_package = args.package + "_cose_only_allow_join_dual" @@ -1492,7 +1492,7 @@ def run_recovery_after_cose_upgrade(args): # Issue more TXs in COSE-only mode (new view after election) network.txs.issue(network, number_txs=5) - # Now stop and recover — the ledger has dual sigs then COSE-only sigs + # Now stop and recover -- the ledger has dual sigs then COSE-only sigs network.save_service_identity(args) recover_primary, _ = network.find_primary() current_ledger_dir, committed_ledger_dirs = recover_primary.get_ledger() @@ -2020,6 +2020,300 @@ def run_recover_snapshot_ledger_offset(args): ) +def _find_endorsement_write_chunks(committed_ledger_dirs): + """Walk all committed ledger chunks; return a list of + (write_seqno, chunk_path) for every transaction that writes to the + public:ccf.internal.previous_service_identity_endorsement table. + + Ordered by ascending write_seqno. There is one such write per service + epoch (each service writes the previous-identity endorsement at open). 
+ """ + endorsement_table = "public:ccf.internal.previous_service_identity_endorsement" + writes = [] + for ledger_dir in committed_ledger_dirs: + for filename in os.listdir(ledger_dir): + if not ccf.ledger.is_ledger_chunk_committed(filename): + continue + chunk_path = os.path.join(ledger_dir, filename) + chunk = ccf.ledger.LedgerChunk(chunk_path) + for transaction in chunk: + public_domain = transaction.get_public_domain() + tables = public_domain.get_tables() + if endorsement_table in tables: + writes.append((public_domain.get_seqno(), chunk_path)) + writes.sort(key=lambda w: w[0]) + return writes + + +def _poll_endorsements(node, tx, *, expect_status, attempts=60, sleep_s=0.5): + """Poll /log/public/cose_endorsements at `tx` until the response status + matches `expect_status`, or `attempts` are exhausted. Returns the last + response. Defaults give a 30-second polling budget, comfortably longer + than the partial-Done extension budget (30 retries at 100ms intervals).""" + response = None + for _ in range(attempts): + with node.client("user0") as cli: + response = cli.get( + "/log/public/cose_endorsements", + headers={infra.clients.CCF_TX_ID_HEADER: str(tx)}, + ) + if response.status_code == expect_status: + return response + time.sleep(sleep_s) + return response + + +def _get_trusted_keys_count(node, *, attempts=60, sleep_s=0.5): + """Return the number of trusted keys exposed by the network identity + subsystem on `node`. While the subsystem is in Partial, this excludes + any older-epoch keys whose endorsements could not be validated; in + Done it includes the full historical set. 
Polls so that callers can + observe the partial chain growing as extensions land.""" + last = None + for _ in range(attempts): + with node.client() as cli: + r = cli.get("/log/public/trusted_keys") + if r.status_code == http.HTTPStatus.OK: + return len(r.body.json()["keys"]) + last = r + time.sleep(sleep_s) + raise AssertionError( + f"/log/public/trusted_keys never returned 200 within budget: {last}" + ) + + +@reqs.description("COSE endorsement chain heals when missing ledger chunks reappear") +def run_recovery_endorsement_chain_heals(args): + """End-to-end test for the resilient COSE endorsement-chain fetching. + + Builds a chain over 2 recoveries (S1 -> S2 -> S3), then captures a tx + in the S1 epoch (whose receipt-chain crosses the predecessor link we are + about to break). Stops the network, moves the ledger chunk that contains + the write of the S2-era endorsement out of the ledger dir, recovers a + third time (-> S4). Because the missing chunk leaves a gap, queries for + S1-era receipts get HTTP 202 and the subsystem transitions to Partial + (trusted_keys exposes only the validated subset). Each 202 response + triggers a fresh extension cycle; once the chunk is restored a + triggered cycle heals the chain back to Done and the S1-era receipts + succeed.""" + txs = app.LoggingTxs("user0") + + with infra.network.network( + args.nodes, + args.binary_dir, + args.debug_nodes, + pdb=args.pdb, + txs=txs, + ) as network: + network.start_and_open(args) + + # S1 epoch transaction. After the third recovery (post chunk + # deletion) this receipt query must initially return 202 and + # then, after the chunk is restored, transition to 200. + network.txs.issue(network, number_txs=2) + _, s1_msg = network.txs.get_last_tx() + s1_tx = ccf.tx_id.TxID(s1_msg["view"], s1_msg["seqno"]) + + # First recovery -> S2. + network = test_recover_service(network, args) + network.txs.issue(network, number_txs=2) + + # Second recovery -> S3. 
+ network = test_recover_service(network, args) + network.txs.issue(network, number_txs=2) + + primary, _ = network.find_primary() + with primary.client() as cli: + s3_create_txid = ccf.tx_id.TxID.from_str( + cli.get("/node/network").body.json()["current_service_create_txid"] + ) + LOG.info(f"After 2 recoveries: s1_tx={s1_tx}, s3_create_txid={s3_create_txid}") + + # Snapshot and ledger for the next (third) recovery. + snapshots_dir = network.get_committed_snapshots(primary) + network.save_service_identity(args) + network.stop_all_nodes() + current_ledger_dir, committed_ledger_dirs = primary.get_ledger() + + # Locate the chunk containing the write of the S2-era endorsement + # (the second-oldest write to the endorsement table). The third + # recovery's chain walk fetches this from a historical ledger + # chunk; removing it forces the partial-Done path. + writes = _find_endorsement_write_chunks(committed_ledger_dirs) + LOG.info(f"Endorsement-table writes across committed ledger: {writes}") + assert len(writes) >= 3, ( + "Expected at least 3 endorsement writes (one per service open " + "across S1, S2, S3) in the committed ledger, got: " + f"{writes}" + ) + target_seqno, target_chunk = writes[1] + # Make sure the chunk we are about to move does not also contain + # the latest endorsement write (deterministic isolation). + other_writes_in_target_chunk = [ + seq + for seq, chunk in writes + if chunk == target_chunk and seq != target_seqno + ] + assert not other_writes_in_target_chunk, ( + f"Chunk {target_chunk} contains additional endorsement writes " + f"at seqnos {other_writes_in_target_chunk}; the test cannot " + "deterministically isolate the predecessor link to break. " + "Consider tightening ledger_chunk_bytes." 
+ ) + deleted_chunk_range = ccf.ledger.range_from_filename( + os.path.basename(target_chunk) + ) + LOG.info( + f"Selected chunk to move out: {target_chunk} (seqno range " + f"{deleted_chunk_range}, contains endorsement write at " + f"seqno {target_seqno})" + ) + + backup_dir = tempfile.mkdtemp(prefix="ccf_missing_chunk_") + moved_chunk_path = os.path.join(backup_dir, os.path.basename(target_chunk)) + LOG.info( + f"Moving chunk {target_chunk} (containing endorsement write at " + f"seqno {target_seqno}) to {moved_chunk_path}" + ) + shutil.move(target_chunk, moved_chunk_path) + + # Third recovery (S4) with the missing chunk. Recovery itself + # replays from the snapshot forward, so it does not require the + # older chunk; only the endorsement chain walk does. + recovered_network = infra.network.Network( + args.nodes, + args.binary_dir, + args.debug_nodes, + existing_network=network, + txs=txs, + ) + with infra.network.close_on_error(recovered_network): + recovered_network.start_in_recovery( + args, + ledger_dir=current_ledger_dir, + committed_ledger_dirs=committed_ledger_dirs, + snapshots_dir=snapshots_dir, + ) + recovered_network.recover(args) + + primary, _ = recovered_network.find_primary() + with primary.client() as cli: + s4_create_txid = ccf.tx_id.TxID.from_str( + cli.get("/node/network").body.json()["current_service_create_txid"] + ) + service_cert = cli.get("/node/network").body.json()[ + "service_certificate" + ] + cert = load_pem_x509_certificate( + service_cert.encode("ascii"), default_backend() + ) + cert_pubkey = cert.public_key().public_bytes( + serialization.Encoding.PEM, + serialization.PublicFormat.SubjectPublicKeyInfo, + ) + + # Current-epoch (S4) receipts need no chain -- the request is + # served by the current-store endorsement read alone and is + # independent of the missing predecessor chunk. Serves as a + # liveness check. 
+ LOG.info("Verifying S4-era receipts are served with empty chain") + response = _poll_endorsements( + primary, s4_create_txid, expect_status=http.HTTPStatus.NOT_FOUND + ) + assert response.status_code == http.HTTPStatus.NOT_FOUND, response + + # S1-era receipts depend on the predecessor chunk that we + # moved out (both for the chain walk and possibly for + # historical-state reconstruction), so they must keep getting + # 202 while the chunk is missing. The exact reason for the + # 202 -- endorsement-chain gap or historical-state dependency + # -- does not matter for this test; what matters is the + # transition to 200 once the chunk is restored. + LOG.info( + f"Verifying S1-era receipt at {s1_tx} keeps returning 202 " + "while the predecessor chunk is missing" + ) + for _ in range(3): + with primary.client("user0") as cli: + response = cli.get( + "/log/public/cose_endorsements", + headers={infra.clients.CCF_TX_ID_HEADER: str(s1_tx)}, + ) + assert response.status_code == http.HTTPStatus.ACCEPTED, ( + f"Expected 202 for S1-era tx {s1_tx} while chunk is " + f"missing, got {response.status_code}: {response}" + ) + time.sleep(0.5) + + # The subsystem must be in Partial: the trusted_keys endpoint + # exposes only the keys whose endorsements were validated, so + # the older-epoch key (the one endorsed by the missing chunk) + # should be absent. + partial_keys_count = _get_trusted_keys_count(primary) + LOG.info(f"trusted_keys count while in Partial: {partial_keys_count}") + assert partial_keys_count >= 1, ( + f"Expected at least the current-service key to be trusted " + f"in Partial, got {partial_keys_count}" + ) + + # Restore the chunk into the live node's read-only ledger + # directory (the new recovery network copied the original + # committed_ledger_dirs into its workspace at startup, so we + # cannot heal by moving the backup back into the source dir). 
+ live_read_only_dirs = primary.remote.read_only_ledger_paths() + assert live_read_only_dirs, ( + "Expected at least one read-only ledger directory on the " + "live recovered primary so the missing chunk can be " + "restored, got none" + ) + restored_chunk_path = os.path.join( + live_read_only_dirs[0], os.path.basename(target_chunk) + ) + LOG.info( + f"Restoring chunk to live read-only dir at " + f"{restored_chunk_path}; the chain must heal within the " + "30s polling budget" + ) + shutil.move(moved_chunk_path, restored_chunk_path) + shutil.rmtree(backup_dir, ignore_errors=True) + + response = _poll_endorsements( + primary, + s1_tx, + expect_status=http.HTTPStatus.OK, + attempts=60, + sleep_s=0.5, + ) + assert response.status_code == http.HTTPStatus.OK, ( + f"Chain failed to heal: S1-era tx {s1_tx} still not " + f"served after restoring the chunk: {response}" + ) + endorsements = [ + base64.b64decode(x) for x in response.body.json()["endorsements"] + ] + assert len(endorsements) >= 2, ( + f"Expected chain length >= 2 after healing for S1-era tx " + f"{s1_tx}, got {len(endorsements)}" + ) + verify_endorsements_chain(primary, endorsements, cert_pubkey) + + # After healing the chain, the missing older-epoch key must + # have been added to the trusted set: at minimum one more + # than the Partial-state count. + healed_keys_count = _get_trusted_keys_count(primary) + LOG.info(f"trusted_keys count after healing: {healed_keys_count}") + assert healed_keys_count > partial_keys_count, ( + f"Expected trusted_keys to grow after healing, " + f"partial={partial_keys_count}, healed={healed_keys_count}" + ) + + LOG.success( + "COSE endorsement chain healed: S1-era receipts now " + "served with a full chain after the missing chunk was " + "restored." 
+ ) + + if __name__ == "__main__": def add(parser): @@ -2160,4 +2454,17 @@ def add(parser): snapshot_tx_interval=50, ) + cr.add( + "recovery_chain_heals", + run_recovery_endorsement_chain_heals, + package="samples/apps/logging/logging", + nodes=infra.e2e_args.min_nodes(cr.args, f=0), + # Short chunks so each recovery's previous-identity endorsement + # write lands in its own ledger chunk; this lets us deterministically + # move out the predecessor chunk without taking unrelated writes + # with it. + ledger_chunk_bytes="20KB", + snapshot_tx_interval=30, + ) + cr.run()