[RxJava] 4. Reactive 연산자 파헤쳐보기 2편 (결합, 조건 연산자)

반응형

 

2021/02/07 - [Programming/Java] - [RxJava] 3. Reactive 연산자 파헤쳐보기 1편 (생성, 변환 연산자)

 

[RxJava] 3. Reactive 연산자 파헤쳐보기 1편 (생성, 변환 연산자)

2021/01/23 - [Programming/Java] - [RxJava] 2. Reactive 기본 연산자(Operator) - map, filter, reduce [RxJava] 2. Reactive 기본 연산자(Operator) - map, filter, reduce Reactive Programming에서 꽃이라고..

blog.neonkid.xyz

이번 포스트는 지난 포스트에 이어 결합, 조건 연산자에 대해 알아보겠습니다.

 

 

 

결합 연산자

결합 연산자는 여러 개의 Observable 객체를 조합하여 활용하는 연산자로 다수의 Observable을 매개변수로 받아 하나의 Observable로 만들어주는 연산자들을 말합니다.

 

 

zip()

먼저 첫 번째로 zip 연산자가 있습니다. zip 연산자는 두 개 이상의 Observable 객체를 이용하여 하나의 ObservableSource로 결합해주는 연산자인데, 이 연산자에는 다소 의문점이 있습니다.

 

두 데이터가 발행되기 전의 상태라면 어떨까요? 데이터가 발행되지 않은 상태라면 데이터를 받을 때까지 기다려야합니다. zip 연산자는 두 연산자 중 하나의 연산자라도 발행된 상태가 아니라면 데이터가 발행될 때까지 대기합니다.

 

zip 연산자는 최대 9개의 Observable을 결합할 수 있습니다. 

Disposable disposable = Observable.zip(
    Observable.just(300, 600, 900),
    Observable.just(30, 60, 90),
    Observable.just(3, 6, 9),
    (x, y, z) -> x + y + z
).subscribe(System.out::println);

disposable.dispose();

다수의 데이터가 들어 있는 Observable 객체를 결합할 때는 각각의 인자에 맞게 결합되며 이를 결합할 떄 사용할 연산을 lambda로 정의할 수 있습니다.

 

비슷한 연산자로 zipWith가 있습니다.

 

기본 골격은 zip 연산자와 같지만 기존에 반환된 단일 Observable 객체에서 또다른 zip 연산을 취하고자 하는 경우에 사용합니다. 가령 예를 들어서 아래와 같이 사용할 수 있습니다.

Disposable dp = Observable.just(300, 600, 900).zipWith(
    Observable.just(30, 60, 90), (x, y) -> x * y
).subscribe(System.out::println);

인자가 ObservableSource로 하나의 연산이 들어갈 수 있습니다. 따라서 여러 개의 Observable을 결합하고자 하는 경우 ObservableSource로 결합한 다음, zipWith를 사용하여 한꺼번에 결합할 수 있습니다.

Disposable disposable = Observable.zip(
    Observable.just(300, 600, 900),
    Observable.just(30, 60, 90),
    Observable.just(3, 6, 9),
    (x, y, z) -> x + y + z
).zipWith(Observable.just(0, 0, 0), (xyz, z) -> xyz * z).subscribe(System.out::println);

위에서 zip 연산자로 결합한 ObservableSource 값에 이어서 zipWith를 사용하여 결합하면 이를 한 번에 조합한 뒤 다음의 정해진 결합 데이터로 더불어 결합이 가능합니다.

 

 

combineLatest()

zip 연산자가 데이터의 발행이 이뤄질 때까지 기다렸다 각각의 Observable 객체 내 데이터를 결합해주는 연산자였다면 combineLatest 연산자는 2개 이상의 Observable 객체를 기반으로 각기 값이 변경되었을 때 이를 갱신해주는 연산자입니다.

 

코드를 잘보면 마지막 연산자에 combiner가 들어갑니다. 이는 zip 연산자에서 zipper와 동일한 역할을 하는 Lambda 함수라는 것을 알 수 있으며 두 Observable을 결합하여 어떤 Observable을 만들어 주도록 합니다.

 

zip 연산자와 다른 점은 zip 연산자는 한 번 데이터가 발행되면 그 이후로 zipper 함수는 동작하지 않지만 combineLatest는 값이 변경될 때마다 combiner 함수를 재실행하기 때문에 수시로 데이터가 변동되었을 때의 이벤트를 만들고자 할 때 유리합니다.

 

단, 아무도 데이터가 발행되지 않은 상태에서 한 개의 Observable에서만 데이터가 발행됐다거나 또 다른 한 개의 Observable에서만 데이터의 변화가 생긴다면 그 때는 이 연산이 수행되지 않습니다. 최소 두 값이 모두 발행된 상태에서 연산이 시작되며 그 후에는 어느 Observable 에서든지 재발행 이벤트가 발생하면 데이터를 갱신하게 됩니다.

 

최대 9개의 Observable 객체를 넣을 수 있으며 2개의 인자에 Observable을 각각 넣고 combiner에 원하는 연산을 구현하면 됩니다.

 

 

 

merge()

merge 이름 자체는 결합 연산자인 것이 보이지만 zip과 combineLatest와는 다소 차이가 있습니다. 결합을 위해서는 단일 Observable이 아닌 두 개 이상의 Observable을 넣어야 합니다. 그런데 merge 연산자는 순서나 데이터 발행 여부와 관계 없이 UpStream에서 먼저 입력되는 데이터를 그대로 발행합니다.

 

메소드 인자를 보면 한 가지 더 차이를 볼 수 있습니다. combineLatest와 zip의 경우 결합할 떄 중간 연산을 수행할 수 있도록 zipper, combiner가 주어졌지만 merge에서는 어떠한 모습도 찾아볼 수 없습니다. 따라서 merge는 단순히 데이터만을 결합하는 연산자라고 할 수 있습니다.

 

비슷한 연산자로 zip과 동일하게 mergeWith가 존재합니다.

 

mergeWith 역시 zipWith와 사용법이 비슷하지만 merge와 마찬가지로 combiner와 zipper와 같은 lambda 함수 인자가 주어져 있지 않습니다.

Disposable mp = Observable.just(300, 600, 900).mergeWith(
    Observable.just(30, 60, 90)
).subscribe(System.out::println);

 

 

concat()

 

마지막으로 concat 연산자입니다. concat 연산자는 이름 그대로 객체를 이어 붙여주는 연산자입니다. 2개 이상의 Observable 객체를 이어 붙여 하나의 ObservableSource로 만들어주는 연산자로 위에서 본 결합 연산자와 다소 차이가 있습니다.

 

보다시피 별다른 스케줄러를 통해 연산 처리를 하진 않습니다. 그러나 첫 번째 Observable에 onComplete 이벤트가 발생해야만 두 번째 Observable을 구독하여 결합하는 형태이기 때문에 여러 스레드를 이용한다는 점이 있습니다.

 

또 한 가지는 두 번째 Observable이 계속 발행된다 하더라도 첫 번째 Observable이 발행되지 않으면 아무런 소용이 없기 때문에 이로써 다른 Observable은 무한정 대기 상태에 이르게 되며 이 과정이 지속될 경우 Memory Leak(메모리 누수) 현상으로 이러이지기도 합니다. 따라서 이 연산자를 사용할 때는 첫 번째 Observable이 반드시 onComplete 이벤트가 발생할 수 있도록 유도하는 코드를 작성해야 합니다.

 

마지막으로 결합할 수 있는 Observable 갯수는 최대 4개입니다.

 

 

 

 

 

조건 연산자

조건 연산자는 Observable 객체 내 데이터의 흐름을 제어하는 연산자입니다. 우리는 이와 비슷한 연산자로 filter 연산자를 볼 수 있는데, filter 연산자는 Observable 객체 내 데이터에서 원하는 조건에 부합하는 데이터를 발행하고, 미부합하는 데이터는 기각하는 연산이 목적이었다면 지금 다루고자 하는 조건 연산자는 데이터의 발행 여부 보다는 그 흐름을 제어하는 연산자라고 보면 되겠습니다.

 

 

amb()

amb 연산자는 여러 개 들어오는 Observable 중에서 가장 먼저 데이터가 발행되는 것을 선택하는 조건 연산자입니다. 그렇다면 이후 발행되는 Observable은 어떻게 되는 건가요? 공교롭게도 전부 공중분해됩니다.. ㅠㅠ

 

amb 연산자에서 amb는 ambiguous(모호한) 이라는 단어에서 파생되었으며 모든 Observable이 동일하게 있는 가운데에서도 모든 데이터가 발행될 때까지 기다리기 보다는 그 중에서 먼저 오는 한 개 (선착순)으로 받는다고 이해하시면 되겠습니다.

Disposable ap = Observable.amb(
    Arrays.asList(Observable.just("N.K").delay(2, TimeUnit.SECONDS), Observable.just("NEONKID"))
).subscribe(System.out::println);

이를 쉽게 이해하기 위해 delay 연산자를 사용해서 처음 발행한 N.K 라는 Observable 객체에는 2초 간의 지연을 주고, 그 다음에 있는 NEONKID라는 Observable 객체는 바로 발행할 수 있도록 처리하게끔 구현하면 먼저 발행이 된 NEONKID가 출력됨을 알 수 있습니다.

 

비슷한 연산자로 ambWith 연산자가 있습니다. 보시다시피 기존의 Observable 객체에서 사용할 수 있는 연산자이고, amb 연산자와 다르게 Iterable이 아닌 일반 Observable 데이터를 인자로 받고 있습니다.

 

 

takeUntil()

이 연산자는 조금 특이한 연산자인데, 사전에 먼저 들어온 Observable 객체를 발행하고 구독하고 있는 상태에서 인자로 넘겨 받은 Observable이 발행되면 먼저 발행되었던 Observable 객체를 무시하고 인자로 받은 Observable을 발행한 다음 작업을 마치는 연산자입니다.

 

필터 연산자의 take와 차이점이 있다면 take는 특정 갯수만 값을 발행하되, 작업 완료 기준을 다른 Observable 값의 발행으로 하겠다는 것이죠. 즉 특정 Observable 객체의 값이 들어오면 다른 데이터의 발행이 완료되지 않아도 바로 완료 이벤트를 나타내는 것입니다.

 

인자로 들어오는 Observable은 단일 Observable을 포함하여 결합된 ObservableSource(복수 Observable)을 포함합니다. 해당 Observable이 발행되면 다른 현재 진행되고 있는 Observable의 발행을 즉각 중단하고 onComplete 이벤트를 발생시키는 연산자입니다.

 

 

 

skipUntil()

takeUntil 연산자와는 정반대로 인자로 주어진 Observable을 발행한 다음부터 기존에 주어진 Observable 객체의 데이터를 발행하겠다는 연산자입니다. 

 

주어진 인자에는 other라는 Observable 인자가 있습니다. 이 other Observable이 데이터가 발행할 때까지는 기존에 입력된 Observable 객체에서는 아무런 작업이 일어나지 않으며 other Observable 객체가 발행된 뒤에 기존 Observable 객체의 데이터를 발행하는 연산자입니다.

 

 

 

all()

마지막으로 all 연산자는 주어진 모든 조건을 부합하는 Observable을 발행하는 연산자입니다.

 

이 때 받는 인자는 Predicate인데, Predicate는 filter 연산자에서 사용했던 타입으로 람다 함수 인자와 함꼐 조건을 주어주는 타입입니다. 따라서 이 람다의 조건이 참이면 true 거짓이면 false를 반환하며 true인 값만 발행합니다.

 

 

다음 포스트에 이어서 3편(수학 연산자 및 기타 연산자)을 이어가도록 하겠습니다.

 

 

참고(이미지): https://reactivex.io/RxJava/javadoc/

반응형

Tistory Comments 0