[RxJava] 1. RxJava의 기본 - Observable

지난 포스트에서 Observable에 대해 간단히 다뤄봤고, 이에 따른 리액트 함수에 대해 알아보며 각 차이를 알아봤습니다.

 

이번 포스트에서는 Observable을 좀 더 심오하게 다뤄보고, 이에 응하는 Single, Maybe 등 다양한 클래스 등을 공부하면서 RxJava와 좀 더 친숙해지는 시간을 가져보도록 하겠습니다.

 

 

 

 

Observer Design Pattern

RxJava의 근간이 되는 Observable 클래스는 소프트웨어 디자인 패턴인 Observer Pattern을 근간으로 되어 있습니다.

 

 

옵서버 패턴 - 위키백과, 우리 모두의 백과사전

위키백과, 우리 모두의 백과사전. 옵서버 패턴(observer pattern)은 객체의 상태 변화를 관찰하는 관찰자들, 즉 옵저버들의 목록을 객체에 등록하여 상태 변화가 있을 때마다 메서드 등을 통해 객체

ko.wikipedia.org

위키에 나와 있는 것처럼 옵저버 패턴은 Listener 인터페이스를 사용하는 것과 유사합니다. Listener의 경우, 인터페이스 등의 추상적인 Object로 만든 후 이를 원하는 Object에 인자로 넣어 사용하고, 이를 원하는 함수에 넣으면 이벤트 기반의 프로그래밍이 완성되는 것이죠.

 

프로그램에서 사용자의 입력 혹은 이벤트에 의하여 객체의 상태가 변경될 경우 이를 관찰하고 있는 Listener(Observer)는 이 변화를 감지하고 정해진 로직을 수행하며 프로그램에 이벤트를 더하는 방식입니다.

 

 

 

 

Hot Observable / Cold Observable

이러한 옵저버 패턴을 이용하여 만들어진 Observable에는 두 가지 종류가 있는데, 가령 데이터를 로딩하고 바로 그 로딩된 내용을 반영할 것이냐, 아니면 사용자의 요청이 있을 때까지 기다릴 것이냐라는 의미의 두 가지 종류가 있습니다.

 

실제로 이벤트 기반의 프로그래밍이 자주 요구되는 곳은 바로 데이터 통신이 잦은 소프트웨어입니다. 그러나 여기서 데이터 통신이란 서버와 클라이언트 간의 데이터 교환이라고만 할 수는 없습니다. 객체 간의 데이터가 자주 왕래하는 소프트웨어도 이벤트 기반의 프로그래밍이 요구됩니다. 예를 들어, 마우스 클릭의 이벤트가 잦은 ERP 등의 소프트웨어가 있습니다. 

 

  • Hot Observable

    구독 함수 호출 여부(구독자의 존재 여부)와 관계 없이 데이터를 발행하며 여러 구독자가 이 객체를 관찰할 수 있지만 모든 구독자에 대한 데이터의 구독 여부를 확인하는 것은 불가능하다.

    구독한 시점부터 발행한 값을 받기 때문에 이러한 작업에 적합한 이벤트들은 마우스 이벤트와 센서 이벤트 등 하드웨어 이벤트가 대부분이다.

    여러 구독자가 구독하는 것이 가능하지만 대량으로 데이터가 발생했을 때의 배압을 고려해야 한다. 여기서 배압이란, 발행하는 데이터보다 구독하는 속도가 느려지는 경우 생기는 문제를 이야기 한다. 이를 대체하는 것으로 Flowable이 있다.


  • Cold Observable

    구독 함수가 호출되어 있지 않다면 일절 메시지 발행을 하지 않는 Observable이다. 비슷한 단어로 Lazy Loading 기법이라고도 이야기 한다. ORM에서도 RelationShip이 양쪽으로 성립되어 있는 Entity일 경우 이 Lazy Loading을 많이 사용하는데, RxJava에서 Cold Observable은 REST API와 통신시 많이 이용하는 방법이다.

    애플리케이션이 실행되었을 때는 서버와 호출하지 않다가 특정 이벤트가 발생되었을 때 서버와 연결을 시작함으로써 데이터를 발행하고, 결과를 받아 이벤트를 발생시키는 대표적인 Observable이기도 하다.

설명이 조금 어려울 수도 있을 것 같은데, 간단히 설명하자면 Hot Observable은 구독과 관계 없이 메시지를 계속 받는 형태로써 가장 많은 리소스를 사용하는 형태를 말합니다. 가령 마우스 이벤트나 키보드 이벤트 등 사용자의 무한한 입력을 계속 받아야 하는 경우에 적합하며 비교적 이벤트 자체가 무거운 서버와의 통신에는 적합하지 않습니다. 왜냐하면 서버의 과부하를 일으키는 원인이 되기 때문이죠.

 

REST API의 특성상 필요할 떄 한 번 호출하고 연결을 끊는 형태인 Cold Observable이 적합할 것입니다. 내가 필요할 때 구독 함수를 호출하고, 이 때 메시지 발행시 구독 함수에서 반응하면 적합한 리소스를 가지고 원하는 프로그램을 만드는 게 가능하기 때문입니다. 대표적인 Observable로 Connectable이 있습니다. Connectable은 connect 메소드가 호출되었을 때만 데이터를 발행합니다.

 

 

 

 

Single 

Observable에 대한 개념 이해가 되었다면 이제 하나씩 Observable을 사용해보도록 하겠습니다. 지난 포스트에서는 Observable이라는 순수 클래스를 사용하였는데, 이번 포스트에서는 Single이라는 클래스를 사용해서 이벤트 기반의 프로그래밍을 한 번 구현해보겠습니다.

 

Single 클래스는 RxJava 1.x에 처음 등장한 Observable 클래스의 특수한 형태로 오직 하나의  데이터를 넣을 수 있는 Observable 클래스입니다. 지난 포스트에서 사용했던 Observable 클래스는 최대 10개의 데이터까지를 인자로 받을 수 있고, 인자가 아니어도 무제한으로 데이터를 발행할 수 있었지만 Single은 오직 한 개의 데이터만을 받을 수 있습니다.

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;


/**
 * Created by Neon K.I.D on 1/4/21
 * Blog : https://blog.neonkid.xyz
 * Github : https://github.com/NEONKID
 */
public class Main {
    public static void main(String[] args) {
        Single<String> single = Single.just("Hello");

        Disposable disposable = single.subscribe(System.out::println, Throwable::printStackTrace);
        disposable.dispose();
    }
}

Single 클래스 역시 Observable과 동일하게 리액트 함수를 사용하고, 구독하는 방법도 모두 같습니다. 다만 단일의 데이터만을 받을 수 있기 때문에 오직 하나의 데이터에 대해서만 사용할 수 있습니다.

 

따라서 REST API의 특정 엔드포인트에 대한 결과를 받을 때 유용합니다. okhttp3 등의 라이브러리에서 제공하는 Callback 인터페이스를 이용해서 특정 URI의 주소를 호출하고, 그 데이터를 받았을 때 프로그램을 어떻게 제어를 할 것인지를 결정하기 아주 간단하고 좋습니다.

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import okhttp3.OkHttpClient;
import retrofit2.Call;
import retrofit2.Callback;
import retrofit2.Response;
import retrofit2.Retrofit;
import retrofit2.converter.gson.GsonConverterFactory;
import retrofit2.http.GET;


/**
 * Created by Neon K.I.D on 1/4/21
 * Blog : https://blog.neonkid.xyz
 * Github : https://github.com/NEONKID
 */
public class Main {
    interface API {
        @GET(value = "/")
        Call<String> hello();
    }

    public static void main(String[] args) {
        OkHttpClient client = new OkHttpClient.Builder().addInterceptor(chain -> null).build();

        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("http://127.0.0.1")
                .client(client)
                .addConverterFactory(GsonConverterFactory.create())
                .build();

        API api = retrofit.create(API.class);

        Single<String> single = Single.create(emitter -> {
            api.hello().enqueue(new Callback<String>() {
                @Override
                public void onResponse(Call<String> call, Response<String> response) {

                }

                @Override
                public void onFailure(Call<String> call, Throwable t) {

                }
            });
        });

        Disposable disposable = single.subscribe(System.out::println, Throwable::printStackTrace);
        disposable.dispose();
    }
}

REST API 클라이언트 라이브러리로 Retrofit2를 사용한다고 가정한다면 위와 같이 코딩하실 수 있습니다. Single 클래스를 하나만들고, create 함수를 이용하여 커스터마이징 할 수 있습니다.

 

단, Single 클래스의 경우 Observable과는 달리 onSuccess와 onError 함수만을 제공하기 때문에 이를 참고하시고 잘 사용하시는 게 좋겠습니다.

 

 

 

 

Maybe

다음은 Maybe 클래스입니다. Maybe 클래스는 RxJava 2.x에서 나온 클래스로 Single 클래스와 마찬가지로 오직 하나의 데이터만을 가질 수 있지만 구독 함수가 호출되지 않아도 데이터 발행을 진행할 수 있는 Hot Observable 클래스입니다.

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;


/**
 * Created by Neon K.I.D on 1/4/21
 * Blog : https://blog.neonkid.xyz
 * Github : https://github.com/NEONKID
 */
public class Main {
    public static void main(String[] args) {
        Maybe<String> maybe = Maybe.just("HELLO");

        Disposable disposable = maybe.subscribe(System.out::println, Throwable::printStackTrace);
        disposable.dispose();
    }
}

Single 클래스와 마찬가지로 위와 같이 사용할 수 있습니다.

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;


/**
 * Created by Neon K.I.D on 1/4/21
 * Blog : https://blog.neonkid.xyz
 * Github : https://github.com/NEONKID
 */
public class Main {
    public static void main(String[] args) {
        Observable<String> observable = Observable.just("SINGLE");
        Maybe<String> maybe = Maybe.fromObservable(observable);

        Disposable disposable = maybe.subscribe(System.out::println, Throwable::printStackTrace);
        disposable.dispose();
    }
}

그러나 보통의 경우 위와 같이 Observable로 사용했다가 Maybe로 오는 케이스가 꽤 있는데요. 사실 단일 데이터로만 사용한다면 이런 경우는 거의 없겠지만 복수의 데이터(Map, Set, List)들이 존재하는 Collections의 경우에서 특정 데이터가 들어왔을 때 사용하는 방식으로 많이 응용합니다.

 

이 외에도 reduce와 같은 리액티브 연산 함수와 같이 응용하는 경우도 있는데, 아직 리액티브 연산 함수를 들어가지 않았기 때문에 이 내용은 잠시 미뤄두도록 하겠습니다.

 

 

 

 

마치며...

Observable을 좀 더 심층적으로 사용할 수 있는 방법에 대해 알아봤습니다. 이벤트 기반의 프로그래밍은 기본적으로 스레드나 select와 같은 비동기적인 부분들이 많이 속하기 때문에 이들의 개념을 알고 써야 할 필요가 있습니다.

 

개인적으로 중요하게 생각하는 부분은 동시성 처리에 대한 이슈입니다. 현재 우리가 RxJava를 계속 다루면서 단일 데이터에 대한 내용을 다루고 있는데요. 스레드를 사용하신 분들이라면 공감하시겠지만 서브 스레드에서 사용하고 있는 변수를 메인 스레드가 다시 사용할 수 없습니다. 만약 그렇게 프로그래밍 하도록 하였다면 세마포어나 뮤텍스와 같은 운영체제 리소스를 이용하는 것이 좋고, 이용하지 않았다면 운영체제에서 Critical Error을 내뿜으니 주의해야 하는 부분일 것입니다.

 

RxJava는 개발자가 이러한 요소를 개발할 때마다 신경쓰지 않도록 도와줍니다. 하지만 그렇다고 개발자가 이러한 소양을 몰라도 되는 이야기는 아닙니다. 기본된 소양은 가지고 있 되, 개발할 때마다 이를 구현함으로써 생산성을 떨어뜨리는 일을 줄이기 위해 라이브러리가 존재한다는 점을 아셔야 할 것입니다.

 

comments powered by Disqus

Tistory Comments 0