์์ค์ฝ๋
https://github.com/mjung1798/Jyami-Java-Lab/tree/master/java8-in-action
mjung1798/Jyami-Java-Lab
๐ป Jyami์ Spring boot ๋ฐ Java ์คํ์ ๐ป. Contribute to mjung1798/Jyami-Java-Lab development by creating an account on GitHub.
github.com
5. ๋ฐํ-๊ตฌ๋ ๊ทธ๋ฆฌ๊ณ ๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ
๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์ Future๊ฐ์ ๊ฐ์ฒด๋ฅผ ํตํด ์ฌ๋ฌ ๊ฒฐ๊ณผ๋ฅผ ์ ๊ณต (future๋ ํ๋ฒ๋ง ์คํํด ๊ฒฐ๊ณผ๋ฅผ ์ ๊ณต)
์๋ฐ 9์์ java.util.concurrent.Flow ์ธํฐํ์ด์ค์ ๋ฐํ-๊ตฌ๋ ๋ชจ๋ธ(pub-sub ์ด๋ผ ๋ถ๋ฆฌ๋ ํ๋กํ ์ฝ)์ ์ ์ฉํด ๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์ ์ ๊ณตํ๋ค.
- ๊ตฌ๋ ์(subscriber)๊ฐ ๊ตฌ๋ ํ ์ ์๋ ๋ฐํ์(publisher)
- ์ด ์ฐ๊ฒฐ์ ๊ตฌ๋ (subscription)์ด๋ผ ํ๋ค
- ์ด ์ฐ๊ฒฐ์ ์ด์ฉํด ๋ฉ์ธ์ง(๋๋ ์ด๋ฒคํธ๋ก ์๋ ค์ง)์ ์ ์กํ๋ค.
C3=C1+C2 ์คํ๋ ๋์ํธ ์
์ ๋ง๋ ๋ค ํ์. (C1, C2 ๊ฐ์ด ๊ฐฑ์ ๋๋ฉด C3์๋ ์๋ก์ด ๊ฐ์ด ๋ฐ์๋๋ค)
- c1, c2์ ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ์ ๋ c3์ ๊ตฌ๋
ํ๋๋ก ํ๋ค. > Publisher<T> ํ์
- Publisher<T>๋ ํต์ ํ ๊ตฌ๋
์(Subscriber)๋ฅผ ์ธ์๋ก ๋ฐ๋๋ค
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package java.util.concurrent;
public final class Flow {
static final int DEFAULT_BUFFER_SIZE = 256;
private Flow() {
}
public static int defaultBufferSize() {
return 256;
}
public interface Processor<T, R> extends Flow.Subscriber<T>, Flow.Publisher<R> {
}
public interface Subscription {
void request(long var1);
void cancel();
}
public interface Subscriber<T> {
void onSubscribe(Flow.Subscription var1);
void onNext(T var1);
void onError(Throwable var1);
void onComplete();
}
@FunctionalInterface
public interface Publisher<T> {
void subscribe(Flow.Subscriber<? super T> var1);
}
}
Cell์ Publisher์ธ ๋์์ Subscriber์ด๋ค. (์ ์ด๋ฒคํธ์ ๊ตฌ๋ ์ ํ ์ ์๋ค / ๋ค๋ฅธ ์ ์ ์ด๋ฒคํธ์ ๋ฐ์ํ๋ค)
public class SimpleCell implements Flow.Publisher<Integer>, Flow.Subscriber<Integer> {
private int value =0;
private String name;
private List<Flow.Subscriber> subscribers = new ArrayList<>();
public SimpleCell(String name) {
this.name = name;
}
// Publisher ๋ฉ์๋
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
subscribers.add(subscriber);
}
//Subscriber ๋ฉ์๋
@Override
public void onNext(Integer newValue) {
this.value = newValue; // ๊ตฌ๋
ํ ์
์์ ์๋ก์ด ๊ฐ์ด ์๊ฒป์ ๋ ๊ฐ์ ๊ฐฑ์ ํ๋ค.
System.out.println(this.name + ":" + this.value);
notifyAllSubscribers(); // ๊ฐ์ด ๊ฐฑ์ ๋์ ๋ชจ๋ ๊ตฌ๋
์์๊ฒ ์๋ฆผ
}
private void notifyAllSubscribers(){ // ์๋ก์ด ๊ฐ์ด ์์์ ๋ชจ๋ ๊ตฌ๋
์์๊ฒ ์๋ฆฌ๋ ๋ฉ์๋
subscribers.forEach(subscriber -> subscriber.onNext(this.value));
}
}
@Test
@DisplayName("๊ตฌ๋
๊ด๊ณ ํ์ธ : C1์ Publisher / C3์ Subscriber > publisher ๊ฐ์ด ๋ณํ๋ฉด subscriber์ ์ด๋ฒคํธ๊ฐ ๊ฐ๋ค")
void simpleCell() {
SimpleCell c3 = new SimpleCell("C3");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");
c1.subscribe(c3);
c1.onNext(10);
c2.onNext(20);
// C1:10
// C3:10
// C2:20
}
์ด ๊ตฌ์กฐ์์ (Subscription์ ์ฌ์ฉํ์ง ์์ ๊ตฌ์กฐ)์์๋ publisher, subscriber์ ๋์์ ์ฐ๊ฒฐํ๊ธฐ ์ํด ์ฐธ์กฐ๊ฐ์ฒด๋ก List<Subsriber> subscribers์ ๋์๋ค.
c1์ publisher์ด๋ฉด์(subscribe ๋ช ๋ น์ด ์ํ) subscriber์ด๋ค(onNext ์ํ)
> ์ฌ๊ธฐ์๋ ์ญ์๋ ฅ(backpressure) ๋ด์ฉ์ด ๋์ค์ง ์์์
๊ธฐ์กด์ ์ต์ ๋ฒ ํจํด์ ๋นํด ์๋ก์ด API ํ๋กํ ์ฝ์ด ๋ ๊ฐ๋ ฅํด์ง ์ด์ ; onNext, onError, onComplete์ ๊ฐ์ ๋ฉ์๋๋ก ๋ฐ์ดํฐ ํ๋ฆ์์ ์์ธ๊ฐ ๋ฐ์ํ๊ฑฐ๋ ์ข ๋ฃ๋์์ ๊ฒฝ์ฐ๋ ์ ์ดํ๊ธฐ ๋๋ฌธ์
์ต์ ๋ฒ ํจํด ์ฐธ๊ณ : pjh3749.tistory.com/266
[๋์์ธํจํด] ์ต์ ๋ฒ ํจํด (Observer Pattern) ์์ฃผ ๊ฐ๋จํ๊ฒ ์ ๋ฆฌํด๋ณด๊ธฐ
์ต์ ๋ฒ ํจํด์ด๋? ์ต์ ๋ฒ๋ ์คํํฌ๋ํํธ ํ๋กํ ์ค์ ์ ๋์ผ๋ก ์ ๋ค์ ๊ด์ฐฐํ๊ธฐ ์ํด ํ์ํ ์ ๋์ด๋ค. ํ ๋์ ์์ ํ์ ์ ๋์ด๋ฉฐ ์ต์ ๋ฒ ํจํด(observer pattern)์ ๊ฐ์ฒด์ ์ํ ๋ณํ๋ฅผ ๊ด์ฐฐํ๋ ๊ด
pjh3749.tistory.com
public interface Account { // observable
void subscribe(Platform platform);
void unSubscribe(Platform platform);
void notifyCrew(String msg);
}
public interface Platform { // observer
void update(String msg);
}
public class Jyami implements Account {
List<Platform> platforms = new ArrayList<>();
public void publishText(){
System.out.println("์๋ฏธ๊ฐ ๊ธ์ ๋ฐํํ๋ค");
notifyCrew("๊ธ ๋ฐํ ์ ์ก");
}
@Override
public void subscribe(Platform platform) {
platforms.add(platform);
}
@Override
public void unSubscribe(Platform platform) {
platforms.remove(platform);
}
@Override
public void notifyCrew(String msg) {
platforms.forEach(crew -> crew.update(msg));
}
}
public class Blog implements Platform {
@Override
public void update(String msg) {
System.out.println("Blog ์์ : " + msg);
}
}
public class Instagram implements Platform {
@Override
public void update(String msg) {
System.out.println("instargram ์์ : "+ msg);
}
}
public class Facebook implements Platform {
@Override
public void update(String msg) {
System.out.println("FaceBook ์์ : " + msg);
}
}
public class Main {
public static void main(String[] args){
Jyami jyami = new Jyami();
Platform blog = new Blog();
Platform facebook = new Facebook();
Platform instagram = new Instagram();
jyami.subscribe(blog);
jyami.subscribe(facebook);
jyami.subscribe(instagram);
jyami.publishText();
jyami.unSubscribe(facebook);
jyami.publishText();
}
}
์๋ฏธ๊ฐ ๊ธ์ ๋ฐํํ๋ค
Blog ์์ : ๊ธ ๋ฐํ ์ ์ก
FaceBook ์์ : ๊ธ ๋ฐํ ์ ์ก
instargram ์์ : ๊ธ ๋ฐํ ์ ์ก
์๋ฏธ๊ฐ ๊ธ์ ๋ฐํํ๋ค
Blog ์์ : ๊ธ ๋ฐํ ์ ์ก
instargram ์์ : ๊ธ ๋ฐํ ์ ์ก
์ด๋ ๊ฒ ๋ณด๋ฉด ๊ฐ๋จํ๋ฐ ํ๋ก ์ธํฐํ์ด์ค์ ๊ฐ๋ ์ ๋ณต์กํ๊ฒ ๋ง๋ ๊ธฐ๋ฅ์ ์ญ์๋ ฅ๊ณผ ์๋ ฅ์ด๋ค.(์ค๋ ๋ ํ์ฉ์์ ํ์์ )
์๋ ฅ (push ๋ชจ๋ธ) : ๋งค์ด๋ง๋ค ์์ฒ๊ฐ์ ๋ฉ์ธ์ง๊ฐ onNext()๋ก ์ ํ์์ด ์ ๋ฌ๋๋ค๋ฉด? > ์ด๋์ ๋์ ๋ฉ์ธ์ง๊ฐ ๋ค์ด์ฌ์ง ์ซ์๋ฅผ ์ ํํ๋ ์ญ์๋ ฅ๊ณผ ๊ฐ์ ๊ธฐ๋ฒ์ด ํ์ํ๋ค.
์ญ์๋ ฅ (pull ๋ชจ๋ธ) :์๋ฐ 9 Flow API์์ ๋ฐํ์๊ฐ ๋ฌดํ์ ์๋๋ก ์์ดํ ์ ๋ฐฉ์ถํ๋ ๋์ , ์์ฒญํ์ ๋๋ง ๋ค์ ์์ดํ ์ ๋ณด๋ด๋๋ก ํ๋ request() ๋ฉ์๋๋ฅผ ์ ๊ณต(Subscription ์ธํฐํ์ด์ค)
2. ์ญ์๋ ฅ
Publisher์ Subscriber ์ฌ์ด์ ์ฑ๋์ด ์ฐ๊ฒฐ๋๋ฉด ์ฒซ ์ด๋ฒคํธ๋ก Subscriber ์ธํฐํ์ด์ค์ onSubscribe(Subscription subscription) ๋ฉ์๋๊ฐ ํธ์ถ๋๋ค.
์ด Subscription ๊ฐ์ฒด๋ Subscriber์ Publisher๊ฐ ํต์ ํ ์ ์๋ ๋ฉ์๋๋ฅผ ํฌํจํ๋ค
์ต์ ๋ฒ ํจํด๊ณผ pub-sub ํจํด์ ์ฐจ์ด๊ฐ ๋ช ํํด ์ง๋ ๋ถ๋ถ์ ์ด subscription์ ๋ํ ๋ถ๋ถ ๋๋ฌธ
3. ์ค์ ์ญ์๋ ฅ์ ๊ฐ๋จํ ํํ
ํ๋ฒ์ ํ๊ฐ์ ์ด๋ฒคํธ๋ฅผ ์ฒ๋ฆฌํ๋๋ก pub-sub ์ฐ๊ฒฐ์ ๊ตฌ์ฑํ๋ ค๋ฉด ๋ค์๊ณผ ๊ฐ์ ์์ ์ด ํ์ํ๋ค.
- Subscriber๊ฐ onSubscribe๋ก ์ ๋ฌ๋ Subscription ๊ฐ์ฒด๋ฅผ subscription ๊ฐ์ ํ๋์ ๋ก์ปฌ๋ก ์ ์ฅํ๋ค.
- Subscriber๊ฐ ์๋ง์ ์ด๋ฒคํธ๋ฅผ ๋ฐ์ง ์๊ฒ onSubscribe, onNext, onError์ ๋ง์ง๋ง ๋์์ channel.request(1)์ ์ถ๊ฐํด ์ค์ง ํ ์ด๋ฒคํธ๋ง ์ถ๊ฐํ๋ค
- ์์ฒญ์ ๋ณด๋ธ ์ฒด๋์๋ง onNext, onError์ด๋ฒคํธ๋ฅผ ๋ณด๋ด๋๋ก Publisher์ notifyAllSubscribers ์ฝ๋๋ฅผ ๋ฐ๊พผ๋ค. (๋ณดํต ์ฌ๋ฌ Subscriber๊ฐ ์์ ๋ง์ ์๋๋ฅผ ์ ์งํ ์ ์๋๋ก Publisher๋ ์ Subscription์ ๋ง๋ค์ด ๊ฐ Subscriber์ ์ฐ๊ฒฐํ๋ค)
๋น๊น ๊ธฐ๋ฐ ๋ฆฌ์กํฐ๋ธ ์ญ์๋ ฅ : Subscriber๊ฐ Publisher๋ก๋ถํฐ ์์ฒญ์ ๋น๊ธด๋ค(pull) = ๋ฆฌ์กํฐ๋ธ ๋น๊น ๊ธฐ๋ฐ(reactive pull based)๋ผ๊ณ ๋ถ๋ฆฐ๋ค.
6. ๋ฆฌ์กํฐ๋ธ ์์คํ vs ๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ
๋ฆฌ์กํฐ๋ธ ์์คํ
(reactive system) : ๋ฐํ์ ํ๊ฒฝ์ด ๋ณํ์ ๋์ํ๋๋ก ์ ์ฒด ์ํคํ
์ฒ๊ฐ ์ค๊ณ๋ ํ๋ก๊ทธ๋จ
www.reactivemanifesto.org/ : ๊ณต์ ์์ฑ ๋ฌธ์
๋ฆฌ์กํฐ๋ธ ์์คํ
์ด ๊ฐ์ ธ์ผํ ์์ฑ : ๋ฐ์์ฑ(responsive), ํ๋ณต์ฑ(resilient), ํ๋ ฅ์ฑ(elastic)
- ๋ฐ์์ฑ : ๋ฆฌ์กํฐ๋ธ ์์คํ
์ด ํฐ ์์
์ ํ๋๋ผ ๊ฐ๋จํ ์ง์์ ์๋ต์ ์ง์ฐํ์ง ์๊ณ ์ค์๊ฐ์ผ๋ก ์
๋ ฅ์ ๋ฐ์ํ๋ค.
- ํ๋ณต์ฑ : ํ ์ปดํฌ๋ํธ์ ์คํจ๋ก ์ ์ฒด ์์คํ
์ด ์คํจํ์ง ์๋๋ค.
- ํ๋ ฅ์ฑ : ์์ ์ ์์
๋ถํ์ ๋ง๊ฒ ์ ์ฉํ์ฌ ์์
์ ํจ์จ์ ์ผ๋ก ์ฒ๋ฆฌํ๋ค.
Java.util.concureent.Flow ๊ด๋ จ๋ ์๋ฐ ์ธํฐํ์ด์ค์์ ์ ๊ณตํ๋ ๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ ํ์์ ์ด์ฉํ์ฌ ์ด ์์ฑ๋ค์ ๊ตฌํํ ์ ์๋ค.
์๋ฐ ์ธํฐํ์ด์ค ์ค๊ณ๋ Reactive Manifesto์ ๋ง์ง๋ง ์์ฑ์ธ ๋ฉ์์ง ์ฃผ๋(message-driven) ์์ฑ์ ๋ฐ์ํ๋ค. > ๋ฐ์ค์ ์ฑ๋๋ชจ๋ธ