Event Loop, Scheduling, and Deployment Strategies
Joe Scanlin
November 2025
The Spaxiom runtime orchestrates sensor reads, evaluates conditions, triggers callbacks, and manages pattern lifecycle. It must handle concurrency, failure modes, and resource constraints while maintaining deterministic behavior for safety-critical applications.
This section describes the runtime's architecture and execution model: the async event loop, sensor buffering strategies, condition evaluation modes, callback execution guarantees, memory management, error handling, performance profiling, and deployment strategies for both edge and cloud environments.
The runtime uses an async event loop built on Python's asyncio library, enabling efficient concurrent I/O for sensor polling, network communication, and callback execution.
Core event loop structure:
import asyncio
from typing import Callable, Dict, List
from spaxiom import Sensor, Condition, Pattern

class SpaxiomRuntime:
    def __init__(self, tick_rate: float = 10.0):
        self.tick_rate = tick_rate  # Hz
        self.tick_period = 1.0 / tick_rate  # seconds
        self.sensors: List[Sensor] = []
        self.conditions: Dict[str, Condition] = {}
        self.patterns: List[Pattern] = []
        self.callbacks: Dict[Condition, List[Callable]] = {}
        self._running = False

    async def run(self):
        """Main event loop."""
        self._running = True
        loop = asyncio.get_running_loop()
        last_tick = loop.time()
        while self._running:
            current_time = loop.time()
            dt = current_time - last_tick
            # Phase 1: Sensor reads (concurrent)
            await self._read_sensors()
            # Phase 2: Pattern updates (sequential, in dependency order)
            await self._update_patterns(dt)
            # Phase 3: Condition evaluation
            await self._evaluate_conditions()
            # Phase 4: Callback dispatch (concurrent, isolated)
            await self._dispatch_callbacks()
            # Sleep for whatever remains of this tick period
            last_tick = current_time
            sleep_time = max(0.0, self.tick_period - (loop.time() - current_time))
            await asyncio.sleep(sleep_time)

    async def _read_sensors(self):
        """Concurrent sensor reads with timeout."""
        tasks = [sensor.async_read() for sensor in self.sensors]
        # return_exceptions=True keeps one failed read from cancelling the rest
        await asyncio.gather(*tasks, return_exceptions=True)

    async def _update_patterns(self, dt: float):
        """Update patterns in topological order."""
        for pattern in self._topological_sort(self.patterns):
            pattern.update(dt)

    async def _evaluate_conditions(self):
        """Evaluate all registered conditions."""
        for name, condition in self.conditions.items():
            condition._evaluate()  # Internal state update

    async def _dispatch_callbacks(self):
        """Dispatch callbacks for triggered conditions."""
        tasks = []
        for condition, callbacks in self.callbacks.items():
            if condition.just_became_true():  # Edge-triggered
                for callback in callbacks:
                    tasks.append(self._safe_callback(callback))
        await asyncio.gather(*tasks, return_exceptions=True)

    async def _safe_callback(self, callback: Callable):
        """Execute callback with exception isolation."""
        try:
            if asyncio.iscoroutinefunction(callback):
                await callback()
            else:
                callback()
        except Exception as e:
            self._log_error(f"Callback {callback.__name__} failed: {e}")
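The listing calls a `_topological_sort` helper without showing it. A minimal sketch of dependency-ordered pattern updates follows, assuming each pattern exposes a `depends_on` attribute (a hypothetical name, not confirmed by the source) and using the standard library's `graphlib` module (Python 3.9+). The `Pattern` class here is a stand-in, not the Spaxiom one.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


class Pattern:
    """Hypothetical stand-in for spaxiom.Pattern with explicit dependencies."""

    def __init__(self, name, depends_on=()):
        self.name = name
        self.depends_on = tuple(depends_on)  # assumed attribute name

    def update(self, dt):
        pass  # real patterns would advance their internal state here


def topological_sort(patterns):
    """Return patterns so that every dependency precedes its dependents."""
    graph = {p: p.depends_on for p in patterns}  # node -> predecessors
    # static_order() raises graphlib.CycleError on circular dependencies
    return list(TopologicalSorter(graph).static_order())


occupancy = Pattern("occupancy")
queue = Pattern("queue_length", depends_on=[occupancy])
congestion = Pattern("congestion", depends_on=[occupancy, queue])

# Registration order is deliberately reversed to show the sort at work
order = [p.name for p in topological_sort([congestion, queue, occupancy])]
print(order)
```

Sorting once at registration time and caching the result would avoid repeating the work on every tick. Whether the runtime does so is not stated in the source.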
Key design decisions:
- A fixed tick rate, so that temporal operators such as within() have consistent semantics.
- Concurrent sensor reads through asyncio.gather(), maximizing throughput on I/O-bound workloads.
Sensors may produce data at rates different from the runtime tick rate. The runtime provides several buffering strategies:
- Latest value: sensor.read() returns the most recent value. Fast sensors (>100 Hz) are downsampled; slow sensors (<1 Hz) return stale values with a staleness timestamp.
- Moving window: sensor.window(duration) returns all values from the past duration seconds, stored in a circular buffer. Used for statistical aggregation (mean, variance, percentiles).
from spaxiom import Sensor
from spaxiom.units import celsius
temp_sensor = Sensor("room_temp", units=celsius)
# Latest value
current_temp = temp_sensor.read() # → Quantity(22.5, celsius)
# Moving window (last 60 seconds)
window = temp_sensor.window(60.0)
mean_temp = sum(window) / len(window)
max_temp = max(window)
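To make the moving-window strategy concrete, here is a minimal sketch of a time-indexed circular buffer built on `collections.deque`. The class name `WindowBuffer` and its methods are illustrative assumptions, not the Spaxiom implementation. Plain floats stand in for unit-carrying quantities.

```python
from collections import deque


class WindowBuffer:
    """Hypothetical time-indexed circular buffer (not the Spaxiom class)."""

    def __init__(self, max_samples):
        # A bounded deque discards the oldest sample once it is full
        self._samples = deque(maxlen=max_samples)

    def append(self, timestamp, value):
        self._samples.append((timestamp, value))

    def latest(self):
        """Most recent value, regardless of age."""
        return self._samples[-1][1]

    def window(self, duration, now):
        """All values sampled within the last `duration` seconds."""
        cutoff = now - duration
        return [value for ts, value in self._samples if ts >= cutoff]


buf = WindowBuffer(max_samples=1000)
for ts, value in [(0.0, 21.0), (30.0, 22.0), (60.0, 23.0), (90.0, 24.0)]:
    buf.append(ts, value)

recent = buf.window(60.0, now=90.0)  # samples taken at t = 30, 60, 90
mean_recent = sum(recent) / len(recent)
latest_value = buf.latest()
```

The bounded deque caps memory at `max_samples` entries, which is what keeps a fast sensor from growing the buffer without limit.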
Conditions can be evaluated in two modes:
The runtime automatically selects the appropriate mode based on condition complexity. Users can override with:
condition = Condition(lambda: door.read() > 0.5, mode="event-driven")
Callbacks registered with @on(condition) can be either synchronous or asynchronous:
@on(high_temp_alarm)
def sync_callback():
    """Runs in event loop thread, should be fast (<10 ms)."""
    print("Temperature alarm!")

@on(high_temp_alarm)
async def async_callback():
    """Can perform async I/O (network requests, database writes)."""
    await send_alert_email()

@on(high_temp_alarm)
@threaded  # Decorator for CPU-bound work
def cpu_intensive_callback():
    """Runs in thread pool, does not block event loop."""
    run_expensive_simulation()
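One way a decorator like `@threaded` could work is to wrap the blocking function in a coroutine that hands it to a worker thread. The sketch below uses `asyncio.to_thread` (Python 3.9+). It is an assumption about the mechanism, not the Spaxiom source.

```python
import asyncio
import functools
import threading


def threaded(func):
    """Hypothetical decorator: run a blocking function in a worker thread."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # The event loop stays free while the worker thread runs func
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper


@threaded
def cpu_intensive_callback():
    # Report whether we are running outside the main (event loop) thread
    return threading.current_thread() is not threading.main_thread()


ran_off_main_thread = asyncio.run(cpu_intensive_callback())
print(ran_off_main_thread)
```

Because the wrapper is a coroutine function, a dispatcher that awaits coroutine callbacks would handle it without special cases. Note that on a CPython build with the global interpreter lock, pure-Python CPU-bound work in a thread still competes with the event loop for the interpreter, so heavy computation may be better served by a process pool.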
Callback execution guarantees:
Temporal operators like within(duration, condition) maintain circular buffers of recent condition evaluations. Memory usage is O(duration × tick_rate).
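The O(duration × tick_rate) bound can be seen in a minimal ring-buffer sketch that stores one byte per evaluation. The class `ConditionHistory` and its query methods are hypothetical. The source does not say whether within() means "true at some point" or "true throughout" the window, so both queries are shown.

```python
import math


class ConditionHistory:
    """Hypothetical fixed-size history of boolean condition results."""

    def __init__(self, duration_s, tick_rate_hz):
        self.capacity = math.ceil(duration_s * tick_rate_hz)
        self._buf = bytearray(self.capacity)  # one byte per sample, all zero
        self._next = 0   # slot the next sample will overwrite
        self._count = 0  # samples recorded so far, capped at capacity

    def record(self, value):
        self._buf[self._next] = 1 if value else 0
        self._next = (self._next + 1) % self.capacity
        self._count = min(self._count + 1, self.capacity)

    def any_true(self):
        """True if the condition held at least once in the window."""
        return any(self._buf)

    def all_true(self):
        """True if the window is full and the condition held throughout."""
        return self._count == self.capacity and all(self._buf)


history = ConditionHistory(duration_s=300.0, tick_rate_hz=10.0)
buffer_bytes = len(history._buf)

for _ in range(3000):
    history.record(True)
held_for_full_window = history.all_true()

history.record(False)  # overwrites the oldest sample
still_held = history.all_true()
seen_recently = history.any_true()
```

Memory stays constant after construction because new samples overwrite the oldest slot rather than extending the buffer.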
For example, within(300.0, c) at 10 Hz requires 300 s × 10 Hz = 3000 boolean samples, about 3 KB at one byte per sample. To prevent unbounded memory growth:
- The expression within(60.0, c1) & within(60.0, c2) shares a single 60 s buffer.
Production sensor networks experience failures: disconnections, timeouts, corrupted data, and hardware faults. The runtime provides several failure handling modes:
- A sensor can be marked required=True, causing the runtime to halt or trigger an emergency callback on persistent failure.
- Readings carry a timestamp and staleness field. Conditions can check staleness:
fresh_temp = Condition(lambda: temp_sensor.read().staleness < 5.0) # < 5s old
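A staleness field of this kind could be computed at read time from the timestamp of the last successful sample. The sketch below is one possible arrangement. `Reading`, `CachedSensor`, and the injectable `clock` parameter are hypothetical names introduced for illustration.

```python
import time
from dataclasses import dataclass


@dataclass(frozen=True)
class Reading:
    """Hypothetical reading type; only `staleness` is named in the text."""
    value: float
    timestamp: float  # clock time when the sample was taken
    staleness: float  # seconds elapsed between sampling and this read


class CachedSensor:
    """Hypothetical sensor that serves its last good sample when polled."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._value = None
        self._timestamp = None

    def update(self, value):
        """Store a fresh sample from the hardware."""
        self._value = value
        self._timestamp = self._clock()

    def read(self):
        if self._timestamp is None:
            raise RuntimeError("no sample received yet")
        now = self._clock()
        return Reading(self._value, self._timestamp, now - self._timestamp)


# A fake clock makes the example deterministic
fake_now = [100.0]
sensor = CachedSensor(clock=lambda: fake_now[0])
sensor.update(22.5)

fake_now[0] = 103.0
is_fresh = sensor.read().staleness < 5.0   # sample is 3 s old

fake_now[0] = 110.0
is_stale = sensor.read().staleness >= 5.0  # sample is now 10 s old
```

A monotonic clock is used as the default so that wall-clock adjustments cannot make a reading appear fresher or staler than it is.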
The runtime includes built-in profiling and debugging tools:
from spaxiom.runtime import SpaxiomRuntime
from spaxiom.profiler import enable_profiling
runtime = SpaxiomRuntime(tick_rate=10.0)
enable_profiling(runtime)
# After running for some time:
stats = runtime.profiler.get_stats()
print(f"Avg tick latency: {stats['avg_tick_ms']:.1f} ms")
print(f"Sensor read p99: {stats['sensor_read_p99_ms']:.1f} ms")
print(f"Callback failures: {stats['callback_failures']}")
# Trace a specific condition
runtime.profiler.trace_condition("high_temp_alarm")
# Logs: evaluation results, timing, sensor reads, callback dispatch
Profiling overhead is <1% when enabled, making it suitable for production use.
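For intuition about what such statistics involve, here is a minimal sketch that records per-tick latencies and derives an average and a 99th percentile with the standard `statistics` module. `TickProfiler` and the `tick_p99_ms` key are illustrative assumptions. The actual spaxiom.profiler internals are not shown in the source.

```python
import statistics
import time


class TickProfiler:
    """Hypothetical latency recorder; not the spaxiom.profiler module."""

    def __init__(self):
        self._tick_ms = []

    def record(self, elapsed_ms):
        self._tick_ms.append(elapsed_ms)

    def time_tick(self, tick_fn):
        """Run one tick and record how long it took, in milliseconds."""
        start = time.perf_counter()
        tick_fn()
        self.record((time.perf_counter() - start) * 1000.0)

    def get_stats(self):
        # quantiles(n=100) returns 99 cut points; index 98 is the 99th percentile
        cut_points = statistics.quantiles(self._tick_ms, n=100)
        return {
            "avg_tick_ms": statistics.fmean(self._tick_ms),
            "tick_p99_ms": cut_points[98],
        }


profiler = TickProfiler()
for latency_ms in range(1, 101):  # synthetic latencies of 1..100 ms
    profiler.record(float(latency_ms))
stats = profiler.get_stats()
print(f"Avg tick latency: {stats['avg_tick_ms']:.1f} ms")
```

A production profiler would likely bound the sample list, for example with a fixed-size buffer, so that long-running processes do not accumulate measurements indefinitely.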
Spaxiom supports two primary deployment modes:
Target hardware: Raspberry Pi, NVIDIA Jetson, industrial edge gateways (ARM Cortex-A, x86-64).
Characteristics:
Optimizations:
# Edge deployment example
from spaxiom.runtime import SpaxiomRuntime

runtime = SpaxiomRuntime(
    tick_rate=10.0,
    backend="edge",  # Optimizes for low memory
    max_memory_mb=256,
)
runtime.load_config("edge_config.yaml")
runtime.run()
Target platforms: AWS Lambda, Google Cloud Run, Kubernetes pods.
Characteristics:
Architecture:
# Cloud deployment with multi-tenancy
from spaxiom.runtime import SpaxiomRuntime
from spaxiom.cloud import KafkaSource, PostgresSink

runtime = SpaxiomRuntime(
    tick_rate=1.0,  # Cloud uses slower tick for efficiency
    backend="cloud",
)
# Ingest from Kafka topic per site
runtime.add_source(KafkaSource(topic="spaxiom-sensors", group="site-123"))
# Persist events to PostgreSQL
runtime.add_sink(PostgresSink(table="events", batch_size=100))
# Must be awaited from inside a coroutine; non-blocking, integrates with FastAPI/Flask
await runtime.run_async()
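The `batch_size` parameter suggests that events are buffered and written in groups rather than one at a time. A minimal sketch of that batching idea follows, with an in-memory list standing in for the database. `BatchingSink` and its `flush` callable are hypothetical and do not describe the `PostgresSink` implementation.

```python
class BatchingSink:
    """Hypothetical sink that buffers events and flushes them in batches."""

    def __init__(self, flush, batch_size=100):
        self._flush = flush          # callable that persists a list of events
        self._batch_size = batch_size
        self._pending = []

    def write(self, event):
        self._pending.append(event)
        if len(self._pending) >= self._batch_size:
            self.flush()

    def flush(self):
        """Persist whatever is pending; safe to call when empty."""
        if self._pending:
            self._flush(self._pending)
            self._pending = []  # start a new list; the flushed one is untouched


batches = []  # stands in for the database
sink = BatchingSink(flush=batches.append, batch_size=100)
for i in range(250):
    sink.write({"event_id": i})
sink.flush()  # drain the final partial batch, e.g. at shutdown

batch_sizes = [len(batch) for batch in batches]
print(batch_sizes)
```

Batching trades a small delay before events become durable for fewer round trips to the database. A time-based flush would bound that delay on quiet streams.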
Many deployments use a hybrid topology: edge runtimes for low-latency local control, cloud runtime for aggregation, learning, and global coordination.
Hybrid edge-cloud deployment: Edge runtime handles low-latency safety and control loops (1-100 ms). Cloud runtime aggregates events for learning, analytics, and global optimization (1-60 s latency).
In this topology:
For production reliability, the runtime supports:
from spaxiom.runtime import SpaxiomRuntime

runtime = SpaxiomRuntime()
# Enable checkpointing every 60 seconds
runtime.enable_checkpointing(interval_s=60.0, path="/var/lib/spaxiom/checkpoints")
# On startup, attempt recovery
if runtime.checkpoint_exists():
    runtime.restore_from_checkpoint()
    print("Restored from checkpoint")
else:
    runtime.initialize_fresh()
runtime.run()
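A checkpoint is only useful if a crash during the write cannot corrupt it. One common approach, sketched below under the assumption that runtime state can be serialized to JSON, is to write to a temporary file and then atomically rename it into place. The function names and the state layout are illustrative, not the Spaxiom checkpoint format.

```python
import json
import os
import tempfile


def save_checkpoint(state, path):
    """Write state as JSON to a temp file, then atomically swap it in."""
    directory = os.path.dirname(path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_path, path)  # readers see the old or new file, never a partial one
    except BaseException:
        os.unlink(tmp_path)  # clean up the temp file if anything failed
        raise


def load_checkpoint(path):
    """Return the saved state, or None if no checkpoint exists."""
    if not os.path.exists(path):
        return None
    with open(path) as f:
        return json.load(f)


with tempfile.TemporaryDirectory() as workdir:
    checkpoint_path = os.path.join(workdir, "runtime.json")
    missing = load_checkpoint(checkpoint_path)  # nothing saved yet
    save_checkpoint(
        {"tick": 1200, "conditions": {"high_temp_alarm": True}},
        checkpoint_path,
    )
    restored = load_checkpoint(checkpoint_path)
```

Creating the temporary file in the same directory as the target keeps both on one filesystem, which `os.replace` needs in order to be atomic.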
With these mechanisms, Spaxiom runtimes achieve >99.9% uptime in production deployments.