Batching & Parallel Processing

Process large collections in parallel batches with failure isolation between groups.

Use batching when you need to process a large list of items in parallel while controlling concurrency. Items are split into fixed-size batches, each batch runs concurrently, and failures in one batch don't affect others.
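The splitting step can be seen in isolation with a small helper (a hypothetical `chunk` function for illustration only -- the pattern below inlines the same slicing logic):

```typescript
// Hypothetical helper: split an array into fixed-size groups.
// The last group may be smaller if the length isn't a multiple of size.
function chunk<T>(items: T[], size: number): T[][] {
  const groups: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    groups.push(items.slice(i, i + size));
  }
  return groups;
}
```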

When to use this

  • Processing hundreds or thousands of items (orders, images, records)
  • Calling rate-limited APIs where you need to control concurrency
  • Any fan-out where you want failure isolation between groups

Pattern

The workflow splits items into chunks and processes each chunk with Promise.allSettled(). A sleep() between chunks prevents overloading downstream services.

import { sleep } from "workflow";

declare function processItem(item: string): Promise<{ item: string; ok: boolean }>; // @setup

export async function processBatch(items: string[], batchSize: number = 5) {
  "use workflow";

  const results: { item: string; ok: boolean; error?: string }[] = [];

  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);

    // Run batch in parallel -- failures are isolated
    const outcomes = await Promise.allSettled(
      batch.map((item) => processItem(item))
    );

    for (let j = 0; j < outcomes.length; j++) {
      const outcome = outcomes[j];
      results.push(
        outcome.status === "fulfilled"
          ? outcome.value
          : { item: batch[j], ok: false, error: String(outcome.reason) }
      );
    }

    // Pace between batches to avoid overload
    if (i + batchSize < items.length) {
      await sleep("1s");
    }
  }

  const succeeded = results.filter((r) => r.ok).length;
  return { total: results.length, succeeded, failed: results.length - succeeded };
}

Step function

Each item is processed in its own step, giving it full Node.js access and automatic retries.

async function processItem(item: string): Promise<{ item: string; ok: boolean }> {
  "use step";
  const res = await fetch(`https://api.example.com/process`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ item }),
  });
  if (!res.ok) throw new Error(`Failed to process ${item}`);
  return { item, ok: true };
}

Variations

Scatter-gather

When you need results from multiple independent sources before continuing, fan out in parallel and collect all results:

export async function scatterGather(query: string) {
  "use workflow";

  const [web, database, cache] = await Promise.allSettled([
    searchWeb(query),
    searchDatabase(query),
    searchCache(query),
  ]);

  return {
    web: web.status === "fulfilled" ? web.value : null,
    database: database.status === "fulfilled" ? database.value : null,
    cache: cache.status === "fulfilled" ? cache.value : null,
  };
}

async function searchWeb(query: string): Promise<string[]> {
  "use step";
  // Full Node.js access -- call external APIs
  const res = await fetch(`https://search.example.com?q=${encodeURIComponent(query)}`);
  return res.json();
}

async function searchDatabase(query: string): Promise<string[]> {
  "use step";
  // Query your database
  return [`db-result-for-${query}`];
}

async function searchCache(query: string): Promise<string[]> {
  "use step";
  return [`cached-result-for-${query}`];
}

In-step concurrency control

When you need to process many items against a rate-limited API but want the entire operation to be a single atomic step, batch the work inside the step itself. This keeps the event log clean (one step instead of hundreds) while still controlling concurrency.

async function processConcurrently<T>(
  items: string[],
  processor: (item: string) => Promise<T>,
  maxConcurrent: number = 5,
): Promise<T[]> {
  "use step";
  const results: T[] = [];

  for (let i = 0; i < items.length; i += maxConcurrent) {
    const batch = items.slice(i, i + maxConcurrent);
    const batchResults = await Promise.all(batch.map(processor));
    results.push(...batchResults);
  }

  return results;
}
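Note that the chunked loop above waits for an entire batch to finish before starting the next, so one slow item stalls its whole batch. A sliding-window variant keeps exactly maxConcurrent items in flight instead (a sketch in plain TypeScript; `processWithPool` is a hypothetical alternative, not part of the workflow API):

```typescript
// Sketch: a worker pool that keeps up to maxConcurrent promises in flight.
// Unlike chunked batching, a slow item occupies one slot rather than
// holding back the whole batch.
async function processWithPool<T>(
  items: string[],
  processor: (item: string) => Promise<T>,
  maxConcurrent: number = 5,
): Promise<T[]> {
  const results: T[] = new Array(items.length);
  let next = 0;

  // Each worker repeatedly claims the next unclaimed index. The read and
  // increment of `next` happen synchronously (no await in between), so
  // workers never grab the same index.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const i = next++;
      results[i] = await processor(items[i]);
    }
  }

  const workers = Array.from(
    { length: Math.min(maxConcurrent, items.length) },
    () => worker(),
  );
  await Promise.all(workers);
  return results; // results stay in input order, keyed by index
}
```

This would still run inside a single "use step" function, with the same trade-offs as the chunked version.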

Usage in a workflow:

declare function processConcurrently<T>(items: string[], processor: (item: string) => Promise<T>, maxConcurrent?: number): Promise<T[]>; // @setup

export async function moderateImages(imageUrls: string[]) {
  "use workflow";

  const results = await processConcurrently(
    imageUrls,
    async (url) => {
      const res = await fetch("https://api.example.com/moderate", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ url }),
      });
      return res.json();
    },
    3, // max 3 concurrent API calls
  );

  return { total: results.length, results };
}

When to use in-step batching vs workflow-level batching:

  • Workflow-level (the pattern above): Each item is its own step with independent retries and failure isolation. Use when items are independent and individual failures should be retried.
  • In-step: All items are processed in one step. Use when the items are tightly coupled (e.g., moderating all thumbnails for a single video) or when you want to minimize step overhead for large item counts.

Tips

  • Use Promise.allSettled over Promise.all when you want to continue even if some items fail. Promise.all rejects on the first failure; allSettled waits for everything and tells you what failed.
  • Tune batch size to your downstream API limits. If the API allows 10 concurrent requests, use batchSize: 10.
  • Add pacing with sleep() between batches to respect rate limits. The sleep is durable -- it survives cold starts.
  • Each processItem call is an independent step. If one fails, it retries up to 3 times without affecting other items in the batch.
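The allSettled-versus-all difference in the first tip can be demonstrated outside the workflow runtime (a standalone sketch; `compareSettledVsAll` is a hypothetical helper, not part of the API):

```typescript
// Sketch: Promise.allSettled reports every outcome, while Promise.all
// rejects as soon as any input rejects.
async function compareSettledVsAll(
  tasks: Array<Promise<string>>,
): Promise<{ statuses: string[]; allOutcome: string }> {
  // allSettled never rejects: every task yields a status record,
  // either { status: "fulfilled", value } or { status: "rejected", reason }.
  const settled = await Promise.allSettled(tasks);
  const statuses = settled.map((s) => s.status);

  // all short-circuits: a single rejection rejects the whole aggregate,
  // discarding the results of the tasks that succeeded.
  let allOutcome: string;
  try {
    await Promise.all(tasks);
    allOutcome = "resolved";
  } catch {
    allOutcome = "rejected";
  }
  return { statuses, allOutcome };
}
```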

Key APIs