Comprehensive guide to batch processing and bulk operations with the Stateset API for high-volume data management
// Batch processing configuration
const BATCH_CONFIG = {
// Optimal batch sizes for different operations
CREATE_BATCH_SIZE: 100,
UPDATE_BATCH_SIZE: 200,
IMPORT_BATCH_SIZE: 500,
EXPORT_BATCH_SIZE: 1000,
// Concurrency limits
MAX_CONCURRENT_BATCHES: 5,
MAX_RETRY_ATTEMPTS: 3,
// Rate limiting
REQUESTS_PER_SECOND: 10,
BURST_LIMIT: 50,
// Timeouts
BATCH_TIMEOUT: 60000, // 1 minute
OPERATION_TIMEOUT: 300000 // 5 minutes
};
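The REQUESTS_PER_SECOND and BURST_LIMIT settings are not enforced automatically by the processors below. The sketch that follows shows one way to honor them with a small token-bucket limiter; the RateLimiter class is an illustration, not part of stateset-node.
// Token-bucket limiter driven by BATCH_CONFIG (illustrative sketch, not part of stateset-node)
class RateLimiter {
  constructor({ requestsPerSecond, burstLimit }) {
    this.capacity = burstLimit;           // maximum tokens held at once
    this.tokens = burstLimit;             // start with a full bucket
    this.refillRate = requestsPerSecond;  // tokens added per second
    this.lastRefill = Date.now();
  }

  async acquire() {
    for (;;) {
      const now = Date.now();
      this.tokens = Math.min(
        this.capacity,
        this.tokens + ((now - this.lastRefill) / 1000) * this.refillRate
      );
      this.lastRefill = now;
      if (this.tokens >= 1) {
        this.tokens -= 1;
        return;
      }
      // Wait roughly long enough for one token to accumulate
      await new Promise(resolve => setTimeout(resolve, 1000 / this.refillRate));
    }
  }
}

const rateLimiter = new RateLimiter({
  requestsPerSecond: BATCH_CONFIG.REQUESTS_PER_SECOND,
  burstLimit: BATCH_CONFIG.BURST_LIMIT
});
// Call `await rateLimiter.acquire()` before each Stateset API request.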
Install Dependencies
npm install stateset-node p-limit p-queue lodash csv-parser fast-csv
# or
yarn add stateset-node p-limit p-queue lodash csv-parser fast-csv
Configure Environment
# .env
STATESET_API_KEY=your_api_key_here
STATESET_ENVIRONMENT=production
# Batch Processing Configuration
BATCH_WORKER_CONCURRENCY=5
BATCH_QUEUE_SIZE=1000
BATCH_RETRY_ATTEMPTS=3
BATCH_TIMEOUT_MS=300000
# Storage Configuration
TEMP_STORAGE_PATH=./temp/batch-files
RESULTS_STORAGE_PATH=./results
# Queue Configuration (Redis)
REDIS_URL=redis://localhost:6379
QUEUE_NAME=stateset-batch-operations
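A small startup module can load these variables and fail fast when required ones are missing. This sketch assumes dotenv for local development, which is not in the install list above.
// config.js -- load and validate environment variables (sketch; add dotenv with `npm install dotenv`)
import 'dotenv/config';

const required = ['STATESET_API_KEY', 'REDIS_URL'];
const missing = required.filter(name => !process.env[name]);
if (missing.length > 0) {
  throw new Error(`Missing required environment variables: ${missing.join(', ')}`);
}

export const config = {
  apiKey: process.env.STATESET_API_KEY,
  environment: process.env.STATESET_ENVIRONMENT || 'production',
  workerConcurrency: parseInt(process.env.BATCH_WORKER_CONCURRENCY || '5', 10),
  retryAttempts: parseInt(process.env.BATCH_RETRY_ATTEMPTS || '3', 10),
  batchTimeoutMs: parseInt(process.env.BATCH_TIMEOUT_MS || '300000', 10),
  tempStoragePath: process.env.TEMP_STORAGE_PATH || './temp/batch-files',
  resultsStoragePath: process.env.RESULTS_STORAGE_PATH || './results',
  redisUrl: process.env.REDIS_URL,
  queueName: process.env.QUEUE_NAME || 'stateset-batch-operations'
};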
Initialize Batch Client
import { StateSetClient } from 'stateset-node';
import pLimit from 'p-limit';
import _ from 'lodash';
class BatchProcessor {
constructor(options = {}) {
this.client = new StateSetClient({
apiKey: process.env.STATESET_API_KEY,
timeout: options.timeout || 60000
});
this.concurrencyLimit = pLimit(options.concurrency || 5);
this.retryAttempts = options.retryAttempts || 3;
this.batchSizes = {
create: options.createBatchSize || 100,
update: options.updateBatchSize || 200,
import: options.importBatchSize || 500,
export: options.exportBatchSize || 1000
};
this.results = {
successful: [],
failed: [],
skipped: []
};
}
async processBatch(operation, data, options = {}) {
const startTime = Date.now();
console.log(`Starting batch ${operation} for ${data.length} items`);
try {
const batchSize = this.batchSizes[operation] || 100;
const batches = _.chunk(data, batchSize);
const results = await Promise.allSettled(
batches.map((batch, index) =>
this.concurrencyLimit(() =>
this.processSingleBatch(operation, batch, index, options)
)
)
);
const summary = this.compileBatchResults(results);
console.log(`Batch ${operation} completed in ${Date.now() - startTime}ms`, summary);
return summary;
} catch (error) {
console.error(`Batch ${operation} failed:`, error);
throw error;
}
}
async processSingleBatch(operation, batch, batchIndex, options) {
let attempt = 0;
let lastError;
while (attempt < this.retryAttempts) {
try {
console.log(`Processing batch ${batchIndex + 1}, attempt ${attempt + 1}`);
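// executeBatchOperation is supplied by the subclasses below (orders, inventory,
// CSV import); the base class only handles chunking, concurrency, and retries.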
const result = await this.executeBatchOperation(operation, batch, options);
return {
batchIndex,
success: true,
count: batch.length,
result
};
} catch (error) {
lastError = error;
attempt++;
if (attempt < this.retryAttempts) {
const delay = Math.pow(2, attempt) * 1000; // Exponential backoff
console.warn(`Batch ${batchIndex + 1} failed, retrying in ${delay}ms:`, error.message);
await this.delay(delay);
}
}
}
return {
batchIndex,
success: false,
count: batch.length,
error: lastError.message
};
}
delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
compileBatchResults(results) {
const summary = {
total: results.length,
successful: 0,
failed: 0,
totalItems: 0,
errors: []
};
results.forEach(result => {
if (result.status === 'fulfilled' && result.value.success) {
summary.successful++;
summary.totalItems += result.value.count;
} else {
summary.failed++;
summary.errors.push(result.reason || result.value.error);
}
});
return summary;
}
}
class OrderBatchProcessor extends BatchProcessor {
async bulkCreateOrders(ordersData, options = {}) {
return await this.processBatch('create', ordersData, {
...options,
validateInput: this.validateOrderData,
transformInput: this.transformOrderData
});
}
async executeBatchOperation(operation, batch, options) {
switch (operation) {
case 'create':
return await this.batchCreateOrders(batch, options);
case 'update':
return await this.batchUpdateOrders(batch, options);
case 'cancel':
return await this.batchCancelOrders(batch, options);
default:
throw new Error(`Unsupported operation: ${operation}`);
}
}
async batchCreateOrders(orders, options = {}) {
// Validate orders before processing
const validatedOrders = orders.map(order => {
if (options.validateInput) {
return options.validateInput(order);
}
return this.validateOrderData(order);
});
// Transform orders if needed
const transformedOrders = validatedOrders.map(order => {
if (options.transformInput) {
return options.transformInput(order);
}
return this.transformOrderData(order);
});
// Create orders in parallel with concurrency control
const createLimit = pLimit(3); // Limit concurrent creations
const results = await Promise.allSettled(
transformedOrders.map(order =>
createLimit(async () => {
try {
const result = await this.client.orders.create(order);
return { success: true, order: result, originalData: order };
} catch (error) {
return {
success: false,
error: error.message,
originalData: order
};
}
})
)
);
return this.processCreateResults(results);
}
validateOrderData(order) {
const required = ['customer_email', 'items'];
const missing = required.filter(field => !order[field]);
if (missing.length > 0) {
throw new Error(`Missing required fields: ${missing.join(', ')}`);
}
// Validate items
if (!Array.isArray(order.items) || order.items.length === 0) {
throw new Error('Order must have at least one item');
}
order.items.forEach((item, index) => {
if (!item.sku || !item.quantity || !item.price) {
throw new Error(`Item ${index + 1} missing required fields: sku, quantity, or price`);
}
});
return order;
}
transformOrderData(order) {
return {
...order,
// Add default values
status: order.status || 'pending',
source: order.source || 'batch_import',
created_at: order.created_at || new Date().toISOString(),
// Ensure proper data types
items: order.items.map(item => ({
...item,
quantity: parseInt(item.quantity, 10),
price: parseFloat(item.price)
})),
// Add metadata
metadata: {
...order.metadata,
batch_import: true,
import_timestamp: new Date().toISOString()
}
};
}
processCreateResults(results) {
const summary = {
created: [],
failed: [],
total: results.length
};
results.forEach(result => {
if (result.status === 'fulfilled') {
const value = result.value;
if (value.success) {
summary.created.push(value.order);
} else {
summary.failed.push({
error: value.error,
data: value.originalData
});
}
} else {
summary.failed.push({
error: result.reason.message,
data: null
});
}
});
return summary;
}
}
// Usage example
const orderProcessor = new OrderBatchProcessor({ concurrency: 3 });
const sampleOrders = [
{
customer_email: 'customer1@example.com',
items: [
{ sku: 'PRODUCT-001', quantity: 2, price: 29.99 },
{ sku: 'PRODUCT-002', quantity: 1, price: 19.99 }
],
shipping_address: {
street1: '123 Main St',
city: 'Anytown',
state: 'CA',
zip: '12345',
country: 'US'
}
},
// ... more orders
];
async function bulkCreateOrdersExample() {
try {
const result = await orderProcessor.bulkCreateOrders(sampleOrders);
console.log(`Successfully created ${result.created.length} orders`);
console.log(`Failed to create ${result.failed.length} orders`);
if (result.failed.length > 0) {
console.log('Failed orders:', result.failed);
}
} catch (error) {
console.error('Bulk order creation failed:', error);
}
}
class OrderUpdateProcessor extends OrderBatchProcessor {
async bulkUpdateOrders(updates, options = {}) {
return await this.processBatch('update', updates, options);
}
async batchUpdateOrders(updates, options = {}) {
const updateLimit = pLimit(5); // Higher concurrency for updates
const results = await Promise.allSettled(
updates.map(update =>
updateLimit(async () => {
try {
const { id, data } = update;
const result = await this.client.orders.update(id, data);
return { success: true, order: result, updateData: update };
} catch (error) {
return {
success: false,
error: error.message,
updateData: update
};
}
})
)
);
return this.processUpdateResults(results);
}
processUpdateResults(results) {
const summary = {
updated: [],
failed: [],
total: results.length
};
results.forEach(result => {
if (result.status === 'fulfilled') {
const value = result.value;
if (value.success) {
summary.updated.push(value.order);
} else {
summary.failed.push({
error: value.error,
data: value.updateData
});
}
} else {
summary.failed.push({
error: result.reason.message,
data: null
});
}
});
return summary;
}
// Bulk status updates
async bulkUpdateOrderStatus(orderIds, newStatus, options = {}) {
const updates = orderIds.map(id => ({
id,
data: {
status: newStatus,
updated_at: new Date().toISOString(),
status_history: [{
status: newStatus,
timestamp: new Date().toISOString(),
source: 'batch_update'
}]
}
}));
return await this.bulkUpdateOrders(updates, options);
}
// Bulk tracking number updates
async bulkUpdateTrackingNumbers(trackingUpdates, options = {}) {
const updates = trackingUpdates.map(({ orderId, trackingNumber, carrier }) => ({
id: orderId,
data: {
tracking_number: trackingNumber,
carrier: carrier,
status: 'shipped',
shipped_at: new Date().toISOString()
}
}));
return await this.bulkUpdateOrders(updates, options);
}
}
// Usage examples
const updateProcessor = new OrderUpdateProcessor();
// Bulk status update
await updateProcessor.bulkUpdateOrderStatus(
['ord_123', 'ord_124', 'ord_125'],
'processing'
);
// Bulk tracking number update
await updateProcessor.bulkUpdateTrackingNumbers([
{ orderId: 'ord_123', trackingNumber: 'TRACK123', carrier: 'UPS' },
{ orderId: 'ord_124', trackingNumber: 'TRACK124', carrier: 'FedEx' },
{ orderId: 'ord_125', trackingNumber: 'TRACK125', carrier: 'USPS' }
]);
class InventoryBatchProcessor extends BatchProcessor {
async bulkUpdateInventory(inventoryData, options = {}) {
return await this.processBatch('update', inventoryData, {
...options,
validateInput: this.validateInventoryData,
transformInput: this.transformInventoryData
});
}
async executeBatchOperation(operation, batch, options) {
switch (operation) {
case 'update':
return await this.batchUpdateInventory(batch, options);
case 'sync':
return await this.batchSyncInventory(batch, options);
case 'adjust':
return await this.batchAdjustInventory(batch, options);
default:
throw new Error(`Unsupported inventory operation: ${operation}`);
}
}
async batchUpdateInventory(inventoryItems, options = {}) {
const updateLimit = pLimit(8); // Higher concurrency for inventory
const results = await Promise.allSettled(
inventoryItems.map(item =>
updateLimit(async () => {
try {
const result = await this.client.inventory.update(item.sku, {
quantity: item.quantity,
location: item.location || 'default',
updated_at: new Date().toISOString(),
source: 'batch_update'
});
return { success: true, item: result, originalData: item };
} catch (error) {
return {
success: false,
error: error.message,
originalData: item
};
}
})
)
);
return this.processInventoryResults(results);
}
// Advanced inventory synchronization
async batchSyncInventory(inventoryItems, options = {}) {
const { source = 'external_system' } = options;
// Group by location for efficient processing
const itemsByLocation = _.groupBy(inventoryItems, 'location');
const locationResults = await Promise.allSettled(
Object.entries(itemsByLocation).map(([location, items]) =>
this.syncLocationInventory(location, items, { source })
)
);
return this.compileSyncResults(locationResults);
}
async syncLocationInventory(location, items, options) {
const { source } = options;
// Get current inventory for comparison
const currentInventory = await this.getCurrentInventory(location);
const currentBySku = new Map(
currentInventory.map(item => [item.sku, item])
);
const syncOperations = [];
for (const item of items) {
const current = currentBySku.get(item.sku);
if (!current) {
// New item - create
syncOperations.push({
operation: 'create',
sku: item.sku,
data: {
...item,
location,
source,
created_at: new Date().toISOString()
}
});
} else if (current.quantity !== item.quantity) {
// Existing item with different quantity - update
syncOperations.push({
operation: 'update',
sku: item.sku,
data: {
quantity: item.quantity,
location,
source,
updated_at: new Date().toISOString(),
previous_quantity: current.quantity
}
});
}
// Remove from current map (remaining items will be marked as discontinued)
currentBySku.delete(item.sku);
}
// Handle discontinued items
for (const [sku, current] of currentBySku.entries()) {
if (options.markDiscontinued) {
syncOperations.push({
operation: 'discontinue',
sku,
data: {
quantity: 0,
location,
status: 'discontinued',
discontinued_at: new Date().toISOString(),
source
}
});
}
}
// Execute sync operations
return await this.executeSyncOperations(syncOperations);
}
async executeSyncOperations(operations) {
const operationLimit = pLimit(6);
const results = await Promise.allSettled(
operations.map(op =>
operationLimit(async () => {
try {
let result;
switch (op.operation) {
case 'create':
result = await this.client.inventory.create(op.data);
break;
case 'update':
result = await this.client.inventory.update(op.sku, op.data);
break;
case 'discontinue':
result = await this.client.inventory.update(op.sku, op.data);
break;
default:
throw new Error(`Unknown operation: ${op.operation}`);
}
return {
success: true,
operation: op.operation,
sku: op.sku,
result
};
} catch (error) {
return {
success: false,
operation: op.operation,
sku: op.sku,
error: error.message
};
}
})
)
);
return this.processSyncResults(results);
}
validateInventoryData(item) {
const required = ['sku', 'quantity'];
const missing = required.filter(field => item[field] === undefined || item[field] === null);
if (missing.length > 0) {
throw new Error(`Missing required fields: ${missing.join(', ')}`);
}
if (typeof item.quantity !== 'number' || item.quantity < 0) {
throw new Error('Quantity must be a non-negative number');
}
return item;
}
transformInventoryData(item) {
return {
...item,
sku: item.sku.toUpperCase().trim(),
quantity: parseInt(item.quantity, 10),
location: item.location || 'default',
updated_at: new Date().toISOString()
};
}
processInventoryResults(results) {
const summary = {
updated: [],
failed: [],
total: results.length
};
results.forEach(result => {
if (result.status === 'fulfilled') {
const value = result.value;
if (value.success) {
summary.updated.push(value.item);
} else {
summary.failed.push({
error: value.error,
data: value.originalData
});
}
} else {
summary.failed.push({
error: result.reason.message,
data: null
});
}
});
return summary;
}
}
// Usage example
const inventoryProcessor = new InventoryBatchProcessor({ concurrency: 8 });
const inventoryUpdates = [
{ sku: 'PRODUCT-001', quantity: 150, location: 'warehouse_1' },
{ sku: 'PRODUCT-002', quantity: 75, location: 'warehouse_1' },
{ sku: 'PRODUCT-003', quantity: 200, location: 'warehouse_2' },
// ... more inventory items
];
await inventoryProcessor.bulkUpdateInventory(inventoryUpdates);
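The inventory sync path above calls getCurrentInventory, processSyncResults, and compileSyncResults, which are not shown, and the 'adjust' branch expects a batchAdjustInventory method you would supply yourself. Below is a minimal sketch of the three helpers; the inventory list call and its location filter are assumptions about the API, so adjust them to match your account's inventory endpoints.
// Helper sketches for InventoryBatchProcessor (assumed API: client.inventory.list
// accepts a location filter and returns { items: [...] })

// Current on-hand inventory for a single location
async getCurrentInventory(location) {
  const response = await this.client.inventory.list({ location, limit: 1000 });
  return response.items || [];
}

// Per-operation outcomes from executeSyncOperations
processSyncResults(results) {
  const summary = { succeeded: [], failed: [] };
  results.forEach(result => {
    const value = result.status === 'fulfilled'
      ? result.value
      : { success: false, error: result.reason?.message };
    (value.success ? summary.succeeded : summary.failed).push(value);
  });
  return summary;
}

// Roll per-location summaries into a single sync report
compileSyncResults(locationResults) {
  const report = { locations: locationResults.length, succeeded: 0, failed: 0, errors: [] };
  locationResults.forEach(result => {
    if (result.status === 'fulfilled') {
      report.succeeded += result.value.succeeded.length;
      report.failed += result.value.failed.length;
      report.errors.push(...result.value.failed);
    } else {
      report.failed += 1;
      report.errors.push({ error: result.reason?.message });
    }
  });
  return report;
}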
import fs, { createReadStream } from 'fs';
import csv from 'csv-parser';
class CSVBatchProcessor extends OrderBatchProcessor {
// Route the 'import' operation to order creation; the inherited
// executeBatchOperation does not recognize 'import' on its own.
async executeBatchOperation(operation, batch, options) {
if (operation === 'import') {
return await this.batchCreateOrders(batch, options);
}
return await super.executeBatchOperation(operation, batch, options);
}
async processCSVFile(filePath, options = {}) {
// csv-parser keys each row by the header row, so no skip-header flag is needed
const {
batchSize = 1000,
transform = null,
validate = null
} = options;
return new Promise((resolve, reject) => {
const results = [];
const errors = [];
let batch = [];
let lineNumber = 0;
const stream = createReadStream(filePath).pipe(csv());
stream
.on('data', async (row) => {
lineNumber++;
try {
// Apply validation if provided
if (validate) {
validate(row, lineNumber);
}
// Apply transformation if provided
const transformedRow = transform ? transform(row, lineNumber) : row;
batch.push(transformedRow);
} catch (error) {
errors.push({
line: lineNumber,
data: row,
error: error.message
});
return;
}
// Process batch when it reaches the specified size. Pause the stream first:
// an async 'data' handler alone does not apply backpressure, so rows would
// keep arriving while the batch is still being sent.
if (batch.length >= batchSize) {
stream.pause();
try {
const batchResults = await this.processBatch('import', batch);
results.push(batchResults);
} catch (error) {
errors.push({ line: lineNumber, error: `Batch failed: ${error.message}` });
} finally {
batch = [];
stream.resume();
}
}
})
.on('end', async () => {
try {
// Process remaining items in the final batch
if (batch.length > 0) {
const batchResults = await this.processBatch('import', batch);
results.push(batchResults);
}
resolve({
totalLines: lineNumber,
totalBatches: results.length,
results,
errors
});
} catch (error) {
reject(error);
}
})
.on('error', reject);
});
}
// Order CSV import
async importOrdersFromCSV(filePath, options = {}) {
return await this.processCSVFile(filePath, {
...options,
validate: this.validateOrderCSVRow,
transform: this.transformOrderCSVRow
});
}
validateOrderCSVRow(row, lineNumber) {
const required = ['customer_email', 'item_sku', 'item_quantity', 'item_price'];
const missing = required.filter(field => !row[field] || row[field].trim() === '');
if (missing.length > 0) {
throw new Error(`Line ${lineNumber}: Missing required fields: ${missing.join(', ')}`);
}
// Validate email format
const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
if (!emailRegex.test(row.customer_email)) {
throw new Error(`Line ${lineNumber}: Invalid email format`);
}
// Validate numeric fields
if (isNaN(parseFloat(row.item_price)) || parseFloat(row.item_price) <= 0) {
throw new Error(`Line ${lineNumber}: Invalid item price`);
}
if (isNaN(parseInt(row.item_quantity, 10)) || parseInt(row.item_quantity, 10) <= 0) {
throw new Error(`Line ${lineNumber}: Invalid item quantity`);
}
}
transformOrderCSVRow(row, lineNumber) {
return {
customer_email: row.customer_email.trim().toLowerCase(),
customer_name: row.customer_name?.trim() || '',
items: [{
sku: row.item_sku.trim().toUpperCase(),
quantity: parseInt(row.item_quantity, 10),
price: parseFloat(row.item_price),
name: row.item_name?.trim() || ''
}],
shipping_address: {
street1: row.shipping_street1?.trim() || '',
street2: row.shipping_street2?.trim() || '',
city: row.shipping_city?.trim() || '',
state: row.shipping_state?.trim() || '',
zip: row.shipping_zip?.trim() || '',
country: row.shipping_country?.trim().toUpperCase() || 'US'
},
source: 'csv_import',
import_line_number: lineNumber,
imported_at: new Date().toISOString()
};
}
// Generate import report
generateImportReport(results, outputPath) {
const report = {
summary: {
total_lines: results.totalLines,
total_batches: results.totalBatches,
successful_imports: 0,
failed_imports: 0,
error_count: results.errors.length
},
batch_details: [],
errors: results.errors
};
results.results.forEach((batch, index) => {
report.summary.successful_imports += batch.successful || 0;
report.summary.failed_imports += batch.failed || 0;
report.batch_details.push({
batch_number: index + 1,
items_processed: batch.totalItems || 0,
successful: batch.successful || 0,
failed: batch.failed || 0,
errors: batch.errors || []
});
});
// Write report to file
fs.writeFileSync(outputPath, JSON.stringify(report, null, 2));
return report;
}
}
// Usage example
const csvProcessor = new CSVBatchProcessor({ concurrency: 3 });
async function importOrdersExample() {
try {
const results = await csvProcessor.importOrdersFromCSV('./orders.csv', {
batchSize: 500
});
console.log(`Processed ${results.totalLines} lines in ${results.totalBatches} batches`);
console.log(`Errors: ${results.errors.length}`);
// Generate and save report
const report = csvProcessor.generateImportReport(results, './import-report.json');
console.log('Import report saved to ./import-report.json');
} catch (error) {
console.error('CSV import failed:', error);
}
}
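The queue worker later in this guide also calls importInventoryFromCSV, which is not defined above. Below is a minimal sketch of that method, added to CSVBatchProcessor and assuming the CSV carries sku, quantity, and location columns.
// Sketch: inventory CSV import for CSVBatchProcessor (assumed columns: sku, quantity, location)
async importInventoryFromCSV(filePath, options = {}) {
  const { batchSize = 500 } = options;
  const inventoryProcessor = new InventoryBatchProcessor({ concurrency: 8 });

  // Read and normalize every row first; inventory rows are small, so buffering is acceptable here
  const rows = [];
  await new Promise((resolve, reject) => {
    createReadStream(filePath)
      .pipe(csv())
      .on('data', row => rows.push({
        sku: row.sku?.trim().toUpperCase(),
        quantity: parseInt(row.quantity, 10),
        location: row.location?.trim() || 'default'
      }))
      .on('end', resolve)
      .on('error', reject);
  });

  // Push the rows through the inventory processor in chunks
  const summaries = [];
  for (let i = 0; i < rows.length; i += batchSize) {
    summaries.push(await inventoryProcessor.bulkUpdateInventory(rows.slice(i, i + batchSize)));
  }
  return summaries;
}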
import { createWriteStream } from 'fs';
import { format } from 'fast-csv';
class ExportProcessor extends BatchProcessor {
async exportOrdersToCSV(filters = {}, outputPath, options = {}) {
const {
batchSize = 1000,
includeFields = null,
transform = null,
onBatch = null
} = options;
const writeStream = createWriteStream(outputPath);
const csvStream = format({ headers: true });
csvStream.pipe(writeStream);
let offset = 0;
let hasMore = true;
let totalExported = 0;
try {
while (hasMore) {
console.log(`Exporting batch starting at offset ${offset}`);
const orders = await this.client.orders.list({
...filters,
limit: batchSize,
offset,
include: ['line_items', 'customer', 'shipping_address']
});
if (orders.orders.length === 0) {
hasMore = false;
break;
}
// Transform orders for CSV export
const transformedOrders = orders.orders.map(order => {
const baseData = {
order_id: order.id,
order_number: order.order_number,
customer_email: order.customer_email,
customer_name: order.customer_name,
status: order.status,
total_amount: order.total_amount,
created_date: order.created_date,
updated_date: order.updated_date
};
// Include line items as separate rows or flatten
if (order.order_line_items && order.order_line_items.length > 0) {
return order.order_line_items.map(item => ({
...baseData,
item_sku: item.sku_name,
item_name: item.product_name,
item_quantity: item.quantity,
item_price: item.sale_price,
item_brand: item.brand
}));
}
return [baseData];
}).flat();
// Apply custom transformation if provided
const finalData = transform ?
transformedOrders.map(transform) :
transformedOrders;
// Filter fields if specified
const filteredData = includeFields ?
finalData.map(row => _.pick(row, includeFields)) :
finalData;
// Write to CSV
for (const row of filteredData) {
csvStream.write(row);
}
totalExported += filteredData.length;
offset += batchSize;
// Report progress on the rows just written
if (onBatch) {
onBatch(filteredData);
}
// Check if we have more data
hasMore = orders.orders.length === batchSize;
// Add delay to respect rate limits
await this.delay(100);
}
csvStream.end();
return new Promise((resolve, reject) => {
writeStream.on('finish', () => {
console.log(`Export completed: ${totalExported} records exported to ${outputPath}`);
resolve({ totalExported, outputPath });
});
writeStream.on('error', reject);
});
} catch (error) {
csvStream.end();
throw error;
}
}
// Export with progress tracking
async exportOrdersWithProgress(filters = {}, outputPath, options = {}) {
const {
onProgress = null,
...exportOptions
} = options;
// Get total count first
const countResponse = await this.client.orders.list({
...filters,
limit: 1
});
const totalOrders = countResponse.total_count;
let processedOrders = 0;
// Report progress after each exported batch via the onBatch hook
return await this.exportOrdersToCSV(filters, outputPath, {
...exportOptions,
onBatch: (batch) => {
processedOrders += batch.length;
const progress = totalOrders ? Math.round((processedOrders / totalOrders) * 100) : 0;
console.log(`Export progress: ${processedOrders}/${totalOrders} (${progress}%)`);
if (onProgress) {
onProgress({
processed: processedOrders,
total: totalOrders,
percentage: progress
});
}
}
});
}
}
// Usage example
const exportProcessor = new ExportProcessor();
// Export all orders from the last 30 days
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000).toISOString();
await exportProcessor.exportOrdersWithProgress(
{
created_after: thirtyDaysAgo,
status_in: ['processing', 'shipped', 'delivered']
},
'./orders-export.csv',
{
batchSize: 500,
includeFields: [
'order_id', 'order_number', 'customer_email', 'status',
'total_amount', 'created_date', 'item_sku', 'item_quantity'
],
onProgress: (progress) => {
// Update progress bar or send notification
console.log(`Progress: ${progress.percentage}%`);
}
}
);
import Queue from 'bull';
import Redis from 'ioredis';
class QueuedBatchProcessor extends BatchProcessor {
constructor(options = {}) {
super(options);
this.redis = new Redis(process.env.REDIS_URL);
this.batchQueue = new Queue(process.env.QUEUE_NAME || 'stateset-batch-operations', process.env.REDIS_URL);
// BatchProcessor does not implement the bulk order / CSV methods the workers
// need, so delegate to the specialized processors defined earlier
this.orderProcessor = new OrderUpdateProcessor(options);
this.csvProcessor = new CSVBatchProcessor(options);
this.setupJobProcessors();
this.setupEventHandlers();
}
setupJobProcessors() {
// Process batch creation jobs
this.batchQueue.process('bulk-create-orders', 5, async (job) => {
const { orders, options } = job.data;
job.progress(0);
try {
const result = await this.orderProcessor.bulkCreateOrders(orders, {
...options,
onProgress: (progress) => {
job.progress(Math.round((progress.completed / progress.total) * 100));
}
});
job.progress(100);
return result;
} catch (error) {
console.error('Batch job failed:', error);
throw error;
}
});
// Process batch update jobs
this.batchQueue.process('bulk-update-orders', 3, async (job) => {
const { updates, options } = job.data;
job.progress(0);
const result = await this.orderProcessor.bulkUpdateOrders(updates, {
...options,
onProgress: (progress) => {
job.progress(Math.round((progress.completed / progress.total) * 100));
}
});
job.progress(100);
return result;
});
// Process CSV import jobs
this.batchQueue.process('csv-import', 1, async (job) => {
const { filePath, type, options } = job.data;
job.progress(0);
let result;
switch (type) {
case 'orders':
result = await this.csvProcessor.importOrdersFromCSV(filePath, {
...options,
onProgress: (progress) => {
job.progress(Math.round((progress.completed / progress.total) * 100));
}
});
break;
case 'inventory':
result = await this.csvProcessor.importInventoryFromCSV(filePath, options);
break;
default:
throw new Error(`Unsupported import type: ${type}`);
}
job.progress(100);
return result;
});
}
setupEventHandlers() {
this.batchQueue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed successfully`);
this.notifyJobCompletion(job, result, 'completed');
});
this.batchQueue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed:`, err);
this.notifyJobCompletion(job, null, 'failed', err);
});
this.batchQueue.on('progress', (job, progress) => {
console.log(`Job ${job.id} progress: ${progress}%`);
});
}
async queueBulkCreateOrders(orders, options = {}) {
const job = await this.batchQueue.add('bulk-create-orders', {
orders,
options
}, {
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000
},
removeOnComplete: 10,
removeOnFail: 5
});
return {
jobId: job.id,
status: 'queued'
};
}
async queueCSVImport(filePath, type, options = {}) {
const job = await this.batchQueue.add('csv-import', {
filePath,
type,
options
}, {
attempts: 2,
timeout: 300000, // 5 minutes
removeOnComplete: 5,
removeOnFail: 3
});
return {
jobId: job.id,
status: 'queued'
};
}
async getJobStatus(jobId) {
const job = await this.batchQueue.getJob(jobId);
if (!job) {
return { status: 'not_found' };
}
const state = await job.getState();
const progress = job.progress();
return {
jobId: job.id,
status: state,
progress,
data: job.data,
result: job.returnvalue,
error: job.failedReason,
created: job.timestamp,
processed: job.processedOn,
finished: job.finishedOn
};
}
async notifyJobCompletion(job, result, status, error = null) {
const notification = {
jobId: job.id,
jobType: job.name,
status,
timestamp: new Date().toISOString(),
result,
error: error?.message
};
// Send webhook notification
if (process.env.WEBHOOK_NOTIFICATION_URL) {
try {
await fetch(process.env.WEBHOOK_NOTIFICATION_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(notification)
});
} catch (webhookError) {
console.error('Failed to send webhook notification:', webhookError);
}
}
// Store notification in database or send email
// Implementation depends on your notification system
}
}
// Usage example
const queueProcessor = new QueuedBatchProcessor();
// Queue a bulk order creation job
const createJob = await queueProcessor.queueBulkCreateOrders(sampleOrders, {
validateInput: true,
notifyOnCompletion: true
});
console.log(`Bulk create job queued with ID: ${createJob.jobId}`);
// Poll job status until the job completes or fails
const poll = setInterval(async () => {
const status = await queueProcessor.getJobStatus(createJob.jobId);
console.log(`Job ${createJob.jobId} status:`, status);
if (['completed', 'failed'].includes(status.status)) {
clearInterval(poll);
}
}, 5000);
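When the worker process stops, close the queue and the Redis connection so active jobs are not dropped mid-batch; a minimal shutdown sketch:
// Graceful shutdown: let active jobs finish, then disconnect
async function shutdownQueueProcessor(processor) {
  console.log('Shutting down batch worker...');
  await processor.batchQueue.close(); // Bull waits for currently active jobs by default
  await processor.redis.quit();
  process.exit(0);
}

process.on('SIGTERM', () => shutdownQueueProcessor(queueProcessor));
process.on('SIGINT', () => shutdownQueueProcessor(queueProcessor));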
class StreamingBatchProcessor extends BatchProcessor {
async streamProcessLargeDataset(dataStream, processor, options = {}) {
const {
batchSize = 1000,
maxMemoryUsage = 512 * 1024 * 1024, // 512MB
gcInterval = 10000 // Force GC every 10k items
} = options;
let batch = [];
let processedCount = 0;
let errorCount = 0;
let itemsSinceGc = 0;
return new Promise((resolve, reject) => {
dataStream
.on('data', async (item) => {
batch.push(item);
if (batch.length >= batchSize) {
try {
// Pause stream while processing batch
dataStream.pause();
const result = await processor(batch);
processedCount += result.successful || 0;
errorCount += result.failed || 0;
itemsSinceGc += batch.length;
// Clear batch and force garbage collection once enough items have passed through
batch = [];
if (itemsSinceGc >= gcInterval) {
itemsSinceGc = 0;
this.forceGarbageCollection();
this.checkMemoryUsage(maxMemoryUsage);
}
// Resume stream
dataStream.resume();
} catch (error) {
dataStream.destroy(error);
return;
}
}
})
.on('end', async () => {
try {
// Process final batch
if (batch.length > 0) {
const result = await processor(batch);
processedCount += result.successful || 0;
errorCount += result.failed || 0;
}
resolve({
processed: processedCount,
errors: errorCount,
total: processedCount + errorCount
});
} catch (error) {
reject(error);
}
})
.on('error', reject);
});
}
forceGarbageCollection() {
if (global.gc) {
global.gc();
console.log('Forced garbage collection');
}
}
checkMemoryUsage(maxUsage) {
const used = process.memoryUsage();
if (used.heapUsed > maxUsage) {
console.warn(`High memory usage detected: ${Math.round(used.heapUsed / 1024 / 1024)}MB`);
if (used.heapUsed > maxUsage * 1.5) {
throw new Error('Memory usage too high, aborting batch processing');
}
}
}
// Optimized batch processing with connection pooling
async optimizedBatchProcess(data, operation, options = {}) {
const {
connectionPoolSize = 10,
maxRetries = 3,
retryDelay = 1000
} = options;
// Create connection pool
const pool = Array.from({ length: connectionPoolSize }, () =>
new StateSetClient({
apiKey: process.env.STATESET_API_KEY,
timeout: 60000,
keepAlive: true
})
);
let poolIndex = 0;
const getClient = () => {
const client = pool[poolIndex % pool.length];
poolIndex++;
return client;
};
try {
const processLimit = pLimit(connectionPoolSize);
const results = await Promise.allSettled(
data.map(item =>
processLimit(async () => {
let attempt = 0;
let lastError;
while (attempt < maxRetries) {
try {
const client = getClient();
const result = await this.executeOperation(client, operation, item);
return { success: true, result, item };
} catch (error) {
lastError = error;
attempt++;
if (attempt < maxRetries) {
await this.delay(retryDelay * Math.pow(2, attempt - 1));
}
}
}
return { success: false, error: lastError.message, item };
})
)
);
// Summarize per-item outcomes (compileBatchResults expects per-batch counts,
// so build a simple per-item summary here instead)
const summary = { successful: [], failed: [] };
for (const r of results) {
const value = r.status === 'fulfilled' ? r.value : { success: false, error: r.reason?.message, item: null };
(value.success ? summary.successful : summary.failed).push(value);
}
return summary;
} finally {
// Clean up connection pool
for (const client of pool) {
if (client.destroy) {
client.destroy();
}
}
}
}
async executeOperation(client, operation, item) {
switch (operation) {
case 'create_order':
return await client.orders.create(item);
case 'update_order':
return await client.orders.update(item.id, item.data);
case 'update_inventory':
return await client.inventory.update(item.sku, item.data);
default:
throw new Error(`Unsupported operation: ${operation}`);
}
}
}
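A usage sketch for streamProcessLargeDataset: stream a large inventory CSV (the file name is an example) and hand each batch to the inventoryProcessor instance created earlier. Run node with --expose-gc if you want forceGarbageCollection to actually trigger.
import csv from 'csv-parser';
import { createReadStream } from 'fs';

const streamingProcessor = new StreamingBatchProcessor();
const inventoryStream = createReadStream('./large-inventory.csv').pipe(csv());

const streamResult = await streamingProcessor.streamProcessLargeDataset(
  inventoryStream,
  async (batch) => {
    // Delegate each batch to the inventory processor defined earlier in this guide
    const summary = await inventoryProcessor.bulkUpdateInventory(batch);
    return { successful: summary.successful || 0, failed: summary.failed || 0 };
  },
  { batchSize: 2000 }
);

console.log('Streaming complete:', streamResult);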
class BatchMonitor {
constructor() {
this.metrics = {
totalOperations: 0,
successfulOperations: 0,
failedOperations: 0,
averageProcessingTime: 0,
peakMemoryUsage: 0,
operationHistory: []
};
}
recordOperation(operation, duration, success, memoryUsage) {
this.metrics.totalOperations++;
if (success) {
this.metrics.successfulOperations++;
} else {
this.metrics.failedOperations++;
}
// Update average processing time
const totalTime = this.metrics.averageProcessingTime * (this.metrics.totalOperations - 1) + duration;
this.metrics.averageProcessingTime = totalTime / this.metrics.totalOperations;
// Track peak memory usage
if (memoryUsage > this.metrics.peakMemoryUsage) {
this.metrics.peakMemoryUsage = memoryUsage;
}
// Store operation history (keep last 1000)
this.metrics.operationHistory.push({
operation,
duration,
success,
memoryUsage,
timestamp: new Date().toISOString()
});
if (this.metrics.operationHistory.length > 1000) {
this.metrics.operationHistory.shift();
}
}
getSuccessRate() {
if (this.metrics.totalOperations === 0) return 0;
return (this.metrics.successfulOperations / this.metrics.totalOperations) * 100;
}
generateReport() {
return {
summary: {
total_operations: this.metrics.totalOperations,
successful_operations: this.metrics.successfulOperations,
failed_operations: this.metrics.failedOperations,
success_rate: `${this.getSuccessRate().toFixed(2)}%`,
average_processing_time: `${this.metrics.averageProcessingTime.toFixed(2)}ms`,
peak_memory_usage: `${(this.metrics.peakMemoryUsage / 1024 / 1024).toFixed(2)}MB`
},
recent_operations: this.metrics.operationHistory.slice(-10),
recommendations: this.generateRecommendations()
};
}
generateRecommendations() {
const recommendations = [];
const successRate = this.getSuccessRate();
if (successRate < 95) {
recommendations.push('Consider reducing batch size or improving error handling');
}
if (this.metrics.averageProcessingTime > 30000) {
recommendations.push('Processing time is high, consider optimizing batch operations');
}
if (this.metrics.peakMemoryUsage > 1024 * 1024 * 1024) {
recommendations.push('High memory usage detected, consider streaming or smaller batches');
}
return recommendations;
}
}
// Enhanced batch processor with monitoring
class MonitoredBatchProcessor extends StreamingBatchProcessor {
constructor(options = {}) {
super(options);
this.monitor = new BatchMonitor();
}
async processBatch(operation, data, options = {}) {
const startTime = Date.now();
const startMemory = process.memoryUsage().heapUsed;
try {
const result = await super.processBatch(operation, data, options);
const duration = Date.now() - startTime;
const endMemory = process.memoryUsage().heapUsed;
this.monitor.recordOperation(operation, duration, true, endMemory);
return {
...result,
processingTime: duration,
memoryUsed: endMemory - startMemory
};
} catch (error) {
const duration = Date.now() - startTime;
const endMemory = process.memoryUsage().heapUsed;
this.monitor.recordOperation(operation, duration, false, endMemory);
throw error;
}
}
getMonitoringReport() {
return this.monitor.generateReport();
}
}
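A short usage sketch: MonitoredBatchProcessor still needs an executeBatchOperation, so this example wires it to an OrderBatchProcessor instance (sampleOrders comes from the bulk-creation example earlier), then prints the monitoring report.
// Combine monitoring with order operations by delegating executeBatchOperation
class MonitoredOrderProcessor extends MonitoredBatchProcessor {
  constructor(options = {}) {
    super(options);
    this.orders = new OrderBatchProcessor(options);
  }

  async executeBatchOperation(operation, batch, options) {
    return await this.orders.executeBatchOperation(operation, batch, options);
  }
}

const monitoredProcessor = new MonitoredOrderProcessor({ concurrency: 5 });
await monitoredProcessor.processBatch('create', sampleOrders);
console.log(JSON.stringify(monitoredProcessor.getMonitoringReport(), null, 2));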
Memory Issues
Rate Limiting
Data Quality
Performance