In Flatfile, a Job represents a large unit of work performed asynchronously on a resource such as a file, Workbook, or Sheet. The Jobs workflow provides visibility into the status and progress of your Jobs, allowing you to monitor and troubleshoot the data processing pipeline. For handling large datasets, see our multi-part jobs guide.
Types of Jobs
Jobs can be triggered in a number of ways, most commonly in response to user activity. They are then managed via Listeners, which receive the Events that Flatfile publishes as a Job moves through its lifecycle.
There are three types of Jobs on the Flatfile Platform (a minimal listener sketch follows this list):
- Action Jobs - Attached to custom actions
- Custom Jobs - Created dynamically in your listener
- System Jobs - Flatfile system jobs
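Action and Custom Jobs are typically driven from a Listener: acknowledge the Job when work starts, report progress, then complete or fail it. The following is a minimal sketch of that lifecycle; the "workbook:submitData" job name and the messages are placeholder assumptions.
// Minimal Action Job handler (the "workbook:submitData" job name is a placeholder)
const actionListener = FlatfileListener.create((listener) => {
  listener.on(
    "job:ready",
    { job: "workbook:submitData" },
    async ({ context: { jobId } }) => {
      try {
        // Acknowledge the job and report initial progress
        await api.jobs.ack(jobId, { info: "Starting work...", progress: 10 });
        // ...perform your processing here...
        await api.jobs.complete(jobId, {
          outcome: { message: "Data submitted successfully." },
        });
      } catch (error) {
        await api.jobs.fail(jobId, {
          outcome: { message: `Job failed: ${error.message}` },
        });
      }
    }
  );
});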
Common Job Operations
File Processing Jobs
Handle file upload and extraction operations:
// File extraction job
const extractJob = await api.jobs.create({
type: "file",
operation: "extract",
source: fileId,
config: {
delimiter: ",",
encoding: "utf-8",
headers: true,
skipRows: 1,
},
});
// File validation job
const validateJob = await api.jobs.create({
type: "file",
operation: "validate",
source: fileId,
config: {
maxFileSize: "50MB",
allowedTypes: ["csv", "xlsx", "json"],
scanForViruses: true,
},
});
Workbook Processing Jobs
Process workbook data and records:
// Workbook mapping job
const mapJob = await api.jobs.create({
type: "workbook",
operation: "map",
source: workbookId,
config: {
autoMap: true,
confidence: 0.8,
strategy: "fuzzy",
},
});
// Record validation job
const validateRecordsJob = await api.jobs.create({
type: "workbook",
operation: "validate-records",
source: workbookId,
config: {
validateRequiredFields: true,
validateDataTypes: true,
validateBusinessRules: true,
},
});
// Data export job
const exportJob = await api.jobs.create({
type: "workbook",
operation: "export",
source: workbookId,
config: {
format: "json",
includeMetadata: false,
filterInvalidRecords: true,
},
});
Custom Processing Jobs
Execute custom business logic:
// Custom data transformation
const transformJob = await api.jobs.create({
type: "custom",
operation: "transform-customer-data",
source: workbookId,
config: {
transformations: [
{ field: "name", operation: "titleCase" },
{ field: "email", operation: "lowercase" },
{ field: "phone", operation: "formatPhone" },
],
},
});
// Data enrichment job
const enrichJob = await api.jobs.create({
type: "custom",
operation: "enrich-data",
source: workbookId,
config: {
enrichmentService: "clearbit",
fields: ["company", "jobTitle", "industry"],
batchSize: 100,
},
});
Job Configuration
Basic Job Structure
const job = await api.jobs.create({
// Job identification
type: "workbook", // 'file', 'workbook', 'custom'
operation: "transform", // Specific operation to perform
// Data source
source: workbookId, // Source resource ID
// Job configuration
config: {
// Operation-specific settings
batchSize: 1000,
timeout: 300000,
retryAttempts: 3,
},
// Execution options
options: {
priority: "normal", // 'low', 'normal', 'high'
schedule: "immediate", // 'immediate', ISO date, cron expression
dependencies: [], // Other job IDs this job depends on
// Resource constraints
resources: {
memory: "1GB",
cpu: "1000m",
timeout: 600000,
},
},
});
Advanced Job Configuration
const complexJob = await api.jobs.create({
type: "pipeline",
operation: "customer-import-pipeline",
// Pipeline configuration
config: {
stages: [
{
name: "extract",
type: "file",
operation: "extract",
config: { delimiter: "," },
},
{
name: "validate",
type: "workbook",
operation: "validate-records",
dependsOn: ["extract"],
},
{
name: "transform",
type: "custom",
operation: "apply-business-rules",
dependsOn: ["validate"],
},
{
name: "export",
type: "workbook",
operation: "export",
dependsOn: ["transform"],
},
],
},
// Pipeline options
options: {
failureStrategy: "stop-on-error", // 'continue', 'stop-on-error', 'retry'
parallelism: 2,
timeout: 1800000, // 30 minutes
},
});
Job Monitoring
Job Status Tracking
// Get job status
const job = await api.jobs.get(jobId);
console.log(`Status: ${job.status}`); // 'queued', 'running', 'completed', 'failed'
console.log(`Progress: ${job.progress}%`); // Completion percentage
console.log(`Records: ${job.recordsProcessed}`); // Records processed so far
// Job timing information
console.log(`Created: ${job.createdAt}`);
console.log(`Started: ${job.startedAt}`);
console.log(`Completed: ${job.completedAt}`);
console.log(`Duration: ${job.duration}ms`);
Real-time Progress Updates
// Listen for job progress events
const jobListener = FlatfileListener.create((listener) => {
listener.on(
"job:updated",
{
filter: (event) => event.context.jobId === jobId,
},
(event) => {
const job = event.context.job;
console.log(`Job ${job.id} progress: ${job.progress}%`);
// Update UI progress bar
updateProgressBar(job.progress);
// Show stage information
if (job.currentStage) {
updateStatus(`Processing: ${job.currentStage.name}`);
}
}
);
listener.on(
"job:completed",
{
filter: (event) => event.context.jobId === jobId,
},
(event) => {
const job = event.context.job;
console.log(`Job completed successfully!`);
console.log(
`Processed ${job.recordsProcessed} records in ${job.duration}ms`
);
// Handle successful completion
handleJobSuccess(job);
}
);
listener.on(
"job:failed",
{
filter: (event) => event.context.jobId === jobId,
},
(event) => {
const job = event.context.job;
console.error(`Job failed: ${job.error}`);
// Handle job failure
handleJobFailure(job);
}
);
});
Batch Job Monitoring
// Monitor multiple jobs
const jobIds = ["job_1", "job_2", "job_3"];
const jobStatuses = await Promise.all(jobIds.map((id) => api.jobs.get(id)));
// Calculate overall progress
const totalProgress =
jobStatuses.reduce((sum, job) => sum + job.progress, 0) / jobStatuses.length;
// Check if all jobs completed
const allCompleted = jobStatuses.every((job) => job.status === "completed");
// Get failed jobs
const failedJobs = jobStatuses.filter((job) => job.status === "failed");
if (failedJobs.length > 0) {
console.log(
`${failedJobs.length} jobs failed:`,
failedJobs.map((j) => j.id)
);
}
Job Results and Output
Retrieving Job Results
// Get job completion data
const job = await api.jobs.get(jobId);
if (job.status === "completed") {
// Get processed data
const result = await api.jobs.getResult(jobId);
console.log(`Records processed: ${result.recordCount}`);
console.log(`Errors found: ${result.errorCount}`);
console.log(`Processing time: ${result.duration}ms`);
// Access output data
if (result.data) {
console.log("Processed data:", result.data);
}
// Download exported files
if (result.fileUrl) {
const file = await downloadFile(result.fileUrl);
console.log("Downloaded exported data");
}
}
Job Artifacts
// Get job artifacts (logs, reports, exports)
const artifacts = await api.jobs.getArtifacts(jobId);
artifacts.forEach((artifact) => {
console.log(`Artifact: ${artifact.name}`);
console.log(`Type: ${artifact.type}`);
console.log(`Size: ${artifact.size} bytes`);
console.log(`URL: ${artifact.downloadUrl}`);
});
// Download specific artifact
const logFile = artifacts.find((a) => a.type === "log");
if (logFile) {
const logs = await downloadFile(logFile.downloadUrl);
console.log("Job logs:", logs);
}
Error Handling
Job Error Management
// Handle job errors
const jobListener = FlatfileListener.create((listener) => {
listener.on("job:failed", async (event) => {
const job = event.context.job;
console.error(`Job ${job.id} failed:`, job.error);
// Get detailed error information
const errorDetails = await api.jobs.getErrors(job.id);
errorDetails.forEach((error) => {
console.error(`Error in ${error.stage}:`, error.message);
if (error.recordId) {
console.error(`Failed record: ${error.recordId}`);
}
});
// Implement retry logic
if ((job.retryAttempts || 0) < 3) {
await retryJob(job.id);
} else {
await escalateJobFailure(job);
}
});
});
// Retry failed job
async function retryJob(jobId) {
  const originalJob = await api.jobs.get(jobId);
  // Recreate the job from its original definition with an incremented retry count
  const newJob = await api.jobs.create({
    type: originalJob.type,
    operation: originalJob.operation,
    source: originalJob.source,
    config: originalJob.config,
    options: {
      ...originalJob.options,
      retryAttempts: (originalJob.retryAttempts || 0) + 1,
      retryDelay: Math.pow(2, originalJob.retryAttempts || 0) * 1000, // Exponential backoff
    },
  });
  console.log(`Retrying job as ${newJob.id}`);
  return newJob;
}
Partial Failure Handling
// Handle jobs with partial failures
const job = await api.jobs.get(jobId);
if (job.status === "completed" && job.errorCount > 0) {
console.log(`Job completed with ${job.errorCount} errors`);
// Get failed records
const failedRecords = await api.jobs.getFailedRecords(jobId);
// Process failed records separately
for (const record of failedRecords) {
try {
await processRecordManually(record);
} catch (error) {
console.error(`Manual processing failed for record ${record.id}:`, error);
}
}
// Generate error report
await generateErrorReport(jobId, failedRecords);
}
Job Scheduling
Immediate Execution
// Execute job immediately
const job = await api.jobs.create({
type: "workbook",
operation: "export",
source: workbookId,
options: {
schedule: "immediate",
},
});
Delayed Execution
// Schedule job for later execution
const delayedJob = await api.jobs.create({
type: "custom",
operation: "daily-report",
source: workbookId,
options: {
schedule: new Date(Date.now() + 3600000).toISOString(), // 1 hour from current time
},
});
Recurring Jobs
// Create recurring job with cron expression
const recurringJob = await api.jobs.create({
type: "custom",
operation: "data-sync",
source: workbookId,
options: {
schedule: "0 2 * * *", // Daily at 2 AM
recurring: true,
timezone: "America/New_York",
},
});
Job Dependencies
Sequential Jobs
// Create dependent jobs
const extractJob = await api.jobs.create({
type: "file",
operation: "extract",
source: fileId,
});
const mapJob = await api.jobs.create({
type: "workbook",
operation: "map",
source: workbookId,
options: {
dependencies: [extractJob.id],
},
});
const exportJob = await api.jobs.create({
type: "workbook",
operation: "export",
source: workbookId,
options: {
dependencies: [mapJob.id],
},
});
Parallel Jobs with Synchronization
// Create parallel processing jobs
const validationJob = await api.jobs.create({
type: "workbook",
operation: "validate",
source: workbookId,
options: { dependencies: [extractJob.id] },
});
const enrichmentJob = await api.jobs.create({
type: "custom",
operation: "enrich",
source: workbookId,
options: { dependencies: [extractJob.id] },
});
// Wait for both to complete before final export
const finalExportJob = await api.jobs.create({
type: "workbook",
operation: "export",
source: workbookId,
options: {
dependencies: [validationJob.id, enrichmentJob.id],
},
});
Job Batching
// Configure optimal batch sizes
const batchJob = await api.jobs.create({
type: "workbook",
operation: "transform",
source: workbookId,
config: {
batchSize: 1000, // Process 1000 records at a time
parallelBatches: 4, // Process 4 batches concurrently
batchTimeout: 30000, // 30 second timeout per batch
},
});
Resource Management
// Configure job resources
const resourceOptimizedJob = await api.jobs.create({
type: "custom",
operation: "heavy-processing",
source: workbookId,
options: {
resources: {
memory: "2GB", // Allocate 2GB memory
cpu: "2000m", // Allocate 2 CPU cores
timeout: 1800000, // 30 minute timeout
priority: "high", // High priority execution
},
},
});
Best Practices
Job Design
- Keep jobs focused on specific tasks
- Use appropriate batch sizes for optimal performance
- Implement proper error handling and recovery
- Design jobs to be idempotent when possible (see the sketch below)
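As a sketch of the idempotency point, a custom job handler can key its work off a deterministic identifier and skip units it has already handled; alreadyProcessed, markProcessed, and transformRecord below are hypothetical helpers backed by your own store and logic.
// Idempotent batch processing sketch (alreadyProcessed, markProcessed, transformRecord are hypothetical)
async function processBatchIdempotently(records) {
  for (const record of records) {
    const key = record.id; // deterministic key for this unit of work
    if (await alreadyProcessed(key)) continue; // makes re-running the whole job safe
    await transformRecord(record);
    await markProcessed(key);
  }
}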
Monitoring
- Track job progress and performance metrics
- Set up alerts for job failures and delays (a polling sketch follows this list)
- Monitor resource usage and optimize as needed
- Log important events and errors
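Building on the api.jobs.get polling shown earlier, one way to catch failures and stalled jobs is to poll until the job finishes or a deadline passes; sendAlert is a placeholder for your alerting integration.
// Poll a job and raise an alert on failure or timeout (sendAlert is a placeholder)
async function waitForJob(jobId, timeoutMs = 600000, pollMs = 5000) {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    const job = await api.jobs.get(jobId);
    if (job.status === "completed") return job;
    if (job.status === "failed") {
      await sendAlert(`Job ${jobId} failed: ${job.error}`);
      return job;
    }
    await new Promise((resolve) => setTimeout(resolve, pollMs));
  }
  await sendAlert(`Job ${jobId} did not finish within ${timeoutMs}ms`);
  return null;
}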
Error Handling
- Implement graceful failure handling
- Use retry logic with exponential backoff
- Provide clear error messages and context
- Handle partial failures appropriately
Performance
- Optimize batch sizes for your data
- Use parallel processing when appropriate
- Monitor and tune resource allocation
- Implement caching for expensive operations (see the sketch below)
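For the caching point, a simple in-memory memo around an expensive enrichment lookup could look like the following; lookupCompanyData stands in for a call to your enrichment service.
// Cache expensive enrichment lookups for the duration of a job run (lookupCompanyData is hypothetical)
const enrichmentCache = new Map();

async function enrichCompany(domain) {
  if (enrichmentCache.has(domain)) return enrichmentCache.get(domain);
  const data = await lookupCompanyData(domain); // expensive external call
  enrichmentCache.set(domain, data);
  return data;
}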
Related Resources
- Listeners - Event handlers that create and monitor jobs
- Workbooks - Data containers that jobs process
- Records - Individual data items that jobs transform
- Actions - Custom operations that trigger jobs
- Plugins - Reusable components that jobs can utilize
Jobs provide the foundation for asynchronous data processing in Flatfile, enabling scalable, reliable handling of complex data transformation and import workflows.