함수형 타입의 빈 즉 (e.g java.util.function.[Supplier/Function/Consumer]
) 은 auto-discovered 기능에 의해 자동으로 spring cloud stream 빈으로 사용될 수 있다.
그러나 여러개의 함수형 빈이 있다면 spring.cloud.function.definition
설정으로 어떤 함수형 빈이 Binding 에 사용될건지 알려줘야한다.
예시로 consumer 와 supplier 함수 모두 spring cloud stream 에 바인딩을 하려면 기본적인 바인딩 설정과 함께 spring.cloud.function.definition=supplier;consume
이런식으로 등록해줘야함.
1. Suppliers (Sources)
Supplier 는 Function 과 Consumer 와 달리 이벤트 기반으로 트리거 되는게 아니므로 별개의 트리거 시스템이 필요하다:
- (Kafka 기준) Function 과 Consumer 를 이용한 바인딩은 메시지 브로커에 주기적으로 polling 을 해서 메시지를 가져오면 되지만 Supplier 는 메시지를 생산하는 측이니 별도의 트리거 시스템이 필요함.
imperative 기반의 supplier:
- 다음과 같이 stringSupplier 를 만든다면 주기적으로 polling 을 해서 stringSupplier 를 트리거링 할 것이다.
- 이런 폴링에 대한 설정은 Polling Configuration Properties 를 참고하면 된다.
- imperative 란 뜻은 반복문과 조건문을 이용해서 코드를 짜는 방식을 말한다.
@SpringBootApplication
public static class SupplierConfiguration {
@Bean
public Supplier<String> stringSupplier() {
return () -> "Hello from Supplier";
}
}
Reactive Style 의 infinite stream 기반 supplier:
- 이 supplier 의 경우 한번만 호출된다. 그러므로 무한 스트림을 만드는 경우 적합함.
- 아래 예제는 Reactor 의
share()
를 통해서 Hot Stream 으로 변경한 것이다:- Hot Stream 은 Cold Stream 과 달리 데이터 스트림을 연속적으로 생성하며, 생성된 데이터 스트림은 여러 구독자에게 공유될 수 있다.
- Cold Stream 은 구독 시점에 데이터 스트림을 처음부터 가지고 오며, 데이터 스트림은 구독자들 사이에 공유되지 않고 독립적이다.
@SpringBootApplication
public static class SupplierConfiguration {
@Bean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
return "Hello from Supplier";
} catch (Exception e) {
// ignore
}
}
})).subscribeOn(Schedulers.elastic()).share();
}
}
Reactive Stream 의 finite stream 기반 supplier:
- 주기적으로 유한개의 데이터 스트림을 생성하는 경우에는
@PollableBean
을 이용해서 만들 수 았다. @PollableBean
은splittable
특성이 있는데 이는 데이터를 쪼개서 전송할 것인지 여부를 결정한다. 기본값은 true 이다.
@SpringBootApplication
public static class SupplierConfiguration {
@PollableBean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.just("hello", "bye");
}
}
Supplier 의 poller 방식 메커니즘은 사용되는 스레드를 예측할 수 없다는 문제가 있다.
그래서 Thread Locality 를 이용하는 경우 또는 특정 스레드를 선호하는 경우에는 문제가 발생할 수 있다.
Poller 방식 대신 특정 스레드에서 외부 메시지 브로커에 메시지를 쓰는 방식을 제어하고 싶다면 StreamBridge
를 이용하면 된다.
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
System.out.println("Sending " + body);
streamBridge.send("toStream", body);
}
}
2. Consumer (Reactive)
Reactive Consumer 의 경우에는 Consumer<Flux<?>>
을 쓰기 보다는 Function<Flux<?>, Mono<Void>>
을 쓰는 걸 권장한다.
- Consumer 를 쓰게 되면 반환 타입이 없으니까 완료 시그널 자체도 못뱉어내게 되므로 함수를 compose 하기 어렵다. 그래서 리턴 타입으로
Mono<Void>
를 써서 완료 시그널을 뱉으라는 거임.
public Function<Flux<?>, Mono<Void>> consumer() {
return flux -> flux.map(..).filter(..).then();
}
3. Sending arbitrary data to an output (e.g. Foreign event-driven sources)
Supplier 함수형 빈을 생성해서 외부 메시지 시스템에 데이터를 쓰는게 아니라 StreamBridge
를 통해서 메시지를 보낼 수 있다:
- 이 경우 바인딩은 동적으로 이용할 수 있으며, 바인딩 정보는 캐싱된다. 캐싱 사이즈는
spring.cloud.stream.dynamic-destination-cache-size
로 설정할 수 있으며 기본값은 10이다. 캐시 사이즈를 초과해서 여러 토픽으로 보낼 경우에는 기존 바인딩 정보는 제거되고 사용될 때 다시 생성되기 때문에 성능적으로 감소될 여지가 있다. 이 경우에는 캐시 사이즈를 늘려 두는 것을 권장함.streamBridge.send()
호출할 때 동적 바인딩이 됨.
- 동적 바인딩 말고 어플리케이션 부팅 시점에 바인딩을 생성해놓고 싶다면
spring.cloud.stream.output-bindings
변수를 통해서 만들어놓을 수 있다. 이 설정에서 여러개의 토픽 정보를 넣어둘 수 있으며;
로 구분시키면 된다.- 예시:
spring.cloud.stream.output-bindings=foo;bar
foo 와 bar 토픽에다가 바인딩을 할 것.
- 예시:
- 데이터는 어떠한 타입이던지 보낼 수 있음.
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
System.out.println("Sending " + body);
streamBridge.send("toStream", body);
}
}
StreamBridge 를 통해서 특정한 데이터 타입을 보내려면 send(String bindingName, Object data, MimeType outputContentType)
메소드를 이용하면 된다.
StreamBridge 를 통해서 메시지를 보낼 때 보내기 전에 Interceptor 를 거쳐가게 만들 수 있다:
- 기본적으로 인터셉터는 등록되어 있지 않다. 글로벌한 인터셉터를 등록하려면
@GlobalChannelInterceptor(patterns = "*")
를 이용하면 되고, 특정한 토픽에만 인터셉터를 등록하려면@GlobalChannelInterceptor(patterns = "foo-*")
이런식으로 등록하면 된다.
@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
@Bean
@GlobalChannelInterceptor(patterns = "foo-*")
public ChannelInterceptor fooInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
StreamBridge 를 Mocking 하려면 StreamOperations
을 이용하면 된다. StreamBridge 의 모든 send()
함수는 StreamOperations
인터페이스에서 제공하는 것임.
4. Reactive Functions support
함수형 타입의 빈들을 선언할 때 (= Supplier, Function or Consumer) Reactive 스타일로 선언할 수 있다:
@SpringBootApplication
public static class SinkFromConsumer {
@Bean
public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
return flux -> flux.map(val -> val.toUpperCase());
}
}
Reactive 의 이점을 온전히 누리기 위해서는 Binder 도 Reactive 를 사용해야한다. (e.g Reactive Kafka)
- 일반적인 Kafka Binder 를 이용하게 되면 어플리케이션 코드는 Reactive 를 쓰겠지만 외부 메시지 브로커에 데이터를 저장하고 가져오는 부분은 Reactive 가 아닐 거라서 온전히 Reactive 프로그래밍의 이점을 누릴 수 없다.
Reactive 스타일의 함수형 빈은 imperative 스타일 함수형 빈과 에러 처리와 재시도 처리가 다르다:
- imperative 스타일은 spring cloud stream 프레임워크가 메시지 핸들러를 호출해주기 때문에 에러 핸들링 로직과 재시도 로직을 프레임워크에서 제공해줄 수 있다. 그러나 Reactive 스타일은 사용자가 제공한 Flux|Mono 를 프레임워크와 연결만 해주기 때문에 에러 처리는 사용자가 직접해야한다.
doOnError()
,onError*()
등을 이용해서
5. Functional Composition
Spring Cloud Stream 에서 functional programming 처럼 여러 함수형 빈을 조합해서 메시지를 처리하도록 할 수 있다:
spring.cloud.function.definition=toUpperCase|wrapInQuotes
이렇게 하면toUpperCase()
와wrapInQuotes()
를 결합해서 처리하도록 만들 수 있음.
functional composition 의 가장 큰 장점은 Reactive Style 의 함수와 Imperative Style 함수를 결합해서 처리할 수 있다는 거임.
functional composition 의 또 다른 장점은 Cross-cutting Concerns 이점을 누릴 수 있다는 거임. 한 함수는 오로지 비즈니스 로직 처리에만 관심을 둘 수 있고, 다른 함수는 그외의 부가 기능을 위해서 두도록 하고.
6. Functions with multiple input and output arguments
Spring Cloud Stream 에서는 Multiple Input/Output Binding 을 지원한다:
- 대표적인 UseCase:
- Big Data 처리: 여러 종류의 혼합된 데이터가 저장되어 있는 토픽에서 데이터를 가져와서 분류시키는 목적으로 처리.
- Data aggregation: 여러 토픽의 데이터를 모아서 하나로 합칠 수 있다.
- Binding 은
(functionName)-(in/out)-(index)
와 토픽 이름을 매핑시켜서 한다. - Multiple Input/Output Binding 은 index 값을 통해서 하나의 함수형 빈에서 받을 수 있도록 할 수 있다. 예를 들어서 gather 함수를 메시지 시스템의 두 토픽에서 읽어오도록 하고 싶다면
spring.cloud.stream.bindings.gather-in-0.destination=MY_TOPIC_1
,spring.cloud.stream.bindings.gather-in-1.destination=MY_TOPIC_2
이렇게 설정하면 된다.
Multiple Input Binding 예시:
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
return tuple -> {
Flux<String> stringStream = tuple.getT1();
Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i));
return Flux.merge(stringStream, intStream);
};
}
}
Multiple Output Binding 예시:
@SpringBootApplication
public class SampleApplication {
@Bean
public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
return flux -> {
Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
UnicastProcessor even = UnicastProcessor.create();
UnicastProcessor odd = UnicastProcessor.create();
Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));
return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
};
}
}
위의 코드를 코틀린과 Reactor 3.4 버전으로 변경한 것:
@Bean
fun scatter(): Function<Flux<String>, Tuple2<Flux<String>, Flux<String>>> {
return Function { flux: Flux<String> ->
val connectedFlux = flux.publish().autoConnect(2)
val even = Sinks.many().unicast().onBackpressureBuffer<String>()
val odd = Sinks.many().unicast().onBackpressureBuffer<String>()
val evenFlux =
connectedFlux.filter { number: String -> number.length % 2 == 0 }
.doOnNext { even.emitNext("EVEN: $it", Sinks.EmitFailureHandler.FAIL_FAST ) }
val oddFlux =
connectedFlux.filter { number: String -> number.length % 2 == 1 }
.doOnNext { even.emitNext("ODD: $it", Sinks.EmitFailureHandler.FAIL_FAST ) }
Tuples.of(
Flux.from(even.asFlux().publishOn(Schedulers.boundedElastic()).doOnSubscribe { evenFlux.subscribe() }),
Flux.from(odd.asFlux().publishOn(Schedulers.boundedElastic()).doOnSubscribe { oddFlux.subscribe() })
)
}
}
7. Multiple functions in a single application
Spring Cloud Stream 어플리케이션에선 Binding 을 여러개 할 수 있다:
- 다만 이 경우에는 가능한 함수형 빈이 하나가 아닌 여러개가 되므로 어떠한 함수들을 바인딩에 사용할 것인지 등록시켜줘야한다.
spring.cloud.function.definition
에 함수 이름들을 넣어주면 되고 delimiter 로;
걸 쓰면 된다.
8. Batch Consumers
spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
를 true 로 설정하면 Consumer 를 Batch Mode 로 변경할 수 있다. 이 경우에는 데이터를 List 형식으로 한번에 가져올 수 있다.
9. Batch Producers
함수형 타입의 빈에서 다음과 같이 List 타입으로 결과를 내뱉는다면 Batch Producer 가 된다.
@Bean
public Function<String, List<Message<String>>> batch() {
return p -> {
List<Message<String>> list = new ArrayList<>();
list.add(MessageBuilder.withPayload(p + ":1").build());
list.add(MessageBuilder.withPayload(p + ":2").build());
list.add(MessageBuilder.withPayload(p + ":3").build());
list.add(MessageBuilder.withPayload(p + ":4").build());
return list;
};
}
References:
'Spring > Spring Cloud Stream' 카테고리의 다른 글
Resetting Offsets (0) | 2024.01.16 |
---|---|
Binder Abstraction (0) | 2024.01.16 |
Error Handling (0) | 2024.01.12 |