From 605e7adb44ea363a3651aa2b09e55526e30f7501 Mon Sep 17 00:00:00 2001 From: Rishav-N Date: Sat, 30 May 2026 01:06:27 +0000 Subject: [PATCH] Changed to work with ros topics into WebRTC --- include/CameraConfig.hpp | 2 + include/CameraPipeline.hpp | 2 + ros_camera_bridge.py | 192 +++++++++++++++++++++++++++++++++++++ src/CameraManager.cpp | 24 ++++- src/CameraPipeline.cpp | 100 ++++++++++++++++++- 5 files changed, 315 insertions(+), 5 deletions(-) create mode 100644 ros_camera_bridge.py diff --git a/include/CameraConfig.hpp b/include/CameraConfig.hpp index 25c66da..171882f 100644 --- a/include/CameraConfig.hpp +++ b/include/CameraConfig.hpp @@ -14,6 +14,8 @@ struct CameraConfig { std::string quality = "medium"; // "low" | "medium" | "high" | "ultra" int exposure = -1; // -1 = auto; >= 0 = absolute exposure value (v4l2 only) bool cropLeftHalf = false; // true for side-by-side stereo devices where only left view is streamed + bool useRosTopic = false; // when true, read from rosTopic instead of devicePath + std::string rosTopic; // e.g. "/cameras/cam0/image_raw" int computeBitrate() const { double bpp; diff --git a/include/CameraPipeline.hpp b/include/CameraPipeline.hpp index d3f621d..447c401 100644 --- a/include/CameraPipeline.hpp +++ b/include/CameraPipeline.hpp @@ -5,6 +5,7 @@ #include #include #include +#include #include "CameraConfig.hpp" @@ -36,6 +37,7 @@ class CameraPipeline { int sourceFps_ = 0; // native rate to negotiate with the source; 0 = use config_.fps GstElement* pipeline_ = nullptr; guint busWatchId_ = 0; + pid_t rosBridgePid_ = -1; // subprocess PID when useRosTopic is true std::atomic frameCount_{0}; std::atomic byteCount_{0}; diff --git a/ros_camera_bridge.py b/ros_camera_bridge.py new file mode 100644 index 0000000..95dc06a --- /dev/null +++ b/ros_camera_bridge.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python3 +""" +ros_camera_bridge.py + +Connects to the existing rosbridge WebSocket server, subscribes to a ROS +sensor_msgs/Image topic, and publishes the frames to MediaMTX via RTSP +using a GStreamer appsrc pipeline. + +No ROS2 install required — uses the existing rosbridge WS on port 9090. + +Usage (invoked by the C++ backend automatically): + python3 ros_camera_bridge.py \ + --topic /cameras/cam0/image_raw \ + --stream-name camera_0 \ + [--width 640] [--height 480] [--fps 30] [--bitrate 500000] \ + [--rosbridge-port 9090] +""" + +import argparse +import asyncio +import base64 +import json +import signal +import sys +import threading + +import numpy as np + +import gi +gi.require_version("Gst", "1.0") +from gi.repository import Gst + +import websockets + + +class RosCameraBridge: + def __init__(self, args): + self.args = args + self.appsrc = None + self.pipeline = None + self.running = True + self._pts = 0 + self._frame_duration_ns = int(1e9 / args.fps) + + def setup_gstreamer(self): + Gst.init(None) + bitrate_kbps = self.args.bitrate // 1000 + caps = ( + f"video/x-raw,format=BGR," + f"width={self.args.width},height={self.args.height}," + f"framerate={self.args.fps}/1" + ) + pipeline_str = ( + f"appsrc name=src format=time is-live=true block=false " + f"caps=\"{caps}\" " + f"! videoconvert " + f"! x264enc tune=zerolatency bitrate={bitrate_kbps} " + f"speed-preset=ultrafast key-int-max=30 " + f"! h264parse " + f"! video/x-h264,stream-format=byte-stream,alignment=au " + f"! rtspclientsink name=sink protocols=tcp latency=0 " + f"location=rtsp://127.0.0.1:8554/{self.args.stream_name}" + ) + self.pipeline = Gst.parse_launch(pipeline_str) + self.appsrc = self.pipeline.get_by_name("src") + ret = self.pipeline.set_state(Gst.State.PLAYING) + if ret == Gst.StateChangeReturn.FAILURE: + print("[bridge] GStreamer pipeline failed to start", file=sys.stderr) + sys.exit(1) + print( + f"[bridge] GStreamer pipeline started: " + f"appsrc → x264 → rtsp://127.0.0.1:8554/{self.args.stream_name}", + file=sys.stderr, + ) + + def push_frame(self, image_msg: dict): + if self.appsrc is None or not self.running: + return + try: + raw = base64.b64decode(image_msg["data"]) + h = image_msg["height"] + w = image_msg["width"] + encoding = image_msg["encoding"] + + arr = np.frombuffer(raw, dtype=np.uint8) + if encoding == "bgr8": + frame = arr.reshape(h, w, 3) + elif encoding == "rgb8": + frame = arr.reshape(h, w, 3)[:, :, ::-1] # RGB → BGR + elif encoding in ("bgra8", "8UC4"): + frame = arr.reshape(h, w, 4)[:, :, :3] # BGRA → BGR (drop alpha) + elif encoding == "rgba8": + frame = arr.reshape(h, w, 4)[:, :, [2, 1, 0]] # RGBA → BGR + elif encoding == "mono8": + gray = arr.reshape(h, w) + frame = np.stack([gray, gray, gray], axis=2) + else: + print(f"[bridge] unsupported encoding: {encoding}", file=sys.stderr) + return + + # Resize if the topic resolution differs from the configured output size + if w != self.args.width or h != self.args.height: + import cv2 + frame = cv2.resize(frame, (self.args.width, self.args.height)) + + buf = Gst.Buffer.new_wrapped(bytes(frame)) + buf.pts = self._pts + buf.duration = self._frame_duration_ns + self._pts += self._frame_duration_ns + + self.appsrc.emit("push-buffer", buf) + except Exception as exc: + print(f"[bridge] push_frame error: {exc}", file=sys.stderr) + + async def run(self): + uri = f"ws://127.0.0.1:{self.args.rosbridge_port}" + reconnect_delay = 1.0 + + while self.running: + try: + async with websockets.connect(uri, max_size=10 * 1024 * 1024) as ws: + reconnect_delay = 1.0 + sub_msg = json.dumps({ + "op": "subscribe", + "topic": self.args.topic, + "type": "sensor_msgs/Image", + "queue_length": 1, + "throttle_rate": 0, + "compression": "none", + }) + await ws.send(sub_msg) + print( + f"[bridge] subscribed to {self.args.topic} via rosbridge {uri}", + file=sys.stderr, + ) + while self.running: + try: + raw = await asyncio.wait_for(ws.recv(), timeout=5.0) + except asyncio.TimeoutError: + continue + msg = json.loads(raw) + if msg.get("op") == "publish" and msg.get("topic") == self.args.topic: + self.push_frame(msg["msg"]) + except (websockets.ConnectionClosed, ConnectionRefusedError, OSError) as exc: + if not self.running: + break + print( + f"[bridge] rosbridge connection lost ({exc}), " + f"retrying in {reconnect_delay:.0f}s", + file=sys.stderr, + ) + await asyncio.sleep(reconnect_delay) + reconnect_delay = min(reconnect_delay * 2, 10.0) + + def stop(self): + self.running = False + if self.pipeline: + self.pipeline.set_state(Gst.State.NULL) + + +def main(): + parser = argparse.ArgumentParser(description="ROS Image → MediaMTX RTSP bridge via rosbridge") + parser.add_argument("--topic", required=True, help="ROS Image topic") + parser.add_argument("--stream-name", required=True, dest="stream_name", help="MediaMTX RTSP path") + parser.add_argument("--width", type=int, default=640) + parser.add_argument("--height", type=int, default=480) + parser.add_argument("--fps", type=int, default=30) + parser.add_argument("--bitrate", type=int, default=500000, help="Bitrate in bits/s") + parser.add_argument("--rosbridge-port", type=int, default=9090, dest="rosbridge_port") + args = parser.parse_args() + + bridge = RosCameraBridge(args) + bridge.setup_gstreamer() + + loop = asyncio.new_event_loop() + + def handle_signal(sig, frame): + bridge.stop() + loop.call_soon_threadsafe(loop.stop) + + signal.signal(signal.SIGTERM, handle_signal) + signal.signal(signal.SIGINT, handle_signal) + + try: + loop.run_until_complete(bridge.run()) + finally: + bridge.stop() + loop.close() + + +if __name__ == "__main__": + main() diff --git a/src/CameraManager.cpp b/src/CameraManager.cpp index 1d25144..d837624 100644 --- a/src/CameraManager.cpp +++ b/src/CameraManager.cpp @@ -378,9 +378,11 @@ void CameraManager::loadConfigs(const std::string& path) { cfg.height = obj.value("height", 720); cfg.fps = obj.value("fps", 30); cfg.quality = obj.value("quality", "medium"); - cfg.exposure = obj.value("exposure", -1); + cfg.exposure = obj.value("exposure", -1); cfg.cropLeftHalf = obj.value("cropLeftHalf", false); - configs_[id] = cfg; + cfg.useRosTopic = obj.value("useRosTopic", false); + cfg.rosTopic = obj.value("rosTopic", ""); + configs_[id] = cfg; } } catch (const std::exception& e) { std::cerr << "loadConfigs: " << e.what() << std::endl; @@ -402,6 +404,8 @@ void CameraManager::saveConfigs(const std::string& path) const { {"quality", cfg.quality}, {"exposure", cfg.exposure}, {"cropLeftHalf", cfg.cropLeftHalf}, + {"useRosTopic", cfg.useRosTopic}, + {"rosTopic", cfg.rosTopic}, }; } std::ofstream f(path); @@ -681,7 +685,15 @@ void CameraManager::enableCamera(int clientId, const std::string& id) { } const std::string key = pipelineKey(clientId, id); - if (pipelines_.count(key)) disableCamera(clientId, id); + if (pipelines_.count(key)) { + // Pipeline already running for this client — no restart needed. + // (Failed pipelines are reaped by the background timer, leaving + // pipelines_ empty for that key, so this branch is only hit when + // the pipeline is genuinely still streaming.) + std::cout << "enableCamera: already streaming client=" << clientId + << " camera=" << id << " (no-op)" << std::endl; + return; + } // Pick a native source fps >= the user's cap so the camera negotiates a // supported rate; videorate inside the pipeline then drops down to cfg.fps. @@ -825,7 +837,9 @@ bool CameraManager::applyConfigPatch(const std::string& id, const json& patch) { } if (patch.contains("quality")) { cfg.quality = patch["quality"].get(); pipelineChange = true; } if (patch.contains("exposure")) { cfg.exposure = patch["exposure"]; pipelineChange = true; } - if (patch.contains("role")) cfg.role = patch["role"].get(); + if (patch.contains("role")) cfg.role = patch["role"].get(); + if (patch.contains("useRosTopic")) { cfg.useRosTopic = patch["useRosTopic"].get(); pipelineChange = true; } + if (patch.contains("rosTopic")) { cfg.rosTopic = patch["rosTopic"].get(); pipelineChange = true; } if (pipelineChange && isEnabled(id)) { std::vector enabledClients; for (const auto& [key, _] : pipelines_) { @@ -873,6 +887,8 @@ std::string CameraManager::buildStateJson(int clientId) const { {"bitrate", cfg.computeBitrate()}, {"exposure", cfg.exposure}, {"cropLeftHalf", cfg.cropLeftHalf}, + {"useRosTopic", cfg.useRosTopic}, + {"rosTopic", cfg.rosTopic}, {"capabilities", caps}, }); } diff --git a/src/CameraPipeline.cpp b/src/CameraPipeline.cpp index cced51d..4091643 100644 --- a/src/CameraPipeline.cpp +++ b/src/CameraPipeline.cpp @@ -5,6 +5,11 @@ #include #include #include +#ifdef __linux__ +# include +# include +# include +#endif static std::string timestamp() { struct timeval tv; @@ -30,6 +35,64 @@ static bool isV4l2AllocationError(const std::string& message) { } +// ── ROS bridge subprocess ───────────────────────────────────────────────────── + +static std::string findBridgeScript() { + // Binary lives in build/; script lives one level up in the repo root. + char selfPath[4096] = {}; +#ifdef __linux__ + ssize_t len = readlink("/proc/self/exe", selfPath, sizeof(selfPath) - 1); + if (len > 0) { + std::string exe(selfPath, len); + // strip filename → build dir + auto s1 = exe.rfind('/'); + if (s1 != std::string::npos) { + // strip build dir → repo root + std::string buildDir = exe.substr(0, s1); + auto s2 = buildDir.rfind('/'); + if (s2 != std::string::npos) { + std::string candidate = buildDir.substr(0, s2 + 1) + "ros_camera_bridge.py"; + if (access(candidate.c_str(), R_OK) == 0) return candidate; + } + } + } +#endif + return "ros_camera_bridge.py"; // cwd fallback +} + +static pid_t spawnRosBridge(const CameraConfig& cfg, const std::string& streamName) { +#ifdef __linux__ + std::string script = findBridgeScript(); + std::string bitrate = std::to_string(cfg.computeBitrate()); + std::string width = std::to_string(cfg.width); + std::string height = std::to_string(cfg.height); + std::string fps = std::to_string(cfg.fps); + + pid_t pid = fork(); + if (pid == 0) { + const char* argv[] = { + "python3", script.c_str(), + "--topic", cfg.rosTopic.c_str(), + "--stream-name", streamName.c_str(), + "--width", width.c_str(), + "--height", height.c_str(), + "--fps", fps.c_str(), + "--bitrate", bitrate.c_str(), + nullptr + }; + execvp("python3", const_cast(argv)); + _exit(127); + } + return pid; +#else + (void)cfg; (void)streamName; + std::cerr << "ROS bridge subprocess not supported on this platform" << std::endl; + return -1; +#endif +} + +// ───────────────────────────────────────────────────────────────────────────── + static std::string buildPipelineString(const CameraConfig& cfg, const PlatformSpecifics& specs, const std::string& streamName, @@ -117,13 +180,28 @@ CameraPipeline::~CameraPipeline() { } bool CameraPipeline::start() { - if (pipeline_) return true; + if (pipeline_ || rosBridgePid_ != -1) return true; failed_.store(false); { std::lock_guard lock(errorMutex_); lastError_.clear(); } + // ── ROS topic mode ──────────────────────────────────────────────────────── + if (config_.useRosTopic && !config_.rosTopic.empty()) { + std::cout << "[" << timestamp() << "] Starting ROS bridge: " + << config_.rosTopic << " -> rtsp://127.0.0.1:8554/" << streamName_ << std::endl; + rosBridgePid_ = spawnRosBridge(config_, streamName_); + if (rosBridgePid_ <= 0) { + std::cerr << "[" << timestamp() << "] Failed to spawn ros_camera_bridge.py" << std::endl; + failed_.store(true); + rosBridgePid_ = -1; + return false; + } + return true; + } + + // ── Direct v4l2 / avfvideosrc mode ─────────────────────────────────────── auto specs = PlatformDetect::getPlatformSpecifics(); if (config_.devicePath == "test") specs.source = "videotestsrc"; @@ -206,6 +284,26 @@ bool CameraPipeline::start() { } void CameraPipeline::stop() { +#ifdef __linux__ + if (rosBridgePid_ > 0) { + std::cout << "[" << timestamp() << "] Stopping ROS bridge (pid=" << rosBridgePid_ << ")" << std::endl; + kill(rosBridgePid_, SIGTERM); + // Reap to avoid zombies; give it 1 s then SIGKILL. + for (int i = 0; i < 10; ++i) { + int status; + pid_t ret = waitpid(rosBridgePid_, &status, WNOHANG); + if (ret != 0) break; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + int status; + if (waitpid(rosBridgePid_, &status, WNOHANG) == 0) { + kill(rosBridgePid_, SIGKILL); + waitpid(rosBridgePid_, &status, 0); + } + rosBridgePid_ = -1; + return; + } +#endif removeSourceIfActive(busWatchId_); if (pipeline_) { std::cout << "[" << timestamp() << "] Stopping pipeline" << std::endl;