Production-ready scaffold

Robotics SDK — Python

The robotics SDK most fleets will start with: ROS 2, Starship, Nuro, and Refraction are all Python-heavy.

enfinitos-sdk-robotics | Substrate: Robotics | Python
Install

Get the SDK

pip install enfinitos-sdk-robotics

About this status badge

Typed, tested, documented, and wired to the EnfinitOS platform endpoints that exist today. Vendor-side SDK integrations (Broadsign / VIOOH / DJI / Tizen / Alexa / Twilio / Stripe / etc.) land per-customer at pilot integration time — those bring the renderer/transport/exchange-specific code; the EnfinitOS half is ready.

README

The developer-facing documentation in full

The same README the SDK package ships with — rendered here at build time so what you read matches exactly what you install.

enfinitos-sdk-robotics (Python)

EnfinitOS reference SDK for the ROBOTICS substrate — Python edition.

Python is the most-used language in real robotic fleets — Starship, Nuro, Refraction, and ROS 2 nodes are all Python-heavy. This is the SDK most customers will start with.

The Python SDK has the same surface and contract as the TypeScript SDK at packages/sdks/robotics-ts/; the two are kept in sync. See the TS README for the long-form research synthesis, architecture diagram, and adapter pattern — duplicated below in condensed form so a Python-only reader has everything in one place.

Platform-side counterpart. This SDK consumes the platform's /v1/robotics/connect WebSocket endpoint. That endpoint is a separate engineering item. The SDK targets the documented wire contract in src/enfinitos_robotics/types.py; the platform-side service to accept these connections is its own tranche.

Architecture

                          ┌──────────────────────────┐
                          │   EnfinitOS Platform     │
                          │  (rights, policy, audit) │
                          └────────────┬─────────────┘
                                       │  WebSocket (JWT in handshake)
                                       │
   ┌────────────── (1) policy_push ────▼─────────────┐
   │                                                  │
   │    ┌────────────────────────────────────┐        │
   │    │      EnfinitOSRobotClient          │        │
   │    │  Lifecycle|Policy|Reporter|E-stop  │        │
   │    └────────────────────────────────────┘        │
   │                                                  │
   └──────────────────────────────────────────────────┘
                   │                  ▲
   (2) policy_ack  │                  │ (4) emergency_stop
   (3) behavioural │                  │     (priority channel —
       event       │                  │      never queued)
   (5) telemetry   │                  │
                   ▼                  │
   ┌──────────────────────────────────┴───────────────┐
   │           Adapter (ROS 2 / VDA 5050 /            │
   │            Starship-style / custom)              │
   └──────────────────────────────────────────────────┘

Research synthesis — why this shape

The SDK borrows from five existing patterns:

| Pattern source | What we adopted |
| --- | --- |
| ROS 2 managed-node lifecycle | Five-state machine; ERROR must convalesce through SUSPENDED (ISO 10218). |
| VDA 5050 (Toyota / Linde / BMW) | Priority channel for emergency stops; our send_immediate() is the instantActions equivalent. |
| Starship Technologies (sidewalk delivery) | REST + WebSocket telemetry @ 1–5 Hz; remote-operator handoff via state transition. |
| Boston Dynamics Spot SDK | E-Stop never queued behind other traffic. |
| Nuro / Refraction AI / Cobalt | Rules-based geofencing + remote oversight ⇒ exclusion-zone reporting + forced flag for safety-system overrides. |

Getting started

Install

pip install enfinitos-sdk-robotics

Five-minute hello-world

import asyncio
from datetime import datetime, timezone

from enfinitos_robotics import (
    EnfinitOSRobotClient,
    EnfinitOSRobotClientOptions,
    GeoCoordinate,
    HeartbeatOptions,
    TelemetryReport,
)


async def main() -> None:
    client = EnfinitOSRobotClient(
        EnfinitOSRobotClientOptions(
            org_id="org_acme",
            robot_id="robot_001",
            fleet_id="fleet_main",
            auth_token="<JWT>",
            api_base_url="https://api.enfinitos.com",
        )
    )

    # 1) wire policy receipt
    def on_policy(policy):
        print(f"policy v{policy.version} with {len(policy.rules)} rules")
        asyncio.create_task(client.acknowledge_policy(policy.version))

    client.subscribe_policy(on_policy)

    # 2) wire emergency stop receipt
    def on_estop(cmd):
        print(f"E-STOP from {cmd.issuedBy}: {cmd.reason}")
        # halt motion, transition to SUSPENDED.

    client.on_emergency_stop_received(on_estop)

    # 3) connect
    await client.connect()

    # 4) heartbeat
    client.start_telemetry_heartbeat(
        HeartbeatOptions(
            interval_ms=1000,
            build_report=lambda: TelemetryReport(
                reportedAt=datetime.now(timezone.utc)
                .isoformat()
                .replace("+00:00", "Z"),
                location=GeoCoordinate(lat=51.5, lng=-0.1),
                batteryPct=87.0,
                speedKph=5.4,
                headingDeg=90.0,
                state=client.get_state(),
            ),
        )
    )

    await asyncio.sleep(30)
    await client.disconnect()


if __name__ == "__main__":
    asyncio.run(main())

Lifecycle states

| State | What it means | Reachable from |
| --- | --- | --- |
| CONFIGURED | Connected and authenticated; awaiting first policy. | (initial) |
| ACTIVE | Has a policy and is executing missions. | CONFIGURED, SUSPENDED |
| SUSPENDED | Paused (operator intervention, low battery, soft fault). | CONFIGURED, ACTIVE, ERROR |
| ERROR | Hard fault. Must transition through SUSPENDED before re-activating. | (any non-terminal) |
| TERMINATED | Terminal. Client must be reconstructed to reconnect. | (any non-terminal) |

The ERROR → SUSPENDED requirement is intentional: it matches ISO 10218 robotics-safety guidance, forcing an operator to acknowledge the fault before the robot can move again.
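The "Reachable from" column can be read as a transition table. The sketch below is one illustrative encoding of it, not the SDK's own state machine: the names State, REACHABLE_FROM and can_transition are hypothetical, and it assumes that "any non-terminal" means every state except TERMINATED and that self-transitions are rejected.

```python
from enum import Enum


class State(str, Enum):
    CONFIGURED = "CONFIGURED"
    ACTIVE = "ACTIVE"
    SUSPENDED = "SUSPENDED"
    ERROR = "ERROR"
    TERMINATED = "TERMINATED"


# Assumption: "any non-terminal" = every state except TERMINATED.
NON_TERMINAL = {State.CONFIGURED, State.ACTIVE, State.SUSPENDED, State.ERROR}

# Target state -> set of states it may be entered from.
# CONFIGURED is the initial state, so nothing transitions into it.
REACHABLE_FROM = {
    State.ACTIVE: {State.CONFIGURED, State.SUSPENDED},
    State.SUSPENDED: {State.CONFIGURED, State.ACTIVE, State.ERROR},
    State.ERROR: NON_TERMINAL,
    State.TERMINATED: NON_TERMINAL,
}


def can_transition(source: State, target: State) -> bool:
    """Return True if the table allows moving from source to target."""
    if source == target:
        return False  # assumption: self-transitions are not meaningful
    return source in REACHABLE_FROM.get(target, set())


# ERROR cannot jump straight back to ACTIVE; it has to pass through SUSPENDED.
print(can_transition(State.ERROR, State.ACTIVE))      # False
print(can_transition(State.ERROR, State.SUSPENDED))   # True
print(can_transition(State.SUSPENDED, State.ACTIVE))  # True
```

Keying the table by target state keeps it a direct transcription of the "Reachable from" column, which makes it easy to audit against the README.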

Policy update flow

   PLATFORM                                     ROBOT (SDK)
       │                                            │
       │   policy_push { version: 7, rules: […] }   │
       │ ─────────────────────────────────────────► │
       │                                            │ ─┐  apply()
       │                                            │  │  fan out to
       │                                            │  │  subscribers
       │                                            │ ◄┘
       │                                            │
       │     policy_ack { policyVersion: 7,         │
       │       robotState: "ACTIVE", ackedAt: … }   │
       │ ◄───────────────────────────────────────── │

The platform considers a policy "live" only after the robot acks. Out-of-order pushes are silently dropped by the resolver; stale ack attempts are also dropped client-side.
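The drop rules above can be illustrated with a small version tracker. This is a sketch under assumptions, not the SDK's resolver: the class and method names are hypothetical, versions are assumed to be monotonically increasing integers, and a re-push of the current version is assumed to be dropped like an older one.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PolicyVersionTracker:
    """Hypothetical stand-in for the resolver's ordering logic."""

    current_version: Optional[int] = None

    def accept_push(self, version: int) -> bool:
        """Apply a pushed policy only if it is newer than the current one."""
        if self.current_version is not None and version <= self.current_version:
            return False  # out-of-order (or duplicate) push: silently dropped
        self.current_version = version
        return True

    def should_ack(self, version: int) -> bool:
        """An ack is stale unless it names the currently applied version."""
        return version == self.current_version


tracker = PolicyVersionTracker()
print(tracker.accept_push(7))  # True: first policy is always accepted
print(tracker.accept_push(6))  # False: older than the applied version
print(tracker.accept_push(8))  # True: newer version replaces v7
print(tracker.should_ack(7))   # False: v7 is no longer current
print(tracker.should_ack(8))   # True
```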

Behavioural rule kinds

The eleven BehaviourRule variants (mirroring the platform's operational schema at apps/api/src/modules/rights/contracts/scope.ts):

| Kind | Convenience helper |
| --- | --- |
| YIELD_TO_PEDESTRIANS | report_yield_event(...) |
| MAX_SPEED_KPH | report_speed_exceedance(...) |
| EXCLUSION_ZONE | report_exclusion_zone_entry(...) |
| OPERATING_HOURS | report_behavioural_event(...) (generic) |
| REQUIRE_HUMAN_OPERATOR | generic |
| DONT_DISPLAY_WHILE_MOVING / DRIVER_ATTENTION_REQUIRED / INTEGRATE_WITH_VEHICLE_DISTRACTION_SYSTEM / PASSENGER_DISPLAY_ONLY / REQUIRE_HANDS_FREE | generic — AUTOMOTIVE substrate cross-over |
| CUSTOM | generic |

Event types: rule_fired, rule_violated, rule_skipped, rule_unknown.

Adapter pattern

Each fleet ships an adapter that implements the protocols defined in src/enfinitos_robotics/adapters/_template.py (three required, one optional):

| Protocol | Responsibility |
| --- | --- |
| FleetCommandSink | apply EnfinitOS policy onto the fleet's native command vocabulary |
| FleetEventSource | translate native fleet events into BehaviouralEvents |
| FleetTelemetrySource | build TelemetryReports from native pose/battery/sensors |
| FleetLifecycleHook | optional — react to lifecycle transitions |

Skeletons shipped:

  • adapters/ros2.py — ROS 2 via rclpy, maps onto managed-node lifecycle, latched topics for policy publication.
  • adapters/vda5050.py — VDA 5050 v2 via MQTT, instantActions for most policy rules.
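To make the four responsibilities concrete, here is a minimal sketch of how such protocols could be expressed with typing.Protocol and satisfied by a toy in-memory adapter. Only the four protocol names come from this README. Every method name, the dict-based payloads, and the "kind" / "value" / "ruleKind" / "eventType" keys are assumptions made for illustration; the real signatures live in _template.py.

```python
from typing import Any, Callable, Dict, List, Mapping, Optional, Protocol, runtime_checkable

Event = Dict[str, Any]


@runtime_checkable
class FleetCommandSink(Protocol):
    def apply_policy(self, policy: Mapping[str, Any]) -> None: ...


@runtime_checkable
class FleetEventSource(Protocol):
    def on_event(self, handler: Callable[[Event], None]) -> None: ...


@runtime_checkable
class FleetTelemetrySource(Protocol):
    def build_report(self) -> Event: ...


@runtime_checkable
class FleetLifecycleHook(Protocol):  # optional per the table above
    def on_transition(self, previous: str, current: str) -> None: ...


class InMemoryAdapter:
    """Toy adapter: no robot attached, state is kept in plain attributes."""

    def __init__(self) -> None:
        self.max_speed_kph: Optional[float] = None
        self._handlers: List[Callable[[Event], None]] = []

    def apply_policy(self, policy: Mapping[str, Any]) -> None:
        # Translate one rule kind into a "native" setting (hypothetical rule shape).
        for rule in policy.get("rules", []):
            if rule.get("kind") == "MAX_SPEED_KPH":
                self.max_speed_kph = float(rule["value"])

    def on_event(self, handler: Callable[[Event], None]) -> None:
        self._handlers.append(handler)

    def emit_native_yield(self) -> None:
        # Simulates the fleet stack reporting that the robot yielded.
        event = {"ruleKind": "YIELD_TO_PEDESTRIANS", "eventType": "rule_fired"}
        for handler in self._handlers:
            handler(event)

    def build_report(self) -> Event:
        return {"batteryPct": 87.0, "speedKph": 0.0}


adapter = InMemoryAdapter()
adapter.apply_policy({"rules": [{"kind": "MAX_SPEED_KPH", "value": 6}]})
print(isinstance(adapter, FleetCommandSink), adapter.max_speed_kph)
```

Structural typing means the adapter does not have to inherit from anything: it satisfies a protocol simply by exposing the matching methods, which suits fleets that already have their own class hierarchy.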

Telemetry pattern

Recommended cadence: 1 Hz for sidewalk delivery and warehouse AGVs; up to 5 Hz for autonomous-vehicle telemetry. The platform throttles ingest above 10 Hz to protect the audit ledger.

TelemetryReport shape: location, battery, speed, heading, state, free-form sensorHealth. Keep sensorHealth keys stable across reports so the SRE dashboard's time series stay coherent.
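The cadence and key-stability advice can be sketched as two small helpers. Both are hypothetical and not part of the SDK: interval_ms_for turns a rate in Hz into the interval_ms value used by the heartbeat options, capped at the 10 Hz ingest limit mentioned above, and stable_sensor_health pins sensorHealth to a fixed key set. The sensor names and the "unknown" placeholder are assumptions.

```python
from typing import Dict, Mapping

# Assumption: this fleet reports exactly these sensors on every heartbeat.
SENSOR_KEYS = ("lidar", "imu", "gps")


def interval_ms_for(rate_hz: float, max_hz: float = 10.0) -> int:
    """Convert a telemetry rate to a heartbeat interval, never exceeding max_hz."""
    if rate_hz <= 0:
        raise ValueError("rate_hz must be positive")
    return round(1000 / min(rate_hz, max_hz))


def stable_sensor_health(raw: Mapping[str, str]) -> Dict[str, str]:
    """Emit the same keys every time; missing sensors become 'unknown'.

    Keys outside SENSOR_KEYS are dropped so the series set never grows.
    """
    return {key: raw.get(key, "unknown") for key in SENSOR_KEYS}


print(interval_ms_for(1))    # 1000 (1 Hz: sidewalk delivery, warehouse AGVs)
print(interval_ms_for(5))    # 200  (5 Hz: autonomous-vehicle telemetry)
print(interval_ms_for(50))   # 100  (capped at 10 Hz)
print(stable_sensor_health({"imu": "ok"}))
```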

Emergency stop semantics

  • Priority channel. send_immediate() bypasses the normal outbound queue. An emergency stop fires even if a policy ack or telemetry frame is mid-send.
  • Idempotent. commandId is the dedup key.
  • Never queued behind reconnect backoff. If the WS is down when emergency_stop() is called, the message sits at the head of the priority queue and is the FIRST thing flushed when the socket re-opens.
  • Lifecycle. Robot-side emergency_stop() auto-transitions to SUSPENDED. Inbound emergency stops do NOT auto-transition.
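The first three bullets describe a two-lane outbound queue with deduplication. The following is a toy model of that behaviour, assuming messages are plain dicts and the socket is represented by a flush() call. The class and method names are hypothetical; it is not the SDK's send_immediate() implementation.

```python
from collections import deque
from typing import Any, Deque, Dict, List, Set

Message = Dict[str, Any]


class TwoLaneOutbox:
    """Toy outbound buffer: a priority lane that always drains first."""

    def __init__(self) -> None:
        self._priority: Deque[Message] = deque()
        self._normal: Deque[Message] = deque()
        self._seen_command_ids: Set[str] = set()

    def enqueue(self, message: Message) -> None:
        """Normal traffic: policy acks, telemetry, behavioural events."""
        self._normal.append(message)

    def enqueue_emergency_stop(self, message: Message) -> bool:
        """Priority traffic. Returns False for a duplicate commandId."""
        command_id = message["commandId"]
        if command_id in self._seen_command_ids:
            return False  # idempotent: the same stop is never sent twice
        self._seen_command_ids.add(command_id)
        self._priority.append(message)
        return True

    def flush(self) -> List[Message]:
        """Called when the socket (re)opens: priority lane goes out first."""
        ordered = list(self._priority) + list(self._normal)
        self._priority.clear()
        self._normal.clear()
        return ordered


outbox = TwoLaneOutbox()
outbox.enqueue({"type": "telemetry"})
outbox.enqueue_emergency_stop({"type": "emergency_stop", "commandId": "c1"})
outbox.enqueue_emergency_stop({"type": "emergency_stop", "commandId": "c1"})  # duplicate
print([m["type"] for m in outbox.flush()])  # ['emergency_stop', 'telemetry']
```

Keeping the stop in its own lane, rather than sorting a single queue, means no amount of buffered telemetry can delay it after a reconnect.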

Sample integration — minimal viable robot

import asyncio
from datetime import datetime, timezone

from enfinitos_robotics import (
    EnfinitOSRobotClient,
    EnfinitOSRobotClientOptions,
    GeoCoordinate,
    TelemetryReport,
    YieldEventDetail,
)


async def main() -> None:
    client = EnfinitOSRobotClient(
        EnfinitOSRobotClientOptions(
            org_id="org_acme",
            robot_id="robot_001",
            fleet_id="fleet_main",
            auth_token="<JWT>",
            api_base_url="https://api.enfinitos.com",
        )
    )

    def on_policy(policy):
        print(f"got policy v{policy.version}")
        asyncio.create_task(client.acknowledge_policy(policy.version))

    client.subscribe_policy(on_policy)
    await client.connect()

    # one heartbeat
    await client.report_telemetry(
        TelemetryReport(
            reportedAt=datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            location=GeoCoordinate(lat=51.5, lng=-0.1),
            batteryPct=87.0,
            speedKph=0.0,
            headingDeg=90.0,
            state=client.get_state(),
        )
    )

    # one behavioural event
    await client.report_yield_event(
        YieldEventDetail(
            occurredAt=datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            location=GeoCoordinate(lat=51.5, lng=-0.1),
            detectedPedestrians=1,
            closestDistanceM=1.8,
        )
    )

    await asyncio.sleep(0.5)
    await client.disconnect()


if __name__ == "__main__":
    asyncio.run(main())

What's SDK-side vs platform-side

| Concern | This SDK | Platform tranche (separate) |
| --- | --- | --- |
| Wire protocol (WireMessage) | yes — defined here | yes — implemented by platform-side server |
| /v1/robotics/connect WS endpoint | no | pending — separate work item |
| Audit-log persistence | no | pending — separate work item |
| JWT issuance | no | reuses existing /auth/* plane |
| Policy authoring UI | no | already shipped in apps/web/.../compose |
| Adapters (ROS 2, VDA 5050, custom) | yes — skeletons | (customer-owned) |

See also

  • apps/api/src/modules/rights/contracts/scope.ts — BehaviourRule source-of-truth on the platform side.
  • packages/sdks/robotics-ts/ — TypeScript twin of this SDK.
  • docs/launch/substrate-readiness-matrix.md — ROBOTICS substrate readiness tracker.

API reference

Hit the HTTP surface directly

The Robotics SDK — Python is a thin client over the same governed HTTP API every other SDK calls. The full OpenAPI 3.1 reference lives on the docs site, published alongside the April 2027 platform launch.

Sandbox

Run this SDK against a real tenant

The browser demo at enfinitos.com/sandbox runs today against a shared synthetic tenant. The dedicated developer sandbox — your own persistent tenant, API keys, full HTTP-contract coverage — opens ahead of the April 2027 platform launch.