getWritable()

Retrieves the current workflow run's default writable stream. The writable stream is intended to be passed as an argument to steps which can write to it. Chunks written to this stream can be read outside the workflow by using the readable property of the Run object.

Use this function in your workflows to produce streaming output that can be consumed by clients in real-time.

This function can only be called inside a workflow function (a function with the "use workflow" directive).

import { getWritable } from 'workflow';

export async function myWorkflow() {
  "use workflow";

  const writable = getWritable(); 
  const writer = writable.getWriter();

  await writer.write(new TextEncoder().encode('Hello from workflow!'));
  await writer.close();
}
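On the consuming side, chunks written with TextEncoder arrive as bytes and need to be decoded. The sketch below assumes the Run object's readable property behaves like a standard Web ReadableStream<Uint8Array>. Both readAllText and demoReadable are hypothetical names, and demoReadable only stands in for a real run's readable so the sketch runs on its own.

```typescript
// Hypothetical helper: drains a byte stream and returns the decoded text.
// Assumes the readable behaves like a standard Web ReadableStream<Uint8Array>.
async function readAllText(readable: ReadableStream<Uint8Array>): Promise<string> {
  const reader = readable.getReader();
  const decoder = new TextDecoder();
  let text = '';
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // stream: true keeps multi-byte characters intact across chunk boundaries
      text += decoder.decode(value, { stream: true });
    }
    // Flush any bytes the decoder is still buffering
    text += decoder.decode();
  } finally {
    reader.releaseLock();
  }
  return text;
}

// Stand-in for a run's readable (illustrative only)
function demoReadable(): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  return new ReadableStream<Uint8Array>({
    start(controller) {
      controller.enqueue(encoder.encode('Hello from '));
      controller.enqueue(encoder.encode('workflow!'));
      controller.close();
    },
  });
}
```

In a real application the argument would be the readable obtained from the Run object rather than demoReadable().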

API Signature

Parameters

Name     Type                            Description
options  WorkflowWritableStreamOptions

Returns

WritableStream<W>

Returns a WritableStream<W> where W is the type of data you plan to write to the stream.

Good to Know

  • The stream should typically be passed to step functions for writing.
  • Always release the writer lock after writing to prevent resource leaks.
  • The stream can write binary data (using TextEncoder) or structured objects.
  • Remember to close the stream when finished to signal completion.
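The lock and close rules above can be sketched with plain Web Streams. This is a minimal illustration, not part of the workflow API: writeChunk and collectingWritable are hypothetical helpers, and the in-memory stream only stands in for the writable returned by getWritable().

```typescript
// Hypothetical helper: writes one chunk and always releases the writer lock,
// even if the write rejects.
async function writeChunk<W>(writable: WritableStream<W>, chunk: W): Promise<void> {
  const writer = writable.getWriter();
  try {
    await writer.write(chunk);
  } finally {
    writer.releaseLock();
  }
}

type Chunk = { type: string; text?: string };

// Stand-in for the workflow's writable: collects chunks in memory
function collectingWritable<W>(): { writable: WritableStream<W>; chunks: W[] } {
  const chunks: W[] = [];
  const writable = new WritableStream<W>({
    write(chunk) {
      chunks.push(chunk);
    },
  });
  return { writable, chunks };
}

async function demoLocks(): Promise<{ chunks: Chunk[]; locked: boolean }> {
  const { writable, chunks } = collectingWritable<Chunk>();

  await writeChunk(writable, { type: 'start' });
  // The first lock was released, so a second writer can be acquired
  await writeChunk(writable, { type: 'text', text: 'Hello' });

  // Close once, after the last write, to signal completion
  await writable.close();
  return { chunks, locked: writable.locked };
}
```

Releasing in a finally block means a failed write does not leave the stream locked for whichever step writes next.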

Examples

Basic Text Streaming

Here's a simple example streaming text data:

import { sleep, getWritable } from 'workflow';

export async function outputStreamWorkflow() {
  "use workflow";

  const writable = getWritable(); 

  await sleep("1s");
  await stepWithOutputStream(writable);
  await sleep("1s");
  await stepCloseOutputStream(writable);

  return 'done';
}

async function stepWithOutputStream(writable: WritableStream) {
  "use step";

  const writer = writable.getWriter();
  // Write binary data using TextEncoder
  await writer.write(new TextEncoder().encode('Hello, world!'));
  writer.releaseLock();
}

async function stepCloseOutputStream(writable: WritableStream) {
  "use step";

  // Close the stream to signal completion
  await writable.close();
}

Advanced Chat Streaming

Here's a more complex example showing how you might stream AI chat responses:

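import { getWritable } from 'workflow';
import {
  convertToModelMessages,
  generateId,
  streamText,
  type ModelMessage,
  type UIMessage,
  type UIMessageChunk,
} from 'ai';

// Upper bound on model round-trips per chat (illustrative value)
const MAX_STEPS = 10;

export async function chat(messages: UIMessage[]) {
  'use workflow';

  // Get typed writable stream for UI message chunks
  const writable = getWritable<UIMessageChunk>();

  // Start the stream
  await startStream(writable);

  // Convert the UI messages to model messages once, before the loop.
  // convertToModelMessages is synchronous in AI SDK 5; await it if your version returns a promise.
  const currentMessages: ModelMessage[] = convertToModelMessages(messages);

  // Process messages in steps
  for (let i = 0; i < MAX_STEPS; i++) {
    const result = await streamTextStep(currentMessages, writable);
    // result.messages is an array, so spread it instead of pushing it as one element
    currentMessages.push(...result.messages);

    if (result.finishReason !== 'tool-calls') {
      break;
    }
  }

  // End the stream
  await endStream(writable);
}

async function startStream(writable: WritableStream<UIMessageChunk>) {
  'use step';

  const writer = writable.getWriter();

  // Send start message and wait for it to be accepted before releasing the lock
  await writer.write({
    type: 'start',
    messageMetadata: {
      createdAt: Date.now(),
      messageId: generateId(),
    },
  });

  writer.releaseLock();
}

async function streamTextStep(
  messages: ModelMessage[],
  writable: WritableStream<UIMessageChunk>
) {
  'use step';

  const writer = writable.getWriter();

  // Call streamText from the AI SDK
  const result = streamText({
    model: 'gpt-4',
    messages,
    /* other options */
  });

  // Pipe the AI stream into the writable stream
  const reader = result
    .toUIMessageStream({ sendStart: false, sendFinish: false })
    .getReader();

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      await writer.write(value);
    }
  } finally {
    // Release both locks but leave the stream open: later steps still write to it
    reader.releaseLock();
    writer.releaseLock();
  }

  // Return what the workflow loop needs to decide whether to continue
  const [response, finishReason] = await Promise.all([
    result.response,
    result.finishReason,
  ]);
  return { messages: response.messages, finishReason };
}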

async function endStream(writable: WritableStream<UIMessageChunk>) {
  'use step';

  const writer = writable.getWriter();

  // Close the stream to signal completion, waiting for the close to finish
  await writer.close();
  writer.releaseLock();
}
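The manual reader loop in streamTextStep can also be written with the standard pipeTo method. The sketch below uses only the Web Streams API and assumes the workflow's writable supports pipeTo like any other WritableStream, which the text above does not confirm. forward, streamOf, and demoForward are hypothetical names used for illustration.

```typescript
// Pipes a source into a shared writable without closing it, so later
// writers can keep using it. preventClose is a standard pipeTo option.
async function forward<T>(
  source: ReadableStream<T>,
  writable: WritableStream<T>
): Promise<void> {
  await source.pipeTo(writable, { preventClose: true });
}

// Builds a finite stream from the given items (stand-in for a model stream)
function streamOf<T>(...items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

async function demoForward(): Promise<string[]> {
  const received: string[] = [];
  const writable = new WritableStream<string>({
    write(chunk) {
      received.push(chunk);
    },
  });

  await forward(streamOf('a', 'b'), writable);
  // The writable is still open and unlocked, so a second pipe succeeds
  await forward(streamOf('c'), writable);

  // Close explicitly once all sources have been forwarded
  await writable.close();
  return received;
}
```

pipeTo acquires and releases the locks on both streams itself, so no explicit getWriter or releaseLock call is needed. Without preventClose, the first pipe would close the writable and the later writes would fail.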