Overview

Tool Orchestration

Choose between step-level and workflow-level tools, or combine both for complex tool implementations.

Use this pattern to understand when to implement a tool as a step, at the workflow level, or as a combination. The choice depends on whether the tool needs Node.js I/O (step), workflow primitives like sleep() and hooks (workflow level), or both.

Pattern

Tools marked with "use step" get automatic retries and full Node.js access but cannot use sleep() or hooks. Tools without "use step" run in the workflow context and can use workflow primitives but cannot perform side effects directly. Combine both by having a workflow-level tool call into steps for I/O.

Step-Level vs Workflow-Level

CapabilityStep ("use step")Workflow Level
getWritable()YesNo
Automatic retriesYesNo
Side effects (fetch, DB)YesNo
sleep()NoYes
createHook() / createWebhook()NoYes

Simplified

import { DurableAgent } from "@workflow/ai/agent";
import { sleep, getWritable } from "workflow";
import { z } from "zod";
import type { UIMessageChunk } from "ai";

// Step-level tool: I/O with retries
async function fetchWeather({ city }: { city: string }) {
  "use step";
  const res = await fetch(`https://api.weather.com?city=${city}`);
  return res.json();
}

// Workflow-level tool: uses sleep()
async function scheduleReminder({ delayMs }: { delayMs: number }) {
  // No "use step" — sleep() requires workflow context
  await sleep(delayMs);
  return { message: `Reminder fired after ${delayMs}ms` };
}

// Combined: workflow-level orchestration calling into steps
async function fetchWithDelay({ url, delayMs }: { url: string; delayMs: number }) {
  const result = await doFetch(url);   // Step handles I/O
  await sleep(delayMs);                // Workflow handles sleep
  return result;
}

async function doFetch(url: string) {
  "use step";
  const res = await fetch(url);
  return res.json();
}

export async function assistantAgent(userMessage: string) {
  "use workflow";

  const agent = new DurableAgent({
    model: "anthropic/claude-haiku-4.5",
    tools: {
      fetchWeather: {
        description: "Get weather for a city",
        inputSchema: z.object({ city: z.string() }),
        execute: fetchWeather,
      },
      scheduleReminder: {
        description: "Set a reminder after a delay",
        inputSchema: z.object({ delayMs: z.number() }),
        execute: scheduleReminder,
      },
      fetchWithDelay: {
        description: "Fetch a URL then wait before returning",
        inputSchema: z.object({ url: z.string(), delayMs: z.number() }),
        execute: fetchWithDelay,
      },
    },
  });

  await agent.stream({
    messages: [{ role: "user", content: userMessage }],
    writable: getWritable<UIMessageChunk>(),
  });
}

Full Implementation

import { DurableAgent } from "@workflow/ai/agent";
import { sleep, createWebhook, getWritable } from "workflow";
import { z } from "zod";
import type { UIMessageChunk } from "ai";

// --- Step-level tools: I/O with retries ---

async function searchDatabase({ query }: { query: string }) {
  "use step";

  const response = await fetch(`https://api.example.com/search?q=${query}`);
  if (!response.ok) throw new Error(`Search failed: ${response.status}`);
  return response.json();
}

async function sendNotification({
  userId,
  message,
}: {
  userId: string;
  message: string;
}) {
  "use step";

  await fetch("https://api.example.com/notifications", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ userId, message }),
  });
  return { sent: true };
}

// --- Workflow-level tool: uses sleep ---

async function waitThenCheck({
  delayMs,
  endpoint,
}: {
  delayMs: number;
  endpoint: string;
}) {
  // No "use step" — workflow context needed for sleep()
  await sleep(delayMs);
  // Delegate I/O to a step
  return pollEndpoint(endpoint);
}

async function pollEndpoint(endpoint: string) {
  "use step";
  const res = await fetch(endpoint);
  return res.json();
}

// --- Workflow-level tool: uses webhook ---

async function waitForCallback({ description }: { description: string }) {
  // No "use step" — webhooks are workflow primitives
  const webhook = createWebhook<{ status: string }>();
  // Log the URL so external systems can call it
  console.log(`Waiting for callback at: ${webhook.url}`);

  const result = await Promise.race([
    webhook.then((req) => req.json()),
    sleep("1h").then(() => ({ status: "timeout" })),
  ]);

  return result;
}

// --- Combined tool: step I/O + workflow sleep + step I/O ---

async function retryWithCooldown({
  url,
  maxAttempts,
}: {
  url: string;
  maxAttempts: number;
}) {
  for (let i = 0; i < maxAttempts; i++) {
    const result = await attemptFetch(url);
    if (result.success) return result;
    if (i < maxAttempts - 1) {
      await sleep(`${(i + 1) * 5}s`); // Increasing cooldown between attempts
    }
  }
  return { success: false, error: "All attempts failed" };
}

async function attemptFetch(url: string) {
  "use step";
  try {
    const res = await fetch(url);
    if (!res.ok) return { success: false, status: res.status };
    return { success: true, data: await res.json() };
  } catch {
    return { success: false, error: "Network error" };
  }
}

export async function orchestrationAgent(userMessage: string) {
  "use workflow";

  const writable = getWritable<UIMessageChunk>();

  const agent = new DurableAgent({
    model: "anthropic/claude-haiku-4.5",
    instructions:
      "You are an assistant with access to search, notifications, polling, callbacks, and retry tools.",
    tools: {
      searchDatabase: {
        description: "Search the database",
        inputSchema: z.object({ query: z.string() }),
        execute: searchDatabase,
      },
      sendNotification: {
        description: "Send a notification to a user",
        inputSchema: z.object({
          userId: z.string(),
          message: z.string(),
        }),
        execute: sendNotification,
      },
      waitThenCheck: {
        description: "Wait for a duration then check an endpoint",
        inputSchema: z.object({
          delayMs: z.number().describe("Milliseconds to wait"),
          endpoint: z.string().describe("URL to check after waiting"),
        }),
        execute: waitThenCheck,
      },
      waitForCallback: {
        description: "Create a webhook and wait for an external system to call it",
        inputSchema: z.object({
          description: z.string().describe("What the callback is for"),
        }),
        execute: waitForCallback,
      },
      retryWithCooldown: {
        description: "Fetch a URL with retries and increasing cooldown between attempts",
        inputSchema: z.object({
          url: z.string(),
          maxAttempts: z.number().default(3),
        }),
        execute: retryWithCooldown,
      },
    },
  });

  await agent.stream({
    messages: [{ role: "user", content: userMessage }],
    writable,
  });
}

Key APIs

  • "use workflow" — declares the orchestrator function
  • "use step" — declares step functions with retries and Node.js access
  • sleep() — durable pause (only in workflow context)
  • createWebhook() — wait for external HTTP callbacks (only in workflow context)
  • getWritable() — stream data from steps
  • DurableAgent — agent with mixed step/workflow-level tools

On this page

GitHubEdit this page on GitHub