Paella of sea bugs with jamon iberico, cinnamon, maple syrup and peanut butter. #1607
Fredi-raspall wants to merge 15 commits into
Signed-off-by: Fredi Raspall <fredi@githedgehog.com>
Signed-off-by: Fredi Raspall <fredi@githedgehog.com>
The rebinding should happen when processing a message over the control channel. Signed-off-by: Fredi Raspall <fredi@githedgehog.com>
Split the current signal handler in two parts: a signal catcher and a signal handler. The former is generic and relays signals to the handler, which is dataplane-specific. This change is needed to avoid a circular dependency that would occur because the existing handler is defined in lifecycle. An alternative would be to move the prior signal handler to dataplane, but opted to keep the lifecycle crate complete with that piece. The signal catcher just registers signal listeners and relays them to dataplane, which does the actual handling. The prior handler is extended with listeners for other signals which would otherwise terminate dataplane. Dataplane can now be stopped with: SIGINT, SIGTERM, SIGQUIT. Signed-off-by: Fredi Raspall <fredi@githedgehog.com>
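The catcher/handler split described in this commit could be sketched roughly as below. This is a standard-library-only illustration, not the code in the PR: `DpSignal` follows the enum name visible in the diff, while `Action`, `relay`, `handle` and `run_handler` are hypothetical names, a `std::sync::mpsc` channel stands in for the async channel, and the signals are passed in rather than caught from the OS. Ignoring SIGHUP is an assumption made only for the example.

```rust
use std::sync::mpsc::{Receiver, Sender};

/// Signals relayed by the catcher (subset; mirrors the `DpSignal` name in the diff).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DpSignal {
    SIGINT,
    SIGTERM,
    SIGQUIT,
    SIGUSR1,
    SIGHUP,
}

/// What the dataplane-specific handler decides to do (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub enum Action {
    Stop,
    RebindCli,
    Ignore,
}

/// Generic catcher side: knows nothing about dataplane, it only relays.
/// In real code the signals would come from OS listeners; here they are passed in.
pub fn relay(caught: &[DpSignal], tx: &Sender<DpSignal>) {
    for sig in caught {
        // A send error only means the handler is gone; there is nothing else to do.
        let _ = tx.send(*sig);
    }
}

/// Dataplane-specific policy: map a signal to an action.
pub fn handle(sig: DpSignal) -> Action {
    match sig {
        DpSignal::SIGINT | DpSignal::SIGTERM | DpSignal::SIGQUIT => Action::Stop,
        DpSignal::SIGUSR1 => Action::RebindCli,
        // Assumption for this sketch: SIGHUP is caught so it cannot kill the process.
        DpSignal::SIGHUP => Action::Ignore,
    }
}

/// Handler loop: process relayed signals until one of them requests a stop.
pub fn run_handler(rx: &Receiver<DpSignal>) -> Vec<Action> {
    let mut actions = Vec::new();
    while let Ok(sig) = rx.recv() {
        let action = handle(sig);
        let stop = action == Action::Stop;
        actions.push(action);
        if stop {
            break;
        }
    }
    actions
}
```

Because `relay` only depends on the signal enum and a sender, it can live in a generic crate, while `handle` carries the dataplane policy; that is the dependency direction the commit message is after.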
Cause the cli to rebind to the configured path. This allows ensuring the availability of the CLI in case the filesystem path is accidentally removed (e.g. by starting another dataplane). Signed-off-by: Fredi Raspall <fredi@githedgehog.com>
When sending a packet once processed, its buffer needs to be updated according to the changes made in its headers. This can fail when "de-parsing" the headers or when there is no headroom in the buffer. Avoid unreachable! on a fallible method (even though it should not normally fail) and mark the packet on failure, so that such problems are not silently overlooked.
Signed-off-by: Fredi Raspall <fredi@githedgehog.com>
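As a rough illustration of "return an error and mark the packet instead of panicking", here is a minimal standard-library sketch. `SerializeError` and the two `DoneReason` variants use the names from the review summary, but everything else is an assumption: the real types are generic over the buffer, the real `serialize` consumes the packet, and the `Packet` fields below are invented for the example.

```rust
/// Why a packet was marked as failed (only the two variants named in the review).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DoneReason {
    NoHeadRoom,
    DeparseError,
}

/// Errors from writing headers back into the buffer (simplified, non-generic).
#[derive(Debug, PartialEq, Eq)]
pub enum SerializeError {
    NoHeadRoom { needed: usize, available: usize },
    DeparseError(String),
}

/// Toy packet: raw header bytes, available headroom, payload, and a failure mark.
pub struct Packet {
    pub headers: Vec<u8>,
    pub headroom: usize,
    pub payload: Vec<u8>,
    pub done: Option<DoneReason>,
}

impl Packet {
    /// The fallible part: no panics, every failure becomes a value.
    fn do_serialize(&self) -> Result<Vec<u8>, SerializeError> {
        if self.headers.is_empty() {
            return Err(SerializeError::DeparseError("no headers to write".to_string()));
        }
        if self.headers.len() > self.headroom {
            return Err(SerializeError::NoHeadRoom {
                needed: self.headers.len(),
                available: self.headroom,
            });
        }
        let mut out = self.headers.clone();
        out.extend_from_slice(&self.payload);
        Ok(out)
    }

    /// Serialize and, on failure, record the reason on the packet so that
    /// statistics can account for it rather than the failure going unnoticed.
    pub fn serialize(&mut self) -> Result<Vec<u8>, SerializeError> {
        let result = self.do_serialize();
        match &result {
            Err(SerializeError::NoHeadRoom { .. }) => self.done = Some(DoneReason::NoHeadRoom),
            Err(SerializeError::DeparseError(_)) => self.done = Some(DoneReason::DeparseError),
            Ok(_) => {}
        }
        result
    }
}
```

Splitting the work into an inner fallible step and an outer step that records the outcome keeps the marking logic in one place.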
Adds a small netlink listener that produces events upon parsing netlink messages sent by the kernel and can broadcast them to multiple subscribers. This will be used to monitor flapping interfaces. This code will require small adjustments when the new interface model lands. Signed-off-by: Fredi Raspall <fredi@githedgehog.com>
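The "one listener, many subscribers" shape can be sketched with the standard library alone. The diff shows the PR using `tokio::sync::broadcast`; the version below swaps that for one `mpsc` sender per subscriber so it runs without external crates. `Monitor`, `subscribe` and `broadcast` are hypothetical names, and `EthEvent` is reduced to two fields for brevity.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Reduced stand-in for the interface event produced from a netlink message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EthEvent {
    pub ifindex: u32,
    pub carrier: bool,
}

/// Fan-out point: holds one sender per subscriber.
#[derive(Default)]
pub struct Monitor {
    subscribers: Vec<Sender<EthEvent>>,
}

impl Monitor {
    /// Register a new subscriber and hand back its receiving end.
    pub fn subscribe(&mut self) -> Receiver<EthEvent> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Deliver a copy of the event to every live subscriber.
    /// Subscribers whose receiver was dropped are pruned.
    /// Returns how many subscribers were reached.
    pub fn broadcast(&mut self, ev: &EthEvent) -> usize {
        self.subscribers.retain(|tx| tx.send(ev.clone()).is_ok());
        self.subscribers.len()
    }
}
```

Unlike a bounded broadcast channel, this sketch never reports lagging receivers; it only shows the fan-out and the cleanup of closed subscribers.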
Instead of creating the ConfigProcessor explicitly in both config drivers, create and run it before branching. This makes it clearer that the distinct config entry points just require a config client to drive the processor. Signed-off-by: Fredi Raspall <fredi@githedgehog.com>
Start the interface monitor in the management subsystem and relay interface events to the router. Signed-off-by: Fredi Raspall <fredi@githedgehog.com>
Handle interface events by updating the state of the interfaces as seen by the routing crate and logging any change. Signed-off-by: Fredi Raspall <fredi@githedgehog.com>
Signed-off-by: Fredi Raspall <fredi@githedgehog.com>
The prior code logged interface status changes in methods that update the interface objects in the interface table. This is problematic since the interface table is implemented using left-right. As a result those methods are called twice (one for each copy of the interface table), causing logs to be duplicated and appear at seemingly random times. Fix this by moving the logic outside the reach of the absorb trait. Signed-off-by: Fredi Raspall <fredi@githedgehog.com>
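The duplication is easy to reproduce in miniature. The sketch below is not left-right itself: it is a hypothetical two-copy table (`TwoCopies`, `SetOper` and both method names are invented) where each operation is applied once per copy, which is the property that made the logs appear twice. In the real structure the second application happens later, which is why the duplicates looked randomly timed; here both happen back to back.

```rust
/// An operation queued against the table (hypothetical, minimal).
#[derive(Clone, Copy)]
pub struct SetOper {
    pub ifindex: usize,
    pub up: bool,
}

/// Two copies of the state, as in a left-right structure:
/// every operation ends up being applied once per copy.
pub struct TwoCopies {
    copies: [Vec<bool>; 2],
    pub log: Vec<String>,
}

impl TwoCopies {
    pub fn new(n: usize) -> Self {
        Self { copies: [vec![false; n], vec![false; n]], log: Vec::new() }
    }

    /// Problematic variant: logging inside the per-copy apply step fires twice.
    pub fn apply_logging_inside(&mut self, op: SetOper) {
        for copy in 0..2 {
            self.copies[copy][op.ifindex] = op.up;
            self.log.push(format!("if {} oper up={}", op.ifindex, op.up));
        }
    }

    /// Fixed variant: detect the change and log once, outside the per-copy step.
    pub fn apply_logging_outside(&mut self, op: SetOper) {
        if self.copies[0][op.ifindex] != op.up {
            self.log.push(format!("if {} oper up={}", op.ifindex, op.up));
        }
        for copy in self.copies.iter_mut() {
            copy[op.ifindex] = op.up;
        }
    }
}
```

Moving the comparison before the per-copy step also means an unchanged state produces no log line at all.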
The control channel was never integrated in the router IO poller. As a result, control messages were serviced, in the worst case, every timeout interval of the poller. This is problematic now that the router also gets interface events (which may be more frequent than the prior type of messages). Solve this by waking up the poller anytime a message is queued towards the router with the aid of a Waker. Signed-off-by: Fredi Raspall <fredi@githedgehog.com>
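To make the latency argument concrete, here is a standard-library sketch of "queue a message, then wake the poller". It deliberately swaps the technique: a `Condvar` stands in for `mio::Waker`, and `CtlQueue`, `new_shared` and `poll_once` are hypothetical names (only `send_and_wake` echoes a name mentioned in the review). The point is just that the poller returns as soon as a message is queued instead of waiting for its timeout.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Control queue plus a wake-up primitive (Condvar used in place of mio::Waker).
pub struct CtlQueue<T> {
    queue: Mutex<VecDeque<T>>,
    wake: Condvar,
}

impl<T> CtlQueue<T> {
    /// Create a queue that can be shared between sender and poller threads.
    pub fn new_shared() -> Arc<Self> {
        Arc::new(Self { queue: Mutex::new(VecDeque::new()), wake: Condvar::new() })
    }

    /// Sender side: enqueue the message, then wake the poller.
    pub fn send_and_wake(&self, msg: T) {
        self.queue.lock().unwrap().push_back(msg);
        self.wake.notify_one();
    }

    /// Poller side: sleep until woken or until `timeout` elapses, then drain
    /// everything that is queued. A message queued before the wait starts is
    /// not lost, because the emptiness check runs before sleeping.
    pub fn poll_once(&self, timeout: Duration) -> Vec<T> {
        let guard = self.queue.lock().unwrap();
        let (mut guard, _timed_out) = self
            .wake
            .wait_timeout_while(guard, timeout, |q| q.is_empty())
            .unwrap();
        guard.drain(..).collect()
    }
}
```

With a five-second timeout and a message sent a few milliseconds in, `poll_once` returns after those few milliseconds; without the wake-up it would return only when the timeout expires.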
Some smart NICs are known to accept frames larger than the standard jumbo size. Enlarge the reception buffer in the kernel driver to accommodate that. Signed-off-by: Fredi Raspall <fredi@githedgehog.com>
📝 Walkthrough
The PR adds platform-specific Dockerfile artifact paths, routes Unix signals through a new catcher/handler flow, introduces interface monitoring and routing updates for Ethernet state changes, changes packet serialization to return structured errors, and renames a masquerade helper with related logging updates.
Changes
Interface event pipeline
Packet serialization errors
Dockerfile build paths
Masquerade helper rename
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Pull request overview
This PR improves dataplane operational robustness by (1) restructuring signal handling to avoid unintended process termination and enable CLI socket rebinding, (2) integrating the router control channel into the mio poll loop via a Waker, and (3) adding netlink-based interface event monitoring and router event logging. It also extends packet outcome accounting to include post-processing serialization failures and adjusts build/container ergonomics.
Changes:
- Router I/O loop: integrate control-channel processing into `mio::Poll` via `Waker`, add CLI socket restore control message, and harden loop-exit detection.
- Interface events: add an `interface-manager` netlink monitor, relay events into the router control path, and log admin and oper state transitions as router events.
- Packet stats: introduce `SerializeError` and new `DoneReason` values to account for “no headroom” / “deparse error” outcomes; minor Dockerfile and kernel worker tweaks.
Reviewed changes
Copilot reviewed 22 out of 23 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| routing/src/router/rio.rs | Adds mio::Waker integration for ctl channel; adds CLI socket restore; refactors exit guard. |
| routing/src/router/ctl.rs | Adds waker-backed RouterCtlSender, new ctl messages (RebindCli, IfEvent), drains ctl channel on wake. |
| routing/src/router/revent.rs | Adds router events for interface admin/oper transitions with formatted display. |
| routing/src/interfaces/interface.rs | Removes per-set logging for admin/oper state updates (now logged via router events). |
| routing/src/interfaces/iftablerw.rs | Enables writer methods for interface state updates (removes unused allow). |
| routing/src/interfaces/iftable.rs | Narrows visibility of state setters to pub(super). |
| routing/src/cli/display.rs | Updates IfState display to use formatter padding behavior. |
| routing/Cargo.toml | Adds interface-manager dependency. |
| net/src/packet/mod.rs | Adds SerializeError, splits serialize logic, sets DoneReason on serialization failure. |
| net/src/packet/meta.rs | Adds DoneReason::DeparseError and DoneReason::NoHeadRoom. |
| net/src/packet/stats_display.rs | Adds display strings for new DoneReason values. |
| nat/src/masquerade/nf.rs | Minor doc/comment cleanup and internal helper rename. |
| mgmt/src/processor/proc.rs | Changes ConfigProcessor::new to take a runtime handle; spawns rtnetlink connection. |
| mgmt/src/processor/launch.rs | Starts interface monitor + relay task; passes router control sender; plumbs config client. |
| mgmt/src/tests/mgmt.rs | Updates test to pass tokio runtime handle to ConfigProcessor::new. |
| lifecycle/src/lib.rs | Splits signal handling into catcher (relays signals) for broader signal coverage. |
| interface-manager/src/monitor/mod.rs | Introduces netlink-based interface monitor and broadcast distribution; adds (ignored) test. |
| interface-manager/src/lib.rs | Exposes new monitor module. |
| interface-manager/Cargo.toml | Adds dependencies needed for monitor + tests. |
| dataplane/src/runtime.rs | Adds main signal-processing task (acts on SIGUSR1 to rebind CLI); uses new catcher API. |
| dataplane/src/drivers/kernel/worker.rs | Increases RX buffer size; promotes serialize failure log to warn. |
| Dockerfile | Copies binaries from target triple subdir (target/${PLATFORM}/${PROFILE}). |
| Cargo.lock | Updates lockfile for new dependencies. |
| async fn interface_event_notify( | ||
| mut rx: tokio::sync::broadcast::Receiver<EthEvent>, | ||
| mut rtr_ctl: RouterCtlSender, | ||
| ) { | ||
| loop { | ||
| if let Ok(ev) = rx.recv().await { | ||
| info!("Notifying router about interface event..."); | ||
| if rtr_ctl.send_ifevent(ev).await.is_err() { | ||
| warn!("Failed to relay interface event to router") | ||
| } | ||
| } | ||
| } | ||
| } |
| mgmt.spawn_fatal_on_exit( | ||
| "interface event relay", | ||
| interface_event_notify(if_subsc, params.processor_params.router_ctl.clone()), | ||
| handle, | ||
| ); |
| _ = sigint.recv() => { | ||
| let _ = tx.send(DpSignal::SIGINT).await; | ||
| }, | ||
| _ = sigterm.recv() => { | ||
| let _ = tx.send(DpSignal::SIGTERM).await; | ||
| }, | ||
| _ = sigquit.recv() => { | ||
| let _ = tx.send(DpSignal::SIGQUIT).await; | ||
| }, | ||
| _ = sigusr1.recv() => { | ||
| let _ = tx.send(DpSignal::SIGUSR1).await; | ||
| }, | ||
| _ = sigusr2.recv() => { | ||
| let _ = tx.send(DpSignal::SIGUSR2).await; | ||
| }, | ||
| _ = sighup.recv() => { | ||
| let _ = tx.send(DpSignal::SIGHUP).await; | ||
| }, | ||
| _ = sigalrm.recv() => { | ||
| let _ = tx.send(DpSignal::SIGALRM).await; | ||
| }, | ||
| _ = sigpipe.recv() => { | ||
| let _ = tx.send(DpSignal::SIGPIPE).await; | ||
| }, |
| /// Consume a [`Packet`] and update its buffer based on any changes to its [`Headers`]. | ||
| /// | ||
| /// # Errors | ||
| /// | ||
| /// Returns a [`Prepend::Error`] error if the packet does not have enough headroom to | ||
| /// serialize. | ||
| pub fn serialize(mut self) -> Result<Buf, <Buf as Prepend>::Error> { | ||
| /// This method returns `PacketError::NoHeadRoom`if the packet does not have enough headroom to | ||
| /// serialize or `PacketError::DeparseError` if it does, but the de-parsing the `Packet` failed. |
| /// Consume a [`Packet`] and update its buffer based on any changes to its [`Headers`]. | ||
| /// On error, mark the packet accordingly. | ||
| /// | ||
| /// # Errors | ||
| /// | ||
| /// This method returns `PacketError::NoHeadRoom`if the packet does not have enough headroom to | ||
| /// serialize or `PacketError::DeparseError` if it does, but the de-parsing the `Packet` failed. |
| // create netlink socket and spawn background task | ||
| let Ok((connection, netlink, _)) = rtnetlink::new_connection() else { | ||
| panic!("failed to create connection"); | ||
| }; | ||
| spawn(connection); | ||
| handle.spawn(connection); |
| #[wrap(with_caps([Capability::CAP_NET_ADMIN]))] | ||
| #[n_vm::in_vm] | ||
| #[cfg_attr(not(emulated), traced_test)] | ||
| #[ignore = "could not make this test to run"] | ||
| async fn test_interface_monitor() { |
Actionable comments posted: 9
🧹 Nitpick comments (1)
net/src/packet/mod.rs (1)
65-73: 📐 Maintainability & Code Quality | 🔵 Trivial
Consider adding `#[from]` to `DeparseError` for automatic error conversion
Adding `#[from]` to the `DeparseError` variant allows the `?` operator to automatically convert `DeParseError<E>` into `SerializeError<E>`, eliminating the need for explicit `.map_err(SerializeError::DeparseError)` calls in `do_serialize`.
The `thiserror` crate automatically implements `Error::source()` when `#[from]` is used. Ensure `DeParseError<E>` implements `std::error::Error` (a requirement for `#[from]`); if it is a standard `thiserror` enum, this likely holds true.
♻️ Proposed change
 #[error("De-parsing error: {0}")]
- DeparseError(DeParseError<E>),
+ DeparseError(#[from] DeParseError<E>),
Then simplify the error handling in `do_serialize`:
- self.headers.deparse(buf).map_err(SerializeError::DeparseError)?;
+ self.headers.deparse(buf)?;
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@net/src/packet/mod.rs` around lines 65 - 73, The SerializeError enum currently wraps DeParseError(E) without automatic conversion, forcing manual map_err handling in do_serialize. Update the DeparseError variant in SerializeError<E> to use #[from] and confirm DeParseError<E> implements std::error::Error so thiserror can derive source() automatically. Then simplify do_serialize to rely on the ? operator for converting DeParseError<E> into SerializeError<E> instead of explicit mapping.
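For readers less familiar with what `#[from]` buys, the nitpick above can be illustrated without `thiserror`: a hand-written `From` impl is roughly what the attribute would generate, and it is what lets `?` perform the conversion. The types below are simplified, non-generic stand-ins, and `deparse` is a hypothetical helper, not the method in `net/src/packet/mod.rs`.

```rust
/// Simplified stand-in for the de-parsing error (the real type is generic).
#[derive(Debug, PartialEq, Eq)]
pub struct DeParseError(pub String);

/// Simplified stand-in for the serialization error.
#[derive(Debug, PartialEq, Eq)]
pub enum SerializeError {
    NoHeadRoom,
    DeparseError(DeParseError),
}

/// Roughly what `#[from]` on the `DeparseError` variant would generate.
impl From<DeParseError> for SerializeError {
    fn from(e: DeParseError) -> Self {
        SerializeError::DeparseError(e)
    }
}

/// Hypothetical helper that either writes 4 bytes or fails.
fn deparse(ok: bool) -> Result<usize, DeParseError> {
    if ok {
        Ok(4)
    } else {
        Err(DeParseError("bad header".to_string()))
    }
}

/// With the `From` impl in place, `?` converts the error; no `map_err` needed.
pub fn do_serialize(ok: bool) -> Result<usize, SerializeError> {
    let written = deparse(ok)?;
    Ok(written)
}
```

The trade-off is the usual one: the implicit conversion is shorter, while an explicit `map_err` makes the wrapping visible at the call site.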
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@Dockerfile`:
- Around line 11-15: The Docker build path now depends on PLATFORM, but the
build invocation only forwards PROFILE, so Docker defaults to the wrong target
and can miss the built artifacts. Update the docker build wiring in the justfile
to pass PLATFORM through alongside PROFILE, and ensure the Dockerfile COPY
commands continue to consume the same PLATFORM variable used by the workflow so
quick builds and CI resolve the same target.
In `@interface-manager/src/monitor/mod.rs`:
- Around line 95-114: The `netlink_to_event` path is incorrectly treating
`carrierup` and `carrierdown` as required, causing link-state events to be
dropped when those display-only counters are missing. Update the parsing in
`netlink_to_event` so missing `LinkAttribute::CarrierUpCount` and
`LinkAttribute::CarrierDownCount` do not short-circuit the function; instead,
make them optional or default them while still constructing the `EthEvent` from
`ifindex`, `ifup`, `iflowerup`, `ifrunning`, and `carrier`.
In `@mgmt/src/processor/launch.rs`:
- Around line 116-128: Startup link events can be dropped because
InterfaceMonitor begins emitting before the router knows the interface, and
interface_event_notify/router control currently ignores events for unknown
ifindex values. Fix this by ensuring the startup path either applies an initial
interface snapshot/replay before subscribing to deltas, or delays starting
InterfaceMonitor/interface_event_notify until after ConfigProcessor has
initialized routing state; use the existing InterfaceMonitor,
interface_event_notify, and ConfigProcessor::new/startup sequence to keep early
link events from being lost.
- Around line 89-100: The interface relay loop in interface_event_notify
currently ignores the broadcast receiver closing state, causing an endless retry
spin after rx.recv().await stops yielding events. Update the loop to explicitly
handle the closed result from rx.recv() and break out of the task when the
channel is closed, while keeping the existing send_ifevent relay behavior for
valid EthEvent messages.
In `@net/src/packet/mod.rs`:
- Around line 335-340: Update the doc comments on `Packet::deparse` and
`Packet::serialize` so the `# Errors` section refers to the actual return type,
`SerializeError`, instead of the non-existent `PacketError` variants. Keep the
wording aligned with the existing method names and error cases in
`net/src/packet/mod.rs`, replacing `PacketError::NoHeadRoom` and
`PacketError::DeparseError` with the correct `SerializeError` terminology
throughout both doc blocks.
In `@routing/src/cli/display.rs`:
- Around line 480-485: The IfState Display formatting now emits variable-length
text, which breaks alignment in the CLI tables that print it from the table
rendering code in display.rs. Fix this by either restoring fixed-width padding
in IfState::fmt so it always returns a consistently padded value, or by updating
every IfState table call site in the display formatting paths to apply an
explicit width/column constraint so the output remains aligned.
In `@routing/src/router/ctl.rs`:
- Around line 38-48: LockGuard::drop currently uses ambient tokio::task::spawn,
which can panic if the guard is dropped outside an active Tokio runtime. Update
LockGuard to store a tokio::runtime::Handle when it is created, then replace the
spawn call in drop with handle.spawn(...) so the unlock task is scheduled safely
regardless of runtime context. Ensure the handle is threaded through the
LockGuard constructor and any call sites that build it.
In `@routing/src/router/rio.rs`:
- Around line 215-228: The cli_sock_restore flow in rio::Router currently
deregisters and shuts down the active CLI socket before opening the replacement,
which can leave the router without a usable socket if open_cli_sock fails.
Change the sequence so the new socket is created first, then only call
deregister and shutdown on the old self.clisock after the new socket opens
successfully, and finally assign the new socket and register it through
register(CLISOCK, ...).
- Around line 510-516: The control-channel drain in rio’s poll loop is too
tightly coupled to the CTL_CHANNEL token, which can leave queued messages
unprocessed if a wake is missed after send_and_wake succeeds. Move the
handle_ctl_msg call in router::rio::Rio’s event loop so it runs on every poll
iteration, not only inside the CTL_CHANNEL match arm, while keeping its
try_recv-based draining behavior unchanged. Use the existing handle_ctl_msg and
send_and_wake flow as the main reference points when updating the loop.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 193b3e81-e417-4008-9911-61214a56e5c0
⛔ Files ignored due to path filters (1)
- Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (22)
- Dockerfile
- dataplane/src/drivers/kernel/worker.rs
- dataplane/src/runtime.rs
- interface-manager/Cargo.toml
- interface-manager/src/lib.rs
- interface-manager/src/monitor/mod.rs
- lifecycle/src/lib.rs
- mgmt/src/processor/launch.rs
- mgmt/src/processor/proc.rs
- mgmt/src/tests/mgmt.rs
- nat/src/masquerade/nf.rs
- net/src/packet/meta.rs
- net/src/packet/mod.rs
- net/src/packet/stats_display.rs
- routing/Cargo.toml
- routing/src/cli/display.rs
- routing/src/interfaces/iftable.rs
- routing/src/interfaces/iftablerw.rs
- routing/src/interfaces/interface.rs
- routing/src/router/ctl.rs
- routing/src/router/revent.rs
- routing/src/router/rio.rs
💤 Files with no reviewable changes (1)
- routing/src/interfaces/iftablerw.rs
| ARG PLATFORM=x86_64-unknown-linux-gnu | ||
| LABEL sterile=false | ||
| COPY --link --chown=0:0 ./target/${PROFILE}/dataplane /bin/dataplane | ||
| COPY --link --chown=0:0 ./target/${PROFILE}/dataplane-init /bin/dataplane-init | ||
| COPY --link --chown=0:0 ./target/${PROFILE}/cli /bin/cli | ||
| COPY --link --chown=0:0 ./target/${PLATFORM}/${PROFILE}/dataplane /bin/dataplane | ||
| COPY --link --chown=0:0 ./target/${PLATFORM}/${PROFILE}/dataplane-init /bin/dataplane-init | ||
| COPY --link --chown=0:0 ./target/${PLATFORM}/${PROFILE}/cli /bin/cli |
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
Thread PLATFORM through the Docker build path.
These COPY paths now require PLATFORM, but justfile:268-279 still passes only PROFILE into docker build. That means quick builds silently fall back to x86_64-unknown-linux-gnu, which does not match the x86-64-v3 platform value already used by the workflow, so the image build can fail with missing artifacts even after a successful local build.
Suggested fix
docker build \
--file ./Dockerfile \
--build-arg PROFILE="{{profile}}" \
+ --build-arg PLATFORM="{{platform}}" \
--label sterile="false" \
--annotation sterile="false" \
--tag "dataplane:dev" \
.
| let carrierup = link_msg.attributes.iter().find_map(|a| match a { | ||
| LinkAttribute::CarrierUpCount(value) => Some(*value), | ||
| _ => None, | ||
| })?; | ||
| let carrierdown = link_msg.attributes.iter().find_map(|a| match a { | ||
| LinkAttribute::CarrierDownCount(value) => Some(*value), | ||
| _ => None, | ||
| })?; | ||
| // `LinkAttribute::OperState` is not reliable for events | ||
|
|
||
| // construct the event object | ||
| let event = EthEvent { | ||
| name: InterfaceName::try_from(ifname).ok()?, | ||
| ifindex: InterfaceIndex::new(ifindex.try_into().ok()?), | ||
| ifup, | ||
| iflowerup, | ||
| ifrunning, | ||
| carrier: *carrier != 0, | ||
| carrierup, | ||
| carrierdown, |
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
Don't drop link-state updates when the counter attrs are absent.
carrierup and carrierdown are only display data here, but the ? operators make them mandatory for the whole event. If either attribute is missing, netlink_to_event returns None and routing never sees the admin/oper transition even though it only consumes ifindex, ifup, iflowerup, ifrunning, and carrier.
Proposed fix
- let carrierup = link_msg.attributes.iter().find_map(|a| match a {
- LinkAttribute::CarrierUpCount(value) => Some(*value),
- _ => None,
- })?;
- let carrierdown = link_msg.attributes.iter().find_map(|a| match a {
- LinkAttribute::CarrierDownCount(value) => Some(*value),
- _ => None,
- })?;
+ let carrierup = link_msg.attributes.iter().find_map(|a| match a {
+ LinkAttribute::CarrierUpCount(value) => Some(*value),
+ _ => None,
+ }).unwrap_or(0);
+ let carrierdown = link_msg.attributes.iter().find_map(|a| match a {
+ LinkAttribute::CarrierDownCount(value) => Some(*value),
+ _ => None,
+ }).unwrap_or(0);
| async fn interface_event_notify( | ||
| mut rx: tokio::sync::broadcast::Receiver<EthEvent>, | ||
| mut rtr_ctl: RouterCtlSender, | ||
| ) { | ||
| loop { | ||
| if let Ok(ev) = rx.recv().await { | ||
| info!("Notifying router about interface event..."); | ||
| if rtr_ctl.send_ifevent(ev).await.is_err() { | ||
| warn!("Failed to relay interface event to router") | ||
| } | ||
| } | ||
| } |
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
Break the relay loop when the broadcast channel closes.
When rx.recv().await returns Closed, the if let Ok(...) body is skipped and the loop immediately retries forever. Once the monitor exits, this task turns into a tight spin.
Proposed fix
async fn interface_event_notify(
mut rx: tokio::sync::broadcast::Receiver<EthEvent>,
mut rtr_ctl: RouterCtlSender,
) {
loop {
- if let Ok(ev) = rx.recv().await {
- info!("Notifying router about interface event...");
- if rtr_ctl.send_ifevent(ev).await.is_err() {
- warn!("Failed to relay interface event to router")
- }
+ match rx.recv().await {
+ Ok(ev) => {
+ info!("Notifying router about interface event...");
+ if rtr_ctl.send_ifevent(ev).await.is_err() {
+ warn!("Failed to relay interface event to router");
+ }
+ }
+ Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
+ warn!(skipped, "Interface event relay lagged");
+ }
+ Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
}
| // start interface monitor | ||
| let ifmonitor = Arc::new(InterfaceMonitor::new(mgmt.cancel_token()).phy_only()); | ||
| let if_subsc = ifmonitor.subscribe(); | ||
| mgmt.spawn_fatal_on_exit("interface monitor", ifmonitor.run(), handle); | ||
| mgmt.spawn_fatal_on_exit( | ||
| "interface event relay", | ||
| interface_event_notify(if_subsc, params.processor_params.router_ctl.clone()), | ||
| handle, | ||
| ); | ||
|
|
||
| // create config processor and run it | ||
| let (processor, client) = ConfigProcessor::new(params.processor_params, handle); | ||
| mgmt.spawn_fatal_on_exit("k8s-less config processor", processor.run(), handle); |
🎯 Functional Correctness | 🟠 Major | 🏗️ Heavy lift
Startup link events can be lost before routing knows the interface.
InterfaceMonitor is started before the first config is applied, but routing/src/router/ctl.rs:259-285 returns early when the ifindex is unknown. Because the monitor only listens to multicast deltas, any link event delivered during startup is dropped permanently, and the CLI stays stale until that interface flaps again. This needs either an initial snapshot/replay path or later startup ordering.
| /// Consume a [`Packet`] and update its buffer based on any changes to its [`Headers`]. | ||
| /// | ||
| /// # Errors | ||
| /// | ||
| /// Returns a [`Prepend::Error`] error if the packet does not have enough headroom to | ||
| /// serialize. | ||
| pub fn serialize(mut self) -> Result<Buf, <Buf as Prepend>::Error> { | ||
| /// This method returns `PacketError::NoHeadRoom`if the packet does not have enough headroom to | ||
| /// serialize or `PacketError::DeparseError` if it does, but the de-parsing the `Packet` failed. |
📐 Maintainability & Code Quality | 🟡 Minor | ⚡ Quick win
Doc references a non-existent PacketError type.
The # Errors section mentions PacketError::NoHeadRoom / PacketError::DeparseError, but this method returns SerializeError. Update the doc to the correct type to avoid confusion. The same wording is repeated on serialize (Lines 359-362).
📝 Suggested wording
- /// This method returns `PacketError::NoHeadRoom`if the packet does not have enough headroom to
- /// serialize or `PacketError::DeparseError` if it does, but the de-parsing the `Packet` failed.
+ /// Returns `SerializeError::NoHeadRoom` if the packet does not have enough headroom to
+ /// serialize, or `SerializeError::DeparseError` if de-parsing the `Packet` failed.📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// Consume a [`Packet`] and update its buffer based on any changes to its [`Headers`]. | |
| /// | |
| /// # Errors | |
| /// | |
| /// Returns a [`Prepend::Error`] error if the packet does not have enough headroom to | |
| /// serialize. | |
| pub fn serialize(mut self) -> Result<Buf, <Buf as Prepend>::Error> { | |
| /// This method returns `PacketError::NoHeadRoom`if the packet does not have enough headroom to | |
| /// serialize or `PacketError::DeparseError` if it does, but the de-parsing the `Packet` failed. | |
| /// Consume a [`Packet`] and update its buffer based on any changes to its [`Headers`]. | |
| /// | |
| /// # Errors | |
| /// | |
| /// Returns `SerializeError::NoHeadRoom` if the packet does not have enough headroom to | |
| /// serialize, or `SerializeError::DeparseError` if de-parsing the `Packet` failed. |
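One way to keep an `# Errors` section from drifting away from the real return type is to write the variants as intra-doc links, since rustdoc's `broken_intra_doc_links` lint warns when a linked item does not exist. The sketch below is only an illustration: `SerializeError` is modeled here as a two-variant enum without payloads, and `prepend_header` is a hypothetical stand-in, not the `Packet::serialize` method from this PR.

```rust
/// Hypothetical error type mirroring the two variant names used in the review.
#[derive(Debug, PartialEq, Eq)]
pub enum SerializeError {
    NoHeadRoom,
    DeparseError,
}

/// Prepend `header` to `payload`, given `headroom` spare bytes at the front.
///
/// # Errors
///
/// Returns [`SerializeError::NoHeadRoom`] if `header` does not fit in `headroom`,
/// or [`SerializeError::DeparseError`] if `header` is empty (a stand-in for a
/// de-parsing failure in this sketch).
pub fn prepend_header(
    headroom: usize,
    header: &[u8],
    payload: &[u8],
) -> Result<Vec<u8>, SerializeError> {
    if header.is_empty() {
        return Err(SerializeError::DeparseError);
    }
    if header.len() > headroom {
        return Err(SerializeError::NoHeadRoom);
    }
    // Header first, then the payload, as a freshly allocated buffer.
    let mut out = Vec::with_capacity(header.len() + payload.len());
    out.extend_from_slice(header);
    out.extend_from_slice(payload);
    Ok(out)
}
```

With links of this form, a doc comment that still named a removed or renamed type would show up as a rustdoc warning rather than going unnoticed.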
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@net/src/packet/mod.rs` around lines 335 - 340, Update the doc comments on
`Packet::deparse` and `Packet::serialize` so the `# Errors` section refers to
the actual return type, `SerializeError`, instead of the non-existent
`PacketError` variants. Keep the wording aligned with the existing method names
and error cases in `net/src/packet/mod.rs`, replacing `PacketError::NoHeadRoom`
and `PacketError::DeparseError` with the correct `SerializeError` terminology
throughout both doc blocks.
| let s = match *self { | ||
| IfState::Unknown => "unknown", | ||
| IfState::Up => "up", | ||
| IfState::Down => "down", | ||
| }; | ||
| f.pad(s) |
🎯 Functional Correctness | 🟠 Major
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify IfState display call sites use explicit width where table alignment matters.
rg -n 'admin_state|oper_state|IfState' routing/src/cli/display.rs routing/src -g '*.rs' -C 2
Repository: githedgehog/dataplane
Length of output: 20228
Refactor IfState Display to preserve fixed-width column alignment in CLI tables.
The Display implementation for IfState currently outputs variable-length strings ("unknown", "up", "down"). Call sites in routing/src/cli/display.rs (lines 534–535, 574) insert these values into tabular output without enforcing explicit column widths. This change removes the previous implicit fixed-width padding, causing column misalignment in CLI tables.
To resolve:
- Either revert IfState::fmt to return a consistently padded string (e.g., 9 characters), or
- Update all table-formatting call sites to specify explicit width constraints (e.g., {:9} or configure the table formatter to fix column width).
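For the second option, it may help to recall that `Formatter::pad` applies whatever width, fill and alignment the caller puts in the format spec, so a `Display` impl that ends in `f.pad(s)` can be aligned from the call site. The following is a minimal sketch under that assumption; the `IfState` enum is re-declared locally and `row` is a hypothetical helper, not code from `routing/src/cli/display.rs`.

```rust
use std::fmt;

/// Local stand-in for the interface state enum shown in the diff.
#[derive(Clone, Copy)]
enum IfState {
    Unknown,
    Up,
    Down,
}

impl fmt::Display for IfState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let s = match *self {
            IfState::Unknown => "unknown",
            IfState::Up => "up",
            IfState::Down => "down",
        };
        // `pad` honors the width/alignment requested by the caller, if any.
        f.pad(s)
    }
}

/// Hypothetical table row: an 8-wide name column and a 9-wide state column.
fn row(name: &str, state: IfState) -> String {
    format!("{name:<8}{state:<9}|")
}
```

A call such as `row("eth0", IfState::Up)` then yields columns of constant width regardless of which variant is printed, while a plain `{}` still prints the bare word.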
routing/src/cli/display.rs
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@routing/src/cli/display.rs` around lines 480 - 485, The IfState Display
formatting now emits variable-length text, which breaks alignment in the CLI
tables that print it from the table rendering code in display.rs. Fix this by
either restoring fixed-width padding in IfState::fmt so it always returns a
consistently padded value, or by updating every IfState table call site in the
display formatting paths to apply an explicit width/column constraint so the
output remains aligned.
| impl Drop for LockGuard { | ||
| fn drop(&mut self) { | ||
| let tx = self.0.take(); | ||
| if let Some(tx) = tx { | ||
| if let Some(tx) = self.tx.take() { | ||
| let waker = Arc::clone(&self.waker); | ||
| task::spawn(async move { | ||
| if let Err(e) = tx.send(RouterCtlMsg::GuardedUnlock).await { | ||
| error!("Fatal: could not send unlock request!!: {e}"); | ||
| } else { | ||
| let _ = waker.wake(); | ||
| } | ||
| }); |
🩺 Stability & Availability | 🟠 Major
🧩 Analysis chain
🏁 Script executed:
cat -n routing/src/router/ctl.rs | head -100
Repository: githedgehog/dataplane
Length of output: 3781
🏁 Script executed:
rg -A 5 -B 5 "LockGuard" routing/src/ | head -80
Repository: githedgehog/dataplane
Length of output: 2393
Avoid ambient tokio::task::spawn in Drop implementation.
LockGuard::drop invokes tokio::task::spawn without capturing a Runtime handle. If LockGuard is dropped outside an active Tokio runtime context (e.g., in synchronous code or during process shutdown), spawn will panic.
Store a tokio::runtime::Handle within LockGuard at creation and use handle.spawn(...) to ensure the task is spawned safely regardless of the caller's runtime context.
Current unsafe implementation
impl Drop for LockGuard {
    fn drop(&mut self) {
        if let Some(tx) = self.tx.take() {
            let waker = Arc::clone(&self.waker);
            task::spawn(async move {
                // ...
            });
        }
    }
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@routing/src/router/ctl.rs` around lines 38 - 48, LockGuard::drop currently
uses ambient tokio::task::spawn, which can panic if the guard is dropped outside
an active Tokio runtime. Update LockGuard to store a tokio::runtime::Handle when
it is created, then replace the spawn call in drop with handle.spawn(...) so the
unlock task is scheduled safely regardless of runtime context. Ensure the handle
is threaded through the LockGuard constructor and any call sites that build it.
| pub(crate) fn cli_sock_restore(&mut self) { | ||
| let raw_fd = self.clisock.as_raw_fd(); | ||
| debug!("Restoring CLI socket. Current fd is {raw_fd}..."); | ||
| self.deregister(raw_fd); | ||
| let _ = self.clisock.shutdown(std::net::Shutdown::Both); | ||
|
|
||
| // open new sock, bind it and register it | ||
| let Ok(new_sock) = open_cli_sock(&self.cli_sock_path) else { | ||
| error!("Failed to open CLI sock"); | ||
| return; | ||
| }; | ||
| self.clisock = new_sock; | ||
| self.register(CLISOCK, self.clisock.as_raw_fd(), Interest::READABLE); | ||
| debug!("CLI socket restored at {}", self.cli_sock_path); |
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
Open the replacement CLI socket before tearing down the current one.
A bind/open failure currently leaves the existing CLI socket deregistered and shut down. Create the new socket first, then swap only after success.
Proposed fix
 pub(crate) fn cli_sock_restore(&mut self) {
     let raw_fd = self.clisock.as_raw_fd();
     debug!("Restoring CLI socket. Current fd is {raw_fd}...");
-    self.deregister(raw_fd);
-    let _ = self.clisock.shutdown(std::net::Shutdown::Both);
     // open new sock, bind it and register it
     let Ok(new_sock) = open_cli_sock(&self.cli_sock_path) else {
         error!("Failed to open CLI sock");
         return;
     };
+    self.deregister(raw_fd);
+    let _ = self.clisock.shutdown(std::net::Shutdown::Both);
     self.clisock = new_sock;
     self.register(CLISOCK, self.clisock.as_raw_fd(), Interest::READABLE);
     debug!("CLI socket restored at {}", self.cli_sock_path);
 }
📝 Committable suggestion
| pub(crate) fn cli_sock_restore(&mut self) { | |
| let raw_fd = self.clisock.as_raw_fd(); | |
| debug!("Restoring CLI socket. Current fd is {raw_fd}..."); | |
| self.deregister(raw_fd); | |
| let _ = self.clisock.shutdown(std::net::Shutdown::Both); | |
| // open new sock, bind it and register it | |
| let Ok(new_sock) = open_cli_sock(&self.cli_sock_path) else { | |
| error!("Failed to open CLI sock"); | |
| return; | |
| }; | |
| self.clisock = new_sock; | |
| self.register(CLISOCK, self.clisock.as_raw_fd(), Interest::READABLE); | |
| debug!("CLI socket restored at {}", self.cli_sock_path); | |
| pub(crate) fn cli_sock_restore(&mut self) { | |
| let raw_fd = self.clisock.as_raw_fd(); | |
| debug!("Restoring CLI socket. Current fd is {raw_fd}..."); | |
| // open new sock, bind it and register it | |
| let Ok(new_sock) = open_cli_sock(&self.cli_sock_path) else { | |
| error!("Failed to open CLI sock"); | |
| return; | |
| }; | |
| self.deregister(raw_fd); | |
| let _ = self.clisock.shutdown(std::net::Shutdown::Both); | |
| self.clisock = new_sock; | |
| self.register(CLISOCK, self.clisock.as_raw_fd(), Interest::READABLE); | |
| debug!("CLI socket restored at {}", self.cli_sock_path); |
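The "create first, swap on success" ordering can be sketched in isolation with the standard library. This is an assumption-laden illustration, not the router's code: `CliEndpoint` and `open_sock` are hypothetical names, `std::os::unix::net::UnixDatagram` stands in for whatever socket type the router uses, and poller registration is left out entirely.

```rust
use std::io;
use std::os::unix::net::UnixDatagram;
use std::path::{Path, PathBuf};

/// Hypothetical stand-in for the router I/O state; only the CLI socket is modeled.
struct CliEndpoint {
    path: PathBuf,
    sock: UnixDatagram,
}

/// Hypothetical helper: remove a stale path (if any) and bind a fresh socket there.
fn open_sock(path: &Path) -> io::Result<UnixDatagram> {
    match std::fs::remove_file(path) {
        Ok(()) => {}
        Err(e) if e.kind() == io::ErrorKind::NotFound => {}
        Err(e) => return Err(e),
    }
    UnixDatagram::bind(path)
}

impl CliEndpoint {
    fn open(path: PathBuf) -> io::Result<Self> {
        let sock = open_sock(&path)?;
        Ok(Self { path, sock })
    }

    /// Rebind to `self.path`. The current socket is torn down only after the
    /// replacement has been created, so a failure leaves `self.sock` untouched.
    fn restore(&mut self) -> io::Result<()> {
        let new_sock = open_sock(&self.path)?;
        let old = std::mem::replace(&mut self.sock, new_sock);
        let _ = old.shutdown(std::net::Shutdown::Both);
        Ok(())
    }
}
```

In this sketch an early return from `restore` cannot leave the endpoint without a socket, which is the property the reordering in the suggestion is after.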
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@routing/src/router/rio.rs` around lines 215 - 228, The cli_sock_restore flow
in rio::Router currently deregisters and shuts down the active CLI socket before
opening the replacement, which can leave the router without a usable socket if
open_cli_sock fails. Change the sequence so the new socket is created first,
then only call deregister and shutdown on the old self.clisock after the new
socket opens successfully, and finally assign the new socket and register it
through register(CLISOCK, ...).
| CTL_CHANNEL => handle_ctl_msg(&mut rio, &mut db), | ||
| _ => {} | ||
| } | ||
| } | ||
|
|
||
| /* check stale timeout. If expired, remove stale routes */ | ||
| rio.check_stale_timeout(&mut db); |
🩺 Stability & Availability | 🟠 Major
🧩 Analysis chain
🏁 Script executed:
cat -n routing/src/router/rio.rs | sed -n '490,530p'
Repository: repoEx
Length of output: 2002
🏁 Script executed:
ls -la
Repository: githedgehog/dataplane
Length of output: 4674
🏁 Script executed:
cat -n routing/src/router/rio.rs | sed -n '480,540p'
Repository: githedgehog/dataplane
Length of output: 3001
🏁 Script executed:
rg -A 20 "fn handle_ctl_msg" routing/src/router/rio.rs
Repository: githedgehog/dataplane
Length of output: 159
🏁 Script executed:
rg "fn handle_ctl_msg|pub fn handle_ctl_msg" --type rust
Repository: githedgehog/dataplane
Length of output: 159
🏁 Script executed:
rg "handle_ctl_msg" --type rust -B 5 -A 5
Repository: githedgehog/dataplane
Length of output: 159
🏁 Script executed:
cat -n routing/src/router/rio.rs | sed -n '1,50p'
Repository: githedgehog/dataplane
Length of output: 2213
🏁 Script executed:
cat -n routing/src/router/ctl.rs | head -100
Repository: githedgehog/dataplane
Length of output: 3781
🏁 Script executed:
gawk '/^pub.*fn handle_ctl_msg/,/^\}/' routing/src/router/ctl.rs | head -80
Repository: githedgehog/dataplane
Length of output: 1307
Drain the control channel on every poll iteration to prevent stalls from missed wakeups.
The handle_ctl_msg function currently executes only when the CTL_CHANNEL token fires. The send_and_wake logic sends the message before waking the poller; if wake() fails after send() succeeds, the message sits in the channel indefinitely without being processed. Since handle_ctl_msg drains all pending messages efficiently via try_recv() and returns instantly if empty, moving the call outside the CTL_CHANNEL match case ensures it is checked on every loop iteration, eliminating the stall vulnerability.
Proposed fix
-            CTL_CHANNEL => handle_ctl_msg(&mut rio, &mut db),
+            CTL_CHANNEL => {}
             _ => {}
         }
     }
+
+    handle_ctl_msg(&mut rio, &mut db);
     /* check stale timeout. If expired, remove stale routes */
     rio.check_stale_timeout(&mut db);
📝 Committable suggestion
| CTL_CHANNEL => handle_ctl_msg(&mut rio, &mut db), | |
| _ => {} | |
| } | |
| } | |
| /* check stale timeout. If expired, remove stale routes */ | |
| rio.check_stale_timeout(&mut db); | |
| CTL_CHANNEL => {} | |
| _ => {} | |
| } | |
| } | |
| handle_ctl_msg(&mut rio, &mut db); | |
| /* check stale timeout. If expired, remove stale routes */ | |
| rio.check_stale_timeout(&mut db); |
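The idea of draining unconditionally can be sketched with a plain channel. This is a simplified model under stated assumptions: `std::sync::mpsc` stands in for the channel type actually used by the router, and `CtlMsg`, `drain_ctl` and `loop_iteration` are hypothetical names rather than the functions in `routing/src/router`.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical control message type for this sketch.
#[derive(Debug, PartialEq)]
enum CtlMsg {
    Lock,
    Unlock,
}

/// Drain every pending message without blocking; returns how many were handled.
/// When the channel is empty this returns immediately.
fn drain_ctl(rx: &Receiver<CtlMsg>) -> usize {
    let mut handled = 0;
    while let Ok(_msg) = rx.try_recv() {
        // Real handling would go here.
        handled += 1;
    }
    handled
}

/// One pass of a poll loop: per-token work first, then an unconditional drain,
/// so queued messages are picked up even if no control-channel token fired.
fn loop_iteration(ready_tokens: &[usize], rx: &Receiver<CtlMsg>) -> usize {
    for _token in ready_tokens {
        // Per-token handling (sockets, timers, ...) would go here.
    }
    drain_ctl(rx)
}
```

The trade-off is one extra non-blocking `try_recv` per iteration in exchange for not depending on the wakeup having been delivered.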
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@routing/src/router/rio.rs` around lines 510 - 516, The control-channel drain
in rio’s poll loop is too tightly coupled to the CTL_CHANNEL token, which can
leave queued messages unprocessed if a wake is missed after send_and_wake
succeeds. Move the handle_ctl_msg call in router::rio::Rio’s event loop so it
runs on every poll iteration, not only inside the CTL_CHANNEL match arm, while
keeping its try_recv-based draining behavior unchanged. Use the existing
handle_ctl_msg and send_and_wake flow as the main reference points when updating
the loop.
qmonnet
left a comment
Some small comments and suggestions below, but not blocking - PR looks good from my side 🥘. Thanks!
However:
- You have one commit missing a sign-off
- The AI review has raised some actual concerns, it seems; please address where necessary
- The “concurrency” job failure looks like something you need to address, too
- (The Miri job on powerpc64 is currently broken though, not related to this PR.)
| } | ||
|
|
||
| fn open_unix_sock(path: &String) -> Result<UnixDatagram, RouterError> { | ||
| debug!("Opening UX sock; target bind point is {path}"); |
“UNIX” would be easier to understand than “UX”
| LinkAttribute::CarrierDownCount(value) => Some(*value), | ||
| _ => None, | ||
| })?; | ||
| // `LinkAttribute::OperState` is not reliable for events |
| // `LinkAttribute::OperState` is not reliable for events | |
| // `LinkAttribute::OperState` is not reliable for events, so we ignore it. |
(It was not immediately clear to me what the comment implied.)
| /// Returns [`LaunchError`] on init failure. [`LaunchError::Cancelled`] is | ||
| /// Returns [`LaunchError`] on init failure. [`LaunchError::Canc elled`] is |
Looks like a mistake (but I guess the AI review will flag this too)
| ) -> (Self, ConfigClient) { | ||
| debug!("Creating config processor..."); | ||
| let (tx, rx) = mpsc::channel(Self::CHANNEL_SIZE); | ||
| let _g = handle.enter(); |
| let _g = handle.enter(); | |
| let _ = handle.enter(); |
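A point that may be worth checking before applying this suggestion: in Rust, `let _ = expr;` does not bind the value, so a guard produced by `expr` is dropped at the end of that statement, whereas `let _g = expr;` keeps it alive until the end of the scope. Whether that matters here depends on whether the code that follows relies on the runtime context being entered, which is not visible in this excerpt. The sketch below uses only the standard library; `Guard` and its `enter` function are hypothetical stand-ins, not tokio types.

```rust
use std::cell::Cell;

/// Hypothetical RAII guard: sets a flag on creation and clears it on drop.
struct Guard<'a>(&'a Cell<bool>);

impl<'a> Guard<'a> {
    fn enter(flag: &'a Cell<bool>) -> Self {
        flag.set(true);
        Guard(flag)
    }
}

impl Drop for Guard<'_> {
    fn drop(&mut self) {
        self.0.set(false);
    }
}

/// With a named binding the guard lives until the end of the function body.
fn entered_with_named_binding() -> bool {
    let flag = Cell::new(false);
    let _g = Guard::enter(&flag);
    flag.get()
}

/// With `_` the guard is dropped at the end of the `let` statement.
fn entered_with_underscore() -> bool {
    let flag = Cell::new(false);
    let _ = Guard::enter(&flag);
    flag.get()
}
```

In this sketch the first function observes the flag as set and the second observes it as already cleared.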
Fixes several small things experienced on deployments: