Binder abstraction
Spring Cloud Stream 에서는 여러가지 외부 메시지 시스템과 연결할 수 있는 다양한 Binder 구현체들이 있음.
여기에서는 이런 다양한 구현체들을 일관된 방식으로 사용할 수 있도록 만드는 컨셉을 다룸.
Producer and Consumers
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
/**
* Returns instance identity of this binder.
* Individual binders should normally override this method.
* @return instance identity of this binder
*/
default String getBinderIdentity() {
return String.valueOf(this.hashCode());
}
/**
* Bind the target component as a message consumer to the logical entity identified by
* the name.
* @param name the logical identity of the message source
* @param group the consumer group to which this consumer belongs - subscriptions are
* shared among consumers in the same group (a <code>null</code> or empty String, must
* be treated as an anonymous group that doesn't share the subscription with any other
* consumer)
* @param inboundBindTarget the app interface to be bound as a consumer
* @param consumerProperties the consumer properties
* @return the setup binding
*/
Binding<T> bindConsumer(String name, String group, T inboundBindTarget,
C consumerProperties);
/**
* Bind the target component as a message producer to the logical entity identified by
* the name.
* @param name the logical identity of the message target
* @param outboundBindTarget the app interface to be bound as a producer
* @param producerProperties the producer properties
* @return the setup binding
*/
Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}
bindProducer()
메소드를 통해서 각 Binder 의 Producer 를 생성한다.bindConsumer()
메소드를 통해서 각 Binder 의 Consumer 를 생성한다.
Binder SPI
META-INF/spring.binders
경로에 있는 Binder Definition 정보를 바탕으로 각 Binder 를 빈으로 만든다:
- Binder 에서
bindProducer()
와binderConsumer()
를 호출해서 Producer 와 Consumer 를 만드는 구조임.
예시: Reactor Kafka Binder 기준으로 META-INF/spring.binders
에는 다음과 같은 binder definition 이 있음:
reactorKafka:\
org.springframework.cloud.stream.binder.reactorkafka.ReactorKafkaBinderConfiguration
예시: Reactor Kafka Binder 에서 Binder 를 생성하는 @Configuration 클래스
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(Binder.class)
@EnableConfigurationProperties({ KafkaProperties.class, KafkaExtendedBindingProperties.class })
public class ReactorKafkaBinderConfiguration {
/**
* @ConfigurationProperties is declared on the @Bean method for Spring Boot to ignore
* constructor binding on KafkaBinderConfigurationProperties. If constructor binding is
* used, it ignores all the JavaBeans style properties when generating configuration metadata.
*
* See the following issues for more details:
*
* https://github.com/spring-cloud/spring-cloud-stream/issues/2640
* https://github.com/spring-projects/spring-boot/issues/34031
*
* @param kafkaProperties Spring Kafka properties autoconfigured by Spring Boot
*/
@Bean
@ConfigurationProperties(prefix = "spring.cloud.stream.kafka.binder")
KafkaBinderConfigurationProperties configurationProperties(
KafkaProperties kafkaProperties) {
return new KafkaBinderConfigurationProperties(kafkaProperties);
}
@Bean
KafkaTopicProvisioner provisioningProvider(
KafkaBinderConfigurationProperties configurationProperties,
ObjectProvider<AdminClientConfigCustomizer> adminClientConfigCustomizer, KafkaProperties kafkaProperties) {
return new KafkaTopicProvisioner(configurationProperties,
kafkaProperties, adminClientConfigCustomizer.getIfUnique());
}
@Bean
ReactorKafkaBinder reactorKafkaBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider,
KafkaExtendedBindingProperties extendedBindingProperties,
ObjectProvider<ConsumerConfigCustomizer> consumerConfigCustomizer,
ObjectProvider<ProducerConfigCustomizer> producerConfigCustomizer,
ObjectProvider<ReceiverOptionsCustomizer> receiverOptionsCustomizers,
ObjectProvider<SenderOptionsCustomizer> senderOptionsptionsCustomizers) {
ReactorKafkaBinder reactorKafkaBinder = new ReactorKafkaBinder(configurationProperties, provisioningProvider);
reactorKafkaBinder.setExtendedBindingProperties(extendedBindingProperties);
reactorKafkaBinder.setConsumerConfigCustomizer(consumerConfigCustomizer.getIfUnique());
reactorKafkaBinder.setProducerConfigCustomizer(producerConfigCustomizer.getIfUnique());
reactorKafkaBinder.receiverOptionsCustomizers(receiverOptionsCustomizers);
reactorKafkaBinder.senderOptionsCustomizers(senderOptionsptionsCustomizers);
return reactorKafkaBinder;
}
}
Binder Detection
Spring Cloud Stream 에서 외부 메시지 시스템과 바인딩을 하려면 spring cloud stream 에 대한 의존성 외에도 추가적인 Binder 에 대한 의존성도 필요하다:
예시: Reactive Kafka 바인더를 추가하는 경우
implementation("org.springframework.cloud:spring-cloud-stream")
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-reactive")
Multiple Binders on the Classpath
여러개의 binder 가 존재하는 경우에는 어떤 것을 사용할 지 명시해야한다:
- 이 경우에는
META-INF/spring.binders
가 여러개 있는 경우를 말함. - 글로벌 적으로 기본으로 사용할 binder 를 정하고 싶다면
spring.cloud.stream.defaultBinder=rabbit
이런식으로 등록하며 된다. - input 과 output 별로 별도의 바인딩을 사용하고 싶다면
spring.cloud.stream.bindings.input.binder=kafka
과spring.cloud.stream.bindings.output.binder=rabbit
처럼 application.properties 에 등록하면 된다.
Connecting to Multiple Systems
같은 타입의 메시지 브로커를 여러개 쓰는 경우에는 다음 예시를 참고해보자:
spring:
cloud:
stream:
bindings:
input:
destination: thing1
binder: rabbit1
output:
destination: thing2
binder: rabbit2
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
host: <host1>
rabbit2:
type: rabbit
environment:
spring:
rabbitmq:
host: <host2>
'Spring > Spring Cloud Stream' 카테고리의 다른 글
Resetting Offsets (0) | 2024.01.16 |
---|---|
Error Handling (0) | 2024.01.12 |
Producing and Consuming Messages: Spring Cloud Stream (0) | 2024.01.12 |