[RxJava] RxJava로 시작하는 Java Reactive 프로그래밍

반응형

Java에서 여러 가지의 일을 동시에 수행하기 위한 방법에는 스레드를 사용하는 방법과 비동기 처리를 이용하는 방법이 있습니다. Java에서도 C++와 마찬가지로 select 함수를 제공하고, Thread 객체를 제공하기 때문에 이를 이용해서 동시 처리를 하는 것은 그리 큰 고민이 될 문제는 아닙니다.

 

그러나 Java로 스레드를 사용한 프로그래밍은 Python이나 Go와 같은 다른 프로그래밍 언어에 비해 사용하기가 무척 어렵습니다. 그냥 단순하게 Thread 객체를 생성하고 사용하는 것만 고려한다면 그리 큰 문제가 되는 것은 아니지만 메모리에 저장된 자원을 동시에 사용하는 이슈 등 처리해줘야 할 일들이 굉장히 많을 뿐더러 코드를 간략하게 짠다하더라도 내가 만든 메소드나 함수가 스레드 위에서 동작하는지 그렇지 않은지를 판별하는 것도 심히 고려해야 될 만한 부분들입니다.

 

 

 

RxJava

이러한 불편한 점들을 개선하기 위해 만든 것이 바로 RxJava입니다. RxJava는 Netflix에서 개발하여 3.x 버전까지 출시된 Java용 Reactive Extension Library입니다. 라이브러리이기 때문에 Java에서 공식적으로 지원되는 것은 아니지만 안드로이드 개발자들이 더 쉬운 비동기 처리를 위해서 사용하는 대표적인 라이브러리이기도 합니다. 

 

우리는 이러한 프로그래밍을 Reactive Programming이라고 이야기 합니다. 기존에 사용했던 프로그래밍 코드 방식은 "이 객체이용해 인스턴스를 만들어서, 이 데이터를 넣었을 때 결과를 말해줘"라는 명령형 이었다면, 지금은 "프로그램의 상황에 따라서 이러한 반응을 보여줘" 라는 반응형 프로그래밍입니다. 

 

이 외 다른 구현체로 Akka와 최근 Spring WebFlux에서 사용하고 있는 Reactor가 있습니다.

 

RxJava는 러닝 커브가 높고, 내용이 방대하여 여기서 모든 RxJava에 대한 내용을 다루기엔 다소 무리가 있습니다. 따라서 주요 내용만을 중점적으로 다루고, 간단한 사용법에 대한 이야기만 해보도록 하겠습니다.

 

RxJava에 대해 좀 더 자세히 알아보고 싶다면, 아래의 링크를 참고해보세요.

 

 

ReactiveX/RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM. - ReactiveX/RxJava

github.com

 

 

 

Start for Gradle

RxJava를 사용하려면 RxJava 프레임워크 디펜던시를 추가해야 합니다. Java에서는 디펜던시 관리로 mvn과 gradle 중 하나를 선택할 수 있는데, 여기서는 Gradle을 가지고 진행하도록 하겠습니다.

plugins {
    id 'java'
}

group 'org.example'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    implementation "io.reactivex.rxjava3:rxjava:3.0.9"
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
}

test {
    useJUnitPlatform()
}

가장 최근 버전은 3.0.9이기 때문에 3.0.9로 기재해보도록 하겠습니다.

 

 

 

 

Don't use "new"

RxJava에 들어가기 전에, 한 가지 이야기를 하고 넘어가겠습니다. RxJava에서는 new 연산자를 사용하지 않습니다. RxJava에 들어 있는 클래스들은 모두 Static factory 함수를 호출하게 되며 new 연산자를 사용할 경우 RxJava에 자동 구현되어 있는 구현체를 사용하기 어렵게 됩니다.

/**
 * Created by Neon K.I.D on 1/4/21
 * Blog : https://blog.neonkid.xyz
 * Github : https://github.com/NEONKID
 */
public class Main {
    public static void main(String[] args) {
        Observable<String> ob = new Observable<String>() {
            @Override
            protected void subscribeActual(@NonNull Observer<? super String> observer) {
                
            }
        }
    }
}

우리가 보통 Java를 사용할 때 인스턴스를 생성하는 방식으로 new 연산자를 사용하지만 RxJava에서는 위와 같이 사용하지 않는다는 것입니다.

/**
 * Created by Neon K.I.D on 1/4/21
 * Blog : https://blog.neonkid.xyz
 * Github : https://github.com/NEONKID
 */
public class Main {
    public static void main(String[] args) {
        Observable.just("Hello world").subscribe(System.out::println);
    }
}

위처럼 Factory 함수를 사용하여 호출하며 가급적 RxJava를 사용할 때는 new와는 잠시 멀어지도록 합시다.

 

 

 

 

How to use 

제가 처음 RxJava를 사용했던 것은 2.x 버전이고, 안드로이드 앱을 MVVM 패턴을 이용해 개발했을 때였습니다. 당시에는 ViewModel을 어떤식으로 View에 렌더링 할 때를 정하도록 할 건지가 가장 큰 관건이었는데요. RxJava에 있는 Observable이 도움이 많이 되었던 것 같습니다.

 

여기서 Observable은 RxJava의 시작과 끝이라고 할 정도로 중요한 개념입니다. 1.x 버전에서는 Observable과 Single 클래스를 주류로 사용했다면 2.x에서는 Observable, Maybe, Flowable 클래스를 상황에 맞게 사용을 하는 방식으로 나아갔었는데요.

 

그렇다면 Observable은 무엇일까요? 여기서 Observe라는 단어를 보실 수 있는데, 이 단어는 관찰이라는 뜻을 가지고 있죠. 따라서 관찰하려는 변수 대상을 가리키고 이 대상에 따라서 프로그램을 바꿔가는 클래스를 Observable 클래스라 합니다.

 

그렇다면 Observable이 아니라 Observed라는 단어가 더 맞지 않을까요? 만약 그렇다면 이 클래스는 한 번 관찰하고 난 이후 다음부턴 안돼 처럼 일회용이겠지만 Observable은 지금 당장 관찰하지 않았더라도 앞으로 계속 관찰할 것이라는 의미이기 때문에 바로 대상을 관찰하기 보단 앞으로 계속 변화를 보면서 프로그램을 이어나갈 것 이라고 보는 게 맞을 것입니다.

 

이런 이야기를 하는 이유는 반응형 프로그래밍을 처음 접하는 사람들이 이러한 개념에 익숙하지 않기 때문입니다. 만약 GUI 프로그래밍을 조금 해보신 분들이라면 이런 개념이 익숙하실 수 있습니다. 특정 버튼을 클릭했을 때 이 버튼이 클릭되었는지를 확인하기 위해 이벤트 기반의 프로그래밍을 할 수 있죠. Java에서는 Listener 인터페이스를 만들어 사용하는 방식이 존재하고, 이를 위해서는 Thread 객체를 사용해야 했는데, 쉽지가 않죠.

 

RxJava에서는 Thread 객체를 만들고, Listener 인터페이스를 만드는 번거로운 작업을 다 알아서 해줍니다. 우리는 Observable에서 제공하는 아래의 3가지 메소드만을 기억합시다.

 

  • onNext: Observable 클래스가 데이터가 발행되었음을 감지하는 함수
  • onComplete: 데이터 발행을 더 이상 받지 않을 때 해당 함수를 호출합니다. (이후 onNext 함수는 동작하지 않습니다.)
  • onError: 데이터 관찰 중 오류 발생 (이후 위 두 개의 함수는 동작하지 않으며 Observable 실행 또한 종료됨)

그럼 Observable 코드를 하나 만들어서 써보도록 하죠.

 

 

 

just()

/**
 * Created by Neon K.I.D on 1/4/21
 * Blog : https://blog.neonkid.xyz
 * Github : https://github.com/NEONKID
 */
public class Main {
    public static void main(String[] args) {
        Observable.just("Hello World", "Different class").subscribe(System.out::println);
    }
}

Observable 클래스에서 제공되는 팩토리 함수 중엔 just 함수가 있습니다. 이 just 함수는 인자로 넣은 데이터를 차례로 발행하고, Observable을 생성하는데요. 최대 10개까지 넣을 수 있습니다.

 

가장 좋은 점은 개발자가 직접 onNext, onComplete, onError 함수를 만들지 않아도 쉽게 위와 같이 동작한다는 점입니다. 

 

두 가지 데이터를 인자로 넣고, 실행한 결과는 위와 같습니다.

 

 

 

create()

 

create 함수는 just와 그 기능이 같지만 onNext, onComplete, onError 등의 콜백 함수를 직접 만들어서 사용하는 방식입니다. just는 위 3가지의 콜백 함수가 기본적으로 구현되어 있는 구현체를 가지고 있고, 우리는 subscribe 처리만 넣어주면 되었습니다.

 

create 함수는 RxJava에 어느 정도 익숙해지신 분들에게 사용하기를 권장하는 함수입니다. 

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;


/**
 * Created by Neon K.I.D on 1/4/21
 * Blog : https://blog.neonkid.xyz
 * Github : https://github.com/NEONKID
 */
public class Main {
    public static void main(String[] args) {
        Observable<String> observable = Observable.create(emitter -> {
            try {
                emitter.onNext("Hello");
            } catch (Exception ex) {
                emitter.onError(ex);
            }
        });

        Disposable disposable = observable.subscribe(System.out::println, Throwable::printStackTrace);
        disposable.dispose();
    }
}

Observable 클래스를 create 팩토리 함수로 생성하고 난 다음에는 Lambda를 이용하여 데이터의 변화를 감지했을 때 어떤 작업을 수행할지를 정의합니다. onNext를 사용하여 데이터를 계속 받도록 할 수도 있고, 원하는 데이터를 받았다면 이벤트를 중단하는 onComplete를 정의할 수도 있습니다.

 

내가 동작시키는 로직을 정의했다면 그것을 언제 동작시킬지를 정해야 하는데, 이 때 사용하는 것이 Disposable 클래스입니다. Disposable 객체는 Observable에 있는 subscribe 메소드를 이용하여 정의하고, 이를 받았을 때 처리할 부가적인 동작을 구성한 다음, dispose 함수를 실행했을 때 동작합니다.

 

이처럼 create는 조금 복잡한데요. RxJava의 동작 방식을 천천히 이해한 뒤에 사용하는 것이 좋습니다.

 

 

 

 

Why RxJava ?

이처럼 RxJava 또한 러닝 커브가 낮지만은 않습니다. 그럼에도 불구하고 RxJava를 써야하는 이유는 무엇일까요? 

 

이벤트 기반의 프로그래밍을 하기 위해서는 비동기적인 프로그래밍을 진행해야 합니다. Pure Java에서 이러한 프로그래밍을 하기 위해서는 스레드를 잘 사용할 줄 알아야 함과 동시에 코드를 깨끗하게 작성할 수 있는 능력도 필요합니다. 기본적으로 이벤트 기반의 코드를 작성하기 위해서는 아래의 3가지 요소가 필요합니다.

 

  • Thread (스레드)
  • Listener (인터페이스)
  • Flow (흐름)

코드적인 부분만 봤을 때는 스레드와 리스너 두 가지가 필요합니다. 그러나 스레드를 사용하면 프로그램의 흐름이 매끄럽지 못합니다. 보통의 명령형 프로그래밍으로 스레드를 생성하면 스레드 클래스가 위에 가거나 다른 파일에 작성되는 등의 형태가 되는데, 이를 추적하는 데 비용이 많이 생기면서 코드의 가독성이 떨어지게 됩니다.

 

반면 RxJava는 스레드를 사용하거나 인터페이스를 별도로 구현하는 등의 불편함을 덜어주고, 데이터의 변화를 감지하기 위한 로직을 단 한 줄에 작성할 수 있으며 이를 연동하는 인터페이스도 잘 구현되어 있어, 이 인터페이스 또한 개발자가 구현하는 수고를 덜어줍니다. 게다가 코드의 흐름도 일정하게 유지시켜주기 때문에 코드 가독성을 최대한 살려주지요.

 

실제로 Netflix가 RxJava를 만든 이유는 아래와 같습니다.

 

  • Emberance Concurrency

    우리의 클라이언트는 동시성 프로그래밍을 많이 필요로 했고, 이를 적극적으로 수용해야만 했다.

  • Java Futures are Expensive to Compose

    Java에서 제공하는 Future는 비동기 처리에 많은 도움이 되긴 하지만 이를 조합하고 흐름을 제어하는 것은 쉽지 않다.

  • Callbacks Have Their Own Problems

    콜백 지옥은 모든 언어가 가지고 있는 비동기 프로그래밍 방식의 문제점, 따라서 우리도 이를 해결해야 할 필요가 있다.

비록 나중에 Java 8에서 CompletableFuture라는 비동기 흐름을 매끄럽게 잡아주는 좋은 클래스가 나오긴 했지만 당시 RxJava가 나올 무렵에는 Java 8이 나오기 이전이었고, 이 때문에 Netflix에서는 Reactive 연산자까지 만들었는데, 그것이 바로 현재 Java 8에 있는 map, stream 등의 함수들입니다.

 

 

 

 

 

 

마치며

RxJava에 대해 간단한 사용법을 알아봤습니다. RxJava는 지난 달 2.x에 대한 지원을 거의 종료하고 3.x까지 도달했습니다. 비록 저는 2.x에서 왔다가 최근에서 다시 3.x를 보는 중이긴 한데도 잘 사용하지 않은 탓인지 헷갈리는 부분들이 많았습니다.

 

실제로 RxJava를 처음하시는 분들은 어려운 부분이 많을 것입니다. 생소한 단어들, 추상화된 프로그래밍 코드로 인해서 어떤식으로 동작하는지 잘 모를 수 있기 때문입니다. 따라서 자주 사용하는 클래스와 Reactive 연산자만 잘 익히는 것을 목표로 하고, 그 다음부터 커스터마이징을 하면서 차근차근 배워나가는 것이 좋을 것 같습니다.

 

 

반응형

Tistory Comments 5

  • Favicon of https://dreamaz.tistory.com BlogIcon 드리머즈

    와.. 또 좋은 글을 발견했다 했더니 Neon님 글이었네요. ㅎㅎ 잘 보고 갑니다.
    안드로이드에서 RxJava가 가장 많이 쓰이는 부분은.. 네트워크 처리 부분일까요?

    • Favicon of https://blog.neonkid.xyz BlogIcon 🅽🅴🅾🅽 🅺.🅸.🅳 Neon K.I.D

      그렇습니다. 안드로이드 애플리케이션에서 서버와 통신할 때 사용자가 REST API를 호출하게 됩니다. 그러면 애플리케이션에서는 새로운 데이터를 받게 되고, 해당 데이터는 어떠한 변수에 의해 지정될 것입니다.

      기존의 Java를 사용하는 경우, 데이터의 변화만 이루어질 뿐 데이터의 변화를 감지하기 위한 별도의 로직을 개발해야 합니다. 우리는 이를 Listener 패턴이라고 보통 이야기 합니다.

      하지만 RxJava는 이러한 Listener 패턴을 만들어주지 않아도 Observable 변수를 사용함으로써 데이터의 변화를 스스로 감지하고, 이에 대한 처리를 콜백 함수로 처리함으로써 개발자가 데이터의 변화를 감지하는 로직을 별도로 구현해야 하는 번거로움을 덜어준다는 것이 장점입니다.

  • 드리머즈

    댓글 감사합니다.
    그런데 여기 블로그에 티스토리 댓글쪽에 댓글을 1번 남기면.. 이상하게 계정 인식을 못하는 것 같습니다.
    티스토리 로그인이 되어있음에도.. 이름과 비번을 적어서 댓글을 남겨야 해요. 흠.. 버그가 있어요.

    아차 그리고.. 제가 안드로이드 앱에 RxJava를 적용시키면서 공부하려고 하는데 button 같은 곳에 RxJava 적용 시키는 것도 흔한 일인가요?

    • Favicon of https://blog.neonkid.xyz BlogIcon 🅽🅴🅾🅽 🅺.🅸.🅳 Neon K.I.D

      아.... 아마 그 문제는 2차 도메인으로 접속했을 때 문제인 것 같습니다. 그 문제는 저의 문제인지, 티스토리의 문제인지 확인 중에 있습니다. 다소 불편을 드려 죄송합니다.

      보통 버튼 클릭 등의 액션을 통한 RxJava 적용도 사용될 수는 있습니다. 앞서 말씀드렸다시피 Listener 패턴을 좀 더 쉽게 활용할 수 있는 방법이 RxJava 입니다.

      다만 버튼의 액션은 사용자가 이벤트를 발생시키기 때문에 RxJava와 같이 데이터의 변화로 발생하는 반응형 프로그래밍과는 조금 다릅니다. 사용자의 이벤트가 아닌 데이터의 변화를 감지하며 이를 공부하시고자 한다면 디자인 패턴 중 옵저버 패턴을 공부해보시는 걸 추천드립니다.

  • Favicon of https://dreamaz.tistory.com BlogIcon 드리머즈

    아하.. 그런 것 같습니다. 티스토리 기본 주소 말고.. 따로 설정하신 도메인으로 접속되면 그런가 보네요.
    RxJava에 대한 의견 감사합니다.