Message Broker Essentials
Quick reference guide to message brokers and asynchronous communication in distributed systems.
What is a Message Broker?
A message broker is middleware that enables applications, systems, and services to communicate and exchange information by receiving messages from producers, routing them, and delivering them to consumers, decoupling the two sides.
Core Concepts
Message
A unit of data sent between systems:
// Simple message
{
"orderId": "12345",
"customerId": "67890",
"items": ["product1", "product2"],
"total": 99.99
}
Producer/Publisher
Application that sends messages:
// Producer
const message = { orderId: '12345', status: 'pending' };
await broker.publish('orders', message);
Consumer/Subscriber
Application that receives and processes messages:
// Consumer
broker.subscribe('orders', (message) => {
console.log('Received:', message);
processOrder(message);
});
Queue
Ordered collection of messages (point-to-point):
Producer → [Queue] → Consumer
Topic/Exchange
Route messages to multiple subscribers (publish-subscribe):
                 → Consumer 1
Producer → Topic → Consumer 2
                 → Consumer 3
Message Patterns
1. Point-to-Point (Queue)
One message → One consumer
Producer → [Queue] → Consumer
Use cases: task queues, background job processing, order pipelines
2. Publish-Subscribe (Topic)
One message → Multiple consumers
                  → Subscriber 1
Publisher → Topic → Subscriber 2
                  → Subscriber 3
Use cases: event notifications, broadcasting updates to multiple services, cache invalidation
3. Request-Reply
Synchronous-style communication over an asynchronous broker
Client → [Request Queue] → Server
Client ← [Reply Queue] ← Server
Use cases: RPC-style service calls, querying another service and waiting for its response
4. Routing
Messages routed based on patterns/keys
                    → Queue 1 (error.*)
Producer → Exchange → Queue 2 (info.*)
                    → Queue 3 (*.critical)
Use cases: log routing by severity, directing messages to region- or category-specific consumers
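The binding patterns above follow AMQP topic-matching rules: `*` matches exactly one dot-separated word, `#` matches zero or more. A small matcher illustrating the semantics:

```javascript
// Match an AMQP-style binding pattern against a routing key.
// '*' = exactly one word, '#' = zero or more words.
function topicMatch(pattern, routingKey) {
  const p = pattern.split('.');
  const k = routingKey.split('.');
  function matchFrom(pi, ki) {
    if (pi === p.length) return ki === k.length;
    if (p[pi] === '#') {
      // '#' may absorb zero or more words
      for (let skip = ki; skip <= k.length; skip++) {
        if (matchFrom(pi + 1, skip)) return true;
      }
      return false;
    }
    if (ki === k.length) return false;
    if (p[pi] === '*' || p[pi] === k[ki]) return matchFrom(pi + 1, ki + 1);
    return false;
  }
  return matchFrom(0, 0);
}
```

So `error.*` matches `error.critical` but not `error.db.timeout`, while `#` matches everything, mirroring how a topic exchange decides which bound queues receive a message.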
Popular Message Brokers
RabbitMQ
AMQP-based message broker with flexible routing (direct, fanout, topic, and headers exchanges), per-message acknowledgments, and a built-in management UI.
Apache Kafka
Distributed event streaming platform built around a partitioned, replicated, persistent log; designed for high throughput and replayable consumption.
Redis Pub/Sub
In-memory fire-and-forget messaging; very fast, but messages are not persisted and are lost if no subscriber is listening.
AWS SQS/SNS
Cloud-native managed messaging: SQS provides queues (point-to-point), SNS provides topics (publish-subscribe).
Azure Service Bus
Enterprise messaging as a service with queues, topics/subscriptions, sessions, and built-in dead-lettering.
RabbitMQ
Installation
# ========== Using Docker ==========
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3-management
# Access management UI: http://localhost:15672
# Default credentials: guest/guest
# ========== Using Homebrew (macOS) ==========
brew install rabbitmq
brew services start rabbitmq
Node.js Example
// ========== Install ==========
// npm install amqplib
const amqp = require('amqplib');
// ========== Producer ==========
async function sendMessage() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queue = 'tasks';
await channel.assertQueue(queue, { durable: true });
const message = { task: 'process-order', orderId: '12345' };
channel.sendToQueue(
queue,
Buffer.from(JSON.stringify(message)),
{ persistent: true }
);
console.log('Sent:', message);
await channel.close();
await connection.close();
}
// ========== Consumer ==========
async function receiveMessages() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queue = 'tasks';
await channel.assertQueue(queue, { durable: true });
// Fair dispatch - one message at a time
channel.prefetch(1);
console.log('Waiting for messages...');
channel.consume(queue, (msg) => {
const message = JSON.parse(msg.content.toString());
console.log('Received:', message);
// Simulate work
setTimeout(() => {
console.log('Processed:', message);
channel.ack(msg); // Acknowledge
}, 1000);
});
}
// ========== Publish-Subscribe ==========
async function publishMessage() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const exchange = 'notifications';
await channel.assertExchange(exchange, 'fanout', { durable: false });
const message = { event: 'user-registered', userId: '123' };
channel.publish(
exchange,
'',
Buffer.from(JSON.stringify(message))
);
console.log('Published:', message);
await channel.close();
await connection.close();
}
async function subscribeToMessages() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const exchange = 'notifications';
await channel.assertExchange(exchange, 'fanout', { durable: false });
const q = await channel.assertQueue('', { exclusive: true });
await channel.bindQueue(q.queue, exchange, '');
console.log('Waiting for notifications...');
channel.consume(q.queue, (msg) => {
const message = JSON.parse(msg.content.toString());
console.log('Received notification:', message);
}, { noAck: true });
}
// ========== Topic Exchange ==========
async function publishToTopic() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const exchange = 'logs';
await channel.assertExchange(exchange, 'topic', { durable: false });
const routingKey = 'error.critical';
const message = { level: 'error', message: 'Database connection failed' };
channel.publish(
exchange,
routingKey,
Buffer.from(JSON.stringify(message))
);
console.log('Published to', routingKey, ':', message);
await channel.close();
await connection.close();
}
async function subscribeToTopic(pattern) {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const exchange = 'logs';
await channel.assertExchange(exchange, 'topic', { durable: false });
const q = await channel.assertQueue('', { exclusive: true });
await channel.bindQueue(q.queue, exchange, pattern);
console.log('Waiting for logs matching:', pattern);
channel.consume(q.queue, (msg) => {
const message = JSON.parse(msg.content.toString());
console.log('[', msg.fields.routingKey, ']', message);
}, { noAck: true });
}
// Subscribe to different patterns
subscribeToTopic('error.*'); // All errors
subscribeToTopic('*.critical'); // All critical
subscribeToTopic('#'); // All messages
Python Example
# ========== Install ==========
# pip install pika
import pika
import json
# ========== Producer ==========
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)
message = {'task': 'process-order', 'orderId': '12345'}
channel.basic_publish(
exchange='',
routing_key='tasks',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # Make message persistent
)
)
print('Sent:', message)
connection.close()
# ========== Consumer ==========
def callback(ch, method, properties, body):
    message = json.loads(body)
    print('Received:', message)
    # Process message
    ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='tasks', on_message_callback=callback)
print('Waiting for messages...')
channel.start_consuming()
Apache Kafka
Installation
# ========== Using Docker Compose ==========
# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# Start
docker-compose up -d
Node.js Example (KafkaJS)
// ========== Install ==========
// npm install kafkajs
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});
// ========== Producer ==========
async function produceMessage() {
const producer = kafka.producer();
await producer.connect();
await producer.send({
topic: 'orders',
messages: [
{
key: 'order-12345',
value: JSON.stringify({
orderId: '12345',
customerId: '67890',
total: 99.99
})
}
]
});
console.log('Message sent');
await producer.disconnect();
}
// ========== Consumer ==========
async function consumeMessages() {
const consumer = kafka.consumer({ groupId: 'order-processor' });
await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const order = JSON.parse(message.value.toString());
console.log('Received:', {
partition,
offset: message.offset,
key: message.key?.toString(),
value: order
});
// Process order
await processOrder(order);
}
});
}
// ========== Batch Producer ==========
async function produceBatch() {
const producer = kafka.producer();
await producer.connect();
const messages = [];
for (let i = 0; i < 100; i++) {
messages.push({
key: `order-${i}`,
value: JSON.stringify({ orderId: i, status: 'pending' })
});
}
await producer.send({
topic: 'orders',
messages
});
console.log('Batch sent:', messages.length);
await producer.disconnect();
}
// ========== Consumer Groups ==========
async function consumeInGroup(groupId, memberId) {
const consumer = kafka.consumer({ groupId });
await consumer.connect();
await consumer.subscribe({ topic: 'orders' });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log(`[${memberId}] Processing from partition ${partition}:`,
JSON.parse(message.value.toString())
);
}
});
}
// Multiple consumers in same group - messages distributed
consumeInGroup('processors', 'worker-1');
consumeInGroup('processors', 'worker-2');
consumeInGroup('processors', 'worker-3');
Python Example (kafka-python)
# ========== Install ==========
# pip install kafka-python
from kafka import KafkaProducer, KafkaConsumer
import json
# ========== Producer ==========
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
message = {'orderId': '12345', 'status': 'pending'}
producer.send('orders', key=b'order-12345', value=message)
producer.flush()
print('Message sent')
# ========== Consumer ==========
consumer = KafkaConsumer(
'orders',
bootstrap_servers=['localhost:9092'],
group_id='order-processor',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
    print(f'Received: {message.value}')
    # Process message
Redis Pub/Sub
Node.js Example
// ========== Install ==========
// npm install redis
const redis = require('redis');
// ========== Publisher ==========
async function publish() {
const publisher = redis.createClient();
await publisher.connect();
const message = { event: 'user-login', userId: '123' };
await publisher.publish('events', JSON.stringify(message));
console.log('Published:', message);
await publisher.quit();
}
// ========== Subscriber ==========
async function subscribe() {
const subscriber = redis.createClient();
await subscriber.connect();
await subscriber.subscribe('events', (message) => {
const event = JSON.parse(message);
console.log('Received:', event);
});
console.log('Subscribed to events');
}
// ========== Pattern Subscribe ==========
async function patternSubscribe() {
const subscriber = redis.createClient();
await subscriber.connect();
// Subscribe to all channels starting with 'user.'
await subscriber.pSubscribe('user.*', (message, channel) => {
console.log('Channel:', channel);
console.log('Message:', JSON.parse(message));
});
console.log('Subscribed to user.* pattern');
}
// ========== Multiple Channels ==========
async function multiSubscribe() {
const subscriber = redis.createClient();
await subscriber.connect();
await subscriber.subscribe(['channel1', 'channel2', 'channel3'],
(message, channel) => {
console.log(`[${channel}]`, message);
}
);
}
Python Example
# ========== Install ==========
# pip install redis
import redis
import json
# ========== Publisher ==========
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
message = {'event': 'user-login', 'userId': '123'}
r.publish('events', json.dumps(message))
print('Published:', message)
# ========== Subscriber ==========
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
pubsub = r.pubsub()
pubsub.subscribe('events')
print('Subscribed to events')
for message in pubsub.listen():
    if message['type'] == 'message':
        event = json.loads(message['data'])
        print('Received:', event)
AWS SQS
Node.js Example
// ========== Install ==========
// npm install @aws-sdk/client-sqs
const { SQSClient, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand } = require('@aws-sdk/client-sqs');
const client = new SQSClient({ region: 'us-east-1' });
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue';
// ========== Send Message ==========
async function sendMessage() {
const command = new SendMessageCommand({
QueueUrl: queueUrl,
MessageBody: JSON.stringify({
orderId: '12345',
status: 'pending'
}),
MessageAttributes: {
Type: {
DataType: 'String',
StringValue: 'Order'
}
}
});
const response = await client.send(command);
console.log('Message sent:', response.MessageId);
}
// ========== Receive Messages ==========
async function receiveMessages() {
const command = new ReceiveMessageCommand({
QueueUrl: queueUrl,
MaxNumberOfMessages: 10,
WaitTimeSeconds: 20, // Long polling
MessageAttributeNames: ['All']
});
const response = await client.send(command);
if (response.Messages) {
for (const message of response.Messages) {
const body = JSON.parse(message.Body);
console.log('Received:', body);
// Process message
await processMessage(body);
// Delete message
await client.send(new DeleteMessageCommand({
QueueUrl: queueUrl,
ReceiptHandle: message.ReceiptHandle
}));
}
}
}
// ========== Batch Send ==========
async function sendBatch() {
const { SendMessageBatchCommand } = require('@aws-sdk/client-sqs');
const entries = [];
for (let i = 0; i < 10; i++) {
entries.push({
Id: `msg-${i}`,
MessageBody: JSON.stringify({ orderId: i })
});
}
const command = new SendMessageBatchCommand({
QueueUrl: queueUrl,
Entries: entries
});
await client.send(command);
console.log('Batch sent');
}
Implementation Patterns
Work Queue
Distribute tasks among multiple workers:
// ========== Task Queue ==========
class TaskQueue {
constructor(broker) {
this.broker = broker;
this.queue = 'tasks';
}
async addTask(task) {
await this.broker.send(this.queue, {
id: generateId(),
type: task.type,
data: task.data,
createdAt: Date.now()
});
}
async startWorker() {
await this.broker.consume(this.queue, async (message) => {
console.log('Processing task:', message.id);
try {
await this.processTask(message);
await this.broker.ack(message);
} catch (error) {
console.error('Task failed:', error);
await this.broker.nack(message);
}
});
}
async processTask(task) {
switch (task.type) {
case 'email':
await sendEmail(task.data);
break;
case 'resize-image':
await resizeImage(task.data);
break;
case 'generate-report':
await generateReport(task.data);
break;
}
}
}
// Usage
const queue = new TaskQueue(broker);
// Add tasks
await queue.addTask({ type: 'email', data: { to: 'user@example.com' } });
await queue.addTask({ type: 'resize-image', data: { imageId: '123' } });
// Start workers
queue.startWorker(); // Worker 1
queue.startWorker(); // Worker 2
queue.startWorker(); // Worker 3
Event-Driven Architecture
// ========== Event Bus ==========
class EventBus {
constructor(broker) {
this.broker = broker;
this.exchange = 'events';
}
async publish(eventType, data) {
await this.broker.publish(this.exchange, {
type: eventType,
data,
timestamp: Date.now()
});
}
async subscribe(eventType, handler) {
await this.broker.subscribe(
this.exchange,
eventType,
async (event) => {
try {
await handler(event.data);
} catch (error) {
console.error('Event handler failed:', error);
}
}
);
}
}
// ========== Services ==========
// Order Service
class OrderService {
async createOrder(orderData) {
const order = await db.orders.create(orderData);
// Publish event
await eventBus.publish('order.created', {
orderId: order.id,
customerId: order.customerId,
total: order.total
});
return order;
}
}
// Inventory Service
class InventoryService {
async initialize() {
await eventBus.subscribe('order.created', async (data) => {
console.log('Reserving inventory for order:', data.orderId);
await this.reserveInventory(data);
});
}
}
// Email Service
class EmailService {
async initialize() {
await eventBus.subscribe('order.created', async (data) => {
console.log('Sending confirmation email:', data.orderId);
await this.sendConfirmation(data);
});
}
}
// Analytics Service
class AnalyticsService {
async initialize() {
await eventBus.subscribe('order.created', async (data) => {
console.log('Recording analytics:', data.orderId);
await this.recordEvent(data);
});
}
}
Saga Pattern
Coordinate distributed transactions with compensating actions:
// ========== Saga Orchestrator ==========
class OrderSaga {
constructor(broker) {
this.broker = broker;
}
async execute(orderData) {
const sagaId = generateId();
const steps = [];
try {
// Step 1: Create order
const order = await this.createOrder(orderData);
steps.push({ action: 'createOrder', data: order });
// Step 2: Reserve inventory
await this.reserveInventory(order);
steps.push({ action: 'reserveInventory', data: order });
// Step 3: Process payment
const payment = await this.processPayment(order);
steps.push({ action: 'processPayment', data: payment });
// Step 4: Ship order
await this.shipOrder(order);
steps.push({ action: 'shipOrder', data: order });
// Success - publish completion
await this.broker.publish('saga.completed', {
sagaId,
orderId: order.id
});
} catch (error) {
console.error('Saga failed, compensating...', error);
// Compensate in reverse order
for (let i = steps.length - 1; i >= 0; i--) {
await this.compensate(steps[i]);
}
await this.broker.publish('saga.failed', {
sagaId,
error: error.message
});
}
}
async compensate(step) {
switch (step.action) {
case 'createOrder':
await this.cancelOrder(step.data);
break;
case 'reserveInventory':
await this.releaseInventory(step.data);
break;
case 'processPayment':
await this.refundPayment(step.data);
break;
case 'shipOrder':
await this.cancelShipment(step.data);
break;
}
}
}
Error Handling
Dead Letter Queue
// ========== RabbitMQ DLQ ==========
async function setupDLQ() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
// Dead letter exchange
await channel.assertExchange('dlx', 'direct');
// Dead letter queue
await channel.assertQueue('dlq', { durable: true });
await channel.bindQueue('dlq', 'dlx', 'failed');
// Main queue with DLQ
await channel.assertQueue('tasks', {
durable: true,
arguments: {
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': 'failed'
}
});
}
// ========== Consumer with Retry ==========
async function consumeWithRetry(channel) {
  channel.consume('tasks', async (msg) => {
    try {
      const message = JSON.parse(msg.content.toString());
      await processMessage(message);
      channel.ack(msg);
    } catch (error) {
      console.error('Processing failed:', error);
      // Track attempts in a header: republish for retry,
      // dead-letter once retries are exhausted
      const headers = msg.properties.headers || {};
      const retryCount = (headers['x-retry-count'] || 0) + 1;
      if (retryCount <= 3) {
        // Republish with the incremented count, then ack the original
        channel.sendToQueue('tasks', msg.content, {
          persistent: true,
          headers: { 'x-retry-count': retryCount }
        });
        channel.ack(msg);
      } else {
        console.error('Max retries exceeded');
        channel.nack(msg, false, false); // Routed to the DLX
      }
    }
  });
}
Circuit Breaker
// ========== Circuit Breaker Pattern ==========
class CircuitBreaker {
constructor(threshold = 5, timeout = 60000) {
this.failureCount = 0;
this.threshold = threshold;
this.timeout = timeout;
this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
this.nextAttempt = Date.now();
}
async execute(fn) {
if (this.state === 'OPEN') {
if (Date.now() < this.nextAttempt) {
throw new Error('Circuit breaker is OPEN');
}
this.state = 'HALF_OPEN';
}
try {
const result = await fn();
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}
onSuccess() {
this.failureCount = 0;
this.state = 'CLOSED';
}
onFailure() {
this.failureCount++;
if (this.failureCount >= this.threshold) {
this.state = 'OPEN';
this.nextAttempt = Date.now() + this.timeout;
console.log('Circuit breaker opened');
}
}
}
// Usage
const breaker = new CircuitBreaker();
async function sendToExternalService(message) {
await breaker.execute(async () => {
await externalAPI.send(message);
});
}
Monitoring
Message Metrics
// ========== Message Tracking ==========
class MessageMetrics {
constructor() {
this.sent = 0;
this.received = 0;
this.failed = 0;
this.processing = new Map();
}
onSend(messageId) {
this.sent++;
this.processing.set(messageId, Date.now());
}
onReceive(messageId) {
this.received++;
const startTime = this.processing.get(messageId);
if (startTime) {
const latency = Date.now() - startTime;
console.log('Message latency:', latency, 'ms');
this.processing.delete(messageId);
}
}
onError(messageId, error) {
this.failed++;
console.error('Message failed:', messageId, error);
}
getStats() {
return {
sent: this.sent,
received: this.received,
failed: this.failed,
inFlight: this.processing.size
};
}
}
Best Practices
1. Idempotency
Ensure messages can be processed multiple times safely:
// ========== Idempotent Consumer ==========
const processedMessages = new Set();
async function processIdempotent(message) {
const messageId = message.id;
// Check the in-memory cache first, then the durable store
// (the Set alone does not survive a restart)
if (processedMessages.has(messageId) ||
    await db.processedMessages.exists(messageId)) {
  console.log('Message already processed:', messageId);
  return;
}
// Process message
await processMessage(message);
// Mark as processed
processedMessages.add(messageId);
// Persist to database for durability
await db.processedMessages.create({ id: messageId });
}
2. Message Ordering
Preserve order when needed:
// ========== Kafka Partition Key ==========
// Messages with same key go to same partition (ordered)
await producer.send({
topic: 'orders',
messages: [
{ key: 'customer-123', value: JSON.stringify(order1) },
{ key: 'customer-123', value: JSON.stringify(order2) }
]
});
3. Message Expiration
Set TTL for time-sensitive messages:
// ========== RabbitMQ TTL ==========
channel.sendToQueue('tasks', Buffer.from(JSON.stringify(message)), {
expiration: '60000' // 60 seconds
});
4. Graceful Shutdown
// ========== Graceful Consumer Shutdown ==========
process.on('SIGTERM', async () => {
console.log('Shutting down gracefully...');
// Stop accepting new messages
await consumer.stop();
// Wait for current messages to complete
await Promise.all(processingMessages);
// Close connections
await consumer.disconnect();
process.exit(0);
});
5. Monitoring and Alerting
Track queue depth, consumer lag, processing latency, and error rates, and alert when they cross thresholds.
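As a sketch, threshold-based alerting over such metrics might look like this (the metric names and threshold values are illustrative, not taken from any specific monitoring tool):

```javascript
// Illustrative alert thresholds
const thresholds = {
  queueDepth: 10000,   // messages waiting in the queue
  consumerLag: 5000,   // e.g. Kafka offsets behind the head
  errorRate: 0.05      // 5% of processed messages failed
};

// Compare a metrics snapshot against the thresholds
function checkAlerts(metrics) {
  const alerts = [];
  if (metrics.queueDepth > thresholds.queueDepth) {
    alerts.push(`Queue depth high: ${metrics.queueDepth}`);
  }
  if (metrics.consumerLag > thresholds.consumerLag) {
    alerts.push(`Consumer lag high: ${metrics.consumerLag}`);
  }
  const errorRate = metrics.failed / Math.max(metrics.processed, 1);
  if (errorRate > thresholds.errorRate) {
    alerts.push(`Error rate high: ${(errorRate * 100).toFixed(1)}%`);
  }
  return alerts;
}
```

A scheduler would call `checkAlerts` periodically with metrics scraped from the broker (e.g. the RabbitMQ management API or Kafka consumer-group lag) and forward any returned alerts to the paging system.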
Common Pitfalls
1. Lost Messages
Problem: Messages disappear
Solution: Use persistent messages and manual acknowledgment
// Persistent message
channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
persistent: true
});
// Manual ack
channel.consume(queue, (msg) => {
processMessage(msg);
channel.ack(msg); // Acknowledge after processing
});
2. Duplicate Processing
Problem: Same message processed multiple times
Solution: Implement idempotency
3. Message Ordering Issues
Problem: Messages processed out of order
Solution: Use partition keys (Kafka) or single consumer
4. Backpressure
Problem: Producer overwhelms consumers
Solution: Implement rate limiting and monitoring
// Consumer prefetch
channel.prefetch(1); // Process one at a time
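On the producer side, backpressure can be enforced with a token-bucket rate limiter (an illustrative sketch, not tied to any broker API):

```javascript
// Token bucket: allows bursts up to `capacity`, sustained rate of
// `ratePerSec` tokens per second
class TokenBucket {
  constructor(ratePerSec, capacity) {
    this.ratePerSec = ratePerSec;
    this.capacity = capacity;
    this.tokens = capacity;
    this.lastRefill = Date.now();
  }
  tryRemove() {
    const now = Date.now();
    // Refill proportionally to elapsed time, capped at capacity
    this.tokens = Math.min(
      this.capacity,
      this.tokens + ((now - this.lastRefill) / 1000) * this.ratePerSec
    );
    this.lastRefill = now;
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }
}
```

A producer would call `tryRemove()` before each publish and wait (or drop the message) when it returns false, keeping the send rate within what consumers can absorb.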