본문 바로가기
교육/빗썸 테크 아카데미

[빗썸테크아카데미] 세번째 - Reactor, operator 코드 예제로 학습

by 디토20 2022. 4. 14.
반응형

 

 

 

[빗썸테크아카데미] 세번째 - Reactor, operator 코드 예제로 학습

 

 

세번째 수업에서는

두번째 수업때 이론으로 배운 Reactor와 operator를 

코드 예제를 통해 학습하는 시간을 가졌다.

 

https://be-developer.tistory.com/38

 

[빗썸테크아카데미] 두번째 - 블로킹/논블로킹, Reactive System

[빗썸테크아카데미] 두번째 - 블로킹/논블로킹, Reactive System 지금은 새벽 5시 54분... 월요일에 첫 수업을 듣고 나서 화요일에 마침 휴가라 아침 일찍 일어나서 복습겸 글을 작성했다. 블로그에

be-developer.tistory.com

 

 

세번째에 코드 예제로 학습을 하고

네번째인 오늘은 수업이 없고

퀴즈 푸는 시간이랑, 어제 나온 과제를 푸는 시간을 갖는다고 한다.

 

오늘 푼 과제를

내일 페어리뷰를 한다고 하니

조금 떨린다

 


 

 

1번 테스트

   @Test
    void testError() {
        Flux flux = Flux.just("thing1", "thing2")
                .concatWith(Mono.error(new IllegalArgumentException("boom")))
                .log();

        StepVerifier.create(flux)
                .expectNext("thing1")
                .expectNext("thing2")
                .expectError()
                .verify();
    }

가장 간단한 테스트.

Flux를 만들고 error를 발생시키는 Mono를 concatWith로 붙여서 테스트 해보기.

 

 

 

2번 테스트

@Test
    public void testBackPressure() {
        Flux.just(1, 2, 3, 4)
                .log()
                .subscribe(new Subscriber() {
                    private Subscription s;
                    int onNextAmount;

                    @Override
                    public void onSubscribe(Subscription s) {
                        this.s = s;
                        s.request(2);
                    }

                    @Override
                    public void onNext(Object o) {
                        onNextAmount++;
                        if (onNextAmount % 2 == 0) {
                            s.request(2);
                        }
                    }

                    @Override
                    public void onError(Throwable t) {}

                    @Override
                    public void onComplete() {}
                });
    }

한번에 두개씩만 값을 받아서 처리하라는 코드

 

 

 

3번 테스트

    @Test
    public void testMap() {
        Flux<Integer> flux = Flux.just(1, 2, 3);
        Flux<Integer> flux2 = flux.map(i -> i * 10)
                .log();

        StepVerifier.create(flux2)
                .expectNext(10, 20, 30)
                .verifyComplete();
    }

Flux 만들어서 map processing 후 새로 flux2 생성하기!

 

 

 

4번 테스트

    @Test
    public void testFlatMap() {
        Flux<Integer> flux = Flux.just(1, 2, 3);
        Flux<Integer> flux2 = flux.flatMap(i -> Mono.just(i * 10))
                .log();

        StepVerifier.create(flux2)
                .expectNextCount(flux.count().block())
                .verifyComplete();
    }

Flux 만든후 flatMap 처리해서 flux2 만들기

expectNextCount 인자로 flux.count().block()을 사용하는 이유는, 기본적으로 Flux는 비동기 처리이기 때문에 비동기는 총 갯수가 몇갠지 파악할 수 없어서 block()으로 동기 처리로 바꾸어 총 갯수를 알아내려고 사용함.

block()은 실무에서는 지양되는 함수

 

 

5번 테스트

@Test
    public void testScheduler() {
        Flux<Integer> flux = Flux.just(1, 2, 3);
        Flux<Integer> flux2 = flux.flatMap(i -> Mono.just(i * 10))
                .publishOn(Schedulers.boundedElastic())
                .filter(i -> i/10 == 2)
                .publishOn(Schedulers.parallel())
//                .subscribeOn(Schedulers.boundedElastic())
                .log();

        for (int i = 0; i < 10; i++) {
            StepVerifier.create(flux2)
                    .expectNext(20)
                    .verifyComplete();
        }
    }

publishOn이 두개 있으면, 아래를 기준으로 쓰레드가 생성된다.

parallel과 boundedElastic의 쓰레드 생성은 조금 다른방법으로 진행되는데, parallel은 호출시마다 새로운 쓰레드가 생성되는 반면, boundedElastic는 이전에 생성한 쓰레드가 일을 다 하고 놀고있으면 재사용을 한다.

 

 

 

6번 테스트

    @Test
    public void zipTest() {
        Flux<Tuple2<Integer, Integer>> flux = Flux.just(1, 2, 3, 4)
                .log()
                .map(i -> i * 2)
                .zipWith(Flux.range(0, Integer.MAX_VALUE))
                .log();

        StepVerifier.create(flux)
                .expectNextCount(4)
                .verifyComplete();
    }

플럭스와 플럭스를 압축하는 zipWith

 

 

 

7번 테스트

  @Test
    public void groupByTest() {
        List<Integer> intList = new ArrayList<>();
        Random r = new Random();
        for (int i=0; i<100; i++) {
            intList.add(r.nextInt(10));
        }

        Flux<Integer> flux = Flux.fromIterable(intList);

        flux.groupBy(i -> i % 10) //key : predicate value, value : predicate 로 필터링 된 Flux<Object>
                .doOnNext(g -> System.out.println(g.key() + " :" + g.collectList()))
                .subscribe();
    }

list를 Flux화 할때는 fromIterable을 사용한다. 실제로 Flux.just보단 Flux.fromIterable이 실무에서 훨씬 많이 쓰임!

 

 

 


 

 

 

오늘은 예제 코드만 실습을 해서

딱히 적을 내용이 없다.

 

 

블로그 쓰는것도 일이라..

다행이다 :)

오늘도 수업이 없고 과제시간만 있어서

블로그 쓸게 거의 없을 것 같아서 행복

 

 

728x90
반응형

댓글