I. filter
- Predicate를 이용해서 true인 것만 통지
1
| filter(Predicate predicate)
|
EX) 짝수만 통지
1
2
3
4
5
6
7
8
| private static void filterExample() throws InterruptedException {
Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.filter(data -> data % 2 == 0);
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(3000L);
}
|
출력:
1
2
3
4
5
| RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 6
RxComputationThreadPool-1: 8
|
II. distinct
- 이미 통지한 데이터와 같은 데이터를 제외하고 통지
- Parameter
- distinct()
- distinct(Function<? Super T,K> keySelector)
- keySelector - 받은 데이터와 비교할 데이터를 생성
EX) distinct()
1
2
3
4
5
6
7
8
| private static void distinctExample() throws InterruptedException {
Flowable<String> flowable = Flowable.just("A", "a", "B", "b", "A", "a", "B", "b")
.distinct();
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(3000L);
}
|
출력:
1
2
3
4
5
| main: A
main: a
main: B
main: b
main: 완료
|
EX) distinct(keySelector)
1
2
3
4
5
6
7
8
| private static void distinctWithKeySelectorExample() throws InterruptedException {
Flowable<String> flowable = Flowable.just("A", "a", "B", "b", "A", "a", "B", "b")
.distinct(String::toLowerCase);
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(3000L);
}
|
출력:
1
2
3
| main: A
main: B
main: 완료
|
III. distinctUntilChanged
- 같은 데이터를 연속으로 받을 때 이 데이터를 제외
- 통지한 데이터와 같아도 연속되지 않으면 제외하지 않음
- Parameters
- keySelector - 받은 데이터와 비교할 데이터를 생성
- comparer - 바로 앞 데이터와 현재 데이터가 같은지 판단하는 함수 생성
EX) distinctUntilChanged()
1
2
3
4
5
6
| private static void distinctUntilChangedExample() {
Flowable<String> flowable = Flowable.just("A", "B", "A", "A", "B", "B")
.distinctUntilChanged();
flowable.subscribe(new DebugSubcriber<>());
}
|
출력:
1
2
3
4
5
| main: A
main: B
main: A
main: B
main: 완료
|
EX) distinctUntilChanged(Function<? super T, K> keySelector)
1
2
3
4
5
6
| private static void distinctUntilChangedWithKeySelectorExample() {
Flowable<String> flowable = Flowable.just("A", "a", "B", "b", "A", "a", "B", "b")
.distinctUntilChanged(data -> data.toLowerCase());
flowable.subscribe(new DebugSubcriber<>());
}
|
출력:
1
2
3
4
5
| main: A
main: B
main: A
main: B
main: 완료
|
EX) distinctUntilChanged(BiPredicate<? super T, ? super T> comparer)
1
2
3
4
5
6
| private static void distinctUntilChangedWithComparerExample() {
Flowable<String> flowable = Flowable.just("A", "a", "B", "b", "A", "a", "B", "b")
.distinctUntilChanged((data1, data2) -> data1.equalsIgnoreCase(data2));
flowable.subscribe(new DebugSubcriber<>());
}
|
출력:
1
2
3
4
5
| main: A
main: B
main: A
main: B
main: 완료
|
IV. take
EX) take()
1
2
3
4
5
6
7
8
| private static void takeExample() throws InterruptedException {
Flowable<Long> flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS)
.take(3);
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(4000L);
}
|
출력:
1
2
3
4
| RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 완료
|
V. takeUntil
- 특정 조건이 될 때까지 데이터를 통지
- Parameters에 따라 통지방식이 나뉨
- stopPredicate - 해당 조건이 true가 될 때까지 통지 (Predicate)
- true가 될 때 받은 데이터도 완료와 같이 통지
- other - 해당 Flowable이 데이터 통지를 시작할 때까지 통지 (Publisher/ObservableSource)
EX) takeUntil(stopPredicate)
1
2
3
4
5
6
7
8
| private static void takeUntilWithStopPredicateExample() throws InterruptedException {
Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.takeUntil(data -> data == 3);
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(4000L);
}
|
출력:
1
2
3
4
5
| RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 완료
|
EX) takeUntil(other)
1
2
3
4
5
6
7
8
| private static void takeUntilWithOtherExample() throws InterruptedException {
Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.takeUntil(Flowable.timer(1000L, TimeUnit.MILLISECONDS));
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(4000L);
}
|
출력:
1
2
3
4
| RxComputationThreadPool-2: 0
RxComputationThreadPool-2: 1
RxComputationThreadPool-2: 2
RxComputationThreadPool-1: 완료
|
VI. takeWhile
- Predicate로 받는 조건이 true면 계속 통지
EX) takeWhile(predicate)
1
2
3
4
5
6
7
8
| private static void takeWhileWithPredicateExample() throws InterruptedException {
Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.takeWhile(data -> data != 3);
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(2000L);
}
|
출력:
1
2
3
4
| RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 완료
|
VII. takeLast
- 완료시점에서 마지막 데이터 부터 지정한 개수나 기간의 데이터를 통지
- 완료 시점을 통지하지 않는 Flowable/ObservableSource에선 사용 불가
- 조건에 맞는 데이터를 확인하는 시점은 데이터 통지시점이 아니라 완료 시점
- Parameters
- count - 끝에서부터 세는 데이터 개수 (int)
- time - 통지할 데이터를 결정할 대상 기간 (long)
- unit - 위에 기간의 단위 (TimeUnit)
EX) takeLast(count)
1
2
3
4
5
6
7
8
9
| private static void takeLastWithCountExample() throws InterruptedException {
Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(5)
.takeLast(2);
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(2000L);
}
|
출력:
1
2
3
| RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 완료
|
- takeLast(count, time, unit)
- 마지막 통지 시점부터 지정한 기간(time)까지의 데이터를 얻고, 그 중에 끝에서 count만큼 데이터를 통지
EX) takeLast(count, time, unit)
1
2
3
4
5
6
7
8
9
| private static void takeLastWithTimeAndUnitExample() throws InterruptedException {
Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(10)
.takeLast(3, 300L, TimeUnit.MILLISECONDS);
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(4000L);
}
|
출력:
1
2
3
| RxComputationThreadPool-1: 8
RxComputationThreadPool-1: 9
RxComputationThreadPool-1: 완료
|
VIII. skip
- 앞에서부터 지정한 만큼 건너뛰기
- 데이터 개수나 경과 시간으로 지정 가능
- Parameters
- count - 건너뛸 개수
- time - 건너뛸 시간
- unit - 건너뛸 시간 단위
EX) skip()
1
2
3
4
5
6
7
8
| private static void skip() throws InterruptedException {
Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.skip(2);
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(1000L);
}
|
출력:
1
| RxComputationThreadPool-1: 2
|
IX. skipUntil
- Parameter로 받는 Flowable/Observable이 데이터를 통지할때까지 건너뛴다
EX) skipUntil(Publisher/ObservableSource other)
1
2
3
4
5
6
7
8
| private static void skipUntilWithOther() throws InterruptedException {
Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.skipUntil(Flowable.timer(1000L, TimeUnit.MILLISECONDS));
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(2000L);
}
|
출력:
1
2
3
| RxComputationThreadPool-2: 3
RxComputationThreadPool-2: 4
RxComputationThreadPool-2: 5
|
X. SkipWhile
EX) skipWhile(predicate)
1
2
3
4
5
6
7
8
| private static void skipWhileWithPredicate() throws InterruptedException {
Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.skipWhile(data -> data != 3);
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(2000L);
}
|
출력:
1
2
3
| RxComputationThreadPool-1: 3
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 5
|
XI. skipLast
- 끝에서부터 지정한 범위의 시간, 개수만큼 데이터 건너뛰어서 통지
- 건너뛰는만큼 늦게 결과 통지 시작
EX) skipLast(count)
1
2
3
4
5
6
7
8
| private static void skipLastWithCount() throws InterruptedException {
Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.skipLast(2);
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(2000L);
}
|
출력:
1
2
3
4
| RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 1
RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 3
|
XII. throttleFirst
- 데이터 통지 후 지정 시간 동안 데이터를 통지하지 않음
- 처리가 완료될 때까지 반복됨
- 단기간에 들어오는 데이터가 모두 필요하지 않은 경우 사용해서 쳐낼 수 있다
- Parameters
- Time - 데이터를 파기하는 시간
- Unit - 시간 단위
EX)
1
2
3
4
5
6
7
8
9
| private static void throttleFirstWithTimeAndUnit() throws InterruptedException {
Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(10)
.throttleFirst(1000L, TimeUnit.MILLISECONDS);
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(3000L);
}
|
출력
1
2
3
4
| RxComputationThreadPool-1: 0
RxComputationThreadPool-1: 4
RxComputationThreadPool-1: 8
RxComputationThreadPool-1: 완료
|
XIII. throttleLast
- 지정한 시간마다 가장 마지막에 통지되는 데이터만 통지
- throttleLast와 sample은 이름만 다를뿐 같은 Parameter를 넣으면 똑같은 작업
- Parameters
- Time - 간격으로 지정한 시간 (long)
- Unit - 시간 단위 (TimeUnit)
EX) throttleLast(long time, TimeUnit unit)
1
2
3
4
5
6
7
8
9
| private static void throttleLastWithTimeAndUnit() throws InterruptedException {
Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(9)
.throttleLast(1000L, TimeUnit.MILLISECONDS); // 1000 Milliseconds 동안 받은 데이터중 가장 마지막꺼만 통지
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(3000L);
}
|
출력
1
2
3
| RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 5
RxComputationThreadPool-2: 완료
|
- throttleLast와 sample은 이름만 다를뿐 같은 Parameter를 넣으면 똑같은 작업
- Parameter
- Time - 간격으로 지정한 시간 (long)
- Unit - 시간 단위 (TimeUnit)
- sampler - 데이터 통지 간격을 정하는 Flowable / Observable (Publisher / ObservableSource)
EX) sample(sampler)
1
2
3
4
5
6
7
8
9
| private static void sampleWithSampler() throws InterruptedException {
Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(9)
.sample(Flowable.interval(1000L, TimeUnit.MILLISECONDS));
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(3000L);
}
|
출력
1
2
3
| RxComputationThreadPool-1: 2
RxComputationThreadPool-1: 5
RxComputationThreadPool-2: 완료
|
XIV. throttleWithTimeout
- 데이터를 받은 후 일정 기간동안 다음 데이터를 받지 못하면 현재 데이터를 통지
- Parameters
- time - 지정하려는 시간
- unit - 시간 단위
- debounceIndicator - 건너뛸 시간을 정하는 Flowable/Observable
1
2
3
4
5
6
7
8
9
| private static void throttleWithTimeout() throws InterruptedException {
Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(9)
.throttleWithTimeout(500L, TimeUnit.MILLISECONDS);
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(3000L);
}
|
출력
1
2
| RxComputationThreadPool-1: 8
RxComputationThreadPool-1: 완료
|
- Debounce는 throttleWithTimeout과 parameter가 같으면 동일한 처리
EX) debounce(time, unit)
1
2
3
4
5
6
7
8
9
| private static void debounce() throws InterruptedException {
Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.take(9)
.debounce(500L, TimeUnit.MILLISECONDS);
flowable.subscribe(new DebugSubcriber<>());
Thread.sleep(3000L);
}
|
출력
1
2
| RxComputationThreadPool-1: 8
RxComputationThreadPool-1: 완료
|
XV. elementAt / elementAtOrError
- 지정한 위치의 데이터만을 통지하는 연산자
- Single / Maybe를 return
- methods
Maybe<T> elementAt(long index)
- 데이터가 없으면 완료를 통지하는 MaybeSingle<T> elementAt(long index, T defaultItem)
- 데이터가 없으면 defaultItem을 통지Single<T> elementAtOrError(long index)
- 데이터가 없으면 에러 통지
EX) elementAt(long)
1
2
3
4
5
6
7
8
| private static void elementAtExample() throws InterruptedException {
Maybe<Long> maybe = Flowable.interval(300L, TimeUnit.MILLISECONDS)
.elementAt(3);
maybe.subscribe(new DebugMaybeObserver());
Thread.sleep(3000L);
}
|
출력
1
| RxComputationThreadPool-1: 3
|