함수형 타입의 빈 즉 (e.g java.util.function.[Supplier/Function/Consumer]) 은 auto-discovered 기능에 의해 자동으로 spring cloud stream 빈으로 사용될 수 있다.

 

그러나 여러개의 함수형 빈이 있다면 spring.cloud.function.definition 설정으로 어떤 함수형 빈이 Binding 에 사용될건지 알려줘야한다.

 

예시로 consumer 와 supplier 함수 모두 spring cloud stream 에 바인딩을 하려면 기본적인 바인딩 설정과 함께 spring.cloud.function.definition=supplier;consume 이런식으로 등록해줘야함.

 

1. Suppliers (Sources)

Supplier 는 Function 과 Consumer 와 달리 이벤트 기반으로 트리거 되는게 아니므로 별개의 트리거 시스템이 필요하다:

  • (Kafka 기준) Function 과 Consumer 를 이용한 바인딩은 메시지 브로커에 주기적으로 polling 을 해서 메시지를 가져오면 되지만 Supplier 는 메시지를 생산하는 측이니 별도의 트리거 시스템이 필요함.

 

imperative 기반의 supplier:

  • 다음과 같이 stringSupplier 를 만든다면 주기적으로 polling 을 해서 stringSupplier 를 트리거링 할 것이다.
  • 이런 폴링에 대한 설정은 Polling Configuration Properties 를 참고하면 된다.
  • imperative 란 뜻은 반복문과 조건문을 이용해서 코드를 짜는 방식을 말한다.
@SpringBootApplication
public static class SupplierConfiguration {

    @Bean
    public Supplier<String> stringSupplier() {
        return () -> "Hello from Supplier";
    }
}

 

Reactive Style 의 infinite stream 기반 supplier:

  • 이 supplier 의 경우 한번만 호출된다. 그러므로 무한 스트림을 만드는 경우 적합함.
  • 아래 예제는 Reactor 의 share() 를 통해서 Hot Stream 으로 변경한 것이다:
    • Hot Stream 은 Cold Stream 과 달리 데이터 스트림을 연속적으로 생성하며, 생성된 데이터 스트림은 여러 구독자에게 공유될 수 있다.
    • Cold Stream 은 구독 시점에 데이터 스트림을 처음부터 가지고 오며, 데이터 스트림은 구독자들 사이에 공유되지 않고 독립적이다.
@SpringBootApplication
public static class SupplierConfiguration {

    @Bean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(1000);
                    return "Hello from Supplier";
                } catch (Exception e) {
                    // ignore
                }
            }
        })).subscribeOn(Schedulers.elastic()).share();
    }
}

 

Reactive Stream 의 finite stream 기반 supplier:

  • 주기적으로 유한개의 데이터 스트림을 생성하는 경우에는 @PollableBean 을 이용해서 만들 수 았다.
  • @PollableBeansplittable 특성이 있는데 이는 데이터를 쪼개서 전송할 것인지 여부를 결정한다. 기본값은 true 이다.
@SpringBootApplication
public static class SupplierConfiguration {

    @PollableBean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> Flux.just("hello", "bye");
    }
}

 

Supplier 의 poller 방식 메커니즘은 사용되는 스레드를 예측할 수 없다는 문제가 있다.

 

그래서 Thread Locality 를 이용하는 경우 또는 특정 스레드를 선호하는 경우에는 문제가 발생할 수 있다.

 

Poller 방식 대신 특정 스레드에서 외부 메시지 브로커에 메시지를 쓰는 방식을 제어하고 싶다면 StreamBridge 를 이용하면 된다.

@SpringBootApplication
@Controller
public class WebSourceApplication {

    public static void main(String[] args) {
        SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
    }

    @Autowired
    private StreamBridge streamBridge;

    @RequestMapping
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void delegateToSupplier(@RequestBody String body) {
        System.out.println("Sending " + body);
        streamBridge.send("toStream", body);
    }
}

 

2. Consumer (Reactive)

Reactive Consumer 의 경우에는 Consumer<Flux<?>> 을 쓰기 보다는 Function<Flux<?>, Mono<Void>> 을 쓰는 걸 권장한다.

  • Consumer 를 쓰게 되면 반환 타입이 없으니까 완료 시그널 자체도 못뱉어내게 되므로 함수를 compose 하기 어렵다. 그래서 리턴 타입으로 Mono<Void> 를 써서 완료 시그널을 뱉으라는 거임.
public Function<Flux<?>, Mono<Void>> consumer() {
    return flux -> flux.map(..).filter(..).then();
}

 

3. Sending arbitrary data to an output (e.g. Foreign event-driven sources)

Supplier 함수형 빈을 생성해서 외부 메시지 시스템에 데이터를 쓰는게 아니라 StreamBridge 를 통해서 메시지를 보낼 수 있다:

  • 이 경우 바인딩은 동적으로 이용할 수 있으며, 바인딩 정보는 캐싱된다. 캐싱 사이즈는 spring.cloud.stream.dynamic-destination-cache-size 로 설정할 수 있으며 기본값은 10이다. 캐시 사이즈를 초과해서 여러 토픽으로 보낼 경우에는 기존 바인딩 정보는 제거되고 사용될 때 다시 생성되기 때문에 성능적으로 감소될 여지가 있다. 이 경우에는 캐시 사이즈를 늘려 두는 것을 권장함.
    • streamBridge.send() 호출할 때 동적 바인딩이 됨.
  • 동적 바인딩 말고 어플리케이션 부팅 시점에 바인딩을 생성해놓고 싶다면 spring.cloud.stream.output-bindings 변수를 통해서 만들어놓을 수 있다. 이 설정에서 여러개의 토픽 정보를 넣어둘 수 있으며 ; 로 구분시키면 된다.
    • 예시: spring.cloud.stream.output-bindings=foo;bar foo 와 bar 토픽에다가 바인딩을 할 것.
  • 데이터는 어떠한 타입이던지 보낼 수 있음.
@SpringBootApplication
@Controller
public class WebSourceApplication {

    public static void main(String[] args) {
        SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
    }

    @Autowired
    private StreamBridge streamBridge;

    @RequestMapping
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void delegateToSupplier(@RequestBody String body) {
        System.out.println("Sending " + body);
        streamBridge.send("toStream", body);
    }
}

 

StreamBridge 를 통해서 특정한 데이터 타입을 보내려면 send(String bindingName, Object data, MimeType outputContentType) 메소드를 이용하면 된다.

 

StreamBridge 를 통해서 메시지를 보낼 때 보내기 전에 Interceptor 를 거쳐가게 만들 수 있다: 

  • 기본적으로 인터셉터는 등록되어 있지 않다. 글로벌한 인터셉터를 등록하려면 @GlobalChannelInterceptor(patterns = "*") 를 이용하면 되고, 특정한 토픽에만 인터셉터를 등록하려면 @GlobalChannelInterceptor(patterns = "foo-*") 이런식으로 등록하면 된다.
@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

@Bean
@GlobalChannelInterceptor(patterns = "foo-*")
public ChannelInterceptor fooInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

 

StreamBridge 를 Mocking 하려면 StreamOperations 을 이용하면 된다. StreamBridge 의 모든 send() 함수는 StreamOperations 인터페이스에서 제공하는 것임.

 

4. Reactive Functions support

함수형 타입의 빈들을 선언할 때 (= Supplier, Function or Consumer) Reactive 스타일로 선언할 수 있다:

@SpringBootApplication
public static class SinkFromConsumer {

    @Bean
    public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
        return flux -> flux.map(val -> val.toUpperCase());
    }
}

 

Reactive 의 이점을 온전히 누리기 위해서는 Binder 도 Reactive 를 사용해야한다. (e.g Reactive Kafka)

  • 일반적인 Kafka Binder 를 이용하게 되면 어플리케이션 코드는 Reactive 를 쓰겠지만 외부 메시지 브로커에 데이터를 저장하고 가져오는 부분은 Reactive 가 아닐 거라서 온전히 Reactive 프로그래밍의 이점을 누릴 수 없다.

 

Reactive 스타일의 함수형 빈은 imperative 스타일 함수형 빈과 에러 처리와 재시도 처리가 다르다:

  • imperative 스타일은 spring cloud stream 프레임워크가 메시지 핸들러를 호출해주기 때문에 에러 핸들링 로직과 재시도 로직을 프레임워크에서 제공해줄 수 있다. 그러나 Reactive 스타일은 사용자가 제공한 Flux|Mono 를 프레임워크와 연결만 해주기 때문에 에러 처리는 사용자가 직접해야한다. doOnError(), onError*() 등을 이용해서

 

5. Functional Composition

Spring Cloud Stream 에서 functional programming 처럼 여러 함수형 빈을 조합해서 메시지를 처리하도록 할 수 있다:

  • spring.cloud.function.definition=toUpperCase|wrapInQuotes 이렇게 하면 toUpperCase()wrapInQuotes() 를 결합해서 처리하도록 만들 수 있음.

functional composition 의 가장 큰 장점은 Reactive Style 의 함수와 Imperative Style 함수를 결합해서 처리할 수 있다는 거임.

 

functional composition 의 또 다른 장점은 Cross-cutting Concerns 이점을 누릴 수 있다는 거임. 한 함수는 오로지 비즈니스 로직 처리에만 관심을 둘 수 있고, 다른 함수는 그외의 부가 기능을 위해서 두도록 하고.

 

6. Functions with multiple input and output arguments

Spring Cloud Stream 에서는 Multiple Input/Output Binding 을 지원한다:

  • 대표적인 UseCase:
    • Big Data 처리: 여러 종류의 혼합된 데이터가 저장되어 있는 토픽에서 데이터를 가져와서 분류시키는 목적으로 처리. 
    • Data aggregation: 여러 토픽의 데이터를 모아서 하나로 합칠 수 있다.
  • Binding 은 (functionName)-(in/out)-(index) 와 토픽 이름을 매핑시켜서 한다.
  • Multiple Input/Output Binding 은 index 값을 통해서 하나의 함수형 빈에서 받을 수 있도록 할 수 있다. 예를 들어서 gather 함수를 메시지 시스템의 두 토픽에서 읽어오도록 하고 싶다면 spring.cloud.stream.bindings.gather-in-0.destination=MY_TOPIC_1, spring.cloud.stream.bindings.gather-in-1.destination=MY_TOPIC_2 이렇게 설정하면 된다. 

 

Multiple Input Binding 예시:

@SpringBootApplication
public class SampleApplication {

    @Bean
    public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
        return tuple -> {
            Flux<String> stringStream = tuple.getT1();
            Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i));
            return Flux.merge(stringStream, intStream);
        };
    }
}

 

Multiple Output Binding 예시:

@SpringBootApplication
public class SampleApplication {

    @Bean
    public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
        return flux -> {
            Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
            UnicastProcessor even = UnicastProcessor.create();
            UnicastProcessor odd = UnicastProcessor.create();
            Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
            Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));

            return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
        };
    }
}

 

위의 코드를 코틀린과 Reactor 3.4 버전으로 변경한 것:

@Bean
fun scatter(): Function<Flux<String>, Tuple2<Flux<String>, Flux<String>>> {
    return Function { flux: Flux<String> ->
    val connectedFlux = flux.publish().autoConnect(2)
    val even = Sinks.many().unicast().onBackpressureBuffer<String>()
    val odd = Sinks.many().unicast().onBackpressureBuffer<String>()

    val evenFlux =
        connectedFlux.filter { number: String -> number.length % 2 == 0 }
            .doOnNext { even.emitNext("EVEN: $it", Sinks.EmitFailureHandler.FAIL_FAST ) }

    val oddFlux =
        connectedFlux.filter { number: String -> number.length % 2 == 1 }
            .doOnNext { even.emitNext("ODD: $it", Sinks.EmitFailureHandler.FAIL_FAST ) }

    Tuples.of(
        Flux.from(even.asFlux().publishOn(Schedulers.boundedElastic()).doOnSubscribe { evenFlux.subscribe() }),
        Flux.from(odd.asFlux().publishOn(Schedulers.boundedElastic()).doOnSubscribe { oddFlux.subscribe() })
    )
  }
}

 

7. Multiple functions in a single application

Spring Cloud Stream 어플리케이션에선 Binding 을 여러개 할 수 있다:

  • 다만 이 경우에는 가능한 함수형 빈이 하나가 아닌 여러개가 되므로 어떠한 함수들을 바인딩에 사용할 것인지 등록시켜줘야한다. spring.cloud.function.definition 에 함수 이름들을 넣어주면 되고 delimiter 로 ; 걸 쓰면 된다.

 

8. Batch Consumers

spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode 를 true 로 설정하면 Consumer 를 Batch Mode 로 변경할 수 있다. 이 경우에는 데이터를 List 형식으로 한번에 가져올 수 있다.

 

9. Batch Producers

함수형 타입의 빈에서 다음과 같이 List 타입으로 결과를 내뱉는다면 Batch Producer 가 된다.

@Bean
public Function<String, List<Message<String>>> batch() {
    return p -> {
        List<Message<String>> list = new ArrayList<>();
        list.add(MessageBuilder.withPayload(p + ":1").build());
        list.add(MessageBuilder.withPayload(p + ":2").build());
        list.add(MessageBuilder.withPayload(p + ":3").build());
        list.add(MessageBuilder.withPayload(p + ":4").build());
        return list;
    };
}

 

 

References:

https://docs.spring.io/spring-cloud-stream/reference/spring-cloud-stream/producing-and-consuming-messages.html

'Spring > Spring Cloud Stream' 카테고리의 다른 글

Resetting Offsets  (0) 2024.01.16
Binder Abstraction  (0) 2024.01.16
Error Handling  (0) 2024.01.12

+ Recent posts