[RxJava] 3. Reactive 연산자 파헤쳐보기 1편 (생성, 변환 연산자)

반응형

2021/01/23 - [Programming/Java] - [RxJava] 2. Reactive 기본 연산자(Operator) - map, filter, reduce

 

[RxJava] 2. Reactive 기본 연산자(Operator) - map, filter, reduce

Reactive Programming에서 꽃이라고 할 수 있는 Reactive Operator에 대해 알아보도록 하겠습니다. 만약 여러분들이 Java 8을 사용하고 있다면, 이러한 Reactive 연산자가 매우 익숙하실 수도 있습니다. RxJava에.

blog.neonkid.xyz

지난 포스트에서 Reactive의 기본 연산자를 다뤘습니다. 기본 연산자를 다루면서 RxJava에서 제공하는 수많은 연산자가 있고 그 중에 기본적인 연산자가 map, filter, reduce라는 것을 알았습니다.

 

이번 포스트에서는 RxJava에서 자주 사용하는 주요 4가지 연산자인 생성, 변환, 결합, 조건 연산자에 대해서 간단히 정리해보고 이것이 어떻게 동작하는지에 대해 알아보도록 하는 시간을 1부와 2부로 나누어 전개를 하도록 하고자 합니다.

 

이번 1부에서는 생성과 변환 연산자에 대해 알아보죠.

 

 

 

 

생성 연산자

생성 연산자의 핵심은 데이터의 흐름을 만드는 것입니다. 간단하게 RxJava에서는 Observable, Single 등 이벤트 기반의 객체를 만들어내는 것을 말합니다.

 

우리가 보통 Java에서 객체를 인스턴스화 할 때는 new 연산자를 이용했습니다. 그러나 RxJava에서는 new 연산자를 이용하지 않고, just 메소드를 사용하는 등 팩토리 패턴 형태의 메소드를 사용하여 인스턴스화 하였습니다. 이러한 연산자를 생성 연산자라 합니다.

 

그러나 이 외에도 주기적으로 데이터를 생성하는 연산자, 특정 시간에 단 한 번의 데이터를 생성하는 연산자 등 create, just 처럼 바로 데이터를 발행하는 연산자가 아닌 일부 지연을 두고 데이터를 발행하고자 하는 연산자들에 대해 알아보겠습니다.

 

 

interval()

 

interval 연산자는 일정 시간 간격으로 데이터를 생성하는 연산자입니다. 이를 테면 주어진 시간 간격으로 증가하는 데이터의 변화를 감지하고자 할 때 쓰는 방법입니다.

 

  • 일정한 시간 마다 데이터 흐름 생성
  • Java의 Generic을 사용하므로 Primitive type이 아닌 Reference Class 사용
함수 원형 설명
interval(long period, TimeUnit unit) 일정 시간(period) 지연되었다 데아터 발행
interval(long initialDelay, long period, TimeUnit unit) 위 메소드와 동작은 같으나 최초 지연 시간만 조절 하는 메소드

자세히 보면 두 번째 메소드의 경우 첫 번째 메소드와 다르지 않은데, 첫 번째 발행에만 즉시 발행을 받고자 하는 경우 두 번째 메소드를 응용할 수 있습니다.

@SchedulerSupport(SchedulerSupport.COMPUTATION)

interval 메소드의 코드를 보면 SchedulerSupport라는 어노테이션이 있는데, 이 어노테이션은 interval 간격을 주어주는 스케줄러가 현재 스레드가 아닌 별도의 스레드를 사용하겠다는 것입니다. 따라서 interval 메소드를 사용하는 경우 현재 스레드에서 스케줄러가 돌지 않고 별도의 다른 스레드에서 스케줄러가 동작하게 됩니다.

 

 

 

timer()

 

timer 연산자는 interval과 유사하지만 일정 시간이 지난 후, 단 한 개의 데이터만 발행하고 종료가 되는 연산자입니다.

 

그림에서 보 듯, 한 번 발행 뒤에는 아무것도 하지 않습니다. 간단한 예시로 이기종의 애플리케이션에서 셋팅 값이나 데이터를 가지고 올 때를 제외하고는 변경 사항을 반영하지 않을 때 사용할 수 있습니다.

timer 연산자의 경우도 역시 Scheduler는 별도의 스레드에서 동작합니다. 그러나 interval, timer 메소드 모두 공통점이 한 가지 더 있는데요.

바로 자신이 직접 스레드를 만들거나 현재 스레드에서 스케줄링을 돌 수 있도록 직접 Schduler를 설정할 수 있는 방법이 있습니다. 이 때는 어노테이션을 CUSTOM으로 쓰기 보단, 그것에 맞게 제공하는 오버라이딩된 interval 메소드를 사용하시면 됩니다.

 

 

 

range()

 

range 연산자는 주어진 값 n 부터 m 까지의 데이터를 발행해주는 범위 연산자입니다. 가령 interval이나 timer에서는 Long 클래스를 사용했다면 range는 Integer 클래스를 사용하여 범위를 주어줍니다.

 

timer, interval 연산자와의 차이가 있다면 range 연산자는 스케줄러를 별도의 스레드로 이용하지 않고 연산자를 호출한 스레드가 직접 스케줄링한다는 점입니다. 

SchedulerSupport 어노테이션을 보다시피 별도의 스케줄러를 사용하지 않겠다는 NONE으로 지정되어 있습니다. 즉, 이 연산자는 지연 발행을 위한 연산자가 아니며 단순히 특정 범위를 지정하여 데이터를 발행하는 데 그 목적을 두고 있습니다. 

 

또 한 가지, range의 데이터 타입이 int인 것인데, long을 사용하기 위해서는 아래의 메소드가 별도로 제공됩니다.

rangeLong 연산자를 이용하면 더 큰 범위를 사용할 수도 있습니다.

 

 

 

intervalRange()

 

interval + range 연산자를 혼합한 연산자로써 지연된 발행과 범위 발행을 동시에 구현한 것입니다.

 

그런데, 이 메소드는 참고해야 할 사항이 있습니다. interval이 일정 시간 간격으로 값을 발행하지만 그것이 무한히 진행되는 것은 아닙니다. range에서 지정해준 범위만큼 일정 시간 간격으로 발행된 뒤에는 onComplete 이벤트가 발생한 순간 정지 됩니다.

interval, timer와 마찬가지로 Scheduler를 RxJava의 기본 스케줄러를 이용할 수도 있고, 정의할 수도 있습니다. 

 

 

 

defer()

 

defer 연산자는 timer와 그 기능이 유사하지만 데이터 흐름에 의한 생성을 구독자가 구독 메소드를 호출할 때까지로 시점을 정할 수 있으며 이 때 새로운 Observable 객체가 생성된다는 점이 다르다.

 

create와 just와 비교했을 때 이들 둘은 바로 JVM에 Observable 객체를 메모리 위에 올립니다. 그러나 defer는 구독자가 subscribe 메소드를 호출하여 구독 이벤트가 발생하기 전까지는 JVM에 Observable 스트림 생성을 일절 하지 않습니다. 

아마 이렇게 이야기 한다면 defer는 subscribe 이벤트가 발생할 때마다 Observable 스트림을 생성하기 때문에 더 비용이 클 것이라는 생각을 하실 수도 있을 것입니다. defer는 1초 정도 딜레이를 발생시키는 헤비 연산을 하기 때문에 느릴 것이라는 예상도 하겠지만 어차피 stream 연산 자체의 딜레이로 인하여 그다지 많은 차이가 일어나지 않습니다. 

(왜냐하면 getHeavyData 등의 연산은 UpStream Thread라는 별도의 I/O 스레드로 동작하기 때문입니다.)

 

오히려 장점으로 본다면 create와 just의 경우 onNext, onCompleted, onError 등과 같은 이벤트 호출을 정의하여야 하는데, 만약 이들의 콜백이 필요가 없다면 오히려 defer 메소드가 더 나은 효과를 보일 수도 있습니다.

 

 

 

repeat()

 

repeat 연산자는 단순히 반복을 위한 연산자입니다. 어떨 때 사용할 수 있을까요? 간단히 RxJava를 이용하여 HealthCheck 기능을 만든다고 한다면 주기적으로 ping 메소드 등을 이용하여 확인할 수 있겠죠?

repeat 연산자는 특별한 인자 코드 없이 동작합니다.

기본적으로 repeat 메소드는 repeat(2) 메소드를 호출하여 동작합니다. 따라서 반복 횟수를 인자로 정할 수도 있는데, 일반적으로 이를 지정하지 않으면 Long의 최대값까지를 호출하게 됩니다. 

 

 

 

 

 

변환 연산자

변환 연산자는 주어진 데이터의 흐름을 원하는 대로 변형하는 연산자입니다. 우리는 지난 포스트에서 기본 연산자인 map을 통해서 데이터의 모음을 자유자재로 변형하는 메소드를 알게 되었습니다.

 

그런데, map은 flatMap 등 여러 파생된 형태가 많은데,그 중에서도 flatMap은 Java 8의 Stream에서도 자주 보이는 것으로, RxJava에 있는 flatMap과 동일합니다. 이와 비슷하게 RxJava에서는 concatMap이라는 메소드가 있습니다.

 

 

concatMap()

 

concatMap은 flatMap과 유사합니다. flatMap은 먼저 들어온 데이터를 처리하는 중에 새로운 데이터가 들어오면 나중에 들어온 데이터를 먼저 처리해버리는 일종의 인터리빙(끼어들기) 현상을 포함하는 map 연산입니다.

 

하지만 concatMap은 인터리빙 효과가 있지 않은 대신 데이터가 변화하더라도 먼저 들어온 데이터 순서대로 처리를 진행합니다.

 

하지만 공교롭게도 flatMap이 계산 속도는 훨씬 빠릅니다. 왜 그럴까요? 

 

concatMap은 먼저 들어온 데이터를 순서를 보장한다는 특징을 가지고 있는데, 데이터가 중간에 들어오게 되면 데이터의 순서가 바뀌게 됩니다. 먼저 들어온 데이터 순서대로 처리하기 위해서는 어떤 것이 먼저 들어온 데이터인지를 맞춰줘야 합니다. 중간에 데이터가 바뀌어도 먼저 들어온 데이터가 처리할 수 있도록 추가 연산이 들어가기 때문에 flatMap보다는 concatMap의 연산이 속도가 느립니다.

 

 

 

switchMap()

 

concatMap이 인터리빙이 발생할 수 있는 상황에서 순서를 보장해주는 것이 특징이라면 switchMap은 순서를 보장하기 위해 데이터가 변화되면 변환 작업을 중단하는 좀 어마무시한 녀석입니다.

 

이 메소드의 목적은 데이터가 수시대로 변화는 상황에서 마지막 데이터만을 받고 싶을 때의 목적이 큽니다. 중간의 데이터가 어찌되었든 최종 데이터만 잘 가져온다면 다른 데이터는 무시해도 되거나, 해야 되는 상황에 적절합니다. 특정 상황에만 적중하기 때문에 가급적 코드의 동작 상태가 어떤지 알고 사용하는 것이 가장 좋은 연산자입니다.

 

가령 예를 들어 검색어 기능을 구현할 때 유리합니다. 사용자가 검색을 자음, 모음, 알파벳을 출력하여 입력을 하는데 실시간 조회를 해야하기 때문에 아주 짧은 시간마다 쿼리를 해야한다면 아직 응답 받지 못한 데이터는 무시하고 최근에 요청한 검색어만 처리할 수 있으므로 이 연산자로 유용하게 대처할 수 있습니다. 

 

특이하게 사용할 수 있는 특수 연산자이기 때문에 switchMap 만큼은 그 기능을 잘 알고 사용하는 것이 중요한 연산자입니다.

 

 

 

groupby()

 

일정 기준으로 단일 Observable을 여러 개로 이루어진 Observable 그룹(GroupObservable)으로 만드는 연산자입니다. Iterable과는 조금 다른 개념입니다.

 

Iterable은 하나의 Observable 객체로 Iterable 클래스를 감싸는 녀석이라면 groupby를 통해서 나오는 GroupedObservable은 실제 단일 Observable 객체를 여러 개를 반환하는 것을 말합니다.

코드를 보다시피 반환 타입이 GroupedObservable이라는 이름을 가지고 있고, 이는 Map과 유사하게 Key, Value라는 제네릭을 가지고 있습니다. keyselector를 이용하여 해당 Key를 기준으로 여러 개의 value를 반환하는 연산자입니다.

 

 

scan()

 

실행할 때마다 입력값에 맞는 중간 결과, 최종 결과를 구독자에게 발행하는 연산자입니다. reduce 연산자와 비슷한데, reduce는 데이터 모음 중에서 그것을 종합하여 최종 결과 1개만을 구독자에게 발행하는 반면, scan은 중간에 생긴 결과까지를 반환한다는 점입니다.

 

원본 데이터인 빨간색을 입력하고 중간에 연산된 데이터들을 차례차례로 계속 넣어줍니다. 그리고 마지막에 결과(파란색)을 중간 결과와 함께 출력해주는 형태입니다.

reduce와 또 다른 차이점이 있다면 코드를 보다시피 Maybe 타입을 반환하지 않고, Observable 타입을 반환한다는 점입니다. reduce의 경우 마지막 데이터가 NULL이거나 값이 없을 경우 혹은 onComplete 이벤트가 발생하지 않으면 구독자에게 값을 전달하지 않는 NULL-safety 처리가 포함되어 있는 반면, scan은 값이 입력될 때마다 그 결과를 반환해 중간에서 결과까지를 보여주는 메소드이기 때문에 NULL 처리를 필요로 하지 않죠.

 

지난 포스트에서 사용한 위의 reduce 코드를 작성하고 실행하면 아래처럼 하나의 결과만을 표시합니다.

 

모든 좌표를 전부 더해서 [9, 9]가 나오는 모습입니다.

 

이 코드 그대로 메소드만 scan으로 바꿔봅시다.

 

그러면 기존에 적혀져 있는 Point 좌표별로 연산한 중간 값을 모두 가져오는 모습입니다. 처음 0,0부터 시작하여 더하는 과정이 보이기도 하며 이를 필요로 하는 곳에 적정한 연산자라고 할 수 있습니다.

 

 

다음 포스트에 이어서 2편(결합, 조건 연산자)을 이어가도록 하겠습니다.

 

 

 

 

 

참고(이미지): https://reactivex.io/documentation

 

반응형
comments powered by Disqus

Tistory Comments 0