Spaxiom Technical Series - Part 5

Integration Patterns and Ecosystem Connectors

From Standalone On-Premises to Cloud-Native Deployments

Joe Scanlin

November 2025

About This Section

This section describes Spaxiom's integration architecture, which supports flexible deployment across a spectrum from fully standalone on-premises systems to cloud-integrated hybrid architectures. Whether you're running on a single Raspberry Pi in a factory or coordinating 10,000 sites through cloud aggregators, Spaxiom provides the connectors you need.

You'll learn about direct sensor onboarding, local data persistence, cloud platform integrations (MQTT, Kafka, webhooks), protocol adapters for proprietary sensors, multi-site orchestration, AI/ML platform integration, and developer tools for simulation and debugging.

2.6 Integration Patterns and Ecosystem Connectors

Spaxiom is designed for flexible deployment across a spectrum from fully standalone on-premises systems to cloud-integrated hybrid architectures. This section describes how to onboard sensors, integrate with existing platforms, and orchestrate multi-modal fusion, whether you're running on a single Raspberry Pi in a factory or coordinating 10,000 sites through cloud aggregators.

Standalone on-premises deployment: zero cloud dependencies

Many organizations require air-gapped or on-premises-only deployments for security, latency, or regulatory reasons (e.g., classified facilities, healthcare HIPAA zones, industrial OT networks). Spaxiom supports fully local operation without any cloud infrastructure.

Direct sensor onboarding

Spaxiom provides protocol adapters for common sensor interfaces, so you can connect sensors directly to the edge runtime over Modbus (RTU or TCP), serial/USB links, GPIO pins, and RTSP camera streams, among others.

Example: Onboard a Modbus temperature sensor and a USB-connected PIR motion detector:

from spaxiom import Sensor
from spaxiom.adapters import ModbusSensor, SerialSensor

# Modbus RTU temperature sensor on /dev/ttyUSB0
temp_sensor = ModbusSensor(
    port="/dev/ttyUSB0",
    baudrate=9600,
    slave_id=1,
    register=0x0000,
    data_type="float32",
    unit="celsius"
)

# USB serial PIR sensor (sends "1" for motion, "0" for idle)
pir_sensor = SerialSensor(
    port="/dev/ttyUSB1",
    baudrate=115200,
    parser=lambda line: int(line.strip()) == 1,
    data_type="boolean"
)

# Wrap in Spaxiom Sensor abstraction
temp = Sensor(name="zone_temp", adapter=temp_sensor, sample_rate=1.0)
motion = Sensor(name="zone_motion", adapter=pir_sensor, sample_rate=10.0)

# Now use in patterns and conditions
from spaxiom import Condition
overheating = Condition(lambda: temp.read() > 30.0)
occupied = Condition(lambda: motion.read())

Local data persistence

For on-prem deployments, Spaxiom can store events locally using an embedded event store backed by SQLite:

from spaxiom.storage import LocalEventStore

# SQLite database in /var/lib/spaxiom/events.db
store = LocalEventStore(backend="sqlite", path="/var/lib/spaxiom/events.db")

# Configure runtime to persist events locally
runtime.set_event_store(store)

# Query recent events
recent = store.query(event_type="DoorOpened", since="2025-01-01", limit=100)

Local web dashboard

Spaxiom includes an optional web UI (FastAPI + React) that runs entirely on the edge device:

from spaxiom.ui import WebDashboard

dashboard = WebDashboard(runtime=runtime, port=8080)
dashboard.start()  # Now accessible at http://localhost:8080

The dashboard provides a local view of live sensor readings, active patterns, and recent events, with no external services required.

Zero-cloud orchestration example

Here's a complete standalone deployment for a warehouse with temperature, camera, and door sensors across four zones, with no internet connection:

from spaxiom import SpaxiomRuntime, Sensor, Zone, Condition
from spaxiom.adapters import ModbusSensor, CameraSensor, GPIOSensor
from spaxiom.intent import OccupancyField, QueueFlow
from spaxiom.storage import LocalEventStore
from spaxiom.ui import WebDashboard

# Initialize runtime
runtime = SpaxiomRuntime(tick_rate=10.0)

# Onboard Modbus temperature sensors, one per zone, on a shared RS-485 bus
zones = ["loading", "staging", "storage_a", "storage_b"]
temp_sensors = {}
for i, zone_name in enumerate(zones):
    sensor = ModbusSensor(port="/dev/ttyUSB0", slave_id=i+1, register=0x0000)
    temp_sensors[zone_name] = Sensor(f"{zone_name}_temp", adapter=sensor)
    runtime.add_sensor(temp_sensors[zone_name])

# Onboard 4 ceiling cameras for occupancy
cameras = {}
for i, zone_name in enumerate(zones):
    cam = CameraSensor(rtsp_url=f"rtsp://192.168.1.{10+i}/stream")
    cameras[zone_name] = Sensor(f"{zone_name}_camera", adapter=cam)
    runtime.add_sensor(cameras[zone_name])

# Onboard 4 door sensors (GPIO on Raspberry Pi)
door_sensors = {}
for i, zone_name in enumerate(zones):
    gpio = GPIOSensor(pin=17+i, mode="input")
    door_sensors[zone_name] = Sensor(f"{zone_name}_door", adapter=gpio)
    runtime.add_sensor(door_sensors[zone_name])

# Define zones
zone_objs = {name: Zone(name=name, x=i*10, y=0, width=10, height=10)
             for i, name in enumerate(zones)}

# Create INTENT patterns
occupancy_patterns = {}
for zone_name in zones:
    pattern = OccupancyField(
        zone=zone_objs[zone_name],
        camera=cameras[zone_name],
        count_threshold=5
    )
    occupancy_patterns[zone_name] = pattern
    runtime.add_pattern(pattern)

# Queue flow for loading dock
queue_pattern = QueueFlow(
    entry_zone=zone_objs["loading"],
    camera=cameras["loading"],
    max_wait_time=300.0  # 5 minutes
)
runtime.add_pattern(queue_pattern)

# Safety condition: overheating
overheating = Condition(lambda: any(s.read() > 35.0 for s in temp_sensors.values()))
runtime.on(overheating, lambda: print("ALERT: Overheating detected!"))

# Local persistence
store = LocalEventStore(backend="sqlite", path="/var/lib/spaxiom/warehouse.db")
runtime.set_event_store(store)

# Local dashboard
dashboard = WebDashboard(runtime=runtime, port=8080)
dashboard.start()

# Run indefinitely on-prem
runtime.run()  # No cloud, no internet, fully autonomous

This example demonstrates a completely self-contained deployment: sensors → Spaxiom runtime → local storage → local UI, all on a single Raspberry Pi 4 or industrial PC.

Cloud platform integrations

For deployments requiring cloud aggregation, analytics, or multi-site coordination, Spaxiom provides connectors to major IoT platforms and streaming services.

MQTT bridges

Publish Spaxiom events to MQTT brokers (AWS IoT Core, Azure IoT Hub, Eclipse Mosquitto):

from spaxiom.connectors import MQTTBridge

# AWS IoT Core example
bridge = MQTTBridge(
    broker="a1b2c3d4e5f6g7.iot.us-west-2.amazonaws.com",
    port=8883,
    client_cert="/path/to/device-cert.pem",
    client_key="/path/to/device-key.pem",
    ca_cert="/path/to/AmazonRootCA1.pem",
    topic_prefix="spaxiom/site-42"
)

runtime.add_connector(bridge)

# Now all events auto-publish to MQTT topic:
# spaxiom/site-42/DoorOpened, spaxiom/site-42/QueueFormed, etc.

Kafka / streaming platforms

For high-throughput event streaming to data lakes or real-time analytics:

from spaxiom.connectors import KafkaConnector

kafka = KafkaConnector(
    bootstrap_servers=["kafka1.example.com:9092", "kafka2.example.com:9092"],
    topic="spaxiom-events",
    compression="gzip",
    acks="all"  # Strong durability
)

runtime.add_connector(kafka)

# Events now stream to Kafka for consumption by Flink, Spark, etc.
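
On the consuming side, any standard Kafka client can read the stream. Here's a minimal sketch of a downstream consumer, assuming the kafka-python package and a JSON payload with an event_type field (neither of which Spaxiom prescribes):

import json
from kafka import KafkaConsumer

# Hypothetical downstream consumer; group name and field names are illustrative
consumer = KafkaConsumer(
    "spaxiom-events",
    bootstrap_servers=["kafka1.example.com:9092", "kafka2.example.com:9092"],
    group_id="spaxiom-analytics",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for record in consumer:
    event = record.value
    if event.get("event_type") == "QueueFormed":
        print("Queue formed:", event)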

REST/GraphQL webhooks

Push events to HTTP endpoints (Zapier, n8n, custom services):

from spaxiom.connectors import WebhookConnector

webhook = WebhookConnector(
    url="https://api.example.com/spaxiom/events",
    method="POST",
    headers={"Authorization": "Bearer YOUR_TOKEN"},
    batch_size=10,  # Send 10 events per request
    retry_policy={"max_retries": 3, "backoff": "exponential"}
)

runtime.add_connector(webhook)

Time-series databases

Direct ingestion to InfluxDB, Prometheus, TimescaleDB for monitoring dashboards:

from spaxiom.connectors import InfluxDBConnector

influx = InfluxDBConnector(
    url="http://localhost:8086",
    token="YOUR_INFLUX_TOKEN",
    org="my-org",
    bucket="spaxiom-events"
)

runtime.add_connector(influx)

# Query in Grafana (Flux): from(bucket: "spaxiom-events") |> range(start: -1h)
#   |> filter(fn: (r) => r._measurement == "DoorOpened")

Cloud storage for archival

Batch upload events to S3, Azure Blob, GCS for long-term storage and offline training:

from spaxiom.connectors import S3Connector

s3 = S3Connector(
    bucket="spaxiom-events-archive",
    region="us-west-2",
    prefix="site-42/year=2025/month=01",
    format="parquet",  # Or "jsonl", "csv"
    upload_interval=3600  # Upload every hour
)

runtime.add_connector(s3)
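
Once archived, the Parquet partitions can be pulled back for offline analysis or model training with standard tooling. A minimal sketch, assuming pandas with pyarrow and s3fs installed and an event_type column in the archived schema:

import pandas as pd

# Load one month of archived events straight from S3 (path mirrors the prefix above)
df = pd.read_parquet("s3://spaxiom-events-archive/site-42/year=2025/month=01/")

# Illustrative offline query; the column name is an assumption about the archive schema
door_events = df[df["event_type"] == "DoorOpened"]
print(f"{len(door_events)} DoorOpened events archived in January")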

Protocol adapters: bridging proprietary sensors

Many industrial and commercial sensors use proprietary protocols. Spaxiom's adapter framework lets you write thin translation layers:

from spaxiom.adapters import SensorAdapter

class CustomProtocolAdapter(SensorAdapter):
    def __init__(self, device_id: str, api_key: str):
        self.device_id = device_id
        self.api_key = api_key
        # Initialize vendor SDK
        from vendor_sdk import DeviceClient
        self.client = DeviceClient(device_id, api_key)

    def read(self) -> float:
        # Poll vendor API
        response = self.client.get_latest_reading()
        return response.value

    def health_check(self) -> bool:
        return self.client.is_connected()

# Use it like any other sensor
sensor = Sensor(name="custom_temp", adapter=CustomProtocolAdapter("DEV-123", "key"))
runtime.add_sensor(sensor)

Spaxiom includes pre-built adapters for common interfaces such as Modbus, serial/USB, GPIO, and RTSP cameras, so most commodity sensors onboard without custom code.

Multi-site orchestration: from edge to cloud

Large deployments often follow a hub-and-spoke pattern: edge runtimes at each site emit events to regional or global aggregators.
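
On the aggregator side, any MQTT-capable service can subscribe across sites using the topic convention shown earlier (e.g., spaxiom/site-42/DoorOpened). A minimal sketch of a cloud-side consumer, assuming the paho-mqtt client and JSON payloads:

import json
import paho.mqtt.client as mqtt

def on_message(client, userdata, msg):
    # Topic encodes site and event type, e.g. spaxiom/site-42/QueueFormed
    event = json.loads(msg.payload)
    print(msg.topic, event)

client = mqtt.Client()  # paho-mqtt >= 2.0 also requires a CallbackAPIVersion argument
client.on_message = on_message
client.connect("cloud.example.com", 1883)
client.subscribe("spaxiom/+/#")  # wildcard across all sites and event types
client.loop_forever()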

Edge-to-cloud event forwarding

Configure edge runtime to forward selected events to cloud:

from spaxiom.connectors import CloudForwarder

# Only forward high-priority events to cloud
forwarder = CloudForwarder(
    backend="mqtt",  # Or "kafka", "https"
    broker="cloud.example.com",
    event_filter=lambda evt: evt["priority"] in ["high", "critical"]
)

runtime.add_connector(forwarder)

# Low-priority events (e.g., routine occupancy) stay local
# High-priority events (e.g., safety violations) forwarded to cloud

Bidirectional policy updates

Cloud can push updated policies, thresholds, or learned models back to edge:

from spaxiom.connectors import PolicySubscriber

subscriber = PolicySubscriber(
    broker="cloud.example.com",
    topic="spaxiom/site-42/policies"
)

# When cloud publishes new policy, runtime hot-reloads
def on_policy_update(policy):
    runtime.update_thresholds(policy["thresholds"])
    runtime.reload_patterns(policy["patterns"])

subscriber.on_message(on_policy_update)
runtime.add_connector(subscriber)
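
The publishing side can be a simple script or cloud job that pushes a JSON policy document to the site's policy topic. A minimal sketch, assuming paho-mqtt and the policy keys the handler above expects (the values are illustrative):

import json
import paho.mqtt.publish as publish

new_policy = {
    "thresholds": {"overheating_celsius": 33.0},
    "patterns": ["OccupancyField", "QueueFlow"],
}

publish.single(
    topic="spaxiom/site-42/policies",
    payload=json.dumps(new_policy),
    hostname="cloud.example.com",
    port=1883,
)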

Integration with AI/ML platforms

Spaxiom events serve as training data and real-time features for ML models.

Feature pipelines (Tecton, Feast)

Export events as features for online/offline ML:

from spaxiom.ml import FeatureExporter

exporter = FeatureExporter(
    backend="feast",
    feature_repo="/path/to/feast/repo",
    entity_key="zone_id"
)

# Define feature transformations
exporter.register_feature(
    name="occupancy_rolling_avg",
    event_type="OccupancyChanged",
    aggregation="mean",
    window="30m"
)

runtime.add_connector(exporter)
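
Downstream models can then retrieve the exported features through the normal Feast APIs. A minimal sketch, assuming a feature view named zone_features that contains the registered feature:

from feast import FeatureStore

store = FeatureStore(repo_path="/path/to/feast/repo")

# Online lookup keyed by the same entity key the exporter uses
features = store.get_online_features(
    features=["zone_features:occupancy_rolling_avg"],
    entity_rows=[{"zone_id": "loading"}],
).to_dict()

print(features["occupancy_rolling_avg"])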

Online inference serving

Trigger model inference on specific events:

from spaxiom.ml import ModelInvoker

# When queue forms, invoke predictive model
invoker = ModelInvoker(
    endpoint="https://api.example.com/predict/queue-wait-time",
    trigger_event="QueueFormed"
)

def on_prediction(event, prediction):
    if prediction["wait_time_minutes"] > 15:
        print(f"Alert: Long queue predicted in {event['zone']}")

invoker.on_response(on_prediction)
runtime.add_connector(invoker)
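
The prediction endpoint itself is an ordinary HTTP service. A minimal sketch of a compatible server using FastAPI; the request fields and the wait-time heuristic are placeholders, not part of Spaxiom:

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class QueueEvent(BaseModel):
    zone: str
    queue_length: int = 0

@app.post("/predict/queue-wait-time")
def predict_wait_time(event: QueueEvent):
    # Stand-in heuristic where a trained model would normally run
    return {"wait_time_minutes": event.queue_length * 1.5}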

Developer tools and debugging

For rapid development and troubleshooting, Spaxiom provides a simulation mode, event replay, and performance profiling, described below.

Simulation mode

Test patterns with synthetic sensor data (no hardware required):

import math

from spaxiom.sim import SimulatedSensor

# Generate synthetic temperature oscillations
temp_sim = SimulatedSensor(
    name="sim_temp",
    generator=lambda t: 20 + 5 * math.sin(t / 60.0),  # Oscillates between 15 and 25°C
    sample_rate=1.0
)

runtime.add_sensor(temp_sim)

# Run in fast-forward for testing
runtime.run(speed_multiplier=100.0)  # 100x real-time

Event replay

Record events from production, replay for debugging:

from spaxiom.replay import EventRecorder, EventReplayer

# Record 1 hour of production events
recorder = EventRecorder(path="/tmp/events.jsonl")
runtime.add_connector(recorder)
runtime.run(duration=3600)

# Later: replay for debugging
replayer = EventReplayer(path="/tmp/events.jsonl")
runtime = SpaxiomRuntime()
replayer.attach(runtime)
runtime.run()  # Replays exact sequence from production

Performance profiling

Identify bottlenecks in patterns or callbacks:

from spaxiom.profiling import Profiler

profiler = Profiler(runtime)
profiler.start()

runtime.run(duration=300)  # Run for 5 minutes

profiler.stop()
profiler.report()  # Shows per-pattern latency, callback duration, etc.

Summary: deployment flexibility

Spaxiom's integration architecture supports a wide spectrum of deployments: fully air-gapped, single-device installations; cloud-connected sites streaming events over MQTT, Kafka, webhooks, time-series databases, or object storage; and hub-and-spoke fleets in which edge runtimes forward selected events to regional or global aggregators.

The key insight: Spaxiom abstracts sensor heterogeneity. Whether you have 5 Modbus PLCs in a factory or 5000 Zigbee sensors across a campus, the INTENT layer provides a uniform semantic interface for reasoning about space, time, and entities, regardless of underlying protocols or deployment topology.