I. map
- 원본 데이터를 받아 변환 작업을 한 뒤 변환된 데이터를 통지
- 한개의 데이터로 여러 데이터를 생성해서 통지 불가
- 건너뛰거나 null을 리턴 불가능
- null을 리턴해야 할 경우 map 대신 flatMap을 사용
1
| map(Function<? super T, ? extends R> mapper)
|
EX)
1
2
3
4
5
6
| public static void main(String[] args) {
Flowable<String> flowable = Flowable.just("A", "B", "C", "D", "E")
.map(String::toLowerCase);
flowable.subscribe(new DebugSubcriber<>());
}
|
출력:
1
2
3
4
5
6
| main: a
main: b
main: c
main: d
main: e
main: 완료
|
II. flatMap
- 데이터들을 비동기로 처리 (<-> concatMap)
- map과 달리 Flowable / Observable을 리턴
- 데이터 한개로 여러 데이터 통지 가능
- 빈 Flowable / Observable을 이용해서 통지 하지 않고 건너뛸 수 있다
- 에러 통지도 가능
- 주요 메소드
1
2
3
4
5
6
| flatMap(Function<? super T, ? extends Publisher/ObservableSource<? extends R>> mapper)
flatMap(Function<? super T, ? extends Publisher/ObservableSource<? extends U>> mapper, BiFunction<? super T, ? super U, ? extends R> combiner)
flatMap(Function<? super T, ? extends Publisher/ObservableSource<? extends R>> onNextMapper,
Function<? super Throwable, ? extends Publisher/ObservableSource<? extends R>> onErrorMapper,
Callable<? extends Publisher<? extends R>> onCompleteSupplier)
|
1. flatMap(mapper)
EX)
1
2
3
4
5
6
7
8
9
10
11
12
| public static void flatMapExample() {
Flowable<String> flowable = Flowable.just("A", "B", "C")
.flatMap(data -> {
if ("".equals(data)) {
return Flowable.empty();
} else {
return Flowable.just(data.toLowerCase());
}
});
flowable.subscribe(new DebugSubcriber<>());
}
|
출력:
1
2
3
4
| main: a
main: b
main: c
main: 완료
|
2. flatMap(mapper, combiner)
- combiner: 원본 데이터와 mapper에서 생성한 Flowable/Observable의 데이터를 받아서 새로운 데이터를 생성
EX)
1
2
3
4
5
6
7
8
9
10
11
12
| public static void mapperCombinerExample() throws InterruptedException {
Flowable<String> flowable = Flowable.range(1, 3).flatMap(
data -> {
return Flowable.interval(100L, TimeUnit.MILLISECONDS).take(3);
},
(sourceData, newData) -> String.format("sourceData: %s | newData: %s", sourceData, newData)
);
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(1000L);
}
|
출력:
1
2
3
4
5
6
7
8
9
10
| RxComputationThreadPool-2: sourceData: 2 | newData: 0
RxComputationThreadPool-1: sourceData: 1 | newData: 0
RxComputationThreadPool-1: sourceData: 3 | newData: 0
RxComputationThreadPool-1: sourceData: 1 | newData: 1
RxComputationThreadPool-1: sourceData: 2 | newData: 1
RxComputationThreadPool-3: sourceData: 3 | newData: 1
RxComputationThreadPool-2: sourceData: 2 | newData: 2
RxComputationThreadPool-2: sourceData: 1 | newData: 2
RxComputationThreadPool-2: sourceData: 3 | newData: 2
RxComputationThreadPool-2: 완료
|
3. flatMap(onNextMapper, onErrorMapper, onCompleteSupplier)
- onNextMapper는 기존 mapper 역할
- onErrorMapper는 에러가 통지되면 동작
- onCompleteSupplier는 완료 통지 시점에 동작
EX)
1
2
3
4
5
6
7
8
9
10
11
| public static void mapperErrorAndCompleteExample() {
Flowable<Integer> flowable = Flowable.just(1, 2, 0, 4, 5)
.map(data -> 10 / data)
.flatMap(
Flowable::just,
error ->Flowable.just(-1),
() -> Flowable.just(100)
);
flowable.subscribe(new DebugSubcriber<>());
}
|
출력:
1
2
3
4
| main: 10
main: 5
main: -1
main: 완료
|
III. concatMap / concatMapDelayError
- flatMap과 비슷하지만 데이터를 받은 순서대로 하나씩 처리 (동기처리)
- concatMapDelayError를 통해 에러 발생 후 에러 통지를 완료 전까지 미룰 수 있음
- tillTheEnd 값을 true로 주면됨
- false로 두는 경우 바로 통지됨
1
2
3
| concatMap(Function<? super T, ? extends Publisher/ObservableSource<? extends R>> mapper)
concatMapDelayError(Function<? super T, ? extends Publisher/ObservableSource<? extends R>> mapper)
|
EX) ConcatMap
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| private static void concatMapExample() throws InterruptedException {
Flowable<String> flowable = Flowable.range(10, 3)
.concatMap(sourceData ->
Flowable.interval(500L, TimeUnit.MILLISECONDS)
.take(2)
.map(data -> {
long time = System.currentTimeMillis();
return String.format("%s ms | sourceData: %s | data: %s", time, sourceData, data);
}));
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(4000L);
}
|
출력:
1
2
3
4
5
6
7
| RxComputationThreadPool-1: 1579622136663 ms | sourceData: 10 | data: 0
RxComputationThreadPool-1: 1579622137166 ms | sourceData: 10 | data: 1
RxComputationThreadPool-2: 1579622137668 ms | sourceData: 11 | data: 0
RxComputationThreadPool-2: 1579622138170 ms | sourceData: 11 | data: 1
RxComputationThreadPool-3: 1579622138676 ms | sourceData: 12 | data: 0
RxComputationThreadPool-3: 1579622139171 ms | sourceData: 12 | data: 1
RxComputationThreadPool-3: 완료
|
EX) ConcatMapDelay
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| private static void concatMapDelayErrorExample() throws InterruptedException {
Flowable<String> flowable = Flowable.range(10, 3)
.concatMapDelayError(sourceData ->
Flowable.interval(500L, TimeUnit.MILLISECONDS)
.take(3)
.doOnNext(data -> {
if (sourceData == 11 && data == 1) {
throw new Exception("Exception!");
}
}
).map(data -> String.format("sourceData: %s | data: %s", sourceData, data)), 1, true);
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(6000L);
}
|
출력: boolean tillTheEnd 값이 true 인 경우
1
2
3
4
5
6
7
8
| RxComputationThreadPool-1: sourceData: 10 | data: 0
RxComputationThreadPool-1: sourceData: 10 | data: 1
RxComputationThreadPool-1: sourceData: 10 | data: 2
RxComputationThreadPool-2: sourceData: 11 | data: 0
RxComputationThreadPool-3: sourceData: 12 | data: 0
RxComputationThreadPool-3: sourceData: 12 | data: 1
RxComputationThreadPool-3: sourceData: 12 | data: 2
RxComputationThreadPool-3: java.lang.Exception: Exception!
|
출력: boolean tillTheEnd 값이 false 인 경우
1
2
3
4
5
| RxComputationThreadPool-1: sourceData: 10 | data: 0
RxComputationThreadPool-1: sourceData: 10 | data: 1
RxComputationThreadPool-1: sourceData: 10 | data: 2
RxComputationThreadPool-2: sourceData: 11 | data: 0
RxComputationThreadPool-2: java.lang.Exception: Exception!
|
IV. concatMapEager / concatMapEagerDelayError
- concatMap과 거의 같은 인터페이스
- 내부적으로 구현 방식이 다름
- 각 데이터를 동시에 처리 후(비동기처리) buffer에 담아뒀다가 받은 순서대로 넘겨줌
1
2
3
| concatMapEager(Function<? super T, ? extends Publisher/ObservableSource<? extends R>> mapper)
concatMapEagerDelayError(Function<? super T, ? extends Publisher/ObservableSource<? extends R>> mapper, boolean tillThEnd)
|
EX) concatMapEager
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| private static void concatMapEagerExample() throws InterruptedException {
Flowable<String> flowable = Flowable.range(10, 3)
.concatMapEager(sourceData ->
Flowable.interval(500L, TimeUnit.MILLISECONDS)
.take(2)
.map(data -> {
long time = System.currentTimeMillis();
return String.format("%s ms | sourceData: %s | data: %s", time, sourceData, data);
}));
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(4000L);
}
|
출력:
1
2
3
4
5
6
7
| RxComputationThreadPool-1: 1579623890897 ms | sourceData: 10 | data: 0
RxComputationThreadPool-3: 1579623891398 ms | sourceData: 10 | data: 1
RxComputationThreadPool-2: 1579623890897 ms | sourceData: 11 | data: 0
RxComputationThreadPool-2: 1579623891398 ms | sourceData: 11 | data: 1
RxComputationThreadPool-2: 1579623890897 ms | sourceData: 12 | data: 0
RxComputationThreadPool-2: 1579623891398 ms | sourceData: 12 | data: 1
RxComputationThreadPool-2: 완료
|
EX) DelayError
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| private static void concatMapEagerDelayErrorExample() throws InterruptedException {
Flowable<String> flowable = Flowable.range(10, 3)
.concatMapEagerDelayError(sourceData ->
Flowable.interval(500L, TimeUnit.MILLISECONDS)
.take(3)
.doOnNext(data -> {
if (sourceData == 11 && data == 1) {
throw new Exception("Exception!");
}
}
).map(data -> String.format("sourceData: %s | data: %s", sourceData, data)), true);
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(5000L);
}
|
출력: boolean tillTheEnd 값이 true인 경우
1
2
3
4
5
6
7
8
| RxComputationThreadPool-1: sourceData: 10 | data: 0
RxComputationThreadPool-1: sourceData: 10 | data: 1
RxComputationThreadPool-1: sourceData: 10 | data: 2
RxComputationThreadPool-1: sourceData: 11 | data: 0
RxComputationThreadPool-1: sourceData: 12 | data: 0
RxComputationThreadPool-1: sourceData: 12 | data: 1
RxComputationThreadPool-3: sourceData: 12 | data: 2
RxComputationThreadPool-3: java.lang.Exception: Exception!
|
출력: boolean tillTheEnd 값이 false인 경우
1
2
3
4
| RxComputationThreadPool-2: sourceData: 10 | data: 0
RxComputationThreadPool-1: sourceData: 10 | data: 1
RxComputationThreadPool-3: sourceData: 10 | data: 2
RxComputationThreadPool-3: java.lang.Exception: Exception!
|
V. buffer
- 바로 통지하지 않고 모아서 List나 Collection에 담아서 통지하는 연산자
- 한번에 모을 개수, 시간 간격을 조정할 수 있음
1
2
3
4
5
6
| buffer(int count) //count - 버퍼에 담을 데이터 개수
buffer(long time, TimeUnit unit) //time 시간 간격, unit - 시간 간격 단위
buffer(Publisher/ObservableSource<B> boundaryIndicator)
buffer(Callable<? extends Publisher/ObservableSource<B>> boundaryIndicatorSupplier)
buffer(Flowable/Observable<? extends TOpening openingIndicator, Function<? super TOpening, ? extends Publisher/ObservableSource<? extends TClosing>> closingIndicator)
|
EX) buffer(count)
1
2
3
4
5
6
7
8
9
| private static void bufferWithCountExample() throws InterruptedException {
Flowable<List<Long>> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS)
.take(10)
.buffer(3);
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(1000L);
}
|
출력:
1
2
3
4
5
| RxComputationThreadPool-1: [0, 1, 2]
RxComputationThreadPool-1: [3, 4, 5]
RxComputationThreadPool-1: [6, 7, 8]
RxComputationThreadPool-1: [9]
RxComputationThreadPool-1: 완료
|
EX) buffer(time, unit)
1
2
3
4
5
6
7
8
9
| private static void bufferWithTimeExample() throws InterruptedException {
Flowable<List<Long>> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS)
.take(10)
.buffer(300L, TimeUnit.MILLISECONDS);
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(1200L);
}
|
출력: 비동기 작업이기 때문에 Thread별 시간에 오차가 생겨서 돌릴 때 마다 미세한 차이로 묶이는게 다르게 나옴
1
2
3
4
5
6
7
8
9
10
11
12
13
| RxComputationThreadPool-1: [0, 1, 2]
RxComputationThreadPool-1: [3, 4, 5]
RxComputationThreadPool-1: [6, 7, 8]
RxComputationThreadPool-2: [9]
RxComputationThreadPool-2: 완료
RxComputationThreadPool-1: [0, 1, 2]
RxComputationThreadPool-1: [3, 4]
RxComputationThreadPool-1: [5, 6, 7]
RxComputationThreadPool-2: [8, 9]
RxComputationThreadPool-2: 완료
...
|
- buffer(boundaryIndicator)
- Flowable/Observable이 데이터를 통지하는 간격 단위로 buffer에 데이터를 쌓는다
- buffer(boundaryIndicatorSupplier)
- 버퍼링을 시작 할 때 Flowable/Observable을 생성하고, 생성된 Flowable/Observable이 데이터를 통지하는 시점에 버퍼링을 끝냄
- boundaryIndicator는 interval을 사용하지만 boundaryIndicatorSupplier는 timer를 사용해야 같은 결과가 나온다
EX) buffer(Flowable boundaryIndicator)
1
2
3
4
5
6
7
8
9
| private static void bufferWithBoundaryIndicator() throws InterruptedException {
Flowable<List<Long>> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(7)
.buffer(Flowable.interval(1000L, TimeUnit.MILLISECONDS));
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(4000L);
}
|
출력:
1
2
3
4
| RxComputationThreadPool-1: [0, 1, 2]
RxComputationThreadPool-1: [3, 4, 5]
RxComputationThreadPool-2: [6]
RxComputationThreadPool-2: 완료
|
EX) buffer(Callable boundaryIndicatorSupplier)
1
2
3
4
5
6
7
8
9
| private static void bufferWithBoundaryIndicatorSupplier() throws InterruptedException {
Flowable<List<Long>> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(7)
.buffer(() -> Flowable.timer(1000L, TimeUnit.MILLISECONDS));
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(6000L);
}
|
EX)
1
2
3
4
| RxComputationThreadPool-3: [0, 1, 2]
RxComputationThreadPool-5: [3, 4, 5]
RxComputationThreadPool-4: [6]
RxComputationThreadPool-4: 완료
|
- buffer(openingIndicator, closingIndicator)
- openingIndicator(Flowable/Observable)는 여러 데이터를 통지
- closingIndicator(Function)는 하나만 데이터를 통지
- closingIndicator는 openingIndicator의 Flowable / Observable이 데이터를 통지할 때 호출되 처리 시작
EX)
1
2
3
4
5
6
7
8
9
| private static void bufferWithOpeningClosingIndicator() throws InterruptedException {
Flowable<List<Long>> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(7)
.buffer(Flowable.interval(0L, 900L, TimeUnit.MILLISECONDS), opening -> Flowable.interval(1000L, TimeUnit.MILLISECONDS));
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(6000L);
}
|
출력:
1
2
3
4
| RxComputationThreadPool-3: [0, 1, 2]
RxComputationThreadPool-4: [3, 4, 5]
RxComputationThreadPool-2: [6]
RxComputationThreadPool-2: 완료
|
V. toList
- 완료 통지를 받은 시점에 모아뒀던 데이터를 List형태로 통지
- Buffer에 쌓이는 데이터가 너무 많은 경우 메모리가 부족하게 될 수 있으므로 주의
- List 하나만 보내기 때문에 Single을 return
1
2
| toList()
toList(Collable<U> collectionSupplier) // 데이터를 담는 객체를 생성하는 함수형 인터페이스
|
EX)
1
2
3
4
5
6
| private static void toListExample() throws InterruptedException {
Single<List<String>> single = Flowable.just("A", "B", "C", "D", "E")
.toList();
single.subscribe(new DebugSingleObserver());
}
|
출력:
VI. toMap
- 받은 데이터로 key를 생성하고 이 key로 Map에 담는다
- buffer에 넣어둘 데이터 양이 커질 경우 메모리 부족이 발생할 수 있음
- List와 마찬가지로 Single을 return
- 같은 key 값에 데이터를 두 번 이상 넣을 경우 앞에 쓴 데이터가 덮어씌워짐
- Parameters
- keySelector - 사용할 key를 생성 (Function)
- valueSelector - 사용할value를 생성 (Funciton)
- mapSupplier - key와 value를 담을 Map 객체를 생성 (Callable)
EX) toMap(keySelector)
1
2
3
4
5
6
| private static void toMapExample() {
Single<Map<Long, String>> single = Flowable.just("1A", "2B", "3C", "1D", "2E")
.toMap(data -> Long.valueOf(data.substring(0, 1)));
single.subscribe(new DebugSingleObserver());
}
|
출력:
1
| main: {1=1D, 2=2E, 3=3C}
|
EX) toMap(keySelector, valueSelector)
1
2
3
4
5
6
| private static void toMapWithKeySelectorValueSelectorExample() {
Single<Map<Long, String>> single = Flowable.just("1A", "2B", "3C", "1D", "2E")
.toMap(data -> Long.valueOf(data.substring(0, 1)), data -> data.substring(1));
single.subscribe(new DebugSingleObserver());
}
|
출력:
VII. toMultiMap
- map인데 value가 collection 타입으로 같은 key값을 가진 value가 여러개
- Parameter
- keySelector - 사용할 key를 생성 (Function)
- valueSelector - 사용할value를 생성 (Funciton)
- mapSupplier - 통지할 Map 객체를 생성 (Callable)
- collectionFactory - key를 바탕으로 Map에 값으로 담을 collection 객체를 생성 (Function)
EX)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| private static void toMultiMapWithKeySelectorExample() throws InterruptedException {
Single<Map<String, Collection<Long>>> single = Flowable.interval(500L, TimeUnit.MILLISECONDS)
.take(5)
.toMultimap(data -> {
if (data % 2 == 0) {
return "Even";
} else {
return "Odd";
}
});
single.subscribe(new DebugSingleObserver());
Thread.sleep(3000L);
}
|
출력:
1
| RxComputationThreadPool-1: {Even=[0, 2, 4], Odd=[1, 3]}
|