발행자(Publisher) 구독자(Subscriber) 패턴
Pub-Sub 패턴은 실시간 데이터 처리를 하기 위한 패턴이다. 라이브러리로 사용하면 간단하게 만들 수 있는 방법이 있지만 개념이해를 위해 직접 만들어 본다.
프로젝트 생성
1.’reacter-project’ 자바 프로젝트 생성
java11부터 자바 리액터 라이브러리를 제공해준다.
2.main이 있는 클래스 ‘MyApp.java’를 생성
발행자(Publisher) 생성
3.발행자 클래스 ‘MyPublisher.java’를 생성
package ex01; import java.util.Arrays; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; // 출판사 = 신문사 public class MyPublisher implements Publisher<Integer>{ // 데이터가 순차적으로 저장되어있는 데이터 구조->순차적으로 데이터를 꺼낼 수 있다.(내부가 Queue) Iterable<Integer> its = Arrays.asList(1,2,3,4,5,6,7,8,9,10); @Override public void subscribe(Subscriber<? super Integer> subscriber) { System.out.println("1.구독 요청"); } }
구독 정보를 발행해내는 클래스
- java11부터 제공하는 라이브러리 Publisher를 implement한다.
- 어떤 데이터를 제공할지는 미리 알 수 없기 때문에 제네릭으로 제공된다. 여기서는 Integer 타입으로 테스트한다.
- 강제성 함부를 구현한다.
- Iterable 타입에 데이터 리스트를 넣어준다. 데이터가 순차적으로 저장되어있는 데이터 구조->순차적으로 데이터를 꺼낼 수 있다.(내부가 Queue이다.)
Iterable 타입 사용 예제
Iterable<Integer> its = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
System.out.println(its);
System.out.println(its.iterator());
Iterator<Integer> it = its.iterator();
System.out.println(it.next());
System.out.println(it.next());
System.out.println(it.hasNext());
System.out.println(it.next());
while(it.hasNext()) {
System.out.println(it.next());
}
index로 찾을 수 없다. hashMap처럼 key 값으로 찾을 수 없다. 순차적으로 데이터를 꺼내기 위한 데이터 타입이다
구독정보(Subscription) 생성
4.구독정보 ‘MySubscription.java’를 생성
package ex01; import java.util.Iterator; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; // 구독 정보를 가진 클래스 public class MySubscription implements Subscription { public MySubscription(Iterable<Integer> its, Subscriber subscriber) { System.out.println("구독 정보 만들어짐"); } @Override public void request(long n) { System.out.println("3.신문 "+n+"개씩 받을께!"); } @Override public void cancel() { System.out.println("구독 취소"); } }
구독 정보를 가진 클래스. MyPublisher.java에 구독 요청이 들어올 때마다 만들어진다.
- Subscription을 implement한다.
- 강제성 함부를 구현한다.
request 함수 : 구독요청 n=신문의 개수
cancel 함수 : 구독취소
5.발행자 클래스에서 구독정보를 new 한다.
package ex01; import java.util.Arrays; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; // 출판사 = 신문사 public class MyPublisher implements Publisher<Integer>{ // 데이터가 순차적으로 저장되어있는 데이터 구조->순차적으로 데이터를 꺼낼 수 있다.(내부가 Q) Iterable<Integer> its = Arrays.asList(1,2,3,4,5,6,7,8,9,10); @Override public void subscribe(Subscriber<? super Integer> subscriber) { System.out.println("1.구독 요청"); subscriber.onSubscribe(new MySubscription(its,subscriber)); // 구독자에게 구독 정보를 넘겨준다. } }
6.구독정보 클래스에서 발행자 클래스가 작동하면 데이터가 넘어갈 수 있게 한다.
package ex01; import java.util.Iterator; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; // 구독 정보를 가진 클래스 public class MySubscription implements Subscription { private Iterator<Integer> it; // Iterator 타입으로 관리하기 쉽게 만든다. private Subscriber subscriber; public MySubscription(Iterable<Integer> its, Subscriber subscriber) { System.out.println("- 구독 정보 만들어짐"); this.it = its.iterator(); this.subscriber = subscriber; } @Override public void request(long n) { System.out.println("3.신문 "+n+"개씩 받을께!"); } @Override public void cancel() { System.out.println("구독 취소"); } }
구독자(Subscriber) 생성
7.구독자 ‘MySubscriber.java’를 생성
package ex01; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; // 구독자 = 일반인 = 소비자 = 컨슈머 public class MySubscriber implements Subscriber<Integer>{ private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { System.out.println("2.구독 응답 완료(구독정보 - 데이터, 구독자)"); subscription.request(size); } @Override public void onNext(Integer item) { } @Override public void onError(Throwable throwable) { System.out.println("신문 전달하다가 교통사고 났어"); } @Override public void onComplete() { System.out.println("더 이상 줄 신문 없어"); } }
구독자 정보를 가진 클래스
- Subscriber를 implement한다.
- 강제성 함부를 구현한다.
onSubscribe 함수 : 구독 정보를 구독자에게 넘겨줄 때 실행되는 함수. 발행자 클래스에서 호출
onComplete 함수 : 응답할 정보 부재일 때 실행되는 함수
onError 함수 : 응답에 실패일 때 실행되는 함수
구독 요청 프로세스 실행
8.발행자와 구독자를 new 해서 구독자가 발행자를 구독할 수 있게 만든다.
package ex01; import java.util.Arrays; import java.util.Iterator; public class MyApp { public static void main(String[] args) { MyPublisher pub = new MyPublisher(); MySubscriber sub = new MySubscriber(); pub.subscribe(sub); // 구독 시작 } }
구독 요청이 들어오면 MyApp.java 파일을 실행, subscribe 함수가 호출되면서 구독자 정보(sub)가 넘어간다.
pub.subscribe(sub);
subscribe 함수가 실행되면 구독자에게 구독 정보를 넘겨준다.
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
System.out.println("1.구독 요청");
subscriber.onSubscribe(new MySubscription(its,subscriber)); // 구독자에게 구독 정보를 넘겨준다.
}
onSubscribe 함수로 구독 정보와 구독자 정보를 받아 구독자에게 보낸다.
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("2.구독 응답 완료(구독정보 - 데이터, 구독자)");
}
실행시 결과
1.구독 요청
– 구독정보 만들어짐
2.구독 응답 완료(구독정보 – 데이터, 구독자)
백프레셔
9.구독자에게 소비할 수 있는 구독 정보를 선택할 수 있게 동적으로 만든다.
package ex01; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; // 구독자 = 일반인 = 소비자 = 컨슈머 public class MySubscriber implements Subscriber<Integer>{ private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { System.out.println("2.구독 응답 완료(구독정보 - 데이터, 구독자)"); subscription.request(1); // 백플레셔로 소비할 수 있는만큼 받는다. -> 동적 } @Override public void onNext(Integer item) { } @Override public void onError(Throwable throwable) { System.out.println("신문 전달하다가 교통사고 났어"); } @Override public void onComplete() { System.out.println("더 이상 줄 신문 없어"); } }
request 함수가 실행된다.
subscription.request(size);
오버라이드 : 부모의 request 요청 -> 자식 request 실행
@Override
public void request(long n) {
System.out.println("3.신문 "+n+"개씩 받을께!");
}
실행시 결과
1.구독 요청
– 구독정보 만들어짐
2.구독 응답 완료(구독정보 – 데이터, 구독자)
3.신문 1개씩 받을께!
구독 요청 응답
10.구독자의 요청에 응답하는 onNext 함수를 호출하여 데이터를 1개씩 넘겨준다.
package ex01; import java.util.Iterator; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; // 구독 정보를 가진 클래스 public class MySubscription implements Subscription { private Iterator<Integer> it; // 원래는 이거 동적으로 만들어야함 (topic) private Subscriber subscriber; public MySubscription(Iterable<Integer> its, Subscriber subscriber) { System.out.println(" - 구독 정보 만들어짐"); this.it = its.iterator(); this.subscriber = subscriber; } @Override public void request(long n) { System.out.println("3.신문 "+n+"개씩 받을께!"); subscriber.onNext(it.next()); } @Override public void cancel() { System.out.println("구독 취소"); } }
package ex01; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; // 구독자 = 일반인 = 소비자 = 컨슈머 public class MySubscriber implements Subscriber<Integer>{ private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { System.out.println("2.구독 응답 완료(구독정보 - 데이터, 구독자)"); subscription.request(1); } @Override public void onNext(Integer item) { System.out.println("4.신문 받음 : "+item); } @Override public void onError(Throwable throwable) { System.out.println("신문 전달하다가 교통사고 났어"); } @Override public void onComplete() { System.out.println("더 이상 줄 신문 없어"); } }
request 함수가 실행되면 onNext 함수가 호출되어 구독자에게 데이터를 1개 넘겨준다.
@Override
public void request(long n) {
System.out.println("3.신문 "+n+"개씩 받을께!");
subscriber.onNext(it.next());
}
onNext 함수가 실행되면 구독자가 구독 정보를 받는다.
@Override
public void onNext(Integer item) {
System.out.println("4.신문 받음 : "+item);
}
실행시 결과
1.구독 요청
– 구독정보 만들어짐
2.구독 응답 완료(구독정보 – 데이터, 구독자)
3.신문 1개씩 받을께!
4.신문 받음 : 1
동적 백프레셔
11.구독 정보 10개 데이터를 2개씩 모두 받을 수 있게 한다.
package ex01; import java.util.Iterator; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; // 구독 정보를 가진 클래스 public class MySubscription implements Subscription { private Iterator<Integer> it; // 원래는 이거 동적으로 만들어야함 (topic) private Subscriber subscriber; public MySubscription(Iterable<Integer> its, Subscriber subscriber) { System.out.println(" - 구독 정보 만들어짐"); this.it = its.iterator(); this.subscriber = subscriber; } @Override public void request(long n) { System.out.println("3.신문 "+n+"개씩 받을께!"); System.out.println("========================"); while(n-->0) { if(it.hasNext()) { try { subscriber.onNext(it.next()); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } }else { subscriber.onComplete(); } } } @Override public void cancel() { System.out.println("구독 취소"); } }
package ex01; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; // 구독자 = 일반인 = 소비자 = 컨슈머 public class MySubscriber implements Subscriber<Integer>{ private Subscription subscription; private Integer size = 2; private Integer maxSize = 2; @Override public void onSubscribe(Subscription subscription) { System.out.println("2.구독 응답 완료(구독정보 - 데이터, 구독자)"); this.subscription = subscription; subscription.request(size); } @Override public void onNext(Integer item) { System.out.println("4.신문 받음 "+item); size--; if(size==0) { size =maxSize; subscription.request(size); } } @Override public void onError(Throwable throwable) { System.out.println("신문 전달하다가 교통사고 났어"); } @Override public void onComplete() { System.out.println("더 이상 줄 신문 없어"); } }
구독자 클래스에서 구독정보를 전역으로 받는다.
private Subscription subscription;
구독 데이터를 전역 변수로 빼준다.
private Integer size = 3;
private Integer maxSize = 3;
구독 정보를 구독자 클래스에서 전역으로 받을 수 있게 만든다.
this.subscription = subscription;
구독 정보를 전역으로 빼둔 값(2)만큼 받는다.
subscription.request(size);
size(2) 크기만큼 반복하고 데이터가 남아있으면(hasNext) onNext 함수가 다시 호출된다.
@Override
public void request(long n) {
System.out.println("3.신문 "+n+"개씩 받을께!");
System.out.println("========================");
while(n-->0) {
if(it.hasNext()) {
try {
subscriber.onNext(it.next());
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
subscriber.onComplete();
}
}
}
onNext 함수에서는 다시 request 함수가 호출된다.
@Override
public void onNext(Integer item) {
System.out.println("4.신문 받음 "+item);
size--;
if(size==0) {
size = maxSize;
subscription.request(size);
}
}
request 함수와 onNext 함수가 서로 주고 받으며 데이터가 전부 출력될 때까지 실행된다.
실행시 결과
1.구독 요청
– 구독정보 만들어짐
2.구독 응답 완료(구독정보 – 데이터, 구독자)
3.신문 3 개씩 받을께!
4.신문 받음 : 1
4.신문 받음 : 2
4.신문 받음 : 3
3.신문 3개씩 받을께!
4.신문 받음 : 4
4.신문 받음 : 5
4.신문 받음 : 6
3.신문 3개씩 받을께!
4.신문 받음 : 7
4.신문 받음 : 8
4.신문 받음 : 9
3.신문 3개씩 받을께!
4.신문 받음 : 10