๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ

Dev Book Review/Java8 in Action

[์ž๋ฐ”8์ธ์•ก์…˜] Chap17. ๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ

728x90

์†Œ์Šค์ฝ”๋“œ

https://github.com/mjung1798/Jyami-Java-Lab/tree/master/java8-in-action

mjung1798/Jyami-Java-Lab

๐Ÿ’ป Jyami์˜ Spring boot ๋ฐ Java ์‹คํ—˜์†Œ ๐Ÿ’ป. Contribute to mjung1798/Jyami-Java-Lab development by creating an account on GitHub.

github.com

๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์—์„œ๋Š” ๋‹ค์–‘ํ•œ ์‹œ์Šคํ…œ๊ณผ ์†Œ์Šค์—์„œ ๋“ค์–ด์˜ค๋Š” ๋ฐ์ดํ„ฐ ํ•ญ๋ชฉ ์ŠคํŠธ๋ฆผ์„ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๊ณ  ํ•ฉ์ณ์„œ ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•œ๋‹ค
- ๋น…๋ฐ์ดํ„ฐ : ๋น…๋ฐ์ดํ„ฐ๋Š” ํŽ˜ํƒ€ ๋ฐ”์ดํŠธ ๋‹จ์œ„๋กœ ๊ตฌ์„ฑ๋˜๋ฉฐ ๋งค์ผ ์ฆ๊ฐ€ํ•œ๋‹ค
- ๋‹ค์–‘ํ•œ ํ™˜๊ฒฝ : ๋ชจ๋ฐ”์ผ ๋””๋ฐ”์ด์Šค ๋ถ€ํ„ฐ ์ˆ˜์ฒœ๊ฐœ์˜ ๋ฉ€ํ‹ฐ ์ฝ”์–ด ํ”„๋กœ์„ธ์„œ๋กœ ์‹คํ–‰๋˜๋Š” ํด๋ผ์šฐ๋“œ ๊ธฐ๋ฐ˜ ํด๋Ÿฌ์Šคํ„ฐ๊นŒ์ง€ ๋‹ค์–‘ํ•œ ํ™˜๊ฒฝ์— ๋ฐฐํฌ๋œ๋‹ค.
- ์‚ฌ์šฉํŒจํ„ด : ์‚ฌ์šฉ์ž๋Š” 1๋…„ ๋‚ด๋‚ด ํ•ญ์ƒ ์„œ๋น„์Šค๋ฅผ ์ด์šฉํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ ๋ฐ€๋ฆฌ์ดˆ ๋‹จ์œ„์˜ ์‘๋‹ต ์‹œ๊ฐ„์„ ๊ธฐ๋Œ€ํ•œ๋‹ค.

๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ํŒจ๋Ÿฌ๋‹ค์ž„์— ๋งž๊ฒŒ ์„ค๊ณ„๋œ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์€ ๋†’์€ ์‘๋‹ต์„ฑ์„ ์ œ๊ณตํ•œ๋‹ค.
์ „์ฒด์˜ ๋ฆฌ์•กํ‹ฐ๋ธŒ ์‹œ์Šคํ…œ์„ ๊ตฌ์„ฑํ•˜๋Š” ์—ฌ๋Ÿฌ ์ปดํฌ๋„ŒํŠธ๋ฅผ ์กฐ์ ˆ์—์„œ๋„ ๊ฐ€์šฉ์„ฑ์„ ์ œ๊ณตํ•œ๋‹ค.

1. ๋ฆฌ์•กํ‹ฐ๋ธŒ ๋งค๋‹ˆํŽ˜์Šคํ† 

www.reactivemanifesto.org/

The Reactive Manifesto

Responsive: The system responds in a timely manner if at all possible. Responsiveness is the cornerstone of usability and utility, but more than that, responsiveness means that problems may be detected quickly and dealt with effectively. Responsive systems

www.reactivemanifesto.org

๋ฆฌ์•กํ‹ฐ๋ธŒ ๋งค๋‹ˆํŽ˜์Šคํ† ์—์„œ๋Š” ๋ฆฌ์•กํ‹ฐ๋ธŒ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜๊ณผ ์‹œ์Šคํ…œ ๊ฐœ๋ฐœ์˜ ํ•ต์‹ฌ ์›์น™์„ ๊ณต์‹์ ์œผ๋กœ ์ •์˜ํ•œ๋‹ค. 

 

 

  • ๋ฐ˜์‘์„ฑ(responsive) : ์‹œ์Šคํ…œ์€ ๊ฐ€๋Šฅํ•œํ•œ ์ ์ •์‹œ๊ฐ„ ์•ˆ์— ๋ฐ˜์‘ํ•œ๋‹ค. ๋ฐ˜์‘์„ฑ์ด ๋’ท๋ฐ›์นจ ๋˜์–ด์•ผ ์‚ฌ์šฉ์„ฑ์„ ๋†’์ผ ์ˆ˜ ์žˆ๋‹ค.
  • ํƒ„๋ ฅ์„ฑ(elastic) : ๋‹ค์–‘ํ•œ ์ž‘์—… ๋ถ€ํ•˜์—๋„ ์‹œ์Šคํ…œ ๋ฐ˜์‘์„ฑ์ด ์œ ์ง€๋œ๋‹ค. ์ž…๋ ฅ์†๋„๊ฐ€ ๋ฐ”๋€๋‹ค๋ฉด ์ด๋“ค ์ž…๋ ฅ ๊ด€๋ จ ์„œ๋น„์Šค์— ํ• ๋‹น๋œ ์ž์›์„ ๋Š˜๋ฆฌ๊ฑฐ๋‚˜ ์ค„์ž„์œผ๋กœ ๋ฐ˜์‘ํ•  ์ˆ˜ ์žˆ๋‹ค.
  • ๋ฉ”์‹œ์ง€ ์ฃผ๋„(message driven) : ์ปดํฌ๋„ŒํŠธ ๊ฐ„์˜ ์•ฝ์‚ฐ ๊ฒฐํ•ฉ, ๊ณ ๋ฆฝ, ์œ„์น˜, ํˆฌ๋ช…์„ฑ์ด ์œ ์ง€๋˜๋„๋ก ์‹œ์Šคํ…œ์€ ๋น„๋™๊ธฐ ๋ฉ”์‹œ์ง€ ์ „๋‹ฌ์— ์˜์กดํ•œ๋‹ค.
  • ํšŒ๋ณต์„ฑ(resilient) : ์žฅ์•  ์‹œ์—๋„ ์‹œ์Šคํ…œ์˜ ๋ฐ˜์‘์„ฑ์€ ์œ ์ง€๋œ๋‹ค.

1. ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ์ˆ˜์ค€์˜ ๋ฆฌ์•กํ‹ฐ๋ธŒ

์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ์ˆ˜์ค€์—์„œ์˜ ์ฃผ์š”๊ธฐ๋Šฅ์€ ๋น„๋™๊ธฐ๋กœ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค๋Š” ์ ์ด๋‹ค.
๋น„๋™๊ธฐ๋กœ ์ฒ˜๋ฆฌํ•˜๋Š” ๊ฒƒ์ด ์ตœ์‹  ๋ฉ€ํ‹ฐ์ฝ”์–ด CPU ์‚ฌ์šฉ๋ฅ ์„ ๊ทน๋Œ€ํ™”(๋‚ด๋ถ€์ ์œผ๋กœ ๊ฒฝ์Ÿํ•˜๋Š” CPU์˜ ์Šค๋ ˆ๋“œ ์‚ฌ์šฉ๋ฅ ) ํ•  ์ˆ˜ ์žˆ๋Š” ๋ฐฉ๋ฒ•์ด๋‹ค

์ด๋ฅผ ์œ„ํ•ด ์Šค๋ ˆ๋“œ๋ฅผ ํ“จ์ฒ˜, ์•กํ„ฐ, ์ผ๋ จ์˜ ์ฝœ๋ฐฑ์„ ๋ฐœ์ƒ์‹œํ‚ค๋Š” ์ด๋ฒคํŠธ ๋ฃจํ”„ ๋“ฑ๊ณผ ๊ณต์œ ํ•˜๊ณ  ์ฒ˜๋ฆฌํ•  ์ด๋ฒคํŠธ๋ฅผ ๋ณ€ํ™˜ํ•˜๊ณ  ๊ด€๋ฆฌํ•œ๋‹ค.

๊ฐœ๋ฐœ์ž ์ž…์žฅ์—์„œ๋Š” ๋™๊ธฐ ๋ธ”๋ก, ๊ฒฝ์Ÿ ์กฐ๊ฑด, ๋ฐ๋“œ๋ฝ ๊ฐ™์€ ์ € ์ˆ˜์ค€์˜ ๋ฉ€ํ‹ฐ์Šค๋ ˆ๋“œ ๋ฌธ์ œ๋ฅผ ์ง์ ‘ ์ฒ˜๋ฆฌํ•  ํ•„์š”๊ฐ€ ์—†์–ด์ ธ ๋น„์ฆˆ๋‹ˆ์Šค ์š”๊ตฌ์‚ฌํ•ญ ๊ตฌํ˜„์— ๋” ์ง‘์ค‘์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

RxJava, Akka๊ฐ™์€ ๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋ ˆ์ž„์›Œํฌ๋Š” ๋ณ„๋„๋กœ ์ง€์ •๋œ ์Šค๋ ˆ๋“œ ํ’€์—์„œ ๋ธ”๋ก ๋™์ž‘์„ ์‹คํ–‰์‹œํ‚จ๋‹ค. > ๋ฉ”์ธ ํ’€์˜ ๋ชจ๋“  ์Šค๋ ˆ๋“œ๋Š” ๋ฐฉํ•ด ๋ฐ›์ง€ ์•Š๊ณ  ์‹คํ–‰๋œ๋‹ค. (CPU ๊ด€๋ จ ์ž‘์—…๊ณผ IO๊ด€๋ จ ์ž‘์—…์˜ ๋ถ„๋ฆฌ)

2. ์‹œ์Šคํ…œ ์ˆ˜์ค€์˜ ๋ฆฌ์•กํ‹ฐ๋ธŒ

๋ฆฌ์•กํ‹ฐ๋ธŒ ์‹œ์Šคํ…œ  = ์†Œํ”„ํŠธ์›จ์–ด ์•„ํ‚คํ…์ฒ˜ : ์—ฌ๋Ÿฌ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์ด ํ•œ ๊ฐœ์˜ ์ผ๊ด€์ ์ธ, ํšŒ๋ณตํ•  ์ˆ˜ ์žˆ๋Š” ํ”Œ๋žซํผ์„ ๊ตฌ์„ฑํ•  ์ˆ˜ ์ด์Ž… ํ•ด์ค„ ๋ฟ ์•„๋‹ˆ๋ผ ์ด๋“ค ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ์ค‘ ํ•˜๋‚˜๊ฐ€ ์‹คํŒจํ•ด๋„ ์ „์ฒด ์‹œ์Šคํ…œ์ด ๊ณ„์† ์šด์˜๋˜๊ฒŒ ํ•ด์ค€๋‹ค.

- ๋ฆฌ์•กํ‹ฐ๋ธŒ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ : ์ด๋ฒคํŠธ ์ฃผ๋„
- ๋ฆฌ์•กํ‹ฐ๋ธŒ ์‹œ์Šคํ…œ : ๋ฉ”์‹œ์ง€ ์ฃผ๋„

์ปดํฌ๋„ŒํŠธ์—์„œ ๋ฐœ์ƒํ•œ ์žฅ์• ๋ฅผ ๊ณ ๋ฆฝ์‹œํ‚ด์œผ๋กœ ๋ฌธ์ œ๊ฐ€ ์ฃผ๋ณ€์˜ ๋‹ค๋ฅธ ์ปดํฌ๋„ŒํŠธ๋กœ ์ „ํŒŒ๋˜๋ฉด์„œ ์ „์ฒด ์‹œ์Šคํ…œ ์žฅ์• ๋กœ ์ด์–ด์ง€๋Š” ๊ฒƒ์„ ๋ง‰์Œ์œผ๋กœ ํšŒ๋ณต์„ฑ์„ ์ œ๊ณตํ•œ๋‹ค(== ๊ฒฐํ•จ ํ—ˆ์šฉ ๋Šฅ๋ ฅ) : ๊ณ ๋ฆฝ๊ณผ ๋น„๊ฒฐํ•ฉ

๋ชจ๋“  ์ปดํฌ๋„ŒํŠธ๊ฐ€ ์ˆ˜์‹ ์ž์˜ ์œ„์น˜์— ์ƒ๊ด€์—†์ด ๋ชจ๋“  ์„œ๋น„์Šค์™€ ํ†ต์‹  ( ํƒ„๋ ฅ์„ฑ = ์œ„์น˜ ํˆฌ๋ช…์„ฑ)

 

2. ๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ๊ณผ ํ”Œ๋กœ API

๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ = ๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ์„ ์‚ฌ์šฉํ•˜๋Š” ํ”„๋กœ๊ทธ๋ž˜๋ฐ
๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ : ์ž ์žฌ์ ์œผ๋กœ ๋ฌดํ•œ์˜ ๋น„๋™๊ธฐ ๋ฐ์ดํ„ฐ๋ฅผ ์ˆœ์„œ๋Œ€๋กœ ์ฒ˜๋ฆฌํ•˜๊ณ , ๋ธ”๋กํ•˜์ง€ ์•Š๋Š” ์—ญ์••๋ ฅ์„ ์ „์ œํ•ด ์ฒ˜๋ฆฌํ•˜๋Š” ํ‘œ์ค€ ๊ธฐ์ˆ 

์—ญ์••๋ ฅ : pub-sub ํ”„๋กœํ† ์ฝœ์—์„œ publisher๊ฐ€ ๋ฐœํ–‰ํ•˜๋Š” ์†๋„ > subscriber๊ฐ€ ์†Œ๋น„ํ•˜๋Š” ์†๋„ ์ผ ๊ฒฝ์šฐ์— ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ•˜์ง€ ์•Š๋„๋ก ๋ณด์žฅํ•˜๋Š” ์žฅ์น˜ + ๊ธฐ์กด ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ์— ์–ผ๋งˆ๋‚˜ ์‹œ๊ฐ„์ด ๊ฑธ๋ฆฌ๋Š”์ง€ ์—…์ŠคํŠธ๋ฆผ ๋ฐœํ–‰์ž์—๊ฒŒ ์•Œ๋ฆด ์ˆ˜ ์žˆ์–ด์•ผํ•จ

๋ฐ์ดํ„ฐ ์ˆ˜์‹ ์ž๊ฐ€ ์Šค๋ ˆ๋“œ๋ฅผ ๋ธ”๋กํ•˜์ง€ ์•Š๊ณ ๋„ ๋ฐ์ดํ„ฐ ์ˆ˜์‹ ์ž๊ฐ€ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์—†์„ ๋งŒํผ์˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›๋Š” ์ผ์„ ๋ฐฉ์ง€ํ•œ๋‹ค.

๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ์ด ๊ตฌํ˜„์ด ์ œ๊ณตํ•ด์•ผ ํ•˜๋Š” ์ตœ์†Œ ๊ธฐ๋Šฅ ์ง‘ํ•ฉ (๋„ค๊ฐœ ์ธํ„ฐํŽ˜์ด์Šค)

  • java9์˜ ์ƒˆ๋กœ์šด java.util.concurrent.Flow ํด๋ž˜์Šค
  • Akka ์ŠคํŠธ๋ฆผ, ๋ฆฌ์•กํ„ฐ(reactor), RxJava, Vert.x ๋“ฑ ๋งŽ์€ ์„œ๋“œ ํŒŒํ‹ฐ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ

 

2-1. Flow ํด๋ž˜์Šค

Publisher๊ฐ€ ํ•ญ๋ชฉ์„ ๋ฐœํ–‰ํ•˜๋ฉด Subscriber๊ฐ€ ํ•œ ๊ฐœ์”ฉ ๋˜๋Š” ํ•œ ๋ฒˆ์— ์—ฌ๋Ÿฌ ํ•ญ๋ชฉ์„ ์†Œ๋น„ํ•˜๋Š”๋ฐ Subscription์ด ์ด ๊ณผ์ •์„ ๊ด€๋ฆฌํ•˜๋„๋ก ์—ฌ๋Ÿฌ ์ •์  ๋ฉ”์„œ๋“œ๋ฅผ ์ œ๊ณตํ•œ๋‹ค.

public final class Flow {
    static final int DEFAULT_BUFFER_SIZE = 256;

    private Flow() {
    }

    public static int defaultBufferSize() {
        return 256;
    }

    public interface Processor<T, R> extends Flow.Subscriber<T>, Flow.Publisher<R> {
    }

    public interface Subscription {
        void request(long var1);

        void cancel();
    }

    public interface Subscriber<T> {
        void onSubscribe(Flow.Subscription var1);

        void onNext(T var1);

        void onError(Throwable var1);

        void onComplete();
    }

    @FunctionalInterface
    public interface Publisher<T> {
        void subscribe(Flow.Subscriber<? super T> var1);
    }
}
  • Publisher : ๋งŽ์€ ์ด๋ฒคํŠธ ์ œ๊ณต์ด ๊ฐ€๋Šฅํ•˜์ง€๋งŒ Subscriber์˜ ์š”๊ตฌ์‚ฌํ•˜์— ๋”ฐ๋ผ ์—ญ์••๋ ฅ ๊ธฐ๋ฒ•์— ์˜ํ•ด ์ด๋ฒคํŠธ ์ œ๊ณต ์†๋„๊ฐ€ ์ œํ•œ
  • Subscriber : Publisher๊ฐ€ ๋ฐœํ–‰ํ•œ ์ด๋ฒคํŠธ์˜ ๋ฆฌ์Šค๋„ˆ๋กœ ์ž์‹ ์„ ๋“ฑ๋กํ•  ์ˆ˜ ์žˆ์Œ.
onSubscribe onNext* (onError| onComplete)?

- onSubscribe ๋ฉ”์„œ๋“œ๋Š” ํ•ญ์ƒ ์ฒ˜์Œ ํ˜ธ์ถœ
- onNext ๋ฉ”์„œ๋“œ๋Š” ์—ฌ๋Ÿฌ๋ฒˆ ํ˜ธ์ถœ ๊ฐ€๋Šฅ
- ์ด๋ฒคํŠธ ์ŠคํŠธ๋ฆผ์ด ์˜์›ํžˆ ์ง€์†๋˜๊ฑฐ๋‚˜ onComplete, onError ํ˜ธ์ถœ ๊ฐ€๋Šฅ

 

 

๊ทœ์น™

  • Publisher๋Š” Subscription์˜ request ๋ฉ”์„œ๋“œ์— ์ •์˜๋œ ๊ฐœ์ˆ˜ ์ดํ•˜์˜ ์š”์†Œ๋งŒ Subscriber์— ์ „๋‹ฌํ•œ๋‹ค.
  • Publisher๋Š” ๋™์ž‘์ด ์„ฑ๊ณต์ ์œผ๋กœ ๋๋‚˜๋ฉด onComplete, ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด onError๋ฅผ ํ˜ธ์ถœํ•ด Subscription์„ ์ข…๋ฃŒํ•œ๋‹ค.
  • Subscriber๋Š” ์š”์†Œ๋ฅผ ๋ฐ›์•„ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ์Œ์„ Publisher์—๊ฒŒ ์•Œ๋ ค์•ผ ํ•œ๋‹ค. (์—ญ์••๋ ฅ ํ–‰์‚ฌ)
  • onComplete๋‚˜ onError๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ์ƒํ™ฉ์—์„œ Subscriber๋Š” Publisher๋‚˜ Subscription์˜ ์–ด๋–ค ๋ฉ”์„œ๋“œ๋„ ํ˜ธ์ถœ ํ• ์ˆ˜๋„์—†๊ณ  Subscription์ด ์ทจ์†Œ๋˜์—ˆ๋‹ค ๊ฐ€์ •ํ•œ๋‹ค.
  • Subscriber๋Š” Subscription.reqeust() ํ˜ธ์ถœ ์—†์ด๋„ ์–ธ์ œ๋‚˜ ์ข…๋ฃŒ ์‹œ๊ทธ๋„์„ ๋ฐ›์„ ์ค€๋น„๊ฐ€ ๋˜์–ด์žˆ์–ด์•ผํ•œ๋‹ค.
  • Subscriber๋Š” Subscription.canel() ์ด ํ˜ธ์ถœ๋œ ์ดํ›„์—๋„ ํ•œ ๊ฐœ ์ด์ƒ์˜ onNext๋ฅผ ๋ฐ›์„ ์ค€๋น„๊ฐ€ ๋˜์–ด์žˆ์–ด์•ผ ํ•œ๋‹ค.

์ด ๋ช…์„ธ์— ๋งž์ถฐ ์ง์ ‘ ๊ตฌํ˜„ํ•œ ๊ธฐ๋Šฅ์€ Reative Streams TCK๋ผ๋Š” ํˆด๋กœ ๊ฒ€์ฆํ•  ์ˆ˜ ์žˆ๋‹ค. ๊ตฌํ˜„์ด ๊นŒ๋‹ค๋กœ์›€..

Processor ์ธํ„ฐํŽ˜์ด์Šค ; Publisher, Subscriber ์ƒ์†
๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ์—์„œ ์ฒ˜๋ฆฌํ•˜๋Š” ์ด๋ฒคํŠธ์˜ ๋ณ€ํ™˜ ๊ด€๊ณ„๋ฅผ ๋‚˜ํƒ€๋‚ธ๋‹ค.(?)

Flow ํด๋ž˜์Šค์˜ ์ธํ„ฐํŽ˜์ด์Šค๋Š” ์ง์ ‘ ๊ตฌํ˜„ํ•˜๋„๋ก ์˜๋„๋œ๊ฒŒ ์•„๋‹ˆ๋‹ค. (์‹ค์ œ๋กœ ์ž๋ฐ”9์—์„œ ๊ตฌํ˜„ ํด๋ž˜์Šค๋ฅผ ์ œ๊ณตํ•˜์ง€์•Š๋Š”๋‹ค.)
๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๊ฐ€ ์ค€์ˆ˜ํ•ด์•ผํ•  ๊ทœ์น™๊ณผ ๋ฆฌ์•กํ‹ฐ๋ธŒ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์ด ์„œ๋กœ ํ˜‘๋™ ์†Œํ†ตํ•  ์ˆ˜ ์žˆ๋Š” ๊ณต์šฉ์–ด ์ œ์‹œ๊ฐ€ ๋ชฉํ‘œ์˜€๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค.
Akka, RxJava ๋“ฑ์˜ ๋ฆฌ์•กํ‹ฐ๋ธŒ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์—์„œ ์ด๋ฏธ ๊ตฌํ˜„์ฒด๋ถ€ํ„ฐ ๋งŒ๋“ค์–ด๋‘” ์ƒํƒœ์˜€๊ธฐ ๋•Œ๋ฌธ์—.

2-2. ๊ฐ„๋‹จํ•œ ๋ฆฌ์•กํ‹ฐ๋ธŒ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜

@Getter
public class TempInfo {
    public static final Random random = new Random();

    private final String town;
    private final int temp;

    public TempInfo(String town, int temp) {
        this.town = town;
        this.temp = temp;
    }

    public static TempInfo fetch(String town){
        if(random.nextInt(10) == 0)
            throw new RuntimeException("Error!");
        return new TempInfo(town, random.nextInt(100));
    }

    @Override
    public String toString() {
        return "TempInfo{" +
                "town='" + town + '\'' +
                ", temp=" + temp +
                '}';
    }
}
public class TempSubscriber implements Flow.Subscriber<TempInfo> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // ๊ตฌ๋… ์ €์žฅํ•˜๊ณ  ์ฒซ๋ฒˆ์งธ ์š”์ฒญ ์ „๋‹ฌ (1๊ฐœ์˜ ์š”์ฒญ์„ ๋ฐ›์„ ์ค€๋น„๊ฐ€ ๋˜์–ด์žˆ๋‹ค.)
    }

    @Override
    public void onNext(TempInfo tempInfo) {
        System.out.println(tempInfo); // ์ˆ˜์‹ ํ•œ ์˜จ๋„ ์ถœ๋ ฅํ•˜๊ณ  ๋‹ค์Œ ์ •๋ณด ์š”์ฒญ
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println(throwable.getMessage()); // ์—๋Ÿฌ ๋ฉ”์„ธ์ง€ ์ถœ๋ ฅ
    }

    @Override
    public void onComplete() {
        System.out.println("Done!");
    }
}
public class TempSubscription implements Flow.Subscription {

    private final Flow.Subscriber<? super TempInfo> subscriber;
    private final String town;

    public TempSubscription(Flow.Subscriber<? super TempInfo> subscriber, String town) {
        this.subscriber = subscriber;
        this.town = town;
    }

    @Override
    public void request(long n) {
        for (long i = 0L; i < n; i++) {
            try {
                subscriber.onNext(TempInfo.fetch(town)); // ํ˜„์žฌ ์˜จ๋„๋ฅผ Subscriber๋กœ ์ „๋‹ฌ
            }catch (Exception e){
                subscriber.onError(e); // ์˜จ๋„ ๊ฐ€์ ธ์˜ค๊ธฐ๋ฅผ ์‹คํŒจํ•˜๋ฉด Subscriber๋กœ ์—๋Ÿฌ ์ „๋‹ฌ
                break;
            }
        }
    }

    @Override
    public void cancel() {
        subscriber.onComplete(); // ๊ตฌ๋…์ด ์ทจ์†Œ๋˜๋ฉด ์™„๋ฃŒ ์‹ ํ˜ธ๋ฅผ Subscriber์— ์ „๋‹ฌ
    }
}
public class Main {
    public static void main(String[] args){
        getTemperatures("Seoul").subscribe(new TempSubscriber()); // ์„œ์šธ์— Publisher๋ฅผ ๋งŒ๋“ค๊ณ  TempSubscriber๋ฅผ ๊ตฌ๋…
    }

    private static Flow.Publisher<TempInfo> getTemperatures(String town){ // Publisher.subscribe() ๋ฉ”์„œ๋“œ ๊ตฌํ˜„
        return subscriber -> subscriber.onSubscribe(new TempSubscription(subscriber, town));
        // ๊ตฌ๋…ํ•œ Subscriber์—๊ฒŒ TempSubscription์„ ์ „์†กํ•˜๋Š” Publisher๋ฅผ ๋ฐ˜ํ™˜
    }
}
  1. Publisher๋Š” functional ์ธํ„ฐํŽ˜์ด์Šค์ด๋ฉฐ, ์ธ์ž๋กœ subscriber๋ฅผ ๋ฐ›๋Š”๋‹ค.
  2. Publisher๋Š” ์ธ์ž๋กœ ๋ฐ›์€ subscriber์˜ onSubscribe ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•œ๋‹ค.
  3. onSubscribe ๋ฉ”์„œ๋“œ ํ˜ธ์ถœ๋กœ subscriber๋Š” subscription ๊ฐ์ฒด๋ฅผ ์ธ์Šคํ„ด์Šค ๋ณ€์ˆ˜๋กœ ๊ฐ–๊ณ ์žˆ๊ฒŒ ๋œ๋‹ค.
  4. ์ด subscription ๊ฐ์ฒด๋Š” subscriber๋ฅผ ์ธ์Šคํ„ด์Šค ๋ณ€์ˆ˜๋กœ ๊ฐ–๊ณ ์žˆ๊ฒŒ ๋œ๋‹ค.
  5. onSubscribe ๋ฉ”์„œ๋“œ ํ˜ธ์ถœ๊ณผ ํ•จ๊ป˜, subscription.request(1) ์ด ๋ถˆ๋ฆฌ๊ฒŒ ๋˜๋ฉด์„œ, subscriber๊ฐ€ 1๊ฐœ์˜ ์š”์ฒญ์„ ๋ฐ›์„ ์ค€๋น„๊ฐ€ ๋˜์—ˆ์Œ์„ subscription์ด ์•Œ๊ฒŒ๋œ๋‹ค.
  6. subscription.request(1)๋กœ subscription ๋‚ด์˜ ์ธ์Šคํ„ด์Šค ๋ณ€์ˆ˜์ธ subscriber์˜ onNext ํ˜น์€ onError ๊ฐ€ ๋ถˆ๋ฆฌ๊ฒŒ๋œ๋‹ค.
  7. onNext์˜ ๊ฒฝ์šฐ์— ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ subscription.request(1) ์ด ๋ถˆ๋ฆฌ๋ฉด์„œ subscriber๊ฐ€ 1๊ฐœ์˜ ์š”์ฒญ์„ ๋ฐ›์„ ์ค€๋น„๊ฐ€ ๋˜์—ˆ์Œ์„ subscription์ด ์•Œ๊ฒŒ๋œ๋‹ค.
  8. onError์˜ ๊ฒฝ์šฐ์—๋Š” ๋”์ด์ƒ subscription์— request ๊ฐ€ ์ผ์–ด๋‚˜์ง€ ์•Š์•„ ์ข…๋ฃŒ๋œ๋‹ค.

์ฆ‰. Publisher์˜ subscribe๋กœ ์ธํ•ด ๊ณ„์† ์ด๋ฒคํŠธ๊ฐ€ ๋ฐฉ์ถœ๋˜๊ฒŒ ๋˜๊ณ , ์ด ์ด๋ฒคํŠธ๋ฅผ Subscription์˜ request ์š”์ฒญ์ด ์žˆ์–ด์•ผ์ง€๋งŒ Subscriber ๊ฐ€ ์ด๋ฒคํŠธ๋ฅผ ์†Œ๋น„ํ•˜๊ฒŒ๋œ๋‹ค.

์ด๋•Œ ์—๋Ÿฌ๋ฅผ ๋ฐœ์ƒ์‹œํ‚ค๋Š” ์ฝ”๋“œ๋ฅผ ์—†์• ๊ฒŒ๋˜๋ฉด ์žฌ๊ท€ํ˜ธ์ถœ๋กœ์ธํ•œ stackoverflow๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค. : Executor๋ฅผ TempSubscription์œผ๋กœ ์ถ”๊ฐ€ํ•œ ๋‹ค์Œ ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ์—์„œ TempSubscriber๋กœ ์ „๋‹ฌํ•œ๋‹ค. 

private static final ExecutorService executor = Executors.newSingleThreadExecutor();

    @Override
    public void request(long n) {
        executor.submit(() -> {
            for (long i = 0L; i < n; i++) {
                try {
                    subscriber.onNext(TempInfo.fetch(town)); // ํ˜„์žฌ ์˜จ๋„๋ฅผ Subscriber๋กœ ์ „๋‹ฌ
                }catch (Exception e){
                    subscriber.onError(e); // ์˜จ๋„ ๊ฐ€์ ธ์˜ค๊ธฐ๋ฅผ ์‹คํŒจํ•˜๋ฉด Subscriber๋กœ ์—๋Ÿฌ ์ „๋‹ฌ
                    break;
                }
            }
        });
    }

3. Processor๋กœ ๋ฐ์ดํ„ฐ ๋ณ€ํ™˜ํ•˜๊ธฐ

Processor๋Š” Subscriber์ด๋ฉฐ Publisher์ด๋‹ค.

public class TempProcessor implements Flow.Processor<TempInfo, TempInfo> {

    private Flow.Subscriber<? super TempInfo> subscriber;

    @Override
    public void subscribe(Flow.Subscriber subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void onNext(TempInfo tempInfo) {
        int fTemp = (((tempInfo.getTemp() - 32) * 5) / 9); // ์„ญ์”จ๋กœ ๋ณ€ํ™˜ํ•œ ๋‹ค์Œ TempInfo ๋‹ค์‹œ ์ „์†ก
        subscriber.onNext(new TempInfo(tempInfo.getTown(), fTemp));
    }

    // ๋‹ค๋ฅธ ๋ชจ๋“  ์‹ ํ˜ธ๋Š” ์—…์ŠคํŠธ๋ฆผ ๊ตฌ๋…์ž์—๊ฒŒ ์ „๋‹ฌ.
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscriber.onSubscribe(subscription);
    }

    @Override
    public void onError(Throwable throwable) {
        subscriber.onError(throwable);
    }

    @Override
    public void onComplete() {
        subscriber.onComplete();
    }
}
    @Test
    @DisplayName("processor๋ฅผ ์ด์šฉํ•œ C=>F ์˜จ๋„ ๋ณ€ํ™˜")
    void name() {
        getCelsiusTemperatures("Seoul").subscribe(new TempSubscriber());
    }

    public static Flow.Publisher<TempInfo> getCelsiusTemperatures(String town){
        return subscriber -> {
            TempProcessor processor = new TempProcessor();
            processor.subscribe(subscriber);
            processor.onSubscribe(new TempSubscription(processor,town));
        };
    }

Subscriber ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•˜๋Š” ๋‹ค๋ฅธ ๋ชจ๋“  ๋ฉ”์„œ๋“œ๋Š” ๋‹จ์ˆœํžˆ ์ˆ˜์‹ ํ•œ ๋ชจ๋“  ์‹ ํ˜ธ๋ฅผ ์—…์ŠคํŠธ๋ฆผ Subscriber๋กœ ์ „๋‹ฌํ•˜๋ฉฐ Publisher์˜ subscribe ๋ฉ”์„œ๋“œ๋Š” ์—…์ŠคํŠธ๋ฆผ Subscriber๋ฅผ Processor๋กœ ๋“ฑ๋กํ•˜๋Š” ๋™์ž‘์„ ์ˆ˜ํ–‰ํ•œ๋‹ค.

4. ์ž๋ฐ”์—์„œ ํ”Œ๋กœ API์˜ ๊ตฌํ˜„์„ ์ œ๊ณตํ•˜์ง€ ์•Š๋Š” ์ด์œ 

API๋ฅผ ๋งŒ๋“ค ๋‹น์‹œ Akka, RxJava ๋“ฑ ๋‹ค์–‘ํ•œ ๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ์˜ ์ž๋ฐ” ์ฝ”๋“œ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๊ฐ€ ์ด๋ฏธ ์กด์žฌํ–ˆ๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค. ๋‘˜๋‹ค ๊ฐ™์€ pub-sub ํŒจํ„ด์„ ๊ธฐ๋ฐ˜ํ•œ ๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์„ ๊ตฌํ˜„ํ–ˆ์ง€๋งŒ ์ด๋“ค ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋Š” ๋…๋ฆฝ์ ์œผ๋กœ ๊ตฌํ˜„์ด ๋˜์—ˆ๊ณ  ๊ฐ๊ธฐ๋‹ค๋ฅธ ์ด๋ฆ„ ๊ทœ์น™๊ณผ API๋ฅผ ์‚ฌ์šฉํ•จ

๊ทธ๋ž˜์„œ ํ”Œ๋กœ API์—์„œ ๋ฆฌ์•กํ‹ฐ๋ธŒ ๊ฐœ๋…์˜ ๊ตฌํ˜„์„ ์œ„ํ•œ ํ‘œ์ค€ํ™” ์ž‘์—…์„ ํ•˜๊ฒŒ๋œ ๊ฒƒ์ด๋‹ค.

๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ์˜ ๊ตฌํ˜„์€ ๋Œ€๋ถ€๋ถ„ ๊ธฐ์กด ๊ตฌํ˜„์„ ๋”ฐ๋ผ๊ฐ€์ž.

 

3. ๋ฆฌ์•กํ‹ฐ๋ธŒ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ RxJava ์‚ฌ์šฉํ•˜๊ธฐ

RxJava๋Š” ์ž๋ฐ”๋กœ ๋ฆฌ์•กํ‹ฐ๋ธŒ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์„ ๊ตฌํ˜„ํ•˜๋Š” ๋ฐ ์‚ฌ์šฉํ•˜๋Š” ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์ด๋‹ค.

github.com/ReactiveX/RxJava

ReactiveX/RxJava

RxJava โ€“ Reactive Extensions for the JVM โ€“ a library for composing asynchronous and event-based programs using observable sequences for the Java VM. - ReactiveX/RxJava

github.com

reactivex.io/

ReactiveX

CROSS-PLATFORM Available for idiomatic Java, Scala, C#, C++, Clojure, JavaScript, Python, Groovy, JRuby, and others

reactivex.io

 

RxJava ์—์„œ ์ œ๊ณตํ•˜๋Š” Flow.Publisher ๊ตฌํ˜„ ํด๋ž˜์Šค

1. Flowable

public abstract class Flowable<@NonNull T> implements Publisher<T> {}

์—ญ์••๋ ฅ ๊ธฐ๋Šฅ์ด ์žˆ๋‹ค. : ๋„ˆ๋ฌด ๋น ๋ฅธ ์†๋„๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐœํ–‰ํ•ด Subscriber๊ฐ€ ์ด๋ฅผ ๊ฐ๋‹นํ•  ์ˆ˜ ์—†๋Š” ์ƒํ™ฉ์— ์ด๋ฅด๋Š”๊ฑธ ๋ฐฉ์ง€ํ•˜๋Š” ๊ธฐ๋Šฅ

2. Observable

public abstract class Observable<@NonNull T> implements ObservableSource<T> {}

์—ญ์••๋ ฅ ๊ธฐ๋Šฅ์„ ์ œ๊ณตํ•˜์ง€ ์•Š๋˜ ๊ธฐ์กด๋ฒ„์ „์˜ RxJava์—์„œ ์ œ๊ณตํ•˜๋˜ ํด๋ž˜์Šค

๋‹จ์ˆœํ•œ ํ”„๋กœ๊ทธ๋žจ, ๋งˆ์šฐ์Šค ์›€์ง์ž„ ๊ฐ™์€ ์‚ฌ์šฉ์ž ์ธํ„ฐํŽ˜์ด์Šค์— ์ ํ•ฉ. (GUI์ด๋ฒคํŠธ๋‚˜ ์ž์ฃผ ๋ฐœ์ƒํ•˜์ง€ ์•Š๋Š” ์ข…๋ฅ˜์˜ ์ด๋ฒคํŠธ์— ์—ญ์••๋ ฅ์„ ์ ์šฉํ•˜์ง€ ๋ง์ž.)

๋ชจ๋“  Publisher๋Š” Subscription์˜ request(Long.MAX_VALUE); ๋ฉ”์„œ๋“œ๋ฅผ ์ด์šฉํ•ด ์—ญ์••๋ ฅ ๊ธฐ๋Šฅ์„ ๋Œ ์ˆ˜ ์žˆ๋‹ค.

 

3-1. Observable ๋งŒ๋“ค๊ณ  ์‚ฌ์šฉํ•˜๊ธฐ

Observable๊ณผ Flowable ํด๋ž˜์Šค๋Š” ๋‹ค์–‘ํ•œ ์ข…๋ฅ˜์˜ ๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ์„ ํŽธ๋ฆฌํ•˜๊ฒŒ ๋งŒ๋“ค๋„๋ก ์—ฌ๋Ÿฌ ํŒฉํ† ๋ฆฌ ๋ฉ”์„œ๋“œ๋ฅผ ์ œ๊ณตํ•œ๋‹ค
์ด๋“ค์€ ๋ชจ๋‘ Publisher๋ฅผ ๊ตฌํ˜„ํ•˜๋ฏ€๋กœ ํŒฉํ† ๋ฆฌ ๋ฉ”์„œ๋“œ๋Š” ๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ์„ ๋งŒ๋“œ๋Š” ๊ฒƒ์ด๋‹ค.

    @Test
    @DisplayName("Observable just ๋ฉ”์„œ๋“œ : Observable์˜ Subscriber๋Š”" +
            " onNext(first) onNext(second) onComplete() ์ˆœ์„œ๋กœ ๋ฉ”์„œ๋“œ๋ฅผ ๋ฐ›๋Š”๋‹ค.")
    void observableJust() {
        Observable<String> just = Observable.just("first", "second");
    }

reactivex.io/documentation/operators/just.html

    @Test
    @DisplayName("Observable interval ๋ฉ”์„œ๋“œ : ์ง€์ •๋œ ์†๋„๋กœ ์ด๋ฒคํŠธ๋ฅผ ๋ฐฉ์ถœํ•˜๋Š” ์ƒํ™ฉ์—์„œ ์‚ฌ์šฉํ•œ๋‹ค." +
            "0์—์„œ ์‹œ์ž‘ํ•ด 1์ดˆ ๊ฐ„๊ฒฉ์œผ๋กœ long ํ˜•์‹์˜ ๊ฐ’์„ ๋ฌดํ•œ์œผ๋กœ ์ฆ๊ฐ€์‹œํ‚ค๋ฉฐ ๊ฐ’์„ ๋ฐฉ์ถœํ•œ๋‹ค.")
    void observableInterval() {
        Observable<Long> interval = Observable.interval(1, TimeUnit.SECONDS);
    }

reactivex.io/documentation/operators/interval.html

 

RxJava์—์„œ์˜ Flow.Subscriber ์—ญํ• 

public interface Observer<@NonNull T> {

    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);

    void onError(@NonNull Throwable e);

    void onComplete();

}

Subscriber๊ณผ ๊ฐ™์€ ๋ฉ”์„œ๋“œ๋ฅผ ์ •์˜ํ•˜๋ฉฐ, onSubcribe ๋ฉ”์„œ๋“œ๊ฐ€ Subscription ๋Œ€์‹  Disposale ์ธ์ˆ˜๋ฅผ ๊ฐ–๋Š”๋‹ค๋Š” ์ ๋งŒ ๋‹ค๋ฅด๋‹ค.
Observable์€ ์—ญ์••๋ ฅ์„ ์ง€์›ํ•˜์ง€ ์•Š์•„ Subscription์˜ request๊ฐ€ ํ•„์š”ํ•˜์ง€ ์•Š๋‹ค.

ํ•˜์ง€๋งŒ onNext๊ฐ€ ์ข€๋” ์œ ์—ฐํ•˜๋‹ค (๋” ๋งŽ์€ ์˜ค๋ฒ„๋กœ๋“œ ๊ธฐ๋Šฅ ์ œ๊ณต) : ex onNext๋งŒ ๊ตฌํ˜„ํ•˜๊ณ  ๋‚˜๋จธ์ง€๋Š” ๊ตฌํ˜„ํ•˜์ง€ ์•Š๋Š” ๊ธฐ๋ณธ๋™์ž‘์˜ Observer ์ œ๊ณต๊ฐ€๋Šฅ.

 

 

    @Test
    @DisplayName("RxJava Observer: ๋งŽ์€ ์˜ค๋ฒ„๋กœ๋“œ๋กœ ์ธํ•ด ์œ ์—ฐํ•˜๊ฒŒ ๊ตฌํ˜„์ด ๊ฐ€๋Šฅํ•˜๋‹ค.")
    void observableIntervalWithObserver() {
        Observable<Long> onePerSec = Observable.interval(1, TimeUnit.SECONDS);
        onePerSec.subscribe(i -> System.out.println(TempInfo.fetch("Seoul")));
    }

1์ดˆ์— ํ•œ๋ฒˆ์”ฉ TempInfo์˜ temp ๊ฐ’์„ ์ถœ๋ ฅํ•˜๋ฆฌ๋ผ ์˜ˆ์ƒํ•œ๋‹ค.

๊ทธ๋ ‡์ง€๋งŒ Observable์ด RxJava์˜ ์—ฐ์‚ฐ ์Šค๋ ˆ๋“œ ํ’€์ธ ๋ฐ๋ชฌ ์Šค๋ ˆ๋“œ์—์„œ ์‹คํ–‰๋˜์–ด, ์œ„ ์ฝ”๋“œ์—์„œ๋Š” ์‹คํ–‰ํ•˜์ž๋งˆ์ž ์‹คํ–‰ํ•  ์ฝ”๋“œ๊ฐ€ ์—†์–ด ์ข…๋ฃŒ๋˜๊ฒŒ ๋œ๋‹ค. ๋ฐ๋ชฌ์Šค๋ ˆ๋“œ๋„ ํ•จ๊ป˜ ์ข…๋ฃŒ > blockingSubscribe๋ฅผ ์‚ฌ์šฉํ•ด ๊ฒฐ๊ณผ๋ฅผ ๋ณผ ์ˆœ ์žˆ๋‹ค.

์‹ฌํ™”์˜ˆ์ œ

    @Test
    @DisplayName("์ข€๋” ์ผ๋ฐ˜ํ™”๋ฅผ ์œ„ํ•ด ์˜จ๋„๋ฅผ ์ง์ ‘ ์ถœ๋ ฅํ•˜์ง€ ์•Š๊ณ  " +
            "์‚ฌ์šฉ์ž์—๊ฒŒ ํŒฉํ† ๋ฆฌ ๋ฉ”์„œ๋“œ๋ฅผ ์ œ๊ณตํ•ด ๋งค์ดˆ๋งˆ๋‹ค ์˜จ๋„๋ฅผ ๋ฐฉ์ถœํ•˜๋Š” Observable์„ ๋งŒ๋“ค์–ด๋ณด์ž" +
            "์ด๋ ‡๊ฒŒ ํ•  ๊ฒฝ์šฐ observer๋Š” ๊ทธ์ € ๋ฐ›์€ ๋ฐ์ดํ„ฐ๋งŒ ์ถœ๋ ฅํ•˜๋Š” ๊ธฐ๋Šฅ์„ ํ•˜๋ฉด๋œ๋‹ค." +
            "observable๋กœ ์˜จ๋„๋ฅผ ์–ป๋Š” ๊ณผ์ •์—์„œ ์—๋Ÿฌ์™€ ์™„๋ฃŒ์ฒ˜๋ฆฌ ๋กœ์ง์„ ๋‹ค ๊ตฌํ˜„ํ–ˆ๊ธฐ ๋•Œ๋ฌธ์— ")
    void observableExample() {
        Observable<TempInfo> seoul = getTemperature("Seoul");
        seoul.blockingSubscribe(new TempObserver());
    }

    public static Observable<TempInfo> getTemperature(String town){
        return Observable.create(emitter ->
                Observable.interval(1, TimeUnit.SECONDS) // ๋งค ์ดˆ๋งˆ๋‹ค ๋ฌดํ•œ์œผ๋กœ ์ฆ๊ฐ€ํ•˜๋Š” long ๋ฐ˜ํ™˜ Observable
                        .subscribe(i -> {
                            if(!emitter.isDisposed()){ // ์†Œ๋น„๋œ ์˜ต์ €๋ฒ„๊ฐ€ ํ๊ธฐ๋˜์ง€ ์•Š์•˜์œผ๋ฉด ์–ด๋–ค ์ž‘์—…์„ ์ˆ˜ํ–‰(์—๋Ÿฌ๊ฐ€ ์•ˆ๋‚ฌ์œผ๋ฉด)
                                if(i >= 5){ // 5๋ฒˆ ์˜จ๋„๋ฅผ onNext ํ–ˆ์œผ๋ฉด ์„ฑ๊ณต์ฒ˜๋ฆฌ ํ›„ ์ข…๋ฃŒ
                                    emitter.onComplete();
                                }else {
                                    try {
                                        emitter.onNext(TempInfo.fetch(town)); // ์˜จ๋„๋ฅผ observer๋กœ ๋ณด
                                    }catch (Exception e){
                                        emitter.onError(e);// ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด Observer์—๊ฒŒ ์•Œ๋ฆผ
                                    }
                                }
                            }
                        })
        );
    }
    
    public class TempObserver implements Observer<TempInfo>{

        @Override
        public void onSubscribe(@NonNull Disposable d) {
        }

        @Override
        public void onNext(@NonNull TempInfo tempInfo) {
            System.out.println(tempInfo);
        }

        @Override
        public void onError(@NonNull Throwable e) {
            System.err.println("Got Problem " + e.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("Done!");
        }
    }

์—ฌ๊ธฐ์„œ ์‚ฌ์šฉ๋œ emitter ๋ณ€์ˆ˜๋Š” ๊ทธ ์‹ค์ฒด๊ฐ€ ObservableEmitter ์ธํ„ฐํŽ˜์ด์Šค์ธ๋ฐ, ๊ทธ ์ธํ„ฐํŽ˜์ด์Šค๋Š” Emitter์˜ ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ์ƒ์†ํ•œ๋‹ค.. : onSubscribe๊ฐ€ ๋น ์ง„ Observer์™€ ๊ฐ™์Œ.

public interface ObservableEmitter<@NonNull T> extends Emitter<T> {}
public interface Emitter<@NonNull T> {

    void onNext(@NonNull T value);

    void onError(@NonNull Throwable error);

    void onComplete();
}

์—ฌ๊ธฐ์„œ Observable์ด ์—ญ์••๋ ฅ์„ ์ง€์›ํ•˜์ง€ ์•Š์•„ ์ „๋‹ฌ๋œ ์š”์†Œ๋ฅผ ์ฒ˜๋ฆฌํ•œ ๋‹ค์Œ ์ถ”๊ฐ€ ์š”์†Œ๋ฅผ ์š”์ฒญํ•˜๋Š” request()๊ฐ€ ํ•„์š”๊ฐ€ ์—†๋‹ค.

 

3-2. Observable์„ ๋ณ€ํ™˜ํ•˜๊ณ  ํ•ฉ์น˜๊ธฐ

๋ฆฌ์•กํ‹ฐ๋ธŒ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋Š” ์ž๋ฐ” 9 ํ”Œ๋กœ API์— ๋น„ํ•ด ์ŠคํŠธ๋ฆผ์„ ํ•ฉ์น˜๊ณ  ๋งŒ๋“ค๊ณ  ๊ฑฐ๋ฅด๋Š” ๋“ฑ์˜ ํ’๋ถ€ํ•œ ๋ฉ”์„œ๋“œ๋ฅผ ์ œ๊ณตํ•˜๋Š” ๊ฒƒ์ด ์žฅ์ ์ด๋‹ค. ์ŠคํŠธ๋ฆผ์„ ๋‹ค๋ฅธ ์ŠคํŠธ๋ฆผ์˜ ์ž…๋ ฅ์œผ๋กœ ์‚ฌ์šฉ๋„ ๊ฐ€๋Šฅํ•˜๋ฉฐ, ์ŠคํŠธ๋ฆผ์˜ ๋งคํ•‘ ํ•จ์ˆ˜๋กœ ์š”์†Œ๋ฅผ ๋ณ€ํ™˜ํ•˜๊ฑฐ๋‚˜, ๋‹ค์–‘ํ•œ ๋ฐฉ๋ฒ•์œผ๋กœ ํ•ฉ์น˜๋Š” ๋“ฑ์˜ ์ž‘์—…๋„ ๊ฐ€๋Šฅํ•˜๋‹ค.

๋งˆ๋ธ” ๋‹ค์ด์–ด๊ทธ๋žจ(marble diagram) : ์ˆ˜ํ‰์„ ์œผ๋กœ ํ‘œ์‹œ๋œ ๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ์— ์ž„์˜์˜ ์ˆœ์„œ๋กœ ๊ตฌ์„ฑ๋œ ์š”์†Œ๊ฐ€ ๊ธฐํ•˜ํ•™์  ๋ชจ์–‘์œผ๋กœ ๋‚˜ํƒ€๋‚œ๋‹ค.
- ํŠน์ˆ˜๊ธฐํ˜ธ: ์—๋Ÿฌ๋‚˜ ์™„๋ฃŒ์‹ ํ˜ธ๋ฅผ ๋‚˜ํƒ€๋‚ธ๋‹ค.
- ๋ฐ•์Šค : ์—ฐ์‚ฐ์ด ์š”์†Œ๋ฅผ ์–ด๋–ป๊ฒŒ ๋ณ€ํ™”ํ•˜๊ฑฐ๋‚˜ ์—ฌ๋Ÿฌ ์ŠคํŠธ๋ฆผ์„ ์–ด๋–ป๊ฒŒ ํ•ฉ์น˜๋Š”์ง€๋ฅผ ๋ณด์—ฌ์คŒ

reactivex.io/documentation/operators

ReactiveX - Operators

Introduction Each language-specific implementation of ReactiveX implements a set of operators. Although there is much overlap between implementations, there are also some operators that are only implemented in certain implementations. Also, each implementa

reactivex.io

RxJava์˜ ๋งŽ์€ operator๋ฅผ ์„ค๋ช…ํ•˜๋Š” ํŽ˜์ด์ง€์—๋„ ๋งˆ๋ธ” ๋‹ค์ด์–ด๊ทธ๋žจ์„ ์ด์šฉํ•ด ์—ฌ๋Ÿฌ ์—ฐ์‚ฐ์˜ ์ŠคํŠธ๋ฆผ ๋™์ž‘์„ ์„ค๋ช…ํ•œ๋‹ค.

 

์‹ค์ œ ์—ฌ๋Ÿฌ ํ•จ์ˆ˜์— ๋Œ€ํ•œ ๋งˆ๋ธ” ๋‹ค์ด์–ด๊ทธ๋žจ

 

merge - ๋‘๊ฐœ ์ด์ƒ์˜ Observable์ด ๋ฐฉ์ถœํ•œ ์ด๋ฒคํŠธ๋ฅผ ํ•˜๋‚˜๋กœ ํ•ฉ์นจ
map - Observable์ด ๋ฐœํ–‰ํ•˜๋Š” ์š”์†Œ๋ฅผ ๋ณ€ํ™˜

    @Test
    @DisplayName("Observable์˜ ์—ฐ์‚ฐ์ž๋ฅผ ์ด์šฉํ•ด ์ŠคํŠธ๋ฆผ์„ ์œ ๋™์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค." +
            "filter ๋ฅผ ์ด์šฉํ•ด ์„ญ์”จ์˜จ๋„ ์˜ํ•˜๋งŒ ํ•„ํ„ฐ๋งํ•˜๊ณ  " +
            "map์„ ์ด์šฉํ•ด ํ™”์”จ๋ฅผ ์„ญ์”จ๋กœ ๋ณ€ํ™˜ํ•œ๋‹ค")
    void getMinusCelsiusTemperature() {
        Observable<TempInfo> seoul = getTemperature("Seoul")
                .filter(temp -> temp.getTemp()<0)
                .map(temp -> new TempInfo(temp.getTown(), ((temp.getTemp() - 32) * 5) / 9));
        seoul.blockingSubscribe(new TempObserver());
    }