Nomirun SDK supports sending a pair of Request-Reply messages, each on its own channel through AMQP broker.
Request-Reply messaging through an AMQP broker like RabbitMQ allows seamless two-way communication between services. It ensures efficient decoupling by enabling separate services to communicate indirectly via broker-managed queues, avoiding direct dependency. The requester sends a request, and the broker routes it to the responder, which then sends back a reply through a different queue.
This approach simplifies asynchronous messaging, provides robust fault tolerance, and supports scalability by balancing loads across multiple consumers. Additionally, RabbitMQ’s inherent advantages, like message durability and reliable delivery acknowledgments, make it ideal for implementing the request-reply pattern in distributed systems. This ensures enhanced flexibility and reliability.
Pattern page: https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html
Here is a basic broker configuration:
{
"BrokerConfiguration": {
"HostName": "localhost",
"Port": 5672,
"UserName": "guest",
"Password": "guest"
},
...
}
Get broker configuration:
var brokerConfiguration = Configuration
.GetSection("Nomirun.Module:Nomirun.Demo.Module:BrokerConfiguration")
.Get<BrokerConfiguration>();
Then, register Requester which will send the request message RequestMessage
to defined queue in the requestOptions and listen with RequesterConsumer
for the response from the replier as message ReplyMessage
defined in the replyOptions queue.
services.AddAmqpRequester<RequestMessage, ReplyMessage, RequesterConsumer>(
brokerConfiguration,
requestOptions =>
{
requestOptions.RoutingKey = null;
requestOptions.Queue = "req_queue";
requestOptions.Exchange = "";
requestOptions.ExchangeType = null;
requestOptions.AutoAck = false;
},
consumeOptions =>
{
consumeOptions.RoutingKey = null;
consumeOptions.Queue = "resp_queue";
consumeOptions.Exchange = null;
consumeOptions.ExchangeType = null;
consumeOptions.AutoAck = false;
});
[Route("api")]
public class RequesterController : NomirunApiController
{
private readonly ILogger<RequesterController> _logger;
private readonly ISendRequest<RequestMessage> _amqpRequestSender;
public RequesterController(ILogger<RequesterController> logger,
ISendRequest<RequestMessage> amqpRequestSender) : base(logger)
{
_logger = logger;
_amqpRequestSender = amqpRequestSender;
}
[HttpGet("requester")]
[SwaggerOperation(
Summary = "Requester",
Description = "",
OperationId = nameof(Requester)
)]
public async Task<IActionResult> Requester(RequestMessage message)
{
await _amqpRequestSender.SendRequest(message);
return Ok();
}
}
To listen for replies you need to implement RequesterConsumer
consumer:
public class RequesterConsumer : IConsume<ReplyMessage>
{
private readonly ILogger<RequesterConsumer> _logger;
public RequesterConsumer(ILogger<RequesterConsumer> logger)
{
_logger = logger;
}
public async Task ConsumeMessage(ReplyMessage message, BasicDeliverEventArgs eventArgs)
{
_logger.LogInformation($"Consumed it in the consumer: {message.Message}, Correlation ID: {eventArgs.BasicProperties.CorrelationId}");
await Task.CompletedTask;
}
}
In the other application, you can create replier like this:
//Get broker connction settings
var brokerConfiguration = Configuration
.GetSection("Nomirun.Module:Nomirun.Demo.Module:BrokerConfiguration")
.Get<BrokerConfiguration>();
Finally, register a replier that will consume RequestMessage
with consumer ReplierConsumeAndResponse
that will reply with the ReplyMessage
.
services.AddAmqpReplier<RequestMessage, ReplyMessage, ReplierConsumeAndResponse>(
brokerConfiguration,
consumeOptions =>
{
consumeOptions.RoutingKey = null;
consumeOptions.Queue = "req_queue";
consumeOptions.Exchange = "";
consumeOptions.ExchangeType = null;
consumeOptions.AutoAck = false;
},
replyOptions =>
{
replyOptions.RoutingKey = null;
replyOptions.Queue = "resp_queue";
replyOptions.Exchange = null;
replyOptions.ExchangeType = null;
replyOptions.AutoAck = false;
});
public class ReplierConsumeAndResponse : IConsumeAndResponse<RequestMessage, ReplyMessage>
{
private readonly ILogger<RequestMessage> _logger;
public ReplierConsumeAndResponse(ILogger<RequestMessage> logger)
{
_logger = logger;
}
public Task<ReplyMessage> ConsumeMessageAndReply(RequestMessage message, BasicDeliverEventArgs eventArgs)
{
if (message == null)
throw new ArgumentNullException(nameof(message));
if (eventArgs == null)
throw new ArgumentNullException(nameof(eventArgs));
if (eventArgs.BasicProperties == null)
throw new ArgumentException("BasicProperties in eventArgs cannot be null.");
_logger.LogInformation($"Consumed it in the consumer: {message.Message}, Correlation ID: {eventArgs.BasicProperties.CorrelationId}");
return Task.FromResult(new ReplyMessage
{
Message = message.Message
});
}
}