Channels

Typed communication channels for inter-step and inter-process messaging.

Quick Example

import { z } from 'zod';
import { channel, step } from '@noetic/core';

const notifications = channel('notifications', {
  schema: z.object({ message: z.string(), level: z.enum(['info', 'warn', 'error']) }),
  mode: 'queue',
  capacity: 100,
});

const producer = step.run('produce', async (input: string, ctx) => {
  ctx.send(notifications, { message: input, level: 'info' });
  return input;
});

const consumer = step.run('consume', async (_input: unknown, ctx) => {
  const note = await ctx.recv(notifications, { timeout: 5_000 });
  console.log(`[${note.level}] ${note.message}`);
  return note;
});

Channels provide typed, schema-validated communication between steps, between loop iterations, and between your application and a running agent. They replace ad-hoc state mutation with explicit message passing.

Creating Channels

Use the channel() builder to define a channel. Every channel has a name, a Zod schema, and a mode:

import { z } from 'zod';
import { channel } from '@noetic/core';

const progress = channel('progress', {
  schema: z.number().min(0).max(100),
  mode: 'value',
});

Builder Options

OptionTypeRequiredDescription
schemaZodType<T>YesZod schema that validates every value sent through the channel.
mode'value' | 'queue' | 'topic'YesDelivery semantics (see below).
capacitynumberNoMaximum buffer size for queue mode. Defaults to 1000.
externalbooleanNoWhen true, creates an ExternalChannel that outside code can write to.

Channel Modes

Value Mode

Holds the latest value. Readers always get the most recent write. If no value has been sent yet, recv blocks until one arrives.

const config = channel('config', {
  schema: z.object({ temperature: z.number() }),
  mode: 'value',
});

// Writer sets the value
ctx.send(config, { temperature: 0.7 });

// Any reader gets the latest value immediately
const current = ctx.tryRecv(config);
// { temperature: 0.7 }

Best for: shared configuration, latest-status indicators, feature flags.

Queue Mode

FIFO buffer. Each value is consumed by exactly one reader. Values accumulate up to capacity; when full, new messages are dropped (with a warning). If the queue is empty, recv blocks until a value arrives.

const tasks = channel('tasks', {
  schema: z.object({ url: z.string().url() }),
  mode: 'queue',
  capacity: 50,
});

// Producer adds work
ctx.send(tasks, { url: 'https://example.com/page1' });
ctx.send(tasks, { url: 'https://example.com/page2' });

// Consumer takes one item at a time
const task = await ctx.recv(tasks);
// { url: 'https://example.com/page1' }

Best for: work queues, task distribution, ordered pipelines.

Topic Mode

Pub/sub broadcast. Every recv call that is waiting at the time of send receives the value. Values are not buffered -- if nobody is listening, the message is lost. tryRecv always returns null in topic mode.

const events = channel('events', {
  schema: z.object({ type: z.string(), payload: z.unknown() }),
  mode: 'topic',
});

// Multiple listeners can await simultaneously
const listener1 = ctx.recv(events, { timeout: 10_000 });
const listener2 = ctx.recv(events, { timeout: 10_000 });

// Both receive the same value
ctx.send(events, { type: 'update', payload: { status: 'done' } });

Best for: event broadcasting, fan-out notifications, real-time updates.

Channel Interface

interface Channel<T> {
  readonly name: string;
  readonly schema: ZodType<T>;
  readonly mode: 'value' | 'queue' | 'topic';
  readonly capacity?: number;
}

Sending and Receiving

Channels are accessed through the Context or Runtime:

From a Step (via Context)

MethodSignatureDescription
ctx.sendsend<T>(channel: Channel<T>, value: T): voidSend a value. Synchronous.
ctx.recvrecv<T>(channel: Channel<T>, opts?: { timeout?: number }): Promise<T>Wait for a value. Throws channel_timeout on expiry (default 30s).
ctx.tryRecvtryRecv<T>(channel: Channel<T>): T | nullNon-blocking read. Returns null if nothing is available.

From the Runtime

The runtime provides the same three methods, taking an additional ctx parameter:

runtime.send(channel, value, ctx);
const val = await runtime.recv(channel, ctx, { timeout: 5_000 });
const maybe = runtime.tryRecv(channel, ctx);

External Channels

External channels let code outside the agent (your application, an API handler, a UI) push values into a running execution.

const userInput = channel('user-input', {
  schema: z.string(),
  mode: 'queue',
  external: true,
});

The external: true option changes the return type to ExternalChannel<T>, which enables getChannelHandle:

ChannelHandle

interface ChannelHandle<T> {
  send(value: T): void;
  readonly closed: boolean;
  readonly channel: Channel<T>;
}

Get a handle from the runtime using the execution ID:

const handle = runtime.getChannelHandle(userInput, ctx.id);

// From your API handler, UI callback, etc:
handle.send('What is the weather in Tokyo?');

// Check if the execution is still running
if (handle.closed) {
  console.log('Execution has ended');
}
Property / MethodDescription
send(value)Push a value into the channel. Throws channel_closed if the execution has ended.
closedWhether the execution associated with this handle has been closed.
channelReference back to the channel definition.

External Channel Example

A typical human-in-the-loop pattern:

import { z } from 'zod';
import { channel, step, loop, until } from '@noetic/core';

const approval = channel('approval', {
  schema: z.enum(['approve', 'reject']),
  mode: 'queue',
  external: true,
});

const askForApproval = step.run('ask-approval', async (plan: string, ctx) => {
  // Send plan to user via your app layer...
  const decision = await ctx.recv(approval, { timeout: 300_000 }); // 5 min
  return decision;
});

// Meanwhile, from your server:
const handle = runtime.getChannelHandle(approval, ctx.id);
handle.send('approve'); // Unblocks the step

Schema Validation

Every value sent through a channel is validated against its Zod schema at runtime. This catches type mismatches, malformed data, and contract violations early:

const scores = channel('scores', {
  schema: z.number().int().min(0).max(100),
  mode: 'queue',
});

ctx.send(scores, 85);    // OK
ctx.send(scores, 101);   // Throws ZodError: Number must be <= 100
ctx.send(scores, 'high'); // Throws ZodError: Expected number, received string

Mode Comparison

BehaviorValueQueueTopic
BufferedLatest onlyUp to capacityNot buffered
Multiple readersAll get same valueEach gets unique itemAll waiting readers get copy
tryRecv when emptynullnullAlways null
BackpressureOverwritesDrops at capacityDrops if no listeners

On this page