I. Intro
- RxJava가 스프링에서 어떻게 구현되었는지 알아보자
II. FutureTask 이용하기
- 기존 Java에선 다른 Thread에서 비동기 처리를 위해 ThreadPool을 지원함 => ExecutorService
- Future는 JS에서 Promise처럼 아직 값이 들어있진 않지만 get을 부를 경우 비동기로 실행중인 Thread의 작업이 끝날 때까지 현재 Thread를 멈추고 대기하다가 값이 나오면 이어서 처리한다
- ExecutorService를 이용해서 비동기 작업처리하기
- FutureTask를 정의해서 ExecutorService에서 처리할 내용을 미리 정의해두기
- 문제점: 매 작업마다 Exception이 발생할 경우 try / catch를 걸어줘야하고 get으로 결과값을 받아서 처리하는동안 현재 Thread가 blocking됨
EX1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package lee.twoweeks.tobyreactivespringexample.rxjava.future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class FutureExample {
/**
* Created by Joohan Lee on 2020/04/16
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
// es.execute(() -> { // Runnable
// try {
// Thread.sleep(2000);
// } catch (InterruptedException e) {}
// log.info("Async");
// });
Future<String> f = es.submit(() -> { //Callable - Runnable과 다르게 return 할 수 있고 Exception을 throw한다.
Thread.sleep(2000);
log.info("Async");
return "Hello";
});
log.info("" + f.isDone()); // false
Thread.sleep(2100);
log.info("" + f.isDone()); // true
log.info(f.get()); // Async 작업이 끝날 때까지 Blocking 으로 기다림
log.info("Exit");
FutureTask<String> futureTask = new FutureTask<String>(() -> {// Future로 받기 전 처리할 작업을 객체로 선언
Thread.sleep(2000);
log.info("Async");
return "Hello";
});
es.execute(futureTask); // 위에 es.execute (() -> ...) 으로 처리한 것과 같은 결과임
FutureTask<String> futureTask2 = new FutureTask<String>(() -> {// Future로 받기 전 처리할 작업을 객체로 선언
Thread.sleep(2000);
log.info("Async");
return "Hello";
}) {
@Override
protected void done() {
try {
System.out.println(get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
};
es.execute(futureTask2);
es.shutdown(); // shutdown하지 않으면 ExecutorService가 계속 떠있어서 종료되지 않음
}
}
III. Callback으로 처리하기
- 위 방식의 문제점
- Future값을 가져올 때 blocking해서 결과를 받을 때까지 대기하도록 함
- 내부에서 Exception이 발생할 경우 처리를 위해 try catch문이 필요하다.
- 추가적으로, FutureTask 생성 작업을 좀 더 간결하게 해보자
- Callback으로 처리하기
- 위에 Future와 비슷하지만 exception를 callback으로 처리함
- Callback - 위에 Future와 비슷하지만 exception를 callback으로 처리함 // Ex. AsynchronousByteChannel의 read
EX)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package lee.twoweeks.tobyreactivespringexample.rxjava.future;
/**
* Created by Joohan Lee on 2020/04/16
*/
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class FutureCallbackExample {
interface SuccessCallback {
void onSuccess(String result);
}
interface ExceptionCallback {
void onError(Throwable t);
}
public static class CallbackFutureTask extends FutureTask<String> {
SuccessCallback successCallback;
ExceptionCallback exceptionCallback;
public CallbackFutureTask(Callable<String> callable, SuccessCallback successCallback, ExceptionCallback exceptionCallback) {
super(callable);
this.successCallback = Objects.requireNonNull(successCallback); // Null인 경우 NullPointerException을 발생
this.exceptionCallback = Objects.requireNonNull(exceptionCallback);
}
@Override
protected void done() {
try {
successCallback.onSuccess(get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
exceptionCallback.onError(e.getCause());
}
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
CallbackFutureTask callbackFutureTask = new CallbackFutureTask(() -> {
Thread.sleep(2000);
log.info("Async");
return "Hello";
},
result -> log.info("Result:" + result),
e -> log.info("Error: " + e.getMessage())
);
executorService.execute(callbackFutureTask);
executorService.shutdown();
}
}
- 하지만, 여기까지 코드의 문제점
- 비지니스 로직과 기술적인 코드가 혼재되어 있음. ExecutorService와 FutureTask를 생성하는 기술적인 코드를 따로 분리할 필요가 있음
- Spring에서 제공하는 기능을 써보자
IV. NIO Servlet에 대해 알아보자
- 기존 서블릿 방식
- Servelt 3.0 이전은 Request 하나당 Thread 하나
- InputStream기반 (blocking 방식)
- 요청이오면 NIO에서 받아서 Servlet Thread에 할당한다
- 허용 thread가 5인데 6개 이상이 오면 Queue에 쌓이고 순차적으로 처리됨
- Queue에 너무 많이 쌓이게 되면 Latency가 생기고 에러가 발생
- 이 방식의 경우 Thread 내에서 IO 처리로 대기하는 시간이 발생해서 해당 Thread가 낭비됨
- Thread가 늘어날 경우 Context Switching으로 CPU 부하가 늘어나고 메모리 사용량이 증가한다
Request | Servlet Thread | 처리 흐름 | |
---|---|---|---|
1 | ST1 | Req -> WorkerReq -> Worker Thread { Blocking IO (DB, API) } -> res(html) | |
2 | ST2 | worker thread가 처리되는 동안 Servlet thread가 대기 | |
3 | NIO | ST3 | |
4 | ST4 | ||
5 | ST5 |
- 3.0 이후 비동기 서블릿
- Servlet Thread는 요청을 받아서 Worket Thread에게 넘기고 바로 Thread Pool로 반환한다
- Worker Thread가 작업이 끝나면 Servlet Thread를 Pool로부터 할당 받아서 넘긴다
- Request -> Sevlet Thread 1 -> Worker Thread 1 -> Servlet Thread 1 -> Response
- 이런식으로 처리하면 Worker Thread 작업중에 다른 요청을 Servlet Thread가 처리할 수 있음
- Servlet Thread 개수가 요청보다 훨씬 적어도 빠르게 처리가능
- Request -> Sevlet Thread 1 -> Worker Thread 1 -> Servlet Thread 1 -> Response
- 하지만, 이 방법도 결국엔 Worker Thread를 요청만큼 생성하게 된다
- DeferredResult Queue 사용하기