In Flatfile, a Job represents a large unit of work performed asynchronously on a resource such as a file, Workbook, or Sheet. The Jobs workflow provides visibility into the status and progress of your Jobs, allowing you to monitor and troubleshoot the data processing pipeline. For handling large datasets, see our multi-part jobs guide.

Types of Jobs

Jobs can be triggered in a number of ways, most commonly in response to user activity. They are then managed via Listeners, which receive the Events that Flatfile publishes in response to that activity.

There are three types of Jobs on the Flatfile Platform:

  • Action Jobs - Attached to custom actions
  • Custom Jobs - Created dynamically in your listener
  • System Jobs - Flatfile system jobs
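
For example, an Action Job attached to a custom action is typically acknowledged, worked on, and then completed or failed from within a listener. The following is a minimal sketch assuming the @flatfile/api and @flatfile/listener packages; the "workbook:submitData" operation name is a hypothetical custom action.

// Minimal sketch: handling an Action Job in a listener
import api from "@flatfile/api";
import { FlatfileListener } from "@flatfile/listener";

export default FlatfileListener.create((listener) => {
  listener.on("job:ready", { job: "workbook:submitData" }, async (event) => {
    const { jobId } = event.context;

    // Acknowledge the job so its status and progress appear in the UI
    await api.jobs.ack(jobId, { info: "Starting work...", progress: 10 });

    try {
      // ...perform the work for this action here...

      await api.jobs.complete(jobId, {
        outcome: { message: "Data submitted successfully." },
      });
    } catch (error) {
      await api.jobs.fail(jobId, {
        outcome: { message: `Submission failed: ${error.message}` },
      });
    }
  });
});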

Common Job Operations

File Processing Jobs

Handle file upload and extraction operations:

// File extraction job
const extractJob = await api.jobs.create({
  type: "file",
  operation: "extract",
  source: fileId,
  config: {
    delimiter: ",",
    encoding: "utf-8",
    headers: true,
    skipRows: 1,
  },
});

// File validation job
const validateJob = await api.jobs.create({
  type: "file",
  operation: "validate",
  source: fileId,
  config: {
    maxFileSize: "50MB",
    allowedTypes: ["csv", "xlsx", "json"],
    scanForViruses: true,
  },
});

Workbook Processing Jobs

Process workbook data and records:

// Workbook mapping job
const mapJob = await api.jobs.create({
  type: "workbook",
  operation: "map",
  source: workbookId,
  config: {
    autoMap: true,
    confidence: 0.8,
    strategy: "fuzzy",
  },
});

// Record validation job
const validateRecordsJob = await api.jobs.create({
  type: "workbook",
  operation: "validate-records",
  source: workbookId,
  config: {
    validateRequiredFields: true,
    validateDataTypes: true,
    validateBusinessRules: true,
  },
});

// Data export job
const exportJob = await api.jobs.create({
  type: "workbook",
  operation: "export",
  source: workbookId,
  config: {
    format: "json",
    includeMetadata: false,
    filterInvalidRecords: true,
  },
});

Custom Processing Jobs

Execute custom business logic:

// Custom data transformation
const transformJob = await api.jobs.create({
  type: "custom",
  operation: "transform-customer-data",
  source: workbookId,
  config: {
    transformations: [
      { field: "name", operation: "titleCase" },
      { field: "email", operation: "lowercase" },
      { field: "phone", operation: "formatPhone" },
    ],
  },
});

// Data enrichment job
const enrichJob = await api.jobs.create({
  type: "custom",
  operation: "enrich-data",
  source: workbookId,
  config: {
    enrichmentService: "clearbit",
    fields: ["company", "jobTitle", "industry"],
    batchSize: 100,
  },
});

Job Configuration

Basic Job Structure

const job = await api.jobs.create({
  // Job identification
  type: "workbook", // 'file', 'workbook', 'custom'
  operation: "transform", // Specific operation to perform

  // Data source
  source: workbookId, // Source resource ID

  // Job configuration
  config: {
    // Operation-specific settings
    batchSize: 1000,
    timeout: 300000,
    retryAttempts: 3,
  },

  // Execution options
  options: {
    priority: "normal", // 'low', 'normal', 'high'
    schedule: "immediate", // 'immediate', ISO date, cron expression
    dependencies: [], // Other job IDs this job depends on

    // Resource constraints
    resources: {
      memory: "1GB",
      cpu: "1000m",
      timeout: 600000,
    },
  },
});

Advanced Job Configuration

const complexJob = await api.jobs.create({
  type: "pipeline",
  operation: "customer-import-pipeline",

  // Pipeline configuration
  config: {
    stages: [
      {
        name: "extract",
        type: "file",
        operation: "extract",
        config: { delimiter: "," },
      },
      {
        name: "validate",
        type: "workbook",
        operation: "validate-records",
        dependsOn: ["extract"],
      },
      {
        name: "transform",
        type: "custom",
        operation: "apply-business-rules",
        dependsOn: ["validate"],
      },
      {
        name: "export",
        type: "workbook",
        operation: "export",
        dependsOn: ["transform"],
      },
    ],
  },

  // Pipeline options
  options: {
    failureStrategy: "stop-on-error", // 'continue', 'stop-on-error', 'retry'
    parallelism: 2,
    timeout: 1800000, // 30 minutes
  },
});

Job Monitoring

Job Status Tracking

// Get job status
const job = await api.jobs.get(jobId);

console.log(`Status: ${job.status}`); // 'queued', 'running', 'completed', 'failed'
console.log(`Progress: ${job.progress}%`); // Completion percentage
console.log(`Records: ${job.recordsProcessed}`); // Records processed so far

// Job timing information
console.log(`Created: ${job.createdAt}`);
console.log(`Started: ${job.startedAt}`);
console.log(`Completed: ${job.completedAt}`);
console.log(`Duration: ${job.duration}ms`);
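
If you prefer not to rely on events, the same status fields can be polled with api.jobs.get until the job reaches a terminal state. A minimal sketch (the interval and timeout values below are arbitrary):

// Poll a job until it reaches a terminal status
async function waitForJob(jobId, { intervalMs = 2000, timeoutMs = 600000 } = {}) {
  const deadline = Date.now() + timeoutMs;

  while (Date.now() < deadline) {
    const job = await api.jobs.get(jobId);

    if (job.status === "completed" || job.status === "failed") {
      return job;
    }

    console.log(`Job ${jobId} is ${job.status} (${job.progress}%)`);
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }

  throw new Error(`Timed out waiting for job ${jobId}`);
}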

Real-time Progress Updates

// Listen for job progress events
const jobListener = FlatfileListener.create((listener) => {
  listener.on(
    "job:updated",
    {
      filter: (event) => event.context.jobId === jobId,
    },
    (event) => {
      const job = event.context.job;

      console.log(`Job ${job.id} progress: ${job.progress}%`);

      // Update UI progress bar
      updateProgressBar(job.progress);

      // Show stage information
      if (job.currentStage) {
        updateStatus(`Processing: ${job.currentStage.name}`);
      }
    }
  );

  listener.on(
    "job:completed",
    {
      filter: (event) => event.context.jobId === jobId,
    },
    (event) => {
      const job = event.context.job;

      console.log(`Job completed successfully!`);
      console.log(
        `Processed ${job.recordsProcessed} records in ${job.duration}ms`
      );

      // Handle successful completion
      handleJobSuccess(job);
    }
  );

  listener.on(
    "job:failed",
    {
      filter: (event) => event.context.jobId === jobId,
    },
    (event) => {
      const job = event.context.job;

      console.error(`Job failed: ${job.error}`);

      // Handle job failure
      handleJobFailure(job);
    }
  );
});

Batch Job Monitoring

// Monitor multiple jobs
const jobIds = ["job_1", "job_2", "job_3"];

const jobStatuses = await Promise.all(jobIds.map((id) => api.jobs.get(id)));

// Calculate overall progress
const totalProgress =
  jobStatuses.reduce((sum, job) => sum + job.progress, 0) / jobStatuses.length;

// Check if all jobs completed
const allCompleted = jobStatuses.every((job) => job.status === "completed");

// Get failed jobs
const failedJobs = jobStatuses.filter((job) => job.status === "failed");

if (failedJobs.length > 0) {
  console.log(
    `${failedJobs.length} jobs failed:`,
    failedJobs.map((j) => j.id)
  );
}

Job Results and Output

Retrieving Job Results

// Get job completion data
const job = await api.jobs.get(jobId);

if (job.status === "completed") {
  // Get processed data
  const result = await api.jobs.getResult(jobId);

  console.log(`Records processed: ${result.recordCount}`);
  console.log(`Errors found: ${result.errorCount}`);
  console.log(`Processing time: ${result.duration}ms`);

  // Access output data
  if (result.data) {
    console.log("Processed data:", result.data);
  }

  // Download exported files
  if (result.fileUrl) {
    const file = await downloadFile(result.fileUrl);
    console.log("Downloaded exported data");
  }
}

Job Artifacts

// Get job artifacts (logs, reports, exports)
const artifacts = await api.jobs.getArtifacts(jobId);

artifacts.forEach((artifact) => {
  console.log(`Artifact: ${artifact.name}`);
  console.log(`Type: ${artifact.type}`);
  console.log(`Size: ${artifact.size} bytes`);
  console.log(`URL: ${artifact.downloadUrl}`);
});

// Download specific artifact
const logFile = artifacts.find((a) => a.type === "log");
if (logFile) {
  const logs = await downloadFile(logFile.downloadUrl);
  console.log("Job logs:", logs);
}

Error Handling

Job Error Management

// Handle job errors
const jobListener = FlatfileListener.create((listener) => {
  listener.on("job:failed", async (event) => {
    const job = event.context.job;

    console.error(`Job ${job.id} failed:`, job.error);

    // Get detailed error information
    const errorDetails = await api.jobs.getErrors(job.id);

    errorDetails.forEach((error) => {
      console.error(`Error in ${error.stage}:`, error.message);

      if (error.recordId) {
        console.error(`Failed record: ${error.recordId}`);
      }
    });

    // Implement retry logic
    if ((job.retryAttempts || 0) < 3) {
      await retryJob(job.id);
    } else {
      await escalateJobFailure(job);
    }
  });
});

// Retry failed job
async function retryJob(jobId) {
  const originalJob = await api.jobs.get(jobId);

  const newJob = await api.jobs.create({
    type: originalJob.type,
    operation: originalJob.operation,
    source: originalJob.source,
    config: originalJob.config,
    options: {
      ...originalJob.options,
      retryAttempts: (originalJob.retryAttempts || 0) + 1,
      retryDelay: Math.pow(2, originalJob.retryAttempts || 0) * 1000, // Exponential backoff
    },
  });

  console.log(`Retrying job as ${newJob.id}`);
  return newJob;
}

Partial Failure Handling

// Handle jobs with partial failures
const job = await api.jobs.get(jobId);

if (job.status === "completed" && job.errorCount > 0) {
  console.log(`Job completed with ${job.errorCount} errors`);

  // Get failed records
  const failedRecords = await api.jobs.getFailedRecords(jobId);

  // Process failed records separately
  for (const record of failedRecords) {
    try {
      await processRecordManually(record);
    } catch (error) {
      console.error(`Manual processing failed for record ${record.id}:`, error);
    }
  }

  // Generate error report
  await generateErrorReport(jobId, failedRecords);
}

Job Scheduling

Immediate Execution

// Execute job immediately
const job = await api.jobs.create({
  type: "workbook",
  operation: "export",
  source: workbookId,
  options: {
    schedule: "immediate",
  },
});

Delayed Execution

// Schedule job for later execution
const delayedJob = await api.jobs.create({
  type: "custom",
  operation: "daily-report",
  source: workbookId,
  options: {
    schedule: new Date(Date.now() + 3600000).toISOString(), // 1 hour from current time
  },
});

Recurring Jobs

// Create recurring job with cron expression
const recurringJob = await api.jobs.create({
  type: "custom",
  operation: "data-sync",
  source: workbookId,
  options: {
    schedule: "0 2 * * *", // Daily at 2 AM
    recurring: true,
    timezone: "America/New_York",
  },
});

Job Dependencies

Sequential Jobs

// Create dependent jobs
const extractJob = await api.jobs.create({
  type: "file",
  operation: "extract",
  source: fileId,
});

const mapJob = await api.jobs.create({
  type: "workbook",
  operation: "map",
  source: workbookId,
  options: {
    dependencies: [extractJob.id],
  },
});

const exportJob = await api.jobs.create({
  type: "workbook",
  operation: "export",
  source: workbookId,
  options: {
    dependencies: [mapJob.id],
  },
});

Parallel Jobs with Synchronization

// Create parallel processing jobs
const validationJob = await api.jobs.create({
  type: "workbook",
  operation: "validate",
  source: workbookId,
  options: { dependencies: [extractJob.id] },
});

const enrichmentJob = await api.jobs.create({
  type: "custom",
  operation: "enrich",
  source: workbookId,
  options: { dependencies: [extractJob.id] },
});

// Wait for both to complete before final export
const finalExportJob = await api.jobs.create({
  type: "workbook",
  operation: "export",
  source: workbookId,
  options: {
    dependencies: [validationJob.id, enrichmentJob.id],
  },
});

Performance Optimization

Job Batching

// Configure optimal batch sizes
const batchJob = await api.jobs.create({
  type: "workbook",
  operation: "transform",
  source: workbookId,
  config: {
    batchSize: 1000, // Process 1000 records at a time
    parallelBatches: 4, // Process 4 batches concurrently
    batchTimeout: 30000, // 30 second timeout per batch
  },
});

Resource Management

// Configure job resources
const resourceOptimizedJob = await api.jobs.create({
  type: "custom",
  operation: "heavy-processing",
  source: workbookId,
  options: {
    resources: {
      memory: "2GB", // Allocate 2GB memory
      cpu: "2000m", // Allocate 2 CPU cores
      timeout: 1800000, // 30 minute timeout
      priority: "high", // High priority execution
    },
  },
});

Best Practices

Job Design

  • Keep jobs focused on specific tasks
  • Use appropriate batch sizes for optimal performance
  • Implement proper error handling and recovery
  • Design jobs to be idempotent when possible (see the sketch after this list)
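
For the idempotency point above, one lightweight approach is to mark each record once it has been handled and skip marked records on a retry. A minimal sketch; the processed flag and the transformRecord callback are hypothetical conventions, not built-in Flatfile features:

// Idempotency sketch: skip records that a previous attempt already handled
async function processRecordsIdempotently(records, transformRecord) {
  for (const record of records) {
    // "processed" is a hypothetical metadata flag set by this job on success
    if (record.metadata && record.metadata.processed) {
      continue; // Already handled on an earlier attempt
    }

    await transformRecord(record); // The job's actual work
    record.metadata = { ...record.metadata, processed: true };
  }

  return records;
}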

Monitoring

  • Track job progress and performance metrics
  • Set up alerts for job failures and delays
  • Monitor resource usage and optimize as needed
  • Log important events and errors

Error Handling

  • Implement graceful failure handling
  • Use retry logic with exponential backoff
  • Provide clear error messages and context
  • Handle partial failures appropriately

Performance

  • Optimize batch sizes for your data
  • Use parallel processing when appropriate
  • Monitor and tune resource allocation
  • Implement caching for expensive operations (see the sketch below)
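
For the caching point, a small in-memory memoization layer in front of an expensive lookup (such as the enrichment service shown earlier) avoids repeated calls for the same key. A minimal sketch; lookupFn stands in for whatever expensive call your job makes:

// Simple in-memory cache for expensive lookups within a job run
const lookupCache = new Map();

async function cachedLookup(key, lookupFn) {
  if (lookupCache.has(key)) {
    return lookupCache.get(key); // Reuse the earlier result
  }

  const result = await lookupFn(key); // Expensive external call
  lookupCache.set(key, result);
  return result;
}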

Related Resources

  • Listeners - Event handlers that create and monitor jobs
  • Workbooks - Data containers that jobs process
  • Records - Individual data items that jobs transform
  • Actions - Custom operations that trigger jobs
  • Plugins - Reusable components that jobs can utilize

Jobs provide the foundation for asynchronous data processing in Flatfile, enabling scalable, reliable handling of complex data transformation and import workflows.