diff --git a/demos/mosaico_integration/README.md b/demos/mosaico_integration/README.md new file mode 100644 index 0000000..52107be --- /dev/null +++ b/demos/mosaico_integration/README.md @@ -0,0 +1,180 @@ +# Mosaico integration demo + +End-to-end pipeline demonstrating that **medkit fault snapshots flow into Mosaico as queryable, structured forensic data** - with no robot hardware and no recording 24/7. + +> Robot stack runs in Docker. Inject high noise on the simulated LiDAR. The medkit gateway detects it via the standard `/diagnostics` ROS topic, confirms the fault, and flushes its 15-second pre-fault + 10-second post-fault ring buffer to an `.mcap` file. A small Python bridge listens on the gateway's `/faults/stream` SSE endpoint, downloads the bag via REST, and ingests it into mosaicod via Apache Arrow Flight using Mosaico's own Python SDK. From `docker compose up` to a queryable `Sequence` in mosaicod takes about a minute. + +There are two variants of the stack: + +- **Single-robot** (`docker-compose.yml`): one sensor-demo + one bridge. Good for stepping through the pipeline the first time and for the notebook. +- **Fleet** (`docker-compose.fleet.yml`): three sensor-demos (warehouse-A, warehouse-B, outdoor-yard) each with its own bridge, all ingesting into one shared mosaicod. All three robots produce a `LIDAR_SIM` fault snapshot, but robot-02 is rotating (IMU `drift_rate = 0.3 rad/s`) during its fault window while robot-01 and robot-03 stay stationary. That way the metadata filter (Step 1 in the notebook) matches 3 of 3 and the compound IMU `.Q` stationarity query (Step 2) is the one doing the actual triage, excluding the rotating robot. + +## Quick start + +```bash +git clone https://github.com/selfpatch/selfpatch_demos.git +cd selfpatch_demos/demos/mosaico_integration + +# Bring up postgres + mosaicod + sensor-demo + bridge +docker compose up -d + +# Wait until everything is healthy (sensor-demo healthcheck takes ~30s on first run) +docker compose ps + +# Inject the LiDAR HIGH_NOISE fault. The bridge picks up the SSE event, +# waits POST_FAULT_WAIT_SEC (default 12s) for the post-fault ring segment +# to finalize, then downloads the bag and ingests it. End to end is +# roughly 15-20s single-robot / 20-30s fleet. +./scripts/trigger-fault.sh + +# Watch the bridge do its thing +docker compose logs -f bridge + +# Open the notebook (locally or via VS Code) +jupyter notebook notebooks/mosaico_demo.ipynb +``` + +The notebook connects to `localhost:16726` (mosaicod Arrow Flight) and runs four queries against your freshly-ingested fault snapshot, ending with a three-panel time-series plot showing the LiDAR noise spike alongside a stationary IMU - exactly the kind of cross-topic forensic correlation Mosaico is designed for. + +### Fleet variant + +```bash +# Three robots + three bridges + shared mosaicod. Ports 18081/2/3 for the +# robots, 16726 for mosaicod (same as single-robot). +docker compose -f docker-compose.fleet.yml up -d + +# Wait ~25s after all containers are healthy so the ring buffer has a +# pre-fault baseline, then inject three LIDAR_SIM faults with different +# signatures and motion states: +# robot-01 warehouse-A : LiDAR noise_stddev = 0.5 , stationary (range std spike) +# robot-02 warehouse-B : LiDAR noise_stddev = 0.5 , rotating ω=0.3 (Step 2 excludes) +# robot-03 outdoor-yard: LiDAR drift_rate = 0.5 , stationary (range mean shift) +./scripts/trigger-fleet-faults.sh + +# Each bridge independently ingests its robot's snapshot. After ~45s you +# will have three Sequences in mosaicod with distinct robot_id metadata. +docker compose -f docker-compose.fleet.yml logs -f bridge-01 bridge-02 bridge-03 + +docker compose -f docker-compose.fleet.yml down -v +``` + +## Architecture + +``` +docker compose stack (network: mosaico_net) + + ┌─────────────┐ ┌─────────────────────────┐ + │ postgres │◄────────┤ mosaicod │ + │ :5432 │ │ ghcr.io/mosaico-labs/ │ + │ (internal) │ │ mosaicod:v0.3.0 │ + └─────────────┘ │ --local-store /data │ + │ port 6726 ──> host:16726 + └─────────────────────────┘ + ▲ + │ Arrow Flight + │ (RosbagInjector via mosaicolabs SDK) + │ + ┌─────────────────────────┐ ┌────────┴────────┐ + │ sensor-demo │ │ bridge │ + │ built from sibling │◄─►│ python:3.11 │ + │ ../sensor_diagnostics/ │ │ + mosaicolabs │ + │ │ │ (pin: Dockerfile) + │ - ros2_medkit gateway │ │ + httpx │ + │ - lidar/imu/gps/camera │ │ │ + │ sim nodes │ │ Subscribes: │ + │ - diagnostic_bridge │ │ GET /api/v1/ │ + │ - fault_manager │ │ faults/ │ + │ - rosbag ring buffer │ │ stream │ + │ (15s pre + 10s post) │ │ (SSE) │ + │ │ │ │ + │ port 8080 ──> host:18080│ │ Downloads: │ + └─────────────────────────┘ │ GET /api/v1/ │ + │ apps/ │ + │ diagnostic- │ + │ bridge/ │ + │ bulk-data/ │ + │ rosbags/... │ + └─────────────────┘ +``` + +**License-safe**: mosaicod runs as the unmodified upstream Docker image. The bridge is a separate Python process that talks Apache Arrow Flight (a public Apache standard) to mosaicod via Mosaico's own Python SDK. We never link or modify mosaicod or its Rust crates. + +## What lands in Mosaico (verified end to end on this stack) + +| ROS topic | ROS message type | Mosaico ontology | Status | +|------------------------|----------------------------------------|---------------------------------|-------------------------------------| +| `/sensors/scan` | `sensor_msgs/msg/LaserScan` | `LaserScan` (`futures.laser`) | ✅ via [Mosaico PR #368][pr368] | +| `/sensors/imu` | `sensor_msgs/msg/Imu` | `IMU` | ✅ shipped adapter | +| `/sensors/fix` | `sensor_msgs/msg/NavSatFix` | `GPS` | ✅ shipped adapter | +| `/sensors/image_raw` | `sensor_msgs/msg/Image` | `Image` | ✅ adapter ships in SDK, not captured in this demo | +| `/diagnostics` | `diagnostic_msgs/msg/DiagnosticArray` | (none) | ⚠️ silently dropped, no adapter yet | + +The `/diagnostics` drop is the only adapter gap. We use it on the medkit side to flag the fault (via diagnostic_bridge → fault_manager), but it does not reach Mosaico storage. A natural next step is to ship a `DiagnosticArray` adapter or define a dedicated `MedkitFault` ontology and write its adapter. + +## Mosaico SDK pin: PR #368 merged on 2026-04-13 + +The `LaserScanAdapter` we need landed in [mosaico-labs/mosaico#368][pr368] on 2026-04-13 as commit `b3867be`. We validated end to end against that PR while it was in draft (the `issue/367` branch at commit `8e090cd`); post-merge the code path is identical, so no demo-side change was required. + +The subsequent `mosaicolabs==0.3.2` PyPI wheel (2026-04-15) is missing the `futures` subpackage from the distributed wheel despite it being present in source on main, so `pip install mosaicolabs==0.3.2` cannot import `LaserScan`. Until a release with correct packaging ships, the bridge `Dockerfile` installs the SDK directly from the upstream repo pinned to `b3867be`. Once a fixed release is published this becomes a one-line change in the `Dockerfile` (swap the git install for `pip install mosaicolabs==`). + +On the **read** side the consumer still needs to `import mosaicolabs.models.futures.laser` to register the `laser_scan` ontology in the global registry. The notebook does this in its first cell. + +[pr368]: https://github.com/mosaico-labs/mosaico/pull/368 + +## Smart snapshots, not 24/7 recording + +This is the value prop: each entry in the Mosaico catalog is **only the 25 seconds around a confirmed fault** (15 s pre-fault baseline + 10 s post-fault), not hours of "nothing is happening" telemetry. A single fleet snapshot weighs roughly 2 MB of MCAP (LaserScan at 10 Hz, IMU at 100 Hz, GPS at 1 Hz, `/diagnostics`); the same four topics recorded 24/7 would produce approximately 6 GB per robot per day. At the demo's assumption of five confirmed faults per robot per day (so ~10 MB/day of smart snapshots), the catalog stays at ~600× less storage than naive always-on recording while keeping 100% of the frames that actually matter for forensics. `/sensors/image_raw` (30 Hz raw camera) is intentionally excluded from snapshot capture - that single topic would dominate the bag at ~27 MB/s; swap it for a `CompressedImage` topic if vision forensics is part of the workflow. + +## Files in this directory + +| Path | What | +|---|---| +| `docker-compose.yml` | Single-robot stack: postgres + mosaicod + sensor-demo + bridge | +| `docker-compose.fleet.yml` | Fleet stack: postgres + mosaicod + 3×(sensor-demo + bridge) | +| `bridge/bridge.py` | Subscribes SSE, downloads bag, calls `RosbagInjector`. Honors `POST_FAULT_WAIT_SEC` (default 12s) before download so the post-fault ring segment is finalized | +| `bridge/Dockerfile` | python:3.11-slim + Mosaico SDK pinned to PR #368 merge commit `b3867be` | +| `bridge/requirements.txt` | `httpx>=0.27,<0.30` | +| `medkit_overrides/medkit_params.yaml` | Sensor-demo medkit config: 15s pre + 10s post ring buffer, single 2 GB bag cap, `auto_cleanup: false` | +| `notebooks/mosaico_demo.ipynb` | Connect, list, query, plot - 7 sections covering single-robot and fleet variants | +| `scripts/trigger-fault.sh` | Single-robot: inject high noise on lidar-sim on `localhost:18080` | +| `scripts/trigger-fleet-faults.sh` | Fleet: inject three LIDAR_SIM faults with different signatures (noise/drift) and motion states (one robot rotating for the Step 2 filter) | + +## Verified end-to-end + +| What | Status | +|---|---| +| `docker compose build bridge` (LaserScan ontology sanity import passes) | ✅ | +| `docker compose up -d` brings postgres + sensor-demo to `healthy`, mosaicod + bridge to `running` | ✅ | +| medkit gateway responds at `localhost:18080/api/v1/health` | ✅ | +| `./scripts/trigger-fault.sh` injects fault, gateway returns CONFIRMED | ✅ | +| Bridge SSE connects, picks up `fault_confirmed` event | ✅ | +| Bridge resolves entity `apps/diagnostic-bridge` and downloads ~2 MB MCAP (25 s of LaserScan + IMU + GPS + /diagnostics) | ✅ | +| `RosbagInjector` finalizes 3 TopicWriters (`/sensors/{scan,imu,fix}`; `/diagnostics` silently dropped - no adapter yet) | ✅ | +| `MosaicoClient.list_sequences()` shows the new sequence within ~25 s of fault confirmation | ✅ | +| Notebook reads back `LaserScan` data with `range_min`, `range_max`, `ranges`, `intensities`, `frame_id` populated | ✅ | +| `IMU.Q.acceleration.z.between(...)` filter returns sequences | ✅ | + +## Known surprises (we hit them so you don't have to) + +1. **Medkit gateway path prefix**: SSE lives at `GET /api/v1/faults/stream`, not `GET /faults/stream`. Same for `/api/v1/components/...`. The bridge bakes the prefix into every URL. +2. **`reporting_sources` ≠ SOVD entity ID**: medkit reports the ROS publisher node name (`/bridge/diagnostic_bridge`), not the SOVD entity that owns the bag. The bridge enumerates apps + components via the gateway and HEAD-probes for `bulk-data/rosbags/{fault_code}` until one returns 200. +3. **Faults from the legacy diagnostic path land under `apps/diagnostic-bridge`**, not `apps/lidar-sim`. The diagnostic_bridge node is what owns the snapshot bag in this demo. +4. **Mosaico read-side registry**: even with PR #368 installed, you must `import mosaicolabs.models.futures.laser` before reading `LaserScan` data. Otherwise the topic reader raises `No ontology registered with tag 'laser_scan'`. The bridge does not need this (write side resolves adapters by ROS msg type) but the notebook does. +5. **Not all 5 listed topics actually land in Mosaico**: `/diagnostics` drops silently because no adapter is registered - the medkit ring buffer captures it, Mosaico just does not know what to do with it. `/sensors/image_raw` is a different kind of drop: the adapter ships in the SDK but the demo's `medkit_params.yaml` excludes that topic from snapshot capture on purpose (30 Hz uncompressed camera would dominate the bag). See the table and "Smart snapshots" section. +6. **Initial post-fault wait**: medkit holds the rosbag2 writer open for `duration_after_sec` (10s in this config) after `fault_confirmed`. The bridge waits `POST_FAULT_WAIT_SEC` seconds (default 12) before downloading so the trailing ring segment is finalized. +7. **Gateway port conflict on dev boxes**: the single-robot stack publishes on `18080` and `16726`; the fleet stack uses `18081/18082/18083` for the three gateways with `16726` shared by mosaicod. Adjust if you prefer defaults. +8. **`rosbag2` file splitting**: if `max_bag_size_mb` is hit mid-recording, `rosbag2` splits into `_0.mcap`, `_1.mcap`, ... and the medkit gateway serves only the first split. The 2 GB cap in `medkit_params.yaml` is there to prevent splitting for any realistic 25 s snapshot. + +## Troubleshooting + +- `docker compose build bridge` fails on the import sanity check → the pinned Mosaico commit `b3867be` is no longer fetchable, or the upstream source layout has changed. Update the `MOSAICO_PIN` build arg in `bridge/Dockerfile` to a current `main` commit that still contains `mosaicolabs/models/futures/laser.py`. +- `./scripts/trigger-fault.sh` returns curl error 22 → the gateway is up but needs `{"execution_type": "now"}` in the POST body. The script already does that; verify your gateway is actually `localhost:18080`. +- Bridge logs `Could not resolve entity for fault_code=X` → enumerate `/api/v1/apps` and `/api/v1/components` manually with `curl` and check whether any of them list the fault under `bulk-data/rosbags`. If none do, the gateway has not registered the bag yet (post-fault timer hasn't fired). Wait a few seconds and re-trigger. +- Notebook raises `No ontology registered with tag 'laser_scan'` → the `import mosaicolabs.models.futures.laser` cell did not run. Re-run it. +- `docker compose pull mosaicod` is slow on first run → the upstream image is ~110 MB, distroless. Expect 30-90s on a slow link. + +## Cleanup + +```bash +docker compose down -v # removes containers + named volumes +``` diff --git a/demos/mosaico_integration/bridge/Dockerfile b/demos/mosaico_integration/bridge/Dockerfile new file mode 100644 index 0000000..51af2f1 --- /dev/null +++ b/demos/mosaico_integration/bridge/Dockerfile @@ -0,0 +1,47 @@ +# Bridge container for the Mosaico ingestion demo. +# +# Subscribes to medkit /faults/stream SSE, downloads each fault snapshot +# bag from the gateway REST API, and ingests it into mosaicod via the +# mosaicolabs Python SDK over Apache Arrow Flight. +# +# License-safe pattern: bridge is a separate Python process that talks +# the public Apache Arrow Flight protocol to an unmodified mosaicod +# Docker image. We do not link or modify mosaicod or its Rust crates. +# +# SDK pinning: PR #368 (ROS adapters for futures ontology, including +# LaserScan) merged on 2026-04-13 as commit b3867be. The subsequent +# mosaicolabs==0.3.2 PyPI wheel (2026-04-15) ships with the +# `futures` subpackage missing from the wheel despite being in source +# on main, so `pip install mosaicolabs==0.3.2` cannot import LaserScan. +# We therefore install from the upstream repo at the PR #368 merge +# commit until a PyPI release with correct packaging ships. + +FROM python:3.11-slim + +ENV PYTHONUNBUFFERED=1 +ENV PIP_NO_CACHE_DIR=1 +ENV PIP_DISABLE_PIP_VERSION_CHECK=1 + +RUN apt-get update \ + && apt-get install -y --no-install-recommends git ca-certificates \ + && rm -rf /var/lib/apt/lists/* + +ARG MOSAICO_REPO=https://github.com/mosaico-labs/mosaico.git +ARG MOSAICO_PIN=b3867be +RUN git clone "${MOSAICO_REPO}" /opt/mosaico \ + && cd /opt/mosaico \ + && git checkout "${MOSAICO_PIN}" \ + && git rev-parse HEAD > /opt/mosaico/.pinned_sha + +COPY requirements.txt /tmp/requirements.txt +RUN pip install -r /tmp/requirements.txt \ + && pip install /opt/mosaico/mosaico-sdk-py + +# Sanity check at build time: import the SDK and the LaserScan ontology +# so we fail fast if a future pin regresses on what we need. +RUN python -c "from mosaicolabs import MosaicoClient; from mosaicolabs.ros_bridge import RosbagInjector, ROSInjectionConfig; from mosaicolabs.models.futures.laser import LaserScan; print('mosaicolabs SDK + LaserScan ontology import OK')" + +WORKDIR /app +COPY bridge.py /app/bridge.py + +CMD ["python", "-u", "/app/bridge.py"] diff --git a/demos/mosaico_integration/bridge/bridge.py b/demos/mosaico_integration/bridge/bridge.py new file mode 100644 index 0000000..759e04b --- /dev/null +++ b/demos/mosaico_integration/bridge/bridge.py @@ -0,0 +1,503 @@ +#!/usr/bin/env python3 +"""Bridge: subscribe to the medkit /faults/stream SSE, ingest each fault +snapshot bag into mosaicod via the mosaicolabs Python SDK. + +Architecture: + 1. medkit gateway --[SSE: fault event]--> bridge + 2. bridge --[REST: GET snapshot]--> medkit gateway + 3. medkit gateway --[MCAP bag response]--> bridge + 4. bridge --[Arrow Flight]--> mosaicod + +License-safe: this is a separate process talking Arrow Flight to an +unmodified mosaicod docker image, per Mosaico's recommended pattern. +""" + +from __future__ import annotations + +import json +import logging +import os +import sys +import time +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Iterator, Optional + +import httpx + +LOG = logging.getLogger("mosaico_bridge") + +# --- Config (env-driven) --------------------------------------------------- + +MEDKIT_URL = os.environ.get("MEDKIT_URL", "http://localhost:8080") +MOSAICO_HOST = os.environ.get("MOSAICO_HOST", "localhost") +MOSAICO_PORT = int(os.environ.get("MOSAICO_PORT", "6726")) +LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper() +ROBOT_ID = os.environ.get("ROBOT_ID", "sensor_demo_001") +SOURCE_DEMO = os.environ.get("SOURCE_DEMO", "sensor_diagnostics") +DOWNLOAD_DIR = Path(os.environ.get("DOWNLOAD_DIR", "/tmp/mosaico_bridge")) + +# Optional whitelist of fault codes to ingest into Mosaico. Comma-separated. +# Empty string (default) means "accept all fault codes". Set to e.g. +# "LIDAR_SIM" to ingest only LIDAR snapshots and skip everything else +# (IMU drifting, NaN injection, etc.) that medkit still captures on its +# own side. +_allowlist_raw = os.environ.get("FAULT_CODE_ALLOWLIST", "").strip() +FAULT_CODE_ALLOWLIST: set[str] = ( + {code.strip() for code in _allowlist_raw.split(",") if code.strip()} + if _allowlist_raw + else set() +) + +# Medkit fault severity mapping (matches ros2_medkit_msgs/Fault SEVERITY_*) +SEVERITY_NAMES = { + 0: "INFO", + 1: "WARNING", + 2: "ERROR", + 3: "CRITICAL", +} + + +# --- Data model ------------------------------------------------------------ + + +@dataclass +class FaultEvent: + """Subset of medkit FaultEvent we care about.""" + + event_id: int + event_type: str + fault_code: str + severity: int + severity_name: str + description: str + status: str + reporting_sources: list[str] + timestamp_sec: float + raw: dict + + @classmethod + def from_sse_data(cls, event_id: int, data_json: dict) -> "FaultEvent": + fault = data_json.get("fault", {}) or {} + sev = int(fault.get("severity", 0)) + return cls( + event_id=event_id, + event_type=data_json.get("event_type", ""), + fault_code=fault.get("fault_code", ""), + severity=sev, + severity_name=SEVERITY_NAMES.get(sev, f"UNKNOWN_{sev}"), + description=fault.get("description", ""), + status=fault.get("status", ""), + reporting_sources=list(fault.get("reporting_sources", []) or []), + timestamp_sec=float(data_json.get("timestamp", 0.0) or 0.0), + raw=data_json, + ) + + +# --- SSE stream parser ----------------------------------------------------- + + +def stream_fault_events( + base_url: str, + last_event_id: Optional[int] = None, +) -> Iterator[FaultEvent]: + """Yield fault events from medkit /api/v1/faults/stream. + + Reconnects forever; on each reconnect resumes from the most recent + event id we yielded via the SSE `Last-Event-ID` header (medkit + replays buffered events per the SSE spec). + """ + url = f"{base_url}/api/v1/faults/stream" + # Mutable one-slot box so the `set_last_id` closure passed into + # `_parse_sse` can update the value the outer loop reads on reconnect. + resume = [last_event_id] + + def _update_last(eid: int) -> None: + resume[0] = eid + + while True: + headers = {"Accept": "text/event-stream"} + if resume[0] is not None: + headers["Last-Event-ID"] = str(resume[0]) + + try: + LOG.info( + "Connecting SSE stream %s (last_event_id=%s)", url, resume[0] + ) + # No request timeout for SSE; medkit emits keepalives. + with httpx.stream("GET", url, headers=headers, timeout=None) as resp: + resp.raise_for_status() + LOG.info("SSE stream connected (status=%s)", resp.status_code) + yield from _parse_sse(resp.iter_lines(), set_last_id=_update_last) + except (httpx.HTTPError, httpx.StreamError) as exc: + LOG.warning( + "SSE stream error: %s; reconnecting in 2s (resume from id=%s)", + exc, + resume[0], + ) + time.sleep(2.0) + except KeyboardInterrupt: + LOG.info("Interrupted, exiting") + return + + +def _parse_sse(lines: Iterator[str], set_last_id) -> Iterator[FaultEvent]: + """Parse SSE protocol from a line iterator. Yields FaultEvent objects. + + Expected medkit frame format: + id: + event: + data: {} + + """ + cur_id: Optional[int] = None + cur_data: list[str] = [] + + for raw in lines: + line = raw.rstrip("\r") + if line == "": + # Dispatch the buffered event + if cur_data: + payload = "\n".join(cur_data) + try: + data_json = json.loads(payload) + except json.JSONDecodeError as exc: + # Not a transient issue - the gateway broke the SSE + # protocol contract. Log loudly and keep going. + LOG.error( + "SSE data not JSON (protocol break, dropping 1 frame): %s; raw=%r", + exc, + payload[:500], + ) + else: + eid = cur_id if cur_id is not None else 0 + yield FaultEvent.from_sse_data(eid, data_json) + set_last_id(eid) + cur_id = None + cur_data = [] + continue + + if line.startswith(":"): + # SSE comment / keepalive + continue + if line.startswith("id:"): + try: + cur_id = int(line[3:].strip()) + except ValueError: + cur_id = None + # SSE `event:` field is intentionally not parsed here; the + # semantic event type is carried inside the JSON body and ends up + # in `FaultEvent.event_type`. + elif line.startswith("data:"): + cur_data.append(line[5:].lstrip()) + + +# --- Entity resolution ----------------------------------------------------- + + +def resolve_entity_for_download( + base_url: str, fault: FaultEvent +) -> Optional[tuple[str, str]]: + """Walk medkit entity types to find one that owns this fault's bulk-data. + + Returns (entity_type, entity_id) or None. + + Notes: + - reporting_sources contains ROS node names like "/bridge/diagnostic_bridge", + NOT SOVD entity FQNs. We can't derive the entity ID from them directly. + - The fault snapshot bag is registered against the diagnostic-bridge APP + (kebab-case), discoverable via /apps/{id}/bulk-data/rosbags/{fault_code}. + - We discover the right entity by listing all apps + components and + asking each for the bulk-data/rosbags/{fault_code} until one returns 200. + The list is small (~10 entities) so this is cheap. + - This HEAD-probe discovery is a pragmatic workaround. The gateway-wide + SSE fault stream does not surface SOVD entity context today; tracked + in selfpatch/ros2_medkit#380 (either an x-medkit extension in the SSE + payload or per-entity fault subscriptions). Once that lands, this + function collapses to a single direct fetch. + """ + # Build candidate list: enumerate apps and components. + candidates: list[tuple[str, str]] = [] + for etype in ("apps", "components"): + try: + r = httpx.get(f"{base_url}/api/v1/{etype}", timeout=5.0) + r.raise_for_status() + items = r.json().get("items", []) or [] + for item in items: + eid = item.get("id") + if eid: + candidates.append((etype, eid)) + except httpx.HTTPError as exc: + LOG.warning( + "Failed to list /api/v1/%s while resolving fault %s: %s", + etype, + fault.fault_code, + exc, + ) + + if not candidates: + LOG.warning( + "No apps/components discovered for fault %s; gateway may be unreachable or still coming up", + fault.fault_code, + ) + return None + + tried = 0 + for etype, eid in candidates: + tried += 1 + url = f"{base_url}/api/v1/{etype}/{eid}/bulk-data/rosbags/{fault.fault_code}" + try: + r = httpx.head(url, timeout=5.0, follow_redirects=True) + except httpx.HTTPError as exc: + LOG.debug("HEAD probe failed for %s: %s", url, exc) + continue + if r.status_code == 200: + LOG.info( + "Resolved fault %s to %s/%s", fault.fault_code, etype, eid + ) + return (etype, eid) + LOG.debug("HEAD %s -> %d", url, r.status_code) + LOG.warning( + "No entity owns bulk-data for fault %s (probed %d candidates); " + "bag may not be registered yet (post-fault timer still running)", + fault.fault_code, + tried, + ) + return None + + +# --- Bag download ---------------------------------------------------------- + + +def download_bag(base_url: str, fault: FaultEvent) -> Optional[Path]: + """Download the .mcap snapshot bag for a confirmed fault. + + The medkit gateway resolves the rosbag2 directory and streams the inner + .mcap back to the caller. + """ + resolved = resolve_entity_for_download(base_url, fault) + if resolved is None: + LOG.error( + "Could not resolve entity for fault_code=%s sources=%s", + fault.fault_code, + fault.reporting_sources, + ) + return None + etype, eid = resolved + url = f"{base_url}/api/v1/{etype}/{eid}/bulk-data/rosbags/{fault.fault_code}" + DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True) + out = DOWNLOAD_DIR / f"{fault.fault_code}_{int(fault.timestamp_sec)}.mcap" + LOG.info("Downloading bag: %s -> %s", url, out) + try: + with httpx.stream("GET", url, timeout=60.0) as resp: + resp.raise_for_status() + with out.open("wb") as f: + for chunk in resp.iter_bytes(chunk_size=64 * 1024): + if chunk: + f.write(chunk) + except httpx.HTTPError as exc: + LOG.error("Bag download failed: %s", exc) + return None + size = out.stat().st_size + LOG.info("Downloaded %.1f KB to %s", size / 1024.0, out) + if size == 0: + LOG.error("Downloaded bag is empty: %s", out) + return None + # Sanity-check the MCAP magic header. If the bag download raced the + # rosbag2 finalizer we'd otherwise hand a truncated/corrupt file to + # RosbagInjector and lose the root cause inside its exception path. + # MCAP magic: 0x89 "MCAP" 0x30 "\r\n" (8 bytes). + with out.open("rb") as fh: + magic = fh.read(8) + if magic != b"\x89MCAP0\r\n": + LOG.error( + "Downloaded bag has invalid MCAP magic header (got %r, size=%d): %s", + magic, + size, + out, + ) + return None + return out + + +# --- Mosaico ingest -------------------------------------------------------- + + +def ingest_to_mosaico(bag_path: Path, fault: FaultEvent) -> Optional[str]: + """Ingest bag into mosaicod via mosaicolabs SDK over Arrow Flight. + + Returns sequence_name on success, None on failure. + """ + # Lazy import to keep bridge importable for unit tests without SDK. + from mosaicolabs.ros_bridge import ROSInjectionConfig, RosbagInjector + from pyarrow.flight import FlightError + + sequence_name = ( + f"{SOURCE_DEMO}_{ROBOT_ID}_{fault.fault_code}" + f"_{int(fault.timestamp_sec)}_{fault.event_id}" + ) + metadata = { + "robot_id": ROBOT_ID, + "fault_code": fault.fault_code, + "fault_severity": fault.severity_name, + "fault_severity_int": fault.severity, + "fault_description": fault.description or "", + "fault_status": fault.status, + "reporting_sources": ",".join(fault.reporting_sources), + "captured_at_iso": datetime.fromtimestamp( + fault.timestamp_sec, tz=timezone.utc + ).isoformat(), + "event_id": fault.event_id, + "source_demo": SOURCE_DEMO, + "source_pipeline": "ros2_medkit_fault_snapshot_v0", + } + cfg = ROSInjectionConfig( + file_path=bag_path, + sequence_name=sequence_name, + metadata=metadata, + host=MOSAICO_HOST, + port=MOSAICO_PORT, + ) + LOG.info( + "Ingesting %s into mosaicod (sequence=%s, host=%s:%d)", + bag_path, + sequence_name, + MOSAICO_HOST, + MOSAICO_PORT, + ) + try: + RosbagInjector(cfg).run() + except (FlightError, OSError, ConnectionError, TimeoutError) as exc: + # Transport/flight failures are the only things we want to swallow + # into a retry-friendly None here. Programmer errors + # (AttributeError, KeyError, TypeError) from an SDK API rename + # intentionally propagate so the drift surfaces loudly instead of + # being logged as "failed ingest" forever. + LOG.error("RosbagInjector failed (transport/flight): %s", exc) + return None + LOG.info("Ingest complete: sequence=%s", sequence_name) + return sequence_name + + +def verify_sequence_in_mosaico(sequence_name: str) -> bool: + """Connect to mosaicod and assert our new sequence is listed.""" + from mosaicolabs import MosaicoClient + from pyarrow.flight import FlightError + + try: + with MosaicoClient.connect(host=MOSAICO_HOST, port=MOSAICO_PORT) as client: + seqs = list(client.list_sequences()) + except (FlightError, OSError, ConnectionError, TimeoutError) as exc: + # Same narrowing as ingest_to_mosaico: only transport failures + # degrade verification to "skip". API renames crash loudly. + LOG.error("Verification list_sequences failed (transport/flight): %s", exc) + return False + if sequence_name in seqs: + LOG.info("VERIFIED: sequence %s present in mosaicod", sequence_name) + return True + LOG.warning( + "Sequence %s NOT in mosaicod list (found %d sequences)", + sequence_name, + len(seqs), + ) + return False + + +# --- Main loop ------------------------------------------------------------- + + +def handle_fault_event(fault: FaultEvent) -> None: + LOG.info( + "Fault event id=%s type=%s code=%s severity=%s sources=%s", + fault.event_id, + fault.event_type, + fault.fault_code, + fault.severity_name, + fault.reporting_sources, + ) + if fault.event_type != "fault_confirmed": + LOG.debug("Ignoring non-confirmed event: %s", fault.event_type) + return + if not fault.fault_code: + LOG.warning("Confirmed fault has empty fault_code, skipping") + return + if FAULT_CODE_ALLOWLIST and fault.fault_code not in FAULT_CODE_ALLOWLIST: + LOG.info( + "Skipping fault_code=%s (not in FAULT_CODE_ALLOWLIST=%s)", + fault.fault_code, + sorted(FAULT_CODE_ALLOWLIST), + ) + return + + # Medkit's post-fault timer keeps the bag writer open for + # `duration_after_sec` (10s in this demo's medkit_params.yaml). Wait + # for finalization before trying to download. + post_fault_wait = float(os.environ.get("POST_FAULT_WAIT_SEC", "12")) + LOG.info("Waiting %.0fs for post-fault recording to finalize...", post_fault_wait) + time.sleep(post_fault_wait) + + bag = download_bag(MEDKIT_URL, fault) + if bag is None: + return + sequence_name = ingest_to_mosaico(bag, fault) + if sequence_name is None: + return + verify_sequence_in_mosaico(sequence_name) + + +def main() -> int: + logging.basicConfig( + level=getattr(logging, LOG_LEVEL, logging.INFO), + format="%(asctime)s %(levelname)s %(name)s %(message)s", + ) + LOG.info( + "Bridge starting: medkit=%s mosaicod=%s:%d robot_id=%s", + MEDKIT_URL, + MOSAICO_HOST, + MOSAICO_PORT, + ROBOT_ID, + ) + LOG.info("Download dir: %s", DOWNLOAD_DIR) + + # Wait until medkit is reachable so we don't crash on initial startup + # before sensor-demo healthchecks pass. + for attempt in range(60): + try: + r = httpx.get(f"{MEDKIT_URL}/api/v1/health", timeout=2.0) + if r.status_code == 200: + LOG.info("Medkit gateway healthy") + break + except (httpx.InvalidURL, httpx.UnsupportedProtocol) as exc: + # Config problem, not a transient network issue. Fail fast so + # the operator fixes MEDKIT_URL instead of waiting 120s. + LOG.error("MEDKIT_URL %r is unusable: %s", MEDKIT_URL, exc) + return 1 + except httpx.HTTPError: + pass + if attempt == 0: + LOG.info("Waiting for medkit gateway to come up at %s...", MEDKIT_URL) + elif attempt % 10 == 0: + LOG.info( + "Still waiting for medkit gateway at %s (attempt %d/60)...", + MEDKIT_URL, + attempt, + ) + time.sleep(2.0) + else: + LOG.error("Medkit gateway never became healthy after 120s") + return 1 + + try: + for fault in stream_fault_events(MEDKIT_URL): + try: + handle_fault_event(fault) + except Exception as exc: # noqa: BLE001 + LOG.exception("Error handling fault event: %s", exc) + except KeyboardInterrupt: + LOG.info("Bridge stopped") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/demos/mosaico_integration/bridge/requirements.txt b/demos/mosaico_integration/bridge/requirements.txt new file mode 100644 index 0000000..650dd11 --- /dev/null +++ b/demos/mosaico_integration/bridge/requirements.txt @@ -0,0 +1 @@ +httpx>=0.27,<0.30 diff --git a/demos/mosaico_integration/docker-compose.fleet.yml b/demos/mosaico_integration/docker-compose.fleet.yml new file mode 100644 index 0000000..ffff654 --- /dev/null +++ b/demos/mosaico_integration/docker-compose.fleet.yml @@ -0,0 +1,175 @@ +# Mosaico fleet demo: 3 robots, 3 bridges, 1 Mosaico archive. +# +# Shows cross-robot forensic queries that a folder of .mcap files cannot do. +# +# Robots: +# robot-01 warehouse aisle A port 18081 ROS_DOMAIN_ID=41 +# robot-02 warehouse aisle B port 18082 ROS_DOMAIN_ID=42 +# robot-03 outdoor yard port 18083 ROS_DOMAIN_ID=43 +# +# Each robot gets its own bridge that ingests fault snapshots into +# the shared mosaicod with robot_id metadata. +# +# Why one bridge per robot in this demo: +# - Keeps each `MEDKIT_URL`, `ROBOT_ID` and `FAULT_CODE_ALLOWLIST` a +# static env var instead of a per-event lookup table. +# - Simple failure isolation: one bridge crashing does not delay +# ingest for the other robots. +# - Makes the SSE subscription / reconnect loop single-threaded and +# easy to read. +# In a real fleet (tens to hundreds of robots) either a single async +# bridge subscribing to N gateways in parallel, or a bridge running as +# a sidecar on each gateway host, would scale better. For 3 robots the +# per-bridge layout is the clearest way to show the pipeline. + +name: mosaico-fleet + +services: + # ── Shared infrastructure ── + postgres: + image: postgres:16-alpine + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: password + POSTGRES_DB: mosaico + volumes: + - fleet_pgdata:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres -d mosaico"] + interval: 5s + timeout: 3s + retries: 20 + networks: [fleet_net] + + mosaicod: + image: ghcr.io/mosaico-labs/mosaicod:v0.3.0 + environment: + MOSAICOD_DB_URL: postgres://postgres:password@postgres:5432/mosaico + RUST_LOG: info + command: [run, --host, 0.0.0.0, --port, "6726", --local-store, /data/store] + volumes: + - fleet_store:/data/store + depends_on: + postgres: { condition: service_healthy } + ports: ["16726:6726"] + networks: [fleet_net] + + # ── Robot 01: Warehouse Aisle A ── + robot-01: + build: + context: ../sensor_diagnostics + dockerfile: Dockerfile + container_name: fleet_robot_01 + environment: + ROS_DOMAIN_ID: "41" + BEACON_MODE: none + ports: ["18081:8080"] + volumes: + - ./medkit_overrides/medkit_params.yaml:/root/demo_ws/install/sensor_diagnostics_demo/share/sensor_diagnostics_demo/config/medkit_params.yaml:ro + healthcheck: + test: ["CMD-SHELL", "curl -f http://localhost:8080/api/v1/health || exit 1"] + interval: 5s + timeout: 3s + retries: 30 + start_period: 30s + networks: [fleet_net] + + bridge-01: + build: { context: ./bridge } + container_name: fleet_bridge_01 + environment: + MEDKIT_URL: http://robot-01:8080 + MOSAICO_HOST: mosaicod + MOSAICO_PORT: "6726" + ROBOT_ID: robot-01-warehouse-A + SOURCE_DEMO: fleet_demo + LOG_LEVEL: INFO + POST_FAULT_WAIT_SEC: "15" + FAULT_CODE_ALLOWLIST: "LIDAR_SIM" + depends_on: + robot-01: { condition: service_healthy } + mosaicod: { condition: service_started } + networks: [fleet_net] + + # ── Robot 02: Warehouse Aisle B ── + robot-02: + build: + context: ../sensor_diagnostics + dockerfile: Dockerfile + container_name: fleet_robot_02 + environment: + ROS_DOMAIN_ID: "42" + BEACON_MODE: none + ports: ["18082:8080"] + volumes: + - ./medkit_overrides/medkit_params.yaml:/root/demo_ws/install/sensor_diagnostics_demo/share/sensor_diagnostics_demo/config/medkit_params.yaml:ro + healthcheck: + test: ["CMD-SHELL", "curl -f http://localhost:8080/api/v1/health || exit 1"] + interval: 5s + timeout: 3s + retries: 30 + start_period: 30s + networks: [fleet_net] + + bridge-02: + build: { context: ./bridge } + container_name: fleet_bridge_02 + environment: + MEDKIT_URL: http://robot-02:8080 + MOSAICO_HOST: mosaicod + MOSAICO_PORT: "6726" + ROBOT_ID: robot-02-warehouse-B + SOURCE_DEMO: fleet_demo + LOG_LEVEL: INFO + POST_FAULT_WAIT_SEC: "15" + FAULT_CODE_ALLOWLIST: "LIDAR_SIM" + depends_on: + robot-02: { condition: service_healthy } + mosaicod: { condition: service_started } + networks: [fleet_net] + + # ── Robot 03: Outdoor Yard ── + robot-03: + build: + context: ../sensor_diagnostics + dockerfile: Dockerfile + container_name: fleet_robot_03 + environment: + ROS_DOMAIN_ID: "43" + BEACON_MODE: none + ports: ["18083:8080"] + volumes: + - ./medkit_overrides/medkit_params.yaml:/root/demo_ws/install/sensor_diagnostics_demo/share/sensor_diagnostics_demo/config/medkit_params.yaml:ro + healthcheck: + test: ["CMD-SHELL", "curl -f http://localhost:8080/api/v1/health || exit 1"] + interval: 5s + timeout: 3s + retries: 30 + start_period: 30s + networks: [fleet_net] + + bridge-03: + build: { context: ./bridge } + container_name: fleet_bridge_03 + environment: + MEDKIT_URL: http://robot-03:8080 + MOSAICO_HOST: mosaicod + MOSAICO_PORT: "6726" + ROBOT_ID: robot-03-outdoor-yard + SOURCE_DEMO: fleet_demo + LOG_LEVEL: INFO + POST_FAULT_WAIT_SEC: "15" + FAULT_CODE_ALLOWLIST: "LIDAR_SIM" + depends_on: + robot-03: { condition: service_healthy } + mosaicod: { condition: service_started } + networks: [fleet_net] + +volumes: + fleet_pgdata: + fleet_store: + +networks: + fleet_net: + name: mosaico_fleet_net + driver: bridge diff --git a/demos/mosaico_integration/docker-compose.yml b/demos/mosaico_integration/docker-compose.yml new file mode 100644 index 0000000..fde9af5 --- /dev/null +++ b/demos/mosaico_integration/docker-compose.yml @@ -0,0 +1,112 @@ +# Mosaico integration demo: medkit fault snapshot -> Mosaico ingest. +# +# Stack: +# postgres - Mosaico metadata store (internal only) +# mosaicod - Apache Arrow Flight server (port 16726 on host) +# sensor-demo - ros2_medkit gateway + simulated sensors + diagnostic_bridge +# + fault_manager (port 18080 on host). +# Built from sibling sensor_diagnostics/ Dockerfile. +# medkit_params.yaml is override-mounted to flip auto_cleanup. +# bridge - subscribes /faults/stream SSE, downloads bag via REST, +# ingests to mosaicod via mosaicolabs SDK over Arrow Flight. +# +# Host ports avoid 8080/6726/5432/3000 so the stack does not collide with +# anything else that might be running on the dev host. + +name: mosaico-integration + +services: + postgres: + image: postgres:16-alpine + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: password + POSTGRES_DB: mosaico + volumes: + - mosaico_pgdata:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres -d mosaico"] + interval: 5s + timeout: 3s + retries: 20 + networks: + - mosaico_net + + mosaicod: + image: ghcr.io/mosaico-labs/mosaicod:v0.3.0 + environment: + MOSAICOD_DB_URL: postgres://postgres:password@postgres:5432/mosaico + RUST_LOG: info + command: + - run + - --host + - 0.0.0.0 + - --port + - "6726" + - --local-store + - /data/store + volumes: + - mosaico_store:/data/store + depends_on: + postgres: + condition: service_healthy + ports: + - "16726:6726" + networks: + - mosaico_net + + sensor-demo: + build: + context: ../sensor_diagnostics + dockerfile: Dockerfile + container_name: mosaico_sensor_demo + environment: + ROS_DOMAIN_ID: "40" + BEACON_MODE: none + ports: + - "18080:8080" + volumes: + # Override medkit_params.yaml to flip auto_cleanup defensively. + # Install path follows the sensor_diagnostics Dockerfile's + # COLCON_WS=/root/demo_ws + ROS install layout. + - ./medkit_overrides/medkit_params.yaml:/root/demo_ws/install/sensor_diagnostics_demo/share/sensor_diagnostics_demo/config/medkit_params.yaml:ro + healthcheck: + test: + - CMD-SHELL + - curl -fsS http://localhost:8080/api/v1/health > /dev/null || exit 1 + interval: 5s + timeout: 3s + retries: 30 + start_period: 30s + networks: + - mosaico_net + + bridge: + build: + context: ./bridge + dockerfile: Dockerfile + container_name: mosaico_bridge + environment: + MEDKIT_URL: http://sensor-demo:8080 + MOSAICO_HOST: mosaicod + MOSAICO_PORT: "6726" + ROBOT_ID: sensor_demo_001 + SOURCE_DEMO: sensor_diagnostics + LOG_LEVEL: INFO + DOWNLOAD_DIR: /tmp/mosaico_bridge + depends_on: + sensor-demo: + condition: service_healthy + mosaicod: + condition: service_started + networks: + - mosaico_net + +volumes: + mosaico_pgdata: + mosaico_store: + +networks: + mosaico_net: + name: mosaico_net + driver: bridge diff --git a/demos/mosaico_integration/medkit_overrides/medkit_params.yaml b/demos/mosaico_integration/medkit_overrides/medkit_params.yaml new file mode 100644 index 0000000..31cf88a --- /dev/null +++ b/demos/mosaico_integration/medkit_overrides/medkit_params.yaml @@ -0,0 +1,97 @@ +# ros2_medkit gateway configuration for Sensor Diagnostics Demo +# Gateway runs under /diagnostics namespace +diagnostics: + ros2_medkit_gateway: + ros__parameters: + server: + host: "0.0.0.0" + port: 8080 + + refresh_interval_ms: 10000 # 10 seconds (default), reduces log spam + + cors: + allowed_origins: ["*"] + allowed_methods: ["GET", "PUT", "POST", "DELETE", "OPTIONS"] + allowed_headers: ["Content-Type", "Accept"] + allow_credentials: false + max_age_seconds: 86400 + + # Discovery configuration + discovery: + mode: "hybrid" # runtime_only, manifest_only, or hybrid + manifest_path: "" # Will be set via launch argument + manifest_strict_validation: true + runtime: + create_synthetic_components: false # Manifest defines components + create_synthetic_areas: false # Manifest defines areas + merge_pipeline: + gap_fill: + allow_heuristic_areas: false # Manifest defines all areas + allow_heuristic_components: false # Manifest defines all components + allow_heuristic_apps: false # Manifest defines all apps + + # Plugin configuration (set by launch file when .so paths are resolved) + plugins: [""] + + # Scripts configuration (filesystem auto-discovery) + # TODO(#49): Migrate to manifest-defined scripts once ros2_medkit#303 lands + scripts: + scripts_dir: "/var/lib/ros2_medkit/scripts" + allow_uploads: false + max_concurrent_executions: 3 + default_timeout_sec: 60 + +# Fault Manager configuration (runs in root namespace) +fault_manager: + ros__parameters: + # Storage configuration + storage_type: "sqlite" + database_path: "/var/lib/ros2_medkit/faults.db" + + # Debounce configuration + confirmation_threshold: 0 # Immediate confirmation + healing_enabled: false + healing_threshold: 3 + auto_confirm_after_sec: 0.0 + + # Snapshot configuration (freeze frames) + snapshots: + enabled: true + background_capture: true # Non-blocking capture + timeout_sec: 2.0 + max_message_size: 131072 # 128KB max per message + + # Topics to capture for all faults. + # /sensors/image_raw (30 Hz, 640x480 rgb8) is intentionally excluded + # from snapshot capture - the raw camera feed would dominate the bag + # (~27 MB/s), and LIDAR + IMU + GPS + diagnostics are the relevant + # topics for the fault signatures this demo compares. For vision + # forensics a CompressedImage topic can be added here instead. + default_topics: + - /sensors/scan + - /sensors/imu + - /sensors/fix + - /diagnostics + + # Rosbag recording configuration + rosbag: + enabled: true + duration_sec: 15.0 # Record 15 seconds before fault confirmation + duration_after_sec: 10.0 # Record 10 seconds after confirmation + lazy_start: false # Always recording (ring buffer) + format: "mcap" # MCAP format (recommended for cross-platform) + storage_path: "/var/lib/ros2_medkit/rosbags" + max_bag_size_mb: 2000 # Single-file cap; prevents rosbag2 from splitting (gateway only serves the first file) + max_total_storage_mb: 10000 # 10 GB total; higher than single-bag cap so auto_cleanup=false can retain multiple snapshots across runs without the per-file cap becoming unreachable + auto_cleanup: false # Retain rosbag on fault clear so the bridge can ingest it + + # Topics to record (use 'config' or 'all') + topics: "config" # Use include/exclude lists below + include_topics: + - /sensors/scan + - /sensors/imu + - /sensors/fix + - /diagnostics + exclude_topics: + - /rosout + - /parameter_events diff --git a/demos/mosaico_integration/notebooks/mosaico_demo.ipynb b/demos/mosaico_integration/notebooks/mosaico_demo.ipynb new file mode 100644 index 0000000..aa5fdef --- /dev/null +++ b/demos/mosaico_integration/notebooks/mosaico_demo.ipynb @@ -0,0 +1,410 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Mosaico ingestion demo: medkit fault snapshots as queryable forensic data\n", + "\n", + "**What this notebook shows.** Robot stack runs in Docker. A fault is injected on the LiDAR (high noise). The medkit gateway detects it via the standard `/diagnostics` ROS topic, confirms the fault, and flushes its 15-second pre-fault + 10-second post-fault ring buffer to an `.mcap` file. A small Python bridge (separate process, talking Apache Arrow Flight) picks up the SSE event, downloads the bag from the gateway REST API, and ingests it into mosaicod.\n", + "\n", + "By the time you run this notebook, the bag is already a Mosaico **Sequence** with **typed**, **queryable** topics: `LaserScan`, `Imu`, and `GPS` ontologies (`/diagnostics` still rides along in the MCAP but has no Mosaico adapter yet, so it lands silently). The raw camera topic `/sensors/image_raw` is intentionally left out of the snapshot - at 30 Hz uncompressed it would dominate the bag - so the catalog entry is a few MB rather than a few hundred. Mosaico's Image adapter does ship, so dropping in a `CompressedImage` topic later is a config change, not a code change.\n", + "\n", + "We never recorded the robot 24/7 - we only kept the 25 seconds around the fault, but those 25 seconds are now indexed, schema-validated, cross-referenceable, and ready for `.Q` queries.\n", + "\n", + "Pipeline:\n", + "\n", + "```\n", + "lidar_sim --(LaserScan)--> /sensors/scan --[ring buffer 15s pre + 10s post]\n", + " |\n", + " noise injection v\n", + " | confirm fault flush to .mcap\n", + " v ^ |\n", + " diagnostic (ERROR) --> diagnostic_bridge --> fault_manager --> SSE /faults/stream\n", + " |\n", + " v\n", + " bridge downloads .mcap via REST\n", + " |\n", + " v\n", + " RosbagInjector --> mosaicod (Arrow Flight)\n", + "```\n", + "\n", + "**License-safe pattern.** mosaicod is the unmodified upstream Docker image. The bridge is a separate Python process talking the public Apache Arrow Flight protocol via Mosaico's own SDK. We never link or modify mosaicod or its Rust crates." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Setup\n", + "\n", + "Run this notebook from the host with the Mosaico integration stack running. Ports the stack publishes:\n", + "\n", + "- `localhost:18080` - medkit gateway (REST + SSE)\n", + "- `localhost:16726` - mosaicod (Apache Arrow Flight)\n", + "\n", + "The `LaserScanAdapter` we need merged into mosaico-labs/mosaico via PR #368 on 2026-04-13 (commit `b3867be`). The subsequent `mosaicolabs==0.3.2` wheel on PyPI is missing the `futures` subpackage, so until a fixed release ships install from the merged-PR commit directly:\n", + "\n", + "```bash\n", + "pip install matplotlib pandas\n", + "git clone https://github.com/mosaico-labs/mosaico.git /tmp/mosaico\n", + "cd /tmp/mosaico && git checkout b3867be\n", + "pip install /tmp/mosaico/mosaico-sdk-py\n", + "```\n", + "\n", + "If you skipped installing locally and only have the docker stack, you can still execute the cells inside the bridge container with `docker compose exec bridge python -c '...'`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# IMPORTANT: import the futures.laser module so the LaserScan ontology\n", + "# is registered in Mosaico's global ontology registry. Without this the\n", + "# topic reader raises 'No ontology registered with tag laser_scan'.\n", + "import mosaicolabs.models.futures.laser # noqa: F401\n", + "\n", + "from mosaicolabs import IMU, MosaicoClient, QueryOntologyCatalog, QueryTopic\n", + "from mosaicolabs.models.futures import LaserScan\n", + "\n", + "import math\n", + "import numpy as np\n", + "import pandas as pd\n", + "import matplotlib.pyplot as plt\n", + "\n", + "MOSAICO_HOST = \"localhost\"\n", + "MOSAICO_PORT = 16726\n", + "\n", + "client = MosaicoClient.connect(host=MOSAICO_HOST, port=MOSAICO_PORT)\n", + "print(\"Connected to mosaicod\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. List ingested sequences\n", + "\n", + "Each fault snapshot landed as a `Sequence` named `{source_demo}_{robot_id}_{fault_code}_{timestamp}_{event_id}` (for example `fleet_demo_robot-01-warehouse-A_LIDAR_SIM_1776369146_1`). The `robot_id` component keeps fleet runs from colliding when two robots happen to hit `event_id=1` at the same wall-clock second. Sequence metadata includes the fault code, severity, timestamp, and reporting source - all queryable later from any `.Q` filter." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sequence_names = sorted(client.list_sequences())\n", + "print(f\"Found {len(sequence_names)} sequences\\n\")\n", + "\n", + "rows = []\n", + "for name in sequence_names:\n", + " sh = client.sequence_handler(name)\n", + " if sh is None:\n", + " continue\n", + " rows.append(\n", + " {\n", + " \"sequence\": name,\n", + " \"size_MB\": round(sh.total_size_bytes / (1024 * 1024), 2),\n", + " \"topics\": \", \".join(sorted(sh.topics)),\n", + " }\n", + " )\n", + "\n", + "pd.DataFrame(rows)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2. Query 1: which sequences carry LaserScan data?\n", + "\n", + "We use Mosaico's type-safe ontology query to find every topic that carries `LaserScan` payloads, anywhere in the catalog. This is the kind of cross-sequence search that a folder of `.mcap` files can never give you - you would otherwise grep through filenames and pray." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "results = client.query(\n", + " QueryTopic().with_ontology_tag(LaserScan.ontology_tag()),\n", + ")\n", + "for item in results:\n", + " print(item.sequence.name)\n", + " for topic in item.topics:\n", + " print(f\" {topic.name}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. Query 2: pull /sensors/scan from a chosen sequence\n", + "\n", + "Pick the most recent LIDAR_SIM fault sequence and walk through its scans. Each item arrives as a typed `LaserScan` with all the standard fields: `angle_min`, `angle_max`, `range_min`, `range_max`, `ranges`, `intensities`, plus `frame_id` from the original ROS message header." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "lidar_sequences = [n for n in sequence_names if \"LIDAR_SIM\" in n]\n", + "if not lidar_sequences:\n", + " raise SystemExit(\n", + " \"No LIDAR_SIM sequences yet. Run ./scripts/trigger-fault.sh first.\"\n", + " )\n", + "target_sequence = lidar_sequences[-1]\n", + "print(f\"Target sequence: {target_sequence}\")\n", + "\n", + "sh = client.sequence_handler(target_sequence)\n", + "scan_topic = sh.get_topic_handler(\"/sensors/scan\")\n", + "\n", + "scan_records = []\n", + "for item in scan_topic.get_data_streamer():\n", + " ls = item.data\n", + " arr = np.array(ls.ranges, dtype=float)\n", + " finite = arr[np.isfinite(arr)]\n", + " scan_records.append(\n", + " {\n", + " \"timestamp_ns\": item.timestamp_ns,\n", + " \"frame_id\": item.frame_id,\n", + " \"range_min\": float(finite.min()) if finite.size else math.nan,\n", + " \"range_max\": float(finite.max()) if finite.size else math.nan,\n", + " \"range_mean\": float(finite.mean()) if finite.size else math.nan,\n", + " \"range_std\": float(finite.std()) if finite.size else math.nan,\n", + " \"n_beams\": len(ls.ranges),\n", + " }\n", + " )\n", + "\n", + "scan_df = pd.DataFrame(scan_records)\n", + "scan_df[\"t_s\"] = (scan_df[\"timestamp_ns\"] - scan_df[\"timestamp_ns\"].iloc[0]) / 1e9\n", + "scan_df.head()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 4. Query 3: pull /sensors/imu from the same sequence\n", + "\n", + "Same pattern, different ontology. We pull the IMU acceleration and angular velocity to check whether the robot was moving when the LiDAR fault appeared. If the IMU shows zero horizontal acceleration and zero rotation throughout the 25 s window, the fault is in the sensor, not caused by robot motion." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "imu_topic = sh.get_topic_handler(\"/sensors/imu\")\n", + "imu_records = []\n", + "for item in imu_topic.get_data_streamer():\n", + " imu = item.data\n", + " imu_records.append(\n", + " {\n", + " \"timestamp_ns\": item.timestamp_ns,\n", + " \"acc_x\": imu.acceleration.x,\n", + " \"acc_y\": imu.acceleration.y,\n", + " \"acc_z\": imu.acceleration.z,\n", + " }\n", + " )\n", + "imu_df = pd.DataFrame(imu_records)\n", + "imu_df[\"t_s\"] = (imu_df[\"timestamp_ns\"] - scan_df[\"timestamp_ns\"].iloc[0]) / 1e9\n", + "imu_df.head()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 5. The money shot plot\n", + "\n", + "Three subplots on the same time axis:\n", + "\n", + "1. **LaserScan range_std** - jumps when noise injection hits. This is the fault signature.\n", + "2. **LaserScan range_mean** - shifts when drift injection hits, stays stable under noise. Different failure modes produce different shapes.\n", + "3. **IMU acceleration_z** - sits at ~9.8 m/s^2 the whole time. A quick sanity check that the platform was level, not tilted or falling. This is a first-pass visual - a proper stationarity check needs all six IMU axes (see section 6 below for the full compound `.Q` query).\n", + "\n", + "This cross-topic correlation is what a structured, schema-indexed catalog enables." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pathlib import Path\n", + "\n", + "fig, axes = plt.subplots(3, 1, sharex=True, figsize=(11, 7))\n", + "\n", + "axes[0].plot(scan_df[\"t_s\"], scan_df[\"range_std\"], color=\"crimson\", linewidth=1.6)\n", + "axes[0].set_ylabel(\"LaserScan\\nrange std [m]\")\n", + "axes[0].set_title(f\"{target_sequence}\")\n", + "axes[0].grid(alpha=0.3)\n", + "\n", + "axes[1].plot(scan_df[\"t_s\"], scan_df[\"range_mean\"], color=\"steelblue\", linewidth=1.6)\n", + "axes[1].set_ylabel(\"LaserScan\\nrange mean [m]\")\n", + "axes[1].grid(alpha=0.3)\n", + "\n", + "axes[2].plot(imu_df[\"t_s\"], imu_df[\"acc_z\"], color=\"forestgreen\", linewidth=1.0)\n", + "axes[2].axhline(9.81, color=\"black\", linestyle=\":\", alpha=0.7, label=\"1 g\")\n", + "axes[2].set_ylabel(\"IMU acc_z\\n[m/s^2]\")\n", + "axes[2].set_xlabel(\"time within snapshot [s]\")\n", + "axes[2].grid(alpha=0.3)\n", + "axes[2].legend(loc=\"upper right\")\n", + "\n", + "plt.tight_layout()\n", + "_out_dir = Path(\"../docs\")\n", + "_out_dir.mkdir(parents=True, exist_ok=True)\n", + "_out_path = _out_dir / \"lidar_noise_plot.png\"\n", + "plt.savefig(_out_path, dpi=120, bbox_inches=\"tight\")\n", + "plt.show()\n", + "print(f\"Saved plot to {_out_path}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 6. Compound .Q filter: find snapshots where the robot was stationary\n", + "\n", + "A single `acc_z ~ 9.81` check only confirms the sensor is level, not that the robot is parked. A proper stationarity check needs all six IMU degrees of freedom: linear acceleration near zero on X/Y (no horizontal motion), near 1 g on Z (level, not falling), and angular velocity near zero on all axes (not rotating).\n", + "\n", + "Mosaico's `QueryOntologyCatalog` supports compound expressions with implicit AND, so we can push all six conditions into one server-side query.\n", + "\n", + "In the fleet variant this query is not cosmetic: the fleet injector rotates robot-02 (IMU `drift_rate=0.3 rad/s`) while triggering the same LIDAR noise as robot-01, so robot-02's `angular_velocity.z` sits around 0.3 rad/s. The `between(-0.1, 0.1)` predicate excludes that sequence, leaving only the two stationary LIDAR faults for downstream content comparison (§7). Step 2 doing real work, not just showing the SDK surface." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "stationary = client.query(\n", + " QueryOntologyCatalog(\n", + " IMU.Q.acceleration.x.between(-0.5, 0.5), # no horizontal accel\n", + " IMU.Q.acceleration.y.between(-0.5, 0.5), # no horizontal accel\n", + " IMU.Q.acceleration.z.between(9.5, 10.1), # gravity only, level\n", + " IMU.Q.angular_velocity.x.between(-0.1, 0.1), # no roll\n", + " IMU.Q.angular_velocity.y.between(-0.1, 0.1), # no pitch\n", + " IMU.Q.angular_velocity.z.between(-0.1, 0.1), # no yaw\n", + " include_timestamp_range=True,\n", + " ),\n", + ")\n", + "print(f\"Sequences matching full stationarity check:\\n\")\n", + "for item in stationary:\n", + " print(item.sequence.name)\n", + " for topic in item.topics:\n", + " if topic.timestamp_range:\n", + " print(\n", + " f\" {topic.name} {topic.timestamp_range.start}..{topic.timestamp_range.end}\"\n", + " )" + ] + }, + { + "cell_type": "markdown", + "source": [ + "## 7. Fleet comparison: noise vs drift\n", + "\n", + "The fleet variant triggers LIDAR faults on all three robots (both noise and drift signatures under the same `LIDAR_SIM` fault code). The previous cell's compound IMU query already excluded robot-02 (which was rotating during its fault window). The remaining two stationary LIDAR sequences, robot-01 (noise injection) and robot-03 (drift injection), have the same metadata tag but completely different scan-level signatures. Pulling the actual scan statistics from Mosaico reveals those two distinct failure modes.\n", + "\n", + "Skip this section if you only ran the single-robot variant (it needs 2+ LIDAR_SIM sequences)." + ], + "metadata": {} + }, + { + "cell_type": "code", + "source": [ + "from pathlib import Path\n", + "\n", + "if len(lidar_sequences) >= 2:\n", + " def _pull_scan_stats(seq_name):\n", + " _sh = client.sequence_handler(seq_name)\n", + " _topic = _sh.get_topic_handler(\"/sensors/scan\")\n", + " recs = []\n", + " for item in _topic.get_data_streamer():\n", + " arr = np.array(item.data.ranges, dtype=float)\n", + " fin = arr[np.isfinite(arr)]\n", + " recs.append({\n", + " \"timestamp_ns\": item.timestamp_ns,\n", + " \"range_mean\": float(fin.mean()) if fin.size else math.nan,\n", + " \"range_std\": float(fin.std()) if fin.size else math.nan,\n", + " })\n", + " df = pd.DataFrame(recs)\n", + " df[\"t_s\"] = (df[\"timestamp_ns\"] - df[\"timestamp_ns\"].iloc[0]) / 1e9\n", + " return df\n", + "\n", + " seq_a, seq_b = lidar_sequences[0], lidar_sequences[1]\n", + " df_a = _pull_scan_stats(seq_a)\n", + " df_b = _pull_scan_stats(seq_b)\n", + "\n", + " fig, axes = plt.subplots(2, 2, figsize=(14, 6), sharex=\"col\")\n", + "\n", + " axes[0, 0].plot(df_a[\"t_s\"], df_a[\"range_std\"], color=\"crimson\", lw=1.4)\n", + " axes[0, 0].set_ylabel(\"range std [m]\")\n", + " axes[0, 0].set_title(seq_a, fontsize=9)\n", + " axes[0, 0].grid(alpha=0.3)\n", + "\n", + " axes[0, 1].plot(df_b[\"t_s\"], df_b[\"range_std\"], color=\"crimson\", lw=1.4)\n", + " axes[0, 1].set_title(seq_b, fontsize=9)\n", + " axes[0, 1].grid(alpha=0.3)\n", + "\n", + " axes[1, 0].plot(df_a[\"t_s\"], df_a[\"range_mean\"], color=\"steelblue\", lw=1.4)\n", + " axes[1, 0].set_ylabel(\"range mean [m]\")\n", + " axes[1, 0].set_xlabel(\"time [s]\")\n", + " axes[1, 0].grid(alpha=0.3)\n", + "\n", + " axes[1, 1].plot(df_b[\"t_s\"], df_b[\"range_mean\"], color=\"steelblue\", lw=1.4)\n", + " axes[1, 1].set_xlabel(\"time [s]\")\n", + " axes[1, 1].grid(alpha=0.3)\n", + "\n", + " fig.suptitle(\n", + " \"Same fault_code (LIDAR_SIM), different root cause\",\n", + " fontsize=13, fontweight=\"bold\",\n", + " )\n", + " plt.tight_layout()\n", + " _out_dir = Path(\"../docs\")\n", + " _out_dir.mkdir(parents=True, exist_ok=True)\n", + " _out_path = _out_dir / \"fleet_comparison.png\"\n", + " plt.savefig(_out_path, dpi=120, bbox_inches=\"tight\")\n", + " plt.show()\n", + " print(f\"Left: {seq_a}\\nRight: {seq_b}\\nSaved to {_out_path}\")\n", + "else:\n", + " print(f\"Only {len(lidar_sequences)} LIDAR_SIM sequence(s). Run the fleet variant for comparison.\")" + ], + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "client.close()" + ], + "metadata": {}, + "execution_count": null, + "outputs": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.11" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/demos/mosaico_integration/scripts/trigger-fault.sh b/demos/mosaico_integration/scripts/trigger-fault.sh new file mode 100755 index 0000000..265e3d7 --- /dev/null +++ b/demos/mosaico_integration/scripts/trigger-fault.sh @@ -0,0 +1,84 @@ +#!/bin/bash +# Trigger the LiDAR HIGH_NOISE fault on the running single-robot stack. +# +# Calls the medkit gateway scripts API to execute the inject-noise script, +# which sets lidar-sim noise_stddev=0.5 and camera-sim noise_level=0.3. +# That makes lidar_sim publish noisy ranges + a HIGH_NOISE diagnostic, +# diagnostic_bridge picks it up and reports a fault to fault_manager, +# fault_manager confirms it (confirmation_threshold=0), the rosbag ring +# buffer is flushed, and a few seconds later the bridge container picks +# up the fault_confirmed SSE event and ingests the bag into mosaicod. + +set -euo pipefail + +GATEWAY_URL="${GATEWAY_URL:-http://localhost:18080}" +API_BASE="${GATEWAY_URL}/api/v1" + +# Dependency check +for cmd in curl jq; do + if ! command -v "$cmd" >/dev/null 2>&1; then + echo "Required tool '$cmd' is not installed." + exit 1 + fi +done + +# Make sure the gateway is up +if ! curl -sf "${API_BASE}/health" > /dev/null; then + echo "Gateway not reachable at ${GATEWAY_URL}." + echo "Did you run 'docker compose up -d' from the demo directory?" + exit 1 +fi + +echo ">> Triggering inject-noise on compute-unit" + +# Start the script via the SOVD scripts API. medkit requires execution_type +# in the POST body. +START_RESP=$(curl -sf -X POST "${API_BASE}/components/compute-unit/scripts/inject-noise/executions" \ + -H "Content-Type: application/json" -d '{"execution_type": "now"}') + +EXEC_ID=$(echo "$START_RESP" | jq -r '.id // .execution_id // empty') +if [ -z "$EXEC_ID" ]; then + echo "Could not parse execution id from response:" + echo "$START_RESP" + exit 1 +fi +echo " execution id: $EXEC_ID" + +# Poll until the script finishes (max 30s) +completed=0 +STATUS="unknown" +for _ in $(seq 1 30); do + STATUS=$(curl -sf "${API_BASE}/components/compute-unit/scripts/inject-noise/executions/${EXEC_ID}" \ + | jq -r '.status // "unknown"') + case "$STATUS" in + completed|succeeded|success) + echo ">> inject-noise completed" + completed=1 + break + ;; + failed|error) + echo ">> inject-noise FAILED (status=$STATUS)" >&2 + exit 1 + ;; + *) + sleep 1 + ;; + esac +done + +if [ "$completed" -ne 1 ]; then + echo ">> inject-noise TIMED OUT after 30s (last status=$STATUS)" >&2 + echo " The gateway did not report completed/succeeded/success before the deadline." >&2 + exit 1 +fi + +echo "" +echo "Fault injected. The bridge waits POST_FAULT_WAIT_SEC (default 12s)" +echo "after fault_confirmed before downloading the bag, so expect the" +echo "fault_confirmed event and ingest ~15-20s from now. Watch the bridge logs:" +echo "" +echo " docker compose logs -f bridge" +echo "" +echo "Then query mosaicod from a notebook against:" +echo "" +echo " MosaicoClient.connect(host='localhost', port=16726)" diff --git a/demos/mosaico_integration/scripts/trigger-fleet-faults.sh b/demos/mosaico_integration/scripts/trigger-fleet-faults.sh new file mode 100755 index 0000000..f57b9fe --- /dev/null +++ b/demos/mosaico_integration/scripts/trigger-fleet-faults.sh @@ -0,0 +1,63 @@ +#!/bin/bash +# Fleet fault injection - three LIDAR_SIM faults, one under rotation. +# +# Scenario: all three robots report LIDAR_SIM (Step 1 metadata filter +# matches 3 of 3). Robot-02 is rotating during the fault window, so +# the compound IMU .Q stationarity query (Step 2) excludes it - leaving +# 2 of 3 sequences for content-level drill-down (noise vs drift). +# +# robot-01 warehouse-A : LIDAR noise_stddev=0.5 , stationary +# robot-02 warehouse-B : LIDAR noise_stddev=0.5 , rotating (IMU drift_rate=0.3 rad/s) +# robot-03 outdoor-yard : LIDAR drift_rate=0.5 , stationary +# +# The medkit side of robot-02 will also emit an IMU DRIFTING diagnostic +# once |drift_offset| > 0.1 rad, which becomes a second fault on the +# gateway. The bridge's FAULT_CODE_ALLOWLIST=LIDAR_SIM env var (set in +# docker-compose.fleet.yml) keeps that IMU fault out of Mosaico so the +# ingested catalog stays at one LIDAR_SIM sequence per robot. +set -euo pipefail + +IMU_DRIFT_RAD_PER_S="${IMU_DRIFT_RAD_PER_S:-0.3}" +PRE_FAULT_WAIT_SEC="${PRE_FAULT_WAIT_SEC:-20}" + +# put_config +# Wraps the PUT so a curl failure names the robot + endpoint instead of +# surfacing a bare `curl: (22) ...` via set -e. +put_config() { + local robot="$1" addr="$2" app="$3" param="$4" value="$5" + if ! curl -sf -X PUT "http://${addr}/api/v1/apps/${app}/configurations/${param}" \ + -H "Content-Type: application/json" -d "{\"value\": ${value}}" > /dev/null; then + echo "!! ${robot}: PUT ${app}/${param}=${value} FAILED at ${addr}" >&2 + exit 1 + fi +} + +echo "=== Fleet Fault Injection ===" +echo "" + +echo ">> robot-02 (warehouse-B): IMU drift_rate -> ${IMU_DRIFT_RAD_PER_S} rad/s (rotating)" +put_config robot-02 localhost:18082 imu-sim drift_rate "${IMU_DRIFT_RAD_PER_S}" + +echo ">> Priming the ring buffer with ${PRE_FAULT_WAIT_SEC}s of rotation data..." +sleep "${PRE_FAULT_WAIT_SEC}" + +echo ">> robot-01 (warehouse-A): LIDAR noise_stddev -> 0.5" +put_config robot-01 localhost:18081 lidar-sim noise_stddev 0.5 + +echo ">> robot-02 (warehouse-B): LIDAR noise_stddev -> 0.5 (same as robot-01, but under rotation)" +put_config robot-02 localhost:18082 lidar-sim noise_stddev 0.5 + +echo ">> robot-03 (outdoor-yard): LIDAR drift_rate -> 0.5" +put_config robot-03 localhost:18083 lidar-sim drift_rate 0.5 + +echo "" +echo "=== Injected ===" +echo " robot-01: LIDAR noise (stationary)" +echo " robot-02: LIDAR noise under rotation (IMU angular_velocity.z ~ ${IMU_DRIFT_RAD_PER_S} rad/s)" +echo " robot-03: LIDAR drift (stationary)" +echo "" +echo "Wait ~60s for post-fault capture + bridge ingest." +echo "" +echo "Expected in Mosaico after ingest:" +echo " 3 sequences, all fault_code=LIDAR_SIM" +echo " compound IMU.Q stationarity query matches 2 of 3 (robot-02 excluded by rotation)"