WebFlux & Reactor

Operator

연산자의 종류는 매우 많으니, 일일이 설명하기는 힘든 부분이다. 아래 링크를 참고하자

Stepverifier

Blocking Call Handling

1
2
3
4
Mono blockingWrapper = Mono.fromCallable(() -> { 
return /* make a remote synchronous call */
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.boundedElastic());

JDBC를 사용하는 경우, JDBC에 대한 구현 자체가 Blocking-Call로 발생하기 때문에, 그대로 사용할 경우 리액터 스레드가 빠르게 반환되지 않고 전반적인 어플리케이션 성능에 영향을 줄 수 있다. 이런 경우 때문에, 위 코드와 같이 스케줄러를 지정하여 사용하는 것이 리액터 레퍼런스에서 소개하는 적절한 방법이다. 아래 링크는 JDBC를 사용한 Blocking Call에 대한 이해를 좀 더 깊게 해줄 수 있는 좋은 글이다.

WebClient

Logging

Filter를 사용하는 방법도 존재하지만, 필터에서는 Http Request와 Response의 디테일한 모든 부분을 보기 힘들다. Netty에 LoggingHandler를 활용하여 로깅을 설정할 수 있으며, 아래와 같이 설정하게 되면 오고가는 HTTP의 요청과 응답의 헤더와 바디를 로깅할 수 있다.

1
logging.level.reactor.netty.http.client.HttpClient=DEBUG
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.logging.LoggingHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.netty.channel.BootstrapHandlers;
import reactor.netty.http.client.HttpClient;

import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;

@RestController
@RequestMapping("/api")
public class ApiController {
private static final HttpClient httpClient = HttpClient.create()
.tcpConfiguration(tcpClient ->
tcpClient.bootstrap(bootstrap ->
BootstrapHandlers.updateLogSupport(bootstrap, new HttpLoggingHandler(HttpClient.class))));

private static final WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();

@RequiredArgsConstructor
private static class Body {
private final String foo;
}

private static class HttpLoggingHandler extends LoggingHandler {
// 주의) LoggingHandler의 기본 생성자를 사용하게 될 경우, 로깅에 대한 디폴트 레벨이 지정된다.
public HttpLoggingHandler(Class<?> clazz) {
super(clazz);
}

@Override
protected String format(ChannelHandlerContext ctx, String event, Object arg) {
if (arg instanceof ByteBuf) {
ByteBuf msg = (ByteBuf) arg;
return msg.toString(StandardCharsets.UTF_8);
}
return super.format(ctx, event, arg);
}

@Override
public void channelRegistered(ChannelHandlerContext ctx) {
ctx.fireChannelRegistered();
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.fireChannelReadComplete();
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
}

@Override
public void flush(ChannelHandlerContext ctx) {
ctx.flush();
}

@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
ctx.connect(remoteAddress, localAddress, promise);
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
ctx.fireUserEventTriggered(evt);
}
}

@GetMapping
public Mono<Map> res() {
return webClient.post()
.uri("https://postman-echo.com/post")
.syncBody("{\"foo\" : \"bar\"}")
.accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMap(clientResponse -> clientResponse.bodyToMono(Map.class));
}


}

웹클라이언트_로깅이미지