Channels
Typed communication channels for inter-step and inter-process messaging.
Quick Example
import { z } from 'zod';
import { channel, step } from '@noetic/core';
const notifications = channel('notifications', {
schema: z.object({ message: z.string(), level: z.enum(['info', 'warn', 'error']) }),
mode: 'queue',
capacity: 100,
});
const producer = step.run('produce', async (input: string, ctx) => {
ctx.send(notifications, { message: input, level: 'info' });
return input;
});
const consumer = step.run('consume', async (_input: unknown, ctx) => {
const note = await ctx.recv(notifications, { timeout: 5_000 });
console.log(`[${note.level}] ${note.message}`);
return note;
});
Channels provide typed, schema-validated communication between steps, between loop iterations, and between your application and a running agent. They replace ad-hoc state mutation with explicit message passing.
Creating Channels
Use the channel() builder to define a channel. Every channel has a name, a Zod schema, and a mode:
import { z } from 'zod';
import { channel } from '@noetic/core';
const progress = channel('progress', {
schema: z.number().min(0).max(100),
mode: 'value',
});
Builder Options
| Option | Type | Required | Description |
|---|---|---|---|
| schema | ZodType<T> | Yes | Zod schema that validates every value sent through the channel. |
| mode | 'value' \| 'queue' \| 'topic' | Yes | Delivery semantics (see below). |
| capacity | number | No | Maximum buffer size for queue mode. Defaults to 1000. |
| external | boolean | No | When true, creates an ExternalChannel that outside code can write to. |
Channel Modes
Value Mode
Holds the latest value. Readers always get the most recent write. If no value has been sent yet, recv blocks until one arrives.
const config = channel('config', {
schema: z.object({ temperature: z.number() }),
mode: 'value',
});
// Writer sets the value
ctx.send(config, { temperature: 0.7 });
// Any reader gets the latest value immediately
const current = ctx.tryRecv(config);
// { temperature: 0.7 }
Best for: shared configuration, latest-status indicators, feature flags.
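
The overwrite behavior of value mode can be modeled in a few lines of plain TypeScript. This is a minimal sketch, not the library's implementation: ValueCell is a hypothetical name, and it assumes that reading does not consume the value, which is what "readers always get the most recent write" suggests.

```typescript
// Illustrative model only: ValueCell is a hypothetical name, not part of @noetic/core.
class ValueCell<T> {
  private current: T | null = null;

  // Each send overwrites whatever was stored before.
  send(value: T): void {
    this.current = value;
  }

  // Non-blocking read: returns the latest value without consuming it,
  // or null if nothing has been sent yet.
  tryRecv(): T | null {
    return this.current;
  }
}

const cell = new ValueCell<{ temperature: number }>();
const beforeFirstSend = cell.tryRecv(); // null: nothing has been written yet
cell.send({ temperature: 0.7 });
cell.send({ temperature: 0.2 });        // overwrites 0.7
const firstRead = cell.tryRecv();
const secondRead = cell.tryRecv();      // same value as firstRead
```

Both reads observe the second write, so a late reader never sees stale intermediate values.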
Queue Mode
FIFO buffer. Each value is consumed by exactly one reader. Values accumulate up to capacity; when full, new messages are dropped (with a warning). If the queue is empty, recv blocks until a value arrives.
const tasks = channel('tasks', {
schema: z.object({ url: z.string().url() }),
mode: 'queue',
capacity: 50,
});
// Producer adds work
ctx.send(tasks, { url: 'https://example.com/page1' });
ctx.send(tasks, { url: 'https://example.com/page2' });
// Consumer takes one item at a time
const task = await ctx.recv(tasks);
// { url: 'https://example.com/page1' }
Best for: work queues, task distribution, ordered pipelines.
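
The FIFO and drop-when-full rules can be illustrated with a small standalone model. This is a sketch under stated assumptions rather than the library's code: BoundedQueue and its dropped counter are hypothetical, and the counter stands in for the warning the real channel emits.

```typescript
// Illustrative model only: BoundedQueue is a hypothetical name, not part of @noetic/core.
class BoundedQueue<T> {
  private items: T[] = [];
  private readonly capacity: number;
  dropped = 0; // stands in for the warning emitted on a dropped message

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  // Returns false when the buffer is full and the new value is dropped.
  send(value: T): boolean {
    if (this.items.length >= this.capacity) {
      this.dropped += 1;
      return false;
    }
    this.items.push(value);
    return true;
  }

  // Non-blocking read: removes and returns the oldest item, or null when empty.
  tryRecv(): T | null {
    if (this.items.length === 0) return null;
    return this.items.shift() as T;
  }
}

const pages = new BoundedQueue<string>(2);
pages.send('page1');
pages.send('page2');
const accepted = pages.send('page3'); // false: the queue is already at capacity
const firstOut = pages.tryRecv();     // 'page1' (oldest first)
const secondOut = pages.tryRecv();    // 'page2'
const thirdOut = pages.tryRecv();     // null: 'page3' was never stored
```

Because the newest message is the one that gets dropped, a producer that must not lose work should size capacity for its worst-case burst.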
Topic Mode
Pub/sub broadcast. Every recv call that is waiting at the time of send receives the value. Values are not buffered -- if nobody is listening, the message is lost. tryRecv always returns null in topic mode.
const events = channel('events', {
schema: z.object({ type: z.string(), payload: z.unknown() }),
mode: 'topic',
});
// Multiple listeners can await simultaneously
const listener1 = ctx.recv(events, { timeout: 10_000 });
const listener2 = ctx.recv(events, { timeout: 10_000 });
// Both receive the same value
ctx.send(events, { type: 'update', payload: { status: 'done' } });
Best for: event broadcasting, fan-out notifications, real-time updates.
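
The "only waiting listeners receive the value" rule is easy to get wrong, so here is a minimal standalone model of it. Topic is a hypothetical class, not the library's implementation, and the delivery count returned by send exists only to make the behavior observable in the sketch.

```typescript
// Illustrative model only: Topic is a hypothetical name, not part of @noetic/core.
class Topic<T> {
  private waiters: Array<(value: T) => void> = [];

  // Registers a one-shot listener that resolves on the next send.
  recv(): Promise<T> {
    return new Promise<T>((resolve) => {
      this.waiters.push(resolve);
    });
  }

  // Delivers to every listener waiting right now; nothing is buffered.
  // Returns how many listeners received the value (sketch-only convenience).
  send(value: T): number {
    const current = this.waiters;
    this.waiters = [];
    for (const deliver of current) deliver(value);
    return current.length;
  }

  // There is no buffer to read from, so a non-blocking read is always null.
  tryRecv(): T | null {
    return null;
  }
}

const topic = new Topic<string>();
const lostCount = topic.send('nobody is listening'); // 0: the message is lost
const listenerA = topic.recv();
const listenerB = topic.recv();
const deliveredCount = topic.send('update');         // 2: both listeners get it
Promise.all([listenerA, listenerB]).then((values) => console.log(values));
```

A listener therefore has to call recv before the sender fires; anything published earlier is gone.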
Channel Interface
interface Channel<T> {
readonly name: string;
readonly schema: ZodType<T>;
readonly mode: 'value' | 'queue' | 'topic';
readonly capacity?: number;
}
Sending and Receiving
Channels are accessed through the Context or Runtime:
From a Step (via Context)
| Method | Signature | Description |
|---|---|---|
| ctx.send | send<T>(channel: Channel<T>, value: T): void | Send a value. Synchronous. |
| ctx.recv | recv<T>(channel: Channel<T>, opts?: { timeout?: number }): Promise<T> | Wait for a value. Throws channel_timeout on expiry (default 30s). |
| ctx.tryRecv | tryRecv<T>(channel: Channel<T>): T \| null | Non-blocking read. Returns null if nothing is available. |
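
The timeout behavior of recv amounts to racing a pending value against a timer. The sketch below shows that pattern in isolation; withTimeout and ChannelTimeoutError are hypothetical names, and the error shape (a code property set to channel_timeout) is an assumption, since the actual shape is defined by the library's error model.

```typescript
// Illustrative helper only: these names are hypothetical, not part of @noetic/core.
class ChannelTimeoutError extends Error {
  readonly code = 'channel_timeout'; // assumed error shape

  constructor(channelName: string, ms: number) {
    super(`recv on "${channelName}" timed out after ${ms} ms`);
  }
}

// Settles with the pending value, or rejects once `ms` elapses (30 s by default).
function withTimeout<T>(pending: Promise<T>, channelName: string, ms = 30_000): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new ChannelTimeoutError(channelName, ms)), ms);
    pending.then(
      (value) => {
        clearTimeout(timer); // do not keep the timer alive after success
        resolve(value);
      },
      (error) => {
        clearTimeout(timer);
        reject(error);
      },
    );
  });
}

// A value that never arrives triggers the timeout path.
const neverArrives = new Promise<string>(() => {});
withTimeout(neverArrives, 'notifications', 10).catch((error) => {
  console.log(error.code);
});
```

Callers that can tolerate a missing value would catch this error and fall back; callers that cannot should let it propagate.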
From the Runtime
The runtime provides the same three methods, taking an additional ctx parameter:
runtime.send(channel, value, ctx);
const val = await runtime.recv(channel, ctx, { timeout: 5_000 });
const maybe = runtime.tryRecv(channel, ctx);
External Channels
External channels let code outside the agent (your application, an API handler, a UI) push values into a running execution.
const userInput = channel('user-input', {
schema: z.string(),
mode: 'queue',
external: true,
});
The external: true option changes the return type to ExternalChannel<T>, which enables getChannelHandle:
ChannelHandle
interface ChannelHandle<T> {
send(value: T): void;
readonly closed: boolean;
readonly channel: Channel<T>;
}
Get a handle from the runtime using the execution ID:
const handle = runtime.getChannelHandle(userInput, ctx.id);
// From your API handler, UI callback, etc:
handle.send('What is the weather in Tokyo?');
// Check if the execution is still running
if (handle.closed) {
console.log('Execution has ended');
}
| Property / Method | Description |
|---|---|
| send(value) | Push a value into the channel. Throws channel_closed if the execution has ended. |
| closed | Whether the execution associated with this handle has been closed. |
| channel | Reference back to the channel definition. |
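
Because an execution can end between checking closed and calling send, application code usually wants to handle both cases. The following is a hedged sketch of that pattern against a stand-in handle: SketchHandle, its close() method, and trySend are hypothetical, and the code property on the thrown error is an assumed shape.

```typescript
// Illustrative stand-in only: SketchHandle is hypothetical, not part of @noetic/core.
class SketchHandle<T> {
  private isClosed = false;
  readonly delivered: T[] = [];

  get closed(): boolean {
    return this.isClosed;
  }

  send(value: T): void {
    if (this.isClosed) {
      // Assumed error shape: an Error carrying code 'channel_closed'.
      throw Object.assign(new Error('execution has ended'), { code: 'channel_closed' });
    }
    this.delivered.push(value);
  }

  // Stands in for the runtime ending the execution.
  close(): void {
    this.isClosed = true;
  }
}

// Returns true if the value was delivered, false if the execution had ended.
function trySend<T>(handle: SketchHandle<T>, value: T): boolean {
  if (handle.closed) return false;
  try {
    handle.send(value);
    return true;
  } catch (error) {
    // The execution may end between the check above and the send.
    if ((error as { code?: string }).code === 'channel_closed') return false;
    throw error;
  }
}

const sketchHandle = new SketchHandle<string>();
const sentWhileOpen = trySend(sketchHandle, 'approve');  // true
sketchHandle.close();
const sentAfterClose = trySend(sketchHandle, 'reject');  // false
```

An API handler can map the false result to a "session expired" response instead of surfacing an exception.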
External Channel Example
A typical human-in-the-loop pattern:
import { z } from 'zod';
import { channel, step } from '@noetic/core';
const approval = channel('approval', {
schema: z.enum(['approve', 'reject']),
mode: 'queue',
external: true,
});
const askForApproval = step.run('ask-approval', async (plan: string, ctx) => {
// Send plan to user via your app layer...
const decision = await ctx.recv(approval, { timeout: 300_000 }); // 5 min
return decision;
});
// Meanwhile, from your server:
const handle = runtime.getChannelHandle(approval, ctx.id);
handle.send('approve'); // Unblocks the step
Schema Validation
Every value sent through a channel is validated against its Zod schema at runtime. This catches type mismatches, malformed data, and contract violations early:
const scores = channel('scores', {
schema: z.number().int().min(0).max(100),
mode: 'queue',
});
ctx.send(scores, 85); // OK
ctx.send(scores, 101); // Throws ZodError: Number must be <= 100
ctx.send(scores, 'high'); // Throws ZodError: Expected number, received string
Mode Comparison
| Behavior | Value | Queue | Topic |
|---|---|---|---|
| Buffered | Latest only | Up to capacity | Not buffered |
| Multiple readers | All get same value | Each gets unique item | All waiting readers get copy |
| tryRecv when empty | null | null | Always null |
| Backpressure | Overwrites | Drops at capacity | Drops if no listeners |
Related Pages
- Context & Event Log -- recv, send, and tryRecv on the context.
- Runtime -- getChannelHandle for external channels.
- Error Model -- channel_timeout and channel_closed errors.