Skip to main content

Kafka Configuration

You can customize some predefined variables of Reactive Commons.

This can be done by Spring Boot application.yaml or by overriding the AsyncKafkaProps bean.

reactive:
commons:
kafka:
app: # this is the name of the default domain
withDLQRetry: false # if you want to have dlq queues with retries you can set it to true, you cannot change it after queues are created, because you will get an error, so you should delete topology before the change.
maxRetries: -1 # -1 will be considered default value. When withDLQRetry is true, it will be retried 10 times. When withDLQRetry is false, it will be retried indefinitely.
retryDelay: 1000 # interval for message retries, with and without DLQRetry
checkExistingTopics: true # if you don't want to verify topic existence before send a record you can set it to false
createTopology: true # if your organization have restrictions with automatic topology creation you can set it to false and create it manually or by your organization process.
useDiscardNotifierPerDomain: false # if true it uses a discard notifier for each domain,when false it uses a single discard notifier for all domains with default 'app' domain
enabled: true # if you want to disable this domain you can set it to false
brokerType: "kafka" # please don't change this value
domain:
ignoreThisListener: false # Allows you to disable event listener for this specific domain
connectionProperties: # you can override the connection properties of each domain
bootstrap-servers: localhost:9092
# Another domain can be configured with same properties structure that app
accounts: # this is a second domain name and can have another independent setup
connectionProperties: # you can override the connection properties of each domain
bootstrap-servers: localhost:9093

You can override this settings programmatically through a AsyncKafkaPropsDomainProperties bean:

package sample;

import org.reactivecommons.async.kafka.config.KafkaProperties;
import org.reactivecommons.async.kafka.config.props.AsyncProps;
import org.reactivecommons.async.kafka.config.props.AsyncKafkaPropsDomainProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;

@Configuration
public class MyDomainConfig {

@Bean
@Primary
public AsyncKafkaPropsDomainProperties customKafkaDomainProperties() {
KafkaProperties propertiesApp = new KafkaProperties();
propertiesApp.setBootstrapServers(List.of("localhost:9092"));

KafkaProperties propertiesAccounts = new KafkaProperties();
propertiesAccounts.setBootstrapServers(List.of("localhost:9093"));

return AsyncKafkaPropsDomainProperties.builder()
.withDomain("app", AsyncProps.builder()
.connectionProperties(propertiesApp)
.build())
.withDomain("accounts", AsyncProps.builder()
.connectionProperties(propertiesAccounts)
.build())
.build();
}
}

Loading properties from a secret

Additionally, if you want to set only connection properties you can use the AsyncKafkaPropsDomain.KafkaSecretFiller class.


@Bean
@Primary
public AsyncKafkaPropsDomain.KafkaSecretFiller customKafkaFiller() {
return (domain, asyncProps) -> {
// customize asyncProps here by domain
};
}