[RxJava] 5. Reactive ์ฐ์ฐ์ ํํค์ณ๋ณด๊ธฐ 3ํธ (์ํ ์ฐ์ฐ์ ๋ฐ ๊ธฐํ ์ฐ์ฐ์)
์ง๋ ํฌ์คํธ์ ์ด์ด์ RxJava์ ๊ธฐ๋ณธ์ ๋ค๋ฃจ๋ ์ฐ์ฐ์ ๋ง์ง๋ง ํธ ์ํ ์ฐ์ฐ์ ๋ฐ ๊ธฐํ ์ฐ์ฐ์์ ๋ํด ์์๋ณด๊ฒ ์ต๋๋ค.
์ด๋ฒ ํฌ์คํธ์์ ๋ค๋ฃฐ ์ํ ์ฐ์ฐ์๋ ์ฐ๋ฆฌ๊ฐ ์์ฃผ ์ฌ์ฉํ๋ Sum, Average ์ ๊ฐ์ ์ํ ์ฐ์ฐ์๋ค์ ์ด์ผ๊ธฐํ๋๋ฐ, ์ด๋ฅผ ์ข ๋ Reactive ํ๊ฒ ๋ค๋ฃฐ ์ ์๋ ์ฐ์ฐ์๋ก ์ ๊ณตํ๋ฉฐ ๊ทธ ์ธ ๊ธฐํ ์ฐ์ฐ์๋ก๋ ์ง๋ ํฌ์คํธ์์๋ ๊ฐ์กฐํ๋ ๋ฐฐ์ ์ด์์ ๊ด๋ จ๋ ์ฐ์ฐ์ ๋ฑ ํน์ํ๊ฒ ๋ค๋ฃจ๋ ์ฐ์ฐ์๋ค์ ์ด์ผ๊ธฐ ํฉ๋๋ค.
์ํ ์ฐ์ฐ์
์ํ ์ฐ์ฐ์๋ ์ฌ๋ฌ ๊ฐ์ ์ซ์ํ ๋ฐ์ดํฐ๋ก ์ด๋ฃจ์ด์ง Array ๋ฑ์ ์ด์ฉํ์ฌ ํฉ, ํ๊ท ๋ฑ์ ๊ณ์ฐ์ ์ํํ์ฌ ํ๋์ Observable ๊ฐ์ฒด๋ก ๋ง๋ค์ด์ฃผ๋ ์ฐ์ฐ์์ ๋๋ค.
๋ณธ๋ RxJava์์ ์ํ์ฐ์ฐ์๋ 1.0์์ ์ง์ํ๋ ๋ํ๋์์์ต๋๋ค. ํ์ฌ RxJava 1.x์์ ์ฌ์ฉํ๋ ค๋ฉด ์ฐ๋ฆฌ๊ฐ ๊ธฐ๋ณธ์ ์ผ๋ก ์ฌ์ฉํ๋ RxJava์ ๋ํ๋์๋ง์ผ๋ก๋ ์ฌ์ฉํ ์ ์์ต๋๋ค. ๋ฐ๋ผ์ Math ๊ด๋ จ๋ ์ฐ์ฐ์๋ ๋ณ๋์ ๋ค๋ฅธ ๋ํ๋์๋ฅผ ์ถ๊ฐํด์ผ ํฉ๋๋ค.
implementation "io.reactivex.rxjava-math:1.0.0"
๋ณธ๋ RxJava 1.x์์๋ ์ด๋ฌํ ๋ํ๋์๋ฅผ ์ถ๊ฐํ์ง ์์๋ ๋ฐ๋ก ์ํ ์ฐ์ฐ์๋ฅผ ์ฌ์ฉํ ์ ์์์ง๋ง RxJava 2.x ๋ฒ์ ๋ถํฐ ๋ ํฌ์งํฐ๋ฆฌ๊ฐ ๋ถ๋ฆฌ๋๋ฉด์ ์ํ ๊ด๋ จ๋ ์ฐ์ฐ์๋ ๋ณ๋์ ๋ํ๋์๋ฅผ ์ค์นํด์ค์ผ ํฉ๋๋ค.
์ด๋ฌํ ๋ํ๋์๋ RxJava Companion ์ฐ์ฐ์๋ก ๋ถ๋ฆฌ์ RxJava 2.x ์ด์์์๋ ํธํ๋์ง ์๊ณ , ๊ณต์์ ์ผ๋ก ๊ฐ๋ฐํ์ฌ ์ ๊ณตํ์ง ์์ต๋๋ค.
RxJava 2.x ์ด์ ๋ฒ์ ์์ ์ด๋ฌํ ์ฐ์ฐ์๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด์ ์ด๋ฅผ ๋ณ๋๋ก ํฌํ ์ ํด์ฃผ์ RxJava 2์ ํต์ฌ ๊ธฐ์ฌ์์ด์ David Karnok์ด ์ด๋ฌํ Extension์ ๊ณต๊ฐํ์ฌ ํ์ฌ Maven์์ ์ ๊ณต๋๊ณ ์์ต๋๋ค.
implementation "com.github.akarnokd:rxjava3-extensions:3.0.1"
sum()
์์ ๋งํ ๋ฏ์ด ๋ํ์ ์ผ๋ก๋ sum์ ์ฌ๋ฌ๊ฐ์ ์ซ์ํ์ผ๋ก ์ด๋ฃจ์ด์ง Array, List (ํ๋์ Observable ํน์ Flowable) ๋ฑ์ ์ด์ฉํ์ฌ ํฉ๊ณ๋ฅผ ํ๋์ Observable๋ก ๋ฐํํด์ฃผ๋ ์ฐ์ฐ์์ ๋๋ค.
์ ๋ฌธ์๋ RxJava 1.x์์ ์ ๊ณต๋์๋ ๋ฌธ์๋ฅผ ๊ฐ์ ธ์จ ๊ฒ์ด๋ฉฐ ๊ทธ๋ฆผ๊ณผ ๊ฐ์ด X Observable์ด ์๊ณ , ํด๋น ๊ฐ์ฒด์์ Iterable ํํ์ ๋ ํผ๋ฐ์ค ์ธ์คํด์ค์์ ์๋ฃ๋ฅผ ๋นผ์ ํฉ๊ณ๋ฅผ ํ๋์ ๊ฐ์ฒด๋ก ๋ํ๋ด์ฃผ๊ณ ์์ผ๋ฉฐ ๊ฐ ์๋ฃํ๋ณ๋ก sumInteger, sumLong, sumFloat๊ฐ ์กด์ฌํฉ๋๋ค.
// RxJava 1.x
MathObservable.sumInteger(Observable.range(1, 4)).subscribe(System.out::println);
// RxJava 2.x or 3.x
Observable.range(1, 4).to(MathObservable::sumInt).subscribe(System.out::println);
๊ณต์์ ์ผ๋ก ์ ๊ณตํ๋ ๊ฒ๊ณผ Extension์ผ๋ก ์ ๊ณตํ๋ ๊ฒ์ ์ฌ์ฉ๋ฒ์ด ์ฝ๊ฐ ์ฐจ์ด๊ฐ ์๋๋ฐ, ์ผ๋จ ๋ฉ์๋ ์ด๋ฆ์ด RxJava Extension์ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ sumInt๋ก ์ฌ์ฉํ ์ ์์ต๋๋ค.
average()
average ์ญ์ sum๊ณผ ๋ง์ฐฌ๊ฐ์ง๋ก ๊ฐ์ ๋ฐ์ดํฐ ํ์์ ์ฌ์ฉํฉ๋๋ค. ๋ค๋ง average์ ๊ฒฝ์ฐ ์ ์๊ฐ ๋์ฌ ์๋ ์๊ณ , ์์ซ์ ๊ฐ์ด ๋์ฌ ์๋ ์๊ธฐ ๋๋ฌธ์ averageFloat์ averageDouble ๋ฉ์๋๋ง์ ๊ฐ์ง๊ณ ์์ต๋๋ค.
// RxJava 1.x
MathObservable.averageFloat(new Integer[]{30, 60, 25, 18, 41}).subscribe(System.out::println);
// RxJava 2.x or 3.x
Observable.fromArray(new Integer[]{30, 60, 25, 18, 41}).to(MathObservable::averageFloat)
.subscribe(System.out::println);
average์ ๊ฒฝ์ฐ๋ ๋ฉ์๋ ์ด๋ฆ์ ๋ณํ๊ฐ ์์ต๋๋ค. ๊ทธ๋ฐ๋ฐ sum๋ ๊ทธ๋ ๊ณ average๊ณ ๊ทธ๋ ๊ณ ์ฝ๋ ์ฌ์ฉ์ ์กฐ๊ธ ์ฐจ์ด๊ฐ ์๋๋ฐ์. 1.x์ ๊ฒฝ์ฐ๋ MathObservable์ด๋ผ๋ ๋ณ๋์ ๊ฐ์ฒด๊ฐ ์กด์ฌํ๊ณ , Extension์ ์ด๊ฒ์ด ์์ต๋๋ค.
์ฌ์ค ์ฝ๋๋ฅผ ์ด์ด๋ณด๋ฉด 1.x์์๋ Observable์ Base๋ก ํ์ฌ๊ธ MathObservable ํด๋์ค๋ฅผ ๋ง๋ค์ด์ ํด๋น ๋ฉ์๋๋ฅผ ์ ์ํ ๊ฒ์ด๊ณ , 2.x์์ ์ ๊ณตํ๋ Extension์ ๊ฒฝ์ฐ ์ด์ฐจํผ Base๊ฐ Observable์ด๊ธฐ ๋๋ฌธ์ ๊ตณ์ด ๊ทธ๊ฒ์ ํด๋์ค์ ์ ์ํ ํ์ ์์ด ์ด์ ๊ด๋ จ๋ ๋ฉ์๋๋ฅผ static์ผ๋ก ์ ๊ณตํจ์ผ๋ก์จ ๊ฒฐ๊ณผ๋ฅผ ๋ฐํํ๋ ๊ฒ์ผ๋ก ๊ตฌํ๋์ด ์์ต๋๋ค.
๋ฐ๋ผ์ Extension๊ณผ ๊ธฐ์กด ์ฝ๋์ ์ฐจ์ด๋ ๊ฑฐ์ ์์ผ๋ฉฐ ๋จ์ง ์ฝ๋์ ๋ชจ์ต์ ๋จ์ํ ํ๊ธฐ ์ํด ๋ฆฌํฉํ ๋ง ํ ๊ฒ ๋ฟ์ด๋ผ๊ณ ๋ง ๋ณด๋ฉด ๋๊ฒ ์ต๋๋ค.
min()
min ์ญ์ ๊ฐ์ ์ ํ์ ๋ฐ์ดํฐ๋ก ์ต์๊ฐ์ ํ๋์ Observable๋ก ๋ฐํํ๋ ๋ฉ์๋์ ๋๋ค.
// RxJava 1.x
MathObservable.min(new Integer[]{30, 60, 25, 18, 41}).subscribe(System.out::println);
// RxJava 2.x or RxJava 3.x
Observable.fromArray(new Integer[]{30, 60, 25, 18, 41}).to(MathObservable::min)
.subscribe(System.out::println);
์ ์ฌ์ฉ๋ฒ ๊ทธ๋๋ก max ๋ฉ์๋๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค.
max()
max๋ ๋ฐ์ดํฐ ๋ชจ์์์ ์ต๋๊ฐ์ ํ๋์ Observable๋ก ๋ฐํํ๋ ๋ฉ์๋์ ๋๋ค.
// RxJava 1.x
MathObservable.max(new Integer[]{30, 60, 25, 18, 41}).subscribe(System.out::println);
// RxJava 2.x or RxJava 3.x
Observable.fromArray(new Integer[]{30, 60, 25, 18, 41}).to(MathObservable::max)
.subscribe(System.out::println);
๊ทธ๋ฐ๋ฐ, ํ ๊ฐ์ง ์๋ฌธ์ ์ด ์์ต๋๋ค. 1.x์์๋ MathObservable์ ๋ณ๋๋ก ์ด์ฉํ๊ณ , Extension์์๋ ๊ธฐ์กด ๊ตฌํ์ฒด์์ ์ฌ์ฉํ๋๋ก ๋์ด ์๋๋ฐ, ์ด๊ฒ ๊ฐ๋ฅํ ๊ฒ to ๋ฉ์๋๊ฐ ๋์ ๋๋๋ฐ์. to ๋ฉ์๋๋ ๋ฌด์์ธ๊ฑธ๊น์?
to ๋ฉ์๋๋ RxJava์์ ๋ค๋ฅธ ํ์ ์ผ๋ก ๋ณํํด์ฃผ๊ธฐ ์ํ ๋ฉ์๋์ ๋๋ค. ๋ง์ ๋ฐ์ดํฐ๋ค ๋ชจ์์์ ํ๋์ Observable๋ก ๋ฐํํ๊ธฐ ์ํด์๋ Observable<List<Integer>> -> Obserable<Integer> ํ์์ผ๋ก ๋ณํํด์ผํ๋ ๊ฒ์ด๋ผ๊ณ ๋ณด๋ฉด ๋๊ฒ ์ต๋๋ค.
๋น์ทํ ๋ฉ์๋๋ก toFlowable ๋ฉ์๋๊ฐ ์์ต๋๋ค. Flowable์ RxJava์์ ๋ฐฐ์ ์ด์๋ฅผ ์ํํ๊ธฐ ์ํ ํด๋์ค์ธ๋ฐ์. Observable๋ก ๊ฐ์ง๊ณ ์๋ ๊ฐ์ฒด๋ฅผ Flowable๋ก ๋ณํํ๊ธฐ ์ํด ์ฌ์ฉํ๋ ๋ฉ์๋์ ๋๋ค.
// RxJava 2.x or 3.x
Observable.fromArray(new Integer[]{30, 60, 25, 18, 41}).toFlowable(BackpressureStrategy.BUFFER)
.to(MathFlowable::min).subscribe(System.out::println);
Flowable๋ก ๋ณํํ ๋ ์ฃผ๋ ์ธ์๋ ๋ฐ๋ก ๋ฐฐ์ ์ ๋ต์ธ๋ฐ์. ๋ฐฐ์ ์ ๋ต์ ๋ฐ๋ผ์ ๋ง์ ๋ฐ์ดํฐ๋์ ์์ฐํ๊ณ ์๋นํ ๋๋ฅผ ์ ์ดํ ์ ์์ต๋๋ค. ์ด ๋ถ๋ถ์ ๋ค๋ฅธ ํฌ์คํธ์์ ์์ธํ ๋ค๋ค๋ณด๋๋ก ํ๊ฒ ์ต๋๋ค.
count()
RxJava์์ ์ฌ์ฉ ๊ฐ๋ฅํ ์ํ ์ฐ์ฐ์ ์ค ์ ์ผํ๊ฒ Extension์ ์์กดํ์ง ์๋ ์ฐ์ฐ์์ธ count๋ ์ด๋ ๋ฒ์ ์ด๋ ์ง Observable ๊ฐ์ฒด์์ ๋ฐ๋ก ์ฌ์ฉํ ์ ์๋ ์ฐ์ฐ์์ ๋๋ค.
Observable.range(1, 4).count().subscribe(System.out::println);
Extension์ ์์กดํ์ง ์๊ธฐ ๋๋ฌธ์ ๋ชจ๋ ๋ฒ์ ์์ ์์ ๊ฐ์ ๋์ผํ ์ฝ๋๋ฅผ ์ฌ์ฉํฉ๋๋ค. ์์ฐํด์ ๋ค์ด์ค๋ ๋ฐ์ดํฐ๋ค์ ๋ํ ๊ฐฏ์๋ฅผ ํต๊ณํ๊ณ ์ ํ ๋ ์ฌ์ฉํ ์ ์์ต๋๋ค.
๊ธฐํ ์ฐ์ฐ์
๊ทธ ์ธ ์ฐ์ฐ์๋ค์ ๋ํด์๋ ์ง์ฐ๊ณผ ๊ด๋ จ๋ ์ฐ์ฐ์๊ฐ ์์ต๋๋ค. ์ง๋ ํฌ์คํธ์์ ์ ๊น ๋ค๋ค์ง๋ง ๋ฐ์ดํฐ๋ฅผ ์์ฐํ๊ณ ์๋นํ๋ ๊ณผ์ ์์ sleep๊ณผ ๋น์ทํ ์ญํ ์ ํ๋ delay, timeInterval ํจ์ ๋ฑ์ ์๊ฐ ๊ด๋ จ ์ฐ์ฐ์๋ค์ ๋ณดํต ์ ํธ๋ฆฌํฐ ์ฐ์ฐ์๋ผ๊ณ ํฉ๋๋ค.
delay()
amb ์ฐ์ฐ์๋ฅผ ์ฌ์ฉํ์ ๋ ๊ทธ ์ฐจ์ด๋ฅผ ์์๋ณด๊ธฐ ์ํด์ ์ ๊น ์ฌ์ฉํ๋ ์ฐ์ฐ์์ธ๋ฐ์. ์ด๋ฐ์์ผ๋ก ๋ฐ์ดํฐ์ ๋ฐํ์ ์ ์ ์ง์ฐ์ํค๋ ์ญํ ์ ํ๋ ์ฐ์ฐ์์ ๋๋ค.
Observable.amb(Arrays.asList(Observable.just("N.K").delay(2, TimeUnit.SECONDS), Observable.just("NEONKID")))
.subscribe(System.out::println);
์๊ฐ ๊ด๋ จ ์ฐ์ฐ์๋ค์ ํน์ง์ผ๋ก๋ ๋ค๋ฅธ ์ฐ์ฐ์๋ค๊ณผ ๋ค๋ฅด๊ฒ ์ค์ผ์ค๋ฌ๋ฅผ ๋ณ๋์ ์ค๋ ๋๋ก ์ด์ฉํ๋ค๋ ๊ฒ์ธ๋ฐ์. ๋ง์ฝ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฃจ๋ ์ค๋ ๋์ ๊ฐ์ด ์ด๋ฃจ์ด์ง๋ค๋ฉด ๋ฐ์ดํฐ์ ์ฐ๊ฒฐ์ฑ์ ๋ณด์ฅ ๋ฐ๊ธฐ๊ฐ ์ด๋ ต๊ฒ ์ฃ ?
timeInterval()
timeInterval ์ฐ์ฐ์๋ ์ด๋ค ๊ฐ์ ์์ฐํ๋ ๋ฐ ๊ฑธ๋ฆฌ๋ ์๊ฐ์ ๋ฐํํ๋ ์ฐ์ฐ์์ ๋๋ค. ์๋ฒ ์ธก์์ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์์ค๋ ๋ฐ๋ง์ ์ฌ์ฉํ๋ค๋ฉด ์ด ์ฐ์ฐ์๋ฅผ ์ฌ์ฉํ๋ ๊ฒ๋ ์ข๊ฒ ์ฃ ?
Observable<Timed<Integer>> src = Observable.range(20, 100).delay(item -> {
Thread.sleep(new Random().nextInt(500));
return Observable.just(item);
}).timeInterval();
Disposable disposable = src.subscribe(System.out::println);
disposable.dispose();
์ฌ์ด ์ค๋ช ์ ์ํด 20 ~ 100๊น์ง์ ๋ฐ์ดํฐ๋ฅผ range ๋ฉ์๋๋ก ํ์ฌ๊ธ array ๋ฐ์ดํฐ๋ฅผ ์์ฑํ์ฌ delay๋ฅผ ๋๋คํ๊ฒ ์ฃผ๊ณ ๊ทธ ๊ฒฐ๊ณผ๋ฅผ ๊ตฌ๋ ๋ฐ๋ ์ฝ๋๋ฅผ ์์ฑํด๋ดค์ต๋๋ค.
์ด์ ๋ฐ์ดํฐ๋ฅผ ๊ธฐ์ค์ผ๋ก ์ง๊ณํ์ ๋ 117, 118, 119๊ฐ ๊ฐ๊ฐ ๋ฐํ๋ ๋๋ฅผ ์ถ๋ ฅํ ๋ชจ์ต์ ๋๋ค. 116์ด ๋ฐํ๋๊ณ 117์ด ๋ฐํ๋์ ๋ ๊ฑธ๋ฆฐ ์๊ฐ์ 229ms, 118์ด ๋ฐํ๋์ ๋๊น์ง ๊ฑธ๋ฆฐ ์๊ฐ์ 161ms ์ด๋ฐ์์ผ๋ก ํ์๋์ด ๊ฐ ๋ฐํ ๋ฐ์ดํฐ๊ฐ ํ๊ท ์ ์ผ๋ก ์ผ๋ง ์ ๋ ๊ฑธ๋ฆฌ๋์ง๋ฅผ ์ธก์ ํ ์ ์๋๋ก ๊ตฌํํ ์ ์์ต๋๋ค.
ํ์ฌ ์ฝ๋๋ Thread์์ sleep ๋ฉ์๋๋ฅผ ์ด์ฉํ์ฌ ์ต๋ 500ms๊น์ง ๋ฌด์์๋ก ์ซ์๋ฅผ ์์ฑํ์ฌ ๊ฑธ๋ฆฌ๋ ์๊ฐ์ ์ธก์ ํ์์ต๋๋ค.
๋ง์น๋ฉฐ...
๊ธธ๊ณ ๊ธด RxJava ์ฐ์ฐ์ ์๋ฆฌ์ฆ Reactive ์ฐ์ฐ์ ํํค์ณ๋ณด๊ธฐ ๊ธ์ด ๋ง๋ฌด๋ฆฌ ๋์๋๋ฐ์. ์์ธํ ๋ณด๋ฉด ์ฐ๋ฆฌ๊ฐ ์ฌ์ฉํ๋ ๋ฉ์๋๋ค๊ณผ ํฐ ์ฐจ์ด๋ ์์ง๋ง ์ด๋ฒคํธ ๊ธฐ๋ฐ์ผ๋ก ๋์ํ๊ณ ๊ทธ์ ํด๋นํ๋ ์ด์์ธ ๋ฐฐ์ ์ด์๋ฅผ ์ฌ๋ฐ๋ฅด๊ฒ ์ฒ๋ฆฌํ ์ ์๋๋ก ํด์ฃผ๋ ํ๋์ ์๋ฃจ์ ์ด์์์ ํ์ธํ ์ ์์์ต๋๋ค.