Batching & Parallel Processing
Process large collections in parallel batches with failure isolation between groups.
Use batching when you need to process a large list of items in parallel while controlling concurrency. Items are split into fixed-size batches, each batch runs concurrently, and failures in one batch don't affect others.
When to use this
- Processing hundreds or thousands of items (orders, images, records)
- Calling rate-limited APIs where you need to control concurrency
- Any fan-out where you want failure isolation between groups
Pattern
The workflow splits items into chunks and processes each chunk with `Promise.allSettled()`. A `sleep()` between chunks prevents overloading downstream services.
```ts
import { sleep } from "workflow";

declare function processItem(item: string): Promise<{ item: string; ok: boolean }>; // @setup

export async function processBatch(items: string[], batchSize: number = 5) {
  "use workflow";
  const results = [];
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);
    // Run batch in parallel -- failures are isolated
    const outcomes = await Promise.allSettled(
      batch.map((item) => processItem(item))
    );
    for (let j = 0; j < outcomes.length; j++) {
      const outcome = outcomes[j];
      results.push(
        outcome.status === "fulfilled"
          ? outcome.value
          : { item: batch[j], ok: false, error: String(outcome.reason) }
      );
    }
    // Pace between batches to avoid overload
    if (i + batchSize < items.length) {
      await sleep("1s");
    }
  }
  const succeeded = results.filter((r) => r.ok).length;
  return { total: results.length, succeeded, failed: results.length - succeeded };
}
```

Step function
Each item is processed in its own step, giving it full Node.js access and automatic retries.
```ts
async function processItem(item: string): Promise<{ item: string; ok: boolean }> {
  "use step";
  const res = await fetch(`https://api.example.com/process`, {
    method: "POST",
    body: JSON.stringify({ item }),
  });
  if (!res.ok) throw new Error(`Failed to process ${item}`);
  return { item, ok: true };
}
```

Variations
Scatter-gather
When you need results from multiple independent sources before continuing, fan out in parallel and collect all results:
```ts
export async function scatterGather(query: string) {
  "use workflow";
  const [web, database, cache] = await Promise.allSettled([
    searchWeb(query),
    searchDatabase(query),
    searchCache(query),
  ]);
  return {
    web: web.status === "fulfilled" ? web.value : null,
    database: database.status === "fulfilled" ? database.value : null,
    cache: cache.status === "fulfilled" ? cache.value : null,
  };
}

async function searchWeb(query: string): Promise<string[]> {
  "use step";
  // Full Node.js access -- call external APIs
  const res = await fetch(`https://search.example.com?q=${encodeURIComponent(query)}`);
  return res.json();
}

async function searchDatabase(query: string): Promise<string[]> {
  "use step";
  // Query your database
  return [`db-result-for-${query}`];
}

async function searchCache(query: string): Promise<string[]> {
  "use step";
  return [`cached-result-for-${query}`];
}
```

In-step concurrency control
When you need to process many items against a rate-limited API but want the entire operation to be a single atomic step, batch the work inside the step itself. This keeps the event log clean (one step instead of hundreds) while still controlling concurrency.
```ts
async function processConcurrently<T>(
  items: string[],
  processor: (item: string) => Promise<T>,
  maxConcurrent: number = 5,
): Promise<T[]> {
  "use step";
  const results: T[] = [];
  for (let i = 0; i < items.length; i += maxConcurrent) {
    const batch = items.slice(i, i + maxConcurrent);
    const batchResults = await Promise.all(batch.map(processor));
    results.push(...batchResults);
  }
  return results;
}
```

Usage in a workflow:
```ts
declare function processConcurrently<T>(items: string[], processor: (item: string) => Promise<T>, maxConcurrent?: number): Promise<T[]>; // @setup

export async function moderateImages(imageUrls: string[]) {
  "use workflow";
  const results = await processConcurrently(
    imageUrls,
    async (url) => {
      const res = await fetch("https://api.example.com/moderate", {
        method: "POST",
        body: JSON.stringify({ url }),
      });
      return res.json();
    },
    3, // max 3 concurrent API calls
  );
  return { total: results.length, results };
}
```

When to use in-step batching vs workflow-level batching:
- Workflow-level (the pattern above): Each item is its own step with independent retries and failure isolation. Use when items are independent and individual failures should be retried.
- In-step: All items are processed in one step. Use when the items are tightly coupled (e.g., moderating all thumbnails for a single video) or when you want to minimize step overhead for large item counts.
Tips
- Use `Promise.allSettled` over `Promise.all` when you want to continue even if some items fail. `Promise.all` rejects on the first failure; `allSettled` waits for everything and tells you what failed.
- Tune batch size to your downstream API limits. If the API allows 10 concurrent requests, use `batchSize: 10`.
- Add pacing with `sleep()` between batches to respect rate limits. The sleep is durable -- it survives cold starts.
- Each `processItem` call is an independent step. If one fails, it retries up to 3 times without affecting other items in the batch.
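The `Promise.allSettled` vs `Promise.all` behavior is easy to verify in plain TypeScript, with no workflow runtime involved; `flaky` here is a made-up helper that rejects for one input:

```typescript
// allSettled resolves with a status per item; all rejects on the first failure.
const flaky = (n: number): Promise<number> =>
  n === 2 ? Promise.reject(new Error(`item ${n} failed`)) : Promise.resolve(n);

async function compare() {
  const settled = await Promise.allSettled([1, 2, 3].map(flaky));
  const fulfilled = settled.filter((s) => s.status === "fulfilled").length;
  const rejected = settled.filter((s) => s.status === "rejected").length;

  // Promise.all discards the successes and rejects with the first error.
  const allRejected = await Promise.all([1, 2, 3].map(flaky)).then(
    () => false,
    () => true,
  );

  return { fulfilled, rejected, allRejected }; // { fulfilled: 2, rejected: 1, allRejected: true }
}
```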
Key APIs
- `"use workflow"` -- marks the orchestrator function
- `"use step"` -- marks functions that run with full Node.js access
- `sleep()` -- pacing delay between batches
- `Promise.allSettled()` -- runs items in parallel, isolating failures
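To experiment with the batching logic locally, you can run a framework-free sketch of the pattern. `sleep` and `processItem` below are hypothetical stand-ins for the durable versions (the real ones carry the `"use workflow"` / `"use step"` directives), and the function is renamed `processBatchLocal` to avoid clashing with the workflow above:

```typescript
// Framework-free sketch of the batching loop, for local experimentation only.
const sleep = (_duration: string) => Promise.resolve(); // stand-in for durable sleep
const processItem = async (item: string): Promise<{ item: string; ok: boolean }> => {
  if (item.startsWith("bad")) throw new Error(`cannot process ${item}`); // simulated failure
  return { item, ok: true };
};

async function processBatchLocal(items: string[], batchSize = 5) {
  const results: Array<{ item: string; ok: boolean; error?: string }> = [];
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);
    // Failures in one item are isolated from the rest of the batch
    const outcomes = await Promise.allSettled(batch.map(processItem));
    outcomes.forEach((outcome, j) => {
      results.push(
        outcome.status === "fulfilled"
          ? outcome.value
          : { item: batch[j], ok: false, error: String(outcome.reason) },
      );
    });
    if (i + batchSize < items.length) await sleep("1s");
  }
  const succeeded = results.filter((r) => r.ok).length;
  return { total: results.length, succeeded, failed: results.length - succeeded };
}
```

Running `processBatchLocal(["a", "bad-1", "b", "c", "d", "e"], 2)` yields a summary of `{ total: 6, succeeded: 5, failed: 1 }`: the simulated failure of `bad-1` is recorded without affecting the other five items.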