Streaming
Workflows can stream data in real time to clients without waiting for the entire workflow to complete. This enables progress updates, AI-generated content, log messages, and other incremental data to be delivered as workflows execute.
Getting Started with getWritable()
Every workflow run has a default writable stream that steps can write to using getWritable(). Data written to this stream becomes immediately available to clients consuming the workflow's output.
import { getWritable } from 'workflow';

async function writeProgress(message: string) {
  "use step";
  // Steps can write to the run's default stream
  const writable = getWritable<string>();
  const writer = writable.getWriter();
  await writer.write(message);
  writer.releaseLock();
}

export async function simpleStreamingWorkflow() {
  "use workflow";
  await writeProgress("Starting task...");
  await writeProgress("Processing data...");
  await writeProgress("Task complete!");
}

Consuming the Stream
Use the Run object's readable property to consume the stream from your API route:
import { start } from 'workflow/api';
import { simpleStreamingWorkflow } from './workflows/simple';

export async function POST() {
  const run = await start(simpleStreamingWorkflow);

  // Return the readable stream to the client
  return new Response(run.readable, {
    headers: { 'Content-Type': 'text/plain' }
  });
}

When a client makes a request to this endpoint, they'll receive each message as it's written, without waiting for the workflow to complete.
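For example, a client can read the response body incrementally using standard web APIs. This is a minimal sketch; the endpoint path '/api/simple' is assumed for illustration:

// Hypothetical client for the route above; '/api/simple' is an assumed path
const response = await fetch('/api/simple', { method: 'POST' });
const reader = response.body!.getReader();
const decoder = new TextDecoder();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // Each message is logged as soon as the corresponding step writes it
  console.log(decoder.decode(value, { stream: true }));
}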
Resuming Streams from a Specific Point
Use run.getReadable({ startIndex }) to resume a stream from a specific position. This is useful for reconnecting after timeouts or network interruptions:
import { getRun } from 'workflow/api';

export async function GET(
  request: Request,
  { params }: { params: Promise<{ runId: string }> }
) {
  const { runId } = await params;
  const { searchParams } = new URL(request.url);

  // Client provides the last chunk index they received
  const startIndexParam = searchParams.get('startIndex');
  const startIndex = startIndexParam ? parseInt(startIndexParam, 10) : undefined;

  const run = getRun(runId);
  const stream = run.getReadable({ startIndex });

  return new Response(stream, {
    headers: { 'Content-Type': 'text/plain' }
  });
}

This allows clients to reconnect and continue receiving data from where they left off, rather than restarting from the beginning.
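As a sketch of the client side, a consumer can count the chunks it has received and pass that count back as startIndex when it reconnects. The route path and the one-read-per-chunk assumption below are illustrative only; real transports may coalesce chunks, so production code would typically use a delimited format to count them reliably:

// Hypothetical client; assumes each network read delivers exactly one stream chunk
async function consumeWithResume(runId: string) {
  let received = 0;
  while (true) {
    try {
      const response = await fetch(`/api/runs/${runId}/stream?startIndex=${received}`);
      const reader = response.body!.getReader();
      const decoder = new TextDecoder();
      while (true) {
        const { done, value } = await reader.read();
        if (done) return; // stream closed: nothing left to resume
        received += 1; // remember how far we got
        console.log(decoder.decode(value, { stream: true }));
      }
    } catch {
      // Network interruption: loop around and reconnect from `received`
    }
  }
}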
Streams as Data Types
ReadableStream and WritableStream are standard Web Streams API types that Workflow DevKit makes serializable. They are not custom types - they follow the web standard - but Workflow DevKit adds the ability to pass them between functions while maintaining their streaming capabilities.
Unlike regular values, which are fully serialized to the event log, streams continue to flow when passed between functions.
Key properties:
- Stream references can be passed between workflow and step functions
- Stream data flows directly without being stored in the event log
- Streams preserve their state across workflow suspension points
How Streams Persist Across Workflow Suspensions
Streams in Workflow DevKit are backed by persistent, resumable storage provided by the "world" implementation. This is what enables streams to maintain their state even when workflows suspend and resume:
- Vercel deployments: Streams are backed by a performant Redis-based stream
- Local development: Stream chunks are stored in the filesystem
Passing Streams as Arguments
Since streams are serializable data types, you aren't limited to the special getWritable() stream. You can wire your own streams through workflows, passing them in as arguments from the outside and into steps.
Here's an example of passing a request body stream through a workflow to a step that processes it:
import { start } from 'workflow/api';
import { streamProcessingWorkflow } from './workflows/streaming';

export async function POST(request: Request) {
  // Streams can be passed as workflow arguments
  const run = await start(streamProcessingWorkflow, [request.body]);
  await run.result();
  return Response.json({ status: 'complete' });
}

export async function streamProcessingWorkflow(
  inputStream: ReadableStream<Uint8Array>
) {
  "use workflow";
  // Workflow passes stream to step for processing
  const result = await processInputStream(inputStream);
  return { length: result.length };
}

async function processInputStream(input: ReadableStream<Uint8Array>) {
  "use step";
  // Step reads from the stream
  const chunks: Uint8Array[] = [];
  for await (const chunk of input) {
    chunks.push(chunk);
  }
  return Buffer.concat(chunks).toString('utf8');
}

Important Limitation
Streams Cannot Be Used Directly in Workflow Context
You cannot read from or write to streams directly within a workflow function. All stream operations must happen in step functions.
Workflow functions must be deterministic to support replay. Since streams bypass the event log for performance, reading stream data in a workflow would break determinism - each replay could see different data. By requiring all stream operations to happen in steps, the framework ensures consistent behavior.
For more on determinism and replay, see Workflows and Steps.
import { getWritable } from 'workflow';

export async function badWorkflow() {
  "use workflow";
  const writable = getWritable<string>();
  // Cannot read/write streams in workflow context
  const writer = writable.getWriter();
  await writer.write("data");
}

export async function goodWorkflow() {
  "use workflow";
  // Delegate stream operations to steps
  await writeToStream("data");
}

async function writeToStream(data: string) {
  "use step";
  // Stream operations must happen in steps
  const writable = getWritable<string>();
  const writer = writable.getWriter();
  await writer.write(data);
  writer.releaseLock();
}

Namespaced Streams
Use getWritable({ namespace: 'name' }) to create multiple independent streams for different types of data. This is useful when you want to separate logs, metrics, data outputs, or other distinct channels.
import { getWritable } from 'workflow';

type LogEntry = { level: string; message: string };
type MetricEntry = { cpu: number; memory: number };

async function writeLogs() {
  "use step";
  const logs = getWritable<LogEntry>({ namespace: 'logs' });
  const writer = logs.getWriter();
  await writer.write({ level: 'info', message: 'Task started' });
  await writer.write({ level: 'info', message: 'Processing...' });
  writer.releaseLock();
}

async function writeMetrics() {
  "use step";
  const metrics = getWritable<MetricEntry>({ namespace: 'metrics' });
  const writer = metrics.getWriter();
  await writer.write({ cpu: 45, memory: 512 });
  await writer.write({ cpu: 52, memory: 520 });
  writer.releaseLock();
}

async function closeStreams() {
  "use step";
  await getWritable({ namespace: 'logs' }).close();
  await getWritable({ namespace: 'metrics' }).close();
}

export async function multiStreamWorkflow() {
  "use workflow";
  await writeLogs();
  await writeMetrics();
  await closeStreams();
}

Consuming Namespaced Streams
Use run.getReadable({ namespace: 'name' }) to access specific streams:
import { start } from 'workflow/api';
import { multiStreamWorkflow } from './workflows/multi';

type LogEntry = { level: string; message: string };
type MetricEntry = { cpu: number; memory: number };

export async function POST(request: Request) {
  const run = await start(multiStreamWorkflow);

  // Access specific named streams
  const logs = run.getReadable<LogEntry>({ namespace: 'logs' });
  const metrics = run.getReadable<MetricEntry>({ namespace: 'metrics' });

  // Return the logs stream to the client
  return new Response(logs, {
    headers: { 'Content-Type': 'application/json' }
  });
}

Common Patterns
Progress Updates for Long-Running Tasks
Send incremental progress updates to keep users informed during lengthy workflows:
import { getWritable, sleep } from 'workflow';

type ProgressUpdate = {
  item: string;
  progress: number;
  status: string;
};

async function processItem(
  item: string,
  current: number,
  total: number
) {
  "use step";
  const writable = getWritable<ProgressUpdate>();
  const writer = writable.getWriter();

  // Simulate processing
  await new Promise(resolve => setTimeout(resolve, 1000));

  // Send progress update
  await writer.write({
    item,
    progress: Math.round((current / total) * 100),
    status: 'processing'
  });

  writer.releaseLock();
}

async function finalizeProgress() {
  "use step";
  await getWritable().close();
}

export async function batchProcessingWorkflow(items: string[]) {
  "use workflow";
  for (let i = 0; i < items.length; i++) {
    await processItem(items[i], i + 1, items.length);
    await sleep("1s");
  }
  await finalizeProgress();
}

Streaming AI Responses with DurableAgent
Stream AI-generated content using DurableAgent from @workflow/ai. Tools can also emit progress updates to the same stream using data chunks with the UIMessageChunk type from the AI SDK:
import { DurableAgent } from '@workflow/ai/agent';
import { getWritable } from 'workflow';
import { z } from 'zod';
import type { UIMessageChunk } from 'ai';

async function searchFlights({ query }: { query: string }) {
  "use step";
  // Tools can emit progress updates to the stream
  const writable = getWritable<UIMessageChunk>();
  const writer = writable.getWriter();
  await writer.write({
    type: 'data-progress',
    data: { message: `Searching flights for ${query}...` },
    transient: true,
  });
  writer.releaseLock();

  // ... search logic ...
  return { flights: [/* results */] };
}

export async function aiAssistantWorkflow(userMessage: string) {
  "use workflow";
  const agent = new DurableAgent({
    model: 'anthropic/claude-haiku-4.5',
    system: 'You are a helpful flight assistant.',
    tools: {
      searchFlights: {
        description: 'Search for flights',
        inputSchema: z.object({ query: z.string() }),
        execute: searchFlights,
      },
    },
  });

  // LLM response will be streamed to the run's writable
  await agent.stream({
    messages: [{ role: 'user', content: userMessage }],
    writable: getWritable<UIMessageChunk>(),
  });
}

import { createUIMessageStreamResponse } from 'ai';
import { start } from 'workflow/api';
import { aiAssistantWorkflow } from './workflows/ai';

export async function POST(request: Request) {
  const { message } = await request.json();
  const run = await start(aiAssistantWorkflow, [message]);

  return createUIMessageStreamResponse({
    stream: run.readable,
  });
}

For a complete implementation, see the flight booking example, which demonstrates streaming AI responses with tool progress updates.
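On the client, the simplest way to inspect the stream is to read the response body directly. This is only a sketch: the '/api/chat' path and request body shape are assumptions, and a real UI would typically consume this response with the AI SDK's client-side utilities rather than raw reads:

// Hypothetical client; logs the raw UI message chunks as they arrive
const response = await fetch('/api/chat', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ message: 'Find me a flight to Tokyo' }),
});

const reader = response.body!.getReader();
const decoder = new TextDecoder();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // Text deltas, tool calls, and 'data-progress' chunks stream through here
  console.log(decoder.decode(value, { stream: true }));
}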
Streaming Between Steps
One step produces a stream and another step consumes it:
export async function streamPipelineWorkflow() {
  "use workflow";
  // Streams can be passed between steps
  const stream = await generateData();
  const results = await consumeData(stream);
  return { count: results.length };
}

async function generateData(): Promise<ReadableStream<number>> {
  "use step";
  // Producer step creates a stream
  return new ReadableStream<number>({
    start(controller) {
      for (let i = 0; i < 10; i++) {
        controller.enqueue(i);
      }
      controller.close();
    }
  });
}

async function consumeData(readable: ReadableStream<number>) {
  "use step";
  // Consumer step reads from the stream
  const values: number[] = [];
  for await (const value of readable) {
    values.push(value);
  }
  return values;
}

Processing Large Files Without Memory Overhead
Process large files by streaming chunks through transformation steps:
export async function fileProcessingWorkflow(fileUrl: string) {
  "use workflow";
  // Chain streams through multiple processing steps
  const rawStream = await downloadFile(fileUrl);
  const processedStream = await transformData(rawStream);
  await uploadResult(processedStream);
}

async function downloadFile(url: string): Promise<ReadableStream<Uint8Array>> {
  "use step";
  const response = await fetch(url);
  return response.body!;
}

async function transformData(input: ReadableStream<Uint8Array>): Promise<ReadableStream<Uint8Array>> {
  "use step";
  // Transform stream chunks without loading entire file into memory
  return input.pipeThrough(new TransformStream<Uint8Array, Uint8Array>({
    transform(chunk, controller) {
      // Process each chunk individually
      controller.enqueue(chunk);
    }
  }));
}

async function uploadResult(stream: ReadableStream<Uint8Array>) {
  "use step";
  await fetch('https://storage.example.com/upload', {
    method: 'POST',
    body: stream,
  });
}

Best Practices
Release locks properly:
const writer = writable.getWriter();
try {
  await writer.write(data);
} finally {
  writer.releaseLock(); // Always release
}

Stream locks acquired in a step only apply within that step, not across other steps. This enables multiple writers to write to the same stream concurrently.
If a lock is not released, the step process cannot terminate. Even though the step returns and the workflow continues, the underlying process will remain active until it times out.
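For example, two steps running in parallel can each acquire their own writer on the default stream. A minimal sketch, assuming steps can be started in parallel with Promise.all (the step names are illustrative):

import { getWritable } from 'workflow';

async function writeFromWorkerA() {
  "use step";
  const writer = getWritable<string>().getWriter();
  try {
    await writer.write("worker A: done");
  } finally {
    writer.releaseLock(); // lock only applied within this step
  }
}

async function writeFromWorkerB() {
  "use step";
  const writer = getWritable<string>().getWriter();
  try {
    await writer.write("worker B: done");
  } finally {
    writer.releaseLock();
  }
}

export async function concurrentWritersWorkflow() {
  "use workflow";
  // Both steps run in parallel and write to the same default stream
  await Promise.all([writeFromWorkerA(), writeFromWorkerB()]);
}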
Close streams when done:
async function finalizeStream() {
  "use step";
  await getWritable().close(); // Signal completion
}

Streams are automatically closed when the workflow run completes, but explicitly closing them signals completion to consumers earlier.
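Closing matters to consumers because iteration over the readable ends as soon as the stream closes. A sketch, assuming the batchProcessingWorkflow from above and an illustrative import path:

import { start } from 'workflow/api';
import { batchProcessingWorkflow } from './workflows/batch'; // hypothetical path

const run = await start(batchProcessingWorkflow, [['a', 'b', 'c']]);

// This loop exits once a step calls getWritable().close(),
// even if other work in the run is still finishing up
for await (const update of run.readable) {
  console.log(update);
}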
Use typed streams for type safety:
const writable = getWritable<MyDataType>();
const writer = writable.getWriter();
await writer.write({ /* typed data */ });

Stream Failures
When a step returns a stream, the step is considered successful once it returns, even if the stream later encounters an error. The workflow won't automatically retry the step. The consumer of the stream must handle errors gracefully. For more on retry behavior, see Errors and Retries.
import { FatalError } from 'workflow';

async function produceStream(): Promise<ReadableStream<number>> {
  "use step";
  // Step succeeds once it returns the stream
  return new ReadableStream<number>({
    start(controller) {
      controller.enqueue(1);
      controller.enqueue(2);
      // Error occurs after step has completed
      controller.error(new Error('Stream failed'));
    }
  });
}

async function consumeStream(stream: ReadableStream<number>) {
  "use step";
  try {
    for await (const value of stream) {
      console.log(value);
    }
  } catch (error) {
    // Retrying won't help since the stream is already errored
    throw new FatalError('Stream failed');
  }
}

export async function streamErrorWorkflow() {
  "use workflow";
  const stream = await produceStream(); // Step succeeds
  await consumeStream(stream); // Consumer handles errors
}

Stream errors don't trigger automatic retries for the producer step. Design your stream consumers to handle errors appropriately. Since the stream is already in an errored state, retrying the consumer won't help - use FatalError to fail the workflow immediately.
Related Documentation
- getWritable() API Reference - Get the workflow's writable stream
- sleep() API Reference - Pause workflow execution for a duration
- start() API Reference - Start workflows and access the Run object
- getRun() API Reference - Retrieve runs and their streams later
- DurableAgent - AI agents with built-in streaming support
- Errors and Retries - Understanding error handling and retry behavior
- Serialization - Understanding what data types can be passed in workflows
- Workflows and Steps - Core concepts of workflow execution