---
title: Streams
description: Read, write, and manage real-time data streams for workflow runs.
type: reference
summary: Methods: streams.write(), streams.writeMulti(), streams.get(), streams.close(), streams.list(), streams.getChunks(), streams.getInfo(). Stream methods live on world.streams.
prerequisites:
  - /docs/api-reference/workflow-api/get-world
related:
  - /docs/foundations/streaming
  - /docs/api-reference/workflow/get-writable
---

# Streams

Stream methods live on `world.streams` (the `streams` sub-object of the `World` instance returned by `await getWorld()`). Use them to write chunks, read streams, and manage stream lifecycle outside of the standard `getWritable()` pattern.

<Callout type="info">
  For most streaming use cases, use [`getWritable()`](/docs/api-reference/workflow/get-writable) inside steps. Direct stream methods are for advanced scenarios like building custom stream consumers or managing streams from outside a workflow.
</Callout>

## Import

```typescript lineNumbers
import { getWorld } from "workflow/runtime";

const world = await getWorld(); // [!code highlight]
// Stream methods are called on world.streams — e.g. world.streams.write()
```

## Methods

### write()

Write a data chunk to a named stream.

```typescript lineNumbers
await world.streams.write(runId, "default", chunk); // [!code highlight]
```

**Parameters:**

| Parameter | Type                   | Description         |
| --------- | ---------------------- | ------------------- |
| `runId`   | `string`               | The workflow run ID |
| `name`    | `string`               | The stream name     |
| `chunk`   | `string \| Uint8Array` | Data to write       |
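
Because `chunk` accepts only a string or raw bytes, structured values need to be serialized before they are written. The following is a minimal sketch, assuming newline-delimited JSON suits your consumer. The `StreamWriter` interface and the `encodeJsonLine` and `writeJsonLine` helpers are hypothetical names that mirror only the `write()` signature documented above; the return type of `write()` is not stated here, so it is typed loosely.

```typescript
// Hypothetical structural type: covers only the write() signature documented above.
interface StreamWriter {
  write(runId: string, name: string, chunk: string | Uint8Array): Promise<unknown>;
}

const encoder = new TextEncoder();

// Serialize a value as one JSON document followed by a newline (NDJSON framing).
function encodeJsonLine(value: unknown): Uint8Array {
  return encoder.encode(JSON.stringify(value) + "\n");
}

// Write one JSON-encoded line to the named stream.
async function writeJsonLine(
  streams: StreamWriter,
  runId: string,
  name: string,
  value: unknown,
): Promise<void> {
  await streams.write(runId, name, encodeJsonLine(value));
}
```

With a real `World`, this would presumably be called as `await writeJsonLine(world.streams, runId, "default", { progress: 0.5 })`.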

### writeMulti()

Write multiple chunks in a single operation. This is an optional optimization: not all World implementations support it. Where it is unavailable, fall back to sequential `write()` calls.

```typescript lineNumbers
// Optional call: evaluates to undefined without writing when writeMulti is not implemented
await world.streams.writeMulti?.(runId, "default", [chunk1, chunk2]); // [!code highlight]
```

**Parameters:**

| Parameter | Type                       | Description               |
| --------- | -------------------------- | ------------------------- |
| `runId`   | `string`                   | The workflow run ID       |
| `name`    | `string`                   | The stream name           |
| `chunks`  | `(string \| Uint8Array)[]` | Chunks to write, in order |
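
Since `writeMulti()` may be missing, a caller that uses optional chaining alone would silently skip the write. One way to handle both cases is sketched below. The `BatchCapableStreams` interface and the `writeAll` helper are hypothetical and model only the two signatures documented on this page.

```typescript
type Chunk = string | Uint8Array;

// Hypothetical structural type: write() is required, writeMulti() is optional,
// matching the note that not all World implementations support it.
interface BatchCapableStreams {
  write(runId: string, name: string, chunk: Chunk): Promise<unknown>;
  writeMulti?(runId: string, name: string, chunks: Chunk[]): Promise<unknown>;
}

// Write all chunks in order, batching when the implementation allows it.
async function writeAll(
  streams: BatchCapableStreams,
  runId: string,
  name: string,
  chunks: Chunk[],
): Promise<void> {
  if (chunks.length === 0) return;

  if (typeof streams.writeMulti === "function") {
    await streams.writeMulti(runId, name, chunks);
    return;
  }

  // Fallback: await each write so chunks are submitted in the order given.
  for (const chunk of chunks) {
    await streams.write(runId, name, chunk);
  }
}
```

The feature check happens on every call rather than once at startup, which keeps the helper stateless at the cost of a negligible `typeof` test.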

### get()

Read data from a named stream as a live `ReadableStream` that waits for new chunks in real time.

```typescript lineNumbers
const readable = await world.streams.get(runId, "default"); // [!code highlight]
```

**Parameters:**

| Parameter    | Type     | Description                                                                                                                                                |
| ------------ | -------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `runId`      | `string` | The workflow run ID                                                                                                                                        |
| `name`       | `string` | The stream name                                                                                                                                            |
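| `startIndex` | `number` | Optional. Positive values skip chunks from the start (0-based). Negative values read from the tail (e.g. `-3` starts 3 chunks from the end). The resolved start position is clamped to 0, so a negative value larger than the chunk count starts at the first chunk. |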

**Returns:** `ReadableStream<Uint8Array>`
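
The returned stream yields raw bytes, so a consumer that wants text has to decode it. The sketch below uses only the standard Web Streams and Encoding APIs; `readAllText` is a hypothetical helper name, not part of the library.

```typescript
// Collect a byte stream into a single UTF-8 string.
async function readAllText(readable: ReadableStream<Uint8Array>): Promise<string> {
  const reader = readable.getReader();
  const decoder = new TextDecoder();
  let text = "";

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // stream: true keeps multi-byte characters intact across chunk boundaries.
      text += decoder.decode(value, { stream: true });
    }
    // Flush any bytes still buffered in the decoder.
    text += decoder.decode();
  } finally {
    reader.releaseLock();
  }

  return text;
}
```

Because `get()` returns a live stream, a call such as `await readAllText(await world.streams.get(runId, "default"))` would presumably resolve only once the stream has been closed. For incremental output, process each decoded piece inside the loop instead of accumulating it.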

### close()

Close a stream when done writing.

```typescript lineNumbers
await world.streams.close(runId, "default"); // [!code highlight]
```

**Parameters:**

| Parameter | Type     | Description         |
| --------- | -------- | ------------------- |
| `runId`   | `string` | The workflow run ID |
| `name`    | `string` | The stream name     |

### list()

List all stream names associated with a workflow run.

```typescript lineNumbers
const streamNames = await world.streams.list(runId); // [!code highlight]
```

**Parameters:**

| Parameter | Type     | Description         |
| --------- | -------- | ------------------- |
| `runId`   | `string` | The workflow run ID |

**Returns:** `string[]`

### getChunks()

Fetch stream chunks with cursor-based pagination. Unlike `get()` (which returns a live `ReadableStream`), this returns a snapshot of currently available chunks.

```typescript lineNumbers
const result = await world.streams.getChunks(runId, "default", { // [!code highlight]
  limit: 50,
}); // [!code highlight]
// result.data: StreamChunk[], result.cursor, result.hasMore, result.done
```

**Parameters:**

| Parameter        | Type     | Description                                   |
| ---------------- | -------- | --------------------------------------------- |
| `runId`          | `string` | The workflow run ID                           |
| `name`           | `string` | The stream name                               |
| `options.limit`  | `number` | Max chunks per page (default: 100, max: 1000) |
| `options.cursor` | `string` | Cursor from a previous response               |

**Returns:** `StreamChunksResponse`

| Field     | Type             | Description                                                                                                                 |
| --------- | ---------------- | --------------------------------------------------------------------------------------------------------------------------- |
| `data`    | `StreamChunk[]`  | Chunks in index order. Each has `index` (0-based) and `data` (`Uint8Array`).                                                |
| `cursor`  | `string \| null` | Cursor for the next page                                                                                                    |
| `hasMore` | `boolean`        | Whether more pages of already-written chunks exist                                                                          |
| `done`    | `boolean`        | Whether the stream is fully closed. When `false`, new chunks may appear in future requests even after `hasMore` is `false`. |
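
The `hasMore` and `done` flags are independent, which gives three states a reader has to distinguish. The sketch below reads every already-written chunk and reports which state it ended in. The interfaces are local, hypothetical mirrors of the tables above, and `readState` and `readWrittenChunks` are illustrative helper names.

```typescript
// Hypothetical local types mirroring the fields documented in the tables above.
interface StreamChunk { index: number; data: Uint8Array }
interface StreamChunksResponse {
  data: StreamChunk[];
  cursor: string | null;
  hasMore: boolean;
  done: boolean;
}
interface ChunkReader {
  getChunks(
    runId: string,
    name: string,
    options?: { limit?: number; cursor?: string },
  ): Promise<StreamChunksResponse>;
}

type ReadState = "more-pages" | "caught-up" | "complete";

// Interpret the two flags of a page.
function readState(page: { hasMore: boolean; done: boolean }): ReadState {
  if (page.hasMore) return "more-pages"; // already-written chunks remain
  return page.done ? "complete" : "caught-up"; // caught-up: stream still open
}

// Page through all chunks written so far.
async function readWrittenChunks(
  reader: ChunkReader,
  runId: string,
  name: string,
  limit = 100,
): Promise<{ chunks: StreamChunk[]; state: ReadState }> {
  const chunks: StreamChunk[] = [];
  let cursor: string | undefined;
  let state: ReadState = "more-pages";

  do {
    const page = await reader.getChunks(runId, name, { limit, cursor });
    chunks.push(...page.data);
    cursor = page.cursor ?? undefined;
    state = readState(page);
    // Stop if no cursor came back, to avoid refetching the first page forever.
  } while (state === "more-pages" && cursor !== undefined);

  return { chunks, state };
}
```

A result of `"caught-up"` means the snapshot is current but the stream is still open, so a caller that needs the full stream would poll again later or switch to `get()`.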

### getInfo()

Retrieve lightweight metadata about a stream without fetching chunks.

```typescript lineNumbers
const info = await world.streams.getInfo(runId, "default"); // [!code highlight]
// info.tailIndex: last chunk index (-1 if empty), info.done: whether stream is closed
```

**Parameters:**

| Parameter | Type     | Description         |
| --------- | -------- | ------------------- |
| `runId`   | `string` | The workflow run ID |
| `name`    | `string` | The stream name     |

**Returns:** `StreamInfoResponse`

| Field       | Type      | Description                                                                     |
| ----------- | --------- | ------------------------------------------------------------------------------- |
| `tailIndex` | `number`  | Index of the last known chunk (0-based). `-1` when no chunks have been written. |
| `done`      | `boolean` | Whether the stream is fully complete (closed).                                  |
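
Because `tailIndex` is a 0-based index that is `-1` for an empty stream, the number of known chunks is `tailIndex + 1`. The sketch below combines `list()` and `getInfo()` to summarize every stream of a run. `StreamInspector`, `chunkCount`, and `summarizeStreams` are hypothetical names, and the interfaces mirror only the fields documented on this page.

```typescript
// Hypothetical local types mirroring the documented return shapes.
interface StreamInfoResponse { tailIndex: number; done: boolean }
interface StreamInspector {
  list(runId: string): Promise<string[]>;
  getInfo(runId: string, name: string): Promise<StreamInfoResponse>;
}

interface StreamSummary { name: string; chunks: number; done: boolean }

// tailIndex is -1 when empty, so adding 1 yields 0 chunks in that case.
function chunkCount(info: StreamInfoResponse): number {
  return info.tailIndex + 1;
}

// Fetch metadata for every stream of a run without downloading any chunks.
async function summarizeStreams(
  streams: StreamInspector,
  runId: string,
): Promise<StreamSummary[]> {
  const names = await streams.list(runId);
  return Promise.all(
    names.map(async (name) => {
      const info = await streams.getInfo(runId, name);
      return { name, chunks: chunkCount(info), done: info.done };
    }),
  );
}
```

The metadata requests run concurrently through `Promise.all`; for runs with many streams, a bounded concurrency limit may be preferable.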

## Examples

### Read a Stream as a Response

```typescript lineNumbers
// app/api/workflow-streams/read/route.ts
import { getWorld } from "workflow/runtime";

export async function GET(req: Request) {
  const url = new URL(req.url);
  const streamName = url.searchParams.get("name") ?? "default";
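  const runId = url.searchParams.get("runId");
  if (!runId) {
    // Reject requests without a run ID instead of passing null downstream
    return new Response("Missing runId query parameter", { status: 400 });
  }
  const world = await getWorld();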
  const readable = await world.streams.get(runId, streamName); // [!code highlight]

  return new Response(readable, {
    headers: { "Content-Type": "application/octet-stream" },
  });
}
```

### Paginate Through Stream Chunks

```typescript lineNumbers
import { getWorld } from "workflow/runtime";

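const world = await getWorld();
// runId is assumed to be defined by the caller (the ID of the run to read from)
let cursor: string | undefined;

do {
  // Fetch the next page; cursor is undefined on the first request
  const result = await world.streams.getChunks(runId, "default", { cursor }); // [!code highlight]
  for (const chunk of result.data) {
    console.log(`Chunk ${chunk.index}:`, chunk.data);
  }
  // The loop ends when no cursor is returned for a further page
  cursor = result.cursor ?? undefined;
} while (cursor);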
```

## Related

* [Streaming](/docs/foundations/streaming) — Core concepts for streaming data from workflows
* [getWritable()](/docs/api-reference/workflow/get-writable) — The standard way to write to streams from within steps
* [Storage](/docs/api-reference/workflow-api/world/storage) — Query runs, steps, hooks, and events


