From Standalone On-Premises to Cloud-Native Deployments
Joe Scanlin
November 2025
This section describes Spaxiom's integration architecture, which supports flexible deployment across a spectrum from fully standalone on-premises systems to cloud-integrated hybrid architectures. Whether you're running on a single Raspberry Pi in a factory or coordinating 10,000 sites through cloud aggregators, Spaxiom provides the connectors you need.
You'll learn about direct sensor onboarding, local data persistence, cloud platform integrations (MQTT, Kafka, webhooks), protocol adapters for proprietary sensors, multi-site orchestration, AI/ML platform integration, and developer tools for simulation and debugging.
Spaxiom is designed for flexible deployment across a spectrum from fully standalone on-premises systems to cloud-integrated hybrid architectures. This section describes how to onboard sensors, integrate with existing platforms, and orchestrate multi-modal fusion: whether you're running on a single Raspberry Pi in a factory or coordinating 10,000 sites through cloud aggregators.
Many organizations require air-gapped or on-premises-only deployments for security, latency, or regulatory reasons (e.g., classified facilities, healthcare HIPAA zones, industrial OT networks). Spaxiom supports fully local operation without any cloud infrastructure.
Spaxiom provides protocol adapters for common sensor interfaces. You can connect sensors directly to the edge runtime via:
Example: Onboard a Modbus temperature sensor and a USB-connected PIR motion detector:
from spaxiom import Sensor
from spaxiom.adapters import ModbusSensor, SerialSensor
# Modbus RTU temperature sensor on /dev/ttyUSB0
temp_sensor = ModbusSensor(
port="/dev/ttyUSB0",
baudrate=9600,
slave_id=1,
register=0x0000,
data_type="float32",
unit="celsius"
)
# USB serial PIR sensor (sends "1" for motion, "0" for idle)
pir_sensor = SerialSensor(
port="/dev/ttyUSB1",
baudrate=115200,
parser=lambda line: int(line.strip()) == 1,
data_type="boolean"
)
# Wrap in Spaxiom Sensor abstraction
temp = Sensor(name="zone_temp", adapter=temp_sensor, sample_rate=1.0)
motion = Sensor(name="zone_motion", adapter=pir_sensor, sample_rate=10.0)
# Now use in patterns and conditions
from spaxiom import Condition
overheating = Condition(lambda: temp.read() > 30.0)
occupied = Condition(lambda: motion.read())
For on-prem deployments, Spaxiom can store events locally using:
from spaxiom.storage import LocalEventStore
# SQLite database in /var/lib/spaxiom/events.db
store = LocalEventStore(backend="sqlite", path="/var/lib/spaxiom/events.db")
# Configure runtime to persist events locally
runtime.set_event_store(store)
# Query recent events
recent = store.query(event_type="DoorOpened", since="2025-01-01", limit=100)
Spaxiom includes an optional web UI (FastAPI + React) that runs entirely on the edge device:
from spaxiom.ui import WebDashboard
dashboard = WebDashboard(runtime=runtime, port=8080)
dashboard.start() # Now accessible at http://localhost:8080
The dashboard provides:
Here's a complete standalone deployment for a warehouse with 20 sensors, no internet connection:
from spaxiom import SpaxiomRuntime, Sensor, Zone, Condition
from spaxiom.adapters import ModbusSensor, CameraSensor, GPIOSensor
from spaxiom.intent import OccupancyField, QueueFlow
from spaxiom.storage import LocalEventStore
from spaxiom.ui import WebDashboard
# Initialize runtime
runtime = SpaxiomRuntime(tick_rate=10.0)
# Onboard 20 Modbus temperature sensors
zones = ["loading", "staging", "storage_a", "storage_b"]
temp_sensors = {}
for i, zone_name in enumerate(zones):
sensor = ModbusSensor(port="/dev/ttyUSB0", slave_id=i+1, register=0x0000)
temp_sensors[zone_name] = Sensor(f"{zone_name}_temp", adapter=sensor)
runtime.add_sensor(temp_sensors[zone_name])
# Onboard 4 ceiling cameras for occupancy
cameras = {}
for zone_name in zones:
cam = CameraSensor(rtsp_url=f"rtsp://192.168.1.{10+zones.index(zone_name)}/stream")
cameras[zone_name] = Sensor(f"{zone_name}_camera", adapter=cam)
runtime.add_sensor(cameras[zone_name])
# Onboard 4 door sensors (GPIO on Raspberry Pi)
door_sensors = {}
for i, zone_name in enumerate(zones):
gpio = GPIOSensor(pin=17+i, mode="input")
door_sensors[zone_name] = Sensor(f"{zone_name}_door", adapter=gpio)
runtime.add_sensor(door_sensors[zone_name])
# Define zones
zone_objs = {name: Zone(name=name, x=i*10, y=0, width=10, height=10)
for i, name in enumerate(zones)}
# Create INTENT patterns
occupancy_patterns = {}
for zone_name in zones:
pattern = OccupancyField(
zone=zone_objs[zone_name],
camera=cameras[zone_name],
count_threshold=5
)
occupancy_patterns[zone_name] = pattern
runtime.add_pattern(pattern)
# Queue flow for loading dock
queue_pattern = QueueFlow(
entry_zone=zone_objs["loading"],
camera=cameras["loading"],
max_wait_time=300.0 # 5 minutes
)
runtime.add_pattern(queue_pattern)
# Safety condition: overheating
overheating = Condition(lambda: any(s.read() > 35.0 for s in temp_sensors.values()))
runtime.on(overheating, lambda: print("ALERT: Overheating detected!"))
# Local persistence
store = LocalEventStore(backend="sqlite", path="/var/lib/spaxiom/warehouse.db")
runtime.set_event_store(store)
# Local dashboard
dashboard = WebDashboard(runtime=runtime, port=8080)
dashboard.start()
# Run indefinitely on-prem
runtime.run() # No cloud, no internet, fully autonomous
This example demonstrates a completely self-contained deployment: sensors → Spaxiom runtime → local storage → local UI, all on a single Raspberry Pi 4 or industrial PC.
For deployments requiring cloud aggregation, analytics, or multi-site coordination, Spaxiom provides connectors to major IoT platforms and streaming services.
Publish Spaxiom events to MQTT brokers (AWS IoT Core, Azure IoT Hub, Eclipse Mosquitto):
from spaxiom.connectors import MQTTBridge
# AWS IoT Core example
bridge = MQTTBridge(
broker="a1b2c3d4e5f6g7.iot.us-west-2.amazonaws.com",
port=8883,
client_cert="/path/to/device-cert.pem",
client_key="/path/to/device-key.pem",
ca_cert="/path/to/AmazonRootCA1.pem",
topic_prefix="spaxiom/site-42"
)
runtime.add_connector(bridge)
# Now all events auto-publish to MQTT topic:
# spaxiom/site-42/DoorOpened, spaxiom/site-42/QueueFormed, etc.
For high-throughput event streaming to data lakes or real-time analytics:
from spaxiom.connectors import KafkaConnector
kafka = KafkaConnector(
bootstrap_servers=["kafka1.example.com:9092", "kafka2.example.com:9092"],
topic="spaxiom-events",
compression="gzip",
acks="all" # Strong durability
)
runtime.add_connector(kafka)
# Events now stream to Kafka for consumption by Flink, Spark, etc.
Push events to HTTP endpoints (Zapier, n8n, custom services):
from spaxiom.connectors import WebhookConnector
webhook = WebhookConnector(
url="https://api.example.com/spaxiom/events",
method="POST",
headers={"Authorization": "Bearer YOUR_TOKEN"},
batch_size=10, # Send 10 events per request
retry_policy={"max_retries": 3, "backoff": "exponential"}
)
runtime.add_connector(webhook)
Direct ingestion to InfluxDB, Prometheus, TimescaleDB for monitoring dashboards:
from spaxiom.connectors import InfluxDBConnector
influx = InfluxDBConnector(
url="http://localhost:8086",
token="YOUR_INFLUX_TOKEN",
org="my-org",
bucket="spaxiom-events"
)
runtime.add_connector(influx)
# Query in Grafana: SELECT * FROM "DoorOpened" WHERE time > now() - 1h
Batch upload events to S3, Azure Blob, GCS for long-term storage and offline training:
from spaxiom.connectors import S3Connector
s3 = S3Connector(
bucket="spaxiom-events-archive",
region="us-west-2",
prefix="site-42/year=2025/month=01",
format="parquet", # Or "jsonl", "csv"
upload_interval=3600 # Upload every hour
)
runtime.add_connector(s3)
Many industrial and commercial sensors use proprietary protocols. Spaxiom's adapter framework lets you write thin translation layers:
from spaxiom.adapters import SensorAdapter
class CustomProtocolAdapter(SensorAdapter):
def __init__(self, device_id: str, api_key: str):
self.device_id = device_id
self.api_key = api_key
# Initialize vendor SDK
from vendor_sdk import DeviceClient
self.client = DeviceClient(device_id, api_key)
def read(self) -> float:
# Poll vendor API
response = self.client.get_latest_reading()
return response.value
def health_check(self) -> bool:
return self.client.is_connected()
# Use it like any other sensor
sensor = Sensor(name="custom_temp", adapter=CustomProtocolAdapter("DEV-123", "key"))
runtime.add_sensor(sensor)
Spaxiom includes pre-built adapters for:
Large deployments often follow a hub-and-spoke pattern: edge runtimes at each site emit events to regional or global aggregators.
Configure edge runtime to forward selected events to cloud:
from spaxiom.connectors import CloudForwarder
# Only forward high-priority events to cloud
forwarder = CloudForwarder(
backend="mqtt", # Or "kafka", "https"
broker="cloud.example.com",
event_filter=lambda evt: evt["priority"] in ["high", "critical"]
)
runtime.add_connector(forwarder)
# Low-priority events (e.g., routine occupancy) stay local
# High-priority events (e.g., safety violations) forwarded to cloud
Cloud can push updated policies, thresholds, or learned models back to edge:
from spaxiom.connectors import PolicySubscriber
subscriber = PolicySubscriber(
broker="cloud.example.com",
topic="spaxiom/site-42/policies"
)
# When cloud publishes new policy, runtime hot-reloads
def on_policy_update(policy):
runtime.update_thresholds(policy["thresholds"])
runtime.reload_patterns(policy["patterns"])
subscriber.on_message(on_policy_update)
runtime.add_connector(subscriber)
Spaxiom events serve as training data and real-time features for ML models.
Export events as features for online/offline ML:
from spaxiom.ml import FeatureExporter
exporter = FeatureExporter(
backend="feast",
feature_repo="/path/to/feast/repo",
entity_key="zone_id"
)
# Define feature transformations
exporter.register_feature(
name="occupancy_rolling_avg",
event_type="OccupancyChanged",
aggregation="mean",
window="30m"
)
runtime.add_connector(exporter)
Trigger model inference on specific events:
from spaxiom.ml import ModelInvoker
# When queue forms, invoke predictive model
invoker = ModelInvoker(
endpoint="https://api.example.com/predict/queue-wait-time",
trigger_event="QueueFormed"
)
def on_prediction(event, prediction):
if prediction["wait_time_minutes"] > 15:
print(f"Alert: Long queue predicted in {event['zone']}")
invoker.on_response(on_prediction)
runtime.add_connector(invoker)
For rapid development and troubleshooting, Spaxiom provides:
Test patterns with synthetic sensor data (no hardware required):
from spaxiom.sim import SimulatedSensor
# Generate synthetic temperature oscillations
temp_sim = SimulatedSensor(
name="sim_temp",
generator=lambda t: 20 + 5 * math.sin(t / 60.0), # Oscillates 20-25°C
sample_rate=1.0
)
runtime.add_sensor(temp_sim)
# Run in fast-forward for testing
runtime.run(speed_multiplier=100.0) # 100x real-time
Record events from production, replay for debugging:
from spaxiom.replay import EventRecorder, EventReplayer
# Record 1 hour of production events
recorder = EventRecorder(path="/tmp/events.jsonl")
runtime.add_connector(recorder)
runtime.run(duration=3600)
# Later: replay for debugging
replayer = EventReplayer(path="/tmp/events.jsonl")
runtime = SpaxiomRuntime()
replayer.attach(runtime)
runtime.run() # Replays exact sequence from production
Identify bottlenecks in patterns or callbacks:
from spaxiom.profiling import Profiler
profiler = Profiler(runtime)
profiler.start()
runtime.run(duration=300) # Run for 5 minutes
profiler.stop()
profiler.report() # Shows per-pattern latency, callback duration, etc.
Spaxiom's integration architecture supports a wide spectrum of deployments:
The key insight: Spaxiom abstracts sensor heterogeneity. Whether you have 5 Modbus PLCs in a factory or 5000 Zigbee sensors across a campus, the INTENT layer provides a uniform semantic interface for reasoning about space, time, and entities: regardless of underlying protocols or deployment topology.