소스코드
https://github.com/mjung1798/Jyami-Java-Lab/tree/master/java8-in-action
5. 발행-구독 그리고 리액티브 프로그래밍
리액티브 프로그래밍은 Future같은 객체를 통해 여러 결과를 제공 (future는 한번만 실행해 결과를 제공)
자바 9에서 java.util.concurrent.Flow 인터페이스에 발행-구독 모델(pub-sub 이라 불리는 프로토콜)을 적용해 리액티브 프로그래밍을 제공한다.
- 구독자(subscriber)가 구독할 수 있는 발행자(publisher)
- 이 연결은 구독(subscription)이라 한다
- 이 연결을 이용해 메세지(또는 이벤트로 알려짐)을 전송한다.
C3=C1+C2 스프레드시트 셀을 만든다 하자. (C1, C2 값이 갱신되면 C3에도 새로운 값이 반영된다)
- c1, c2에 이벤트가 발생했을 때 c3을 구독하도록 한다. > Publisher<T> 필요
- Publisher<T>는 통신할 구독자(Subscriber)를 인수로 받는다
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package java.util.concurrent;
public final class Flow {
static final int DEFAULT_BUFFER_SIZE = 256;
private Flow() {
}
public static int defaultBufferSize() {
return 256;
}
public interface Processor<T, R> extends Flow.Subscriber<T>, Flow.Publisher<R> {
}
public interface Subscription {
void request(long var1);
void cancel();
}
public interface Subscriber<T> {
void onSubscribe(Flow.Subscription var1);
void onNext(T var1);
void onError(Throwable var1);
void onComplete();
}
@FunctionalInterface
public interface Publisher<T> {
void subscribe(Flow.Subscriber<? super T> var1);
}
}
Cell은 Publisher인 동시에 Subscriber이다. (셀 이벤트에 구독을 할 수 있다 / 다른 셀의 이벤트에 반응한다)
public class SimpleCell implements Flow.Publisher<Integer>, Flow.Subscriber<Integer> {
private int value =0;
private String name;
private List<Flow.Subscriber> subscribers = new ArrayList<>();
public SimpleCell(String name) {
this.name = name;
}
// Publisher 메서드
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
subscribers.add(subscriber);
}
//Subscriber 메서드
@Override
public void onNext(Integer newValue) {
this.value = newValue; // 구독한 셀에서 새로운 값이 생겻을 때 값을 갱신한다.
System.out.println(this.name + ":" + this.value);
notifyAllSubscribers(); // 값이 갱신됌을 모든 구독자에게 알림
}
private void notifyAllSubscribers(){ // 새로운 값이 있음을 모든 구독자에게 알리는 메서드
subscribers.forEach(subscriber -> subscriber.onNext(this.value));
}
}
@Test
@DisplayName("구독관계 확인 : C1은 Publisher / C3은 Subscriber > publisher 값이 변하면 subscriber에 이벤트가 간다")
void simpleCell() {
SimpleCell c3 = new SimpleCell("C3");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");
c1.subscribe(c3);
c1.onNext(10);
c2.onNext(20);
// C1:10
// C3:10
// C2:20
}
이 구조에서 (Subscription을 사용하지 않은 구조)에서는 publisher, subscriber의 동작을 연결하기 위해 참조객체로 List<Subsriber> subscribers을 두었다.
c1은 publisher이면서(subscribe 명령어 수행) subscriber이다(onNext 수행)
> 여기서는 역압력(backpressure) 내용이 나오지 않았음
기존의 옵저버 패턴에 비해 새로운 API 프로토콜이 더 강력해진 이유 ; onNext, onError, onComplete와 같은 메서드로 데이터 흐름에서 예외가 발생하거나 종료되었을 경우도 제어하기 때문에
옵저버 패턴 참고 : pjh3749.tistory.com/266
public interface Account { // observable
void subscribe(Platform platform);
void unSubscribe(Platform platform);
void notifyCrew(String msg);
}
public interface Platform { // observer
void update(String msg);
}
public class Jyami implements Account {
List<Platform> platforms = new ArrayList<>();
public void publishText(){
System.out.println("쟈미가 글을 발행했다");
notifyCrew("글 발행 전송");
}
@Override
public void subscribe(Platform platform) {
platforms.add(platform);
}
@Override
public void unSubscribe(Platform platform) {
platforms.remove(platform);
}
@Override
public void notifyCrew(String msg) {
platforms.forEach(crew -> crew.update(msg));
}
}
public class Blog implements Platform {
@Override
public void update(String msg) {
System.out.println("Blog 수신 : " + msg);
}
}
public class Instagram implements Platform {
@Override
public void update(String msg) {
System.out.println("instargram 수신 : "+ msg);
}
}
public class Facebook implements Platform {
@Override
public void update(String msg) {
System.out.println("FaceBook 수신 : " + msg);
}
}
public class Main {
public static void main(String[] args){
Jyami jyami = new Jyami();
Platform blog = new Blog();
Platform facebook = new Facebook();
Platform instagram = new Instagram();
jyami.subscribe(blog);
jyami.subscribe(facebook);
jyami.subscribe(instagram);
jyami.publishText();
jyami.unSubscribe(facebook);
jyami.publishText();
}
}
쟈미가 글을 발행했다
Blog 수신 : 글 발행 전송
FaceBook 수신 : 글 발행 전송
instargram 수신 : 글 발행 전송
쟈미가 글을 발행했다
Blog 수신 : 글 발행 전송
instargram 수신 : 글 발행 전송
이렇게 보면 간단한데 플로 인터페이스의 개념을 복잡하게 만든 기능은 역압력과 압력이다.(스레드 활용에서 필수적)
압력 (push 모델) : 매초마다 수천개의 메세지가 onNext()로 제한없이 전달된다면? > 어느정도의 메세지가 들어올지 숫자를 제한하는 역압력과 같은 기법이 필요하다.
역압력 (pull 모델) :자바 9 Flow API에서 발행자가 무한의 속도로 아이템을 방출하는 대신, 요청했을 때만 다음 아이템을 보내도록 하는 request() 메서드를 제공(Subscription 인터페이스)
2. 역압력
Publisher와 Subscriber 사이에 채널이 연결되면 첫 이벤트로 Subscriber 인터페이스의 onSubscribe(Subscription subscription) 메서드가 호출된다.
이 Subscription 객체는 Subscriber와 Publisher가 통신할 수 있는 메서드를 포함한다
옵저버 패턴과 pub-sub 패턴의 차이가 명확해 지는 부분은 이 subscription에 대한 부분 때문
3. 실제 역압력의 간단한 형태
한번에 한개의 이벤트를 처리하도록 pub-sub 연결을 구성하려면 다음과 같은 작업이 필요하다.
- Subscriber가 onSubscribe로 전달된 Subscription 객체를 subscription 같은 필드에 로컬로 저장한다.
- Subscriber가 수많은 이벤트를 받지 않게 onSubscribe, onNext, onError의 마지막 동작에 channel.request(1)을 추가해 오직 한 이벤트만 추가한다
- 요청을 보낸 체널에만 onNext, onError이벤트를 보내도록 Publisher의 notifyAllSubscribers 코드를 바꾼다. (보통 여러 Subscriber가 자신만의 속도를 유지할 수 있도록 Publisher는 새 Subscription을 만들어 각 Subscriber와 연결한다)
당김 기반 리액티브 역압력 : Subscriber가 Publisher로부터 요청을 당긴다(pull) = 리액티브 당김 기반(reactive pull based)라고 불린다.
6. 리액티브 시스템 vs 리액티브 프로그래밍
리액티브 시스템(reactive system) : 런타임 환경이 변화에 대응하도록 전체 아키텍처가 설계된 프로그램
www.reactivemanifesto.org/ : 공식 속성 문서
리액티브 시스템이 가져야할 속성 : 반응성(responsive), 회복성(resilient), 탄력성(elastic)
- 반응성 : 리액티브 시스템이 큰 작업을 하느라 간단한 질의의 응답을 지연하지 않고 실시간으로 입력에 반응한다.
- 회복성 : 한 컴포넌트의 실패로 전체 시스템이 실패하지 않는다.
- 탄력성 : 자신의 작업 부하에 맞게 적용하여 작업을 효율적으로 처리한다.
Java.util.concureent.Flow 관련된 자바 인터페이스에서 제공하는 리액티브 프로그래밍 형식을 이용하여 이 속성들을 구현할 수 있다.
자바 인터페이스 설계는 Reactive Manifesto의 마지막 속성인 메시지 주도(message-driven) 속성을 반영한다. > 박스와 채널모델
'Dev Book Review > Java8 in Action' 카테고리의 다른 글
[자바8인액션] Chap17. 리액티브 프로그래밍 (1) | 2021.01.10 |
---|---|
[자바8인액션] Chap.3 람다 표현식 (0) | 2020.07.30 |
[자바8인액션] Chap.2 동작 파라미터화 코드 전달하기 (0) | 2020.07.30 |
[자바8인액션] Chap.1 자바 8을 눈여겨 봐야하는 이유 (0) | 2020.07.29 |