Tool Streaming
Stream real-time progress updates from tools to the UI while they execute.
Use this pattern when tools take a long time to execute and you want to show progress updates, intermediate results, or status messages in the UI while the tool is still running.
Pattern
Inside a step function, call getWritable<UIMessageChunk>() to write custom data parts to the same stream the agent uses. These appear as typed data parts in the client's message parts array.
Simplified
import { DurableAgent } from "@workflow/ai/agent";
import { getWritable } from "workflow";
import { z } from "zod";
import type { UIMessageChunk } from "ai";
declare function performSearch(query: string): Promise<{ id: string; title: string }[]>; // @setup
declare function searchWithProgress(args: { query: string }): Promise<any>; // @setup
// Workflow entry point (simplified): runs the agent durably and streams
// its output to the run-level writable stream.
export async function searchAgent(userMessage: string) {
"use workflow";
const agent = new DurableAgent({
model: "anthropic/claude-haiku-4.5",
tools: {
search: {
description: "Search for items",
inputSchema: z.object({ query: z.string() }),
// Step function; it writes its own progress chunks while running.
execute: searchWithProgress,
},
},
});
// The agent writes LLM output to the same writable the tool step
// writes to, so the client receives one interleaved stream.
await agent.stream({
messages: [{ role: "user", content: userMessage }],
writable: getWritable<UIMessageChunk>(),
});
}

Full Implementation
import { DurableAgent } from "@workflow/ai/agent";
import { getWritable } from "workflow";
import { z } from "zod";
import type { UIMessageChunk } from "ai";
// Custom data part type for the client to render.
// Chunks with this shape show up in the client's message.parts array as
// parts of type "data-found-item"; `id` is used as the React key in the
// client rendering snippet below.
interface FoundItemDataPart {
type: "data-found-item";
id: string;
data: {
title: string;
score: number;
};
}
// Step: Search with streaming progress updates
async function searchWithProgress(
{ query }: { query: string },
{ toolCallId }: { toolCallId: string }
) {
"use step";
const writable = getWritable<UIMessageChunk>();
const writer = writable.getWriter();
try {
// Simulate finding items one at a time
const items = [
{ title: "Result A", score: 95 },
{ title: "Result B", score: 87 },
{ title: "Result C", score: 72 },
];
for (const item of items) {
// Simulate search latency
await new Promise((resolve) => setTimeout(resolve, 800));
// Stream each result to the UI as it's found
await writer.write({
type: "data-found-item",
id: `${toolCallId}-${item.title}`,
data: item,
} as UIMessageChunk);
}
return {
message: `Found ${items.length} results for "${query}"`,
items,
};
} finally {
writer.releaseLock();
}
}
// Step: Fetch details for a specific item
async function getItemDetails({ itemId }: { itemId: string }) {
"use step";
const writable = getWritable<UIMessageChunk>();
const writer = writable.getWriter();
try {
// Emit a transient progress message
await writer.write({
type: "data-progress",
data: { message: `Loading details for ${itemId}...` },
transient: true,
} as UIMessageChunk);
await new Promise((resolve) => setTimeout(resolve, 1000));
return { itemId, description: "Detailed information", available: true };
} finally {
writer.releaseLock();
}
}
// Workflow entry point for the full example: two tools, both step
// functions, all writing to the single run-level stream.
export async function searchAgent(userMessage: string) {
"use workflow";
// One writable per run; the agent and every tool step share it.
const writable = getWritable<UIMessageChunk>();
const agent = new DurableAgent({
model: "anthropic/claude-haiku-4.5",
instructions: "You help users search for items. Use the search tool first, then get details if asked.",
tools: {
search: {
description: "Search for items matching a query",
inputSchema: z.object({
query: z.string().describe("Search query"),
}),
// Streams data-found-item chunks while executing.
execute: searchWithProgress,
},
getDetails: {
description: "Get detailed information about a specific item",
inputSchema: z.object({
itemId: z.string().describe("Item ID from search results"),
}),
// Emits a transient data-progress chunk before returning.
execute: getItemDetails,
},
},
});
// Stream the conversation; LLM output and tool data parts interleave
// on the same writable.
await agent.stream({
messages: [{ role: "user", content: userMessage }],
writable,
});
}

Client Rendering
// In your chat component's message rendering:
// Each streamed data chunk surfaces as an entry in message.parts with
// the same `type` the step wrote ("data-found-item" here).
{message.parts.map((part, i) => {
if (part.type === "data-found-item") {
// Narrow the payload; shape matches FoundItemDataPart.data above.
const item = part.data as { title: string; score: number };
return (
<div key={part.id} className="p-3 bg-muted rounded-md">
<div className="font-medium">{item.title}</div>
<div className="text-muted-foreground">Score: {item.score}</div>
</div>
);
}
// ... other part types
})}
})}

Key APIs
"use step" — step functions can write to the stream
getWritable() — access the run's output stream from inside a step
DurableAgent — agent streams LLM output to the same writable