Request-Reply messaging through an AMQP broker like RabbitMQ

The Nomirun SDK supports sending a pair of Request-Reply messages through an AMQP broker, each on its own channel.

Request-Reply messaging through an AMQP broker like RabbitMQ gives two services two-way communication without a direct dependency on each other: they exchange messages only through broker-managed queues. The requester sends a request, the broker routes it to the replier, and the replier sends back a reply through a different queue.

This approach keeps messaging asynchronous, lets requests wait in the queue while a consumer is unavailable, and scales by balancing load across multiple consumers. RabbitMQ features such as message durability and delivery acknowledgments make it a good fit for implementing the request-reply pattern in distributed systems.
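
The correlation ID logged by the consumers later on this page is what ties a reply to its request when the two travel on separate queues. The following is a minimal in-memory sketch of that idea, not the Nomirun SDK or the RabbitMQ client API: the Envelope and InMemoryRequestReply types are hypothetical, and two plain Queue<T> instances stand in for the broker-managed queues.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical envelope: a payload plus the correlation ID that links a reply to its request.
public readonly record struct Envelope(string CorrelationId, string Body);

// Hypothetical in-memory stand-in for a broker with one request queue and one reply queue.
public sealed class InMemoryRequestReply
{
    private readonly Queue<Envelope> _requestQueue = new();
    private readonly Queue<Envelope> _replyQueue = new();

    // Requester side: enqueue a request and return the correlation ID assigned to it.
    public string SendRequest(string body)
    {
        var correlationId = Guid.NewGuid().ToString();
        _requestQueue.Enqueue(new Envelope(correlationId, body));
        return correlationId;
    }

    // Replier side: take one request, build the reply, and copy the correlation ID onto it.
    // Returns false when no request is waiting.
    public bool HandleNextRequest(Func<string, string> handler)
    {
        if (!_requestQueue.TryDequeue(out var request))
            return false;

        _replyQueue.Enqueue(new Envelope(request.CorrelationId, handler(request.Body)));
        return true;
    }

    // Requester side: take one reply from the reply queue, if any is waiting.
    public bool TryReceiveReply(out Envelope reply) => _replyQueue.TryDequeue(out reply);
}
```

A requester would call SendRequest, keep the returned ID, and later compare it with the CorrelationId of whatever TryReceiveReply hands back. With a real broker the same comparison would be made against the correlation ID carried in the message properties.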

Pattern page: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html

1. Registration of requester

1.1. First configure the broker

Here is a basic broker configuration:

{
  "BrokerConfiguration": {
    "HostName": "localhost",
    "Port": 5672,
    "UserName": "guest",
    "Password": "guest"
  },
  ...
}

Read the broker configuration:

var brokerConfiguration = Configuration
    .GetSection("Nomirun.Module:Nomirun.Demo.Module:BrokerConfiguration")
    .Get<BrokerConfiguration>();
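
In .NET configuration, the colon in a section path separates nesting levels. If the settings were kept in a single JSON file, the path used above would therefore resolve against a layout like the following sketch. This nesting is an assumption derived only from the GetSection call; how Nomirun actually composes module configuration files may differ.

```json
{
  "Nomirun.Module": {
    "Nomirun.Demo.Module": {
      "BrokerConfiguration": {
        "HostName": "localhost",
        "Port": 5672,
        "UserName": "guest",
        "Password": "guest"
      }
    }
  }
}
```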

Then register the requester. It sends the request message RequestMessage to the queue defined in requestOptions, and listens with RequesterConsumer on the queue defined in consumeOptions for the replier's response, which arrives as a ReplyMessage.

services.AddAmqpRequester<RequestMessage, ReplyMessage, RequesterConsumer>(
    brokerConfiguration,
    requestOptions =>
    {
        requestOptions.RoutingKey = null;
        requestOptions.Queue = "req_queue";
        requestOptions.Exchange = "";
        requestOptions.ExchangeType = null;
        requestOptions.AutoAck = false;
    },
    consumeOptions =>
    {
        consumeOptions.RoutingKey = null;
        consumeOptions.Queue = "resp_queue";
        consumeOptions.Exchange = null;
        consumeOptions.ExchangeType = null;
        consumeOptions.AutoAck = false;
    });

1.2. Send the request from the API controller

[Route("api")]
public class RequesterController : NomirunApiController
{
    private readonly ILogger<RequesterController> _logger;
    private readonly ISendRequest<RequestMessage> _amqpRequestSender;

    public RequesterController(ILogger<RequesterController> logger,
        ISendRequest<RequestMessage> amqpRequestSender) : base(logger)
    {
        _logger = logger;
        _amqpRequestSender = amqpRequestSender;
    }

    [HttpGet("requester")]
    [SwaggerOperation(
        Summary = "Requester",
        Description = "",
        OperationId = nameof(Requester)
    )]
    public async Task<IActionResult> Requester(RequestMessage message)
    {
        await _amqpRequestSender.SendRequest(message);

        return Ok();
    }
}

1.3. Listen for replies

To listen for replies, implement the RequesterConsumer consumer:

public class RequesterConsumer : IConsume<ReplyMessage>
{
    private readonly ILogger<RequesterConsumer> _logger;

    public RequesterConsumer(ILogger<RequesterConsumer> logger)
    {
        _logger = logger;
    }

    public async Task ConsumeMessage(ReplyMessage message, BasicDeliverEventArgs eventArgs)
    {
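        // Use a message template instead of string interpolation so the values are logged as structured fields.
        _logger.LogInformation("Consumed reply: {Message}, Correlation ID: {CorrelationId}",
            message.Message, eventArgs.BasicProperties.CorrelationId);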

        await Task.CompletedTask;
    }
}
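
The samples on this page use RequestMessage and ReplyMessage without showing their definitions. A minimal sketch of the two contracts follows. Only the Message property is implied by the samples; the plain-class shape, and the absence of any base type or attribute the SDK might require, are assumptions.

```csharp
// Hypothetical request contract; only the Message property is implied by the samples.
public class RequestMessage
{
    public string Message { get; set; } = string.Empty;
}

// Hypothetical reply contract, mirroring the request.
public class ReplyMessage
{
    public string Message { get; set; } = string.Empty;
}
```

Both applications need the same definitions, so in practice these would probably live in a project that the requester and the replier both reference.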

2. Create replier

2.1. Register replier

In the other application, create the replier. First read the broker connection settings:

// Get broker connection settings
var brokerConfiguration = Configuration
    .GetSection("Nomirun.Module:Nomirun.Demo.Module:BrokerConfiguration")
    .Get<BrokerConfiguration>();

Then register a replier that consumes RequestMessage with the ReplierConsumeAndResponse consumer, which replies with a ReplyMessage.

services.AddAmqpReplier<RequestMessage, ReplyMessage, ReplierConsumeAndResponse>(
    brokerConfiguration,
    consumeOptions =>
    {
        consumeOptions.RoutingKey = null;
        consumeOptions.Queue = "req_queue";
        consumeOptions.Exchange = "";
        consumeOptions.ExchangeType = null;
        consumeOptions.AutoAck = false;
    },
    replyOptions =>
    {
        replyOptions.RoutingKey = null;
        replyOptions.Queue = "resp_queue";
        replyOptions.Exchange = null;
        replyOptions.ExchangeType = null;
        replyOptions.AutoAck = false;
    });
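
The two registrations only work together if they agree on queue names: the requester publishes to req_queue and consumes resp_queue, while the replier does the reverse. One way to keep the names in sync is a shared constants class, sketched below. QueueNames is a hypothetical helper, not part of the Nomirun SDK.

```csharp
// Hypothetical shared constants so the requester and replier cannot drift apart on queue names.
public static class QueueNames
{
    // Queue the requester publishes to and the replier consumes from.
    public const string Request = "req_queue";

    // Queue the replier publishes to and the requester consumes from.
    public const string Reply = "resp_queue";
}
```

With this in place, each options lambda could assign QueueNames.Request or QueueNames.Reply to its Queue property instead of repeating the string literals.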

2.2. Listen for requests and send the response to the requester

public class ReplierConsumeAndResponse : IConsumeAndResponse<RequestMessage, ReplyMessage>
{
    // Use the consumer type as the logger category, matching RequesterConsumer.
    private readonly ILogger<ReplierConsumeAndResponse> _logger;

    public ReplierConsumeAndResponse(ILogger<ReplierConsumeAndResponse> logger)
    {
        _logger = logger;
    }

    public Task<ReplyMessage> ConsumeMessageAndReply(RequestMessage message, BasicDeliverEventArgs eventArgs)
    {
        if (message == null)
            throw new ArgumentNullException(nameof(message));

        if (eventArgs == null)
            throw new ArgumentNullException(nameof(eventArgs));

        if (eventArgs.BasicProperties == null)
            throw new ArgumentException("BasicProperties in eventArgs cannot be null.");

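        // Use a message template instead of string interpolation so the values are logged as structured fields.
        _logger.LogInformation("Consumed request: {Message}, Correlation ID: {CorrelationId}",
            message.Message, eventArgs.BasicProperties.CorrelationId);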

        return Task.FromResult(new ReplyMessage
        {
            Message = message.Message
        });
    }
}