Engine & SDK
Mission Engine core runtime, plugin lifecycle, event bus, ARES-E and Energy integrations, and the developer SDK
The Mission Engine (engine/) is the runtime kernel that all OMEN mission applications plug into. The SDK (sdk/) provides typed interfaces, data contracts, and authoring guides for plugin and adapter developers.
Engine Core
Plugin Runtime — engine/core/runtime.py
The PluginRuntime class manages the complete lifecycle of mission plugins:
| Class | Purpose |
|---|---|
PluginManifest |
Dataclass defining plugin metadata — id, name, version, capabilities, dependencies, schema, classification, AI_assisted flag |
PluginRegistry |
Central registry of loaded plugins with manifest tracking and duplicate prevention |
PluginRuntime |
Lifecycle manager for plugin discovery, loading, event routing, and fault isolation |
Key Methods:
| Method | Description |
|---|---|
register(plugin, manifest) |
Register a plugin with duplicate ID prevention |
start_plugin(plugin_id, config) |
Load and initialize a plugin with configuration |
stop_plugin(plugin_id) |
Graceful shutdown with cleanup |
dispatch_event(event) |
Route events to all running plugins; collect output events |
running_plugins() |
List all currently active plugin IDs |
Fault Isolation: An exception in one plugin does not crash others — each plugin’s on_event() is wrapped in its own error boundary. Failed events are logged but do not propagate.
Event Bus — engine/core/event_bus.py
The InProcessEventBus provides synchronous in-process event routing:
bus = InProcessEventBus()
# Subscribe with exact topic
sub_id = bus.subscribe("track_update", handler_fn)
# Subscribe with wildcard
bus.subscribe("threat.*", threat_handler)
# Subscribe to all events
bus.subscribe("*", audit_logger)
# Publish
bus.publish("track_update", event_data)
Pattern Matching:
- Exact match —
"track_update"matches only"track_update" - Wildcard prefix —
"threat.*"matches"threat.new","threat.update", etc. - Broadcast —
"*"receives all published events
Error Isolation: Exceptions in subscriber handlers are caught and logged — other subscribers still receive the event.
Phase 1 Note: The current implementation is synchronous and in-process. Phase 2 will replace with NATS/MQTT backend, add dead-letter queue, replay support, and typed message envelopes.
ARES-E Integration — engine/ares_e/hitl.py
OMEN integrates with DaScient ARES-E for deterministic evaluation and human-in-the-loop governance.
Human-in-the-Loop Gate
gate = HITLGate()
decision = gate.request_approval(
payload={"action": "reroute", "new_waypoint": "WP-ALPHA"},
evidence={"risk_score": 0.82, "model": "route-risk-v1"},
requester="bft-monitor-plugin"
)
# Returns: HITLDecision.APPROVED | REJECTED | EXPIRED
| Class | Purpose |
|---|---|
HITLDecision |
Enum: APPROVED, REJECTED, EXPIRED |
HITLRequest |
Request record with action_type, consequence_level, payload, evidence, requester |
HITLReviewRecord |
Review outcome with decision, reviewer_id, reviewed_at, notes |
HITLGate |
Review gate that queues AI recommendations for human approval |
HarnessHooks |
Engine lifecycle hooks for recording events for deterministic replay |
Consequence Levels:
- Low — auto-approved (stub behavior in Phase 1)
- Medium — logged, requires review in production
- High/Critical — mandatory human review before action proceeds
Harness Hooks
hooks = register_harness_hooks(engine, config)
hooks.enable()
# ... run scenario ...
hooks.flush_to_disk() # Serialize event log to JSON for replay
Energy Integration — engine/energy/energy_client.py
OMEN integrates with DaScient Energy for resource-constrained operation on edge hardware.
Energy State Model
class EnergyState:
battery_soc_pct: float # State of charge (0-100%)
power_source: str # "battery", "ac", "usb"
thermal_state: ThermalState # NORMAL, WARM, THROTTLED, CRITICAL
cpu_temperature_celsius: float
current_mode: EnergyMode # PERFORMANCE, BALANCED, LOW_POWER, EMERGENCY
source: str # "dascient-energy" or "platform"
Mode Controller
The ModeController evaluates the current EnergyState and transitions between operating modes:
| Condition | Mode |
|---|---|
| Battery > 50%, thermal NORMAL | PERFORMANCE |
| Battery 20–50% or thermal WARM | BALANCED |
| Battery 10–20% or thermal THROTTLED | LOW_POWER |
| Battery < 10% or thermal CRITICAL | EMERGENCY |
client = EnergyClient(config)
controller = ModeController()
state = client.get_energy_state()
changed = controller.update(state)
if changed:
print(f"Mode changed to: {controller.current_mode}")
# Register a listener
controller.on_mode_change(lambda old, new: print(f"{old} → {new}"))
Platform Fallback: If the DaScient Energy API is unavailable, the client reads directly from Linux /sys/class/power_supply to provide battery and thermal data.
SDK
The SDK (sdk/) provides the contracts and guides needed to build OMEN plugins and adapters.
Directory Structure
Canonical Data Models — sdk/contracts/canonical_models.py
All data flowing through OMEN is typed using Pydantic v2 models:
| Model | Description | Key Fields |
|---|---|---|
EntityMetadata |
Shared metadata for all canonical entities | entity_id, source_id, source_format, confidence (0–1), completeness (0–1), is_stale, provenance[], classification |
TrackPosition |
Immutable geospatial position | latitude (±90), longitude (±180), altitude_m, heading_deg (0–360), speed_mps |
Track |
Blue-force, hostile, or unknown track | metadata, callsign, track_type (FRIENDLY/HOSTILE/UNKNOWN/NEUTRAL), position |
Route |
Mission route with waypoints | route_id, waypoints[], total_distance_nm, estimated_duration_min |
Threat |
Threat entity with engagement envelope | threat_type, position, engagement_radius_nm, threat_level (LOW–CRITICAL) |
Airspace |
Controlled or restricted airspace | name, airspace_type, geometry (GeoJSON), altitude range, schedule |
Notam |
Notice to Air Missions | notam_id, text, affected_area, altitude range, effective dates |
MissionEvent |
Operational event record | event_subtype, description, severity (INFO/CAUTION/WARNING) |
SensorReading |
Telemetry sensor value | sensor_id, parameter, value, unit, is_anomalous |
Enumerations:
TrackType— FRIENDLY, HOSTILE, UNKNOWN, NEUTRALThreatLevel— LOW, MEDIUM, HIGH, CRITICALWaypointType— DEPARTURE, CHECKPOINT, OBJECTIVE, ARRIVAL, ALTERNATEAirspaceType— CLASS_A through CLASS_C, RESTRICTED, PROHIBITED, DANGER, TFR, MOA, WARNING, OTHER
Union Type: CanonicalEntity = Track | Route | Threat | Airspace | Notam | MissionEvent | SensorReading
Plugin Interface — sdk/interfaces/plugin_interface.py
All OMEN plugins implement the IPlugin abstract base class:
class IPlugin(ABC):
@property
@abstractmethod
def plugin_id(self) -> str: ...
@abstractmethod
def on_start(self, config: dict) -> None: ...
@abstractmethod
def on_stop(self) -> None: ...
@abstractmethod
def on_event(self, event: dict) -> list[Any]: ...
Adapter Interface — sdk/interfaces/plugin_interface.py
All CAL adapters implement the IAdapter abstract base class:
class IAdapter(ABC):
@property
@abstractmethod
def adapter_id(self) -> str: ...
@property
@abstractmethod
def supported_formats(self) -> list[str]: ...
@abstractmethod
def ingest(self, raw: Any) -> list[Any]: ...
@abstractmethod
def validate(self, raw: Any) -> ValidationResult: ...
@abstractmethod
def health(self) -> AdapterHealth: ...
@abstractmethod
def schema(self) -> dict: ...
TypeScript Interfaces — sdk/interfaces/plugin_interface.ts
TypeScript mirrors of the Python interfaces for map-app and frontend plugin development:
IPlugin— pluginId, onStart(), onStop(), onEvent()IAdapter— adapterId, supportedFormats, ingest(), validate(), health(), schema()IEventBus— publish(), subscribe(), unsubscribe()- Canonical entity types: Track, Route, Threat, Airspace, Notam
- Event types: OmenEvent, TrackUpdateEvent, ProximityAlertEvent, ConnectivityStateChangeEvent, EnergyStateChangeEvent
Service Interface
Engine-managed services implement IService:
class IService(ABC):
@property
@abstractmethod
def service_id(self) -> str: ...
@abstractmethod
async def start(self) -> None: ...
@abstractmethod
async def stop(self) -> None: ...
@abstractmethod
def health(self) -> ServiceHealth: ...
Engine Test Coverage
The engine core is fully tested in engine/core/test_core.py (209 lines):
| Test Suite | Tests | Coverage |
|---|---|---|
TestPluginRegistry |
5 tests | Registration, duplicate detection, listing, unregistration, manifest retrieval |
TestPluginRuntime |
5 tests | Lifecycle start/stop, event dispatch, fault isolation, error handling |
TestInProcessEventBus |
5 tests | Subscribe/publish, unsubscribe, wildcard patterns, multi-subscriber, error isolation |