Streaming

Workflows can stream data in real-time to clients without waiting for the entire workflow to complete. This enables progress updates, AI-generated content, log messages, and other incremental data to be delivered as workflows execute.


Getting Started with getWritable()

Every workflow run has a default writable stream that steps can write to using getWritable(). Data written to this stream becomes immediately available to clients consuming the workflow's output.

workflows/simple-streaming.ts
import { getWritable } from 'workflow';

async function writeProgress(message: string) {
  "use step";

  // Steps can write to the run's default stream
  const writable = getWritable<string>(); 
  const writer = writable.getWriter();
  await writer.write(message);
  writer.releaseLock();
}

export async function simpleStreamingWorkflow() {
  "use workflow";

  await writeProgress("Starting task...");
  await writeProgress("Processing data...");
  await writeProgress("Task complete!");
}

Consuming the Stream

Use the Run object's readable property to consume the stream from your API route:

app/api/stream/route.ts
import { start } from 'workflow/api';
import { simpleStreamingWorkflow } from './workflows/simple';

export async function POST() {
  const run = await start(simpleStreamingWorkflow);

  // Return the readable stream to the client
  return new Response(run.readable, {
    headers: { 'Content-Type': 'text/plain' }
  });
}

When a client makes a request to this endpoint, they'll receive each message as it's written, without waiting for the workflow to complete.

Resuming Streams from a Specific Point

Use run.getReadable({ startIndex }) to resume a stream from a specific position. This is useful for reconnecting after timeouts or network interruptions:

app/api/resume-stream/[runId]/route.ts
import { getRun } from 'workflow/api';

export async function GET(
  request: Request,
  { params }: { params: Promise<{ runId: string }> }
) {
  const { runId } = await params;
  const { searchParams } = new URL(request.url);

  // Client provides the last chunk index they received
  const startIndexParam = searchParams.get('startIndex'); 
  const startIndex = startIndexParam ? parseInt(startIndexParam, 10) : undefined; 

  const run = getRun(runId);
  const stream = run.getReadable({ startIndex }); 

  return new Response(stream, {
    headers: { 'Content-Type': 'text/plain' }
  });
}

This allows clients to reconnect and continue receiving data from where they left off, rather than restarting from the beginning.


Streams as Data Types

ReadableStreamExternal link and WritableStreamExternal link are standard Web Streams API types that Workflow DevKit makes serializable. These are not custom types - they follow the web standard - but Workflow DevKit adds the ability to pass them between functions while maintaining their streaming capabilities.

Unlike regular values that are fully serialized to the event log, streams maintain their streaming capabilities when passed between functions.

Key properties:

  • Stream references can be passed between workflow and step functions
  • Stream data flows directly without being stored in the event log
  • Streams preserve their state across workflow suspension points

How Streams Persist Across Workflow Suspensions

Streams in Workflow DevKit are backed by persistent, resumable storage provided by the "world" implementation. This is what enables streams to maintain their state even when workflows suspend and resume:

  • Vercel deployments: Streams are backed by a performant Redis-based stream
  • Local development: Stream chunks are stored in the filesystem

Passing Streams as Arguments

Since streams are serializable data types, you don't need to use the special getWritable(). You can even wire your own streams through workflows, passing them as arguments from outside into steps.

Here's an example of passing a request body stream through a workflow to a step that processes it:

app/api/upload/route.ts
import { start } from 'workflow/api';
import { streamProcessingWorkflow } from './workflows/streaming';

export async function POST(request: Request) {
  // Streams can be passed as workflow arguments
  const run = await start(streamProcessingWorkflow, [request.body]); 
  await run.result();

  return Response.json({ status: 'complete' });
}
workflows/streaming.ts
export async function streamProcessingWorkflow(
  inputStream: ReadableStream<Uint8Array> 
) {
  "use workflow";

  // Workflow passes stream to step for processing
  const result = await processInputStream(inputStream); 
  return { length: result.length };
}

async function processInputStream(input: ReadableStream<Uint8Array>) {
  "use step";

  // Step reads from the stream
  const chunks: Uint8Array[] = [];

  for await (const chunk of input) {
    chunks.push(chunk);
  }

  return Buffer.concat(chunks).toString('utf8');
}

Important Limitation

Streams Cannot Be Used Directly in Workflow Context

You cannot read from or write to streams directly within a workflow function. All stream operations must happen in step functions.

Workflow functions must be deterministic to support replay. Since streams bypass the event log for performance, reading stream data in a workflow would break determinism - each replay could see different data. By requiring all stream operations to happen in steps, the framework ensures consistent behavior.

For more on determinism and replay, see Workflows and Steps.

workflows/bad-example.ts
export async function badWorkflow() {
  "use workflow";

  const writable = getWritable<string>();

  // Cannot read/write streams in workflow context
  const writer = writable.getWriter(); 
  await writer.write("data"); 
}
workflows/good-example.ts
export async function goodWorkflow() {
  "use workflow";

  // Delegate stream operations to steps
  await writeToStream("data");
}

async function writeToStream(data: string) {
  "use step";

  // Stream operations must happen in steps
  const writable = getWritable<string>();
  const writer = writable.getWriter();
  await writer.write(data);
  writer.releaseLock();
}

Namespaced Streams

Use getWritable({ namespace: 'name' }) to create multiple independent streams for different types of data. This is useful when you want to separate logs, metrics, data outputs, or other distinct channels.

workflows/multi-stream.ts
import { getWritable } from 'workflow';

type LogEntry = { level: string; message: string };
type MetricEntry = { cpu: number; memory: number };

async function writeLogs() {
  "use step";

  const logs = getWritable<LogEntry>({ namespace: 'logs' }); 
  const writer = logs.getWriter();

  await writer.write({ level: 'info', message: 'Task started' });
  await writer.write({ level: 'info', message: 'Processing...' });

  writer.releaseLock();
}

async function writeMetrics() {
  "use step";

  const metrics = getWritable<MetricEntry>({ namespace: 'metrics' }); 
  const writer = metrics.getWriter();

  await writer.write({ cpu: 45, memory: 512 });
  await writer.write({ cpu: 52, memory: 520 });

  writer.releaseLock();
}

async function closeStreams() {
  "use step";

  await getWritable({ namespace: 'logs' }).close();
  await getWritable({ namespace: 'metrics' }).close();
}

export async function multiStreamWorkflow() {
  "use workflow";

  await writeLogs();
  await writeMetrics();
  await closeStreams();
}

Consuming Namespaced Streams

Use run.getReadable({ namespace: 'name' }) to access specific streams:

app/api/multi-stream/route.ts
import { start } from 'workflow/api';
import { multiStreamWorkflow } from './workflows/multi';

type LogEntry = { level: string; message: string };
type MetricEntry = { cpu: number; memory: number };

export async function POST(request: Request) {
  const run = await start(multiStreamWorkflow);

  // Access specific named streams
  const logs = run.getReadable<LogEntry>({ namespace: 'logs' }); 
  const metrics = run.getReadable<MetricEntry>({ namespace: 'metrics' }); 

  // Return the logs stream to the client
  return new Response(logs, {
    headers: { 'Content-Type': 'application/json' }
  });
}

Common Patterns

Progress Updates for Long-Running Tasks

Send incremental progress updates to keep users informed during lengthy workflows:

workflows/batch-processing.ts
import { getWritable, sleep } from 'workflow';

type ProgressUpdate = {
  item: string;
  progress: number;
  status: string;
};

async function processItem(
  item: string,
  current: number,
  total: number
) {
  "use step";

  const writable = getWritable<ProgressUpdate>(); 
  const writer = writable.getWriter();

  // Simulate processing
  await new Promise(resolve => setTimeout(resolve, 1000));

  // Send progress update
  await writer.write({ 
    item, 
    progress: Math.round((current / total) * 100), 
    status: 'processing'
  }); 

  writer.releaseLock();
}

async function finalizeProgress() {
  "use step";

  await getWritable().close();
}

export async function batchProcessingWorkflow(items: string[]) {
  "use workflow";

  for (let i = 0; i < items.length; i++) {
    await processItem(items[i], i + 1, items.length);
    await sleep("1s");
  }

  await finalizeProgress();
}

Streaming AI Responses with DurableAgent

Stream AI-generated content using DurableAgent from @workflow/ai. Tools can also emit progress updates to the same stream using data chunksExternal link with the UIMessageChunkExternal link type from the AI SDK:

workflows/ai-assistant.ts
import { DurableAgent } from '@workflow/ai/agent';
import { getWritable } from 'workflow';
import { z } from 'zod';
import type { UIMessageChunk } from 'ai';

async function searchFlights({ query }: { query: string }) {
  "use step";

  // Tools can emit progress updates to the stream
  const writable = getWritable<UIMessageChunk>(); 
  const writer = writable.getWriter(); 
  await writer.write({ 
    type: 'data-progress', 
    data: { message: `Searching flights for ${query}...` }, 
    transient: true, 
  }); 
  writer.releaseLock(); 

  // ... search logic ...
  return { flights: [/* results */] };
}

export async function aiAssistantWorkflow(userMessage: string) {
  "use workflow";

  const agent = new DurableAgent({
    model: 'anthropic/claude-haiku-4.5',
    system: 'You are a helpful flight assistant.',
    tools: {
      searchFlights: {
        description: 'Search for flights',
        inputSchema: z.object({ query: z.string() }),
        execute: searchFlights,
      },
    },
  });

  // LLM response will be streamed to the run's writable
  await agent.stream({
    messages: [{ role: 'user', content: userMessage }],
    writable: getWritable<UIMessageChunk>(), 
  });
}
app/api/ai-assistant/route.ts
import { createUIMessageStreamResponse } from 'ai';
import { start } from 'workflow/api';
import { aiAssistantWorkflow } from './workflows/ai';

export async function POST(request: Request) {
  const { message } = await request.json();

  const run = await start(aiAssistantWorkflow, [message]);

  return createUIMessageStreamResponse({
    stream: run.readable,
  });
}

For a complete implementation, see the flight booking exampleExternal link which demonstrates streaming AI responses with tool progress updates.

Streaming Between Steps

One step produces a stream and another step consumes it:

workflows/stream-pipeline.ts
export async function streamPipelineWorkflow() {
  "use workflow";

  // Streams can be passed between steps
  const stream = await generateData(); 
  const results = await consumeData(stream); 

  return { count: results.length };
}

async function generateData(): Promise<ReadableStream<number>> {
  "use step";

  // Producer step creates a stream
  return new ReadableStream<number>({
    start(controller) {
      for (let i = 0; i < 10; i++) {
        controller.enqueue(i);
      }
      controller.close();
    }
  });
}

async function consumeData(readable: ReadableStream<number>) {
  "use step";

  // Consumer step reads from the stream
  const values: number[] = [];
  for await (const value of readable) {
    values.push(value);
  }
  return values;
}

Processing Large Files Without Memory Overhead

Process large files by streaming chunks through transformation steps:

workflows/file-processing.ts
export async function fileProcessingWorkflow(fileUrl: string) {
  "use workflow";

  // Chain streams through multiple processing steps
  const rawStream = await downloadFile(fileUrl); 
  const processedStream = await transformData(rawStream); 
  await uploadResult(processedStream); 
}

async function downloadFile(url: string): Promise<ReadableStream<Uint8Array>> {
  "use step";
  const response = await fetch(url);
  return response.body!;
}

async function transformData(input: ReadableStream<Uint8Array>): Promise<ReadableStream<Uint8Array>> {
  "use step";

  // Transform stream chunks without loading entire file into memory
  return input.pipeThrough(new TransformStream<Uint8Array, Uint8Array>({
    transform(chunk, controller) {
      // Process each chunk individually
      controller.enqueue(chunk);
    }
  }));
}

async function uploadResult(stream: ReadableStream<Uint8Array>) {
  "use step";
  await fetch('https://storage.example.com/upload', {
    method: 'POST',
    body: stream,
  });
}

Best Practices

Release locks properly:

const writer = writable.getWriter();
try {
  await writer.write(data);
} finally {
  writer.releaseLock(); // Always release
}

Stream locks acquired in a step only apply within that step, not across other steps. This enables multiple writers to write to the same stream concurrently.

If a lock is not released, the step process cannot terminate. Even though the step returns and the workflow continues, the underlying process will remain active until it times out.

Close streams when done:

async function finalizeStream() {
  "use step";

  await getWritable().close(); // Signal completion
}

Streams are automatically closed when the workflow run completes, but explicitly closing them signals completion to consumers earlier.

Use typed streams for type safety:

const writable = getWritable<MyDataType>();
const writer = writable.getWriter();
await writer.write({ /* typed data */ });

Stream Failures

When a step returns a stream, the step is considered successful once it returns, even if the stream later encounters an error. The workflow won't automatically retry the step. The consumer of the stream must handle errors gracefully. For more on retry behavior, see Errors and Retries.

workflows/stream-error-handling.ts
import { FatalError } from 'workflow';

async function produceStream(): Promise<ReadableStream<number>> {
  "use step";

  // Step succeeds once it returns the stream
  return new ReadableStream<number>({
    start(controller) {
      controller.enqueue(1);
      controller.enqueue(2);
      // Error occurs after step has completed
      controller.error(new Error('Stream failed')); 
    }
  });
}

async function consumeStream(stream: ReadableStream<number>) {
  "use step";

  try { 
    for await (const value of stream) {
      console.log(value);
    }
  } catch (error) { 
    // Retrying won't help since the stream is already errored
    throw new FatalError('Stream failed'); 
  } 
}

export async function streamErrorWorkflow() {
  "use workflow";

  const stream = await produceStream(); // Step succeeds
  await consumeStream(stream); // Consumer handles errors
}

Stream errors don't trigger automatic retries for the producer step. Design your stream consumers to handle errors appropriately. Since the stream is already in an errored state, retrying the consumer won't help - use FatalError to fail the workflow immediately.