Contents

[Mastering Spring 5.0] 11.2 Spring Reactive - 1

์Šคํ”„๋ง 5.0 ๋งˆ์Šคํ„ฐ ์Šคํ„ฐ๋””
์Šคํ”„๋ง 5.0 ๋งˆ์Šคํ„ฐ ์Šคํ„ฐ๋”” ํ•™์Šต ๋‚ด์šฉ ์ •๋ฆฌ์ž…๋‹ˆ๋‹ค.

๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ

์˜์กด์„ฑ ์ถ”๊ฐ€

1
2
implementation ('org.reactivestreams:reactive-streams:1.0.2')
implementation ('org.reactivestreams:reactive-streams-tck:1.0.2')

๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ์˜ ๊ตฌ์„ฑ์š”์†Œ

  • Publisher : ๋ฐ์ดํ„ฐ ์ œ๊ณต์ž. ๊ตฌ๋…ํ•œ ๊ตฌ๋…์ž๋“ค์—๊ฒŒ ๊ตฌ๋… ์ •๋ณด๋ฅผ ํ† ๋Œ€๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ œ๊ณตํ•œ๋‹ค.
  • Subscriber : ๋ฐ์ดํ„ฐ ์†Œ๋ชจ์ž. ์ œ๊ณต์ž๋กœ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›์•„ ์†Œ๋ชจํ•œ๋‹ค.
  • Subscription : ๊ตฌ๋… ์ •๋ณด. Subscriber ๋Š” Publisher ๋ฅผ ๊ตฌ๋…ํ•˜์—ฌ ๋ฐ์ดํ„ฐ(n) ์š”์ฒญํ•  ์ˆ˜ ์žˆ๋‹ค.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Publisher
package org.reactivestreams;

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> var1);
}

// Subscriber
package org.reactivestreams;

public interface Subscriber<T> {
    void onSubscribe(Subscription var1);

    void onNext(T var1);

    void onError(Throwable var1);

    void onComplete();
}

// Subscription
package org.reactivestreams;

public interface Subscription {
    void request(long var1);

    void cancel();
}

Publisher

  • void subscribe(Subscriber<? super T> var1); : Subscriber ๊ฐ์ฒด๋ฅผ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ์ž…๋ ฅ๋ฐ›์•„ Publisher ๊ฐ€ Subscriber ๊ด€๋ฆฌ ํ•  ์ˆ˜ ์žˆ๋‹ค. ๊ทธ๋ฆฌ๊ณ  Subscriber ์˜ ๋ฉ”์†Œ๋“œ๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

Subscriber

  • onSubscribe() : Publisher ๋ฅผ ๊ตฌ๋…ํ•˜๊ฒŒ ๋˜๋ฉด ํ˜ธ์ถœ๋œ๋‹ค.
  • onNext() : ๋น„๋™๊ธฐ ์‹์œผ๋กœ Executor ๋ฅผ ํ†ตํ•ด ๊ตฌ๋…์ž์—๊ฒŒ ๋ฉ”์„ธ์ง€๋ฅผ ๊ฒŒ์‹œํ•  ์ˆ˜ ์žˆ๋‹ค.
  • onError() : ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒ ์‹œ ํ˜ธ์ถœ๋œ๋‹ค.
  • onComplete() : ๋” ๊ฒŒ์‹œ๋œ ๋ฉ”์„ธ์ง€ ์š”์†Œ๊ฐ€ ์—†์„ ๊ฒฝ์šฐ ๋‹ค์Œ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•œ๋‹ค.

Subscription

  • request() : Publisher ์—๊ฒŒ ๊ฒŒ์‹œ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์š”์ฒญํ•œ๋‹ค.
  • cancel() : ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•œ ์š”์ฒญ์„ ์ทจ์†Œํ•œ๋‹ค.

๋ฆฌ์•กํ„ฐ

๋ฆฌ์•กํ„ฐ๋Š” ์Šคํ”„๋ง ํ”ผ๋ณดํƒˆ ํŒ€์˜ ๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋ ˆ์ž„์›Œํฌ์ด๋ฉฐ, ๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ์„ ๊ธฐ๋ฐ˜์œผ๋กœ ํ•œ๋‹ค.

์˜์กด์„ฑ ์ถ”๊ฐ€

build.gradle

1
2
implementation      ('io.projectreactor:reactor-core:3.2.5.RELEASE')
testImplementation  ('io.projectreactor:reactor-test:3.2.5.RELEASE')
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
plugins {
    id 'java'
    id 'war'
    id 'org.springframework.boot' version '2.1.1.RELEASE'
}

apply plugin: 'java'
apply plugin: 'io.spring.dependency-management'

group 'com.mastering.spring'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.11

repositories {
    mavenCentral()
}

dependencies {

    implementation ('org.springframework.boot:spring-boot-starter')

    implementation ('org.reactivestreams:reactive-streams:1.0.2')
    implementation ('org.reactivestreams:reactive-streams-tck:1.0.2')

    implementation      ('io.projectreactor:reactor-core:3.2.5.RELEASE')
    testImplementation  ('io.projectreactor:reactor-test:3.2.5.RELEASE')

    testImplementation  ('org.springframework.boot:spring-boot-starter-test:2.1.2.RELEASE')
    testImplementation  ('junit:junit:4.12')
}

Mono ์™€ Flux

Mono ์™€ Flux ๋ชจ๋‘ Reactive Stream ์˜ Publisher ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•˜๊ณ  ์žˆ์œผ๋ฉฐ, Reactor ์—์„œ ์ œ๊ณตํ•˜๋Š” ์—ฌ๋Ÿฌ ์—ฐ์‚ฐ์ž(operators) ์˜ ์กฐํ•ฉ์„ ํ†ตํ•ด ์ŠคํŠธ๋ฆผ์„ ๊ฐ€๊ณตํ•  ์ˆ˜ ์žˆ๋‹ค.

  • Mono : ์š”์†Œ๊ฐ€ ์•„์˜ˆ ์—†๊ฑฐ๋‚˜ ํ•˜๋‚˜์˜ ๊ฒฐ๊ณผ๋งŒ์„ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•œ Reactor ๊ฐ์ฒด
  • Flux : ๊ฒฐ๊ณผ๊ฐ€ 0-N ๊ฐœ์ธ ์—ฌ๋Ÿฌ ๊ฐœ์˜ ๊ฒฐ๊ณผ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” Reactor ๊ฐ์ฒด
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.mastering.spring.reactive;

import org.junit.Test;
import reactor.core.publisher.Mono;

import java.time.Duration;

public class SpringReactiveTest {

    @Test
    public void monoExample() throws InterruptedException {

        // 5์ดˆ ํ›„์— ํ•˜๋‚˜์˜ ์š”์†Œ๋ฅผ ๋ฐฉ์ถœํ•œ๋‹ค.
        Mono<String> stubMonoWithADelay = Mono.just("Ranga").delayElement(Duration.ofSeconds(5));

        // ๋ชจ๋…ธ ์ด๋ฒคํŠธ๋ฅผ ์ˆ˜์‹ ํ•˜๊ณ  ์ฝ˜์†”์— ๊ธฐ๋กํ•œ๋‹ค.
        stubMonoWithADelay.subscribe(System.out::println);

        // ํ…Œ์ŠคํŠธ ์‹คํ–‰์‹œ๊ฐ„์„ ์ง€์—ฐ์‹œํ‚จ๋‹ค.
        Thread.sleep(10000);
    }

}
1
2
3
4
5
6
7
8
9
/**
 * Consumer ๋ฅผ ๋ช…์‹œ์ ์œผ๋กœ ์ •์˜
 */
class SystemOutConsumer implements Consumer<String> {
    @Override
    public void accept(String s) {
        System.out.println("Received " + s + " at" + new Date());
    }
}
1
2
3
// ๋ชจ๋…ธ ์ด๋ฒคํŠธ๋ฅผ ์ˆ˜์‹ ํ•˜๊ณ  ์ฝ˜์†”์— ๊ธฐ๋กํ•œ๋‹ค.
// stubMonoWithADelay.subscribe(System.out::println);
stubMonoWithADelay.subscribe(new SystemOutConsumer());
1
2
3
4
5
6
7
class WelcomeConsumer implements Consumer<String> {
    @Override
    public void accept(String s) {
        System.out.println("Welcome " + s);

    }
}
1
2
3
4
// ๋ชจ๋…ธ ์ด๋ฒคํŠธ๋ฅผ ์ˆ˜์‹ ํ•˜๊ณ  ์ฝ˜์†”์— ๊ธฐ๋กํ•œ๋‹ค.
// stubMonoWithADelay.subscribe(System.out::println);
stubMonoWithADelay.subscribe(new SystemOutConsumer());
stubMonoWithADelay.subscribe(new WelcomeConsumer());
1
2
3
4
5
6
7

@Test
public void simpleFluxStream() {
        Flux<String> stubFluxStream = Flux.just("Jane", "Joe");
        stubFluxStream.subscribe(new SystemOutConsumer());
    }
}

์Šคํ”„๋ง ์›น ๋ฆฌ์•กํ‹ฐ๋ธŒ

์Šคํ”„๋ง ์›น ๋ฆฌ์•กํ‹ฐ๋ธŒ๋Š” ์Šคํ”„๋ง ํ”„๋ ˆ์ž„์›Œํฌ 5 ์˜ ์ƒˆ๋กœ์šด ๊ธฐ๋Šฅ ์ค‘ ํ•˜๋‚˜์ด๋‹ค. ์ด ๊ธฐ๋Šฅ์€ ์›น ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์˜ ๋ฆฌ์•กํ‹ฐ๋ธŒ ๊ธฐ๋Šฅ์„ ์ œ๊ณตํ•œ๋‹ค. ์Šคํ”„๋ง ์›น ๋ฆฌ์•กํ‹ฐ๋ธŒ๋Š” ์Šคํ”„๋ง MVC ์™€ ๋™์ผํ•œ ๊ธฐ๋ณธ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ๋ชจ๋ธ์„ ๊ธฐ๋ฐ˜์œผ๋กœ ํ•œ๋‹ค.

์›น ๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ ์˜์กด์„ฑ ์ถ”๊ฐ€

1
implementation ('org.springframework.boot:spring-boot-starter-webflux')

๋ฆฌ์•กํ‹ฐ๋ธŒ ์ปจํŠธ๋กค๋Ÿฌ ์ƒ์„ฑ

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
@RestController
public class StockPriceEventController {

    @GetMapping("/stocks/price/{stockCode}")
    Flux<String> retrieveStockPriceHardcoded(@PathVariable("stockCode") String stockCode) {
        return Flux.interval(Duration.ofSeconds(5))
                .map(l -> getCurrentDate() +" : " + getRandomNumber(100, 125)).log();

    }

    private String getCurrentDate() {
        return (new Date()).toString();
    }

    private int getRandomNumber(int min, int max) {
        return ThreadLocalRandom.current().nextInt(min, max + 1);
    }

}

HTML ๋ทฐ ์ƒ์„ฑ

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
    <p>
        <button id="subscribe-button">Get Latest IBM Price</button>
        <ul id="display"></ul>
    </p>

    <script>
        addEvent("click", document.getElementById('subscribe-button'),
            () => registerEventSourceAndAddResponseTo("/stocks/price/IBM", "display"));

        function registerEventSourceAndAddResponseTo(uri, elementId) {
            let stringEvents = document.getElementById(elementId);
            let stringEventSource = new EventSource(uri)

            stringEventSource.onmessage = (e) => {
                let newElement = document.createElement("li");
                newElement.innerHTML = e.data;
                stringEvents.appendChild(newElement);
            }
        }

        function addEvent(event, elem, func) {
            if (typeof (EventSource) !== 'undefined') {
                elem.addEventListener(event, func, false);
            } else {
                elem[event] = func;
            }
        }

    </script>
</body>
</html>

์ถœ์ฒ˜ ๋ฐ ์ฐธ๊ณ  ์‚ฌ์ดํŠธ