Implementing the Saga Pattern in Quarkus: A Reactive Content Moderation Pipeline
In this article I will share an example of how to implement the event-based choreography saga pattern using Quarkus and Apache Kafka. The goal is to give you an overall idea of the pattern rather than a production-ready implementation, so you can build a better understanding of how it works.

Imagine a platform where users can upload content. Before the content goes live, it must pass through a multi-step moderation process. This is a perfect use case for a saga. Our system will consist of three microservices:
- Content Upload Service: The saga initiator. It receives a file, saves its initial metadata with a PENDING status, and kicks off the moderation process
- Virus Scan Service: The first participant. It listens for new uploads, simulates a virus scan, and publishes the result
- Content Analysis Service: The second participant. If the file is clean, this service simulates a check for inappropriate keywords and publishes a final verdict
The Happy Path (Success Flow)
1. Client uploads a file to the Upload Service
2. Upload Service publishes a FileUploadedEvent to the uploads Kafka topic
3. Virus Scan Service consumes the event, finds the file is clean, and publishes a FileScanCleanEvent to the scans topic
4. Content Analysis Service consumes the clean event, finds the content acceptable, and publishes a FileApprovedEvent to the analyses topic
5. Upload Service consumes the final approval and updates the file's status to APPROVED. The saga is complete

The Compensation Path (Failure Flow)
- Steps 1 & 2 are the same as above
- Virus Scan Service consumes the event but finds a virus. It publishes a
FileInfectedEventto thescanstopic. - Upload Service consumes the infected event. This triggers a compensating transaction.
- The Upload Service updates the file's status to
REJECTEDand deletes the file data. The saga is complete (rolled back)
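
Before diving into the services, here is a rough sketch of what those events look like on the wire. The field names (contentId, fileName, status) match the code shown later; the services in this article exchange plain Vert.x JsonObjects rather than typed event classes, so this helper class is purely illustrative.

// EventPayloads.java - illustrative only; the services below build these JsonObjects inline
import io.vertx.core.json.JsonObject;

public final class EventPayloads {

    private EventPayloads() {
    }

    // FileUploadedEvent, published to the "uploads" topic
    public static JsonObject fileUploaded(String contentId, String fileName) {
        return new JsonObject().put("contentId", contentId).put("fileName", fileName);
    }

    // FileScanCleanEvent / FileInfectedEvent, published to the "scans" topic
    public static JsonObject scanResult(String contentId, boolean infected) {
        return new JsonObject().put("contentId", contentId).put("status", infected ? "INFECTED" : "CLEAN");
    }

    // FileApprovedEvent (or a rejection), published to the "analyses" topic
    public static JsonObject analysisResult(String contentId, boolean rejected) {
        return new JsonObject().put("contentId", contentId).put("status", rejected ? "REJECTED" : "APPROVED");
    }
}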

Building the Microservices with Quarkus
1. The content-upload-service
This service acts as the state machine for the saga. It initiates the process and listens for outcomes from other services to update its local state and trigger compensations if necessary.
# application.properties for content-upload-service
# Outgoing channel to kick off the saga
mp.messaging.outgoing.uploads-out.connector=smallrye-kafka
mp.messaging.outgoing.uploads-out.topic=uploads
mp.messaging.outgoing.uploads-out.value.serializer=io.quarkus.kafka.client.serialization.JsonObjectSerializer
# Incoming channels to listen for saga outcomes
mp.messaging.incoming.scans-in.connector=smallrye-kafka
mp.messaging.incoming.scans-in.topic=scans
mp.messaging.incoming.scans-in.value.deserializer=io.quarkus.kafka.client.serialization.JsonObjectDeserializer
mp.messaging.incoming.analyses-in.connector=smallrye-kafka
mp.messaging.incoming.analyses-in.topic=analyses
mp.messaging.incoming.analyses-in.value.deserializer=io.quarkus.kafka.client.serialization.JsonObjectDeserializer

// ContentLifecycleManager.java - The core saga logic
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.vertx.core.json.JsonObject;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

@ApplicationScoped
public class ContentLifecycleManager {

    // In-memory DB for simplicity; a real service would persist this state
    private final Map<String, Content> database = new ConcurrentHashMap<>();

    @Inject
    @Channel("uploads-out")
    Emitter<JsonObject> uploadEmitter;

    public void initiateUpload(String fileName) {
        String contentId = UUID.randomUUID().toString();
        database.put(contentId, new Content(contentId, fileName, "PENDING"));
        // Build the FileUploadedEvent payload as a JsonObject to match the channel's JsonObjectSerializer
        uploadEmitter.send(new JsonObject().put("contentId", contentId).put("fileName", fileName));
        System.out.println("Saga started for contentId: " + contentId);
    }

    @Incoming("scans-in")
    public void handleScanResult(JsonObject scanEvent) {
        String contentId = scanEvent.getString("contentId");
        String status = scanEvent.getString("status");
        if ("INFECTED".equals(status)) {
            System.out.println("COMPENSATION: Virus detected for " + contentId);
            database.get(contentId).setStatus("REJECTED_VIRUS");
            // In a real app, you would delete the file blob here
        }
    }

    @Incoming("analyses-in")
    public void handleAnalysisResult(JsonObject analysisEvent) {
        String contentId = analysisEvent.getString("contentId");
        String status = analysisEvent.getString("status");
        if ("REJECTED".equals(status)) {
            System.out.println("COMPENSATION: Content rejected for " + contentId);
            database.get(contentId).setStatus("REJECTED_CONTENT");
        } else if ("APPROVED".equals(status)) {
            System.out.println("Saga SUCCESS for " + contentId);
            database.get(contentId).setStatus("APPROVED");
        }
    }
}
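
The article doesn't show the Content entity held in the in-memory map, nor the endpoint that actually triggers initiateUpload. A minimal sketch of both, with hypothetical names and only the fields the lifecycle manager touches:

// Content.java - hypothetical entity; only the fields used above
public class Content {
    private final String id;
    private final String fileName;
    private String status;

    public Content(String id, String fileName, String status) {
        this.id = id;
        this.fileName = fileName;
        this.status = status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getStatus() {
        return status;
    }
}

// UploadResource.java - hypothetical REST endpoint that starts the saga
import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.Response;

@Path("/uploads")
public class UploadResource {

    @Inject
    ContentLifecycleManager lifecycleManager;

    @POST
    public Response upload(@QueryParam("fileName") String fileName) {
        lifecycleManager.initiateUpload(fileName);
        return Response.accepted().build();
    }
}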
2. The virus-scan-service
This service is a simple reactive processor. It consumes from one topic, performs its business logic, and produces to another topic. It holds no state related to the saga.
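The article only shows the upload service's configuration, so the scan service needs an equivalent one. A minimal sketch, assuming the uploads-in and scans-out channel names used by the code below:

# application.properties for virus-scan-service (sketch)
mp.messaging.incoming.uploads-in.connector=smallrye-kafka
mp.messaging.incoming.uploads-in.topic=uploads
mp.messaging.incoming.uploads-in.value.deserializer=io.quarkus.kafka.client.serialization.JsonObjectDeserializer
mp.messaging.outgoing.scans-out.connector=smallrye-kafka
mp.messaging.outgoing.scans-out.topic=scans
mp.messaging.outgoing.scans-out.value.serializer=io.quarkus.kafka.client.serialization.JsonObjectSerializer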
// VirusScanner.java
import java.util.Random;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.vertx.core.json.JsonObject;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class VirusScanner {

    private final Random random = new Random();

    @Incoming("uploads-in")
    @Outgoing("scans-out")
    public JsonObject process(JsonObject uploadEvent) {
        String contentId = uploadEvent.getString("contentId");
        System.out.println("Scanning file for contentId: " + contentId);
        // Simulate a scan: 10% chance of being infected
        boolean isInfected = random.nextInt(10) == 0;
        String status = isInfected ? "INFECTED" : "CLEAN";
        System.out.println("Scan result for " + contentId + ": " + status);
        return new JsonObject().put("contentId", contentId).put("status", status);
    }
}

3. The content-analysis-service
This service only acts if the file is clean. It filters incoming messages and then performs its analysis, publishing the final outcome of the moderation pipeline.
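As with the scan service, its Kafka configuration isn't shown in the article; a minimal sketch, assuming the scans-in and analyses-out channel names used below:

# application.properties for content-analysis-service (sketch)
mp.messaging.incoming.scans-in.connector=smallrye-kafka
mp.messaging.incoming.scans-in.topic=scans
mp.messaging.incoming.scans-in.value.deserializer=io.quarkus.kafka.client.serialization.JsonObjectDeserializer
mp.messaging.outgoing.analyses-out.connector=smallrye-kafka
mp.messaging.outgoing.analyses-out.topic=analyses
mp.messaging.outgoing.analyses-out.value.serializer=io.quarkus.kafka.client.serialization.JsonObjectSerializer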
// ContentAnalyzer.java
import java.util.Random;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Broadcast;
import io.vertx.core.json.JsonObject;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class ContentAnalyzer {

    @Incoming("scans-in")
    @Outgoing("analyses-out")
    @Broadcast // Ensures the stream continues even if some messages are filtered out
    public JsonObject process(JsonObject scanEvent) {
        if (!"CLEAN".equals(scanEvent.getString("status"))) {
            // Ignore infected files, do not pass them down the chain
            return null;
        }
        String contentId = scanEvent.getString("contentId");
        System.out.println("Analyzing content for: " + contentId);
        // Simulate analysis: 20% chance of rejection
        boolean isRejected = new Random().nextInt(5) == 0;
        String status = isRejected ? "REJECTED" : "APPROVED";
        System.out.println("Analysis result for " + contentId + ": " + status);
        return new JsonObject().put("contentId", contentId).put("status", status);
    }
}

Notice how each service is completely decoupled. The virus-scan-service has no idea that a content-analysis-service even exists. This is the power of event-driven choreography. There are, however, a few things to keep in mind when applying this pattern in practice:
- Idempotency is Crucial: Kafka guarantees at-least-once delivery. Your consumers must be designed to handle the same message multiple times without causing side effects. For example, the compensation logic should be safe to run more than once for the same event (see the sketch after this list)
- Observability: Tracking a single request across multiple services and Kafka topics can be difficult. Implementing distributed tracing is essential to see the entire flow of a saga and debug failures
- Complexity: While this pattern solves the distributed transaction problem, it introduces its own complexity. The logic is spread across services, which can be harder to reason about than a centralized transaction manager
- Consider an Orchestrator for Complex Sagas: For sagas with many steps, complex branching, or dependencies, a central Orchestrator service might be a better fit. The orchestrator would explicitly call each service and manage the state, rather than relying on event choreography
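
To make the idempotency point concrete, here is one way the upload service's compensation handler could be made safe to replay. It simply skips the update when the content has already left the PENDING state; this is a sketch against the in-memory map used above, not a general solution (a persistent store would typically need a processed-event table or a conditional update):

// Inside ContentLifecycleManager - an idempotent variant of handleScanResult (sketch)
@Incoming("scans-in")
public void handleScanResult(JsonObject scanEvent) {
    String contentId = scanEvent.getString("contentId");
    String status = scanEvent.getString("status");
    if (!"INFECTED".equals(status)) {
        return;
    }
    Content content = database.get(contentId);
    // Replay-safe: only compensate if we have not already left the PENDING state
    if (content != null && "PENDING".equals(content.getStatus())) {
        System.out.println("COMPENSATION: Virus detected for " + contentId);
        content.setStatus("REJECTED_VIRUS");
        // Deleting the file blob would also need to tolerate being called twice
    }
}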
The Saga pattern, powered by Quarkus's superb reactive messaging capabilities and the reliability of Apache Kafka, provides a scalable and resilient framework for managing complex workflows in a microservices world. By embracing eventual consistency and designing for failure, you can build robust systems that maintain data integrity without the bottlenecks of traditional distributed transactions.
Thanks ;-)