Spaxiom Logo
Spaxiom Technical Series - Part 4

Runtime Architecture and Execution Model

Event Loop, Scheduling, and Deployment Strategies

Joe Scanlin

November 2025

About This Section

This section describes the Spaxiom runtime's architecture and execution model. The runtime orchestrates sensor reads, evaluates conditions, triggers callbacks, and manages pattern lifecycle while handling concurrency, failure modes, and resource constraints.

You'll learn about the async event loop, sensor buffering strategies, condition evaluation modes, callback execution guarantees, memory management, error handling, performance profiling, and deployment strategies for both edge and cloud environments.

2.5 Runtime Architecture and Execution Model

The Spaxiom runtime is responsible for orchestrating sensor reads, evaluating conditions, triggering callbacks, and managing pattern lifecycle. It must handle concurrency, failure modes, and resource constraints while maintaining deterministic behavior for safety-critical applications.

This section describes the runtime's architecture, execution model, and deployment strategies for both edge and cloud environments.

Event loop and scheduling

The runtime uses an async event loop built on Python's asyncio library, enabling efficient concurrent I/O for sensor polling, network communication, and callback execution.

Core event loop structure:

import asyncio
from typing import List, Dict
from spaxiom import Sensor, Condition, Pattern

class SpaxiomRuntime:
    def __init__(self, tick_rate: float = 10.0):
        self.tick_rate = tick_rate  # Hz
        self.tick_period = 1.0 / tick_rate  # seconds
        self.sensors: List[Sensor] = []
        self.conditions: Dict[str, Condition] = {}
        self.patterns: List[Pattern] = []
        self.callbacks: Dict[Condition, List[Callable]] = {}
        self._running = False

    async def run(self):
        """Main event loop."""
        self._running = True
        last_tick = asyncio.get_event_loop().time()

        while self._running:
            current_time = asyncio.get_event_loop().time()
            dt = current_time - last_tick

            # Phase 1: Sensor reads (concurrent)
            await self._read_sensors()

            # Phase 2: Pattern updates (sequential, in dependency order)
            await self._update_patterns(dt)

            # Phase 3: Condition evaluation
            await self._evaluate_conditions()

            # Phase 4: Callback dispatch (concurrent, isolated)
            await self._dispatch_callbacks()

            # Sleep until next tick
            last_tick = current_time
            sleep_time = max(0, self.tick_period - (asyncio.get_event_loop().time() - current_time))
            await asyncio.sleep(sleep_time)

    async def _read_sensors(self):
        """Concurrent sensor reads with timeout."""
        tasks = [sensor.async_read() for sensor in self.sensors]
        await asyncio.gather(*tasks, return_exceptions=True)

    async def _update_patterns(self, dt: float):
        """Update patterns in topological order."""
        for pattern in self._topological_sort(self.patterns):
            pattern.update(dt)

    async def _evaluate_conditions(self):
        """Evaluate all registered conditions."""
        for name, condition in self.conditions.items():
            condition._evaluate()  # Internal state update

    async def _dispatch_callbacks(self):
        """Dispatch callbacks for triggered conditions."""
        tasks = []
        for condition, callbacks in self.callbacks.items():
            if condition.just_became_true():  # Edge-triggered
                for callback in callbacks:
                    tasks.append(self._safe_callback(callback))
        await asyncio.gather(*tasks, return_exceptions=True)

    async def _safe_callback(self, callback: Callable):
        """Execute callback with exception isolation."""
        try:
            if asyncio.iscoroutinefunction(callback):
                await callback()
            else:
                callback()
        except Exception as e:
            self._log_error(f"Callback {callback.__name__} failed: {e}")

Key design decisions:

Sensor buffering and sampling strategies

Sensors may produce data at rates different from the runtime tick rate. The runtime provides several buffering strategies:

from spaxiom import Sensor
from spaxiom.units import celsius

temp_sensor = Sensor("room_temp", units=celsius)

# Latest value
current_temp = temp_sensor.read()  # → Quantity(22.5, celsius)

# Moving window (last 60 seconds)
window = temp_sensor.window(60.0)
mean_temp = sum(window) / len(window)
max_temp = max(window)

Condition evaluation: polling vs event-driven

Conditions can be evaluated in two modes:

  1. Polling (default): conditions are evaluated on every tick. Suitable for conditions that depend on slowly changing sensor state or complex temporal logic.
  2. Event-driven: conditions register callbacks on sensor value changes. The runtime triggers evaluation only when dependencies change. This is more efficient for sparse events (e.g., door open/close) but requires careful dependency tracking.

The runtime automatically selects the appropriate mode based on condition complexity. Users can override with:

condition = Condition(lambda: door.read() > 0.5, mode="event-driven")

Callback execution model: async/sync and threading

Callbacks registered with @on(condition) can be either synchronous or asynchronous:

@on(high_temp_alarm)
def sync_callback():
    """Runs in event loop thread, should be fast (<10 ms)."""
    print("Temperature alarm!")

@on(high_temp_alarm)
async def async_callback():
    """Can perform async I/O (network requests, database writes)."""
    await send_alert_email()

@on(high_temp_alarm)
@threaded  # Decorator for CPU-bound work
def cpu_intensive_callback():
    """Runs in thread pool, does not block event loop."""
    run_expensive_simulation()

Callback execution guarantees:

Memory management for time-windowed operations

Temporal operators like within(duration, condition) maintain circular buffers of recent condition evaluations. Memory usage is O(duration × tick_rate).

For example, within(300.0, c) at 10 Hz requires 3000 boolean samples ≈ 3 KB. To prevent unbounded memory growth:

Handling sensor failures and missing data

Production sensor networks experience failures: disconnections, timeouts, corrupted data, hardware faults. The runtime provides several failure handling modes:

Performance profiling and debugging

The runtime includes built-in profiling and debugging tools:

from spaxiom.runtime import SpaxiomRuntime
from spaxiom.profiler import enable_profiling

runtime = SpaxiomRuntime(tick_rate=10.0)
enable_profiling(runtime)

# After running for some time:
stats = runtime.profiler.get_stats()
print(f"Avg tick latency: {stats['avg_tick_ms']:.1f} ms")
print(f"Sensor read p99: {stats['sensor_read_p99_ms']:.1f} ms")
print(f"Callback failures: {stats['callback_failures']}")

# Trace a specific condition
runtime.profiler.trace_condition("high_temp_alarm")
# Logs: evaluation results, timing, sensor reads, callback dispatch

Profiling overhead is <1% when enabled, making it suitable for production use.

Edge vs cloud deployment

Spaxiom supports two primary deployment modes:

Edge deployment

Target hardware: Raspberry Pi, NVIDIA Jetson, industrial edge gateways (ARM Cortex-A, x86-64).

Characteristics:

Optimizations:

# Edge deployment example
from spaxiom.runtime import SpaxiomRuntime

runtime = SpaxiomRuntime(
    tick_rate=10.0,
    backend="edge",  # Optimizes for low memory
    max_memory_mb=256
)
runtime.load_config("edge_config.yaml")
runtime.run()

Cloud deployment

Target platforms: AWS Lambda, Google Cloud Run, Kubernetes pods.

Characteristics:

Architecture:

# Cloud deployment with multi-tenancy
from spaxiom.runtime import SpaxiomRuntime
from spaxiom.cloud import KafkaSource, PostgresSink

runtime = SpaxiomRuntime(
    tick_rate=1.0,  # Cloud uses slower tick for efficiency
    backend="cloud"
)

# Ingest from Kafka topic per site
runtime.add_source(KafkaSource(topic="spaxiom-sensors", group="site-123"))

# Persist events to PostgreSQL
runtime.add_sink(PostgresSink(table="events", batch_size=100))

await runtime.run_async()  # Non-blocking, integrates with FastAPI/Flask

Deployment topology: hybrid edge-cloud

Many deployments use a hybrid topology: edge runtimes for low-latency local control, cloud runtime for aggregation, learning, and global coordination.

Cloud Layer Event Store (PostgreSQL) Time-series DB RL Training (PyTorch) Policy Learning Global Agent (GPT-4) Coordination Event Stream Policy Updates Commands Edge Runtime (Raspberry Pi / Jetson) Patterns (INTENT) OccupancyField QueueFlow ADLTracker Conditions (Safety) Temporal Logic Zone Checking Collision Avoid Local Agent (Fast Reaction) Emergency Stop Alert Dispatch <10ms Loop Physical Site Sensor Fusion Floor PIR Temp Temporal Logic Door Pressure Actuators HVAC Light Robot 1-60s latency 1-100ms latency <10ms loop

Hybrid edge-cloud deployment: Edge runtime handles low-latency safety and control loops (1-100 ms). Cloud runtime aggregates events for learning, analytics, and global optimization (1-60 s latency).

In this topology:

Fault tolerance and state recovery

For production reliability, the runtime supports:

from spaxiom.runtime import SpaxiomRuntime

runtime = SpaxiomRuntime()

# Enable checkpointing every 60 seconds
runtime.enable_checkpointing(interval_s=60.0, path="/var/lib/spaxiom/checkpoints")

# On startup, attempt recovery
if runtime.checkpoint_exists():
    runtime.restore_from_checkpoint()
    print("Restored from checkpoint")
else:
    runtime.initialize_fresh()

runtime.run()

With these mechanisms, Spaxiom runtimes achieve >99.9% uptime in production deployments.