ElasticJob DataflowJob vs SimpleJob: Which Job Type Should You Use?

I spent an hour staring at ElasticJob’s documentation, trying to figure out which job type I should use for my batch processing task. SimpleJob looked straightforward—just implement one method. But DataflowJob had two methods and something called “streaming mode.”

The documentation explained what each does, but not when to use which. After building several jobs and learning from mistakes, here’s what I wish someone had told me upfront.

The Core Difference

The choice comes down to one question: Do you need to keep processing items until a queue is empty?

Job Type Decision Flow
               ┌─────────────────────┐
               │      Your Task      │
               └──────────┬──────────┘
          ┌───────────────────────────────┐
          │ Do you fetch items from       │
          │ DB/queue and process them     │
          │ in batches?                   │
          └───────────────┬───────────────┘
         ┌────────────────┴────────────────┐
         │                                 │
         ▼                                 ▼
┌─────────────────┐               ┌─────────────────┐
│ YES             │               │ NO              │
│ DataflowJob     │               │ SimpleJob       │
│                 │               │                 │
│ - fetchData()   │               │ - execute()     │
│ - processData() │               │ - One method    │
│ - Streaming     │               │ - One shot      │
└─────────────────┘               └─────────────────┘

SimpleJob: The One-Shot Executor

SimpleJob is for tasks that run once per trigger. You put all your logic in a single execute() method.

I started with SimpleJob because it seemed simpler. Here’s what it looks like:

SimpleJobExample.java
public class ReportGenerationJob implements SimpleJob {

    @Override
    public void execute(ShardingContext context) {
        int shardId = context.getShardingItem();
        // Everything happens here
        List<User> users = userRepository.findByShard(shardId);
        Report report = generateReport(users);
        reportRepository.save(report);
        log.info("Generated report for shard {}", shardId);
    }
}

The cron trigger fires, execute() runs once, and you’re done.

When SimpleJob works well:

  • Generating daily reports
  • Sending scheduled notifications
  • Running cleanup tasks
  • Calling external APIs
  • File operations

The key characteristic: you know exactly what needs to happen when the job fires. There’s no “keep going until empty” logic.

DataflowJob: The Batch Processor

DataflowJob separates data fetching from processing. This separation matters when you have large datasets or queues.

My first attempt at batch processing used SimpleJob:

BadBatchJob.java
public class OrderProcessingJob implements SimpleJob {

    @Override
    public void execute(ShardingContext context) {
        // Problem: How many orders to process?
        // Process too few = slow progress
        // Process too many = timeout risk
        List<Order> orders = orderRepository.findUnprocessed(1000);
        for (Order order : orders) {
            processOrder(order);
        }
        // What if there are 5000 more orders waiting?
        // They won't be processed until next cron trigger
    }
}

This approach has a flaw: any orders beyond the batch size wait for the next trigger. With a 5-minute cron, the 5000 leftover orders above need five more triggers, so the last of them waits about 25 minutes even if no new orders arrive.
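To put numbers on that delay, here is a small worked calculation. The class and method names are made up for illustration. The batch size of 1000 and the 5-minute interval come from the example above, and the backlog of 6000 is the 1000 orders fetched plus the 5000 still waiting.

```java
import java.time.Duration;

final class BacklogDelay {
    /** Triggers needed to clear the backlog when each trigger handles one fixed-size batch. */
    static long triggersNeeded(long backlog, long batchSize) {
        return (backlog + batchSize - 1) / batchSize; // ceiling division
    }

    /** Time from the first trigger until the last batch starts. */
    static Duration waitForLastBatch(long backlog, long batchSize, Duration cronInterval) {
        long triggers = triggersNeeded(backlog, batchSize);
        return cronInterval.multipliedBy(Math.max(0, triggers - 1));
    }

    public static void main(String[] args) {
        Duration wait = waitForLastBatch(6000, 1000, Duration.ofMinutes(5));
        // prints "6 triggers, last batch starts after 25 minutes"
        System.out.println(triggersNeeded(6000, 1000) + " triggers, last batch starts after "
                + wait.toMinutes() + " minutes");
    }
}
```

The calculation assumes nothing new arrives in the meantime. With a steady inflow, the backlog can grow faster than a fixed batch per trigger can clear it.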

DataflowJob solves this with streaming mode:

OrderDataflowJob.java
public class OrderDataflowJob implements DataflowJob<Order> {

    @Override
    public List<Order> fetchData(ShardingContext context) {
        int shardId = context.getShardingItem();
        // Fetch a batch of unprocessed orders for this shard
        return orderRepository.findUnprocessed(shardId, 100);
    }

    @Override
    public void processData(ShardingContext context, List<Order> orders) {
        for (Order order : orders) {
            processOrder(order);
            order.markProcessed();
        }
        orderRepository.saveAll(orders);
    }
}

Streaming Mode: The Game Changer

Here’s where DataflowJob shines. With streaming enabled, ElasticJob loops:

Streaming Mode Execution Flow
┌─────────────────────────────────────────────────────────────┐
│                   STREAMING MODE ENABLED                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│ Cron fires ──┐                                              │
│              │                                              │
│              ▼                                              │
│      ┌────────────────┐                                     │
│      │  fetchData()   │                                     │
│      └───────┬────────┘                                     │
│              │                                              │
│              ▼                                              │
│      ┌────────────────┐      ┌──────────────────┐           │
│      │ Returns items? │─YES─►│  processData()   │           │
│      └───────┬────────┘      └────────┬─────────┘           │
│              │                        │                     │
│              NO                       │                     │
│              │                        │                     │
│              ▼                        ▼                     │
│      ┌────────────────┐      ┌──────────────────┐           │
│      │    Job ends    │      │ Loop back to     │           │
│      └────────────────┘      │ fetchData()      │           │
│                              └──────────────────┘           │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Enable streaming mode in your job configuration:

JobConfiguration.java
JobConfiguration jobConfig = JobConfiguration.newBuilder("OrderDataflowJob", 3)
        .cron("0/5 * * * * ?") // Every 5 seconds
        .setProperty("streaming.process", "true") // Enable streaming
        .build();

When streaming is enabled:

  1. fetchData() is called
  2. If it returns items, processData() handles them
  3. fetchData() is called again
  4. Repeat until fetchData() returns null or an empty list
  5. Job completes

This means you can process all pending orders in one job execution, not just one batch.
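The steps above can be sketched as a plain loop. This is my own illustration of the control flow, not ElasticJob's source: the Dataflow interface is a stand-in for DataflowJob with the ShardingContext left out, and QueueJob and runOnce are hypothetical names. The streaming flag switches between the looping behavior and a single fetch/process pair.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class StreamingLoopSketch {
    /** Stand-in for ElasticJob's DataflowJob; the sharding context is omitted for brevity. */
    interface Dataflow<T> {
        List<T> fetchData();

        void processData(List<T> items);
    }

    /** Runs one trigger and returns how many times fetchData() was called. */
    static <T> int runOnce(Dataflow<T> job, boolean streaming) {
        int fetches = 1;
        List<T> data = job.fetchData();
        while (data != null && !data.isEmpty()) {
            job.processData(data);
            if (!streaming) {
                break; // non-streaming: one fetch/process pair per trigger
            }
            data = job.fetchData(); // streaming: go back for more
            fetches++;
        }
        return fetches;
    }

    /** In-memory job that hands out queued numbers in batches of batchSize. */
    static final class QueueJob implements Dataflow<Integer> {
        final Deque<Integer> pending = new ArrayDeque<>();
        final List<Integer> processed = new ArrayList<>();
        final int batchSize;

        QueueJob(int items, int batchSize) {
            for (int i = 0; i < items; i++) {
                pending.add(i);
            }
            this.batchSize = batchSize;
        }

        @Override
        public List<Integer> fetchData() {
            List<Integer> batch = new ArrayList<>();
            while (batch.size() < batchSize && !pending.isEmpty()) {
                batch.add(pending.poll());
            }
            return batch;
        }

        @Override
        public void processData(List<Integer> items) {
            processed.addAll(items);
        }
    }

    public static void main(String[] args) {
        QueueJob streaming = new QueueJob(250, 100);
        int fetches = runOnce(streaming, true);
        // prints "streaming: 250 items, 4 fetches"
        System.out.println("streaming: " + streaming.processed.size() + " items, " + fetches + " fetches");

        QueueJob oneShot = new QueueJob(250, 100);
        runOnce(oneShot, false);
        // prints "one-shot: 100 items"
        System.out.println("one-shot: " + oneShot.processed.size() + " items");
    }
}
```

Note the fourth fetch in streaming mode: the loop only learns that the queue is empty by asking one more time, so fetchData() should be cheap when there is nothing to return.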

When I Made the Wrong Choice

I once built a queue consumer using SimpleJob:

QueueConsumerWrong.java
public class QueueConsumerJob implements SimpleJob {

    @Override
    public void execute(ShardingContext context) {
        // Process messages from SQS
        while (true) {
            List<Message> messages = sqsClient.receiveMessages(10);
            if (messages.isEmpty()) {
                break; // Queue empty, stop
            }
            for (Message msg : messages) {
                processMessage(msg);
            }
        }
    }
}

This worked, but had problems:

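  1. No sharding awareness: The loop never looked at the ShardingContext, so every shard ran the same unpartitioned loop against the same queue, causing duplicate processing
  2. No graceful shutdown: A long-running while loop inside execute() gives the framework no batch boundary at which to stop the job
  3. Mixed concerns: Fetching and processing were tangled together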

Rewriting with DataflowJob:

QueueConsumerRight.java
public class QueueConsumerJob implements DataflowJob<Message> {

    @Override
    public List<Message> fetchData(ShardingContext context) {
        int shardId = context.getShardingItem();
        // Only fetch messages for this shard.
        // receiveMessagesForShard stands for an application-level wrapper;
        // the SQS SDK itself has no such method.
        return sqsClient.receiveMessagesForShard(shardId, 10);
    }

    @Override
    public void processData(ShardingContext context, List<Message> messages) {
        for (Message msg : messages) {
            processMessage(msg);
        }
    }
}

With streaming mode, this processes messages until the queue is empty, then stops gracefully.
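The "stops gracefully" part follows from the batch boundaries: whoever owns the loop can check for a stop request between batches, which a hand-written while (true) inside execute() never allows. Below is a minimal sketch of that idea. It is not ElasticJob's implementation, and StoppableDrain, drain, and the stop flag are names I made up for the illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class StoppableDrain {
    /**
     * Fetches and processes batches until the source is empty or a stop is requested.
     * The flag is only checked between batches, so a batch in progress always finishes.
     * Returns the number of batches processed.
     */
    static <T> int drain(Supplier<List<T>> fetch, Consumer<List<T>> process, AtomicBoolean stopRequested) {
        int batches = 0;
        while (!stopRequested.get()) {
            List<T> batch = fetch.get();
            if (batch == null || batch.isEmpty()) {
                break; // nothing left to do
            }
            process.accept(batch);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        AtomicBoolean stop = new AtomicBoolean(false);
        int[] handled = {0};

        // Endless source: without the stop flag, drain() would never return.
        Supplier<List<String>> source = () -> List.of("a", "b", "c");
        Consumer<List<String>> sink = batch -> {
            handled[0] += batch.size();
            if (handled[0] >= 9) {
                stop.set(true); // simulate a shutdown request arriving mid-run
            }
        };

        int batches = drain(source, sink, stop);
        // prints "3 batches, 9 messages"
        System.out.println(batches + " batches, " + handled[0] + " messages");
    }
}
```

The trade-off is that a stop only takes effect after the current batch, which is one more reason to keep batches small.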

ShardingContext: Both Types Get It

Both SimpleJob and DataflowJob receive ShardingContext. This gives you:

ShardingContextUsage.java
public void execute(ShardingContext context) {
    // Which shard am I responsible for?
    int shardId = context.getShardingItem();
    // Total shards in the job
    int totalShards = context.getShardingTotalCount();
    // Job name
    String jobName = context.getJobName();
    // Use shardId to partition your data
    List<Order> orders = orderRepository.findByShard(shardId);
}

This enables parallel processing across multiple instances. If you configure 3 shards and have 3 instances, each handles one shard.
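How a method like findByShard decides which rows belong to a shard is up to you. One common scheme is a modulo on a numeric ID, sketched below with in-memory data. The class and method names are hypothetical. In SQL the same idea would be a WHERE clause comparing the ID modulo the total shard count against the shard index.

```java
import java.util.ArrayList;
import java.util.List;

final class ShardPartition {
    /** Returns the IDs that the given shard owns under a simple modulo scheme. */
    static List<Long> idsForShard(List<Long> allIds, int shardId, int totalShards) {
        List<Long> mine = new ArrayList<>();
        for (long id : allIds) {
            // floorMod keeps the result in [0, totalShards) even for negative IDs
            if (Math.floorMod(id, (long) totalShards) == shardId) {
                mine.add(id);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Long> orderIds = List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L);
        // prints:
        // shard 0 -> [3, 6]
        // shard 1 -> [1, 4, 7]
        // shard 2 -> [2, 5]
        for (int shard = 0; shard < 3; shard++) {
            System.out.println("shard " + shard + " -> " + idsForShard(orderIds, shard, 3));
        }
    }
}
```

Every ID lands in exactly one shard, so no two shards process the same row as long as they all agree on the total shard count.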

Quick Reference: Which to Choose

Scenario                 | Job Type    | Why
-------------------------|-------------|---------------------------
Daily report generation  | SimpleJob   | One-shot task
Cleanup expired sessions | SimpleJob   | One-shot task
Process pending orders   | DataflowJob | Batch processing
Queue consumer           | DataflowJob | Stream until empty
ETL pipeline             | DataflowJob | Fetch-process pattern
Send birthday emails     | SimpleJob   | Known set of users
Sync external data       | DataflowJob | Might need multiple batches

The Mental Model

Think of it this way:

  • SimpleJob = “Do this thing now”
  • DataflowJob = “Process all items until done”

If your job naturally fits “do X every hour” with a clear endpoint, use SimpleJob.

If your job is “process pending items” and the count varies, use DataflowJob with streaming.

Configuration Differences

Both use similar configuration, but DataflowJob has the streaming option:

SimpleJobConfig.java
// SimpleJob configuration
JobConfiguration simpleConfig = JobConfiguration.newBuilder("MySimpleJob", 3)
        .cron("0 0 2 * * ?") // Daily at 2 AM
        .build();

DataflowJobConfig.java
// DataflowJob configuration
JobConfiguration dataflowConfig = JobConfiguration.newBuilder("MyDataflowJob", 3)
        .cron("0/10 * * * * ?") // Every 10 seconds
        .setProperty("streaming.process", "true")
        .build();

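The 3 in newBuilder is the sharding total count: the job is split into three shard items (0, 1, and 2), which ElasticJob distributes across the running instances. With three instances each gets one shard. With fewer, some instances handle more than one.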
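To make the relationship between shard count and instance count concrete, here is a sketch of one way to spread shards evenly over instances. It is loosely modelled on an average-allocation approach and is only an illustration: ElasticJob's sharding strategy is pluggable, and the class and method names here are my own.

```java
import java.util.ArrayList;
import java.util.List;

final class ShardAllocationSketch {
    /** Spreads shard items 0..shardCount-1 over instanceCount instances as evenly as possible. */
    static List<List<Integer>> allocate(int shardCount, int instanceCount) {
        if (instanceCount <= 0) {
            throw new IllegalArgumentException("instanceCount must be positive");
        }
        List<List<Integer>> result = new ArrayList<>();
        int perInstance = shardCount / instanceCount;
        for (int i = 0; i < instanceCount; i++) {
            List<Integer> items = new ArrayList<>();
            for (int s = i * perInstance; s < (i + 1) * perInstance; s++) {
                items.add(s);
            }
            result.add(items);
        }
        // Leftover shards go one each to the first instances.
        int next = 0;
        for (int s = perInstance * instanceCount; s < shardCount; s++) {
            result.get(next++).add(s);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(allocate(3, 3)); // prints [[0], [1], [2]]
        System.out.println(allocate(3, 2)); // prints [[0, 2], [1]]
        System.out.println(allocate(3, 5)); // prints [[0], [1], [2], [], []]
    }
}
```

The last line shows the other direction: with more instances than shards, the extra instances sit idle for that job, so the shard count is the ceiling on parallelism.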

Common Mistakes to Avoid

Mistake 1: Using SimpleJob with a while loop

MistakeWhileLoop.java
// DON'T do this
public void execute(ShardingContext context) {
    while (hasMoreData()) {
        processData();
    }
}

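This keeps the whole run inside one opaque execute() call, so the framework never sees a batch boundary, and the loop does not partition work by shard unless you add that yourself. Use DataflowJob with streaming instead.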

Mistake 2: Not enabling streaming for DataflowJob

Without streaming, DataflowJob only calls fetchData() once per trigger. You lose the “process until empty” behavior.

Mistake 3: fetchData() returning too many items

FetchTooMany.java
// DON'T return huge lists
public List<Order> fetchData(ShardingContext context) {
    return orderRepository.findAll(); // Could be millions!
}

Return a reasonable batch size (100-1000 typically). Streaming mode will call fetchData() again if needed.
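A bounded fetch only works with streaming if each batch is marked as done before the next fetch. Otherwise the same rows come back every time and the loop never ends. The sketch below simulates that with an in-memory list standing in for the orders table. Order, newTable, findUnprocessed, and drain are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

final class BoundedFetchSketch {
    /** Hypothetical order row with a processed flag. */
    static final class Order {
        final long id;
        boolean processed;

        Order(long id) {
            this.id = id;
        }
    }

    /** Builds an in-memory "table" of unprocessed orders. */
    static List<Order> newTable(int rows) {
        List<Order> table = new ArrayList<>();
        for (long id = 1; id <= rows; id++) {
            table.add(new Order(id));
        }
        return table;
    }

    /** Stand-in for a query that selects unprocessed rows with a row limit. */
    static List<Order> findUnprocessed(List<Order> table, int limit) {
        return table.stream()
                .filter(order -> !order.processed)
                .limit(limit)
                .collect(Collectors.toList());
    }

    /** Mimics a streaming run and returns how many non-empty batches were handled. */
    static int drain(List<Order> table, int batchSize) {
        int batches = 0;
        List<Order> batch = findUnprocessed(table, batchSize);
        while (!batch.isEmpty()) {
            for (Order order : batch) {
                order.processed = true; // without this, the same rows would come back forever
            }
            batches++;
            batch = findUnprocessed(table, batchSize);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Order> table = newTable(2500);
        System.out.println(drain(table, 1000) + " batches"); // prints "3 batches"
    }
}
```

With 2500 rows and a limit of 1000, memory use is capped at one batch while the run still clears everything in three passes.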

Summary

Choose SimpleJob when your job is a self-contained operation that runs once per trigger. Choose DataflowJob when you need to fetch and process items in batches, especially with streaming mode for continuous processing.

The DataflowJob pattern—separating fetch from process—makes your batch jobs more resilient, shard-aware, and easier to reason about. When in doubt, ask: “Do I need to keep going until there’s nothing left to process?” If yes, DataflowJob is your answer.
