[RxJava] 5. Reactive 연산자 파헤쳐보기 3편 (수학 연산자 및 기타 연산자)

반응형

 

2021/02/21 - [Programming/Java] - [RxJava] 4. Reactive 연산자 파헤쳐보기 2편 (결합, 조건 연산자)

 

[RxJava] 4. Reactive 연산자 파헤쳐보기 2편 (결합, 조건 연산자)

2021/02/07 - [Programming/Java] - [RxJava] 3. Reactive 연산자 파헤쳐보기 1편 (생성, 변환 연산자) [RxJava] 3. Reactive 연산자 파헤쳐보기 1편 (생성, 변환 연산자) 2021/01/23 - [Programming/Java] - [RxJ..

blog.neonkid.xyz

지난 포스트에 이어서 RxJava의 기본을 다루는 연산자 마지막 편 수학 연산자 및 기타 연산자에 대해 알아보겠습니다.

이번 포스트에서 다룰 수학 연산자는 우리가 자주 사용하는 Sum, Average 와 같은 수학 연산자들을 이야기하는데, 이를 좀 더 Reactive 하게 다룰 수 있는 연산자로 제공하며 그 외 기타 연산자로는 지난 포스트에서도 강조했던 배압 이슈와 관련된 연산자 등 특수하게 다루는 연산자들을 이야기 합니다.

 

 

 

 

수학 연산자

수학 연산자는 여러 개의 숫자형 데이터로 이루어진 Array 등을 이용하여 합, 평균 등의 계산을 수행하여 하나의 Observable 객체로 만들어주는 연산자입니다.

 

본래 RxJava에서 수학연산자는 1.0에서 지원했던 디펜던시였습니다. 현재 RxJava 1.x에서 사용하려면 우리가 기본적으로 사용했던 RxJava의 디펜던시만으로는 사용할 수 없습니다. 따라서 Math 관련된 연산자는 별도의 다른 디펜던시를 추가해야 합니다.

implementation "io.reactivex.rxjava-math:1.0.0"

본래 RxJava 1.x에서는 이러한 디펜던시를 추가하지 않아도 바로 수학 연산자를 사용할 수 있었지만 RxJava 2.x 버전부터 레포지터리가 분리되면서 수학 관련된 연산자는 별도의 디펜던시를 설치해줘야 합니다. 

 

이러한 디펜던시는 RxJava Companion 연산자로 불리워 RxJava 2.x 이상에서는 호환되지 않고, 공식적으로 개발하여 제공하지 않습니다.

 

RxJava 2.x 이상 버전에서 이러한 연산자를 사용하기 위해서 이를 별도로 포팅을 해주신 RxJava 2의 핵심 기여자이신 David Karnok이 이러한 Extension을 공개하여 현재 Maven에서 제공되고 있습니다.

implementation "com.github.akarnokd:rxjava3-extensions:3.0.1"

 

 

 

sum()

 

앞서 말한 듯이 대표적으로는 sum은 여러개의 숫자형으로 이루어진 Array, List (하나의 Observable 혹은 Flowable) 등을 이용하여 합계를 하나의 Observable로 반환해주는 연산자입니다. 

 

위 문서는 RxJava 1.x에서 제공되었던 문서를 가져온 것이며 그림과 같이 X Observable이 있고, 해당 객체에서 Iterable 형태의 레퍼런스 인스턴스에서 자료를 빼와 합계를 하나의 객체로 나타내주고 있으며 각 자료형별로 sumInteger, sumLong, sumFloat가 존재합니다.

// RxJava 1.x
MathObservable.sumInteger(Observable.range(1, 4)).subscribe(System.out::println);

// RxJava 2.x or 3.x
Observable.range(1, 4).to(MathObservable::sumInt).subscribe(System.out::println);

공식적으로 제공하는 것과 Extension으로 제공하는 것에 사용법이 약간 차이가 있는데, 일단 메소드 이름이 RxJava Extension을 사용하는 경우 sumInt로 사용할 수 있습니다. 

 

 

 

average()

 

average 역시 sum과 마찬가지로 같은 데이터 형식을 사용합니다. 다만 average의 경우 정수가 나올 수도 있고, 소숫점 값이 나올 수도 있기 때문에 averageFloat와 averageDouble 메소드만을 가지고 있습니다.

 

// RxJava 1.x
MathObservable.averageFloat(new Integer[]{30, 60, 25, 18, 41}).subscribe(System.out::println);

// RxJava 2.x or 3.x
Observable.fromArray(new Integer[]{30, 60, 25, 18, 41}).to(MathObservable::averageFloat)
                .subscribe(System.out::println);

average의 경우는 메소드 이름의 변화가 없습니다. 그런데 sum도 그렇고 average고 그렇고 코드 사용에 조금 차이가 있는데요. 1.x의 경우는 MathObservable이라는 별도의 객체가 존재하고, Extension은 이것이 없습니다.

 

사실 코드를 열어보면 1.x에서는 Observable을 Base로 하여금 MathObservable 클래스를 만들어서 해당 메소드를 정의한 것이고, 2.x에서 제공하는 Extension의 경우 어차피 Base가 Observable이기 때문에 굳이 그것을 클래스에 정의할 필요 없이 이와 관련된 메소드를 static으로 제공함으로써 결과를 반환하는 것으로 구현되어 있습니다.

 

따라서 Extension과 기존 코드의 차이는 거의 없으며 단지 코드의 모습을 단순화 하기 위해 리팩토링 한 것 뿐이라고만 보면 되겠습니다.

 

 

 

min()

 

min 역시 같은 유형의 데이터로 최솟값을 하나의 Observable로 반환하는 메소드입니다.

 

// RxJava 1.x
MathObservable.min(new Integer[]{30, 60, 25, 18, 41}).subscribe(System.out::println);

// RxJava 2.x or RxJava 3.x
Observable.fromArray(new Integer[]{30, 60, 25, 18, 41}).to(MathObservable::min)
                .subscribe(System.out::println);

위 사용법 그대로 max 메소드를 사용할 수 있습니다.

 

 

 

max()

 

max는 데이터 모음에서 최댓값을 하나의 Observable로 반환하는 메소드입니다.

 

// RxJava 1.x
MathObservable.max(new Integer[]{30, 60, 25, 18, 41}).subscribe(System.out::println);

// RxJava 2.x or RxJava 3.x
Observable.fromArray(new Integer[]{30, 60, 25, 18, 41}).to(MathObservable::max)
                .subscribe(System.out::println);

그런데, 한 가지 의문점이 있습니다. 1.x에서는 MathObservable을 별도로 이용하고, Extension에서는 기존 구현체에서 사용하도록 되어 있는데, 이게 가능한 게 to 메소드가 눈에 띄는데요. to 메소드는 무엇인걸까요?

 

to 메소드는 RxJava에서 다른 타입으로 변환해주기 위한 메소드입니다. 많은 데이터들 모음에서 하나의 Observable로 반환하기 위해서는 Observable<List<Integer>> -> Obserable<Integer> 형식으로 변환해야하는 것이라고 보면 되겠습니다.

 

비슷한 메소드로 toFlowable 메소드가 있습니다. Flowable은 RxJava에서 배압 이슈를 완화하기 위한 클래스인데요. Observable로 가지고 있던 객체를 Flowable로 변환하기 위해 사용하는 메소드입니다. 

// RxJava 2.x or 3.x
Observable.fromArray(new Integer[]{30, 60, 25, 18, 41}).toFlowable(BackpressureStrategy.BUFFER)
                .to(MathFlowable::min).subscribe(System.out::println);

Flowable로 변형할 때 주는 인자는 바로 배압 전략인데요. 배압 전략에 따라서 많은 데이터량을 생산하고 소비할 때를 제어할 수 있습니다. 이 부분은 다른 포스트에서 상세히 다뤄보도록 하겠습니다.

 

 

 

count()

 

RxJava에서 사용 가능한 수학 연산자 중 유일하게 Extension에 의존하지 않는 연산자인 count는 어느 버전이든지 Observable 객체에서 바로 사용할 수 있는 연산자입니다.

 

Observable.range(1, 4).count().subscribe(System.out::println);

Extension에 의존하지 않기 때문에 모든 버전에서 위와 같은 동일한 코드를 사용합니다. 생산해서 들어오는 데이터들에 대한 갯수를 통계하고자 할 떄 사용할 수 있습니다.

 

 

 

 

기타 연산자

그 외 연산자들에 대해서는 지연과 관련된 연산자가 있습니다. 지난 포스트에서 잠깐 다뤘지만 데이터를 생산하고 소비하는 과정에서 sleep과 비슷한 역할을 하는 delay, timeInterval 함수 등의 시간 관련 연산자들을 보통 유틸리티 연산자라고 합니다. 

 

 

delay()

 

amb 연산자를 사용했을 때 그 차이를 알아보기 위해서 잠깐 사용했던 연산자인데요. 이런식으로 데이터의 발행을 잠시 지연시키는 역할을 하는 연산자입니다.

 

Observable.amb(Arrays.asList(Observable.just("N.K").delay(2, TimeUnit.SECONDS), Observable.just("NEONKID")))
	.subscribe(System.out::println);

시간 관련 연산자들의 특징으로는 다른 연산자들과 다르게 스케줄러를 별도의 스레드로 이용한다는 것인데요. 만약 데이터를 다루는 스레드와 같이 이루어진다면 데이터의 연결성을 보장 받기가 어렵겠죠?

 

 

 

timeInterval()

 

timeInterval 연산자는 어떤 값을 생산하는 데 걸리는 시간을 반환하는 연산자입니다. 서버 측에서 데이터를 받아오는 데만을 사용한다면 이 연산자를 사용하는 것도 좋겠죠?

 

Observable<Timed<Integer>> src = Observable.range(20, 100).delay(item -> {
    Thread.sleep(new Random().nextInt(500));
    return Observable.just(item);
}).timeInterval();

Disposable disposable = src.subscribe(System.out::println);
disposable.dispose();

쉬운 설명을 위해 20 ~ 100까지의 데이터를 range 메소드로 하여금 array 데이터를 생성하여 delay를 랜덤하게 주고 그 결과를 구독받는 코드를 생성해봤습니다.

 

이전 데이터를 기준으로 집계했을 때 117, 118, 119가 각각 발행될 때를 출력한 모습입니다. 116이 발행되고 117이 발행됐을 떄 걸린 시간은 229ms, 118이 발행됐을 때까지 걸린 시간은 161ms 이런식으로 표시되어 각 발행 데이터가 평균적으로 얼마 정도 걸리는지를 측정할 수 있도록 구현할 수 있습니다.

 

현재 코드는 Thread에서 sleep 메소드를 이용하여 최대 500ms까지 무작위로 숫자를 생성하여 걸리는 시간을 측정하였습니다.

 

 

 

 

마치며...

길고 긴 RxJava 연산자 시리즈 Reactive 연산자 파헤쳐보기 글이 마무리 되었는데요. 자세히 보면 우리가 사용하는 메소드들과 큰 차이는 없지만 이벤트 기반으로 동작하고 그에 해당하는 이슈인 배압 이슈를 올바르게 처리할 수 있도록 해주는 하나의 솔루션이었음을 확인헐 수 있었습니다.

 

 

반응형

Tistory Comments 0