Resumable Streams
When building chat interfaces, it's common to run into network interruptions, page refreshes, or serverless function timeouts, which can break the connection to an in-progress agent.
Where a standard chat implementation would require the user to resend their message and wait for the entire response again, workflow runs are durable, and so are the streams attached to them. This means a stream can be resumed at any point, optionally only syncing the data that was missed since the last connection.
Resumable streams come out of the box with Workflow DevKit. However, the client still needs to recognize that a stream exists, know which stream to reconnect to, and know where to start from. For this, Workflow DevKit provides the WorkflowChatTransport helper, a drop-in transport for the AI SDK that handles client-side resumption logic for you.
Implementing stream resumption
Let's add stream resumption to the Flight Booking Agent that we built in the Building Durable AI Agents guide.
Return the Run ID from Your API
Modify your chat endpoint to include the workflow run ID in a response header. The Run ID uniquely identifies the run's stream, so it allows the client to know which stream to reconnect to.
// ... imports ...
export async function POST(req: Request) {
// ... existing logic to create the workflow ...
const run = await start(chatWorkflow, [modelMessages]);
return createUIMessageStreamResponse({
stream: run.readable,
headers: {
"x-workflow-run-id": run.runId,
},
});
}
Add a Stream Reconnection Endpoint
Currently we only have one API endpoint that always creates a new run, so we need to create a new API route that returns the stream for an existing run:
import { createUIMessageStreamResponse } from "ai";
import { getRun } from "workflow/api";
export async function GET(
request: Request,
{ params }: { params: Promise<{ id: string }> }
) {
const { id } = await params;
const { searchParams } = new URL(request.url);
// Client provides the last chunk index they received
const startIndexParam = searchParams.get("startIndex");
const startIndex = startIndexParam
? parseInt(startIndexParam, 10)
: undefined;
// Instead of starting a new run, we fetch an existing run.
const run = getRun(id);
const stream = run.getReadable({ startIndex });
return createUIMessageStreamResponse({ stream });
}
The startIndex parameter lets the client choose where to resume the stream from. For instance, if the function times out during streaming, the chat transport will use startIndex to resume the stream from the last chunk it received.
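The endpoint above passes the result of parseInt straight through, and parseInt returns NaN for input such as "abc". A minimal sketch of stricter parsing follows, assuming you only want to accept non-negative integers. parseStartIndex is a hypothetical helper, not part of Workflow DevKit, and how getReadable treats unusual values is not covered here.

```typescript
// Hypothetical helper (not part of Workflow DevKit): parse the startIndex
// query parameter, falling back to undefined for anything that is not a
// non-negative integer.
function parseStartIndex(param: string | null): number | undefined {
  // Missing or blank parameter: let the caller read from the beginning
  if (param === null || param.trim() === "") return undefined;

  const value = Number(param);

  // Reject NaN, fractions, and negative numbers
  if (!Number.isInteger(value) || value < 0) return undefined;

  return value;
}
```

In the route handler, this could replace the ternary: `const startIndex = parseStartIndex(searchParams.get("startIndex"));`.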
Use WorkflowChatTransport in the Client
Replace the default transport in the AI SDK's useChat with WorkflowChatTransport, and update the callbacks to store and use the latest run ID. For now, we'll store the run ID in localStorage. In your own app, store it wherever you keep session information.
"use client";
import { useChat } from "@ai-sdk/react";
import { WorkflowChatTransport } from "@workflow/ai";
import { useMemo } from "react";
export default function ChatPage() {
// Check for an active workflow run on mount
const activeRunId = useMemo(() => {
if (typeof window === "undefined") return;
return localStorage.getItem("active-workflow-run-id") ?? undefined;
}, []);
const { messages, sendMessage, status } = useChat({
resume: Boolean(activeRunId),
transport: new WorkflowChatTransport({
api: "/api/chat",
// Store the run ID when a new chat starts
onChatSendMessage: (response) => {
const workflowRunId = response.headers.get("x-workflow-run-id");
if (workflowRunId) {
localStorage.setItem("active-workflow-run-id", workflowRunId);
}
},
// Clear the run ID when the chat completes
onChatEnd: () => {
localStorage.removeItem("active-workflow-run-id");
},
// Use the stored run ID for reconnection
prepareReconnectToStreamRequest: ({ api, ...rest }) => {
const runId = localStorage.getItem("active-workflow-run-id");
if (!runId) throw new Error("No active workflow run ID found");
return {
...rest,
api: `/api/chat/${encodeURIComponent(runId)}/stream`,
};
},
}),
});
// ... render your chat UI
}
Now try the flight booking example again. Open it up in a separate tab, or spam the refresh button, and see how the client connects to the same chat stream every time.
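If the run ID will eventually live somewhere other than localStorage, it can help to hide the storage behind a small interface. The sketch below is one possible shape. RunIdStore, createMemoryRunIdStore, and reconnectPath are hypothetical names for illustration, and the in-memory store stands in for whatever session storage your app uses.

```typescript
// Hypothetical abstraction (not part of Workflow DevKit or the AI SDK):
// a minimal store for the active workflow run ID.
interface RunIdStore {
  get(): string | undefined;
  set(runId: string): void;
  clear(): void;
}

// In-memory implementation, useful for tests or as a template for a
// localStorage-backed or session-backed version.
function createMemoryRunIdStore(): RunIdStore {
  let current: string | undefined;
  return {
    get: () => current,
    set: (runId) => {
      current = runId;
    },
    clear: () => {
      current = undefined;
    },
  };
}

// Builds the reconnection path used in this guide from the stored run ID.
function reconnectPath(store: RunIdStore): string {
  const runId = store.get();
  if (!runId) throw new Error("No active workflow run ID found");
  return `/api/chat/${encodeURIComponent(runId)}/stream`;
}
```

With a store like this, onChatSendMessage would call `store.set(...)`, onChatEnd would call `store.clear()`, and prepareReconnectToStreamRequest would use `reconnectPath(store)` for its api value.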
How It Works
- When the user sends a message, WorkflowChatTransport makes a POST to /api/chat
- The API starts a workflow and returns the run ID in the x-workflow-run-id header
- onChatSendMessage stores this run ID in localStorage
- If the stream is interrupted before receiving a "finish" chunk, the transport automatically reconnects
- prepareReconnectToStreamRequest builds the reconnection URL using the stored run ID, pointing to the new endpoint /api/chat/{runId}/stream
- The reconnection endpoint returns the stream from where the client left off
- When the stream completes, onChatEnd clears the stored run ID
This approach also handles page refreshes, as the client will automatically reconnect to the stream from the last known position when the UI loads with a stored run ID, following the behavior of AI SDK's stream resumption.
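The idea of resuming from the last known position can be illustrated with a small in-memory model. This is only a conceptual sketch, not how Workflow DevKit or WorkflowChatTransport is implemented. BufferedStream and ResumingClient are hypothetical names, and the sketch assumes the client's position is simply the number of chunks it has received so far.

```typescript
// Conceptual stand-in for a durable stream: every chunk is retained, so a
// reader can start from any index. Hypothetical, for illustration only.
class BufferedStream {
  private chunks: string[] = [];

  append(chunk: string): void {
    this.chunks.push(chunk);
  }

  // Returns all chunks from startIndex onward (from the beginning by default)
  readFrom(startIndex: number = 0): string[] {
    return this.chunks.slice(startIndex);
  }
}

// Conceptual client that remembers how many chunks it has seen and only
// asks for the ones it missed.
class ResumingClient {
  received: string[] = [];

  sync(stream: BufferedStream): void {
    const missed = stream.readFrom(this.received.length);
    this.received.push(...missed);
  }
}

const stream = new BufferedStream();
const client = new ResumingClient();

stream.append("Booking");
stream.append(" your");
client.sync(stream); // client now holds the first 2 chunks

// The connection drops here, but the stream keeps being written
stream.append(" flight");
stream.append(" now.");

client.sync(stream); // resumes from index 2 and fetches only the missed chunks
console.log(client.received.join("")); // prints "Booking your flight now."
```

Because the stream retains every chunk, the client never has to replay what it already rendered. It only needs to report how far it got.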
Related Documentation
- WorkflowChatTransport API Reference - Full configuration options
- Streaming - Understanding workflow streams
- getRun() API Reference - Retrieving existing runs