Batch Operations Guide
Comprehensive guide to batch processing and bulk operations with the StateSet API for high-volume data management
Learn how to efficiently process large volumes of data using StateSet's batch operation capabilities. This guide covers bulk creation, updates, imports, exports, and advanced processing patterns for high-throughput scenarios.
Prerequisites
Before you begin, ensure you have:
- A StateSet account with appropriate rate limits
- Understanding of asynchronous processing patterns
- Node.js 18+ installed (the examples use the built-in fetch API)
- Knowledge of database optimization techniques
- Experience with queue systems (recommended)
Core Batch Concepts
Batch Operation Types
- Bulk Create: create multiple records in a single operation
- Bulk Update: update multiple records simultaneously
- Bulk Import: import large datasets from external sources
- Bulk Export: export large datasets for reporting or migration
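All four types follow the same underlying pattern in this guide: split the dataset into fixed-size chunks, run the chunks with bounded concurrency, and collect per-item outcomes with Promise.allSettled so one failure never aborts the rest. A minimal sketch of that pattern, with createRecord standing in for any single-record API call (for example client.orders.create):
import _ from 'lodash';
import pLimit from 'p-limit';

// Split records into chunks, run chunks with bounded concurrency, and tally outcomes
async function runInChunks(records, createRecord, { chunkSize = 100, concurrency = 5 } = {}) {
  const limit = pLimit(concurrency);
  const chunks = _.chunk(records, chunkSize);
  const perChunk = await Promise.all(
    chunks.map(chunk =>
      limit(() => Promise.allSettled(chunk.map(record => createRecord(record))))
    )
  );
  const outcomes = perChunk.flat();
  return {
    successful: outcomes.filter(o => o.status === 'fulfilled').length,
    failed: outcomes.filter(o => o.status === 'rejected').length
  };
}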
Performance Considerations
// Batch processing configuration
const BATCH_CONFIG = {
// Optimal batch sizes for different operations
CREATE_BATCH_SIZE: 100,
UPDATE_BATCH_SIZE: 200,
IMPORT_BATCH_SIZE: 500,
EXPORT_BATCH_SIZE: 1000,
// Concurrency limits
MAX_CONCURRENT_BATCHES: 5,
MAX_RETRY_ATTEMPTS: 3,
// Rate limiting
REQUESTS_PER_SECOND: 10,
BURST_LIMIT: 50,
// Timeouts
BATCH_TIMEOUT: 60000, // 1 minute
OPERATION_TIMEOUT: 300000 // 5 minutes
};
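The REQUESTS_PER_SECOND and burst values above are easiest to enforce with an interval-capped queue rather than hand-rolled delays. A minimal sketch using p-queue (installed in the next step); sendRequest is a placeholder for any single API call:
import PQueue from 'p-queue';

// Allow at most REQUESTS_PER_SECOND calls per one-second window, with bounded concurrency
const apiQueue = new PQueue({
  interval: 1000,
  intervalCap: BATCH_CONFIG.REQUESTS_PER_SECOND,
  concurrency: BATCH_CONFIG.MAX_CONCURRENT_BATCHES
});

// Funnel every outbound call through the queue so rate limiting lives in one place
function throttled(sendRequest, payload) {
  return apiQueue.add(() => sendRequest(payload));
}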
Setting Up Batch Processing
Install Dependencies
Install required packages for batch processing:
npm install stateset-node p-limit p-queue lodash csv-parser fast-csv
# or
yarn add stateset-node p-limit p-queue lodash csv-parser fast-csv
Configure Environment
Set up environment for batch operations:
# .env
STATESET_API_KEY=your_api_key_here
STATESET_ENVIRONMENT=production
# Batch Processing Configuration
BATCH_WORKER_CONCURRENCY=5
BATCH_QUEUE_SIZE=1000
BATCH_RETRY_ATTEMPTS=3
BATCH_TIMEOUT_MS=300000
# Storage Configuration
TEMP_STORAGE_PATH=./temp/batch-files
RESULTS_STORAGE_PATH=./results
# Queue Configuration (Redis)
REDIS_URL=redis://localhost:6379
QUEUE_NAME=stateset-batch-operations
Initialize Batch Client
Create a robust batch processing client:
import { StateSetClient } from 'stateset-node';
import pLimit from 'p-limit';
import _ from 'lodash';
class BatchProcessor {
constructor(options = {}) {
this.client = new StateSetClient({
apiKey: process.env.STATESET_API_KEY,
timeout: options.timeout || 60000
});
this.concurrencyLimit = pLimit(options.concurrency || 5);
this.retryAttempts = options.retryAttempts || 3;
this.batchSizes = {
create: options.createBatchSize || 100,
update: options.updateBatchSize || 200,
import: options.importBatchSize || 500,
export: options.exportBatchSize || 1000
};
this.results = {
successful: [],
failed: [],
skipped: []
};
}
async processBatch(operation, data, options = {}) {
const startTime = Date.now();
console.log(`Starting batch ${operation} for ${data.length} items`);
try {
const batchSize = this.batchSizes[operation] || 100;
const batches = _.chunk(data, batchSize);
const results = await Promise.allSettled(
batches.map((batch, index) =>
this.concurrencyLimit(() =>
this.processSingleBatch(operation, batch, index, options)
)
)
);
const summary = this.compileBatchResults(results);
console.log(`Batch ${operation} completed in ${Date.now() - startTime}ms`, summary);
return summary;
} catch (error) {
console.error(`Batch ${operation} failed:`, error);
throw error;
}
}
async processSingleBatch(operation, batch, batchIndex, options) {
let attempt = 0;
let lastError;
while (attempt < this.retryAttempts) {
try {
console.log(`Processing batch ${batchIndex + 1}, attempt ${attempt + 1}`);
const result = await this.executeBatchOperation(operation, batch, options);
return {
batchIndex,
success: true,
count: batch.length,
result
};
} catch (error) {
lastError = error;
attempt++;
if (attempt < this.retryAttempts) {
const delay = Math.pow(2, attempt) * 1000; // Exponential backoff
console.warn(`Batch ${batchIndex + 1} failed, retrying in ${delay}ms:`, error.message);
await this.delay(delay);
}
}
}
return {
batchIndex,
success: false,
count: batch.length,
error: lastError.message
};
}
delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
compileBatchResults(results) {
const summary = {
total: results.length,
successful: 0,
failed: 0,
totalItems: 0,
errors: []
};
results.forEach(result => {
if (result.status === 'fulfilled' && result.value.success) {
summary.successful++;
summary.totalItems += result.value.count;
} else {
summary.failed++;
summary.errors.push(result.reason || result.value.error);
}
});
return summary;
}
}
Bulk Order Operations
Bulk Order Creation
Efficiently create multiple orders:
class OrderBatchProcessor extends BatchProcessor {
async bulkCreateOrders(ordersData, options = {}) {
return await this.processBatch('create', ordersData, {
...options,
validateInput: this.validateOrderData,
transformInput: this.transformOrderData
});
}
async executeBatchOperation(operation, batch, options) {
switch (operation) {
case 'create':
return await this.batchCreateOrders(batch, options);
case 'update':
return await this.batchUpdateOrders(batch, options);
case 'cancel':
return await this.batchCancelOrders(batch, options);
default:
throw new Error(`Unsupported operation: ${operation}`);
}
}
async batchCreateOrders(orders, options = {}) {
// Validate orders before processing
const validatedOrders = orders.map(order => {
if (options.validateInput) {
return options.validateInput(order);
}
return this.validateOrderData(order);
});
// Transform orders if needed
const transformedOrders = validatedOrders.map(order => {
if (options.transformInput) {
return options.transformInput(order);
}
return this.transformOrderData(order);
});
// Create orders in parallel with concurrency control
const createLimit = pLimit(3); // Limit concurrent creations
const results = await Promise.allSettled(
transformedOrders.map(order =>
createLimit(async () => {
try {
const result = await this.client.orders.create(order);
return { success: true, order: result, originalData: order };
} catch (error) {
return {
success: false,
error: error.message,
originalData: order
};
}
})
)
);
return this.processCreateResults(results);
}
validateOrderData(order) {
const required = ['customer_email', 'items'];
const missing = required.filter(field => !order[field]);
if (missing.length > 0) {
throw new Error(`Missing required fields: ${missing.join(', ')}`);
}
// Validate items
if (!Array.isArray(order.items) || order.items.length === 0) {
throw new Error('Order must have at least one item');
}
order.items.forEach((item, index) => {
if (!item.sku || !item.quantity || !item.price) {
throw new Error(`Item ${index + 1} missing required fields: sku, quantity, or price`);
}
});
return order;
}
transformOrderData(order) {
return {
...order,
// Add default values
status: order.status || 'pending',
source: order.source || 'batch_import',
created_at: order.created_at || new Date().toISOString(),
// Ensure proper data types
items: order.items.map(item => ({
...item,
quantity: parseInt(item.quantity, 10),
price: parseFloat(item.price)
})),
// Add metadata
metadata: {
...order.metadata,
batch_import: true,
import_timestamp: new Date().toISOString()
}
};
}
processCreateResults(results) {
const summary = {
created: [],
failed: [],
total: results.length
};
results.forEach(result => {
if (result.status === 'fulfilled') {
const value = result.value;
if (value.success) {
summary.created.push(value.order);
} else {
summary.failed.push({
error: value.error,
data: value.originalData
});
}
} else {
summary.failed.push({
error: result.reason.message,
data: null
});
}
});
return summary;
}
}
// Usage example
const orderProcessor = new OrderBatchProcessor({ concurrency: 3 });
const sampleOrders = [
{
customer_email: 'customer1@example.com',
items: [
{ sku: 'PRODUCT-001', quantity: 2, price: 29.99 },
{ sku: 'PRODUCT-002', quantity: 1, price: 19.99 }
],
shipping_address: {
street1: '123 Main St',
city: 'Anytown',
state: 'CA',
zip: '12345',
country: 'US'
}
},
// ... more orders
];
async function bulkCreateOrdersExample() {
try {
const result = await orderProcessor.bulkCreateOrders(sampleOrders);
// processBatch returns the compiled summary: batch counts plus the items in successful batches
console.log(`Created ${result.totalItems} orders across ${result.successful} successful batches`);
if (result.failed > 0) {
console.log(`${result.failed} batches failed:`, result.errors);
}
} catch (error) {
console.error('Bulk order creation failed:', error);
}
}
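Before submitting a large dataset, it can be cheaper to run the validator on its own as a dry run, so malformed records are reported without consuming API quota or creating partial orders. A small sketch that reuses validateOrderData from the class above:
// Collect validation errors without calling the API
function dryRunOrderValidation(processor, orders) {
  const problems = [];
  orders.forEach((order, index) => {
    try {
      processor.validateOrderData(order);
    } catch (error) {
      problems.push({ index, error: error.message });
    }
  });
  return problems;
}

// const problems = dryRunOrderValidation(orderProcessor, sampleOrders);
// if (problems.length > 0) console.table(problems);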
Bulk Order Updates
Update multiple orders efficiently:
class OrderUpdateProcessor extends OrderBatchProcessor {
async bulkUpdateOrders(updates, options = {}) {
return await this.processBatch('update', updates, options);
}
async batchUpdateOrders(updates, options = {}) {
const updateLimit = pLimit(5); // Higher concurrency for updates
const results = await Promise.allSettled(
updates.map(update =>
updateLimit(async () => {
try {
const { id, data } = update;
const result = await this.client.orders.update(id, data);
return { success: true, order: result, updateData: update };
} catch (error) {
return {
success: false,
error: error.message,
updateData: update
};
}
})
)
);
return this.processUpdateResults(results);
}
processUpdateResults(results) {
const summary = {
updated: [],
failed: [],
total: results.length
};
results.forEach(result => {
if (result.status === 'fulfilled') {
const value = result.value;
if (value.success) {
summary.updated.push(value.order);
} else {
summary.failed.push({
error: value.error,
data: value.updateData
});
}
} else {
summary.failed.push({
error: result.reason.message,
data: null
});
}
});
return summary;
}
// Bulk status updates
async bulkUpdateOrderStatus(orderIds, newStatus, options = {}) {
const updates = orderIds.map(id => ({
id,
data: {
status: newStatus,
updated_at: new Date().toISOString(),
status_history: [{
status: newStatus,
timestamp: new Date().toISOString(),
source: 'batch_update'
}]
}
}));
return await this.bulkUpdateOrders(updates, options);
}
// Bulk tracking number updates
async bulkUpdateTrackingNumbers(trackingUpdates, options = {}) {
const updates = trackingUpdates.map(({ orderId, trackingNumber, carrier }) => ({
id: orderId,
data: {
tracking_number: trackingNumber,
carrier: carrier,
status: 'shipped',
shipped_at: new Date().toISOString()
}
}));
return await this.bulkUpdateOrders(updates, options);
}
}
// Usage examples
const updateProcessor = new OrderUpdateProcessor();
// Bulk status update
await updateProcessor.bulkUpdateOrderStatus(
['ord_123', 'ord_124', 'ord_125'],
'processing'
);
// Bulk tracking number update
await updateProcessor.bulkUpdateTrackingNumbers([
{ orderId: 'ord_123', trackingNumber: 'TRACK123', carrier: 'UPS' },
{ orderId: 'ord_124', trackingNumber: 'TRACK124', carrier: 'FedEx' },
{ orderId: 'ord_125', trackingNumber: 'TRACK125', carrier: 'USPS' }
]);
Bulk Inventory Operations
Inventory Import and Synchronization
Handle large inventory datasets:
class InventoryBatchProcessor extends BatchProcessor {
async bulkUpdateInventory(inventoryData, options = {}) {
return await this.processBatch('update', inventoryData, {
...options,
validateInput: this.validateInventoryData,
transformInput: this.transformInventoryData
});
}
async executeBatchOperation(operation, batch, options) {
switch (operation) {
case 'update':
return await this.batchUpdateInventory(batch, options);
case 'sync':
return await this.batchSyncInventory(batch, options);
case 'adjust':
return await this.batchAdjustInventory(batch, options);
default:
throw new Error(`Unsupported inventory operation: ${operation}`);
}
}
async batchUpdateInventory(inventoryItems, options = {}) {
const updateLimit = pLimit(8); // Higher concurrency for inventory
const results = await Promise.allSettled(
inventoryItems.map(item =>
updateLimit(async () => {
try {
const result = await this.client.inventory.update(item.sku, {
quantity: item.quantity,
location: item.location || 'default',
updated_at: new Date().toISOString(),
source: 'batch_update'
});
return { success: true, item: result, originalData: item };
} catch (error) {
return {
success: false,
error: error.message,
originalData: item
};
}
})
)
);
return this.processInventoryResults(results);
}
// Advanced inventory synchronization
async batchSyncInventory(inventoryItems, options = {}) {
const { source = 'external_system' } = options;
// Group by location for efficient processing
const itemsByLocation = _.groupBy(inventoryItems, 'location');
const locationResults = await Promise.allSettled(
Object.entries(itemsByLocation).map(([location, items]) =>
this.syncLocationInventory(location, items, { ...options, source })
)
);
return this.compileSyncResults(locationResults);
}
async syncLocationInventory(location, items, options) {
const { source } = options;
// Get current inventory for comparison
const currentInventory = await this.getCurrentInventory(location);
const currentBySku = new Map(
currentInventory.map(item => [item.sku, item])
);
const syncOperations = [];
for (const item of items) {
const current = currentBySku.get(item.sku);
if (!current) {
// New item - create
syncOperations.push({
operation: 'create',
sku: item.sku,
data: {
...item,
location,
source,
created_at: new Date().toISOString()
}
});
} else if (current.quantity !== item.quantity) {
// Existing item with different quantity - update
syncOperations.push({
operation: 'update',
sku: item.sku,
data: {
quantity: item.quantity,
location,
source,
updated_at: new Date().toISOString(),
previous_quantity: current.quantity
}
});
}
// Remove from current map (remaining items will be marked as discontinued)
currentBySku.delete(item.sku);
}
// Handle discontinued items
for (const [sku, current] of currentBySku.entries()) {
if (options.markDiscontinued) {
syncOperations.push({
operation: 'discontinue',
sku,
data: {
quantity: 0,
location,
status: 'discontinued',
discontinued_at: new Date().toISOString(),
source
}
});
}
}
// Execute sync operations
return await this.executeSyncOperations(syncOperations);
}
async executeSyncOperations(operations) {
const operationLimit = pLimit(6);
const results = await Promise.allSettled(
operations.map(op =>
operationLimit(async () => {
try {
let result;
switch (op.operation) {
case 'create':
result = await this.client.inventory.create(op.data);
break;
case 'update':
result = await this.client.inventory.update(op.sku, op.data);
break;
case 'discontinue':
result = await this.client.inventory.update(op.sku, op.data);
break;
default:
throw new Error(`Unknown operation: ${op.operation}`);
}
return {
success: true,
operation: op.operation,
sku: op.sku,
result
};
} catch (error) {
return {
success: false,
operation: op.operation,
sku: op.sku,
error: error.message
};
}
})
)
);
return this.processSyncResults(results);
}
validateInventoryData(item) {
const required = ['sku', 'quantity'];
const missing = required.filter(field => item[field] === undefined || item[field] === null);
if (missing.length > 0) {
throw new Error(`Missing required fields: ${missing.join(', ')}`);
}
if (typeof item.quantity !== 'number' || item.quantity < 0) {
throw new Error('Quantity must be a non-negative number');
}
return item;
}
transformInventoryData(item) {
return {
...item,
sku: item.sku.toUpperCase().trim(),
quantity: parseInt(item.quantity, 10),
location: item.location || 'default',
updated_at: new Date().toISOString()
};
}
processInventoryResults(results) {
const summary = {
updated: [],
failed: [],
total: results.length
};
results.forEach(result => {
if (result.status === 'fulfilled') {
const value = result.value;
if (value.success) {
summary.updated.push(value.item);
} else {
summary.failed.push({
error: value.error,
data: value.originalData
});
}
} else {
summary.failed.push({
error: result.reason.message,
data: null
});
}
});
return summary;
}
}
// Usage example
const inventoryProcessor = new InventoryBatchProcessor({ concurrency: 8 });
const inventoryUpdates = [
{ sku: 'PRODUCT-001', quantity: 150, location: 'warehouse_1' },
{ sku: 'PRODUCT-002', quantity: 75, location: 'warehouse_1' },
{ sku: 'PRODUCT-003', quantity: 200, location: 'warehouse_2' },
// ... more inventory items
];
await inventoryProcessor.bulkUpdateInventory(inventoryUpdates);
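The synchronization path is reached through the same processBatch entry point by passing the 'sync' operation; markDiscontinued is the flag read in syncLocationInventory. A usage sketch, assuming the feed rows carry sku, quantity, and location like the updates above, and that the elided helpers (getCurrentInventory, processSyncResults, compileSyncResults) are implemented:
// Synchronize a full warehouse feed and discontinue SKUs missing from it
await inventoryProcessor.processBatch('sync', warehouseFeedRows, {
  source: 'erp_export',    // recorded on each created or updated record
  markDiscontinued: true   // zero out and flag SKUs not present in the feed
});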
File-Based Batch Operations
CSV Import Processing
Handle large CSV files efficiently:
import fs, { createReadStream } from 'fs';
import csv from 'csv-parser';
class CSVBatchProcessor extends OrderBatchProcessor {
// Route the generic 'import' operation to order creation so processBatch can dispatch it
async executeBatchOperation(operation, batch, options) {
if (operation === 'import') {
return await this.batchCreateOrders(batch, options);
}
return await super.executeBatchOperation(operation, batch, options);
}
async processCSVFile(filePath, options = {}) {
const {
batchSize = 1000,
skipHeader = true,
transform = null,
validate = null
} = options;
return new Promise((resolve, reject) => {
const results = [];
const errors = [];
let batch = [];
let lineNumber = 0;
const stream = createReadStream(filePath).pipe(csv());
stream
.on('data', async (row) => {
lineNumber++;
try {
// Apply validation if provided
if (validate) {
validate(row, lineNumber);
}
// Apply transformation if provided
const transformedRow = transform ? transform(row, lineNumber) : row;
batch.push(transformedRow);
// Process batch when it reaches the specified size
if (batch.length >= batchSize) {
// Pause the stream so batches are processed one at a time, in order
stream.pause();
const batchResults = await this.processBatch('import', batch);
results.push(batchResults);
batch = [];
stream.resume();
}
} catch (error) {
errors.push({
line: lineNumber,
data: row,
error: error.message
});
}
})
.on('end', async () => {
try {
// Process remaining items in the final batch
if (batch.length > 0) {
const batchResults = await this.processBatch('import', batch);
results.push(batchResults);
}
resolve({
totalLines: lineNumber,
totalBatches: results.length,
results,
errors
});
} catch (error) {
reject(error);
}
})
.on('error', reject);
});
}
// Order CSV import
async importOrdersFromCSV(filePath, options = {}) {
return await this.processCSVFile(filePath, {
...options,
validate: this.validateOrderCSVRow,
transform: this.transformOrderCSVRow
});
}
validateOrderCSVRow(row, lineNumber) {
const required = ['customer_email', 'item_sku', 'item_quantity', 'item_price'];
const missing = required.filter(field => !row[field] || row[field].trim() === '');
if (missing.length > 0) {
throw new Error(`Line ${lineNumber}: Missing required fields: ${missing.join(', ')}`);
}
// Validate email format
const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
if (!emailRegex.test(row.customer_email)) {
throw new Error(`Line ${lineNumber}: Invalid email format`);
}
// Validate numeric fields
if (isNaN(parseFloat(row.item_price)) || parseFloat(row.item_price) <= 0) {
throw new Error(`Line ${lineNumber}: Invalid item price`);
}
if (isNaN(parseInt(row.item_quantity)) || parseInt(row.item_quantity) <= 0) {
throw new Error(`Line ${lineNumber}: Invalid item quantity`);
}
}
transformOrderCSVRow(row, lineNumber) {
return {
customer_email: row.customer_email.trim().toLowerCase(),
customer_name: row.customer_name?.trim() || '',
items: [{
sku: row.item_sku.trim().toUpperCase(),
quantity: parseInt(row.item_quantity, 10),
price: parseFloat(row.item_price),
name: row.item_name?.trim() || ''
}],
shipping_address: {
street1: row.shipping_street1?.trim() || '',
street2: row.shipping_street2?.trim() || '',
city: row.shipping_city?.trim() || '',
state: row.shipping_state?.trim() || '',
zip: row.shipping_zip?.trim() || '',
country: row.shipping_country?.trim().toUpperCase() || 'US'
},
source: 'csv_import',
import_line_number: lineNumber,
imported_at: new Date().toISOString()
};
}
// Generate import report
generateImportReport(results, outputPath) {
const report = {
summary: {
total_lines: results.totalLines,
total_batches: results.totalBatches,
successful_imports: 0,
failed_imports: 0,
error_count: results.errors.length
},
batch_details: [],
errors: results.errors
};
results.results.forEach((batch, index) => {
report.summary.successful_imports += batch.successful || 0;
report.summary.failed_imports += batch.failed || 0;
report.batch_details.push({
batch_number: index + 1,
items_processed: batch.totalItems || 0,
successful: batch.successful || 0,
failed: batch.failed || 0,
errors: batch.errors || []
});
});
// Write report to file
fs.writeFileSync(outputPath, JSON.stringify(report, null, 2));
return report;
}
}
// Usage example
const csvProcessor = new CSVBatchProcessor({ concurrency: 3 });
async function importOrdersExample() {
try {
const results = await csvProcessor.importOrdersFromCSV('./orders.csv', {
batchSize: 500
});
console.log(`Processed ${results.totalLines} lines in ${results.totalBatches} batches`);
console.log(`Errors: ${results.errors.length}`);
// Generate and save report
const report = csvProcessor.generateImportReport(results, './import-report.json');
console.log('Import report saved to ./import-report.json');
} catch (error) {
console.error('CSV import failed:', error);
}
}
Export Operations
Export large datasets efficiently:
import { createWriteStream } from 'fs';
import { format } from 'fast-csv';
class ExportProcessor extends BatchProcessor {
async exportOrdersToCSV(filters = {}, outputPath, options = {}) {
const {
batchSize = 1000,
includeFields = null,
transform = null
} = options;
const writeStream = createWriteStream(outputPath);
const csvStream = format({ headers: true });
csvStream.pipe(writeStream);
let offset = 0;
let hasMore = true;
let totalExported = 0;
try {
while (hasMore) {
console.log(`Exporting batch starting at offset ${offset}`);
const orders = await this.client.orders.list({
...filters,
limit: batchSize,
offset,
include: ['line_items', 'customer', 'shipping_address']
});
if (orders.orders.length === 0) {
hasMore = false;
break;
}
// Transform orders for CSV export
const transformedOrders = orders.orders.map(order => {
const baseData = {
order_id: order.id,
order_number: order.order_number,
customer_email: order.customer_email,
customer_name: order.customer_name,
status: order.status,
total_amount: order.total_amount,
created_date: order.created_date,
updated_date: order.updated_date
};
// Include line items as separate rows or flatten
if (order.order_line_items && order.order_line_items.length > 0) {
return order.order_line_items.map(item => ({
...baseData,
item_sku: item.sku_name,
item_name: item.product_name,
item_quantity: item.quantity,
item_price: item.sale_price,
item_brand: item.brand
}));
}
return [baseData];
}).flat();
// Apply custom transformation if provided
const finalData = transform ?
transformedOrders.map(transform) :
transformedOrders;
// Filter fields if specified
const filteredData = includeFields ?
finalData.map(row => _.pick(row, includeFields)) :
finalData;
// Write to CSV
for (const row of filteredData) {
csvStream.write(row);
}
totalExported += filteredData.length;
offset += batchSize;
// Report progress if a tracker was provided (used by exportOrdersWithProgress)
if (options.progressTracker) {
options.progressTracker.onProgress(filteredData);
}
// Check if we have more data
hasMore = orders.orders.length === batchSize;
// Add delay to respect rate limits
await this.delay(100);
}
csvStream.end();
return new Promise((resolve, reject) => {
writeStream.on('finish', () => {
console.log(`Export completed: ${totalExported} records exported to ${outputPath}`);
resolve({ totalExported, outputPath });
});
writeStream.on('error', reject);
});
} catch (error) {
csvStream.end();
throw error;
}
}
// Export with progress tracking
async exportOrdersWithProgress(filters = {}, outputPath, options = {}) {
const {
onProgress = null,
...exportOptions
} = options;
// Get total count first
const countResponse = await this.client.orders.list({
...filters,
limit: 1
});
const totalOrders = countResponse.total_count;
let processedOrders = 0;
// Wrap the export with progress tracking
const progressTracker = {
onProgress: (batch) => {
processedOrders += batch.length;
const progress = Math.round((processedOrders / totalOrders) * 100);
console.log(`Export progress: ${processedOrders}/${totalOrders} (${progress}%)`);
if (onProgress) {
onProgress({
processed: processedOrders,
total: totalOrders,
percentage: progress
});
}
}
};
return await this.exportOrdersToCSV(filters, outputPath, {
...exportOptions,
progressTracker
});
}
}
// Usage example
const exportProcessor = new ExportProcessor();
// Export all orders from the last 30 days
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000).toISOString();
await exportProcessor.exportOrdersWithProgress(
{
created_after: thirtyDaysAgo,
status_in: ['processing', 'shipped', 'delivered']
},
'./orders-export.csv',
{
batchSize: 500,
includeFields: [
'order_id', 'order_number', 'customer_email', 'status',
'total_amount', 'created_date', 'item_sku', 'item_quantity'
],
onProgress: (progress) => {
// Update progress bar or send notification
console.log(`Progress: ${progress.percentage}%`);
}
}
);
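For very large exports it is often worth compressing the file as it is written; Node's built-in zlib can sit between the CSV formatter and the output file. A minimal sketch that is independent of the StateSet client:
import { createWriteStream } from 'fs';
import { createGzip } from 'zlib';
import { format } from 'fast-csv';

// CSV rows -> gzip -> orders-export.csv.gz
const csvStream = format({ headers: true });
csvStream.pipe(createGzip()).pipe(createWriteStream('./orders-export.csv.gz'));

csvStream.write({ order_id: 'ord_123', status: 'shipped' });
csvStream.end();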
Queue-Based Batch Processing
Background Job Processing
Implement robust background processing with queues:
import Queue from 'bull';
import Redis from 'ioredis';
class QueuedBatchProcessor extends BatchProcessor {
constructor(options = {}) {
super(options);
this.redis = new Redis(process.env.REDIS_URL);
this.batchQueue = new Queue('batch operations', process.env.REDIS_URL);
// Delegate the actual work to the specialized processors defined earlier in this guide
this.orderProcessor = new OrderUpdateProcessor(options);
this.csvProcessor = new CSVBatchProcessor(options);
this.setupJobProcessors();
this.setupEventHandlers();
}
setupJobProcessors() {
// Process batch creation jobs
this.batchQueue.process('bulk-create-orders', 5, async (job) => {
const { orders, options } = job.data;
job.progress(0);
try {
const result = await this.orderProcessor.bulkCreateOrders(orders, {
...options,
onProgress: (progress) => {
job.progress(Math.round((progress.completed / progress.total) * 100));
}
});
job.progress(100);
return result;
} catch (error) {
console.error('Batch job failed:', error);
throw error;
}
});
// Process batch update jobs
this.batchQueue.process('bulk-update-orders', 3, async (job) => {
const { updates, options } = job.data;
job.progress(0);
const result = await this.orderProcessor.bulkUpdateOrders(updates, {
...options,
onProgress: (progress) => {
job.progress(Math.round((progress.completed / progress.total) * 100));
}
});
job.progress(100);
return result;
});
// Process CSV import jobs
this.batchQueue.process('csv-import', 1, async (job) => {
const { filePath, type, options } = job.data;
job.progress(0);
let result;
switch (type) {
case 'orders':
result = await this.csvProcessor.importOrdersFromCSV(filePath, {
...options,
onProgress: (progress) => {
job.progress(Math.round((progress.completed / progress.total) * 100));
}
});
break;
case 'inventory':
// Assumes an importInventoryFromCSV helper implemented like importOrdersFromCSV above
result = await this.csvProcessor.importInventoryFromCSV(filePath, options);
break;
default:
throw new Error(`Unsupported import type: ${type}`);
}
job.progress(100);
return result;
});
}
setupEventHandlers() {
this.batchQueue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed successfully`);
this.notifyJobCompletion(job, result, 'completed');
});
this.batchQueue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed:`, err);
this.notifyJobCompletion(job, null, 'failed', err);
});
this.batchQueue.on('progress', (job, progress) => {
console.log(`Job ${job.id} progress: ${progress}%`);
});
}
async queueBulkCreateOrders(orders, options = {}) {
const job = await this.batchQueue.add('bulk-create-orders', {
orders,
options
}, {
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000
},
removeOnComplete: 10,
removeOnFail: 5
});
return {
jobId: job.id,
status: 'queued'
};
}
async queueCSVImport(filePath, type, options = {}) {
const job = await this.batchQueue.add('csv-import', {
filePath,
type,
options
}, {
attempts: 2,
timeout: 300000, // 5 minutes
removeOnComplete: 5,
removeOnFail: 3
});
return {
jobId: job.id,
status: 'queued'
};
}
async getJobStatus(jobId) {
const job = await this.batchQueue.getJob(jobId);
if (!job) {
return { status: 'not_found' };
}
const state = await job.getState();
const progress = job.progress();
return {
jobId: job.id,
status: state,
progress,
data: job.data,
result: job.returnvalue,
error: job.failedReason,
created: job.timestamp,
processed: job.processedOn,
finished: job.finishedOn
};
}
async notifyJobCompletion(job, result, status, error = null) {
const notification = {
jobId: job.id,
jobType: job.name,
status,
timestamp: new Date().toISOString(),
result,
error: error?.message
};
// Send webhook notification
if (process.env.WEBHOOK_NOTIFICATION_URL) {
try {
await fetch(process.env.WEBHOOK_NOTIFICATION_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(notification)
});
} catch (webhookError) {
console.error('Failed to send webhook notification:', webhookError);
}
}
// Store notification in database or send email
// Implementation depends on your notification system
}
}
// Usage example
const queueProcessor = new QueuedBatchProcessor();
// Queue a bulk order creation job
const createJob = await queueProcessor.queueBulkCreateOrders(sampleOrders, {
validateInput: true,
notifyOnCompletion: true
});
console.log(`Bulk create job queued with ID: ${createJob.jobId}`);
// Check job status
const statusInterval = setInterval(async () => {
const status = await queueProcessor.getJobStatus(createJob.jobId);
console.log(`Job ${createJob.jobId} status:`, status);
if (['completed', 'failed'].includes(status.status)) {
clearInterval(statusInterval);
}
}, 5000);
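Polling works for dashboards, but when a caller just needs to block until the work is done, Bull's job.finished() promise is simpler: it resolves with the job's return value and rejects if the job fails. A short sketch using the queueProcessor and createJob from the example above:
// Wait for the queued job instead of polling its status
const job = await queueProcessor.batchQueue.getJob(createJob.jobId);
try {
  const result = await job.finished();
  console.log('Bulk create finished:', result);
} catch (error) {
  console.error('Bulk create job failed:', error.message);
}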
Performance Optimization
Memory Management and Streaming
Handle very large datasets efficiently:
class StreamingBatchProcessor extends BatchProcessor {
async streamProcessLargeDataset(dataStream, processor, options = {}) {
const {
batchSize = 1000,
maxMemoryUsage = 512 * 1024 * 1024, // 512MB
gcInterval = 10000 // Force GC every 10k items
} = options;
let batch = [];
let processedCount = 0;
let errorCount = 0;
return new Promise((resolve, reject) => {
dataStream
.on('data', async (item) => {
batch.push(item);
if (batch.length >= batchSize) {
try {
// Pause stream while processing batch
dataStream.pause();
const result = await processor(batch);
processedCount += result.successful || 0;
errorCount += result.failed || 0;
// Clear batch and force garbage collection if needed
batch = [];
if (processedCount % gcInterval === 0) {
this.forceGarbageCollection();
this.checkMemoryUsage(maxMemoryUsage);
}
// Resume stream
dataStream.resume();
} catch (error) {
dataStream.destroy(error);
return;
}
}
})
.on('end', async () => {
try {
// Process final batch
if (batch.length > 0) {
const result = await processor(batch);
processedCount += result.successful || 0;
errorCount += result.failed || 0;
}
resolve({
processed: processedCount,
errors: errorCount,
total: processedCount + errorCount
});
} catch (error) {
reject(error);
}
})
.on('error', reject);
});
}
forceGarbageCollection() {
if (global.gc) {
global.gc();
console.log('Forced garbage collection');
}
}
checkMemoryUsage(maxUsage) {
const used = process.memoryUsage();
if (used.heapUsed > maxUsage) {
console.warn(`High memory usage detected: ${Math.round(used.heapUsed / 1024 / 1024)}MB`);
if (used.heapUsed > maxUsage * 1.5) {
throw new Error('Memory usage too high, aborting batch processing');
}
}
}
// Optimized batch processing with connection pooling
async optimizedBatchProcess(data, operation, options = {}) {
const {
connectionPoolSize = 10,
maxRetries = 3,
retryDelay = 1000
} = options;
// Create connection pool
const pool = Array.from({ length: connectionPoolSize }, () =>
new StateSetClient({
apiKey: process.env.STATESET_API_KEY,
timeout: 60000,
keepAlive: true
})
);
let poolIndex = 0;
const getClient = () => {
const client = pool[poolIndex % pool.length];
poolIndex++;
return client;
};
try {
const processLimit = pLimit(connectionPoolSize);
const results = await Promise.allSettled(
data.map(item =>
processLimit(async () => {
let attempt = 0;
let lastError;
while (attempt < maxRetries) {
try {
const client = getClient();
const result = await this.executeOperation(client, operation, item);
return { success: true, result, item };
} catch (error) {
lastError = error;
attempt++;
if (attempt < maxRetries) {
await this.delay(retryDelay * Math.pow(2, attempt - 1));
}
}
}
return { success: false, error: lastError.message, item };
})
)
);
// Summarize per-item outcomes (results here are per item, not per batch)
return {
successful: results.filter(r => r.status === 'fulfilled' && r.value.success).length,
failed: results.filter(r => r.status !== 'fulfilled' || !r.value.success).length
};
} finally {
// Clean up connection pool
for (const client of pool) {
if (client.destroy) {
client.destroy();
}
}
}
}
async executeOperation(client, operation, item) {
switch (operation) {
case 'create_order':
return await client.orders.create(item);
case 'update_order':
return await client.orders.update(item.id, item.data);
case 'update_inventory':
return await client.inventory.update(item.sku, item.data);
default:
throw new Error(`Unsupported operation: ${operation}`);
}
}
}
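streamProcessLargeDataset accepts any object-mode readable stream plus a callback that processes one batch and reports counts. A usage sketch, feeding an in-memory array for illustration and reusing the inventoryProcessor defined earlier; the small adapter maps its { updated, failed } summary onto the { successful, failed } counts the streamer expects:
import { Readable } from 'stream';

async function streamInventoryUpdates(largeInventoryArray) {
  const streamer = new StreamingBatchProcessor({ concurrency: 4 });
  const itemStream = Readable.from(largeInventoryArray, { objectMode: true });

  const summary = await streamer.streamProcessLargeDataset(
    itemStream,
    async (batch) => {
      const result = await inventoryProcessor.batchUpdateInventory(batch);
      return { successful: result.updated.length, failed: result.failed.length };
    },
    { batchSize: 500 }
  );

  console.log(summary); // { processed, errors, total }
}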
Best Practices and Monitoring
Comprehensive Monitoring
Monitor batch operation performance:
class BatchMonitor {
constructor() {
this.metrics = {
totalOperations: 0,
successfulOperations: 0,
failedOperations: 0,
averageProcessingTime: 0,
peakMemoryUsage: 0,
operationHistory: []
};
}
recordOperation(operation, duration, success, memoryUsage) {
this.metrics.totalOperations++;
if (success) {
this.metrics.successfulOperations++;
} else {
this.metrics.failedOperations++;
}
// Update average processing time
const totalTime = this.metrics.averageProcessingTime * (this.metrics.totalOperations - 1) + duration;
this.metrics.averageProcessingTime = totalTime / this.metrics.totalOperations;
// Track peak memory usage
if (memoryUsage > this.metrics.peakMemoryUsage) {
this.metrics.peakMemoryUsage = memoryUsage;
}
// Store operation history (keep last 1000)
this.metrics.operationHistory.push({
operation,
duration,
success,
memoryUsage,
timestamp: new Date().toISOString()
});
if (this.metrics.operationHistory.length > 1000) {
this.metrics.operationHistory.shift();
}
}
getSuccessRate() {
if (this.metrics.totalOperations === 0) return 0;
return (this.metrics.successfulOperations / this.metrics.totalOperations) * 100;
}
generateReport() {
return {
summary: {
total_operations: this.metrics.totalOperations,
successful_operations: this.metrics.successfulOperations,
failed_operations: this.metrics.failedOperations,
success_rate: `${this.getSuccessRate().toFixed(2)}%`,
average_processing_time: `${this.metrics.averageProcessingTime.toFixed(2)}ms`,
peak_memory_usage: `${(this.metrics.peakMemoryUsage / 1024 / 1024).toFixed(2)}MB`
},
recent_operations: this.metrics.operationHistory.slice(-10),
recommendations: this.generateRecommendations()
};
}
generateRecommendations() {
const recommendations = [];
const successRate = this.getSuccessRate();
if (successRate < 95) {
recommendations.push('Consider reducing batch size or improving error handling');
}
if (this.metrics.averageProcessingTime > 30000) {
recommendations.push('Processing time is high, consider optimizing batch operations');
}
if (this.metrics.peakMemoryUsage > 1024 * 1024 * 1024) {
recommendations.push('High memory usage detected, consider streaming or smaller batches');
}
return recommendations;
}
}
// Enhanced batch processor with monitoring
class MonitoredBatchProcessor extends StreamingBatchProcessor {
constructor(options = {}) {
super(options);
this.monitor = new BatchMonitor();
}
async processBatch(operation, data, options = {}) {
const startTime = Date.now();
const startMemory = process.memoryUsage().heapUsed;
try {
const result = await super.processBatch(operation, data, options);
const duration = Date.now() - startTime;
const endMemory = process.memoryUsage().heapUsed;
this.monitor.recordOperation(operation, duration, true, endMemory);
return {
...result,
processingTime: duration,
memoryUsed: endMemory - startMemory
};
} catch (error) {
const duration = Date.now() - startTime;
const endMemory = process.memoryUsage().heapUsed;
this.monitor.recordOperation(operation, duration, false, endMemory);
throw error;
}
}
getMonitoringReport() {
return this.monitor.generateReport();
}
}
Troubleshooting
Memory issues:
- Use streaming for large datasets
- Implement proper garbage collection
- Monitor memory usage and set limits
- Process data in smaller batches
Rate limiting errors:
- Implement proper retry logic with exponential backoff
- Use connection pooling
- Monitor API rate limits
- Distribute load across multiple time periods (see the scheduling sketch after this list)
Data quality problems:
- Implement comprehensive validation
- Use transformation functions
- Handle edge cases and malformed data
- Provide detailed error reporting
Slow batch performance:
- Optimize batch sizes for your use case
- Use appropriate concurrency limits
- Implement caching where possible
- Monitor and profile operations
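For distributing load across time periods, Bull can defer or repeat jobs instead of running everything immediately: the delay option postpones a one-off job and repeat schedules recurring ones. A short sketch against the batchQueue used earlier; the delay and cron values are illustrative:
// Run a large import two hours from now, off peak
await queueProcessor.batchQueue.add('csv-import',
  { filePath: './orders.csv', type: 'orders', options: {} },
  { delay: 2 * 60 * 60 * 1000, attempts: 2 }
);

// Re-run a nightly inventory import at 02:00
await queueProcessor.batchQueue.add('csv-import',
  { filePath: './inventory.csv', type: 'inventory', options: {} },
  { repeat: { cron: '0 2 * * *' } }
);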
Next Steps
- API Testing Guide: test your batch operations thoroughly
- Monitoring Guide: set up comprehensive monitoring
- Performance Optimization: optimize batch processing performance
- Integration Patterns: learn integration best practices
Conclusion
Efficient batch processing is crucial for handling large volumes of data in production systems. This guide provides comprehensive patterns for:
- ✅ Bulk order and inventory operations
- ✅ CSV import/export processing
- ✅ Queue-based background processing
- ✅ Memory-efficient streaming operations
- ✅ Performance monitoring and optimization
With these patterns, you can build robust, scalable batch processing systems that handle high-volume operations efficiently while maintaining data integrity and system performance.