Batch Operations Guide

Learn how to process large volumes of data efficiently using StateSet's batch operation capabilities. This guide covers bulk creation, updates, imports, exports, and advanced processing patterns for high-throughput scenarios.

Prerequisites

Before you begin, ensure you have:

  • A StateSet account with appropriate rate limits
  • Understanding of asynchronous processing patterns
  • Node.js 16+ installed
  • Knowledge of database optimization techniques
  • Experience with queue systems (recommended)

Core Batch Concepts

Batch Operation Types

  • Bulk Create: create multiple records in a single operation
  • Bulk Update: update multiple records simultaneously
  • Bulk Import: import large datasets from external sources
  • Bulk Export: export large datasets for reporting or migration

Performance Considerations

// Batch processing configuration
const BATCH_CONFIG = {
  // Optimal batch sizes for different operations
  CREATE_BATCH_SIZE: 100,
  UPDATE_BATCH_SIZE: 200,
  IMPORT_BATCH_SIZE: 500,
  EXPORT_BATCH_SIZE: 1000,
  
  // Concurrency limits
  MAX_CONCURRENT_BATCHES: 5,
  MAX_RETRY_ATTEMPTS: 3,
  
  // Rate limiting
  REQUESTS_PER_SECOND: 10,
  BURST_LIMIT: 50,
  
  // Timeouts
  BATCH_TIMEOUT: 60000,      // 1 minute
  OPERATION_TIMEOUT: 300000  // 5 minutes
};
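
The rate-limit values above are only configuration; they still have to be enforced when requests are made. Below is a minimal sketch of a fixed-window throttle driven by those values (the RateLimiter class is illustrative, not part of the StateSet SDK):

// Minimal fixed-window throttle based on BATCH_CONFIG.REQUESTS_PER_SECOND
class RateLimiter {
  constructor(requestsPerSecond = BATCH_CONFIG.REQUESTS_PER_SECOND) {
    this.requestsPerSecond = requestsPerSecond;
    this.windowStart = Date.now();
    this.requestsInWindow = 0;
  }

  async throttle() {
    const now = Date.now();

    // Start a new one-second window once the current one has elapsed
    if (now - this.windowStart >= 1000) {
      this.windowStart = now;
      this.requestsInWindow = 0;
    }

    // If the window is full, wait for it to end before continuing
    if (this.requestsInWindow >= this.requestsPerSecond) {
      await new Promise(resolve => setTimeout(resolve, 1000 - (now - this.windowStart)));
      this.windowStart = Date.now();
      this.requestsInWindow = 0;
    }

    this.requestsInWindow++;
  }
}

// Usage: const limiter = new RateLimiter(); then `await limiter.throttle()` before each API call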

Setting Up Batch Processing

1. Install Dependencies

Install required packages for batch processing:

npm install stateset-node p-limit p-queue lodash csv-parser fast-csv
# or
yarn add stateset-node p-limit p-queue lodash csv-parser fast-csv

2. Configure Environment

Set up environment for batch operations:

# .env
STATESET_API_KEY=your_api_key_here
STATESET_ENVIRONMENT=production

# Batch Processing Configuration
BATCH_WORKER_CONCURRENCY=5
BATCH_QUEUE_SIZE=1000
BATCH_RETRY_ATTEMPTS=3
BATCH_TIMEOUT_MS=300000

# Storage Configuration
TEMP_STORAGE_PATH=./temp/batch-files
RESULTS_STORAGE_PATH=./results

# Queue Configuration (Redis)
REDIS_URL=redis://localhost:6379
QUEUE_NAME=stateset-batch-operations
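
If you read these values in code, parse them explicitly, since environment variables are always strings. A small sketch, assuming environment loading is handled elsewhere (for example via the dotenv package, which is not in the dependency list above):

// Read batch-related settings from the environment with sensible fallbacks
const workerConfig = {
  concurrency: parseInt(process.env.BATCH_WORKER_CONCURRENCY || '5', 10),
  queueSize: parseInt(process.env.BATCH_QUEUE_SIZE || '1000', 10),
  retryAttempts: parseInt(process.env.BATCH_RETRY_ATTEMPTS || '3', 10),
  timeoutMs: parseInt(process.env.BATCH_TIMEOUT_MS || '300000', 10)
};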

3. Initialize Batch Client

Create a robust batch processing client:

import { StateSetClient } from 'stateset-node';
import pLimit from 'p-limit';
import _ from 'lodash';

class BatchProcessor {
  constructor(options = {}) {
    this.client = new StateSetClient({
      apiKey: process.env.STATESET_API_KEY,
      timeout: options.timeout || 60000
    });
    
    this.concurrencyLimit = pLimit(options.concurrency || 5);
    this.retryAttempts = options.retryAttempts || 3;
    this.batchSizes = {
      create: options.createBatchSize || 100,
      update: options.updateBatchSize || 200,
      import: options.importBatchSize || 500,
      export: options.exportBatchSize || 1000
    };
    
    this.results = {
      successful: [],
      failed: [],
      skipped: []
    };
  }
  
  async processBatch(operation, data, options = {}) {
    const startTime = Date.now();
    console.log(`Starting batch ${operation} for ${data.length} items`);
    
    try {
      const batchSize = this.batchSizes[operation] || 100;
      const batches = _.chunk(data, batchSize);
      
      const results = await Promise.allSettled(
        batches.map((batch, index) => 
          this.concurrencyLimit(() => 
            this.processSingleBatch(operation, batch, index, options)
          )
        )
      );
      
      const summary = this.compileBatchResults(results);
      
      console.log(`Batch ${operation} completed in ${Date.now() - startTime}ms`, summary);
      
      return summary;
      
    } catch (error) {
      console.error(`Batch ${operation} failed:`, error);
      throw error;
    }
  }
  
  async processSingleBatch(operation, batch, batchIndex, options) {
    let attempt = 0;
    let lastError;
    
    while (attempt < this.retryAttempts) {
      try {
        console.log(`Processing batch ${batchIndex + 1}, attempt ${attempt + 1}`);
        
        // executeBatchOperation() is an extension point; the operation-specific
        // subclasses later in this guide provide the implementation
        const result = await this.executeBatchOperation(operation, batch, options);
        
        return {
          batchIndex,
          success: true,
          count: batch.length,
          result
        };
        
      } catch (error) {
        lastError = error;
        attempt++;
        
        if (attempt < this.retryAttempts) {
          const delay = Math.pow(2, attempt) * 1000; // Exponential backoff
          console.warn(`Batch ${batchIndex + 1} failed, retrying in ${delay}ms:`, error.message);
          await this.delay(delay);
        }
      }
    }
    
    return {
      batchIndex,
      success: false,
      count: batch.length,
      error: lastError.message
    };
  }
  
  delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
  
  compileBatchResults(results) {
    const summary = {
      total: results.length,
      successful: 0,
      failed: 0,
      totalItems: 0,
      errors: []
    };
    
    results.forEach(result => {
      if (result.status === 'fulfilled' && result.value.success) {
        summary.successful++;
        summary.totalItems += result.value.count;
      } else {
        summary.failed++;
        summary.errors.push(result.status === 'rejected' ? result.reason.message : result.value.error);
      }
    });
    
    return summary;
  }
}

Bulk Order Operations

Bulk Order Creation

Efficiently create multiple orders:

class OrderBatchProcessor extends BatchProcessor {
  async bulkCreateOrders(ordersData, options = {}) {
    return await this.processBatch('create', ordersData, {
      ...options,
      validateInput: this.validateOrderData,
      transformInput: this.transformOrderData
    });
  }
  
  async executeBatchOperation(operation, batch, options) {
    switch (operation) {
      case 'create':
        return await this.batchCreateOrders(batch, options);
      case 'update':
        return await this.batchUpdateOrders(batch, options);
      case 'cancel':
        return await this.batchCancelOrders(batch, options);
      default:
        throw new Error(`Unsupported operation: ${operation}`);
    }
  }
  
  async batchCreateOrders(orders, options = {}) {
    // Validate orders before processing
    const validatedOrders = orders.map(order => {
      if (options.validateInput) {
        return options.validateInput(order);
      }
      return this.validateOrderData(order);
    });
    
    // Transform orders if needed
    const transformedOrders = validatedOrders.map(order => {
      if (options.transformInput) {
        return options.transformInput(order);
      }
      return this.transformOrderData(order);
    });
    
    // Create orders in parallel with concurrency control
    const createLimit = pLimit(3); // Limit concurrent creations
    
    const results = await Promise.allSettled(
      transformedOrders.map(order => 
        createLimit(async () => {
          try {
            const result = await this.client.orders.create(order);
            return { success: true, order: result, originalData: order };
          } catch (error) {
            return { 
              success: false, 
              error: error.message, 
              originalData: order 
            };
          }
        })
      )
    );
    
    return this.processCreateResults(results);
  }
  
  validateOrderData(order) {
    const required = ['customer_email', 'items'];
    const missing = required.filter(field => !order[field]);
    
    if (missing.length > 0) {
      throw new Error(`Missing required fields: ${missing.join(', ')}`);
    }
    
    // Validate items
    if (!Array.isArray(order.items) || order.items.length === 0) {
      throw new Error('Order must have at least one item');
    }
    
    order.items.forEach((item, index) => {
      if (!item.sku || !item.quantity || !item.price) {
        throw new Error(`Item ${index + 1} missing required fields: sku, quantity, or price`);
      }
    });
    
    return order;
  }
  
  transformOrderData(order) {
    return {
      ...order,
      // Add default values
      status: order.status || 'pending',
      source: order.source || 'batch_import',
      created_at: order.created_at || new Date().toISOString(),
      
      // Ensure proper data types
      items: order.items.map(item => ({
        ...item,
        quantity: parseInt(item.quantity, 10),
        price: parseFloat(item.price)
      })),
      
      // Add metadata
      metadata: {
        ...order.metadata,
        batch_import: true,
        import_timestamp: new Date().toISOString()
      }
    };
  }
  
  processCreateResults(results) {
    const summary = {
      created: [],
      failed: [],
      total: results.length
    };
    
    results.forEach(result => {
      if (result.status === 'fulfilled') {
        const value = result.value;
        if (value.success) {
          summary.created.push(value.order);
        } else {
          summary.failed.push({
            error: value.error,
            data: value.originalData
          });
        }
      } else {
        summary.failed.push({
          error: result.reason.message,
          data: null
        });
      }
    });
    
    return summary;
  }
}

// Usage example
const orderProcessor = new OrderBatchProcessor({ concurrency: 3 });

const sampleOrders = [
  {
    customer_email: 'customer1@example.com',
    items: [
      { sku: 'PRODUCT-001', quantity: 2, price: 29.99 },
      { sku: 'PRODUCT-002', quantity: 1, price: 19.99 }
    ],
    shipping_address: {
      street1: '123 Main St',
      city: 'Anytown',
      state: 'CA',
      zip: '12345',
      country: 'US'
    }
  },
  // ... more orders
];

async function bulkCreateOrdersExample() {
  try {
    const result = await orderProcessor.bulkCreateOrders(sampleOrders);
    console.log(`Successfully created ${result.created.length} orders`);
    console.log(`Failed to create ${result.failed.length} orders`);
    
    if (result.failed.length > 0) {
      console.log('Failed orders:', result.failed);
    }
  } catch (error) {
    console.error('Bulk order creation failed:', error);
  }
}

Bulk Order Updates

Update multiple orders efficiently:

class OrderUpdateProcessor extends OrderBatchProcessor {
  async bulkUpdateOrders(updates, options = {}) {
    return await this.processBatch('update', updates, options);
  }
  
  async batchUpdateOrders(updates, options = {}) {
    const updateLimit = pLimit(5); // Higher concurrency for updates
    
    const results = await Promise.allSettled(
      updates.map(update => 
        updateLimit(async () => {
          try {
            const { id, data } = update;
            const result = await this.client.orders.update(id, data);
            return { success: true, order: result, updateData: update };
          } catch (error) {
            return { 
              success: false, 
              error: error.message, 
              updateData: update 
            };
          }
        })
      )
    );
    
    return this.processUpdateResults(results);
  }
  
  processUpdateResults(results) {
    const summary = {
      updated: [],
      failed: [],
      total: results.length
    };
    
    results.forEach(result => {
      if (result.status === 'fulfilled') {
        const value = result.value;
        if (value.success) {
          summary.updated.push(value.order);
        } else {
          summary.failed.push({
            error: value.error,
            data: value.updateData
          });
        }
      } else {
        summary.failed.push({
          error: result.reason.message,
          data: null
        });
      }
    });
    
    return summary;
  }
  
  // Bulk status updates
  async bulkUpdateOrderStatus(orderIds, newStatus, options = {}) {
    const updates = orderIds.map(id => ({
      id,
      data: { 
        status: newStatus,
        updated_at: new Date().toISOString(),
        status_history: [{
          status: newStatus,
          timestamp: new Date().toISOString(),
          source: 'batch_update'
        }]
      }
    }));
    
    return await this.bulkUpdateOrders(updates, options);
  }
  
  // Bulk tracking number updates
  async bulkUpdateTrackingNumbers(trackingUpdates, options = {}) {
    const updates = trackingUpdates.map(({ orderId, trackingNumber, carrier }) => ({
      id: orderId,
      data: {
        tracking_number: trackingNumber,
        carrier: carrier,
        status: 'shipped',
        shipped_at: new Date().toISOString()
      }
    }));
    
    return await this.bulkUpdateOrders(updates, options);
  }
}

// Usage examples
const updateProcessor = new OrderUpdateProcessor();

// Bulk status update
await updateProcessor.bulkUpdateOrderStatus(
  ['ord_123', 'ord_124', 'ord_125'],
  'processing'
);

// Bulk tracking number update
await updateProcessor.bulkUpdateTrackingNumbers([
  { orderId: 'ord_123', trackingNumber: 'TRACK123', carrier: 'UPS' },
  { orderId: 'ord_124', trackingNumber: 'TRACK124', carrier: 'FedEx' },
  { orderId: 'ord_125', trackingNumber: 'TRACK125', carrier: 'USPS' }
]);

Bulk Inventory Operations

Inventory Import and Synchronization

Handle large inventory datasets:

class InventoryBatchProcessor extends BatchProcessor {
  async bulkUpdateInventory(inventoryData, options = {}) {
    return await this.processBatch('update', inventoryData, {
      ...options,
      validateInput: this.validateInventoryData,
      transformInput: this.transformInventoryData
    });
  }
  
  async executeBatchOperation(operation, batch, options) {
    switch (operation) {
      case 'update':
        return await this.batchUpdateInventory(batch, options);
      case 'sync':
        return await this.batchSyncInventory(batch, options);
      case 'adjust':
        return await this.batchAdjustInventory(batch, options);
      default:
        throw new Error(`Unsupported inventory operation: ${operation}`);
    }
  }
  
  async batchUpdateInventory(inventoryItems, options = {}) {
    const updateLimit = pLimit(8); // Higher concurrency for inventory
    
    const results = await Promise.allSettled(
      inventoryItems.map(item => 
        updateLimit(async () => {
          try {
            const result = await this.client.inventory.update(item.sku, {
              quantity: item.quantity,
              location: item.location || 'default',
              updated_at: new Date().toISOString(),
              source: 'batch_update'
            });
            return { success: true, item: result, originalData: item };
          } catch (error) {
            return { 
              success: false, 
              error: error.message, 
              originalData: item 
            };
          }
        })
      )
    );
    
    return this.processInventoryResults(results);
  }
  
  // Advanced inventory synchronization
  async batchSyncInventory(inventoryItems, options = {}) {
    const { source = 'external_system' } = options;
    
    // Group by location for efficient processing
    const itemsByLocation = _.groupBy(inventoryItems, 'location');
    
    const locationResults = await Promise.allSettled(
      Object.entries(itemsByLocation).map(([location, items]) => 
        this.syncLocationInventory(location, items, { source })
      )
    );
    
    return this.compileSyncResults(locationResults);
  }
  
  async syncLocationInventory(location, items, options) {
    const { source } = options;
    
    // Get current inventory for comparison
    const currentInventory = await this.getCurrentInventory(location);
    const currentBySku = new Map(
      currentInventory.map(item => [item.sku, item])
    );
    
    const syncOperations = [];
    
    for (const item of items) {
      const current = currentBySku.get(item.sku);
      
      if (!current) {
        // New item - create
        syncOperations.push({
          operation: 'create',
          sku: item.sku,
          data: {
            ...item,
            location,
            source,
            created_at: new Date().toISOString()
          }
        });
      } else if (current.quantity !== item.quantity) {
        // Existing item with different quantity - update
        syncOperations.push({
          operation: 'update',
          sku: item.sku,
          data: {
            quantity: item.quantity,
            location,
            source,
            updated_at: new Date().toISOString(),
            previous_quantity: current.quantity
          }
        });
      }
      // Remove from current map (remaining items will be marked as discontinued)
      currentBySku.delete(item.sku);
    }
    
    // Handle discontinued items
    for (const [sku, current] of currentBySku.entries()) {
      if (options.markDiscontinued) {
        syncOperations.push({
          operation: 'discontinue',
          sku,
          data: {
            quantity: 0,
            location,
            status: 'discontinued',
            discontinued_at: new Date().toISOString(),
            source
          }
        });
      }
    }
    
    // Execute sync operations
    return await this.executeSyncOperations(syncOperations);
  }
  
  async executeSyncOperations(operations) {
    const operationLimit = pLimit(6);
    
    const results = await Promise.allSettled(
      operations.map(op => 
        operationLimit(async () => {
          try {
            let result;
            switch (op.operation) {
              case 'create':
                result = await this.client.inventory.create(op.data);
                break;
              case 'update':
                result = await this.client.inventory.update(op.sku, op.data);
                break;
              case 'discontinue':
                result = await this.client.inventory.update(op.sku, op.data);
                break;
              default:
                throw new Error(`Unknown operation: ${op.operation}`);
            }
            
            return { 
              success: true, 
              operation: op.operation,
              sku: op.sku,
              result 
            };
          } catch (error) {
            return { 
              success: false, 
              operation: op.operation,
              sku: op.sku,
              error: error.message 
            };
          }
        })
      )
    );
    
    return this.processSyncResults(results);
  }
  
  validateInventoryData(item) {
    const required = ['sku', 'quantity'];
    const missing = required.filter(field => item[field] === undefined || item[field] === null);
    
    if (missing.length > 0) {
      throw new Error(`Missing required fields: ${missing.join(', ')}`);
    }
    
    if (typeof item.quantity !== 'number' || item.quantity < 0) {
      throw new Error('Quantity must be a non-negative number');
    }
    
    return item;
  }
  
  transformInventoryData(item) {
    return {
      ...item,
      sku: item.sku.toUpperCase().trim(),
      quantity: parseInt(item.quantity, 10),
      location: item.location || 'default',
      updated_at: new Date().toISOString()
    };
  }
  
  processInventoryResults(results) {
    const summary = {
      updated: [],
      failed: [],
      total: results.length
    };
    
    results.forEach(result => {
      if (result.status === 'fulfilled') {
        const value = result.value;
        if (value.success) {
          summary.updated.push(value.item);
        } else {
          summary.failed.push({
            error: value.error,
            data: value.originalData
          });
        }
      } else {
        summary.failed.push({
          error: result.reason.message,
          data: null
        });
      }
    });
    
    return summary;
  }
}

// Usage example
const inventoryProcessor = new InventoryBatchProcessor({ concurrency: 8 });

const inventoryUpdates = [
  { sku: 'PRODUCT-001', quantity: 150, location: 'warehouse_1' },
  { sku: 'PRODUCT-002', quantity: 75, location: 'warehouse_1' },
  { sku: 'PRODUCT-003', quantity: 200, location: 'warehouse_2' },
  // ... more inventory items
];

await inventoryProcessor.bulkUpdateInventory(inventoryUpdates);
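
The sync flow in InventoryBatchProcessor relies on a few helpers that are not shown (getCurrentInventory, compileSyncResults, processSyncResults). As one example, here is a minimal sketch of getCurrentInventory, a method you could add to the class; it assumes the SDK exposes a paginated inventory.list method that returns an items array, which you should verify against your SDK version:

// Sketch: paste into InventoryBatchProcessor (assumes client.inventory.list exists
// and returns { items: [...] }; adjust to your SDK's actual response shape)
async getCurrentInventory(location) {
  const items = [];
  const limit = 500;
  let offset = 0;

  while (true) {
    const page = await this.client.inventory.list({ location, limit, offset });
    const pageItems = page.items || [];
    items.push(...pageItems);

    // Stop when the API returns a short or empty page
    if (pageItems.length < limit) break;
    offset += limit;
  }

  return items;
}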

File-Based Batch Operations

CSV Import Processing

Handle large CSV files efficiently:

import fs, { createReadStream } from 'fs';
import csv from 'csv-parser';

class CSVBatchProcessor extends BatchProcessor {
  async processCSVFile(filePath, options = {}) {
    const {
      batchSize = 1000,
      skipHeader = true,
      transform = null,
      validate = null
    } = options;
    
    return new Promise((resolve, reject) => {
      const results = [];
      const errors = [];
      let batch = [];
      let lineNumber = 0;
      
      const stream = createReadStream(filePath).pipe(csv());
      
      stream
        .on('data', async (row) => {
          lineNumber++;
          
          try {
            // Apply validation if provided
            if (validate) {
              validate(row, lineNumber);
            }
            
            // Apply transformation if provided
            const transformedRow = transform ? transform(row, lineNumber) : row;
            
            batch.push(transformedRow);
            
            // Process the batch when it reaches the specified size.
            // Pause the stream while the batch is in flight so rows are not
            // buffered faster than they can be processed.
            if (batch.length >= batchSize) {
              const currentBatch = batch;
              batch = [];
              stream.pause();
              
              try {
                const batchResults = await this.processBatch('import', currentBatch);
                results.push(batchResults);
              } finally {
                stream.resume();
              }
            }
            
          } catch (error) {
            errors.push({
              line: lineNumber,
              data: row,
              error: error.message
            });
          }
        })
        .on('end', async () => {
          try {
            // Process remaining items in the final batch
            if (batch.length > 0) {
              const batchResults = await this.processBatch('import', batch);
              results.push(batchResults);
            }
            
            resolve({
              totalLines: lineNumber,
              totalBatches: results.length,
              results,
              errors
            });
          } catch (error) {
            reject(error);
          }
        })
        .on('error', reject);
    });
  }
  
  // Order CSV import
  async importOrdersFromCSV(filePath, options = {}) {
    return await this.processCSVFile(filePath, {
      ...options,
      validate: this.validateOrderCSVRow,
      transform: this.transformOrderCSVRow
    });
  }
  
  validateOrderCSVRow(row, lineNumber) {
    const required = ['customer_email', 'item_sku', 'item_quantity', 'item_price'];
    const missing = required.filter(field => !row[field] || row[field].trim() === '');
    
    if (missing.length > 0) {
      throw new Error(`Line ${lineNumber}: Missing required fields: ${missing.join(', ')}`);
    }
    
    // Validate email format
    const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
    if (!emailRegex.test(row.customer_email)) {
      throw new Error(`Line ${lineNumber}: Invalid email format`);
    }
    
    // Validate numeric fields
    if (isNaN(parseFloat(row.item_price)) || parseFloat(row.item_price) <= 0) {
      throw new Error(`Line ${lineNumber}: Invalid item price`);
    }
    
    if (isNaN(parseInt(row.item_quantity)) || parseInt(row.item_quantity) <= 0) {
      throw new Error(`Line ${lineNumber}: Invalid item quantity`);
    }
  }
  
  transformOrderCSVRow(row, lineNumber) {
    return {
      customer_email: row.customer_email.trim().toLowerCase(),
      customer_name: row.customer_name?.trim() || '',
      items: [{
        sku: row.item_sku.trim().toUpperCase(),
        quantity: parseInt(row.item_quantity, 10),
        price: parseFloat(row.item_price),
        name: row.item_name?.trim() || ''
      }],
      shipping_address: {
        street1: row.shipping_street1?.trim() || '',
        street2: row.shipping_street2?.trim() || '',
        city: row.shipping_city?.trim() || '',
        state: row.shipping_state?.trim() || '',
        zip: row.shipping_zip?.trim() || '',
        country: row.shipping_country?.trim().toUpperCase() || 'US'
      },
      source: 'csv_import',
      import_line_number: lineNumber,
      imported_at: new Date().toISOString()
    };
  }
  
  // Generate import report
  generateImportReport(results, outputPath) {
    const report = {
      summary: {
        total_lines: results.totalLines,
        total_batches: results.totalBatches,
        successful_imports: 0,
        failed_imports: 0,
        error_count: results.errors.length
      },
      batch_details: [],
      errors: results.errors
    };
    
    results.results.forEach((batch, index) => {
      report.summary.successful_imports += batch.successful || 0;
      report.summary.failed_imports += batch.failed || 0;
      
      report.batch_details.push({
        batch_number: index + 1,
        items_processed: batch.totalItems || 0,
        successful: batch.successful || 0,
        failed: batch.failed || 0,
        errors: batch.errors || []
      });
    });
    
    // Write report to file
    fs.writeFileSync(outputPath, JSON.stringify(report, null, 2));
    
    return report;
  }
}

// Usage example
const csvProcessor = new CSVBatchProcessor({ concurrency: 3 });

async function importOrdersExample() {
  try {
    const results = await csvProcessor.importOrdersFromCSV('./orders.csv', {
      batchSize: 500
    });
    
    console.log(`Processed ${results.totalLines} lines in ${results.totalBatches} batches`);
    console.log(`Errors: ${results.errors.length}`);
    
    // Generate and save report
    const report = csvProcessor.generateImportReport(results, './import-report.json');
    console.log('Import report saved to ./import-report.json');
    
  } catch (error) {
    console.error('CSV import failed:', error);
  }
}
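
For reference, the importer above reads one line item per row and expects headers matching the fields used in validateOrderCSVRow and transformOrderCSVRow: customer_email, customer_name, item_sku, item_name, item_quantity, item_price, plus the shipping_street1, shipping_street2, shipping_city, shipping_state, shipping_zip, and shipping_country address columns.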

Export Operations

Export large datasets efficiently:

import { createWriteStream } from 'fs';
import { format } from 'fast-csv';

class ExportProcessor extends BatchProcessor {
  async exportOrdersToCSV(filters = {}, outputPath, options = {}) {
    const {
      batchSize = 1000,
      includeFields = null,
      transform = null
    } = options;
    
    const writeStream = createWriteStream(outputPath);
    const csvStream = format({ headers: true });
    
    csvStream.pipe(writeStream);
    
    let offset = 0;
    let hasMore = true;
    let totalExported = 0;
    
    try {
      while (hasMore) {
        console.log(`Exporting batch starting at offset ${offset}`);
        
        const orders = await this.client.orders.list({
          ...filters,
          limit: batchSize,
          offset,
          include: ['line_items', 'customer', 'shipping_address']
        });
        
        if (orders.orders.length === 0) {
          hasMore = false;
          break;
        }
        
        // Transform orders for CSV export
        const transformedOrders = orders.orders.map(order => {
          const baseData = {
            order_id: order.id,
            order_number: order.order_number,
            customer_email: order.customer_email,
            customer_name: order.customer_name,
            status: order.status,
            total_amount: order.total_amount,
            created_date: order.created_date,
            updated_date: order.updated_date
          };
          
          // Include line items as separate rows or flatten
          if (order.order_line_items && order.order_line_items.length > 0) {
            return order.order_line_items.map(item => ({
              ...baseData,
              item_sku: item.sku_name,
              item_name: item.product_name,
              item_quantity: item.quantity,
              item_price: item.sale_price,
              item_brand: item.brand
            }));
          }
          
          return [baseData];
        }).flat();
        
        // Apply custom transformation if provided
        const finalData = transform ? 
          transformedOrders.map(transform) : 
          transformedOrders;
        
        // Filter fields if specified
        const filteredData = includeFields ? 
          finalData.map(row => _.pick(row, includeFields)) : 
          finalData;
        
        // Write to CSV
        for (const row of filteredData) {
          csvStream.write(row);
        }
        
        totalExported += filteredData.length;
        offset += batchSize;
        
        // Report progress if a tracker was supplied (see exportOrdersWithProgress below)
        if (options.progressTracker) {
          options.progressTracker.onProgress(orders.orders);
        }
        
        // Check if we have more data
        hasMore = orders.orders.length === batchSize;
        
        // Add delay to respect rate limits
        await this.delay(100);
      }
      
      csvStream.end();
      
      return new Promise((resolve, reject) => {
        writeStream.on('finish', () => {
          console.log(`Export completed: ${totalExported} records exported to ${outputPath}`);
          resolve({ totalExported, outputPath });
        });
        
        writeStream.on('error', reject);
      });
      
    } catch (error) {
      csvStream.end();
      throw error;
    }
  }
  
  // Export with progress tracking
  async exportOrdersWithProgress(filters = {}, outputPath, options = {}) {
    const {
      onProgress = null,
      ...exportOptions
    } = options;
    
    // Get total count first
    const countResponse = await this.client.orders.list({
      ...filters,
      limit: 1
    });
    
    const totalOrders = countResponse.total_count;
    let processedOrders = 0;
    
    // Wrap the export with progress tracking
    const progressTracker = {
      onProgress: (batch) => {
        processedOrders += batch.length;
        const progress = Math.round((processedOrders / totalOrders) * 100);
        
        console.log(`Export progress: ${processedOrders}/${totalOrders} (${progress}%)`);
        
        if (onProgress) {
          onProgress({
            processed: processedOrders,
            total: totalOrders,
            percentage: progress
          });
        }
      }
    };
    
    return await this.exportOrdersToCSV(filters, outputPath, {
      ...exportOptions,
      progressTracker
    });
  }
}

// Usage example
const exportProcessor = new ExportProcessor();

// Export all orders from the last 30 days
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000).toISOString();

await exportProcessor.exportOrdersWithProgress(
  {
    created_after: thirtyDaysAgo,
    status_in: ['processing', 'shipped', 'delivered']
  },
  './orders-export.csv',
  {
    batchSize: 500,
    includeFields: [
      'order_id', 'order_number', 'customer_email', 'status',
      'total_amount', 'created_date', 'item_sku', 'item_quantity'
    ],
    onProgress: (progress) => {
      // Update progress bar or send notification
      console.log(`Progress: ${progress.percentage}%`);
    }
  }
);

Queue-Based Batch Processing

Background Job Processing

Implement robust background processing with queues:

import Queue from 'bull';
import Redis from 'ioredis';

class QueuedBatchProcessor extends BatchProcessor {
  constructor(options = {}) {
    super(options);
    
    this.redis = new Redis(process.env.REDIS_URL);
    this.batchQueue = new Queue('batch operations', process.env.REDIS_URL);
    
    // Delegate the actual work to the specialized processors defined earlier in this guide
    this.orderProcessor = new OrderUpdateProcessor(options);
    this.csvProcessor = new CSVBatchProcessor(options);
    
    this.setupJobProcessors();
    this.setupEventHandlers();
  }
  
  setupJobProcessors() {
    // Process batch creation jobs
    this.batchQueue.process('bulk-create-orders', 5, async (job) => {
      const { orders, options } = job.data;
      
      job.progress(0);
      
      try {
        const result = await this.orderProcessor.bulkCreateOrders(orders, {
          ...options,
          onProgress: (progress) => {
            job.progress(Math.round((progress.completed / progress.total) * 100));
          }
        });
        
        job.progress(100);
        return result;
        
      } catch (error) {
        console.error('Batch job failed:', error);
        throw error;
      }
    });
    
    // Process batch update jobs
    this.batchQueue.process('bulk-update-orders', 3, async (job) => {
      const { updates, options } = job.data;
      
      job.progress(0);
      
      const result = await this.orderProcessor.bulkUpdateOrders(updates, {
        ...options,
        onProgress: (progress) => {
          job.progress(Math.round((progress.completed / progress.total) * 100));
        }
      });
      
      job.progress(100);
      return result;
    });
    
    // Process CSV import jobs
    this.batchQueue.process('csv-import', 1, async (job) => {
      const { filePath, type, options } = job.data;
      
      job.progress(0);
      
      let result;
      switch (type) {
        case 'orders':
          result = await this.csvProcessor.importOrdersFromCSV(filePath, {
            ...options,
            onProgress: (progress) => {
              job.progress(Math.round((progress.completed / progress.total) * 100));
            }
          });
          break;
        case 'inventory':
          // importInventoryFromCSV is not shown in this guide; implement it on
          // CSVBatchProcessor analogously to importOrdersFromCSV
          result = await this.csvProcessor.importInventoryFromCSV(filePath, options);
          break;
        default:
          throw new Error(`Unsupported import type: ${type}`);
      }
      
      job.progress(100);
      return result;
    });
  }
  
  setupEventHandlers() {
    this.batchQueue.on('completed', (job, result) => {
      console.log(`Job ${job.id} completed successfully`);
      this.notifyJobCompletion(job, result, 'completed');
    });
    
    this.batchQueue.on('failed', (job, err) => {
      console.error(`Job ${job.id} failed:`, err);
      this.notifyJobCompletion(job, null, 'failed', err);
    });
    
    this.batchQueue.on('progress', (job, progress) => {
      console.log(`Job ${job.id} progress: ${progress}%`);
    });
  }
  
  async queueBulkCreateOrders(orders, options = {}) {
    const job = await this.batchQueue.add('bulk-create-orders', {
      orders,
      options
    }, {
      attempts: 3,
      backoff: {
        type: 'exponential',
        delay: 2000
      },
      removeOnComplete: 10,
      removeOnFail: 5
    });
    
    return {
      jobId: job.id,
      status: 'queued'
    };
  }
  
  async queueCSVImport(filePath, type, options = {}) {
    const job = await this.batchQueue.add('csv-import', {
      filePath,
      type,
      options
    }, {
      attempts: 2,
      timeout: 300000, // 5 minutes
      removeOnComplete: 5,
      removeOnFail: 3
    });
    
    return {
      jobId: job.id,
      status: 'queued'
    };
  }
  
  async getJobStatus(jobId) {
    const job = await this.batchQueue.getJob(jobId);
    
    if (!job) {
      return { status: 'not_found' };
    }
    
    const state = await job.getState();
    const progress = job.progress();
    
    return {
      jobId: job.id,
      status: state,
      progress,
      data: job.data,
      result: job.returnvalue,
      error: job.failedReason,
      created: job.timestamp,
      processed: job.processedOn,
      finished: job.finishedOn
    };
  }
  
  async notifyJobCompletion(job, result, status, error = null) {
    const notification = {
      jobId: job.id,
      jobType: job.name,
      status,
      timestamp: new Date().toISOString(),
      result,
      error: error?.message
    };
    
    // Send webhook notification
    if (process.env.WEBHOOK_NOTIFICATION_URL) {
      try {
        await fetch(process.env.WEBHOOK_NOTIFICATION_URL, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify(notification)
        });
      } catch (webhookError) {
        console.error('Failed to send webhook notification:', webhookError);
      }
    }
    
    // Store notification in database or send email
    // Implementation depends on your notification system
  }
}

// Usage example
const queueProcessor = new QueuedBatchProcessor();

// Queue a bulk order creation job
const createJob = await queueProcessor.queueBulkCreateOrders(sampleOrders, {
  validateInput: true,
  notifyOnCompletion: true
});

console.log(`Bulk create job queued with ID: ${createJob.jobId}`);

// Check job status
const pollInterval = setInterval(async () => {
  const status = await queueProcessor.getJobStatus(createJob.jobId);
  console.log(`Job ${createJob.jobId} status:`, status);
  
  if (['completed', 'failed'].includes(status.status)) {
    clearInterval(pollInterval);
  }
}, 5000);
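
In long-running workers you will also want to shut down cleanly. Bull queues expose a close method that stops accepting new work and resolves once processing can safely stop, which can be hooked into a shutdown signal; a small sketch:

// Graceful shutdown: stop taking new jobs and let the queue close cleanly
process.on('SIGTERM', async () => {
  await queueProcessor.batchQueue.close();
  process.exit(0);
});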

Performance Optimization

Memory Management and Streaming

Handle very large datasets efficiently:

class StreamingBatchProcessor extends BatchProcessor {
  async streamProcessLargeDataset(dataStream, processor, options = {}) {
    const {
      batchSize = 1000,
      maxMemoryUsage = 512 * 1024 * 1024, // 512MB
      gcInterval = 10000 // Force GC every 10k items
    } = options;
    
    let batch = [];
    let processedCount = 0;
    let errorCount = 0;
    
    return new Promise((resolve, reject) => {
      dataStream
        .on('data', async (item) => {
          batch.push(item);
          
          if (batch.length >= batchSize) {
            try {
              // Pause stream while processing batch
              dataStream.pause();
              
              const result = await processor(batch);
              processedCount += result.successful || 0;
              errorCount += result.failed || 0;
              
              // Clear batch and force garbage collection if needed
              batch = [];
              
              if (processedCount % gcInterval === 0) {
                this.forceGarbageCollection();
                this.checkMemoryUsage(maxMemoryUsage);
              }
              
              // Resume stream
              dataStream.resume();
              
            } catch (error) {
              dataStream.destroy(error);
              return;
            }
          }
        })
        .on('end', async () => {
          try {
            // Process final batch
            if (batch.length > 0) {
              const result = await processor(batch);
              processedCount += result.successful || 0;
              errorCount += result.failed || 0;
            }
            
            resolve({
              processed: processedCount,
              errors: errorCount,
              total: processedCount + errorCount
            });
            
          } catch (error) {
            reject(error);
          }
        })
        .on('error', reject);
    });
  }
  
  forceGarbageCollection() {
    if (global.gc) {
      global.gc();
      console.log('Forced garbage collection');
    }
  }
  
  checkMemoryUsage(maxUsage) {
    const used = process.memoryUsage();
    
    if (used.heapUsed > maxUsage) {
      console.warn(`High memory usage detected: ${Math.round(used.heapUsed / 1024 / 1024)}MB`);
      
      if (used.heapUsed > maxUsage * 1.5) {
        throw new Error('Memory usage too high, aborting batch processing');
      }
    }
  }
  
  // Optimized batch processing with connection pooling
  async optimizedBatchProcess(data, operation, options = {}) {
    const {
      connectionPoolSize = 10,
      maxRetries = 3,
      retryDelay = 1000
    } = options;
    
    // Create connection pool
    const pool = Array.from({ length: connectionPoolSize }, () => 
      new StateSetClient({
        apiKey: process.env.STATESET_API_KEY,
        timeout: 60000,
        keepAlive: true
      })
    );
    
    let poolIndex = 0;
    const getClient = () => {
      const client = pool[poolIndex % pool.length];
      poolIndex++;
      return client;
    };
    
    try {
      const processLimit = pLimit(connectionPoolSize);
      
      const results = await Promise.allSettled(
        data.map(item => 
          processLimit(async () => {
            let attempt = 0;
            let lastError;
            
            while (attempt < maxRetries) {
              try {
                const client = getClient();
                const result = await this.executeOperation(client, operation, item);
                return { success: true, result, item };
              } catch (error) {
                lastError = error;
                attempt++;
                
                if (attempt < maxRetries) {
                  await this.delay(retryDelay * Math.pow(2, attempt - 1));
                }
              }
            }
            
            return { success: false, error: lastError.message, item };
          })
        )
      );
      
      // Aggregate per-item results into a simple summary
      return {
        successful: results
          .filter(r => r.status === 'fulfilled' && r.value.success)
          .map(r => r.value),
        failed: results
          .filter(r => r.status !== 'fulfilled' || !r.value.success)
          .map(r => (r.status === 'fulfilled' ? r.value : { success: false, error: r.reason?.message }))
      };
      
    } finally {
      // Clean up connection pool
      for (const client of pool) {
        if (client.destroy) {
          client.destroy();
        }
      }
    }
  }
  
  async executeOperation(client, operation, item) {
    switch (operation) {
      case 'create_order':
        return await client.orders.create(item);
      case 'update_order':
        return await client.orders.update(item.id, item.data);
      case 'update_inventory':
        return await client.inventory.update(item.sku, item.data);
      default:
        throw new Error(`Unsupported operation: ${operation}`);
    }
  }
}
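
As a usage sketch, the streaming processor accepts any object-mode Readable; the example below wraps an in-memory array with Readable.from and delegates each batch to the inventory processor defined earlier (the largeInventoryArray variable is illustrative):

import { Readable } from 'stream';

const streamingProcessor = new StreamingBatchProcessor();
const inventoryProcessor = new InventoryBatchProcessor();

// Any object-mode Readable works here (file parser, database cursor, etc.)
const dataStream = Readable.from(largeInventoryArray); // hypothetical source array

const summary = await streamingProcessor.streamProcessLargeDataset(
  dataStream,
  (batch) => inventoryProcessor.bulkUpdateInventory(batch),
  { batchSize: 500 }
);

console.log(`Processed ${summary.processed} items with ${summary.errors} errors`);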

Best Practices and Monitoring

Comprehensive Monitoring

Monitor batch operation performance:

class BatchMonitor {
  constructor() {
    this.metrics = {
      totalOperations: 0,
      successfulOperations: 0,
      failedOperations: 0,
      averageProcessingTime: 0,
      peakMemoryUsage: 0,
      operationHistory: []
    };
  }
  
  recordOperation(operation, duration, success, memoryUsage) {
    this.metrics.totalOperations++;
    
    if (success) {
      this.metrics.successfulOperations++;
    } else {
      this.metrics.failedOperations++;
    }
    
    // Update average processing time
    const totalTime = this.metrics.averageProcessingTime * (this.metrics.totalOperations - 1) + duration;
    this.metrics.averageProcessingTime = totalTime / this.metrics.totalOperations;
    
    // Track peak memory usage
    if (memoryUsage > this.metrics.peakMemoryUsage) {
      this.metrics.peakMemoryUsage = memoryUsage;
    }
    
    // Store operation history (keep last 1000)
    this.metrics.operationHistory.push({
      operation,
      duration,
      success,
      memoryUsage,
      timestamp: new Date().toISOString()
    });
    
    if (this.metrics.operationHistory.length > 1000) {
      this.metrics.operationHistory.shift();
    }
  }
  
  getSuccessRate() {
    if (this.metrics.totalOperations === 0) return 0;
    return (this.metrics.successfulOperations / this.metrics.totalOperations) * 100;
  }
  
  generateReport() {
    return {
      summary: {
        total_operations: this.metrics.totalOperations,
        successful_operations: this.metrics.successfulOperations,
        failed_operations: this.metrics.failedOperations,
        success_rate: `${this.getSuccessRate().toFixed(2)}%`,
        average_processing_time: `${this.metrics.averageProcessingTime.toFixed(2)}ms`,
        peak_memory_usage: `${(this.metrics.peakMemoryUsage / 1024 / 1024).toFixed(2)}MB`
      },
      recent_operations: this.metrics.operationHistory.slice(-10),
      recommendations: this.generateRecommendations()
    };
  }
  
  generateRecommendations() {
    const recommendations = [];
    
    const successRate = this.getSuccessRate();
    if (successRate < 95) {
      recommendations.push('Consider reducing batch size or improving error handling');
    }
    
    if (this.metrics.averageProcessingTime > 30000) {
      recommendations.push('Processing time is high, consider optimizing batch operations');
    }
    
    if (this.metrics.peakMemoryUsage > 1024 * 1024 * 1024) {
      recommendations.push('High memory usage detected, consider streaming or smaller batches');
    }
    
    return recommendations;
  }
}

// Enhanced batch processor with monitoring
class MonitoredBatchProcessor extends StreamingBatchProcessor {
  constructor(options = {}) {
    super(options);
    this.monitor = new BatchMonitor();
  }
  
  async processBatch(operation, data, options = {}) {
    const startTime = Date.now();
    const startMemory = process.memoryUsage().heapUsed;
    
    try {
      const result = await super.processBatch(operation, data, options);
      
      const duration = Date.now() - startTime;
      const endMemory = process.memoryUsage().heapUsed;
      
      this.monitor.recordOperation(operation, duration, true, endMemory);
      
      return {
        ...result,
        processingTime: duration,
        memoryUsed: endMemory - startMemory
      };
      
    } catch (error) {
      const duration = Date.now() - startTime;
      const endMemory = process.memoryUsage().heapUsed;
      
      this.monitor.recordOperation(operation, duration, false, endMemory);
      
      throw error;
    }
  }
  
  getMonitoringReport() {
    return this.monitor.generateReport();
  }
}
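
Because MonitoredBatchProcessor still leaves executeBatchOperation to a subclass, one way to use it is to combine it with one of the operation-specific processors above. A minimal sketch (the MonitoredInventoryProcessor class is illustrative):

// Sketch: add monitoring to inventory batches by delegating the actual work
class MonitoredInventoryProcessor extends MonitoredBatchProcessor {
  constructor(options = {}) {
    super(options);
    this.inventory = new InventoryBatchProcessor(options);
  }

  async executeBatchOperation(operation, batch, options) {
    return this.inventory.executeBatchOperation(operation, batch, options);
  }
}

const monitoredProcessor = new MonitoredInventoryProcessor({ concurrency: 5 });

await monitoredProcessor.processBatch('update', inventoryUpdates);

// Inspect success rate, timings, and memory usage after a run
console.log(JSON.stringify(monitoredProcessor.getMonitoringReport(), null, 2));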

Conclusion

Efficient batch processing is crucial for handling large volumes of data in production systems. This guide provides comprehensive patterns for:

  • ✅ Bulk order and inventory operations
  • ✅ CSV import/export processing
  • ✅ Queue-based background processing
  • ✅ Memory-efficient streaming operations
  • ✅ Performance monitoring and optimization

With these patterns, you can build robust, scalable batch processing systems that handle high-volume operations efficiently while maintaining data integrity and system performance.