본문 바로가기

Dev Book Review/Java8 in Action

[자바8인액션] Chap15. CompletableFuture와 리액티브 프로그래밍 컨셉의 기초

소스코드

https://github.com/mjung1798/Jyami-Java-Lab/tree/master/java8-in-action

 

mjung1798/Jyami-Java-Lab

💻 Jyami의 Spring boot 및 Java 실험소 💻. Contribute to mjung1798/Jyami-Java-Lab development by creating an account on GitHub.

github.com

 

5. 발행-구독 그리고 리액티브 프로그래밍

리액티브 프로그래밍은 Future같은 객체를 통해 여러 결과를 제공 (future는 한번만 실행해 결과를 제공)

자바 9에서 java.util.concurrent.Flow 인터페이스에 발행-구독 모델(pub-sub 이라 불리는 프로토콜)을 적용해 리액티브 프로그래밍을 제공한다.

  • 구독자(subscriber)가 구독할 수 있는 발행자(publisher)
  • 이 연결은 구독(subscription)이라 한다
  • 이 연결을 이용해 메세지(또는 이벤트로 알려짐)을 전송한다.

C3=C1+C2 스프레드시트 셀을 만든다 하자. (C1, C2 값이 갱신되면 C3에도 새로운 값이 반영된다)
- c1, c2에 이벤트가 발생했을 때 c3을 구독하도록 한다. > Publisher<T> 필요
- Publisher<T>는 통신할 구독자(Subscriber)를 인수로 받는다

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package java.util.concurrent;

public final class Flow {
    static final int DEFAULT_BUFFER_SIZE = 256;

    private Flow() {
    }

    public static int defaultBufferSize() {
        return 256;
    }

    public interface Processor<T, R> extends Flow.Subscriber<T>, Flow.Publisher<R> {
    }

    public interface Subscription {
        void request(long var1);

        void cancel();
    }

    public interface Subscriber<T> {
        void onSubscribe(Flow.Subscription var1);

        void onNext(T var1);

        void onError(Throwable var1);

        void onComplete();
    }

    @FunctionalInterface
    public interface Publisher<T> {
        void subscribe(Flow.Subscriber<? super T> var1);
    }
}

Cell은 Publisher인 동시에 Subscriber이다. (셀 이벤트에 구독을 할 수 있다 / 다른 셀의 이벤트에 반응한다)

public class SimpleCell implements Flow.Publisher<Integer>, Flow.Subscriber<Integer> {
    private int value =0;
    private String name;
    private List<Flow.Subscriber> subscribers = new ArrayList<>();

    public SimpleCell(String name) {
        this.name = name;
    }

    // Publisher 메서드
    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscribers.add(subscriber);
    }

    //Subscriber 메서드
    @Override
    public void onNext(Integer newValue) {
        this.value = newValue; // 구독한 셀에서 새로운 값이 생겻을 때 값을 갱신한다.
        System.out.println(this.name + ":" + this.value);
        notifyAllSubscribers(); // 값이 갱신됌을 모든 구독자에게 알림
    }

    private void notifyAllSubscribers(){ // 새로운 값이 있음을 모든 구독자에게 알리는 메서드
        subscribers.forEach(subscriber -> subscriber.onNext(this.value));
    }
}
    @Test
    @DisplayName("구독관계 확인 : C1은 Publisher / C3은 Subscriber > publisher 값이 변하면 subscriber에 이벤트가 간다")
    void simpleCell() {
        SimpleCell c3 = new SimpleCell("C3");
        SimpleCell c2 = new SimpleCell("C2");
        SimpleCell c1 = new SimpleCell("C1");

        c1.subscribe(c3);

        c1.onNext(10);
        c2.onNext(20);

//        C1:10
//        C3:10
//        C2:20
    }

 

이 구조에서 (Subscription을 사용하지 않은 구조)에서는 publisher, subscriber의 동작을 연결하기 위해 참조객체로 List<Subsriber> subscribers을 두었다. 
c1은 publisher이면서(subscribe 명령어 수행) subscriber이다(onNext 수행) 

> 여기서는 역압력(backpressure) 내용이 나오지 않았음

기존의 옵저버 패턴에 비해 새로운 API 프로토콜이 더 강력해진 이유 ; onNext, onError, onComplete와 같은 메서드로 데이터 흐름에서 예외가 발생하거나 종료되었을 경우도 제어하기 때문에

옵저버 패턴 참고 : pjh3749.tistory.com/266

 

[디자인패턴] 옵저버 패턴 (Observer Pattern) 아주 간단하게 정리해보기

옵저버 패턴이란? 옵저버란 스타크래프트 프로토스의 유닛으로 적들을 관찰하기 위해 탄생한 유닛이다. 테란전에서 필수 유닛이며 옵저버 패턴(observer pattern)은 객체의 상태 변화를 관찰하는 관

pjh3749.tistory.com

public interface Account { // observable
    void subscribe(Platform platform);
    void unSubscribe(Platform platform);
    void notifyCrew(String msg);
}

public interface Platform { // observer
    void update(String msg);
}
public class Jyami implements Account {
    List<Platform> platforms = new ArrayList<>();
    public void publishText(){
        System.out.println("쟈미가 글을 발행했다");
        notifyCrew("글 발행 전송");
    }
    @Override
    public void subscribe(Platform platform) {
        platforms.add(platform);
    }

    @Override
    public void unSubscribe(Platform platform) {
        platforms.remove(platform);
    }

    @Override
    public void notifyCrew(String msg) {
        platforms.forEach(crew -> crew.update(msg));
    }
}

public class Blog implements Platform {
    @Override
    public void update(String msg) {
        System.out.println("Blog 수신 : " + msg);
    }
}

public class Instagram implements Platform {
    @Override
    public void update(String msg) {
        System.out.println("instargram 수신 : "+ msg);
    }
}

public class Facebook implements Platform {
    @Override
    public void update(String msg) {
        System.out.println("FaceBook 수신 : " + msg);
    }
}
public class Main {
    public static void main(String[] args){
        Jyami jyami = new Jyami();
        Platform blog = new Blog();
        Platform facebook = new Facebook();
        Platform instagram = new Instagram();

        jyami.subscribe(blog);
        jyami.subscribe(facebook);
        jyami.subscribe(instagram);

        jyami.publishText();

        jyami.unSubscribe(facebook);

        jyami.publishText();
    }
}
쟈미가 글을 발행했다
Blog 수신 : 글 발행 전송
FaceBook 수신 : 글 발행 전송
instargram 수신 : 글 발행 전송
쟈미가 글을 발행했다
Blog 수신 : 글 발행 전송
instargram 수신 : 글 발행 전송

이렇게 보면 간단한데 플로 인터페이스의 개념을 복잡하게 만든 기능은 역압력과 압력이다.(스레드 활용에서 필수적)

압력 (push 모델) : 매초마다 수천개의 메세지가 onNext()로 제한없이 전달된다면? > 어느정도의 메세지가 들어올지 숫자를 제한하는 역압력과 같은 기법이 필요하다.

역압력 (pull 모델) :자바 9 Flow API에서 발행자가 무한의 속도로 아이템을 방출하는 대신, 요청했을 때만 다음 아이템을 보내도록 하는 request() 메서드를 제공(Subscription 인터페이스)

 

2. 역압력

Publisher와 Subscriber 사이에 채널이 연결되면 첫 이벤트로 Subscriber 인터페이스의 onSubscribe(Subscription subscription) 메서드가 호출된다.

이 Subscription 객체는 Subscriber와 Publisher가 통신할 수 있는 메서드를 포함한다 

 

 

옵저버 패턴과 pub-sub 패턴의 차이가 명확해 지는 부분은 이 subscription에 대한 부분 때문

3.  실제 역압력의 간단한 형태

한번에 한개의 이벤트를 처리하도록 pub-sub 연결을 구성하려면 다음과 같은 작업이 필요하다.

  • Subscriber가 onSubscribe로 전달된 Subscription 객체를 subscription 같은 필드에 로컬로 저장한다.
  • Subscriber가 수많은 이벤트를 받지 않게 onSubscribe, onNext, onError의 마지막 동작에 channel.request(1)을 추가해 오직 한 이벤트만 추가한다
  • 요청을 보낸 체널에만 onNext, onError이벤트를 보내도록 Publisher의 notifyAllSubscribers 코드를 바꾼다. (보통 여러 Subscriber가 자신만의 속도를 유지할 수 있도록 Publisher는 새 Subscription을 만들어 각 Subscriber와 연결한다)

당김 기반 리액티브 역압력 : Subscriber가 Publisher로부터 요청을 당긴다(pull) = 리액티브 당김 기반(reactive pull based)라고 불린다.

 

6. 리액티브 시스템 vs 리액티브 프로그래밍

리액티브 시스템(reactive system) : 런타임 환경이 변화에 대응하도록 전체 아키텍처가 설계된 프로그램
www.reactivemanifesto.org/ : 공식 속성 문서


리액티브 시스템이 가져야할 속성 : 반응성(responsive), 회복성(resilient), 탄력성(elastic)
- 반응성 : 리액티브 시스템이 큰 작업을 하느라 간단한 질의의 응답을 지연하지 않고 실시간으로 입력에 반응한다.
- 회복성 : 한 컴포넌트의 실패로 전체 시스템이 실패하지 않는다.
- 탄력성 : 자신의 작업 부하에 맞게 적용하여 작업을 효율적으로 처리한다.

Java.util.concureent.Flow 관련된 자바 인터페이스에서 제공하는 리액티브 프로그래밍 형식을 이용하여 이 속성들을 구현할 수 있다.

자바 인터페이스 설계는 Reactive Manifesto의 마지막 속성인 메시지 주도(message-driven) 속성을 반영한다. > 박스와 채널모델