[자바8인액션] Chap17. 리액티브 프로그래밍
소스코드
https://github.com/mjung1798/Jyami-Java-Lab/tree/master/java8-in-action
리액티브 프로그래밍에서는 다양한 시스템과 소스에서 들어오는 데이터 항목 스트림을 비동기적으로 처리하고 합쳐서 문제를 해결한다
- 빅데이터 : 빅데이터는 페타 바이트 단위로 구성되며 매일 증가한다
- 다양한 환경 : 모바일 디바이스 부터 수천개의 멀티 코어 프로세서로 실행되는 클라우드 기반 클러스터까지 다양한 환경에 배포된다.
- 사용패턴 : 사용자는 1년 내내 항상 서비스를 이용할 수 있으며 밀리초 단위의 응답 시간을 기대한다.
리액티브 프로그래밍 패러다임에 맞게 설계된 애플리케이션은 높은 응답성을 제공한다.
전체의 리액티브 시스템을 구성하는 여러 컴포넌트를 조절에서도 가용성을 제공한다.
1. 리액티브 매니페스토
리액티브 매니페스토에서는 리액티브 애플리케이션과 시스템 개발의 핵심 원칙을 공식적으로 정의한다.
- 반응성(responsive) : 시스템은 가능한한 적정시간 안에 반응한다. 반응성이 뒷받침 되어야 사용성을 높일 수 있다.
- 탄력성(elastic) : 다양한 작업 부하에도 시스템 반응성이 유지된다. 입력속도가 바뀐다면 이들 입력 관련 서비스에 할당된 자원을 늘리거나 줄임으로 반응할 수 있다.
- 메시지 주도(message driven) : 컴포넌트 간의 약산 결합, 고립, 위치, 투명성이 유지되도록 시스템은 비동기 메시지 전달에 의존한다.
- 회복성(resilient) : 장애 시에도 시스템의 반응성은 유지된다.
1. 애플리케이션 수준의 리액티브
애플리케이션 수준에서의 주요기능은 비동기로 작업을 수행할 수 있다는 점이다.
비동기로 처리하는 것이 최신 멀티코어 CPU 사용률을 극대화(내부적으로 경쟁하는 CPU의 스레드 사용률) 할 수 있는 방법이다
이를 위해 스레드를 퓨처, 액터, 일련의 콜백을 발생시키는 이벤트 루프 등과 공유하고 처리할 이벤트를 변환하고 관리한다.
개발자 입장에서는 동기 블록, 경쟁 조건, 데드락 같은 저 수준의 멀티스레드 문제를 직접 처리할 필요가 없어져 비즈니스 요구사항 구현에 더 집중이 가능하다.
RxJava, Akka같은 리액티브 프레임워크는 별도로 지정된 스레드 풀에서 블록 동작을 실행시킨다. > 메인 풀의 모든 스레드는 방해 받지 않고 실행된다. (CPU 관련 작업과 IO관련 작업의 분리)
2. 시스템 수준의 리액티브
리액티브 시스템 = 소프트웨어 아키텍처 : 여러 애플리케이션이 한 개의 일관적인, 회복할 수 있는 플랫폼을 구성할 수 이쎅 해줄 뿐 아니라 이들 애플리케이션 중 하나가 실패해도 전체 시스템이 계속 운영되게 해준다.
- 리액티브 애플리케이션 : 이벤트 주도
- 리액티브 시스템 : 메시지 주도
컴포넌트에서 발생한 장애를 고립시킴으로 문제가 주변의 다른 컴포넌트로 전파되면서 전체 시스템 장애로 이어지는 것을 막음으로 회복성을 제공한다(== 결함 허용 능력) : 고립과 비결합
모든 컴포넌트가 수신자의 위치에 상관없이 모든 서비스와 통신 ( 탄력성 = 위치 투명성)
2. 리액티브 스트림과 플로 API
리액티브 프로그래밍 = 리액티브 스트림을 사용하는 프로그래밍
리액티브 스트림 : 잠재적으로 무한의 비동기 데이터를 순서대로 처리하고, 블록하지 않는 역압력을 전제해 처리하는 표준 기술
역압력 : pub-sub 프로토콜에서 publisher가 발행하는 속도 > subscriber가 소비하는 속도 일 경우에 문제가 발생하지 않도록 보장하는 장치 + 기존 데이터 처리에 얼마나 시간이 걸리는지 업스트림 발행자에게 알릴 수 있어야함
데이터 수신자가 스레드를 블록하지 않고도 데이터 수신자가 처리할 수 없을 만큼의 데이터를 받는 일을 방지한다.
리액티브 스트림이 구현이 제공해야 하는 최소 기능 집합 (네개 인터페이스)
- java9의 새로운 java.util.concurrent.Flow 클래스
- Akka 스트림, 리액터(reactor), RxJava, Vert.x 등 많은 서드 파티 라이브러리
2-1. Flow 클래스
Publisher가 항목을 발행하면 Subscriber가 한 개씩 또는 한 번에 여러 항목을 소비하는데 Subscription이 이 과정을 관리하도록 여러 정적 메서드를 제공한다.
public final class Flow { static final int DEFAULT_BUFFER_SIZE = 256; private Flow() { } public static int defaultBufferSize() { return 256; } public interface Processor<T, R> extends Flow.Subscriber<T>, Flow.Publisher<R> { } public interface Subscription { void request(long var1); void cancel(); } public interface Subscriber<T> { void onSubscribe(Flow.Subscription var1); void onNext(T var1); void onError(Throwable var1); void onComplete(); } @FunctionalInterface public interface Publisher<T> { void subscribe(Flow.Subscriber<? super T> var1); } }
- Publisher : 많은 이벤트 제공이 가능하지만 Subscriber의 요구사하에 따라 역압력 기법에 의해 이벤트 제공 속도가 제한
- Subscriber : Publisher가 발행한 이벤트의 리스너로 자신을 등록할 수 있음.
onSubscribe onNext* (onError| onComplete)?
- onSubscribe 메서드는 항상 처음 호출
- onNext 메서드는 여러번 호출 가능
- 이벤트 스트림이 영원히 지속되거나 onComplete, onError 호출 가능
규칙
- Publisher는 Subscription의 request 메서드에 정의된 개수 이하의 요소만 Subscriber에 전달한다.
- Publisher는 동작이 성공적으로 끝나면 onComplete, 문제가 발생하면 onError를 호출해 Subscription을 종료한다.
- Subscriber는 요소를 받아 처리할 수 있음을 Publisher에게 알려야 한다. (역압력 행사)
- onComplete나 onError를 처리하는 상황에서 Subscriber는 Publisher나 Subscription의 어떤 메서드도 호출 할수도없고 Subscription이 취소되었다 가정한다.
- Subscriber는 Subscription.reqeust() 호출 없이도 언제나 종료 시그널을 받을 준비가 되어있어야한다.
- Subscriber는 Subscription.canel() 이 호출된 이후에도 한 개 이상의 onNext를 받을 준비가 되어있어야 한다.
이 명세에 맞춰 직접 구현한 기능은 Reative Streams TCK라는 툴로 검증할 수 있다. 구현이 까다로움..
Processor 인터페이스 ; Publisher, Subscriber 상속
리액티브 스트림에서 처리하는 이벤트의 변환 관계를 나타낸다.(?)
Flow 클래스의 인터페이스는 직접 구현하도록 의도된게 아니다. (실제로 자바9에서 구현 클래스를 제공하지않는다.)
라이브러리가 준수해야할 규칙과 리액티브 애플리케이션이 서로 협동 소통할 수 있는 공용어 제시가 목표였기 때문이다.
Akka, RxJava 등의 리액티브 라이브러리에서 이미 구현체부터 만들어둔 상태였기 때문에.
2-2. 간단한 리액티브 애플리케이션
@Getter public class TempInfo { public static final Random random = new Random(); private final String town; private final int temp; public TempInfo(String town, int temp) { this.town = town; this.temp = temp; } public static TempInfo fetch(String town){ if(random.nextInt(10) == 0) throw new RuntimeException("Error!"); return new TempInfo(town, random.nextInt(100)); } @Override public String toString() { return "TempInfo{" + "town='" + town + '\'' + ", temp=" + temp + '}'; } }
public class TempSubscriber implements Flow.Subscriber<TempInfo> { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(1); // 구독 저장하고 첫번째 요청 전달 (1개의 요청을 받을 준비가 되어있다.) } @Override public void onNext(TempInfo tempInfo) { System.out.println(tempInfo); // 수신한 온도 출력하고 다음 정보 요청 subscription.request(1); } @Override public void onError(Throwable throwable) { System.err.println(throwable.getMessage()); // 에러 메세지 출력 } @Override public void onComplete() { System.out.println("Done!"); } }
public class TempSubscription implements Flow.Subscription { private final Flow.Subscriber<? super TempInfo> subscriber; private final String town; public TempSubscription(Flow.Subscriber<? super TempInfo> subscriber, String town) { this.subscriber = subscriber; this.town = town; } @Override public void request(long n) { for (long i = 0L; i < n; i++) { try { subscriber.onNext(TempInfo.fetch(town)); // 현재 온도를 Subscriber로 전달 }catch (Exception e){ subscriber.onError(e); // 온도 가져오기를 실패하면 Subscriber로 에러 전달 break; } } } @Override public void cancel() { subscriber.onComplete(); // 구독이 취소되면 완료 신호를 Subscriber에 전달 } }
public class Main { public static void main(String[] args){ getTemperatures("Seoul").subscribe(new TempSubscriber()); // 서울에 Publisher를 만들고 TempSubscriber를 구독 } private static Flow.Publisher<TempInfo> getTemperatures(String town){ // Publisher.subscribe() 메서드 구현 return subscriber -> subscriber.onSubscribe(new TempSubscription(subscriber, town)); // 구독한 Subscriber에게 TempSubscription을 전송하는 Publisher를 반환 } }
- Publisher는 functional 인터페이스이며, 인자로 subscriber를 받는다.
- Publisher는 인자로 받은 subscriber의 onSubscribe 메서드를 호출한다.
- onSubscribe 메서드 호출로 subscriber는 subscription 객체를 인스턴스 변수로 갖고있게 된다.
- 이 subscription 객체는 subscriber를 인스턴스 변수로 갖고있게 된다.
- onSubscribe 메서드 호출과 함께, subscription.request(1) 이 불리게 되면서, subscriber가 1개의 요청을 받을 준비가 되었음을 subscription이 알게된다.
- subscription.request(1)로 subscription 내의 인스턴스 변수인 subscriber의 onNext 혹은 onError 가 불리게된다.
- onNext의 경우에 마찬가지로 subscription.request(1) 이 불리면서 subscriber가 1개의 요청을 받을 준비가 되었음을 subscription이 알게된다.
- onError의 경우에는 더이상 subscription에 request 가 일어나지 않아 종료된다.
즉. Publisher의 subscribe로 인해 계속 이벤트가 방출되게 되고, 이 이벤트를 Subscription의 request 요청이 있어야지만 Subscriber 가 이벤트를 소비하게된다.
이때 에러를 발생시키는 코드를 없애게되면 재귀호출로인한 stackoverflow가 발생한다. : Executor를 TempSubscription으로 추가한 다음 다른 스레드에서 TempSubscriber로 전달한다.
private static final ExecutorService executor = Executors.newSingleThreadExecutor(); @Override public void request(long n) { executor.submit(() -> { for (long i = 0L; i < n; i++) { try { subscriber.onNext(TempInfo.fetch(town)); // 현재 온도를 Subscriber로 전달 }catch (Exception e){ subscriber.onError(e); // 온도 가져오기를 실패하면 Subscriber로 에러 전달 break; } } }); }
3. Processor로 데이터 변환하기
Processor는 Subscriber이며 Publisher이다.
public class TempProcessor implements Flow.Processor<TempInfo, TempInfo> { private Flow.Subscriber<? super TempInfo> subscriber; @Override public void subscribe(Flow.Subscriber subscriber) { this.subscriber = subscriber; } @Override public void onNext(TempInfo tempInfo) { int fTemp = (((tempInfo.getTemp() - 32) * 5) / 9); // 섭씨로 변환한 다음 TempInfo 다시 전송 subscriber.onNext(new TempInfo(tempInfo.getTown(), fTemp)); } // 다른 모든 신호는 업스트림 구독자에게 전달. @Override public void onSubscribe(Flow.Subscription subscription) { subscriber.onSubscribe(subscription); } @Override public void onError(Throwable throwable) { subscriber.onError(throwable); } @Override public void onComplete() { subscriber.onComplete(); } }
@Test @DisplayName("processor를 이용한 C=>F 온도 변환") void name() { getCelsiusTemperatures("Seoul").subscribe(new TempSubscriber()); } public static Flow.Publisher<TempInfo> getCelsiusTemperatures(String town){ return subscriber -> { TempProcessor processor = new TempProcessor(); processor.subscribe(subscriber); processor.onSubscribe(new TempSubscription(processor,town)); }; }
Subscriber 인터페이스를 구현하는 다른 모든 메서드는 단순히 수신한 모든 신호를 업스트림 Subscriber로 전달하며 Publisher의 subscribe 메서드는 업스트림 Subscriber를 Processor로 등록하는 동작을 수행한다.
4. 자바에서 플로 API의 구현을 제공하지 않는 이유
API를 만들 당시 Akka, RxJava 등 다양한 리액티브 스트림의 자바 코드 라이브러리가 이미 존재했기 때문이다. 둘다 같은 pub-sub 패턴을 기반한 리액티브 프로그래밍을 구현했지만 이들 라이브러리는 독립적으로 구현이 되었고 각기다른 이름 규칙과 API를 사용함
그래서 플로 API에서 리액티브 개념의 구현을 위한 표준화 작업을 하게된 것이다.
리액티브 스트림의 구현은 대부분 기존 구현을 따라가자.
3. 리액티브 라이브러리 RxJava 사용하기
RxJava는 자바로 리액티브 애플리케이션을 구현하는 데 사용하는 라이브러리이다.
RxJava 에서 제공하는 Flow.Publisher 구현 클래스
1. Flowable
public abstract class Flowable<@NonNull T> implements Publisher<T> {}
역압력 기능이 있다. : 너무 빠른 속도로 데이터를 발행해 Subscriber가 이를 감당할 수 없는 상황에 이르는걸 방지하는 기능
2. Observable
public abstract class Observable<@NonNull T> implements ObservableSource<T> {}
역압력 기능을 제공하지 않던 기존버전의 RxJava에서 제공하던 클래스
단순한 프로그램, 마우스 움직임 같은 사용자 인터페이스에 적합. (GUI이벤트나 자주 발생하지 않는 종류의 이벤트에 역압력을 적용하지 말자.)
모든 Publisher는 Subscription의 request(Long.MAX_VALUE); 메서드를 이용해 역압력 기능을 끌 수 있다.
3-1. Observable 만들고 사용하기
Observable과 Flowable 클래스는 다양한 종류의 리액티브 스트림을 편리하게 만들도록 여러 팩토리 메서드를 제공한다
이들은 모두 Publisher를 구현하므로 팩토리 메서드는 리액티브 스트림을 만드는 것이다.
@Test @DisplayName("Observable just 메서드 : Observable의 Subscriber는" + " onNext(first) onNext(second) onComplete() 순서로 메서드를 받는다.") void observableJust() { Observable<String> just = Observable.just("first", "second"); }
reactivex.io/documentation/operators/just.html
@Test @DisplayName("Observable interval 메서드 : 지정된 속도로 이벤트를 방출하는 상황에서 사용한다." + "0에서 시작해 1초 간격으로 long 형식의 값을 무한으로 증가시키며 값을 방출한다.") void observableInterval() { Observable<Long> interval = Observable.interval(1, TimeUnit.SECONDS); }
reactivex.io/documentation/operators/interval.html
RxJava에서의 Flow.Subscriber 역할
public interface Observer<@NonNull T> { void onSubscribe(@NonNull Disposable d); void onNext(@NonNull T t); void onError(@NonNull Throwable e); void onComplete(); }
Subscriber과 같은 메서드를 정의하며, onSubcribe 메서드가 Subscription 대신 Disposale 인수를 갖는다는 점만 다르다.
Observable은 역압력을 지원하지 않아 Subscription의 request가 필요하지 않다.
하지만 onNext가 좀더 유연하다 (더 많은 오버로드 기능 제공) : ex onNext만 구현하고 나머지는 구현하지 않는 기본동작의 Observer 제공가능.
@Test @DisplayName("RxJava Observer: 많은 오버로드로 인해 유연하게 구현이 가능하다.") void observableIntervalWithObserver() { Observable<Long> onePerSec = Observable.interval(1, TimeUnit.SECONDS); onePerSec.subscribe(i -> System.out.println(TempInfo.fetch("Seoul"))); }
1초에 한번씩 TempInfo의 temp 값을 출력하리라 예상한다.
그렇지만 Observable이 RxJava의 연산 스레드 풀인 데몬 스레드에서 실행되어, 위 코드에서는 실행하자마자 실행할 코드가 없어 종료되게 된다. 데몬스레드도 함께 종료 > blockingSubscribe를 사용해 결과를 볼 순 있다.
심화예제
@Test @DisplayName("좀더 일반화를 위해 온도를 직접 출력하지 않고 " + "사용자에게 팩토리 메서드를 제공해 매초마다 온도를 방출하는 Observable을 만들어보자" + "이렇게 할 경우 observer는 그저 받은 데이터만 출력하는 기능을 하면된다." + "observable로 온도를 얻는 과정에서 에러와 완료처리 로직을 다 구현했기 때문에 ") void observableExample() { Observable<TempInfo> seoul = getTemperature("Seoul"); seoul.blockingSubscribe(new TempObserver()); } public static Observable<TempInfo> getTemperature(String town){ return Observable.create(emitter -> Observable.interval(1, TimeUnit.SECONDS) // 매 초마다 무한으로 증가하는 long 반환 Observable .subscribe(i -> { if(!emitter.isDisposed()){ // 소비된 옵저버가 폐기되지 않았으면 어떤 작업을 수행(에러가 안났으면) if(i >= 5){ // 5번 온도를 onNext 했으면 성공처리 후 종료 emitter.onComplete(); }else { try { emitter.onNext(TempInfo.fetch(town)); // 온도를 observer로 보 }catch (Exception e){ emitter.onError(e);// 에러가 발생하면 Observer에게 알림 } } } }) ); } public class TempObserver implements Observer<TempInfo>{ @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull TempInfo tempInfo) { System.out.println(tempInfo); } @Override public void onError(@NonNull Throwable e) { System.err.println("Got Problem " + e.getMessage()); } @Override public void onComplete() { System.out.println("Done!"); } }
여기서 사용된 emitter 변수는 그 실체가 ObservableEmitter 인터페이스인데, 그 인터페이스는 Emitter의 인터페이스를 상속한다.. : onSubscribe가 빠진 Observer와 같음.
public interface ObservableEmitter<@NonNull T> extends Emitter<T> {}
public interface Emitter<@NonNull T> { void onNext(@NonNull T value); void onError(@NonNull Throwable error); void onComplete(); }
여기서 Observable이 역압력을 지원하지 않아 전달된 요소를 처리한 다음 추가 요소를 요청하는 request()가 필요가 없다.
3-2. Observable을 변환하고 합치기
리액티브 라이브러리는 자바 9 플로 API에 비해 스트림을 합치고 만들고 거르는 등의 풍부한 메서드를 제공하는 것이 장점이다. 스트림을 다른 스트림의 입력으로 사용도 가능하며, 스트림의 매핑 함수로 요소를 변환하거나, 다양한 방법으로 합치는 등의 작업도 가능하다.
마블 다이어그램(marble diagram) : 수평선으로 표시된 리액티브 스트림에 임의의 순서로 구성된 요소가 기하학적 모양으로 나타난다.
- 특수기호: 에러나 완료신호를 나타낸다.
- 박스 : 연산이 요소를 어떻게 변화하거나 여러 스트림을 어떻게 합치는지를 보여줌
reactivex.io/documentation/operators
RxJava의 많은 operator를 설명하는 페이지에도 마블 다이어그램을 이용해 여러 연산의 스트림 동작을 설명한다.
merge - 두개 이상의 Observable이 방출한 이벤트를 하나로 합침
map - Observable이 발행하는 요소를 변환
@Test @DisplayName("Observable의 연산자를 이용해 스트림을 유동적으로 처리할 수 있다." + "filter 를 이용해 섭씨온도 영하만 필터링하고 " + "map을 이용해 화씨를 섭씨로 변환한다") void getMinusCelsiusTemperature() { Observable<TempInfo> seoul = getTemperature("Seoul") .filter(temp -> temp.getTemp()<0) .map(temp -> new TempInfo(temp.getTown(), ((temp.getTemp() - 32) * 5) / 9)); seoul.blockingSubscribe(new TempObserver()); }