Queueing User Messages

When using multi-turn workflows, messages typically arrive between agent turns. The workflow waits at a hook, receives a message, then starts a new turn. But sometimes you need to inject messages during an agent's turn, before tool calls complete or while the model is reasoning.

DurableAgent's prepareStep callback enables this by running before each step in the agent loop, giving you a chance to inject queued messages into the conversation. prepareStep also lets you modify the model choice and existing messages mid-turn; see the AI SDK's prepareStep documentation for more details.

When to Use This

Message queueing is useful when:

  • Users send follow-up messages while the agent is still searching for flights or processing bookings
  • External systems need to inject context mid-turn (e.g., a flight status webhook fires during processing)
  • You want messages to influence the agent's next step rather than waiting for the current turn to complete

If you just need basic multi-turn conversations where messages arrive between turns, see Chat Session Modeling. This guide covers the more advanced case of injecting messages during turns.

The prepareStep Callback

The prepareStep callback runs before each step in the agent loop. It receives the current state and can modify the messages sent to the model:

interface PrepareStepInfo {
  model: string | (() => Promise<LanguageModelV2>);  // Current model
  stepNumber: number;                                // 0-indexed step count
  steps: StepResult[];                               // Previous step results
  messages: LanguageModelV2Prompt;                   // Messages to be sent
}

interface PrepareStepResult {
  model?: string | (() => Promise<LanguageModelV2>); // Override model
  messages?: LanguageModelV2Prompt;                  // Override messages
}
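
For example, returning an empty object from prepareStep leaves the step unchanged, while returning model or messages overrides them for that step only. Here is a minimal sketch, reusing the agent, writable, and initialMessage from the full example below; the larger model ID for the first step is an illustrative assumption, not a recommendation:

await agent.stream({
  messages: [{ role: "user", content: initialMessage }],
  writable,
  prepareStep: ({ stepNumber }) => {
    // Hypothetical: use a larger model for the first step of the turn...
    if (stepNumber === 0) {
      return { model: "bedrock/claude-4-5-sonnet-20250929-v1" }; // assumed model ID
    }
    // ...then return no overrides, so later steps use the agent's default model
    return {};
  },
});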

Injecting Queued Messages

Once you have a multi-turn workflow, you can combine a message queue with prepareStep to inject messages that arrive during processing:

workflows/chat/workflow.ts
import { DurableAgent } from "@workflow/ai/agent";
import type { UIMessageChunk } from "ai";
import { getWritable } from "workflow";
import { chatMessageHook } from "./hooks/chat-message";
import { flightBookingTools, FLIGHT_ASSISTANT_PROMPT } from "./steps/tools";

export async function chatWorkflow(threadId: string, initialMessage: string) {
  "use workflow";

  const writable = getWritable<UIMessageChunk>();
  const messageQueue: Array<{ role: "user"; content: string }> = []; 

  const agent = new DurableAgent({
    model: "bedrock/claude-4-5-haiku-20251001-v1",
    system: FLIGHT_ASSISTANT_PROMPT,
    tools: flightBookingTools,
  });

  // Listen for messages in the background (non-blocking)
  const hook = chatMessageHook.create({ token: `thread:${threadId}` }); 
  hook.then(({ message }) => { 
    messageQueue.push({ role: "user", content: message }); 
  }); 

  await agent.stream({
    messages: [{ role: "user", content: initialMessage }],
    writable,
    prepareStep: ({ messages: currentMessages }) => { 
      // Inject any queued messages before the next LLM call
      if (messageQueue.length > 0) { 
        const newMessages = messageQueue.splice(0); // Drain queue
        return { 
          messages: [ 
            ...currentMessages, 
            ...newMessages.map(m => ({ 
              role: m.role, 
              content: [{ type: "text" as const, text: m.content }], 
            })), 
          ], 
        }; 
      } 
      return {}; 
    }, 
  });
}

Messages sent via chatMessageHook.resume() accumulate in the queue and get injected before the next step, whether that's a tool call or another LLM request.
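
For context, the sending side might look like the sketch below: an API route that receives a user message and resumes the hook by token. The route path and resume signature are assumptions; match them to how chatMessageHook is defined in your project (see Chat Session Modeling):

app/api/chat/route.ts
import { chatMessageHook } from "@/workflows/chat/hooks/chat-message";

export async function POST(request: Request) {
  const { threadId, message } = await request.json();

  // Deliver the message to the workflow listening on this thread's token.
  // The payload shape matches the { message } destructured in the workflow above.
  await chatMessageHook.resume(`thread:${threadId}`, { message });

  return Response.json({ queued: true });
}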

The prepareStep callback receives messages in LanguageModelV2Prompt format (with content arrays), which is the internal format used by the AI SDK.
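
Concretely, a queued user message injected in prepareStep ends up as an entry like this, with content as an array of typed parts rather than a plain string:

// One entry in a LanguageModelV2Prompt: a role plus an array of content parts
const injected = {
  role: "user" as const,
  content: [{ type: "text" as const, text: "Actually, make it a window seat" }],
};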
