Untangling Job Dependencies with BullMQ’s FlowProducer

'Seyi Ogunjuyigbe
Jan 13, 2024

Building on our exploration of microservices with BullMQ, we’re taking a step further into advanced job and message handling. In this article, we’ll focus on one of BullMQ’s standout features: the FlowProducer. It lets us create workflows with parent-child relationships between jobs, so that work spread across several services and queues runs in a dependable order.

The Power of FlowProducer

The FlowProducer class lets you create flows, which are parent-child relationships between jobs. A parent job is not moved to the wait state (where a worker can pick it up) until all of its child jobs have been processed successfully; until then it sits in the waiting-children state. Because a child can have children of its own, a flow is a tree of jobs of arbitrary depth.
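To make the tree structure concrete, here is a minimal sketch of a nested flow definition, plus a small helper that lists job names in an order consistent with the parent-after-children rule. The job names, queue names, and the completionOrder helper are hypothetical and not part of BullMQ. Sibling jobs can run in parallel, so this is only one valid order.

```javascript
// Hypothetical three-level flow: each parent waits for its children.
const nestedFlow = {
  name: 'ship-order',
  queueName: 'shipping',
  children: [
    {
      name: 'charge-card',
      queueName: 'payments',
      children: [
        { name: 'validate-cart', queueName: 'carts' },
      ],
    },
    { name: 'reserve-stock', queueName: 'inventory' },
  ],
};

// Post-order walk: every job is listed after all of its descendants.
// Illustration only; BullMQ does not provide this helper.
function completionOrder(node) {
  const names = [];
  for (const child of node.children || []) {
    names.push(...completionOrder(child));
  }
  names.push(node.name);
  return names;
}

console.log(completionOrder(nestedFlow));
// [ 'validate-cart', 'charge-card', 'reserve-stock', 'ship-order' ]
```

Nesting a job under another is also a simple way to force a sequence: here charge-card cannot start until validate-cart has completed.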

The FlowProducer is particularly useful when you need to execute a flow of several actions, any of which could fail. For example, you may need to update a database, call external services, or make some other kind of asynchronous call. Modeling each action as a child job means the parent job runs only after every one of them has completed successfully.
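Each child job can carry its own options, which is one way to express how failures should be handled. The sketch below assumes a recent BullMQ version: attempts and backoff are standard job options, and failParentOnFailure should be checked against the documentation for the version you run. The job names, queue names, and values are made up for illustration.

```javascript
// Illustrative flow definition; pass it to flowProducer.add(...) to enqueue it.
const resilientFlow = {
  name: 'sync-account',
  queueName: 'accounts',
  data: { accountId: 42 },
  children: [
    {
      name: 'call-billing-api',
      queueName: 'external-calls',
      data: { accountId: 42 },
      opts: {
        attempts: 3, // try the job up to three times in total
        backoff: { type: 'exponential', delay: 1000 }, // growing delay between retries
        failParentOnFailure: true, // if this child ultimately fails, fail the parent too
      },
    },
    {
      name: 'update-database',
      queueName: 'db-writes',
      data: { accountId: 42 },
      opts: { attempts: 5 },
    },
  ],
};

console.log(resilientFlow.children.map((child) => child.name));
```

Without an option like failParentOnFailure, a child that fails permanently is expected to leave its parent waiting rather than failing it, so it is worth deciding per child which behaviour you want.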

Example Scenario: E-commerce Order Processing

Let’s consider an e-commerce platform. When a customer places an order, several things need to happen: the order needs to be logged, the inventory needs to be updated, and a confirmation email needs to be sent to the customer. These tasks can be modeled as separate jobs within a flow.

Here’s how we can use BullMQ’s FlowProducer to handle this:

const { FlowProducer } = require('bullmq');

// Create a new FlowProducer
const flowProducer = new FlowProducer();

// Define the flow: one parent job with three child jobs
const flow = {
  name: 'process-order',
  queueName: 'orders',
  data: { orderId: 12345 },
  children: [
    { name: 'log-order', queueName: 'logs', data: { orderId: 12345 } },
    { name: 'update-inventory', queueName: 'inventory', data: { orderId: 12345 } },
    { name: 'send-email', queueName: 'emails', data: { orderId: 12345 } },
  ],
};

// Add the flow; `await` must run inside an async function in a CommonJS module
async function main() {
  await flowProducer.add(flow);
}

main().catch(console.error);

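In this code, we first import the FlowProducer class from BullMQ and create a new instance. We then define a flow for processing an order: a parent job, process-order, with three child jobs for logging the order, updating the inventory, and sending a confirmation email. Each job is an object with name, queueName, and data properties. Finally, we call the add method, which adds all four jobs to their queues atomically: either every job is added or none is. The three children can be processed in parallel, and process-order becomes available to a worker only after all of them have completed.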
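Since the same order ID appears in every job, one option is to build the flow from a function instead of hardcoding it. This is a minimal sketch; buildOrderFlow is a hypothetical helper, not something BullMQ provides.

```javascript
// Hypothetical helper (not part of BullMQ): builds the order flow for any order ID.
function buildOrderFlow(orderId) {
  return {
    name: 'process-order',
    queueName: 'orders',
    data: { orderId },
    children: [
      { name: 'log-order', queueName: 'logs', data: { orderId } },
      { name: 'update-inventory', queueName: 'inventory', data: { orderId } },
      { name: 'send-email', queueName: 'emails', data: { orderId } },
    ],
  };
}

// Inspect the definition without touching Redis.
const orderFlow = buildOrderFlow(67890);
console.log(orderFlow.children.map((child) => `${child.queueName}/${child.name}`));
// [ 'logs/log-order', 'inventory/update-inventory', 'emails/send-email' ]

// With a FlowProducer instance at hand, it would be enqueued like this:
//   const node = await flowProducer.add(buildOrderFlow(67890));
// where node.job is the parent job and node.children holds the child nodes.
```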

Consuming the Jobs

Now that we’ve added the jobs to their queues, we need workers to consume and process them. Here’s how we can do that:

const { Worker } = require('bullmq');

// Create a worker for the 'orders' queue
const ordersWorker = new Worker('orders', async job => {
  // Process the order...
});

// Create a worker for the 'logs' queue
const logsWorker = new Worker('logs', async job => {
  // Log the order...
});

// Create a worker for the 'inventory' queue
const inventoryWorker = new Worker('inventory', async job => {
  // Update the inventory...
});

// Create a worker for the 'emails' queue
const emailsWorker = new Worker('emails', async job => {
  // Send the confirmation email...
});

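In this code, we create four workers, one for each queue. Each worker is responsible for processing the jobs in its respective queue. Because process-order is the parent job, the orders worker picks it up only after the logs, inventory, and emails workers have finished their jobs.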
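When the parent job finally runs, it can read what its children returned through job.getChildrenValues(), which resolves to an object keyed by child job key. The sketch below keeps the processor as a plain function so it can be tried without Redis. The return shapes, the key strings, and the stand-in job object are assumptions for illustration.

```javascript
// Processor for the parent 'process-order' job.
// job.getChildrenValues() resolves to an object that maps each child's job key
// to the value returned by that child's processor.
async function processOrder(job) {
  const childrenValues = await job.getChildrenValues();
  const results = Object.values(childrenValues);
  return {
    orderId: job.data.orderId,
    completedSteps: results.length,
  };
}

// Wiring (needs bullmq and a running Redis instance):
//   const { Worker } = require('bullmq');
//   const ordersWorker = new Worker('orders', processOrder);

// Quick local check with a stand-in object instead of a real BullMQ job.
// The keys below only imitate the shape of real job keys.
const fakeJob = {
  data: { orderId: 12345 },
  getChildrenValues: async () => ({
    'bull:logs:1': { logged: true },
    'bull:inventory:2': { reserved: true },
    'bull:emails:3': { sent: true },
  }),
};

processOrder(fakeJob).then((result) => console.log(result));
// { orderId: 12345, completedSteps: 3 }
```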

Conclusion

BullMQ’s FlowProducer offers a powerful way to manage complex workflows in a microservices architecture. By understanding how to use this feature effectively, you can ensure that your services communicate efficiently and reliably, leading to more robust and maintainable systems.
