Binder abstraction

Spring Cloud Stream provides a variety of Binder implementations for connecting to different external messaging systems.

 

This post covers the concepts that let those different implementations be used in a consistent way.

 

Producers and Consumers

public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {

    /**
     * Returns instance identity of this binder.
     * Individual binders should normally override this method.
     * @return instance identity of this binder
     */
    default String getBinderIdentity() {
        return String.valueOf(this.hashCode());
    }

    /**
     * Bind the target component as a message consumer to the logical entity identified by
     * the name.
     * @param name the logical identity of the message source
     * @param group the consumer group to which this consumer belongs - subscriptions are
     * shared among consumers in the same group (a <code>null</code> or empty String, must
     * be treated as an anonymous group that doesn't share the subscription with any other
     * consumer)
     * @param inboundBindTarget the app interface to be bound as a consumer
     * @param consumerProperties the consumer properties
     * @return the setup binding
     */
    Binding<T> bindConsumer(String name, String group, T inboundBindTarget,
            C consumerProperties);

    /**
     * Bind the target component as a message producer to the logical entity identified by
     * the name.
     * @param name the logical identity of the message target
     * @param outboundBindTarget the app interface to be bound as a producer
     * @param producerProperties the producer properties
     * @return the setup binding
     */
    Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);

}
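  • bindProducer() binds an output target to a named destination, creating the Producer for that Binder.
  • bindConsumer() binds an input target to a named destination (and consumer group), creating the Consumer for that Binder.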
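
To make the producer/consumer split concrete, here is a minimal in-memory sketch. SketchChannel, SketchBinding, SketchBinder and InMemoryBinder are all hypothetical stand-ins invented for illustration. They are not the Spring Cloud Stream types, and the consumer group and the properties arguments are left out on purpose.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical in-process channel used as the bind target (not a Spring type). */
final class SketchChannel {
    private final List<Consumer<String>> handlers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<String> handler) { handlers.add(handler); }

    void unsubscribe(Consumer<String> handler) { handlers.remove(handler); }

    void send(String payload) { handlers.forEach(h -> h.accept(payload)); }
}

/** Hypothetical handle returned by a bind call; unbind() undoes the binding. */
interface SketchBinding {
    void unbind();
}

/** Simplified shape of the Binder contract: one method per direction. */
interface SketchBinder<T> {
    SketchBinding bindConsumer(String name, String group, T inboundBindTarget);

    SketchBinding bindProducer(String name, T outboundBindTarget);
}

/** Assumption: destinations live in a map instead of an external broker. */
final class InMemoryBinder implements SketchBinder<SketchChannel> {
    // destination name -> inbound channels bound as consumers
    private final Map<String, List<SketchChannel>> destinations = new ConcurrentHashMap<>();

    @Override
    public SketchBinding bindConsumer(String name, String group, SketchChannel inbound) {
        // The group is ignored here; every bound consumer receives every message.
        List<SketchChannel> consumers =
                destinations.computeIfAbsent(name, k -> new CopyOnWriteArrayList<>());
        consumers.add(inbound);
        return () -> consumers.remove(inbound);
    }

    @Override
    public SketchBinding bindProducer(String name, SketchChannel outbound) {
        // Forward whatever the application sends on the outbound channel to the destination.
        Consumer<String> forwarder = payload ->
                destinations.getOrDefault(name, List.of()).forEach(c -> c.send(payload));
        outbound.subscribe(forwarder);
        return () -> outbound.unsubscribe(forwarder);
    }
}

final class BinderSketchDemo {
    public static void main(String[] args) {
        InMemoryBinder binder = new InMemoryBinder();
        SketchChannel output = new SketchChannel();
        SketchChannel input = new SketchChannel();
        input.subscribe(p -> System.out.println("received: " + p));

        binder.bindProducer("orders", output);
        SketchBinding consumerBinding = binder.bindConsumer("orders", "group-a", input);

        output.send("hello");     // delivered to the bound consumer
        consumerBinding.unbind();
        output.send("dropped");   // no consumer is bound any more
    }
}
```

The point of the sketch is that application code only touches the channels. Swapping InMemoryBinder for another implementation of the same interface would not change the calling code, which is the idea behind the Binder abstraction.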

 

Binder SPI

Each Binder is created as a bean based on the binder definitions found in META-INF/spring.binders:

  • The Binder's bindProducer() and bindConsumer() methods are then called to create the Producers and Consumers.

 

Example: for the Reactor Kafka Binder, META-INF/spring.binders contains the following binder definition:

reactorKafka:\
org.springframework.cloud.stream.binder.reactorkafka.ReactorKafkaBinderConfiguration
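
The definition above is a key/value pair in properties-file syntax: the binder name is the key, the configuration class is the value, and the trailing backslash continues the value onto the next line. The sketch below reads such content with plain java.util.Properties to show how the entry is interpreted. BinderDefinitionSketch and configurationClassesFor are hypothetical names, and this is not how the framework itself is implemented.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class BinderDefinitionSketch {

    /**
     * Parses spring.binders-style content and returns the configuration class
     * names registered under the given binder name (empty array if none).
     */
    static String[] configurationClassesFor(String content, String binderName) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty(binderName);
        if (value == null || value.trim().isEmpty()) {
            return new String[0];
        }
        // Assumption: several configuration classes may be listed, separated by commas.
        return value.trim().split("\\s*,\\s*");
    }

    public static void main(String[] args) {
        // Same content as the definition shown above, including the line continuation.
        String content = "reactorKafka:\\\n"
                + "org.springframework.cloud.stream.binder.reactorkafka.ReactorKafkaBinderConfiguration\n";
        for (String className : configurationClassesFor(content, "reactorKafka")) {
            System.out.println(className);
        }
    }
}
```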

 

Example: the @Configuration class that creates the Binder in the Reactor Kafka Binder

@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(Binder.class)
@EnableConfigurationProperties({ KafkaProperties.class, KafkaExtendedBindingProperties.class })
public class ReactorKafkaBinderConfiguration {

    /**
     * @ConfigurationProperties is declared on the @Bean method for Spring Boot to ignore
     * constructor binding on KafkaBinderConfigurationProperties. If constructor binding is
     * used, it ignores all the JavaBeans style properties when generating configuration metadata.
     *
     * See the following issues for more details:
     *
     * https://github.com/spring-cloud/spring-cloud-stream/issues/2640
     * https://github.com/spring-projects/spring-boot/issues/34031
     *
     * @param kafkaProperties Spring Kafka properties autoconfigured by Spring Boot
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.cloud.stream.kafka.binder")
    KafkaBinderConfigurationProperties configurationProperties(
            KafkaProperties kafkaProperties) {
        return new KafkaBinderConfigurationProperties(kafkaProperties);
    }

    @Bean
    KafkaTopicProvisioner provisioningProvider(
            KafkaBinderConfigurationProperties configurationProperties,
            ObjectProvider<AdminClientConfigCustomizer> adminClientConfigCustomizer, KafkaProperties kafkaProperties) {
        return new KafkaTopicProvisioner(configurationProperties,
                kafkaProperties, adminClientConfigCustomizer.getIfUnique());
    }

    @Bean
    ReactorKafkaBinder reactorKafkaBinder(KafkaBinderConfigurationProperties configurationProperties,
            KafkaTopicProvisioner provisioningProvider,
            KafkaExtendedBindingProperties extendedBindingProperties,
            ObjectProvider<ConsumerConfigCustomizer> consumerConfigCustomizer,
            ObjectProvider<ProducerConfigCustomizer> producerConfigCustomizer,
            ObjectProvider<ReceiverOptionsCustomizer> receiverOptionsCustomizers,
            ObjectProvider<SenderOptionsCustomizer> senderOptionsCustomizers) {

        ReactorKafkaBinder reactorKafkaBinder = new ReactorKafkaBinder(configurationProperties, provisioningProvider);
        reactorKafkaBinder.setExtendedBindingProperties(extendedBindingProperties);
        reactorKafkaBinder.setConsumerConfigCustomizer(consumerConfigCustomizer.getIfUnique());
        reactorKafkaBinder.setProducerConfigCustomizer(producerConfigCustomizer.getIfUnique());
        reactorKafkaBinder.receiverOptionsCustomizers(receiverOptionsCustomizers);
        reactorKafkaBinder.senderOptionsCustomizers(senderOptionsCustomizers);
        return reactorKafkaBinder;
    }

}

 

Binder Detection

To bind to an external messaging system, Spring Cloud Stream needs a dependency on a specific Binder in addition to the spring-cloud-stream dependency itself:

 

Example: adding the Reactive Kafka binder

implementation("org.springframework.cloud:spring-cloud-stream")
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-reactive")    

 

Multiple Binders on the Classpath

When multiple binders are present, you must specify which one to use:

  • "Multiple binders" here means that more than one META-INF/spring.binders file is found on the classpath.
  • To choose a global default binder, set a property such as spring.cloud.stream.defaultBinder=rabbit.
  • To use a different binder for the input and output bindings, set spring.cloud.stream.bindings.input.binder=kafka and
    spring.cloud.stream.bindings.output.binder=rabbit in application.properties.
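
The selection rule described above can be modelled roughly as follows: a per-binding setting wins, then the global default, and with neither of them a single binder on the classpath is used as-is. BinderSelectionSketch and resolveBinder are hypothetical names, and the code is a simplified model of the rule, not the framework's actual lookup code.

```java
import java.util.Map;
import java.util.Set;

final class BinderSelectionSketch {

    /**
     * @param bindingName   binding to resolve, e.g. "input" or "output"
     * @param perBinding    values of spring.cloud.stream.bindings.<name>.binder
     * @param defaultBinder value of spring.cloud.stream.defaultBinder, or null if unset
     * @param available     binder names discovered on the classpath
     */
    static String resolveBinder(String bindingName, Map<String, String> perBinding,
            String defaultBinder, Set<String> available) {
        String explicit = perBinding.get(bindingName);
        if (explicit != null) {
            return requireKnown(explicit, available);      // 1. per-binding setting
        }
        if (defaultBinder != null) {
            return requireKnown(defaultBinder, available); // 2. global default
        }
        if (available.size() == 1) {
            return available.iterator().next();            // 3. only one candidate
        }
        throw new IllegalStateException(
                "Cannot pick a binder for '" + bindingName + "' among " + available);
    }

    private static String requireKnown(String name, Set<String> available) {
        if (!available.contains(name)) {
            throw new IllegalStateException("Unknown binder: " + name);
        }
        return name;
    }

    public static void main(String[] args) {
        Set<String> available = Set.of("kafka", "rabbit");
        Map<String, String> perBinding = Map.of("input", "kafka");
        System.out.println(resolveBinder("input", perBinding, "rabbit", available));
        System.out.println(resolveBinder("output", perBinding, "rabbit", available));
    }
}
```

In this model, leaving both properties unset while two binders are available ends in an exception, which corresponds to the requirement above that the binder must be named explicitly.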

 

Connecting to Multiple Systems

When connecting to several message brokers of the same type, refer to the following example:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: thing1
          binder: rabbit1
        output:
          destination: thing2
          binder: rabbit2
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host1>
        rabbit2:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host2>
