RabbitMQ Configuration
This page describes how to configure RabbitMQ connection and messaging properties for each domain in Reactive Commons. A domain represents an independent connection to a RabbitMQ broker. Your application can work with a single domain (one broker) or multiple domains (several independent brokers), each with its own properties. See Communication Scenarios for guidance on when to use multiple domains.
All available properties are defined in the AsyncProps class. There are two ways to provide these values: through application.yaml alone, or through a combination of YAML and programmatic configuration, as described in the Configuration approaches section below.
app:
  async:
    app: # this is the name of the default domain
      withDLQRetry: false # set to true to create DLQ queues with retries; it cannot be changed after the queues are created (the broker returns an error), so delete the existing topology before changing it
      maxRetries: -1 # -1 means "use the default value": 10 retries when withDLQRetry is true, unlimited retries when withDLQRetry is false
      retryDelay: 1000 # interval for message retries, with and without DLQRetry
      listenReplies: null # accepts true or false; set it to true if you use the ReqReply pattern, otherwise set it to false
      createTopology: true # if your organization restricts automatic topology creation, set it to false and create the topology manually or through your organization's process
      delayedCommands: false # enable to send a delayed command to an external target
      prefetchCount: 250 # maximum number of in-flight messages; reduce it to process fewer messages concurrently. This setting applies per instance of your service
      useDiscardNotifierPerDomain: false # if true, a discard notifier is used for each domain; if false, a single discard notifier is used for all domains through the default 'app' domain
      enabled: true # set to false to disable this domain
      mandatory: false # set to true to publish messages as mandatory; an exception is thrown if a message cannot be routed to any queue
      brokerType: "rabbitmq" # please don't change this value
      queueType: null # can be set to 'classic' or 'quorum' if your RabbitMQ cluster supports it; by default the virtual host default queue type is used
      flux:
        maxConcurrency: 250 # max concurrency of listener flow
      domain:
        ignoreThisListener: false # allows you to disable the event listener for this specific domain
        events:
          exchange: domainEvents # you can change the exchange, but you should do it in all applications consistently
          eventsSuffix: subsEvents # events queue name suffix, name will be like ${spring.application.name}.${app.async.domain.events.eventsSuffix}
          notificationSuffix: notification # notification events queue name suffix
      direct:
        exchange: directMessages # you can change the exchange, but you should do it in all applications
        querySuffix: query # queries queue name suffix, name will be like ${spring.application.name}.${app.async.direct.querySuffix}
        commandSuffix: '' # commands queue name suffix, name will be like ${spring.application.name}.${app.async.direct.commandSuffix}, or ${spring.application.name} if empty (the default)
        discardTimeoutQueries: false # enable to discard this condition
      global:
        exchange: globalReply # you can change the exchange, but you should do it in all applications
        repliesSuffix: replies # async query replies events queue name suffix
      connectionProperties: # you can override the connection properties of each domain
        host: localhost
        port: 5672
        username: guest
        password: guest
        virtual-host: /
    # Another domain can be configured with the same property structure as app
    accounts: # this is a second domain name and can have another independent setup
      connectionProperties: # you can override the connection properties of each domain
        host: localhost
        port: 5672
        username: guest
        password: guest
        virtual-host: /accounts
app Domain Configuration
To ensure a correct configuration, you should always override the properties of the app domain. If it is not configured, an exception will be thrown. You can also add properties for additional custom domains if needed.
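The maxRetries comment in the YAML above describes a small decision rule. The following is a minimal sketch of that rule only; the class and method names are hypothetical and are not part of Reactive Commons, and "retried indefinitely" is modelled as Integer.MAX_VALUE. It also assumes that an explicit value other than -1 is used as-is.

```java
// Illustrative model of the documented maxRetries defaults.
// RetryDefaults and effectiveMaxRetries are hypothetical names.
final class RetryDefaults {
    static final int DEFAULT_SENTINEL = -1;           // "use the default value" marker
    static final int DLQ_DEFAULT_RETRIES = 10;        // default when withDLQRetry is true
    static final int UNLIMITED = Integer.MAX_VALUE;   // stand-in for "retry indefinitely"

    static int effectiveMaxRetries(int maxRetries, boolean withDLQRetry) {
        if (maxRetries != DEFAULT_SENTINEL) {
            return maxRetries; // assumption: explicit values are taken as configured
        }
        return withDLQRetry ? DLQ_DEFAULT_RETRIES : UNLIMITED;
    }
}
```

For example, `RetryDefaults.effectiveMaxRetries(-1, true)` yields 10, while the same call with `withDLQRetry` set to false yields the unlimited stand-in.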
Configuration approaches
There are two ways to supply domain properties. Choose the one that best fits your use case.
Approach 1: YAML only
Define all domains directly in application.yaml as shown above. This is the simplest approach and works well when
properties do not depend on runtime values such as secrets.
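The suffix comments in the YAML above follow one naming pattern: the application name, a dot, and the suffix. A minimal sketch of that pattern is shown here; QueueNames is a hypothetical helper, and applying the "empty suffix means application name only" rule to every suffix (the source states it only for commandSuffix) is an assumption.

```java
// Illustrative helper for the documented queue naming pattern.
// QueueNames is a hypothetical name, not a Reactive Commons class.
final class QueueNames {
    static String queueName(String applicationName, String suffix) {
        // An empty suffix leaves the bare application name, as documented for commandSuffix.
        if (suffix == null || suffix.isEmpty()) {
            return applicationName;
        }
        return applicationName + "." + suffix;
    }
}
```

With an application named `ms_example` and the default `querySuffix`, this gives `ms_example.query`, which is the queue name that appears in the Troubleshooting log further down.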
Approach 2: Hybrid YAML + RabbitPropsCustomizer
Use this approach when you want to define the domain structure in YAML (topology, retry settings, etc.) but need to set some properties at runtime, for example, loading connection credentials from a secrets manager.
Declare your domains in application.yaml as usual, then define a RabbitPropsCustomizer bean to override specific
properties after the YAML is loaded. The customizer receives the full map of configured domains and can modify
any property on any domain.
The RabbitPropsCustomizer can work with or without pre-existing YAML domains. If no domains are defined in your
application.yaml under app.async, you can define all domains directly inside the customizer using
domainProperties.put("<domain>", AsyncProps.builder()...build()). At least one domain must exist after the customizer
executes, otherwise an InvalidConfigurationException is thrown.
You have two options:
Option A: Define domains in YAML, then override with customizer
Declare your domains in application.yaml as usual, then use the customizer to override or extend them.
app:
  async:
    app: # first domain
      withDLQRetry: true
      maxRetries: 3
      listenReplies: true
    accounts: # second domain
      listenReplies: false
package sample;
import org.reactivecommons.async.rabbit.config.RabbitProperties;
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
import org.reactivecommons.async.rabbit.config.props.AsyncPropsDomain;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
    // Loads RabbitMQ connection properties from a secrets manager at runtime.
    // See the "Loading properties from a secret" section below for a complete implementation example.
    private RabbitProperties loadFromSecret(String secretName) {
        // ...
        return new RabbitProperties();
    }
    @Bean
    public AsyncPropsDomain.RabbitPropsCustomizer rabbitPropsCustomizer() {
        return domainProperties -> {
            // Customize the "app" domain: overrides take precedence over YAML values
            AsyncProps app = domainProperties.get("app");
            if (app != null) {
                app.setConnectionProperties(loadFromSecret("secret-app-rabbit"));
            }
            // Customize the "accounts" domain independently
            AsyncProps accounts = domainProperties.get("accounts");
            if (accounts != null) {
                accounts.setConnectionProperties(loadFromSecret("secret-accounts-rabbit"));
            }
        };
    }
}
Option B: Define all domains in the customizer (no YAML domains)
If you prefer full programmatic control, omit the app.async section entirely from your application.yaml and define all domains inside the customizer:
package sample;
import org.reactivecommons.async.rabbit.config.RabbitProperties;
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
import org.reactivecommons.async.rabbit.config.props.AsyncPropsDomain;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
    private RabbitProperties loadFromSecret(String secretName) {
        // ...
        return new RabbitProperties();
    }
    @Bean
    public AsyncPropsDomain.RabbitPropsCustomizer rabbitPropsCustomizer() {
        return domainProperties -> {
            // Define all domains programmatically
            domainProperties.put("app", AsyncProps.builder()
                    .withDLQRetry(Boolean.TRUE)
                    .maxRetries(3)
                    .listenReplies(Boolean.TRUE)
                    .connectionProperties(loadFromSecret("secret-app-rabbit"))
                    .build());
            domainProperties.put("accounts", AsyncProps.builder()
                    .listenReplies(Boolean.FALSE)
                    .connectionProperties(loadFromSecret("secret-accounts-rabbit"))
                    .build());
        };
    }
}
Key rules for the hybrid approach:
- Properties set in the customizer take precedence over YAML values.
- YAML values not touched by the customizer are preserved.
- The customizer can also add new domains by calling
domainProperties.put("newDomain", asyncProps).
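The key rules above can be illustrated with a deliberately simplified model. This sketch does not use the Reactive Commons classes: each domain is a plain map of property names to values, the customizer is a java.util.function.Consumer, and HybridConfigModel, resolve, and the IllegalStateException are hypothetical stand-ins (the library is documented to throw InvalidConfigurationException instead).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Simplified model of "YAML first, customizer second". All names are hypothetical.
final class HybridConfigModel {

    static Map<String, Map<String, Object>> resolve(
            Map<String, Map<String, Object>> yamlDomains,
            Consumer<Map<String, Map<String, Object>>> customizer) {
        // Start from a copy of whatever YAML provided (possibly no domains at all).
        Map<String, Map<String, Object>> domains = new LinkedHashMap<>();
        yamlDomains.forEach((name, props) -> domains.put(name, new LinkedHashMap<>(props)));

        // The customizer runs after the YAML values are loaded, so its writes win,
        // and anything it does not touch keeps its YAML value.
        customizer.accept(domains);

        // Mirrors the documented rule that at least one domain must exist afterwards.
        if (domains.isEmpty()) {
            throw new IllegalStateException("At least one domain must be configured");
        }
        return domains;
    }
}
```

In this model, a customizer that overwrites `host` on the `app` domain and puts a new `accounts` entry leaves the YAML-defined `maxRetries` of `app` unchanged, which corresponds to the three rules listed above.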
Loading properties from a secret
Using AsyncPropsDomain.RabbitSecretFiller to load secrets is deprecated and will be removed in a future version.
Use Approach 2: Hybrid YAML + RabbitPropsCustomizer instead,
which provides full control over all domain properties at runtime and is the recommended way to integrate with a
secrets manager.
The example below loads connection properties with the RabbitPropsCustomizer (see Approach 2) and uses the Secrets Manager library to read the secret.
- Create a @ConfigurationProperties record to map the secret fields:
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "helpers.secrets-manager")
public record SecretsManagerProperties(
        String endpoint,
        Integer cacheSize,
        Integer cacheTime,
        String rabbit) {
}
- Create a RabbitMQConnectionProperties record to map the fields of your secret and provide a conversion method:
import org.reactivecommons.async.rabbit.config.RabbitProperties;
public record RabbitMQConnectionProperties(
        String virtualhost,
        String host,
        String password,
        String username,
        boolean ssl,
        Integer port) {
    public RabbitProperties toRabbitProperties() {
        var rabbitProperties = new RabbitProperties();
        rabbitProperties.setHost(this.host());
        rabbitProperties.setUsername(this.username());
        rabbitProperties.setPassword(this.password());
        rabbitProperties.setPort(this.port());
        rabbitProperties.setVirtualHost(this.virtualhost());
        rabbitProperties.getSsl().setEnabled(this.ssl()); // To enable SSL
        return rabbitProperties;
    }
}
- Create a SecretsConfig class that registers the GenericManager bean and exposes the RabbitMQ secret as a bean:
import co.com.bancolombia.secretsmanager.api.GenericManager;
import co.com.bancolombia.secretsmanager.connector.AWSSecretManagerConnector;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import software.amazon.awssdk.regions.Region;
@Log4j2
@Configuration
@RequiredArgsConstructor
public class SecretsConfig {
    private final SecretsManagerProperties properties;
    private static final String REGION_SECRET = Region.US_EAST_1.toString();
    @Bean
    @Profile("!local")
    public GenericManager connectionAws() {
        return new AWSSecretManagerConnector(REGION_SECRET);
    }
    @Bean
    @Profile("local")
    public GenericManager connectionLocal() {
        return new AWSSecretManagerConnector(REGION_SECRET, properties.endpoint());
    }
    public <T> T getSecret(String secretName, Class<T> cls, GenericManager connector) {
        try {
            T secret = connector.getSecret(secretName, cls);
            // Log success only after the secret has actually been retrieved
            log.info("Secret was obtained successfully");
            return secret;
        } catch (Exception e) {
            log.error("Error getting secret: {}", e.getMessage());
            return null;
        }
    }
    @Bean
    public RabbitMQConnectionProperties getSecretRabbitmq(GenericManager connector) {
        return this.getSecret(properties.rabbit(), RabbitMQConnectionProperties.class, connector);
    }
}
- Create a separate RabbitMQConfig class that injects the RabbitMQConnectionProperties bean and defines the RabbitPropsCustomizer:
import lombok.RequiredArgsConstructor;
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
import org.reactivecommons.async.rabbit.config.props.AsyncPropsDomain;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@RequiredArgsConstructor
public class RabbitMQConfig {
    private final RabbitMQConnectionProperties rabbitMQConnectionProperties;
    @Bean
    public AsyncPropsDomain.RabbitPropsCustomizer rabbitPropsCustomizer() {
        return domainProperties -> {
            AsyncProps app = domainProperties.get("app");
            if (app != null) {
                app.setConnectionProperties(rabbitMQConnectionProperties.toRabbitProperties());
            }
        };
    }
}
Customizing the connection
For advanced control over the RabbitMQ connection, you can define a ConnectionFactoryCustomizer bean. This allows you
to configure options that are not exposed through standard properties, such as custom timeouts, SSL/TLS settings,
or automatic recovery strategies:
@Bean
public ConnectionFactoryCustomizer connectionFactoryCustomizer() {
    return (connectionFactory, asyncProps) -> {
        connectionFactory.setExceptionHandler(new MyCustomExceptionHandler()); // Optional custom exception handler
        connectionFactory.setCredentialsProvider(new MyCustomCredentialsProvider()); // Optional custom credentials provider
        return connectionFactory;
    };
}
Connections and channels
Reactive Commons establishes a single connection to the RabbitMQ broker, which is reused for all messaging operations, both sending and listening. However, the number of open channels within that connection varies depending on the enabled annotations and the type of interaction (sending, listening, or both). Each scenario described below shows how the number of channels changes according to the applied configuration.
In the context of this documentation, a domain refers to a connection with a broker. The configuration supports up to two brokers, which means the described scenarios are limited to a maximum of two domains.
Annotations used in the tables
[1] Annotations for sending messages:
- @EnableDomainEventBus to send domain events.
- @EnableDirectAsyncGateway to send commands and asynchronous queries.
[2] Annotations for listening to messages:
- @EnableEventListeners to listen to domain events.
- @EnableCommandListeners to listen to commands.
- @EnableQueryListeners to serve async queries.
1. Sending messages (single domain)
In this scenario, only the annotations that enable message sending are used, with different values for the listenReplies property:
| Enabled annotations | listenReplies | Broker | Connections | Channels |
|---|---|---|---|---|
| One or all for sending [1] | true | Broker app | 1 | 13 |
| One or all for sending [1] | false | Broker app | 1 | 11 |
2. Sending messages (multiple domains)
In this scenario, we only send messages to two brokers, using one or all of the sending annotations and different values for the listenReplies property:
| Enabled annotations | listenReplies | Broker | Connections | Channels |
|---|---|---|---|---|
| One or all for sending [1] | true | Broker app | 1 | 18 |
| One or all for sending [1] | true | Additional broker | 1 | 8 |
| One or all for sending [1] | false | Broker app | 1 | 16 |
| One or all for sending [1] | false | Additional broker | 1 | 6 |
3. Listening for messages (single domain)
This scenario enables only listening for messages from a single broker, using one or all available annotations:
| Enabled annotations | Broker | Connections | Channels |
|---|---|---|---|
| One or all for listening [2] | Broker app | 1 | 14 |
4. Listening for messages (multiple domains)
In this scenario, messages are listened to from two brokers, with variations in the annotations enabled:
| Enabled annotations | Broker | Connections | Channels |
|---|---|---|---|
| All for listening [2] | Broker app | 1 | 19 |
| All for listening [2] | Additional broker | 1 | 8 |
| Two for listening [2] | Broker app | 1 | 18 |
| Two for listening [2] | Additional broker | 1 | 8 |
| One for listening [2] | Broker app | 1 | 17 |
| One for listening [2] | Additional broker | 1 | 7 |
5. Sending and listening for messages (single domain)
This scenario enables both sending and listening for messages on a single broker, with all annotations enabled:
| Enabled annotations | Broker | Connections | Channels |
|---|---|---|---|
| All for sending [1] and all for listening [2] | Broker app | 1 | 16 |
6. Sending and listening for messages (multiple domains)
In this scenario, messages are sent to and listened for on two brokers, with variations in the annotations enabled:
| Enabled annotations | Broker | Connections | Channels |
|---|---|---|---|
| All for sending [1] and all for listening [2] | Broker app | 1 | 21 |
| All for sending [1] and all for listening [2] | Additional broker | 1 | 10 |
| One for sending [1] and all for listening [2] | Broker app | 1 | 20 |
| One for sending [1] and all for listening [2] | Additional broker | 1 | 9 |
| All for sending [1] and two for listening [2] | Broker app | 1 | 20 |
| All for sending [1] and two for listening [2] | Additional broker | 1 | 10 |
| All for sending [1] and one for listening [2] | Broker app | 1 | 19 |
| All for sending [1] and one for listening [2] | Additional broker | 1 | 8 |
| All for sending [1] | Broker app | 1 | 16 |
| All for sending [1] | Additional broker | 1 | 6 |
Recommendations
- Resource Optimization: If only sending commands and events is required, disabling the listenReplies property reduces the number of open channels.
- Selective Annotation Activation: Enabling only the necessary annotations for the use case can improve performance and simplify configuration.
- Proper Use of Configuration Properties: Adjusting configuration properties according to the specific use case allows for resource optimization and avoids unnecessary configurations.
Connections in microservices with multiple replicas
In a typical cloud production environment, such as AWS, microservices are deployed in containers orchestrated by Kubernetes, using managed services like Amazon EKS. For the messaging broker, Amazon MQ for RabbitMQ is used, configured in a 3-node cluster with a Multi-AZ deployment to ensure high availability and fault tolerance.
When working with microservices that use multiple replicas (instances) and implement Reactive Commons, it is important to understand how connections to the message broker are managed. Each replica of a microservice establishes a single connection to the broker, which is used for both sending and listening to messages.
The number of open channels within that single connection will depend on the configuration of the annotations used, as described in the connection scenarios above. This allows each replica to manage its messaging operations independently, distributing the workload efficiently.
For example, if a microservice is deployed with 4 replicas, each of them will establish its own connection to the broker. As a result, the entire microservice deployment will have a total of 4 connections to the broker.
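The arithmetic behind this example can be written down as a rough estimate per broker. This is a sketch under the assumptions stated in this section (one connection per replica for a given broker, and the per-replica channel counts from the scenario tables above); BrokerLoadEstimate is a hypothetical helper, not part of Reactive Commons.

```java
// Rough per-broker load estimate for a deployment with several replicas.
// BrokerLoadEstimate is a hypothetical name used only for illustration.
final class BrokerLoadEstimate {

    // Each replica opens one connection to a given broker.
    static int totalConnections(int replicas) {
        return replicas;
    }

    // Channels scale linearly: every replica opens the same number of channels.
    static int totalChannels(int replicas, int channelsPerReplica) {
        return replicas * channelsPerReplica;
    }
}
```

Taking scenario 5 (sending and listening on a single domain, 16 channels per replica) with 4 replicas, the broker would see 4 connections and 4 × 16 = 64 channels from this microservice.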
Mandatory property
The mandatory property is a message publishing parameter in RabbitMQ that determines the behavior when a message cannot be routed to any queue. This can happen if there are no queues bound to the exchange or if the routing key does not match any of the available queues.
By default, this option is disabled. When it is enabled (mandatory = true), the check applies right after the message is published to an exchange, but before it is routed to a queue.
When a message is published with mandatory = true, RabbitMQ will try to route it from the exchange to one or more
queues. If no queue receives the message, then:
- The message is not lost, but it is not delivered to any queue.
- RabbitMQ triggers a basic.return event on the producer's channel.
- The producer must have a ReturnListener or an equivalent handler to receive and process the returned message. If one is not defined, the message is lost.
Example
Assuming we have:
- A direct type exchange.
- A queue bound with the key order.created.
- A message is published with the key order.cancelled and mandatory = true.
Result:
- If there is no queue bound with order.cancelled, the message is not routed.
- Since mandatory = true, RabbitMQ tries to return it to the producer.
- If there is a ReturnListener, this message can be captured and handled, for example, by sending it to another consumer's queue, DLQ queues, saving it in a log file, or in a database.
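The example above can be traced with a small in-memory model of a direct exchange. This is only a sketch of the routing decision: it does not use the RabbitMQ client, DirectExchangeModel and its methods are hypothetical names, and the return handler stands in for the basic.return notification delivered to the producer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// In-memory model of a direct exchange with the mandatory flag. Hypothetical names throughout.
final class DirectExchangeModel {
    // routing key -> queues bound with that key (each queue is a list of message bodies)
    private final Map<String, List<List<String>>> bindings = new HashMap<>();
    // Receives (routingKey, body) for returned messages; may be null if the producer has no handler.
    private final BiConsumer<String, String> returnHandler;

    DirectExchangeModel(BiConsumer<String, String> returnHandler) {
        this.returnHandler = returnHandler;
    }

    // Creates a queue bound with the given routing key and returns it for inspection.
    List<String> bindQueue(String routingKey) {
        List<String> queue = new ArrayList<>();
        bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queue);
        return queue;
    }

    // Returns true if at least one queue received the message.
    boolean publish(String routingKey, String body, boolean mandatory) {
        List<List<String>> queues = bindings.getOrDefault(routingKey, List.of());
        if (!queues.isEmpty()) {
            queues.forEach(queue -> queue.add(body));
            return true;
        }
        if (mandatory && returnHandler != null) {
            returnHandler.accept(routingKey, body); // models the basic.return sent to the producer
        }
        return false; // not mandatory, or no handler registered: the message is dropped
    }
}
```

Binding one queue with `order.created` and publishing with `order.cancelled` and `mandatory` set to true invokes the return handler; publishing the same message with `mandatory` set to false drops it silently.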
Advantages
- Early detection of routing errors: Prevents critical messages from "disappearing" without a trace, which facilitates the identification of erroneous configurations in bindings or patterns.
- Integrity and reliability: Ensures that each message finds a consumer or, failing that, returns to the producer for alternative handling (DLQ queues, logs, database).
- Operational visibility: Facilitates metrics of "unrouted messages" and alerts when the event flow does not follow the planned routes.
Considerations
Although this property does not prevent performance problems or degradation of the RabbitMQ cluster, it is useful for preventing the loss of unrouted messages and for detecting configuration errors in routing.
When mandatory is active, under normal conditions (all routes exist), there is practically no impact. In anomalous situations, there will be additional return traffic for each unroutable message. This implies an extra load for both RabbitMQ (which must send the message back to the producer) and the sending application (which must process the returned message).
Implementation
To enable the mandatory property, you can configure it in your project's application.yaml file:
app:
async:
app: # this is the name of the default domain
mandatory: true # enable mandatory property
Now we configure the return handler to manage messages that could not be delivered correctly. By default, these messages
are displayed in a log.
To customize this behavior, a class that implements the UnroutableMessageHandler interface is created and registered
as a Spring bean:
package sample;
import co.com.mypackage.usecase.MyUseCase;
import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;
import org.reactivecommons.async.rabbit.communications.UnroutableMessageHandler;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.OutboundMessageResult;
import java.nio.charset.StandardCharsets;
@Log
@Component
@RequiredArgsConstructor
public class ResendUnroutableMessageHandler implements UnroutableMessageHandler {
    private final MyUseCase useCase;
    @Override
    public Mono<Void> processMessage(OutboundMessageResult<OutboundMessage> result) {
        var returned = result.outboundMessage();
        log.severe("Unroutable message: exchange=" + returned.getExchange()
                + ", routingKey=" + returned.getRoutingKey()
                + ", body=" + new String(returned.getBody(), StandardCharsets.UTF_8)
                + ", properties=" + returned.getProperties()
        );
        // Process the unroutable message
        return useCase.sendMessage(new String(returned.getBody(), StandardCharsets.UTF_8));
    }
}
Send unrouted messages to a queue
To send the unrouted message to a queue, we use the @EnableDomainEventBus annotation for domain events, and @EnableDirectAsyncGateway for commands and asynchronous queries, as appropriate.
It is important to ensure that the queue exists before sending the message, as it will otherwise be lost. Therefore, it is recommended to verify or create the queue beforehand to ensure successful delivery.
package sample;
import lombok.extern.java.Log;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.async.api.DirectAsyncGateway;
import org.reactivecommons.async.impl.config.annotations.EnableDirectAsyncGateway;
import org.reactivecommons.async.rabbit.communications.UnroutableMessageHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.OutboundMessageResult;
import tools.jackson.core.type.TypeReference;
import tools.jackson.databind.JsonNode;
import tools.jackson.databind.json.JsonMapper;
@Log
@Component
@EnableDirectAsyncGateway
public class ResendUnroutableMessageHandler implements UnroutableMessageHandler {
    private final JsonMapper jsonMapper;
    private final String retryQueueName;
    private final DirectAsyncGateway gateway;
    public ResendUnroutableMessageHandler(
            JsonMapper jsonMapper,
            DirectAsyncGateway gateway,
            @Value("${adapters.rabbitmq.retry-queue-name}") String retryQueueName) {
        this.jsonMapper = jsonMapper;
        this.retryQueueName = retryQueueName;
        this.gateway = gateway;
    }
    public Mono<Void> emitCommand(String name, String commandId, Object data) {
        return Mono.from(gateway.sendCommand(
                // Connection with broker using the properties defined through the
                // AsyncRabbitPropsDomainProperties bean with the "logs" domain
                new Command<>(name, commandId, data), retryQueueName, "logs")
        );
    }
    @Override
    public Mono<Void> processMessage(OutboundMessageResult<OutboundMessage> result) {
        OutboundMessage returned = result.outboundMessage();
        try {
            // The unroutable message is a command, so the message body is deserialized to the Command class.
            // Use the DomainEvent class for domain events and the AsyncQuery class for asynchronous queries.
            Command<JsonNode> command = jsonMapper.readValue(returned.getBody(), new TypeReference<>() {
            });
            // Send the message to the queue
            return emitCommand(command.getName(), command.getCommandId(), command.getData())
                    .doOnError(e -> log.severe("Failed to send the returned message: " + e.getMessage()));
        } catch (Exception e) {
            log.severe("Error deserializing the returned message: " + e.getMessage());
            return Mono.empty();
        }
    }
}
In the RabbitMQ configuration class, we create the UnroutableMessageProcessor bean to register the unrouted message
handler.
package sample;
import org.reactivecommons.async.rabbit.communications.UnroutableMessageNotifier;
import org.reactivecommons.async.rabbit.communications.UnroutableMessageProcessor;
import org.reactivecommons.async.rabbit.config.RabbitProperties;
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
import org.reactivecommons.async.rabbit.config.props.AsyncRabbitPropsDomainProperties;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class RabbitMQConfig {
    private final RabbitMQConnectionProperties properties;
    private final RabbitMQConnectionProperties propertiesLogs;
    private final Boolean withDLQRetry;
    private final Integer maxRetries;
    private final Integer retryDelay;
    public RabbitMQConfig(@Qualifier("rabbit") RabbitMQConnectionProperties properties,
                          @Qualifier("rabbitLogs") RabbitMQConnectionProperties propertiesLogs,
                          @Value("${adapters.rabbitmq.withDLQRetry}") Boolean withDLQRetry,
                          @Value("${adapters.rabbitmq.maxRetries}") Integer maxRetries,
                          @Value("${adapters.rabbitmq.retryDelay}") Integer retryDelay) {
        this.properties = properties;
        this.propertiesLogs = propertiesLogs;
        this.withDLQRetry = withDLQRetry;
        this.maxRetries = maxRetries;
        this.retryDelay = retryDelay;
    }
    // This bean is used to create the RabbitMQ connection properties for the application
    @Bean
    @Primary
    public AsyncRabbitPropsDomainProperties customDomainProperties() {
        // Connection properties for the "app" domain
        var rabbitPropertiesApp = new RabbitProperties();
        rabbitPropertiesApp.setHost(properties.host());
        rabbitPropertiesApp.setPort(properties.port());
        rabbitPropertiesApp.setVirtualHost(properties.virtualhost());
        rabbitPropertiesApp.setUsername(properties.username());
        rabbitPropertiesApp.setPassword(properties.password());
        rabbitPropertiesApp.getSsl().setEnabled(properties.ssl());
        // Connection properties for the "logs" domain, read from the propertiesLogs field
        var rabbitPropertiesLogs = new RabbitProperties();
        rabbitPropertiesLogs.setHost(propertiesLogs.host());
        rabbitPropertiesLogs.setPort(propertiesLogs.port());
        rabbitPropertiesLogs.setVirtualHost(propertiesLogs.virtualhost());
        rabbitPropertiesLogs.setUsername(propertiesLogs.username());
        rabbitPropertiesLogs.setPassword(propertiesLogs.password());
        rabbitPropertiesLogs.getSsl().setEnabled(propertiesLogs.ssl());
        return AsyncRabbitPropsDomainProperties.builder()
                .withDomain("app", AsyncProps.builder()
                        .connectionProperties(rabbitPropertiesApp)
                        .withDLQRetry(withDLQRetry)
                        .maxRetries(maxRetries)
                        .retryDelay(retryDelay)
                        .mandatory(Boolean.TRUE)
                        .build())
                .withDomain("logs", AsyncProps.builder()
                        .connectionProperties(rabbitPropertiesLogs)
                        .mandatory(Boolean.TRUE)
                        .build())
                .build();
    }
    // This bean is used to register the handler for unroutable messages
    @Bean
    UnroutableMessageProcessor registerUnroutableMessageHandler(UnroutableMessageNotifier unroutableMessageNotifier,
                                                                ResendUnroutableMessageHandler handler) {
        var processor = new UnroutableMessageProcessor();
        unroutableMessageNotifier.listenToUnroutableMessages(handler);
        return processor;
    }
}
Troubleshooting
PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange'
This error occurs when there is a mismatch between the queue properties defined in your application and the properties of the queue that already exists in the RabbitMQ broker. It commonly happens when you try to:
- Change the name of a domain.
- Enable or disable DLQ (Dead Letter Queue) functionality for a queue that has already been created.
Error log example:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method:
#method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange'
for queue 'ms_example.query' in vhost '/': received none but current is the value 'directMessages.DLQ'
of type 'longstr', class-id=50, method-id=10)
Cause:
RabbitMQ does not allow changing certain durable properties of a queue after it has been declared, such as the x-dead-letter-exchange argument. When your application starts, it tries to declare the queue with the new properties, but the broker rejects the declaration because it conflicts with the existing queue.
Solution:
To resolve this issue, you must manually delete the conflicting queues from the RabbitMQ broker. Once the queues are deleted, you can restart the microservice to recreate them with the correct, updated properties.
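The cause and the fix can be reproduced with a small in-memory model of queue declaration. This is a sketch of the equivalence check only: QueueDeclareModel is a hypothetical class, it does not talk to a broker, and the IllegalStateException stands in for the channel error shown in the log above.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of "redeclare must use equivalent arguments". Hypothetical names throughout.
final class QueueDeclareModel {
    // queue name -> arguments the queue was first declared with
    private final Map<String, Map<String, Object>> queues = new HashMap<>();

    // First declaration creates the queue; a later declaration must pass the same arguments.
    void declare(String name, Map<String, Object> arguments) {
        Map<String, Object> existing = queues.putIfAbsent(name, Map.copyOf(arguments));
        if (existing != null && !existing.equals(arguments)) {
            throw new IllegalStateException(
                    "PRECONDITION_FAILED - inequivalent args for queue '" + name + "'");
        }
    }

    // Deleting the queue clears the stored arguments, so it can be recreated with new ones.
    void delete(String name) {
        queues.remove(name);
    }
}
```

Declaring `ms_example.query` with an `x-dead-letter-exchange` argument and then declaring it again without that argument fails in this model, while deleting the queue first lets the second declaration succeed, which mirrors the solution described above.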