Home / Notebooks / Backend
Backend
intermediate

Message Broker Essentials

Essential message broker concepts for asynchronous communication and distributed systems

March 10, 2024
Updated regularly

Message Broker Essentials

Quick reference guide for Message Brokers - asynchronous communication in distributed systems.

What is a Message Broker?

A Message Broker is middleware that enables applications, systems, and services to communicate and exchange information:

  • Decouples components - Producers and consumers are independent
  • Asynchronous communication - Non-blocking message exchange
  • Reliability - Ensures message delivery
  • Scalability - Handles high message throughput
  • Load balancing - Distributes messages across consumers
  • Routing - Delivers messages to correct destinations
  • Core Concepts

    Message

    A unit of data sent between systems:

    // Simple message
    {
      "orderId": "12345",
      "customerId": "67890",
      "items": ["product1", "product2"],
      "total": 99.99
    }
    

    Producer/Publisher

    Application that sends messages:

    // Producer
    const message = { orderId: '12345', status: 'pending' };
    await broker.publish('orders', message);
    

    Consumer/Subscriber

    Application that receives and processes messages:

    // Consumer
    broker.subscribe('orders', (message) => {
      console.log('Received:', message);
      processOrder(message);
    });
    

    Queue

    Ordered collection of messages (point-to-point):

    Producer → [Queue] → Consumer
    

    Topic/Exchange

    Route messages to multiple subscribers (publish-subscribe):

                      → Consumer 1
    Producer → Topic → Consumer 2
                      → Consumer 3
    

    Message Patterns

    1. Point-to-Point (Queue)

    One message → One consumer

    Producer → [Queue] → Consumer
    

    Use cases:

  • Task distribution
  • Load balancing
  • Work queues
  • 2. Publish-Subscribe (Topic)

    One message → Multiple consumers

                      → Subscriber 1
    Publisher → Topic → Subscriber 2
                      → Subscriber 3
    

    Use cases:

  • Event broadcasting
  • Notifications
  • Real-time updates
  • 3. Request-Reply

    Synchronous-style communication over async broker

    Client → [Request Queue] → Server
    Client ← [Reply Queue] ← Server
    

    Use cases:

  • RPC over messaging
  • Synchronous workflows
  • 4. Routing

    Messages routed based on patterns/keys

                      → Queue 1 (error.*)
    Producer → Exchange → Queue 2 (info.*)
                      → Queue 3 (*.critical)
    

    Use cases:

  • Log aggregation
  • Event filtering
  • Multi-tenant systems
  • RabbitMQ

    AMQP-based message broker:

  • Mature and stable
  • Rich routing capabilities
  • Multiple protocols (AMQP, MQTT, STOMP)
  • Easy to use
  • Good documentation
  • Apache Kafka

    Distributed streaming platform:

  • High throughput
  • Persistent storage
  • Stream processing
  • Horizontal scalability
  • Event sourcing
  • Redis Pub/Sub

    In-memory messaging:

  • Extremely fast
  • Simple to use
  • Fire-and-forget
  • No persistence (by default)
  • Real-time updates
  • AWS SQS/SNS

    Cloud-native messaging:

  • Fully managed
  • Scalable
  • Pay-as-you-go
  • AWS integration
  • No infrastructure management
  • Azure Service Bus

    Enterprise messaging as a service:

  • Fully managed
  • Advanced features
  • Azure integration
  • Hybrid scenarios
  • RabbitMQ

    Installation

    # ========== Using Docker ==========
    docker run -d --name rabbitmq \
      -p 5672:5672 \
      -p 15672:15672 \
      rabbitmq:3-management
    
    # Access management UI: http://localhost:15672
    # Default credentials: guest/guest
    
    # ========== Using Homebrew (macOS) ==========
    brew install rabbitmq
    brew services start rabbitmq
    

    Node.js Example

    // ========== Install ==========
    // npm install amqplib
    
    const amqp = require('amqplib');
    
    // ========== Producer ==========
    async function sendMessage() {
      const connection = await amqp.connect('amqp://localhost');
      const channel = await connection.createChannel();
      
      const queue = 'tasks';
      await channel.assertQueue(queue, { durable: true });
      
      const message = { task: 'process-order', orderId: '12345' };
      channel.sendToQueue(
        queue,
        Buffer.from(JSON.stringify(message)),
        { persistent: true }
      );
      
      console.log('Sent:', message);
      
      await channel.close();
      await connection.close();
    }
    
    // ========== Consumer ==========
    async function receiveMessages() {
      const connection = await amqp.connect('amqp://localhost');
      const channel = await connection.createChannel();
      
      const queue = 'tasks';
      await channel.assertQueue(queue, { durable: true });
      
      // Fair dispatch - one message at a time
      channel.prefetch(1);
      
      console.log('Waiting for messages...');
      
      channel.consume(queue, (msg) => {
        const message = JSON.parse(msg.content.toString());
        console.log('Received:', message);
        
        // Simulate work
        setTimeout(() => {
          console.log('Processed:', message);
          channel.ack(msg); // Acknowledge
        }, 1000);
      });
    }
    
    // ========== Publish-Subscribe ==========
    async function publishMessage() {
      const connection = await amqp.connect('amqp://localhost');
      const channel = await connection.createChannel();
      
      const exchange = 'notifications';
      await channel.assertExchange(exchange, 'fanout', { durable: false });
      
      const message = { event: 'user-registered', userId: '123' };
      channel.publish(
        exchange,
        '',
        Buffer.from(JSON.stringify(message))
      );
      
      console.log('Published:', message);
      
      await channel.close();
      await connection.close();
    }
    
    async function subscribeToMessages() {
      const connection = await amqp.connect('amqp://localhost');
      const channel = await connection.createChannel();
      
      const exchange = 'notifications';
      await channel.assertExchange(exchange, 'fanout', { durable: false });
      
      const q = await channel.assertQueue('', { exclusive: true });
      channel.bindQueue(q.queue, exchange, '');
      
      console.log('Waiting for notifications...');
      
      channel.consume(q.queue, (msg) => {
        const message = JSON.parse(msg.content.toString());
        console.log('Received notification:', message);
      }, { noAck: true });
    }
    
    // ========== Topic Exchange ==========
    async function publishToTopic() {
      const connection = await amqp.connect('amqp://localhost');
      const channel = await connection.createChannel();
      
      const exchange = 'logs';
      await channel.assertExchange(exchange, 'topic', { durable: false });
      
      const routingKey = 'error.critical';
      const message = { level: 'error', message: 'Database connection failed' };
      
      channel.publish(
        exchange,
        routingKey,
        Buffer.from(JSON.stringify(message))
      );
      
      console.log('Published to', routingKey, ':', message);
      
      await channel.close();
      await connection.close();
    }
    
    async function subscribeToTopic(pattern) {
      const connection = await amqp.connect('amqp://localhost');
      const channel = await connection.createChannel();
      
      const exchange = 'logs';
      await channel.assertExchange(exchange, 'topic', { durable: false });
      
      const q = await channel.assertQueue('', { exclusive: true });
      channel.bindQueue(q.queue, exchange, pattern);
      
      console.log('Waiting for logs matching:', pattern);
      
      channel.consume(q.queue, (msg) => {
        const message = JSON.parse(msg.content.toString());
        console.log('[', msg.fields.routingKey, ']', message);
      }, { noAck: true });
    }
    
    // Subscribe to different patterns
    subscribeToTopic('error.*');      // All errors
    subscribeToTopic('*.critical');   // All critical
    subscribeToTopic('#');           // All messages
    

    Python Example

    # ========== Install ==========
    # pip install pika
    
    import pika
    import json
    
    # ========== Producer ==========
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()
    
    channel.queue_declare(queue='tasks', durable=True)
    
    message = {'task': 'process-order', 'orderId': '12345'}
    channel.basic_publish(
        exchange='',
        routing_key='tasks',
        body=json.dumps(message),
        properties=pika.BasicProperties(
            delivery_mode=2,  # Make message persistent
        )
    )
    
    print('Sent:', message)
    connection.close()
    
    # ========== Consumer ==========
    def callback(ch, method, properties, body):
        message = json.loads(body)
        print('Received:', message)
        # Process message
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()
    
    channel.queue_declare(queue='tasks', durable=True)
    channel.basic_qos(prefetch_count=1)
    
    channel.basic_consume(queue='tasks', on_message_callback=callback)
    
    print('Waiting for messages...')
    channel.start_consuming()
    

    Apache Kafka

    Installation

    # ========== Using Docker Compose ==========
    # docker-compose.yml
    version: '3'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:latest
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
    
      kafka:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    
    # Start
    docker-compose up -d
    

    Node.js Example (KafkaJS)

    // ========== Install ==========
    // npm install kafkajs
    
    const { Kafka } = require('kafkajs');
    
    const kafka = new Kafka({
      clientId: 'my-app',
      brokers: ['localhost:9092']
    });
    
    // ========== Producer ==========
    async function produceMessage() {
      const producer = kafka.producer();
      await producer.connect();
      
      await producer.send({
        topic: 'orders',
        messages: [
          {
            key: 'order-12345',
            value: JSON.stringify({
              orderId: '12345',
              customerId: '67890',
              total: 99.99
            })
          }
        ]
      });
      
      console.log('Message sent');
      await producer.disconnect();
    }
    
    // ========== Consumer ==========
    async function consumeMessages() {
      const consumer = kafka.consumer({ groupId: 'order-processor' });
      await consumer.connect();
      
      await consumer.subscribe({ topic: 'orders', fromBeginning: true });
      
      await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
          const order = JSON.parse(message.value.toString());
          console.log('Received:', {
            partition,
            offset: message.offset,
            key: message.key?.toString(),
            value: order
          });
          
          // Process order
          await processOrder(order);
        }
      });
    }
    
    // ========== Batch Producer ==========
    async function produceBatch() {
      const producer = kafka.producer();
      await producer.connect();
      
      const messages = [];
      for (let i = 0; i < 100; i++) {
        messages.push({
          key: `order-${i}`,
          value: JSON.stringify({ orderId: i, status: 'pending' })
        });
      }
      
      await producer.send({
        topic: 'orders',
        messages
      });
      
      console.log('Batch sent:', messages.length);
      await producer.disconnect();
    }
    
    // ========== Consumer Groups ==========
    async function consumeInGroup(groupId, memberId) {
      const consumer = kafka.consumer({ groupId });
      await consumer.connect();
      
      await consumer.subscribe({ topic: 'orders' });
      
      await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
          console.log(`[${memberId}] Processing from partition ${partition}:`, 
            JSON.parse(message.value.toString())
          );
        }
      });
    }
    
    // Multiple consumers in same group - messages distributed
    consumeInGroup('processors', 'worker-1');
    consumeInGroup('processors', 'worker-2');
    consumeInGroup('processors', 'worker-3');
    

    Python Example (kafka-python)

    # ========== Install ==========
    # pip install kafka-python
    
    from kafka import KafkaProducer, KafkaConsumer
    import json
    
    # ========== Producer ==========
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    
    message = {'orderId': '12345', 'status': 'pending'}
    producer.send('orders', key=b'order-12345', value=message)
    producer.flush()
    
    print('Message sent')
    
    # ========== Consumer ==========
    consumer = KafkaConsumer(
        'orders',
        bootstrap_servers=['localhost:9092'],
        group_id='order-processor',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )
    
    for message in consumer:
        print(f'Received: {message.value}')
        # Process message
    

    Redis Pub/Sub

    Node.js Example

    // ========== Install ==========
    // npm install redis
    
    const redis = require('redis');
    
    // ========== Publisher ==========
    async function publish() {
      const publisher = redis.createClient();
      await publisher.connect();
      
      const message = { event: 'user-login', userId: '123' };
      await publisher.publish('events', JSON.stringify(message));
      
      console.log('Published:', message);
      await publisher.quit();
    }
    
    // ========== Subscriber ==========
    async function subscribe() {
      const subscriber = redis.createClient();
      await subscriber.connect();
      
      await subscriber.subscribe('events', (message) => {
        const event = JSON.parse(message);
        console.log('Received:', event);
      });
      
      console.log('Subscribed to events');
    }
    
    // ========== Pattern Subscribe ==========
    async function patternSubscribe() {
      const subscriber = redis.createClient();
      await subscriber.connect();
      
      // Subscribe to all channels starting with 'user.'
      await subscriber.pSubscribe('user.*', (message, channel) => {
        console.log('Channel:', channel);
        console.log('Message:', JSON.parse(message));
      });
      
      console.log('Subscribed to user.* pattern');
    }
    
    // ========== Multiple Channels ==========
    async function multiSubscribe() {
      const subscriber = redis.createClient();
      await subscriber.connect();
      
      await subscriber.subscribe(['channel1', 'channel2', 'channel3'], 
        (message, channel) => {
          console.log(`[${channel}]`, message);
        }
      );
    }
    

    Python Example

    # ========== Install ==========
    # pip install redis
    
    import redis
    import json
    
    # ========== Publisher ==========
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)
    
    message = {'event': 'user-login', 'userId': '123'}
    r.publish('events', json.dumps(message))
    
    print('Published:', message)
    
    # ========== Subscriber ==========
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)
    pubsub = r.pubsub()
    pubsub.subscribe('events')
    
    print('Subscribed to events')
    
    for message in pubsub.listen():
        if message['type'] == 'message':
            event = json.loads(message['data'])
            print('Received:', event)
    

    AWS SQS

    Node.js Example

    // ========== Install ==========
    // npm install @aws-sdk/client-sqs
    
    const { SQSClient, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand } = require('@aws-sdk/client-sqs');
    
    const client = new SQSClient({ region: 'us-east-1' });
    const queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue';
    
    // ========== Send Message ==========
    async function sendMessage() {
      const command = new SendMessageCommand({
        QueueUrl: queueUrl,
        MessageBody: JSON.stringify({
          orderId: '12345',
          status: 'pending'
        }),
        MessageAttributes: {
          Type: {
            DataType: 'String',
            StringValue: 'Order'
          }
        }
      });
      
      const response = await client.send(command);
      console.log('Message sent:', response.MessageId);
    }
    
    // ========== Receive Messages ==========
    async function receiveMessages() {
      const command = new ReceiveMessageCommand({
        QueueUrl: queueUrl,
        MaxNumberOfMessages: 10,
        WaitTimeSeconds: 20, // Long polling
        MessageAttributeNames: ['All']
      });
      
      const response = await client.send(command);
      
      if (response.Messages) {
        for (const message of response.Messages) {
          const body = JSON.parse(message.Body);
          console.log('Received:', body);
          
          // Process message
          await processMessage(body);
          
          // Delete message
          await client.send(new DeleteMessageCommand({
            QueueUrl: queueUrl,
            ReceiptHandle: message.ReceiptHandle
          }));
        }
      }
    }
    
    // ========== Batch Send ==========
    async function sendBatch() {
      const { SendMessageBatchCommand } = require('@aws-sdk/client-sqs');
      
      const entries = [];
      for (let i = 0; i < 10; i++) {
        entries.push({
          Id: `msg-${i}`,
          MessageBody: JSON.stringify({ orderId: i })
        });
      }
      
      const command = new SendMessageBatchCommand({
        QueueUrl: queueUrl,
        Entries: entries
      });
      
      await client.send(command);
      console.log('Batch sent');
    }
    

    Message Patterns

    Work Queue

    Distribute tasks among multiple workers:

    // ========== Task Queue ==========
    class TaskQueue {
      constructor(broker) {
        this.broker = broker;
        this.queue = 'tasks';
      }
      
      async addTask(task) {
        await this.broker.send(this.queue, {
          id: generateId(),
          type: task.type,
          data: task.data,
          createdAt: Date.now()
        });
      }
      
      async startWorker() {
        await this.broker.consume(this.queue, async (message) => {
          console.log('Processing task:', message.id);
          
          try {
            await this.processTask(message);
            await this.broker.ack(message);
          } catch (error) {
            console.error('Task failed:', error);
            await this.broker.nack(message);
          }
        });
      }
      
      async processTask(task) {
        switch (task.type) {
          case 'email':
            await sendEmail(task.data);
            break;
          case 'resize-image':
            await resizeImage(task.data);
            break;
          case 'generate-report':
            await generateReport(task.data);
            break;
        }
      }
    }
    
    // Usage
    const queue = new TaskQueue(broker);
    
    // Add tasks
    await queue.addTask({ type: 'email', data: { to: 'user@example.com' } });
    await queue.addTask({ type: 'resize-image', data: { imageId: '123' } });
    
    // Start workers
    queue.startWorker(); // Worker 1
    queue.startWorker(); // Worker 2
    queue.startWorker(); // Worker 3
    

    Event-Driven Architecture

    // ========== Event Bus ==========
    class EventBus {
      constructor(broker) {
        this.broker = broker;
        this.exchange = 'events';
      }
      
      async publish(eventType, data) {
        await this.broker.publish(this.exchange, {
          type: eventType,
          data,
          timestamp: Date.now()
        });
      }
      
      async subscribe(eventType, handler) {
        await this.broker.subscribe(
          this.exchange,
          eventType,
          async (event) => {
            try {
              await handler(event.data);
            } catch (error) {
              console.error('Event handler failed:', error);
            }
          }
        );
      }
    }
    
    // ========== Services ==========
    // Order Service
    class OrderService {
      async createOrder(orderData) {
        const order = await db.orders.create(orderData);
        
        // Publish event
        await eventBus.publish('order.created', {
          orderId: order.id,
          customerId: order.customerId,
          total: order.total
        });
        
        return order;
      }
    }
    
    // Inventory Service
    class InventoryService {
      async initialize() {
        await eventBus.subscribe('order.created', async (data) => {
          console.log('Reserving inventory for order:', data.orderId);
          await this.reserveInventory(data);
        });
      }
    }
    
    // Email Service
    class EmailService {
      async initialize() {
        await eventBus.subscribe('order.created', async (data) => {
          console.log('Sending confirmation email:', data.orderId);
          await this.sendConfirmation(data);
        });
      }
    }
    
    // Analytics Service
    class AnalyticsService {
      async initialize() {
        await eventBus.subscribe('order.created', async (data) => {
          console.log('Recording analytics:', data.orderId);
          await this.recordEvent(data);
        });
      }
    }
    

    Saga Pattern

    Distributed transactions:

    // ========== Saga Orchestrator ==========
    class OrderSaga {
      constructor(broker) {
        this.broker = broker;
      }
      
      async execute(orderData) {
        const sagaId = generateId();
        const steps = [];
        
        try {
          // Step 1: Create order
          const order = await this.createOrder(orderData);
          steps.push({ action: 'createOrder', data: order });
          
          // Step 2: Reserve inventory
          await this.reserveInventory(order);
          steps.push({ action: 'reserveInventory', data: order });
          
          // Step 3: Process payment
          const payment = await this.processPayment(order);
          steps.push({ action: 'processPayment', data: payment });
          
          // Step 4: Ship order
          await this.shipOrder(order);
          steps.push({ action: 'shipOrder', data: order });
          
          // Success - publish completion
          await this.broker.publish('saga.completed', {
            sagaId,
            orderId: order.id
          });
          
        } catch (error) {
          console.error('Saga failed, compensating...', error);
          
          // Compensate in reverse order
          for (let i = steps.length - 1; i >= 0; i--) {
            await this.compensate(steps[i]);
          }
          
          await this.broker.publish('saga.failed', {
            sagaId,
            error: error.message
          });
        }
      }
      
      async compensate(step) {
        switch (step.action) {
          case 'createOrder':
            await this.cancelOrder(step.data);
            break;
          case 'reserveInventory':
            await this.releaseInventory(step.data);
            break;
          case 'processPayment':
            await this.refundPayment(step.data);
            break;
          case 'shipOrder':
            await this.cancelShipment(step.data);
            break;
        }
      }
    }
    

    Error Handling

    Dead Letter Queue

    // ========== RabbitMQ DLQ ==========
    async function setupDLQ() {
      const connection = await amqp.connect('amqp://localhost');
      const channel = await connection.createChannel();
      
      // Dead letter exchange
      await channel.assertExchange('dlx', 'direct');
      
      // Dead letter queue
      await channel.assertQueue('dlq', { durable: true });
      await channel.bindQueue('dlq', 'dlx', 'failed');
      
      // Main queue with DLQ
      await channel.assertQueue('tasks', {
        durable: true,
        arguments: {
          'x-dead-letter-exchange': 'dlx',
          'x-dead-letter-routing-key': 'failed'
        }
      });
    }
    
    // ========== Consumer with Retry ==========
    async function consumeWithRetry() {
      channel.consume('tasks', async (msg) => {
        try {
          const message = JSON.parse(msg.content.toString());
          await processMessage(message);
          channel.ack(msg);
        } catch (error) {
          console.error('Processing failed:', error);
          
          // Get retry count
          const headers = msg.properties.headers || {};
          const retryCount = headers['x-retry-count'] || 0;
          
          if (retryCount < 3) {
            // Retry
            channel.nack(msg, false, false); // Send to DLX
          } else {
            // Give up
            console.error('Max retries exceeded');
            channel.ack(msg); // Remove from queue
          }
        }
      });
    }
    

    Circuit Breaker

    // ========== Circuit Breaker Pattern ==========
    class CircuitBreaker {
      constructor(threshold = 5, timeout = 60000) {
        this.failureCount = 0;
        this.threshold = threshold;
        this.timeout = timeout;
        this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
        this.nextAttempt = Date.now();
      }
      
      async execute(fn) {
        if (this.state === 'OPEN') {
          if (Date.now() < this.nextAttempt) {
            throw new Error('Circuit breaker is OPEN');
          }
          this.state = 'HALF_OPEN';
        }
        
        try {
          const result = await fn();
          this.onSuccess();
          return result;
        } catch (error) {
          this.onFailure();
          throw error;
        }
      }
      
      onSuccess() {
        this.failureCount = 0;
        this.state = 'CLOSED';
      }
      
      onFailure() {
        this.failureCount++;
        if (this.failureCount >= this.threshold) {
          this.state = 'OPEN';
          this.nextAttempt = Date.now() + this.timeout;
          console.log('Circuit breaker opened');
        }
      }
    }
    
    // Usage
    const breaker = new CircuitBreaker();
    
    async function sendToExternalService(message) {
      await breaker.execute(async () => {
        await externalAPI.send(message);
      });
    }
    

    Monitoring

    Message Metrics

    // ========== Message Tracking ==========
    class MessageMetrics {
      constructor() {
        this.sent = 0;
        this.received = 0;
        this.failed = 0;
        this.processing = new Map();
      }
      
      onSend(messageId) {
        this.sent++;
        this.processing.set(messageId, Date.now());
      }
      
      onReceive(messageId) {
        this.received++;
        const startTime = this.processing.get(messageId);
        if (startTime) {
          const latency = Date.now() - startTime;
          console.log('Message latency:', latency, 'ms');
          this.processing.delete(messageId);
        }
      }
      
      onError(messageId, error) {
        this.failed++;
        console.error('Message failed:', messageId, error);
      }
      
      getStats() {
        return {
          sent: this.sent,
          received: this.received,
          failed: this.failed,
          inFlight: this.processing.size
        };
      }
    }
    

    Best Practices

    1. Idempotency

    Ensure messages can be processed multiple times safely:

    // ========== Idempotent Consumer ==========
    const processedMessages = new Set();
    
    async function processIdempotent(message) {
      const messageId = message.id;
      
      // Check if already processed
      if (processedMessages.has(messageId)) {
        console.log('Message already processed:', messageId);
        return;
      }
      
      // Process message
      await processMessage(message);
      
      // Mark as processed
      processedMessages.add(messageId);
      
      // Persist to database for durability
      await db.processedMessages.create({ id: messageId });
    }
    

    2. Message Ordering

    Preserve order when needed:

    // ========== Kafka Partition Key ==========
    // Messages with same key go to same partition (ordered)
    await producer.send({
      topic: 'orders',
      messages: [
        { key: 'customer-123', value: JSON.stringify(order1) },
        { key: 'customer-123', value: JSON.stringify(order2) }
      ]
    });
    

    3. Message Expiration

    Set TTL for time-sensitive messages:

    // ========== RabbitMQ TTL ==========
    channel.sendToQueue('tasks', Buffer.from(JSON.stringify(message)), {
      expiration: '60000' // 60 seconds
    });
    

    4. Graceful Shutdown

    // ========== Graceful Consumer Shutdown ==========
    process.on('SIGTERM', async () => {
      console.log('Shutting down gracefully...');
      
      // Stop accepting new messages
      await consumer.stop();
      
      // Wait for current messages to complete
      await Promise.all(processingMessages);
      
      // Close connections
      await consumer.disconnect();
      
      process.exit(0);
    });
    

    5. Monitoring and Alerting

  • Message queue depth - Alert on backlog
  • Processing time - Detect slow consumers
  • Error rate - Monitor failures
  • Consumer lag (Kafka) - Track processing delay
  • Dead letter queue - Monitor failed messages
  • Common Pitfalls

    1. Lost Messages

    Problem: Messages disappear

    Solution: Use persistent messages and manual acknowledgment

    // Persistent message
    channel.sendToQueue(queue, Buffer.from(message), {
      persistent: true
    });
    
    // Manual ack
    channel.consume(queue, (msg) => {
      processMessage(msg);
      channel.ack(msg); // Acknowledge after processing
    });
    

    2. Duplicate Processing

    Problem: Same message processed multiple times

    Solution: Implement idempotency

    3. Message Ordering Issues

    Problem: Messages processed out of order

    Solution: Use partition keys (Kafka) or single consumer

    4. Backpressure

    Problem: Producer overwhelms consumers

    Solution: Implement rate limiting and monitoring

    // Consumer prefetch
    channel.prefetch(1); // Process one at a time
    

    Resources

  • RabbitMQ Documentation
  • Apache Kafka Documentation
  • Redis Pub/Sub
  • AWS SQS Documentation
  • Enterprise Integration Patterns
  • Topics

    Message BrokerRabbitMQKafkaRedisMessaging

    Found This Helpful?

    If you have questions or suggestions for improving these notes, I'd love to hear from you.