Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
6cf6589
feat(masquerade): rename methods & vars and remove misleading log
Fredi-raspall Jun 18, 2026
352588d
chore(routing): rename vars & move auxiliary defs for exit guard
Fredi-raspall Jun 19, 2026
3177ad6
feat(routing): add methods to rebind cli socket
Fredi-raspall Jun 19, 2026
b299e34
feat(dataplane,lifecycle): split signal handler in two
Fredi-raspall Jun 19, 2026
f47de54
feat(dataplane): rebind cli sock on SIGUSR1
Fredi-raspall Jun 19, 2026
2cd5438
feat(net): account pkts on serialize errors
Fredi-raspall Jun 19, 2026
673a9fd
chore: fix Dockerfile for quick, local builds
Fredi-raspall Jun 22, 2026
2e311f9
feat(interface-manager): add interface status monitor
Fredi-raspall Jun 23, 2026
2a8fdcb
feat(mgmt): move creation and running of ConfigProcessor
Fredi-raspall Jun 23, 2026
576ec2f
feat(mgmt): start interface monitor and relay events
Fredi-raspall Jun 24, 2026
92c5e1a
feat(routing): handle interface events
Fredi-raspall Jun 24, 2026
de9adc9
feat(routing): register an event on interface change
Fredi-raspall Jun 24, 2026
fbae641
feat(routing): change setting/logging of interface changes
Fredi-raspall Jun 26, 2026
6dc0f41
feat(routing): integrate control channel in IO loop poller
Fredi-raspall Jun 26, 2026
4961d25
feat(driver): enlarge rx buffer size
Fredi-raspall Jun 26, 2026
1eb3275
feat(dataplane,routing): do not rebind cli sock on SIGUSR1
Fredi-raspall Jun 30, 2026
03d56f7
feat(routing): automatically rebind CLI
Fredi-raspall Jun 30, 2026
70bb7af
feat(interface-manager): avoid arbitrary_self_types
Fredi-raspall Jun 30, 2026
2a92ba8
chore(routing,config): tidy up code and logs
Fredi-raspall Jun 30, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
FROM debug-tools:dev

ARG PROFILE=debug
ARG PLATFORM=x86_64-unknown-linux-gnu
LABEL sterile=false
COPY --link --chown=0:0 ./target/${PROFILE}/dataplane /bin/dataplane
COPY --link --chown=0:0 ./target/${PROFILE}/dataplane-init /bin/dataplane-init
COPY --link --chown=0:0 ./target/${PROFILE}/cli /bin/cli
COPY --link --chown=0:0 ./target/${PLATFORM}/${PROFILE}/dataplane /bin/dataplane
COPY --link --chown=0:0 ./target/${PLATFORM}/${PROFILE}/dataplane-init /bin/dataplane-init
COPY --link --chown=0:0 ./target/${PLATFORM}/${PROFILE}/cli /bin/cli
Comment thread
coderabbitai[bot] marked this conversation as resolved.

WORKDIR /
# this is a privileged container, we really do want to run as root
Expand Down
5 changes: 2 additions & 3 deletions dataplane/src/drivers/kernel/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ fn packet_recv(
max_to_read: usize,
pkts: &mut Vec<Box<Packet<TestBuffer>>>,
) -> Result<(), nix::Error> {
let mut raw = [0u8; 9100];
let mut raw = [0u8; 9600];
let mut ret = Ok(());
pkts.clear();
while pkts.len() < max_to_read {
Expand Down Expand Up @@ -512,8 +512,7 @@ async fn tx_packet(
}
}
Err(e) => {
// this should be a warn/error. Making it debug until we rate-limit logs
debug!(
warn!(
worker = id,
rx_intf_name = rx_if_name,
"Serialize failed: {e:?}"
Expand Down
34 changes: 32 additions & 2 deletions dataplane/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use crate::statistics::spawn_metrics;
use args::{CmdArgs, Parser};

use crate::drivers::kernel::DriverKernel;
use lifecycle::{Shutdown, default_deadlines, spawn_shutdown_watchdog};
use lifecycle::{
CancellationToken, DpSignal, Shutdown, default_deadlines, spawn_shutdown_watchdog,
};
use mgmt::{ConfigProcessorParams, LaunchError, MgmtParams, run_mgmt};

use nix::unistd::gethostname;
Expand Down Expand Up @@ -127,6 +129,31 @@ fn parse_bmp_params(args: &CmdArgs) -> (Option<BmpServerParams>, Option<BmpOptio
}
}

/// Main signal handling of the dataplane.
///
/// Spawns a task on `rt_handle` that consumes the signals received over `sigrx` (the
/// channel returned by the signal catcher) until `root` is cancelled. `SIGTERM`, `SIGINT`
/// and `SIGQUIT` cancel `root`; every other signal is only logged.
fn spawn_signal_handler(
    rt_handle: &tokio::runtime::Handle,
    mut sigrx: tokio::sync::mpsc::Receiver<DpSignal>,
    root: CancellationToken,
) {
    let handler = async move {
        loop {
            tokio::select! {
                Some(sig) = sigrx.recv() => {
                    info!("Processing signal {sig:?} from signal catcher");
                    match sig {
                        // signals that currently require no action
                        DpSignal::SIGUSR1
                        | DpSignal::SIGUSR2
                        | DpSignal::SIGHUP
                        | DpSignal::SIGALRM
                        | DpSignal::SIGPIPE => {}
                        // termination signals trigger the shutdown
                        DpSignal::SIGTERM | DpSignal::SIGINT | DpSignal::SIGQUIT => {
                            root.cancel();
                        }
                    }
                }
                () = root.cancelled() => {
                    break;
                }
            }
        }
        info!("Signal handler ended");
    };
    rt_handle.spawn(handler);
}

#[allow(clippy::too_many_lines)]
pub fn main() {
let args = CmdArgs::parse();
Expand Down Expand Up @@ -188,7 +215,7 @@ pub fn main() {
.expect("Failed to build mgmt runtime");
let mgmt_handle = mgmt_runtime.handle().clone();

lifecycle::spawn_signal_handler(&mgmt_handle, shutdown.root.clone())
let sigrx = lifecycle::spawn_signal_catcher(&mgmt_handle, shutdown.root.clone())
.expect("failed to install signal handler");

spawn_shutdown_watchdog(shutdown.root.clone(), default_deadlines::TOTAL, 124)
Expand Down Expand Up @@ -219,6 +246,8 @@ pub fn main() {
)
.expect("failed to start router");

spawn_signal_handler(&mgmt_handle, sigrx, shutdown.root.clone());

spawn_metrics(
&shutdown.metrics,
&mgmt_handle,
Expand All @@ -235,6 +264,7 @@ pub fn main() {
MgmtParams {
config_dir: args.config_dir().cloned(),
hostname: gwname.clone(),
interfaces: args.interfaces().map(|i| i.interface).collect(),
processor_params: ConfigProcessorParams {
router_ctl: setup.router.get_ctl_tx(),
pipeline_data: pipeline_factory().get_data(),
Expand Down
7 changes: 7 additions & 0 deletions interface-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,18 @@ serde = { workspace = true, features = ["std"] }
static_assertions = { workspace = true, features = [] }
thiserror = { workspace = true, features = ["std"] }
tokio = { workspace = true, default-features = false, features = ["fs", "io-util"] }
tokio-util = { workspace = true }
tracing = { workspace = true, features = ["attributes"] }

[dev-dependencies]
# internal
fixin = { workspace = true }
net = { workspace = true, features = ["bolero", "test_buffer", "serde", "netdevsim"] }
n-vm = { workspace = true }
test-utils = { workspace = true }

# external
caps = { workspace = true, default-features = false, features = [] }
bolero = { workspace = true, default-features = false, features = ["alloc"] }
tracing-subscriber = { workspace = true }
tracing-test = { workspace = true }
1 change: 1 addition & 0 deletions interface-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use concurrency::sync::Arc;
use std::marker::PhantomData;

pub mod interface;
pub mod monitor;
pub mod tc;

use rtnetlink::Handle;
Expand Down
215 changes: 215 additions & 0 deletions interface-manager/src/monitor/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Open Network Fabric Authors

//! A small interface monitor. The interface monitor listens to netlink link events asynchronously
//! and disseminates them over a broadcast channel. It does not attempt to interpret the events
//! received via netlink beyond extracting the link flags and the carrier attributes. Events are
//! reported only for the interfaces the monitor was told to track (matched by name). It is meant
//! for Ethernet interfaces but, for testing, other types of network devices can be tracked too.

use concurrency::sync::Arc;
use net::interface::{InterfaceIndex, InterfaceName};
use rtnetlink::MulticastGroup;
use rtnetlink::packet_core::{NetlinkMessage, NetlinkPayload};
use rtnetlink::packet_route::RouteNetlinkMessage;
use rtnetlink::packet_route::link::{LinkAttribute, LinkFlags};
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

#[allow(unused)]
use tracing::{debug, error, info, warn};

/// A type representing an event on an Ethernet interface.
///
/// Events are built by `InterfaceMonitor::netlink_to_event` from netlink `NewLink` messages.
#[derive(Debug, Clone)]
// Each bool mirrors a separate flag/attribute of the netlink link message.
#[allow(clippy::struct_excessive_bools)]
pub struct EthEvent {
    /// Name of the interface, taken from the `IfName` link attribute.
    pub name: InterfaceName,
    /// Index of the interface, taken from the link message header.
    pub ifindex: InterfaceIndex,
    /// Whether the link flags contain `LinkFlags::Up`.
    pub ifup: bool,
    /// Whether the link flags contain `LinkFlags::LowerUp`.
    pub iflowerup: bool,
    /// Whether the link flags contain `LinkFlags::Running`.
    pub ifrunning: bool,
    /// Whether the `Carrier` link attribute is non-zero.
    pub carrier: bool,
    /// Value of the `CarrierUpCount` link attribute.
    pub carrierup: u32,
    /// Value of the `CarrierDownCount` link attribute.
    pub carrierdown: u32,
}
/// Renders the event on a single line; boolean fields are printed as `yes` / `no`.
impl std::fmt::Display for EthEvent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Translate all the flags in one go; the names are captured by the format string below.
        let [ifup, ifloup, ifrun, carrier] =
            [self.ifup, self.iflowerup, self.ifrunning, self.carrier]
                .map(|set| if set { "yes" } else { "no" });
        write!(
            f,
            "ifname:{} ({}) ifup:{ifup} iflowerup:{ifloup} ifrun:{ifrun} carrier:{carrier} carrierup:{} carrierdown:{}",
            self.name, self.ifindex, self.carrierup, self.carrierdown
        )
    }
}

/// Interface monitor
///
/// Publishes an `EthEvent` over a broadcast channel for each netlink `NewLink` message
/// that concerns one of the tracked interfaces.
pub struct InterfaceMonitor {
    /// Sending half of the broadcast channel on which events are published.
    tx: broadcast::Sender<EthEvent>,
    /// Token whose cancellation makes `run` leave its loop.
    ct: CancellationToken,
    /// Names of the interfaces for which events are reported.
    tracked: Vec<InterfaceName>,
}
impl InterfaceMonitor {
#[must_use]
pub fn new(ct: CancellationToken, track: &[InterfaceName]) -> Self {
let (tx, _) = broadcast::channel::<EthEvent>(100);
Self {
tx,
ct,
tracked: track.into(),
}
}
#[must_use]
pub fn subscribe(&self) -> broadcast::Receiver<EthEvent> {
self.tx.subscribe()
}

/// Convert a netlink message to an `EthEvent` if it is a `NewLink` message for a tracked interface
fn netlink_to_event(&self, msg: NetlinkMessage<RouteNetlinkMessage>) -> Option<EthEvent> {
let (_hdr, payload) = msg.into_parts();

let NetlinkPayload::InnerMessage(RouteNetlinkMessage::NewLink(link_msg)) = payload else {
return None;
};
let ifindex = link_msg.header.index;
let ifup = link_msg.header.flags.contains(LinkFlags::Up);
let iflowerup = link_msg.header.flags.contains(LinkFlags::LowerUp);
let ifrunning = link_msg.header.flags.contains(LinkFlags::Running);
let ifname = link_msg.attributes.iter().find_map(|a| match a {
LinkAttribute::IfName(name) => Some(name.clone()),
_ => None,
})?;
let ifname = InterfaceName::try_from(ifname).ok()?;
if !self.tracked.contains(&ifname) {
return None;
}
let carrier = link_msg.attributes.iter().find_map(|a| match a {
LinkAttribute::Carrier(value) => Some(value),
_ => None,
})?;
let carrierup = link_msg.attributes.iter().find_map(|a| match a {
LinkAttribute::CarrierUpCount(value) => Some(*value),
_ => None,
})?;
let carrierdown = link_msg.attributes.iter().find_map(|a| match a {
LinkAttribute::CarrierDownCount(value) => Some(*value),
_ => None,
})?;
// `LinkAttribute::OperState` is not reliable for events, so we ignore it.
// N.B. the above attributes are required (watch the ?)

// construct the event object
let event = EthEvent {
name: ifname,
ifindex: InterfaceIndex::new(ifindex.try_into().ok()?),
ifup,
iflowerup,
ifrunning,
carrier: *carrier != 0,
carrierup,
carrierdown,
};
info!("Got event for {event}");
Some(event)
}

/// Start an interface monitor to track the set of network devices
///
/// # Errors
///
/// This method fails if a netlink connection cannot be created.
pub async fn run(monitor: Arc<Self>) -> Result<(), ()> {
info!("Starting interface monitor");
for i in &monitor.tracked {
info!("Will track status of interface {i}");
}
let (conn, _, mut messages) = rtnetlink::new_multicast_connection(&[MulticastGroup::Link])
.inspect_err(|e| error!("Failed to open netlink connection: {e}"))
.map_err(|_| ())?;

tokio::spawn(conn);

let tx = monitor.tx.clone();
let ct = monitor.ct.clone();
loop {
tokio::select! {
nlmsg = messages.recv() => {
match nlmsg {
Ok((msg, _)) => {
if let Some(event) = monitor.netlink_to_event(msg) && tx.send(event).is_err() {
warn!("Warning, there are no link event readers!");
}
}
Err(e) => {
error!("Recv error in netlink socket: {e}");
break;
}
}
}
() = ct.cancelled() => {
info!("Interface monitor got cancelled");
break;
}
}
}
info!("Interface monitor is shutting down now");
Ok(())
}
}

#[cfg(test)]
mod test {
    use super::InterfaceMonitor;
    use caps::Capability;
    use concurrency::sync::Arc;
    use fixin::wrap;
    use net::interface::InterfaceName;
    use rtnetlink::{LinkDummy, LinkMessageBuilder, new_connection};
    use test_utils::with_caps;
    use tokio::time::{Duration, timeout};
    use tokio_util::sync::CancellationToken;
    use tracing::debug;
    use tracing_test::traced_test;

    /// Create a dummy network device called `name`.
    async fn create_dummy(name: &str) {
        let (connection, handle, _) = new_connection().unwrap();
        tokio::spawn(connection);
        let msg = LinkMessageBuilder::<LinkDummy>::new(name).build();
        handle.link().add(msg).execute().await.unwrap();
    }

    /// Checks that every subscriber gets an event when a tracked interface is created and
    /// that the monitor stops once its cancellation token is cancelled.
    #[tokio::test]
    #[wrap(with_caps([Capability::CAP_NET_ADMIN]))]
    #[n_vm::in_vm]
    #[cfg_attr(not(emulated), traced_test)]
    #[ignore = "disabled until nv_m support is re-enabled"]
    async fn test_interface_monitor() {
        const INTERFACE: &str = "test-dummy";
        let test_ifname = InterfaceName::try_from(INTERFACE).unwrap();
        let ct = CancellationToken::new();
        let ifmonitor = Arc::new(InterfaceMonitor::new(ct, &[test_ifname.clone()]));
        let mut subsc1 = ifmonitor.subscribe();
        let mut subsc2 = ifmonitor.subscribe();
        let monitor_task = tokio::spawn(InterfaceMonitor::run(ifmonitor.clone()));

        // let the monitor task start and open its netlink socket before the link is created
        tokio::time::sleep(Duration::from_millis(100)).await;

        create_dummy(INTERFACE).await;

        let j1 = tokio::spawn(async move {
            let event = subsc1.recv().await.unwrap();
            println!("listener1: {event}");
            event
        });
        let j2 = tokio::spawn(async move {
            let event = subsc2.recv().await.unwrap();
            println!("listener2: {event}");
            event
        });

        // Bound the wait: the sender outlives the monitor task (it is owned by `ifmonitor`),
        // so a listener that never gets an event would otherwise block forever.
        let event1 = timeout(Duration::from_secs(3), j1)
            .await
            .expect("listener1 did not receive an event in time")
            .expect("listener1 failed");
        let event2 = timeout(Duration::from_secs(3), j2)
            .await
            .expect("listener2 did not receive an event in time")
            .expect("listener2 failed");
        assert_eq!(event1.name, test_ifname);
        assert_eq!(event2.name, test_ifname);

        debug!("Will now cancel the interface monitor");
        ifmonitor.ct.cancel();
        assert!(ifmonitor.ct.is_cancelled());

        // the monitor must stop shortly after being cancelled and report success
        let outcome = timeout(Duration::from_secs(1), monitor_task)
            .await
            .expect("interface monitor did not stop after cancellation")
            .expect("interface monitor task failed");
        assert_eq!(outcome, Ok(()));
    }
}
Loading
Loading