
Pub-Sub messaging through an AMQP broker like RabbitMQ

Send the event on a Publish-Subscribe Channel, which delivers a copy of a particular event to each receiver.

Pub-Sub messaging through an AMQP broker such as RabbitMQ decouples publishers from subscribers: a publisher sends a message once, and the broker delivers a copy to every subscribed consumer. Because delivery is asynchronous, publishers do not wait for consumers, which helps throughput and reduces bottlenecks. RabbitMQ supports message acknowledgments, persistence, and routing, which let a system tolerate consumer or broker restarts when they are configured appropriately. The exchange types (direct, fanout, topic, headers) control which subscribers receive each message.

AMQP client libraries exist for many languages and platforms, so services written in different stacks can exchange messages through the same broker. This simplifies inter-service communication in microservice architectures.
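
To make the difference between exchange types concrete, here is a minimal in-memory sketch of how a broker selects the target queues for direct, fanout, and topic exchanges. It does not talk to RabbitMQ or use any client library; `ExchangeRoutingSketch`, `Binding`, and `Route` are hypothetical names introduced only for illustration, and headers exchanges are left out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustration only: simulates exchange routing in memory, no broker involved.
public static class ExchangeRoutingSketch
{
    // A binding links a queue to the exchange under a binding key.
    public sealed record Binding(string Queue, string BindingKey);

    // Returns the queues that would receive a message published with routingKey.
    public static List<string> Route(string exchangeType, string routingKey, IEnumerable<Binding> bindings)
    {
        bool Matches(Binding b) => exchangeType switch
        {
            "fanout" => true,                          // every bound queue gets a copy
            "direct" => b.BindingKey == routingKey,    // exact key match only
            "topic" => TopicMatches(b.BindingKey.Split('.'), 0, routingKey.Split('.'), 0),
            _ => throw new ArgumentException($"Unsupported exchange type: {exchangeType}")
        };

        return bindings.Where(Matches).Select(b => b.Queue).Distinct().ToList();
    }

    // Topic patterns are dot-separated words: '*' matches exactly one word,
    // '#' matches zero or more words.
    private static bool TopicMatches(string[] pattern, int p, string[] key, int k)
    {
        if (p == pattern.Length) return k == key.Length;
        if (pattern[p] == "#")
        {
            // Either '#' matches nothing more, or it absorbs the next word.
            return TopicMatches(pattern, p + 1, key, k)
                || (k < key.Length && TopicMatches(pattern, p, key, k + 1));
        }
        if (k == key.Length) return false;
        return (pattern[p] == "*" || pattern[p] == key[k])
            && TopicMatches(pattern, p + 1, key, k + 1);
    }
}
```

With three bindings (queue `email` bound with `notify.email`, `sms` with `notify.sms`, and `audit` with `notify.#`), `Route("fanout", "", bindings)` selects all three queues, `Route("direct", "notify.sms", bindings)` selects only `sms`, and `Route("topic", "notify.email", bindings)` selects `email` and `audit`. Fanout is the type that corresponds most directly to the Publish-Subscribe Channel pattern.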

Pattern page: https://www.enterpriseintegrationpatterns.com/patterns/messaging/PublishSubscribeChannel.html

Here is a basic broker configuration:

{
  "BrokerConfiguration": {
    "HostName": "localhost",
    "Port": 5672,
    "UserName": "guest",
    "Password": "guest"
  },
  ...
}

Read these settings at startup and register the publisher and the subscriber:

// Get the broker connection settings.
var brokerConfiguration = Configuration
    .GetSection("Nomirun.Module:Nomirun.Demo.Module:BrokerConfiguration")
    .Get<BrokerConfiguration>();

// Register the publisher used to send messages.
services.AddAmqpPublisher(brokerConfiguration);

// Register the subscriber as a hosted service that listens with the options below.
// NotificationConsumer implements IConsume<NotificationMessage> and processes
// each NotificationMessage it receives.
services.AddAmqpSubscriber<NotificationMessage, NotificationConsumer>(brokerConfiguration, options =>
{
    options.Queue = "Queue";
    options.Exchange = "Exchange";
    // Must match the type the publisher declares for "Exchange"; RabbitMQ rejects
    // redeclaring an existing exchange with a different type.
    options.ExchangeType = "fanout";
    options.RoutingKey = "routingKey"; // fanout exchanges ignore the routing key
    options.AutoAck = true; // messages are acknowledged on delivery, before processing finishes
});
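
The path passed to GetSection uses ":" as the hierarchy separator, which is how .NET configuration addresses nested keys. The lookup therefore expects BrokerConfiguration to sit under two parent keys. The fragment below is a sketch of that nesting only, assuming the settings come from a JSON configuration file; any other keys a real file contains are not shown, and the exact file layout is an assumption.

```json
{
  "Nomirun.Module": {
    "Nomirun.Demo.Module": {
      "BrokerConfiguration": {
        "HostName": "localhost",
        "Port": 5672,
        "UserName": "guest",
        "Password": "guest"
      }
    }
  }
}
```

If the section path and the file nesting do not line up, Get<BrokerConfiguration>() returns null rather than throwing, so a null check after the lookup is a cheap safeguard.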

1. Use the Publisher

Here is a message object:

public class NotificationMessage
{
    public string Message { get; set; }
}

To publish a message, inject IPublish<NotificationMessage> and call PublishMessage:

public class PublishMe
{
    private readonly IPublish<NotificationMessage> _publisher;

    public PublishMe(IPublish<NotificationMessage> publisher)
    {
        _publisher = publisher;
    }

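    public async Task Publish(NotificationMessage message)
    {
        // A fanout exchange copies the message to every queue bound to it
        // and ignores the routing key, so none is set here.
        await _publisher.PublishMessage(message, new PublisherOptions
        {
            Queue = "PublisherQueue",
            Exchange = "Exchange",
            ExchangeType = "fanout",
            RoutingKey = null
        });
    }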
}

2. Implement the subscriber/consumer

Consumers connect to the broker and listen for new messages. Each received message is passed to the ConsumeMessage method for processing.

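using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client.Events;

public class NotificationConsumer : IConsume<NotificationMessage>
{
    private readonly ILogger<NotificationConsumer> _logger;

    public NotificationConsumer(ILogger<NotificationConsumer> logger)
    {
        _logger = logger;
    }

    // Called once for each NotificationMessage delivered to the subscribed queue.
    public Task ConsumeMessage(NotificationMessage message, BasicDeliverEventArgs eventArgs)
    {
        // A message template keeps the values as structured log properties.
        _logger.LogInformation(
            "Consumed message: {Message}, Correlation ID: {CorrelationId}",
            message.Message,
            eventArgs.BasicProperties.CorrelationId);
        return Task.CompletedTask;
    }
}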