๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ

Dev Book Review/Java8 in Action

[์ž๋ฐ”8์ธ์•ก์…˜] Chap15. CompletableFuture์™€ ๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ์ปจ์…‰์˜ ๊ธฐ์ดˆ

์†Œ์Šค์ฝ”๋“œ

https://github.com/mjung1798/Jyami-Java-Lab/tree/master/java8-in-action

 

mjung1798/Jyami-Java-Lab

๐Ÿ’ป Jyami์˜ Spring boot ๋ฐ Java ์‹คํ—˜์†Œ ๐Ÿ’ป. Contribute to mjung1798/Jyami-Java-Lab development by creating an account on GitHub.

github.com

 

5. ๋ฐœํ–‰-๊ตฌ๋… ๊ทธ๋ฆฌ๊ณ  ๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ

๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์€ Future๊ฐ™์€ ๊ฐ์ฒด๋ฅผ ํ†ตํ•ด ์—ฌ๋Ÿฌ ๊ฒฐ๊ณผ๋ฅผ ์ œ๊ณต (future๋Š” ํ•œ๋ฒˆ๋งŒ ์‹คํ–‰ํ•ด ๊ฒฐ๊ณผ๋ฅผ ์ œ๊ณต)

์ž๋ฐ” 9์—์„œ java.util.concurrent.Flow ์ธํ„ฐํŽ˜์ด์Šค์— ๋ฐœํ–‰-๊ตฌ๋… ๋ชจ๋ธ(pub-sub ์ด๋ผ ๋ถˆ๋ฆฌ๋Š” ํ”„๋กœํ† ์ฝœ)์„ ์ ์šฉํ•ด ๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์„ ์ œ๊ณตํ•œ๋‹ค.

  • ๊ตฌ๋…์ž(subscriber)๊ฐ€ ๊ตฌ๋…ํ•  ์ˆ˜ ์žˆ๋Š” ๋ฐœํ–‰์ž(publisher)
  • ์ด ์—ฐ๊ฒฐ์€ ๊ตฌ๋…(subscription)์ด๋ผ ํ•œ๋‹ค
  • ์ด ์—ฐ๊ฒฐ์„ ์ด์šฉํ•ด ๋ฉ”์„ธ์ง€(๋˜๋Š” ์ด๋ฒคํŠธ๋กœ ์•Œ๋ ค์ง)์„ ์ „์†กํ•œ๋‹ค.

C3=C1+C2 ์Šคํ”„๋ ˆ๋“œ์‹œํŠธ ์…€์„ ๋งŒ๋“ ๋‹ค ํ•˜์ž. (C1, C2 ๊ฐ’์ด ๊ฐฑ์‹ ๋˜๋ฉด C3์—๋„ ์ƒˆ๋กœ์šด ๊ฐ’์ด ๋ฐ˜์˜๋œ๋‹ค)
- c1, c2์— ์ด๋ฒคํŠธ๊ฐ€ ๋ฐœ์ƒํ–ˆ์„ ๋•Œ c3์„ ๊ตฌ๋…ํ•˜๋„๋ก ํ•œ๋‹ค. > Publisher<T> ํ•„์š”
- Publisher<T>๋Š” ํ†ต์‹ ํ•  ๊ตฌ๋…์ž(Subscriber)๋ฅผ ์ธ์ˆ˜๋กœ ๋ฐ›๋Š”๋‹ค

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package java.util.concurrent;

public final class Flow {
    static final int DEFAULT_BUFFER_SIZE = 256;

    private Flow() {
    }

    public static int defaultBufferSize() {
        return 256;
    }

    public interface Processor<T, R> extends Flow.Subscriber<T>, Flow.Publisher<R> {
    }

    public interface Subscription {
        void request(long var1);

        void cancel();
    }

    public interface Subscriber<T> {
        void onSubscribe(Flow.Subscription var1);

        void onNext(T var1);

        void onError(Throwable var1);

        void onComplete();
    }

    @FunctionalInterface
    public interface Publisher<T> {
        void subscribe(Flow.Subscriber<? super T> var1);
    }
}

Cell์€ Publisher์ธ ๋™์‹œ์— Subscriber์ด๋‹ค. (์…€ ์ด๋ฒคํŠธ์— ๊ตฌ๋…์„ ํ•  ์ˆ˜ ์žˆ๋‹ค / ๋‹ค๋ฅธ ์…€์˜ ์ด๋ฒคํŠธ์— ๋ฐ˜์‘ํ•œ๋‹ค)

public class SimpleCell implements Flow.Publisher<Integer>, Flow.Subscriber<Integer> {
    private int value =0;
    private String name;
    private List<Flow.Subscriber> subscribers = new ArrayList<>();

    public SimpleCell(String name) {
        this.name = name;
    }

    // Publisher ๋ฉ”์„œ๋“œ
    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscribers.add(subscriber);
    }

    //Subscriber ๋ฉ”์„œ๋“œ
    @Override
    public void onNext(Integer newValue) {
        this.value = newValue; // ๊ตฌ๋…ํ•œ ์…€์—์„œ ์ƒˆ๋กœ์šด ๊ฐ’์ด ์ƒ๊ฒป์„ ๋•Œ ๊ฐ’์„ ๊ฐฑ์‹ ํ•œ๋‹ค.
        System.out.println(this.name + ":" + this.value);
        notifyAllSubscribers(); // ๊ฐ’์ด ๊ฐฑ์‹ ๋Œ์„ ๋ชจ๋“  ๊ตฌ๋…์ž์—๊ฒŒ ์•Œ๋ฆผ
    }

    private void notifyAllSubscribers(){ // ์ƒˆ๋กœ์šด ๊ฐ’์ด ์žˆ์Œ์„ ๋ชจ๋“  ๊ตฌ๋…์ž์—๊ฒŒ ์•Œ๋ฆฌ๋Š” ๋ฉ”์„œ๋“œ
        subscribers.forEach(subscriber -> subscriber.onNext(this.value));
    }
}
    @Test
    @DisplayName("๊ตฌ๋…๊ด€๊ณ„ ํ™•์ธ : C1์€ Publisher / C3์€ Subscriber > publisher ๊ฐ’์ด ๋ณ€ํ•˜๋ฉด subscriber์— ์ด๋ฒคํŠธ๊ฐ€ ๊ฐ„๋‹ค")
    void simpleCell() {
        SimpleCell c3 = new SimpleCell("C3");
        SimpleCell c2 = new SimpleCell("C2");
        SimpleCell c1 = new SimpleCell("C1");

        c1.subscribe(c3);

        c1.onNext(10);
        c2.onNext(20);

//        C1:10
//        C3:10
//        C2:20
    }

 

์ด ๊ตฌ์กฐ์—์„œ (Subscription์„ ์‚ฌ์šฉํ•˜์ง€ ์•Š์€ ๊ตฌ์กฐ)์—์„œ๋Š” publisher, subscriber์˜ ๋™์ž‘์„ ์—ฐ๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด ์ฐธ์กฐ๊ฐ์ฒด๋กœ List<Subsriber> subscribers์„ ๋‘์—ˆ๋‹ค. 
c1์€ publisher์ด๋ฉด์„œ(subscribe ๋ช…๋ น์–ด ์ˆ˜ํ–‰) subscriber์ด๋‹ค(onNext ์ˆ˜ํ–‰) 

> ์—ฌ๊ธฐ์„œ๋Š” ์—ญ์••๋ ฅ(backpressure) ๋‚ด์šฉ์ด ๋‚˜์˜ค์ง€ ์•Š์•˜์Œ

๊ธฐ์กด์˜ ์˜ต์ €๋ฒ„ ํŒจํ„ด์— ๋น„ํ•ด ์ƒˆ๋กœ์šด API ํ”„๋กœํ† ์ฝœ์ด ๋” ๊ฐ•๋ ฅํ•ด์ง„ ์ด์œ  ; onNext, onError, onComplete์™€ ๊ฐ™์€ ๋ฉ”์„œ๋“œ๋กœ ๋ฐ์ดํ„ฐ ํ๋ฆ„์—์„œ ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜๊ฑฐ๋‚˜ ์ข…๋ฃŒ๋˜์—ˆ์„ ๊ฒฝ์šฐ๋„ ์ œ์–ดํ•˜๊ธฐ ๋•Œ๋ฌธ์—

์˜ต์ €๋ฒ„ ํŒจํ„ด ์ฐธ๊ณ  : pjh3749.tistory.com/266

 

[๋””์ž์ธํŒจํ„ด] ์˜ต์ €๋ฒ„ ํŒจํ„ด (Observer Pattern) ์•„์ฃผ ๊ฐ„๋‹จํ•˜๊ฒŒ ์ •๋ฆฌํ•ด๋ณด๊ธฐ

์˜ต์ €๋ฒ„ ํŒจํ„ด์ด๋ž€? ์˜ต์ €๋ฒ„๋ž€ ์Šคํƒ€ํฌ๋ž˜ํ”„ํŠธ ํ”„๋กœํ† ์Šค์˜ ์œ ๋‹›์œผ๋กœ ์ ๋“ค์„ ๊ด€์ฐฐํ•˜๊ธฐ ์œ„ํ•ด ํƒ„์ƒํ•œ ์œ ๋‹›์ด๋‹ค. ํ…Œ๋ž€์ „์—์„œ ํ•„์ˆ˜ ์œ ๋‹›์ด๋ฉฐ ์˜ต์ €๋ฒ„ ํŒจํ„ด(observer pattern)์€ ๊ฐ์ฒด์˜ ์ƒํƒœ ๋ณ€ํ™”๋ฅผ ๊ด€์ฐฐํ•˜๋Š” ๊ด€

pjh3749.tistory.com

public interface Account { // observable
    void subscribe(Platform platform);
    void unSubscribe(Platform platform);
    void notifyCrew(String msg);
}

public interface Platform { // observer
    void update(String msg);
}
public class Jyami implements Account {
    List<Platform> platforms = new ArrayList<>();
    public void publishText(){
        System.out.println("์Ÿˆ๋ฏธ๊ฐ€ ๊ธ€์„ ๋ฐœํ–‰ํ–ˆ๋‹ค");
        notifyCrew("๊ธ€ ๋ฐœํ–‰ ์ „์†ก");
    }
    @Override
    public void subscribe(Platform platform) {
        platforms.add(platform);
    }

    @Override
    public void unSubscribe(Platform platform) {
        platforms.remove(platform);
    }

    @Override
    public void notifyCrew(String msg) {
        platforms.forEach(crew -> crew.update(msg));
    }
}

public class Blog implements Platform {
    @Override
    public void update(String msg) {
        System.out.println("Blog ์ˆ˜์‹  : " + msg);
    }
}

public class Instagram implements Platform {
    @Override
    public void update(String msg) {
        System.out.println("instargram ์ˆ˜์‹  : "+ msg);
    }
}

public class Facebook implements Platform {
    @Override
    public void update(String msg) {
        System.out.println("FaceBook ์ˆ˜์‹  : " + msg);
    }
}
public class Main {
    public static void main(String[] args){
        Jyami jyami = new Jyami();
        Platform blog = new Blog();
        Platform facebook = new Facebook();
        Platform instagram = new Instagram();

        jyami.subscribe(blog);
        jyami.subscribe(facebook);
        jyami.subscribe(instagram);

        jyami.publishText();

        jyami.unSubscribe(facebook);

        jyami.publishText();
    }
}
์Ÿˆ๋ฏธ๊ฐ€ ๊ธ€์„ ๋ฐœํ–‰ํ–ˆ๋‹ค
Blog ์ˆ˜์‹  : ๊ธ€ ๋ฐœํ–‰ ์ „์†ก
FaceBook ์ˆ˜์‹  : ๊ธ€ ๋ฐœํ–‰ ์ „์†ก
instargram ์ˆ˜์‹  : ๊ธ€ ๋ฐœํ–‰ ์ „์†ก
์Ÿˆ๋ฏธ๊ฐ€ ๊ธ€์„ ๋ฐœํ–‰ํ–ˆ๋‹ค
Blog ์ˆ˜์‹  : ๊ธ€ ๋ฐœํ–‰ ์ „์†ก
instargram ์ˆ˜์‹  : ๊ธ€ ๋ฐœํ–‰ ์ „์†ก

์ด๋ ‡๊ฒŒ ๋ณด๋ฉด ๊ฐ„๋‹จํ•œ๋ฐ ํ”Œ๋กœ ์ธํ„ฐํŽ˜์ด์Šค์˜ ๊ฐœ๋…์„ ๋ณต์žกํ•˜๊ฒŒ ๋งŒ๋“  ๊ธฐ๋Šฅ์€ ์—ญ์••๋ ฅ๊ณผ ์••๋ ฅ์ด๋‹ค.(์Šค๋ ˆ๋“œ ํ™œ์šฉ์—์„œ ํ•„์ˆ˜์ )

์••๋ ฅ (push ๋ชจ๋ธ) : ๋งค์ดˆ๋งˆ๋‹ค ์ˆ˜์ฒœ๊ฐœ์˜ ๋ฉ”์„ธ์ง€๊ฐ€ onNext()๋กœ ์ œํ•œ์—†์ด ์ „๋‹ฌ๋œ๋‹ค๋ฉด? > ์–ด๋Š์ •๋„์˜ ๋ฉ”์„ธ์ง€๊ฐ€ ๋“ค์–ด์˜ฌ์ง€ ์ˆซ์ž๋ฅผ ์ œํ•œํ•˜๋Š” ์—ญ์••๋ ฅ๊ณผ ๊ฐ™์€ ๊ธฐ๋ฒ•์ด ํ•„์š”ํ•˜๋‹ค.

์—ญ์••๋ ฅ (pull ๋ชจ๋ธ) :์ž๋ฐ” 9 Flow API์—์„œ ๋ฐœํ–‰์ž๊ฐ€ ๋ฌดํ•œ์˜ ์†๋„๋กœ ์•„์ดํ…œ์„ ๋ฐฉ์ถœํ•˜๋Š” ๋Œ€์‹ , ์š”์ฒญํ–ˆ์„ ๋•Œ๋งŒ ๋‹ค์Œ ์•„์ดํ…œ์„ ๋ณด๋‚ด๋„๋ก ํ•˜๋Š” request() ๋ฉ”์„œ๋“œ๋ฅผ ์ œ๊ณต(Subscription ์ธํ„ฐํŽ˜์ด์Šค)

 

2. ์—ญ์••๋ ฅ

Publisher์™€ Subscriber ์‚ฌ์ด์— ์ฑ„๋„์ด ์—ฐ๊ฒฐ๋˜๋ฉด ์ฒซ ์ด๋ฒคํŠธ๋กœ Subscriber ์ธํ„ฐํŽ˜์ด์Šค์˜ onSubscribe(Subscription subscription) ๋ฉ”์„œ๋“œ๊ฐ€ ํ˜ธ์ถœ๋œ๋‹ค.

์ด Subscription ๊ฐ์ฒด๋Š” Subscriber์™€ Publisher๊ฐ€ ํ†ต์‹ ํ•  ์ˆ˜ ์žˆ๋Š” ๋ฉ”์„œ๋“œ๋ฅผ ํฌํ•จํ•œ๋‹ค 

 

 

์˜ต์ €๋ฒ„ ํŒจํ„ด๊ณผ pub-sub ํŒจํ„ด์˜ ์ฐจ์ด๊ฐ€ ๋ช…ํ™•ํ•ด ์ง€๋Š” ๋ถ€๋ถ„์€ ์ด subscription์— ๋Œ€ํ•œ ๋ถ€๋ถ„ ๋•Œ๋ฌธ

3.  ์‹ค์ œ ์—ญ์••๋ ฅ์˜ ๊ฐ„๋‹จํ•œ ํ˜•ํƒœ

ํ•œ๋ฒˆ์— ํ•œ๊ฐœ์˜ ์ด๋ฒคํŠธ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋„๋ก pub-sub ์—ฐ๊ฒฐ์„ ๊ตฌ์„ฑํ•˜๋ ค๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์€ ์ž‘์—…์ด ํ•„์š”ํ•˜๋‹ค.

  • Subscriber๊ฐ€ onSubscribe๋กœ ์ „๋‹ฌ๋œ Subscription ๊ฐ์ฒด๋ฅผ subscription ๊ฐ™์€ ํ•„๋“œ์— ๋กœ์ปฌ๋กœ ์ €์žฅํ•œ๋‹ค.
  • Subscriber๊ฐ€ ์ˆ˜๋งŽ์€ ์ด๋ฒคํŠธ๋ฅผ ๋ฐ›์ง€ ์•Š๊ฒŒ onSubscribe, onNext, onError์˜ ๋งˆ์ง€๋ง‰ ๋™์ž‘์— channel.request(1)์„ ์ถ”๊ฐ€ํ•ด ์˜ค์ง ํ•œ ์ด๋ฒคํŠธ๋งŒ ์ถ”๊ฐ€ํ•œ๋‹ค
  • ์š”์ฒญ์„ ๋ณด๋‚ธ ์ฒด๋„์—๋งŒ onNext, onError์ด๋ฒคํŠธ๋ฅผ ๋ณด๋‚ด๋„๋ก Publisher์˜ notifyAllSubscribers ์ฝ”๋“œ๋ฅผ ๋ฐ”๊พผ๋‹ค. (๋ณดํ†ต ์—ฌ๋Ÿฌ Subscriber๊ฐ€ ์ž์‹ ๋งŒ์˜ ์†๋„๋ฅผ ์œ ์ง€ํ•  ์ˆ˜ ์žˆ๋„๋ก Publisher๋Š” ์ƒˆ Subscription์„ ๋งŒ๋“ค์–ด ๊ฐ Subscriber์™€ ์—ฐ๊ฒฐํ•œ๋‹ค)

๋‹น๊น€ ๊ธฐ๋ฐ˜ ๋ฆฌ์•กํ‹ฐ๋ธŒ ์—ญ์••๋ ฅ : Subscriber๊ฐ€ Publisher๋กœ๋ถ€ํ„ฐ ์š”์ฒญ์„ ๋‹น๊ธด๋‹ค(pull) = ๋ฆฌ์•กํ‹ฐ๋ธŒ ๋‹น๊น€ ๊ธฐ๋ฐ˜(reactive pull based)๋ผ๊ณ  ๋ถˆ๋ฆฐ๋‹ค.

 

6. ๋ฆฌ์•กํ‹ฐ๋ธŒ ์‹œ์Šคํ…œ vs ๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ

๋ฆฌ์•กํ‹ฐ๋ธŒ ์‹œ์Šคํ…œ(reactive system) : ๋Ÿฐํƒ€์ž„ ํ™˜๊ฒฝ์ด ๋ณ€ํ™”์— ๋Œ€์‘ํ•˜๋„๋ก ์ „์ฒด ์•„ํ‚คํ…์ฒ˜๊ฐ€ ์„ค๊ณ„๋œ ํ”„๋กœ๊ทธ๋žจ
www.reactivemanifesto.org/ : ๊ณต์‹ ์†์„ฑ ๋ฌธ์„œ


๋ฆฌ์•กํ‹ฐ๋ธŒ ์‹œ์Šคํ…œ์ด ๊ฐ€์ ธ์•ผํ•  ์†์„ฑ : ๋ฐ˜์‘์„ฑ(responsive), ํšŒ๋ณต์„ฑ(resilient), ํƒ„๋ ฅ์„ฑ(elastic)
- ๋ฐ˜์‘์„ฑ : ๋ฆฌ์•กํ‹ฐ๋ธŒ ์‹œ์Šคํ…œ์ด ํฐ ์ž‘์—…์„ ํ•˜๋Š๋ผ ๊ฐ„๋‹จํ•œ ์งˆ์˜์˜ ์‘๋‹ต์„ ์ง€์—ฐํ•˜์ง€ ์•Š๊ณ  ์‹ค์‹œ๊ฐ„์œผ๋กœ ์ž…๋ ฅ์— ๋ฐ˜์‘ํ•œ๋‹ค.
- ํšŒ๋ณต์„ฑ : ํ•œ ์ปดํฌ๋„ŒํŠธ์˜ ์‹คํŒจ๋กœ ์ „์ฒด ์‹œ์Šคํ…œ์ด ์‹คํŒจํ•˜์ง€ ์•Š๋Š”๋‹ค.
- ํƒ„๋ ฅ์„ฑ : ์ž์‹ ์˜ ์ž‘์—… ๋ถ€ํ•˜์— ๋งž๊ฒŒ ์ ์šฉํ•˜์—ฌ ์ž‘์—…์„ ํšจ์œจ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•œ๋‹ค.

Java.util.concureent.Flow ๊ด€๋ จ๋œ ์ž๋ฐ” ์ธํ„ฐํŽ˜์ด์Šค์—์„œ ์ œ๊ณตํ•˜๋Š” ๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ํ˜•์‹์„ ์ด์šฉํ•˜์—ฌ ์ด ์†์„ฑ๋“ค์„ ๊ตฌํ˜„ํ•  ์ˆ˜ ์žˆ๋‹ค.

์ž๋ฐ” ์ธํ„ฐํŽ˜์ด์Šค ์„ค๊ณ„๋Š” Reactive Manifesto์˜ ๋งˆ์ง€๋ง‰ ์†์„ฑ์ธ ๋ฉ”์‹œ์ง€ ์ฃผ๋„(message-driven) ์†์„ฑ์„ ๋ฐ˜์˜ํ•œ๋‹ค. > ๋ฐ•์Šค์™€ ์ฑ„๋„๋ชจ๋ธ

 

  • ํ˜น์‹œ ์ž๋ฐ” ์•ˆ๋ฐฐ์šฐ๊ณ  ๋ฌด์ž‘์ • ์Šคํ”„๋ง ๋ฐฐ์šฐ๋ฉด ๋งŽ์ด ์–ด๋ ค์šธ๊นŒ์š”?? ใ… 
    ์Šคํ”„๋ง ๋ถ€ํŠธ๊ฐ€ ์ž๋ฐ” ๊ธฐ๋ฐ˜์ด๋ผ๊ณ  ํ•ด์„œ ๊ณ ๋ฏผ์ด๋˜์š”ใ… ใ…œ
    ์†ํŠธ์—์„œ ์ฒ˜์Œ ์Šคํ”„๋ง์„ ๋ฐฐ์šฐ์…จ๋‹ค๊ณ  ํ•˜์…จ๋Š”๋ฐ ๊ทธ ๋•Œ ์ž๋ฐ”๋ฅผ ์•Œ๊ณ  ๊ณ„์…จ๋Š”์ง€ ๊ถ๊ธˆํ•ด์š”!!
    ๋‹ต๊ธ€ ๊ผญ ์ฃผ์‹œ๋ฉด ๊ฐ์‚ฌํ•˜๊ฒ ์Šต๋‹ˆ๋‹น๐Ÿ˜€๐Ÿ˜€