Data Source Plugins
Connect live subscription databases to your Hypen UI with the data source plugin system
Data Source Plugins
Data source plugins connect live subscription databases — like SpacetimeDB, Firebase, Convex, or Electric SQL — to your Hypen UI. Data flows one way: the database pushes updates into the engine, and the engine reactively re-renders bound nodes.
How It Works
Database → Plugin.onChange() → DataSourceManager → engine.setContext() → re-render- A plugin connects to a database and subscribes to tables
- When rows change, the plugin calls
onChange()with the changed paths and values - The
DataSourceManagermerges sparse updates and pushes the full state viasetContext() - The engine marks all nodes bound to
$provider.*as dirty and re-renders them
Using a Plugin
Register a plugin in your module definition with .useDataSource():
import { app, hypen, state } from "@hypen-space/core";
import { SpacetimeDBPlugin } from "@hypen-space/plugin-spacetimedb";
export default app
.defineState({ messageText: "" })
.useDataSource(new SpacetimeDBPlugin(), {
uri: "ws://localhost:3000",
moduleName: "chat",
tables: ["user", "message"],
})
.onAction("sendMessage", async ({ state, dataSources }) => {
await dataSources.spacetime.sendMessage(state.messageText);
state.messageText = "";
})
.ui(hypen`
Column {
ForEach(items: $spacetime.message, key: "id") {
Row {
Text("${item.sender}")
.fontWeight("bold")
Text("${item.text}")
}
}
Row {
Input(placeholder: "Type a message...")
.bind(@state.messageText)
Button { Text("Send") }
.onClick(@actions.sendMessage)
}
}
`);DSL Binding Syntax
Data source bindings use $provider.path syntax in Hypen DSL:
// Bind to a table
ForEach(items: $spacetime.message, key: "id") {
Text("${item.text}")
}
// Bind to a nested path
Text("Status: $firebase.user.status")
// Use in conditionals
When(value: $spacetime.connectionStatus) {
Case(match: "connected") { Text("Online") }
Case(match: "connecting") { Spinner() }
Else { Text("Offline") }
}
// Use in If conditions
If(condition: $spacetime.connected) {
Text("Connected to database")
}The $ prefix distinguishes data source bindings from ${state.*} template bindings and @state.* references.
Calling Mutations
In action handlers, access registered plugins via the dataSources parameter. Method calls are forwarded directly to the plugin's call() method:
.onAction("sendMessage", async ({ state, dataSources }) => {
// Call a SpacetimeDB reducer
await dataSources.spacetime.sendMessage(state.messageText);
// Call a Firebase function
await dataSources.firebase.updateProfile({ name: "Alice" });
})This works because each plugin is wrapped in a Proxy — dataSources.spacetime.sendMessage(text) becomes plugin.call("sendMessage", text) under the hood. You can also use .call() directly if you prefer.
Each plugin is keyed by its name property (e.g., "spacetime", "firebase").
Writing a Plugin
A data source plugin implements the DataSourcePlugin<TConfig> interface:
import type {
DataSourcePlugin,
DataSourceStatus,
DataSourceQuery,
DataSourceSubscription,
DataSourceChange,
} from "@hypen-space/core";The Interface
interface DataSourcePlugin<TConfig = unknown> {
/** Unique provider name (e.g., "spacetime", "firebase") */
readonly name: string;
/** Current connection status */
status: DataSourceStatus;
/** Connect to the data source and begin pushing changes */
connect(
config: TConfig,
onChange: (change: DataSourceChange) => void
): Promise<void>;
/** Subscribe to a table or query */
subscribe(query: DataSourceQuery): DataSourceSubscription;
/** Call a remote procedure / reducer / mutation */
call(method: string, ...args: unknown[]): Promise<unknown>;
/** Disconnect and clean up all subscriptions */
disconnect(): Promise<void>;
}Status Values
type DataSourceStatus =
| "disconnected" // Not connected
| "connecting" // Connection in progress
| "connected" // Active and receiving updates
| "reconnecting" // Temporarily lost, attempting to reconnect
| "error"; // Connection failedPushing Changes
The onChange callback is how your plugin pushes data into the Hypen engine. Each call provides the changed paths and their new values:
interface DataSourceChange {
/** Changed paths relative to the provider root */
paths: string[];
/** New values at those paths */
values: Record<string, unknown>;
}For example, when a SpacetimeDB table updates:
onChange({
paths: ["message"],
values: {
message: [
{ id: 1, text: "Hello", sender: "Alice" },
{ id: 2, text: "World", sender: "Bob" },
],
},
});The DataSourceManager accumulates these sparse updates into a single state object per provider, then pushes the full merged state to the engine via setContext(). This means you can push individual table updates without worrying about losing other tables' data.
Example: Minimal Plugin
Here's a minimal plugin that polls a REST API:
import type {
DataSourcePlugin,
DataSourceStatus,
DataSourceQuery,
DataSourceSubscription,
DataSourceChange,
} from "@hypen-space/core";
interface RestPollingConfig {
baseUrl: string;
tables: string[];
intervalMs?: number;
}
export class RestPollingPlugin implements DataSourcePlugin<RestPollingConfig> {
readonly name = "rest";
status: DataSourceStatus = "disconnected";
private intervalIds: number[] = [];
private onChange?: (change: DataSourceChange) => void;
async connect(
config: RestPollingConfig,
onChange: (change: DataSourceChange) => void
): Promise<void> {
this.onChange = onChange;
this.status = "connecting";
const interval = config.intervalMs ?? 5000;
for (const table of config.tables) {
const id = setInterval(async () => {
try {
const res = await fetch(`${config.baseUrl}/${table}`);
const data = await res.json();
onChange({
paths: [table],
values: { [table]: data },
});
} catch {
// Silently retry on next interval
}
}, interval);
this.intervalIds.push(id as unknown as number);
// Initial fetch
try {
const res = await fetch(`${config.baseUrl}/${table}`);
const data = await res.json();
onChange({
paths: [table],
values: { [table]: data },
});
} catch {
// Will retry on interval
}
}
this.status = "connected";
}
subscribe(query: DataSourceQuery): DataSourceSubscription {
return { unsubscribe: () => {}, status: "applied" };
}
async call(method: string, ...args: unknown[]): Promise<unknown> {
const res = await fetch(`${this.baseUrl}/${method}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(args),
});
return res.json();
}
private baseUrl = "";
async disconnect(): Promise<void> {
for (const id of this.intervalIds) {
clearInterval(id);
}
this.intervalIds = [];
this.status = "disconnected";
}
}Example: WebSocket Plugin
A more realistic plugin using WebSocket subscriptions:
import type {
DataSourcePlugin,
DataSourceStatus,
DataSourceQuery,
DataSourceSubscription,
DataSourceChange,
} from "@hypen-space/core";
interface WsConfig {
url: string;
tables: string[];
}
export class WebSocketPlugin implements DataSourcePlugin<WsConfig> {
readonly name = "ws";
status: DataSourceStatus = "disconnected";
private ws?: WebSocket;
private onChange?: (change: DataSourceChange) => void;
private subscriptions = new Map<string, () => void>();
async connect(config: WsConfig, onChange: (change: DataSourceChange) => void): Promise<void> {
this.onChange = onChange;
this.status = "connecting";
return new Promise((resolve, reject) => {
this.ws = new WebSocket(config.url);
this.ws.onopen = () => {
this.status = "connected";
// Subscribe to configured tables
for (const table of config.tables) {
this.ws!.send(JSON.stringify({ type: "subscribe", table }));
}
resolve();
};
this.ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
if (msg.type === "update" && msg.table) {
onChange({
paths: [msg.table],
values: { [msg.table]: msg.rows },
});
}
};
this.ws.onclose = () => {
this.status = "disconnected";
};
this.ws.onerror = () => {
this.status = "error";
reject(new Error("WebSocket connection failed"));
};
});
}
subscribe(query: DataSourceQuery): DataSourceSubscription {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: "subscribe", table: query.table, filter: query.filter }));
}
const unsub = () => {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: "unsubscribe", table: query.table }));
}
};
this.subscriptions.set(query.table, unsub);
return { unsubscribe: unsub, status: "applied" };
}
async call(method: string, ...args: unknown[]): Promise<unknown> {
return new Promise((resolve) => {
const id = crypto.randomUUID();
const handler = (event: MessageEvent) => {
const msg = JSON.parse(event.data);
if (msg.type === "response" && msg.id === id) {
this.ws!.removeEventListener("message", handler);
resolve(msg.result);
}
};
this.ws!.addEventListener("message", handler);
this.ws!.send(JSON.stringify({ type: "call", id, method, args }));
});
}
async disconnect(): Promise<void> {
for (const unsub of this.subscriptions.values()) {
unsub();
}
this.subscriptions.clear();
this.ws?.close();
this.ws = undefined;
this.status = "disconnected";
}
}Publishing a Plugin
Data source plugins should be published as separate npm packages following the naming convention @hypen-space/plugin-{name} or hypen-plugin-{name}.
Package Structure
hypen-plugin-mydb/
├── src/
│ └── index.ts # Plugin implementation + config type export
├── package.json
├── tsconfig.json
└── README.mdpackage.json
{
"name": "@hypen-space/plugin-mydb",
"version": "0.1.0",
"type": "module",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"peerDependencies": {
"@hypen-space/core": ">=0.1.0"
},
"dependencies": {
"mydb-client-sdk": "^1.0.0"
}
}@hypen-space/core should be a peer dependency — the plugin imports types from it but shouldn't bundle its own copy.
Exports
Export both the plugin class and its config type:
export { MyDBPlugin } from "./plugin.js";
export type { MyDBConfig } from "./plugin.js";Architecture Details
Data Flow
┌─────────────┐ onChange() ┌──────────────────┐ setContext() ┌────────┐
│ Database │ ──────────────→ │ DataSourceManager │ ──────────────→ │ Engine │
│ (Plugin) │ │ (sparse merge) │ │ (WASM) │
└─────────────┘ └──────────────────┘ └────────┘
│
re-render dirty nodes
│
▼
┌──────────┐
│ Patches │
└──────────┘Dependency Tracking
When the engine encounters a $spacetime.messages binding in a template, it registers a dependency with the path ds:spacetime:messages. When setContext("spacetime", data) is called, the engine finds all nodes whose dependency paths match ds:spacetime:* and marks them as dirty for re-rendering.
If a provider hasn't been registered yet when a binding is encountered, the engine tracks it as an "unregistered provider reference" for debugging.
Sparse Merging
The DataSourceManager maintains an accumulated state per provider. When a plugin pushes a change for one table (e.g., messages), only that key is updated — other tables (e.g., users) keep their previous values. The full merged state is then passed to setContext().
// First update: messages table
onChange({ paths: ["messages"], values: { messages: [...] } });
// Manager state: { messages: [...] }
// Second update: users table
onChange({ paths: ["users"], values: { users: [...] } });
// Manager state: { messages: [...], users: [...] }
// Both tables are passed to setContext()Null Resolution
If a $provider.path binding can't be resolved (provider not registered, path doesn't exist, or context was removed), it resolves to null. This allows UIs to gracefully handle loading states:
When(value: $spacetime.messages) {
Case(match: null) { Text("Loading...") }
Else {
ForEach(items: $spacetime.messages, key: "id") {
Text("${item.text}")
}
}
}Claude Code Skill
A Claude Code skill is available to scaffold new data source plugins. Add this to your .claude/skills/ directory to enable the /create-datasource-plugin command:
See the skill file at .claude/skills/create-datasource-plugin.md or create it locally using the template in the next section.
Using the Skill
/create-datasource-plugin mydbThis scaffolds a new plugin package with:
- The
DataSourcePlugininterface implementation - Config type definition
- Package.json with correct peer dependencies
- Module registration via
.useDataSource()