[RxJava] 4. Reactive ์—ฐ์‚ฐ์ž ํŒŒํ—ค์ณ๋ณด๊ธฐ 2ํŽธ (๊ฒฐํ•ฉ, ์กฐ๊ฑด ์—ฐ์‚ฐ์ž)

๋ฐ˜์‘ํ˜•

 

2021/02/07 - [Programming/Java] - [RxJava] 3. Reactive ์—ฐ์‚ฐ์ž ํŒŒํ—ค์ณ๋ณด๊ธฐ 1ํŽธ (์ƒ์„ฑ, ๋ณ€ํ™˜ ์—ฐ์‚ฐ์ž)

 

[RxJava] 3. Reactive ์—ฐ์‚ฐ์ž ํŒŒํ—ค์ณ๋ณด๊ธฐ 1ํŽธ (์ƒ์„ฑ, ๋ณ€ํ™˜ ์—ฐ์‚ฐ์ž)

2021/01/23 - [Programming/Java] - [RxJava] 2. Reactive ๊ธฐ๋ณธ ์—ฐ์‚ฐ์ž(Operator) - map, filter, reduce [RxJava] 2. Reactive ๊ธฐ๋ณธ ์—ฐ์‚ฐ์ž(Operator) - map, filter, reduce Reactive Programming์—์„œ ๊ฝƒ์ด๋ผ๊ณ ..

blog.neonkid.xyz

์ด๋ฒˆ ํฌ์ŠคํŠธ๋Š” ์ง€๋‚œ ํฌ์ŠคํŠธ์— ์ด์–ด ๊ฒฐํ•ฉ, ์กฐ๊ฑด ์—ฐ์‚ฐ์ž์— ๋Œ€ํ•ด ์•Œ์•„๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค.

 

 

 

๊ฒฐํ•ฉ ์—ฐ์‚ฐ์ž

๊ฒฐํ•ฉ ์—ฐ์‚ฐ์ž๋Š” ์—ฌ๋Ÿฌ ๊ฐœ์˜ Observable ๊ฐ์ฒด๋ฅผ ์กฐํ•ฉํ•˜์—ฌ ํ™œ์šฉํ•˜๋Š” ์—ฐ์‚ฐ์ž๋กœ ๋‹ค์ˆ˜์˜ Observable์„ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ๋ฐ›์•„ ํ•˜๋‚˜์˜ Observable๋กœ ๋งŒ๋“ค์–ด์ฃผ๋Š” ์—ฐ์‚ฐ์ž๋“ค์„ ๋งํ•ฉ๋‹ˆ๋‹ค.

 

 

zip()

๋จผ์ € ์ฒซ ๋ฒˆ์งธ๋กœ zip ์—ฐ์‚ฐ์ž๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. zip ์—ฐ์‚ฐ์ž๋Š” ๋‘ ๊ฐœ ์ด์ƒ์˜ Observable ๊ฐ์ฒด๋ฅผ ์ด์šฉํ•˜์—ฌ ํ•˜๋‚˜์˜ ObservableSource๋กœ ๊ฒฐํ•ฉํ•ด์ฃผ๋Š” ์—ฐ์‚ฐ์ž์ธ๋ฐ, ์ด ์—ฐ์‚ฐ์ž์—๋Š” ๋‹ค์†Œ ์˜๋ฌธ์ ์ด ์žˆ์Šต๋‹ˆ๋‹ค.

 

๋‘ ๋ฐ์ดํ„ฐ๊ฐ€ ๋ฐœํ–‰๋˜๊ธฐ ์ „์˜ ์ƒํƒœ๋ผ๋ฉด ์–ด๋–จ๊นŒ์š”? ๋ฐ์ดํ„ฐ๊ฐ€ ๋ฐœํ–‰๋˜์ง€ ์•Š์€ ์ƒํƒœ๋ผ๋ฉด ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›์„ ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ ค์•ผํ•ฉ๋‹ˆ๋‹ค. zip ์—ฐ์‚ฐ์ž๋Š” ๋‘ ์—ฐ์‚ฐ์ž ์ค‘ ํ•˜๋‚˜์˜ ์—ฐ์‚ฐ์ž๋ผ๋„ ๋ฐœํ–‰๋œ ์ƒํƒœ๊ฐ€ ์•„๋‹ˆ๋ผ๋ฉด ๋ฐ์ดํ„ฐ๊ฐ€ ๋ฐœํ–‰๋  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐํ•ฉ๋‹ˆ๋‹ค.

 

zip ์—ฐ์‚ฐ์ž๋Š” ์ตœ๋Œ€ 9๊ฐœ์˜ Observable์„ ๊ฒฐํ•ฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. 

Disposable disposable = Observable.zip(
    Observable.just(300, 600, 900),
    Observable.just(30, 60, 90),
    Observable.just(3, 6, 9),
    (x, y, z) -> x + y + z
).subscribe(System.out::println);

disposable.dispose();

๋‹ค์ˆ˜์˜ ๋ฐ์ดํ„ฐ๊ฐ€ ๋“ค์–ด ์žˆ๋Š” Observable ๊ฐ์ฒด๋ฅผ ๊ฒฐํ•ฉํ•  ๋•Œ๋Š” ๊ฐ๊ฐ์˜ ์ธ์ž์— ๋งž๊ฒŒ ๊ฒฐํ•ฉ๋˜๋ฉฐ ์ด๋ฅผ ๊ฒฐํ•ฉํ•  ๋–„ ์‚ฌ์šฉํ•  ์—ฐ์‚ฐ์„ lambda๋กœ ์ •์˜ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

 

๋น„์Šทํ•œ ์—ฐ์‚ฐ์ž๋กœ zipWith๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค.

 

๊ธฐ๋ณธ ๊ณจ๊ฒฉ์€ zip ์—ฐ์‚ฐ์ž์™€ ๊ฐ™์ง€๋งŒ ๊ธฐ์กด์— ๋ฐ˜ํ™˜๋œ ๋‹จ์ผ Observable ๊ฐ์ฒด์—์„œ ๋˜๋‹ค๋ฅธ zip ์—ฐ์‚ฐ์„ ์ทจํ•˜๊ณ ์ž ํ•˜๋Š” ๊ฒฝ์šฐ์— ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค. ๊ฐ€๋ น ์˜ˆ๋ฅผ ๋“ค์–ด์„œ ์•„๋ž˜์™€ ๊ฐ™์ด ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Disposable dp = Observable.just(300, 600, 900).zipWith(
    Observable.just(30, 60, 90), (x, y) -> x * y
).subscribe(System.out::println);

์ธ์ž๊ฐ€ ObservableSource๋กœ ํ•˜๋‚˜์˜ ์—ฐ์‚ฐ์ด ๋“ค์–ด๊ฐˆ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๋”ฐ๋ผ์„œ ์—ฌ๋Ÿฌ ๊ฐœ์˜ Observable์„ ๊ฒฐํ•ฉํ•˜๊ณ ์ž ํ•˜๋Š” ๊ฒฝ์šฐ ObservableSource๋กœ ๊ฒฐํ•ฉํ•œ ๋‹ค์Œ, zipWith๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ํ•œ๊บผ๋ฒˆ์— ๊ฒฐํ•ฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Disposable disposable = Observable.zip(
    Observable.just(300, 600, 900),
    Observable.just(30, 60, 90),
    Observable.just(3, 6, 9),
    (x, y, z) -> x + y + z
).zipWith(Observable.just(0, 0, 0), (xyz, z) -> xyz * z).subscribe(System.out::println);

์œ„์—์„œ zip ์—ฐ์‚ฐ์ž๋กœ ๊ฒฐํ•ฉํ•œ ObservableSource ๊ฐ’์— ์ด์–ด์„œ zipWith๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๊ฒฐํ•ฉํ•˜๋ฉด ์ด๋ฅผ ํ•œ ๋ฒˆ์— ์กฐํ•ฉํ•œ ๋’ค ๋‹ค์Œ์˜ ์ •ํ•ด์ง„ ๊ฒฐํ•ฉ ๋ฐ์ดํ„ฐ๋กœ ๋”๋ถˆ์–ด ๊ฒฐํ•ฉ์ด ๊ฐ€๋Šฅํ•ฉ๋‹ˆ๋‹ค.

 

 

combineLatest()

zip ์—ฐ์‚ฐ์ž๊ฐ€ ๋ฐ์ดํ„ฐ์˜ ๋ฐœํ–‰์ด ์ด๋ค„์งˆ ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ ธ๋‹ค ๊ฐ๊ฐ์˜ Observable ๊ฐ์ฒด ๋‚ด ๋ฐ์ดํ„ฐ๋ฅผ ๊ฒฐํ•ฉํ•ด์ฃผ๋Š” ์—ฐ์‚ฐ์ž์˜€๋‹ค๋ฉด combineLatest ์—ฐ์‚ฐ์ž๋Š” 2๊ฐœ ์ด์ƒ์˜ Observable ๊ฐ์ฒด๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ๊ฐ๊ธฐ ๊ฐ’์ด ๋ณ€๊ฒฝ๋˜์—ˆ์„ ๋•Œ ์ด๋ฅผ ๊ฐฑ์‹ ํ•ด์ฃผ๋Š” ์—ฐ์‚ฐ์ž์ž…๋‹ˆ๋‹ค.

 

์ฝ”๋“œ๋ฅผ ์ž˜๋ณด๋ฉด ๋งˆ์ง€๋ง‰ ์—ฐ์‚ฐ์ž์— combiner๊ฐ€ ๋“ค์–ด๊ฐ‘๋‹ˆ๋‹ค. ์ด๋Š” zip ์—ฐ์‚ฐ์ž์—์„œ zipper์™€ ๋™์ผํ•œ ์—ญํ• ์„ ํ•˜๋Š” Lambda ํ•จ์ˆ˜๋ผ๋Š” ๊ฒƒ์„ ์•Œ ์ˆ˜ ์žˆ์œผ๋ฉฐ ๋‘ Observable์„ ๊ฒฐํ•ฉํ•˜์—ฌ ์–ด๋–ค Observable์„ ๋งŒ๋“ค์–ด ์ฃผ๋„๋ก ํ•ฉ๋‹ˆ๋‹ค.

 

zip ์—ฐ์‚ฐ์ž์™€ ๋‹ค๋ฅธ ์ ์€ zip ์—ฐ์‚ฐ์ž๋Š” ํ•œ ๋ฒˆ ๋ฐ์ดํ„ฐ๊ฐ€ ๋ฐœํ–‰๋˜๋ฉด ๊ทธ ์ดํ›„๋กœ zipper ํ•จ์ˆ˜๋Š” ๋™์ž‘ํ•˜์ง€ ์•Š์ง€๋งŒ combineLatest๋Š” ๊ฐ’์ด ๋ณ€๊ฒฝ๋  ๋•Œ๋งˆ๋‹ค combiner ํ•จ์ˆ˜๋ฅผ ์žฌ์‹คํ–‰ํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์ˆ˜์‹œ๋กœ ๋ฐ์ดํ„ฐ๊ฐ€ ๋ณ€๋™๋˜์—ˆ์„ ๋•Œ์˜ ์ด๋ฒคํŠธ๋ฅผ ๋งŒ๋“ค๊ณ ์ž ํ•  ๋•Œ ์œ ๋ฆฌํ•ฉ๋‹ˆ๋‹ค.

 

๋‹จ, ์•„๋ฌด๋„ ๋ฐ์ดํ„ฐ๊ฐ€ ๋ฐœํ–‰๋˜์ง€ ์•Š์€ ์ƒํƒœ์—์„œ ํ•œ ๊ฐœ์˜ Observable์—์„œ๋งŒ ๋ฐ์ดํ„ฐ๊ฐ€ ๋ฐœํ–‰๋๋‹ค๊ฑฐ๋‚˜ ๋˜ ๋‹ค๋ฅธ ํ•œ ๊ฐœ์˜ Observable์—์„œ๋งŒ ๋ฐ์ดํ„ฐ์˜ ๋ณ€ํ™”๊ฐ€ ์ƒ๊ธด๋‹ค๋ฉด ๊ทธ ๋•Œ๋Š” ์ด ์—ฐ์‚ฐ์ด ์ˆ˜ํ–‰๋˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ์ตœ์†Œ ๋‘ ๊ฐ’์ด ๋ชจ๋‘ ๋ฐœํ–‰๋œ ์ƒํƒœ์—์„œ ์—ฐ์‚ฐ์ด ์‹œ์ž‘๋˜๋ฉฐ ๊ทธ ํ›„์—๋Š” ์–ด๋Š Observable ์—์„œ๋“ ์ง€ ์žฌ๋ฐœํ–‰ ์ด๋ฒคํŠธ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐฑ์‹ ํ•˜๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.

 

์ตœ๋Œ€ 9๊ฐœ์˜ Observable ๊ฐ์ฒด๋ฅผ ๋„ฃ์„ ์ˆ˜ ์žˆ์œผ๋ฉฐ 2๊ฐœ์˜ ์ธ์ž์— Observable์„ ๊ฐ๊ฐ ๋„ฃ๊ณ  combiner์— ์›ํ•˜๋Š” ์—ฐ์‚ฐ์„ ๊ตฌํ˜„ํ•˜๋ฉด ๋ฉ๋‹ˆ๋‹ค.

 

 

 

merge()

merge ์ด๋ฆ„ ์ž์ฒด๋Š” ๊ฒฐํ•ฉ ์—ฐ์‚ฐ์ž์ธ ๊ฒƒ์ด ๋ณด์ด์ง€๋งŒ zip๊ณผ combineLatest์™€๋Š” ๋‹ค์†Œ ์ฐจ์ด๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. ๊ฒฐํ•ฉ์„ ์œ„ํ•ด์„œ๋Š” ๋‹จ์ผ Observable์ด ์•„๋‹Œ ๋‘ ๊ฐœ ์ด์ƒ์˜ Observable์„ ๋„ฃ์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. ๊ทธ๋Ÿฐ๋ฐ merge ์—ฐ์‚ฐ์ž๋Š” ์ˆœ์„œ๋‚˜ ๋ฐ์ดํ„ฐ ๋ฐœํ–‰ ์—ฌ๋ถ€์™€ ๊ด€๊ณ„ ์—†์ด UpStream์—์„œ ๋จผ์ € ์ž…๋ ฅ๋˜๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ๊ทธ๋Œ€๋กœ ๋ฐœํ–‰ํ•ฉ๋‹ˆ๋‹ค.

 

๋ฉ”์†Œ๋“œ ์ธ์ž๋ฅผ ๋ณด๋ฉด ํ•œ ๊ฐ€์ง€ ๋” ์ฐจ์ด๋ฅผ ๋ณผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. combineLatest์™€ zip์˜ ๊ฒฝ์šฐ ๊ฒฐํ•ฉํ•  ๋–„ ์ค‘๊ฐ„ ์—ฐ์‚ฐ์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋„๋ก zipper, combiner๊ฐ€ ์ฃผ์–ด์กŒ์ง€๋งŒ merge์—์„œ๋Š” ์–ด๋– ํ•œ ๋ชจ์Šต๋„ ์ฐพ์•„๋ณผ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค. ๋”ฐ๋ผ์„œ merge๋Š” ๋‹จ์ˆœํžˆ ๋ฐ์ดํ„ฐ๋งŒ์„ ๊ฒฐํ•ฉํ•˜๋Š” ์—ฐ์‚ฐ์ž๋ผ๊ณ  ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

 

๋น„์Šทํ•œ ์—ฐ์‚ฐ์ž๋กœ zip๊ณผ ๋™์ผํ•˜๊ฒŒ mergeWith๊ฐ€ ์กด์žฌํ•ฉ๋‹ˆ๋‹ค.

 

mergeWith ์—ญ์‹œ zipWith์™€ ์‚ฌ์šฉ๋ฒ•์ด ๋น„์Šทํ•˜์ง€๋งŒ merge์™€ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ combiner์™€ zipper์™€ ๊ฐ™์€ lambda ํ•จ์ˆ˜ ์ธ์ž๊ฐ€ ์ฃผ์–ด์ ธ ์žˆ์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

Disposable mp = Observable.just(300, 600, 900).mergeWith(
    Observable.just(30, 60, 90)
).subscribe(System.out::println);

 

 

concat()

 

๋งˆ์ง€๋ง‰์œผ๋กœ concat ์—ฐ์‚ฐ์ž์ž…๋‹ˆ๋‹ค. concat ์—ฐ์‚ฐ์ž๋Š” ์ด๋ฆ„ ๊ทธ๋Œ€๋กœ ๊ฐ์ฒด๋ฅผ ์ด์–ด ๋ถ™์—ฌ์ฃผ๋Š” ์—ฐ์‚ฐ์ž์ž…๋‹ˆ๋‹ค. 2๊ฐœ ์ด์ƒ์˜ Observable ๊ฐ์ฒด๋ฅผ ์ด์–ด ๋ถ™์—ฌ ํ•˜๋‚˜์˜ ObservableSource๋กœ ๋งŒ๋“ค์–ด์ฃผ๋Š” ์—ฐ์‚ฐ์ž๋กœ ์œ„์—์„œ ๋ณธ ๊ฒฐํ•ฉ ์—ฐ์‚ฐ์ž์™€ ๋‹ค์†Œ ์ฐจ์ด๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค.

 

๋ณด๋‹ค์‹œํ”ผ ๋ณ„๋‹ค๋ฅธ ์Šค์ผ€์ค„๋Ÿฌ๋ฅผ ํ†ตํ•ด ์—ฐ์‚ฐ ์ฒ˜๋ฆฌ๋ฅผ ํ•˜์ง„ ์•Š์Šต๋‹ˆ๋‹ค. ๊ทธ๋Ÿฌ๋‚˜ ์ฒซ ๋ฒˆ์งธ Observable์— onComplete ์ด๋ฒคํŠธ๊ฐ€ ๋ฐœ์ƒํ•ด์•ผ๋งŒ ๋‘ ๋ฒˆ์งธ Observable์„ ๊ตฌ๋…ํ•˜์—ฌ ๊ฒฐํ•ฉํ•˜๋Š” ํ˜•ํƒœ์ด๊ธฐ ๋•Œ๋ฌธ์— ์—ฌ๋Ÿฌ ์Šค๋ ˆ๋“œ๋ฅผ ์ด์šฉํ•œ๋‹ค๋Š” ์ ์ด ์žˆ์Šต๋‹ˆ๋‹ค.

 

๋˜ ํ•œ ๊ฐ€์ง€๋Š” ๋‘ ๋ฒˆ์งธ Observable์ด ๊ณ„์† ๋ฐœํ–‰๋œ๋‹ค ํ•˜๋”๋ผ๋„ ์ฒซ ๋ฒˆ์งธ Observable์ด ๋ฐœํ–‰๋˜์ง€ ์•Š์œผ๋ฉด ์•„๋ฌด๋Ÿฐ ์†Œ์šฉ์ด ์—†๊ธฐ ๋•Œ๋ฌธ์— ์ด๋กœ์จ ๋‹ค๋ฅธ Observable์€ ๋ฌดํ•œ์ • ๋Œ€๊ธฐ ์ƒํƒœ์— ์ด๋ฅด๊ฒŒ ๋˜๋ฉฐ ์ด ๊ณผ์ •์ด ์ง€์†๋  ๊ฒฝ์šฐ Memory Leak(๋ฉ”๋ชจ๋ฆฌ ๋ˆ„์ˆ˜) ํ˜„์ƒ์œผ๋กœ ์ด๋Ÿฌ์ด์ง€๊ธฐ๋„ ํ•ฉ๋‹ˆ๋‹ค. ๋”ฐ๋ผ์„œ ์ด ์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•  ๋•Œ๋Š” ์ฒซ ๋ฒˆ์งธ Observable์ด ๋ฐ˜๋“œ์‹œ onComplete ์ด๋ฒคํŠธ๊ฐ€ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ๋„๋ก ์œ ๋„ํ•˜๋Š” ์ฝ”๋“œ๋ฅผ ์ž‘์„ฑํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

 

๋งˆ์ง€๋ง‰์œผ๋กœ ๊ฒฐํ•ฉํ•  ์ˆ˜ ์žˆ๋Š” Observable ๊ฐฏ์ˆ˜๋Š” ์ตœ๋Œ€ 4๊ฐœ์ž…๋‹ˆ๋‹ค.

 

 

 

 

 

์กฐ๊ฑด ์—ฐ์‚ฐ์ž

์กฐ๊ฑด ์—ฐ์‚ฐ์ž๋Š” Observable ๊ฐ์ฒด ๋‚ด ๋ฐ์ดํ„ฐ์˜ ํ๋ฆ„์„ ์ œ์–ดํ•˜๋Š” ์—ฐ์‚ฐ์ž์ž…๋‹ˆ๋‹ค. ์šฐ๋ฆฌ๋Š” ์ด์™€ ๋น„์Šทํ•œ ์—ฐ์‚ฐ์ž๋กœ filter ์—ฐ์‚ฐ์ž๋ฅผ ๋ณผ ์ˆ˜ ์žˆ๋Š”๋ฐ, filter ์—ฐ์‚ฐ์ž๋Š” Observable ๊ฐ์ฒด ๋‚ด ๋ฐ์ดํ„ฐ์—์„œ ์›ํ•˜๋Š” ์กฐ๊ฑด์— ๋ถ€ํ•ฉํ•˜๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐœํ–‰ํ•˜๊ณ , ๋ฏธ๋ถ€ํ•ฉํ•˜๋Š” ๋ฐ์ดํ„ฐ๋Š” ๊ธฐ๊ฐํ•˜๋Š” ์—ฐ์‚ฐ์ด ๋ชฉ์ ์ด์—ˆ๋‹ค๋ฉด ์ง€๊ธˆ ๋‹ค๋ฃจ๊ณ ์ž ํ•˜๋Š” ์กฐ๊ฑด ์—ฐ์‚ฐ์ž๋Š” ๋ฐ์ดํ„ฐ์˜ ๋ฐœํ–‰ ์—ฌ๋ถ€ ๋ณด๋‹ค๋Š” ๊ทธ ํ๋ฆ„์„ ์ œ์–ดํ•˜๋Š” ์—ฐ์‚ฐ์ž๋ผ๊ณ  ๋ณด๋ฉด ๋˜๊ฒ ์Šต๋‹ˆ๋‹ค.

 

 

amb()

amb ์—ฐ์‚ฐ์ž๋Š” ์—ฌ๋Ÿฌ ๊ฐœ ๋“ค์–ด์˜ค๋Š” Observable ์ค‘์—์„œ ๊ฐ€์žฅ ๋จผ์ € ๋ฐ์ดํ„ฐ๊ฐ€ ๋ฐœํ–‰๋˜๋Š” ๊ฒƒ์„ ์„ ํƒํ•˜๋Š” ์กฐ๊ฑด ์—ฐ์‚ฐ์ž์ž…๋‹ˆ๋‹ค. ๊ทธ๋ ‡๋‹ค๋ฉด ์ดํ›„ ๋ฐœํ–‰๋˜๋Š” Observable์€ ์–ด๋–ป๊ฒŒ ๋˜๋Š” ๊ฑด๊ฐ€์š”? ๊ณต๊ต๋กญ๊ฒŒ๋„ ์ „๋ถ€ ๊ณต์ค‘๋ถ„ํ•ด๋ฉ๋‹ˆ๋‹ค.. ใ… ใ… 

 

amb ์—ฐ์‚ฐ์ž์—์„œ amb๋Š” ambiguous(๋ชจํ˜ธํ•œ) ์ด๋ผ๋Š” ๋‹จ์–ด์—์„œ ํŒŒ์ƒ๋˜์—ˆ์œผ๋ฉฐ ๋ชจ๋“  Observable์ด ๋™์ผํ•˜๊ฒŒ ์žˆ๋Š” ๊ฐ€์šด๋ฐ์—์„œ๋„ ๋ชจ๋“  ๋ฐ์ดํ„ฐ๊ฐ€ ๋ฐœํ–‰๋  ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ฆฌ๊ธฐ ๋ณด๋‹ค๋Š” ๊ทธ ์ค‘์—์„œ ๋จผ์ € ์˜ค๋Š” ํ•œ ๊ฐœ (์„ ์ฐฉ์ˆœ)์œผ๋กœ ๋ฐ›๋Š”๋‹ค๊ณ  ์ดํ•ดํ•˜์‹œ๋ฉด ๋˜๊ฒ ์Šต๋‹ˆ๋‹ค.

Disposable ap = Observable.amb(
    Arrays.asList(Observable.just("N.K").delay(2, TimeUnit.SECONDS), Observable.just("NEONKID"))
).subscribe(System.out::println);

์ด๋ฅผ ์‰ฝ๊ฒŒ ์ดํ•ดํ•˜๊ธฐ ์œ„ํ•ด delay ์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•ด์„œ ์ฒ˜์Œ ๋ฐœํ–‰ํ•œ N.K ๋ผ๋Š” Observable ๊ฐ์ฒด์—๋Š” 2์ดˆ ๊ฐ„์˜ ์ง€์—ฐ์„ ์ฃผ๊ณ , ๊ทธ ๋‹ค์Œ์— ์žˆ๋Š” NEONKID๋ผ๋Š” Observable ๊ฐ์ฒด๋Š” ๋ฐ”๋กœ ๋ฐœํ–‰ํ•  ์ˆ˜ ์žˆ๋„๋ก ์ฒ˜๋ฆฌํ•˜๊ฒŒ๋” ๊ตฌํ˜„ํ•˜๋ฉด ๋จผ์ € ๋ฐœํ–‰์ด ๋œ NEONKID๊ฐ€ ์ถœ๋ ฅ๋จ์„ ์•Œ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

 

๋น„์Šทํ•œ ์—ฐ์‚ฐ์ž๋กœ ambWith ์—ฐ์‚ฐ์ž๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. ๋ณด์‹œ๋‹ค์‹œํ”ผ ๊ธฐ์กด์˜ Observable ๊ฐ์ฒด์—์„œ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋Š” ์—ฐ์‚ฐ์ž์ด๊ณ , amb ์—ฐ์‚ฐ์ž์™€ ๋‹ค๋ฅด๊ฒŒ Iterable์ด ์•„๋‹Œ ์ผ๋ฐ˜ Observable ๋ฐ์ดํ„ฐ๋ฅผ ์ธ์ž๋กœ ๋ฐ›๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค.

 

 

takeUntil()

์ด ์—ฐ์‚ฐ์ž๋Š” ์กฐ๊ธˆ ํŠน์ดํ•œ ์—ฐ์‚ฐ์ž์ธ๋ฐ, ์‚ฌ์ „์— ๋จผ์ € ๋“ค์–ด์˜จ Observable ๊ฐ์ฒด๋ฅผ ๋ฐœํ–‰ํ•˜๊ณ  ๊ตฌ๋…ํ•˜๊ณ  ์žˆ๋Š” ์ƒํƒœ์—์„œ ์ธ์ž๋กœ ๋„˜๊ฒจ ๋ฐ›์€ Observable์ด ๋ฐœํ–‰๋˜๋ฉด ๋จผ์ € ๋ฐœํ–‰๋˜์—ˆ๋˜ Observable ๊ฐ์ฒด๋ฅผ ๋ฌด์‹œํ•˜๊ณ  ์ธ์ž๋กœ ๋ฐ›์€ Observable์„ ๋ฐœํ–‰ํ•œ ๋‹ค์Œ ์ž‘์—…์„ ๋งˆ์น˜๋Š” ์—ฐ์‚ฐ์ž์ž…๋‹ˆ๋‹ค.

 

ํ•„ํ„ฐ ์—ฐ์‚ฐ์ž์˜ take์™€ ์ฐจ์ด์ ์ด ์žˆ๋‹ค๋ฉด take๋Š” ํŠน์ • ๊ฐฏ์ˆ˜๋งŒ ๊ฐ’์„ ๋ฐœํ–‰ํ•˜๋˜, ์ž‘์—… ์™„๋ฃŒ ๊ธฐ์ค€์„ ๋‹ค๋ฅธ Observable ๊ฐ’์˜ ๋ฐœํ–‰์œผ๋กœ ํ•˜๊ฒ ๋‹ค๋Š” ๊ฒƒ์ด์ฃ . ์ฆ‰ ํŠน์ • Observable ๊ฐ์ฒด์˜ ๊ฐ’์ด ๋“ค์–ด์˜ค๋ฉด ๋‹ค๋ฅธ ๋ฐ์ดํ„ฐ์˜ ๋ฐœํ–‰์ด ์™„๋ฃŒ๋˜์ง€ ์•Š์•„๋„ ๋ฐ”๋กœ ์™„๋ฃŒ ์ด๋ฒคํŠธ๋ฅผ ๋‚˜ํƒ€๋‚ด๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.

 

์ธ์ž๋กœ ๋“ค์–ด์˜ค๋Š” Observable์€ ๋‹จ์ผ Observable์„ ํฌํ•จํ•˜์—ฌ ๊ฒฐํ•ฉ๋œ ObservableSource(๋ณต์ˆ˜ Observable)์„ ํฌํ•จํ•ฉ๋‹ˆ๋‹ค. ํ•ด๋‹น Observable์ด ๋ฐœํ–‰๋˜๋ฉด ๋‹ค๋ฅธ ํ˜„์žฌ ์ง„ํ–‰๋˜๊ณ  ์žˆ๋Š” Observable์˜ ๋ฐœํ–‰์„ ์ฆ‰๊ฐ ์ค‘๋‹จํ•˜๊ณ  onComplete ์ด๋ฒคํŠธ๋ฅผ ๋ฐœ์ƒ์‹œํ‚ค๋Š” ์—ฐ์‚ฐ์ž์ž…๋‹ˆ๋‹ค.

 

 

 

skipUntil()

takeUntil ์—ฐ์‚ฐ์ž์™€๋Š” ์ •๋ฐ˜๋Œ€๋กœ ์ธ์ž๋กœ ์ฃผ์–ด์ง„ Observable์„ ๋ฐœํ–‰ํ•œ ๋‹ค์Œ๋ถ€ํ„ฐ ๊ธฐ์กด์— ์ฃผ์–ด์ง„ Observable ๊ฐ์ฒด์˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐœํ–‰ํ•˜๊ฒ ๋‹ค๋Š” ์—ฐ์‚ฐ์ž์ž…๋‹ˆ๋‹ค. 

 

์ฃผ์–ด์ง„ ์ธ์ž์—๋Š” other๋ผ๋Š” Observable ์ธ์ž๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด other Observable์ด ๋ฐ์ดํ„ฐ๊ฐ€ ๋ฐœํ–‰ํ•  ๋•Œ๊นŒ์ง€๋Š” ๊ธฐ์กด์— ์ž…๋ ฅ๋œ Observable ๊ฐ์ฒด์—์„œ๋Š” ์•„๋ฌด๋Ÿฐ ์ž‘์—…์ด ์ผ์–ด๋‚˜์ง€ ์•Š์œผ๋ฉฐ other Observable ๊ฐ์ฒด๊ฐ€ ๋ฐœํ–‰๋œ ๋’ค์— ๊ธฐ์กด Observable ๊ฐ์ฒด์˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐœํ–‰ํ•˜๋Š” ์—ฐ์‚ฐ์ž์ž…๋‹ˆ๋‹ค.

 

 

 

all()

๋งˆ์ง€๋ง‰์œผ๋กœ all ์—ฐ์‚ฐ์ž๋Š” ์ฃผ์–ด์ง„ ๋ชจ๋“  ์กฐ๊ฑด์„ ๋ถ€ํ•ฉํ•˜๋Š” Observable์„ ๋ฐœํ–‰ํ•˜๋Š” ์—ฐ์‚ฐ์ž์ž…๋‹ˆ๋‹ค.

 

์ด ๋•Œ ๋ฐ›๋Š” ์ธ์ž๋Š” Predicate์ธ๋ฐ, Predicate๋Š” filter ์—ฐ์‚ฐ์ž์—์„œ ์‚ฌ์šฉํ–ˆ๋˜ ํƒ€์ž…์œผ๋กœ ๋žŒ๋‹ค ํ•จ์ˆ˜ ์ธ์ž์™€ ํ•จ๊ผ ์กฐ๊ฑด์„ ์ฃผ์–ด์ฃผ๋Š” ํƒ€์ž…์ž…๋‹ˆ๋‹ค. ๋”ฐ๋ผ์„œ ์ด ๋žŒ๋‹ค์˜ ์กฐ๊ฑด์ด ์ฐธ์ด๋ฉด true ๊ฑฐ์ง“์ด๋ฉด false๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋ฉฐ true์ธ ๊ฐ’๋งŒ ๋ฐœํ–‰ํ•ฉ๋‹ˆ๋‹ค.

 

 

๋‹ค์Œ ํฌ์ŠคํŠธ์— ์ด์–ด์„œ 3ํŽธ(์ˆ˜ํ•™ ์—ฐ์‚ฐ์ž ๋ฐ ๊ธฐํƒ€ ์—ฐ์‚ฐ์ž)์„ ์ด์–ด๊ฐ€๋„๋ก ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค.

 

 

์ฐธ๊ณ (์ด๋ฏธ์ง€): https://reactivex.io/RxJava/javadoc/

๋ฐ˜์‘ํ˜•
TAGS.

Tistory Comments