본문 바로가기

Dev Book Review/lettuce

[Lettuce.io] 4.4 Reactive API 번역본

728x90

lettuce.io/core/release/reference/index.html#reactive-api

 

Lettuce Reference Guide

Connections to a Redis Standalone, Sentinel, or Cluster require a specification of the connection details. The unified form is RedisURI. You can provide the database, password and timeouts within the RedisURI. You have following possibilities to create a R

lettuce.io


이 챕터의 목표 : Reacitve Stream 패턴의 이해와 reactive application의 설계 방법의 전반적인 이해를 위함.

4.4.1 Motivation

비동기(Asynchronous)와 리액티브 방법론 (reactive methodolgies)는 네트워크나 디스크 IO로 인한 쓰레드의 대기시간 낭비 대신 시스템의 리소스를 좀 더 효율적으로 사용하도록 해준다. 

이런 스타일의 프로그래밍을 용이하게 하기위한 광법위한 기술이 존재하는데, java.util.concurrent.Future 부터 Akka와 같이 완전한 라이브러리들이 있다 (?)

 Project Reactor는 매우 방대한 셋의 asynchronous workflow를 위한 연산자들이 있고, 이것들은 더이상 프레임워크에 의존하지 않으며 매우 많은 Reactive Streams model을 지원한다.

4.4.2 Understanding Reactive Streams

Reactive Stream은 처음에는 표준의 non-blocking back pressure(이게 뭐지) 을 기반으로 한 asynchronous stream의 표준을 제공하기 위해 만들어졌다. 여기에는 런타임 환경 (JVM and Javascript)뿐만 아니라 네트워크 프로토콜도 포함하려하는 목표가 있었다.

Reactive Streams의 범위는 목표를 달성하는 데 필요한 작업이나 엔터티를 설명하는 최소한의 인터페이스, 메서드 및 프로토콜 집합을 찾는 것이다. 그 목표는 non-blocking back pressure 이 있는 asynchronous streams data이다. (?)

그것은 어플리케이션 코드에서 라이브러리를 연결할 필요 없이, 상호자용을 할 수 있도록 허용해주는 multiple reactive composition 라이브러리들 사이의 운영 표준이다.

Reactive Stream의 통합은 주로 Publicsher<T>Subscriber<T> 타입으로 복잡성을 숨기는 composition library (컴포지션 라이브러리)로 제공된다. Lettuce는 publisher를 Mono, Flux로 사용하는 Project Reactor를 사용한다.

Reacitve Stream에 대한 더 자세한 설명 : http://reactive-streams.org. 

4.4.3. Understanding Publishers

Aynchronouse processing은 IO작업이나 계산과 같은 작업을 호추한 스레드에서 분리한다. handle이 그 return 타입이며, 주로 java.util.concurrent.Future와 동일하거나 비슷하다. 유사하게 single object나 collection, exception을 return 한다. asynchronously하게 fetch 한 결과를 가져온 결과를 기억하는 것은 주로 한 프로우의 끝이 아니다. 한번 데이터가 확보되면 항상 혹은 조건부로 추가 요청을 발행할 수 있다. Java 8이나 Promise 패턴을 사용하면 futuers의 chaining은 연속적으로 그 이후의 asynchronous한 request가 발생하도록 할 수 있다. 한번 조건처리가 필요하면 asynchronous한 flow을 interrupted 시키고 synchronized 해야한다. 이런 접근 방식은 가능하지만 asynchronous의 장점을 완전히 활용하지는 않는다.

반면 Publisher<T> 객체는 다양하고 asynchronous한 질문에 다른방향으로 대답한다 : Pull pattern을 Push patter으로 변환한다.

push : 데이터 변경시 변경이 발새된 곳에서 데이터를 보내주는 방식 (Mono, Flux)
pull : 변경된 데이터가 있는지 질의 후 가져오는 방식 (클라이언트 요청 -> 서버 -> 응답)

Publisher<T>는 Futures처럼 single scalar value에 대한 emission 뿐만 아니라 emission sequences 심지어 infinite streams도 지원한다. 당신이 stream 작업을 시작하기만 하면 이 사실을 감사하게 여길 것이다. Project Reactor는 두개의 타입의 publisher가 사용된다 : Mono 와 Flux

Mono : 0부터 1까지의 event를 emit 한다.
Flux : 0
부터 N까지의 event를 emit 한다.

Publisher<T> 는 concurrency(동시성)나 asynchronicity(비동기성)의 특정 소스나 코드가 실행되는 방식에 편향되지 않는다. synchronous, asynchronous 둘다 ThreadPool 내에서 실행된다. Publisher<T>의 consumer는 실제 구현을 supplier에게 맡겨 나중에 supplier의 코드를 수정하지 않고도 변경이 가능하다.

Publisher<T>의 마지막 키포인트는 Publisher<T>를 가져올 때 처리가 되는 것이 아니라, Publisher<T>에 대한 observer가 subscribe하거나 신호가 보내지는 순간 프로세스가 실행된다는 것이다. 이것은 java.util.concurrent.Future와의 중요한 차이점이다. (Future는 created/obtained 가되는 순간 프로세스가 시작되기 때문이다.) 따라서 어떤 옵저버가 Publisher<T>를 subscribe 하지 않으면 아무일도 일어나지 않는다.

4.4.4. A word on the lettuce Reacitve API

모든 커맨드는 Flux<T>, Mono<T>, Mono<Void> 를 return 한다. 이것들은 Subscriber가 subscribe할 수 있다. 이 구독자는 Publisher <T>가 emit하는 아이템 또는 아이템의 시퀀스에 반응한다. 이 패턴은 Publisher<T>가 객체를 emit 할때까지 기다리는 동안 차단을 할 필요가 없어 동시작업(concurrent operations)에 용이하다. 대신, Publisher<T>가 어떤 futuer time에든 적절하게 반응하도록 준비가 되어있는 Subscriber 형태로 sentry를 만든다.

4.4.5 Consuming Publisher<T>

publisher를 사용할 때 가장먼저 고려할 일은 그들을 consume하는 것이다. Consuming a publisher의 의미는 subscribing을 의미한다. 

    @Test
    @DisplayName("Consuming Publisher 예제 : emit 된 모든 항목을 subscribe 하고 print 한다.")
    void consumingPublisher() {
        Flux.just("jyami", "java", "javabom").subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(3);
            }

            @Override
            public void onNext(String s) {
                System.out.println("Hello " + s + "!");
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("Complete consuming!");
            }
        });
    }
Hello jyami!
Hello java!
Hello javabom!
Complete consuming!

 

모든 구독자 또는 관찰자 (Subscriber or Observer)가 모든 이벤트에 대한 알림을 받고 완료된 이벤트도 받은걸을 확인 할 수 있다.한개의 Publisher<T>는 exception이 발생하거나, 그 Publisher<T>가 종료되었다고 onComplete()를 호출할 때까지 아이템을 내 보낸다. 그 이후에는 더이상 element가 emit되지 않는다.

subscribe의 호출로 한개의 Subscription이 등록되고 이것은 cancel을 허용하므로 더이상 이벤트를 수진하지 않는다. Publisher는 한번 Publisher<T>가 unsubscribed하면 구독 취소및 리소스 할당 해제(free resource)를 상호 운용할 수 있다. (?)

좀더 간단한 포맷의 Subscriber<T> 구현

    @Test
    @DisplayName("Subscriber<T> 의 더 간단한 구현")
    void subscriberSimpleImpl() {
        Flux.just("Ben", "Michael", "Mark").doOnNext(new Consumer<String>() {
            public void accept(String s) {
                System.out.println("Hello " + s + "!");
            }
        }).doOnComplete(new Runnable() {
            public void run() {
                System.out.println("Completed");
            }
        }).subscribe();
    }
    @Test
    @DisplayName("Subscriber<T> 의 더 간단한 구현 - 람다사용")
    void subscriberSimpleImplWithLambda() {
        Flux.just("Ben", "Michael", "Mark")
                .doOnNext(s -> System.out.println("Hello " + s + "!"))
                .doOnComplete(() -> System.out.println("Completed")).subscribe();
    }

Subscriber의 여러 연산자를 이용해서 element를 제어할 수 있다. take() 연산자는 관심이 있는 처음 N개 요소까지로 emit되는 수를 제한한다.

Hello Ben!
Hello Michael!

왜 여기서 Completed가 같이 출력이 안되는지를 모르겠다.

take 연산자는 기대하는 요소 수를() emit한 후 Publisher<T>에서 암시적으로 subscription을 취소한다.

Publisher<T> 에 대한 subscription은 다른 FluxSubscriber가 수행 할 수도 있다. 커스텀한 Publisher를 구현하지 않는한 항상 Subscriber를 사용하라.

그리고 항상 error handler를 올바르게 구현하는 것을 권장한다. 특정시점에서 일이 잘못될 수 있다 (스택트레이스가 쌓여서) 완벽하게 구현이 된 subscriber는 이벤트에 반응할 수 있게 onCompleted, onError 메서드를 모두 선언하자

4.4.6. From push to pull

위에서 햇던 예제는 publisher가 blocking, non-blocking execution에 대해서 별다른 의견없이(추가 없이) 설정되어있는 방법을 기술하였다. Flux<T>는 명시적으로 Iterable<T>로 변환하거나 block() 메서드를 이용해서 synchronized(동기화) 할 수 있다. block() 호출은 피하자. block()을 호출하면 애플리케이션에 대한 reactive chain의 모든 non-blocking 적인 장점이 사라진다.

    @Test
    @DisplayName("block() 연산자 : async -> sycn")
    void blockOperator() {
        String last = Flux.just("Ben", "Michael", "Mark").last().block();
        System.out.println(last); // Mark
    }

 

blocking 호출은 publisher의 체인을 synchronize로 사용되게 할 수 있고, 평범하고 잘 알려진 Pull 패턴으로 돌아가는 방법을 찾는다.

    @Test
    @DisplayName("block() 연산자 : async -> sycn")
    void blockOperatorCollection() {
        List<String> list = Flux.just("Ben", "Michael", "Mark").collectList().block();
        System.out.println(list); // [Ben, Michael, Mark]
    }

toList 연산자는 모든 emmited된 요소들을 모으고(collect), 그리고 그 리스트를 BlockingPublisher<T> 타입으로 패스한다.

4.4.7 Creating Flux and Mono using Lettuce

publisher를 설계하는 방법은 많이 있다. 당신은 이미 just(), take(), collectList()를 이미 보았다. Project Reactor documentation 의 참조에 따르면 Flux와 Mono를 만드는데 사용 할 수 있는 더 많은 메서드가 있다.

Lettuce publisher는 초기화나 체이닝 작업에 사용할 수 있다. Lettuce publisher를 사용하면 non-blocking 동작을 확인 할 수 있다. 이는 모든 I/O 및 커맨드 처리가 netty의 EventLoop를 사용해 aynchronously(비동기적)으로 처리되기 때문이다.

Redis에 연결하는건 매우 간단하다.

private RedisStringReactiveCommands<String, String> commands;

    @BeforeEach
    void setUp() {
        RedisClient client = RedisClient.create("redis://localhost");
        commands = client.connect().reactive();
    }

key에서 value를 가져오려면 GET 연산이 필요하다. (subscribe자리에 Consumer를 넣었다.)

 

    @Test
    @DisplayName("lettuce publisher를 사용하고, 여기서 get 연산자를 통해 redis의 key에 따른 value를 가져올 수 있다. ")
    void LettucePublisher() {
        commands.get("key")
                .subscribe(System.out::println);
    }

이것의 실행은 asynchronously(비동기적으로) 처리되며, Netty EventLoop Thread에서 작업이 완료되는 동안 호출하는 스레드(invoking thread)는 프로세스내의 다른 일을 처리할 수 있다. 분리된 특성으로 인해 호출한 메서드(calling method)는 Publisher<T>의 실행이 완료되기 전에 그대로 둘 수 있다 (?).

Lettuce의 publichser는 연결된 컨텍스트(context of chaining) 내에서 여러개의 키들을 asynchronously(비동기적으로) 로드하는데 사용될 수 있다.

'Dev Book Review > lettuce' 카테고리의 다른 글

[Lettuce.io] 4.4 Reactive API 번역본  (0) 2021.01.02