์คํ๋ง 5.0 ๋ง์คํฐ ์คํฐ๋
์คํ๋ง 5.0 ๋ง์คํฐ ์คํฐ๋ ํ์ต ๋ด์ฉ ์ ๋ฆฌ์
๋๋ค.
๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ
์์กด์ฑ ์ถ๊ฐ
1
2
implementation ( 'org.reactivestreams:reactive-streams:1.0.2' )
implementation ( 'org.reactivestreams:reactive-streams-tck:1.0.2' )
๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ ๊ตฌ์ฑ์์
Publisher
: ๋ฐ์ดํฐ ์ ๊ณต์. ๊ตฌ๋
ํ ๊ตฌ๋
์๋ค์๊ฒ ๊ตฌ๋
์ ๋ณด๋ฅผ ํ ๋๋ก ๋ฐ์ดํฐ๋ฅผ ์ ๊ณตํ๋ค.
Subscriber
: ๋ฐ์ดํฐ ์๋ชจ์. ์ ๊ณต์๋ก๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ ์๋ชจํ๋ค.
Subscription
: ๊ตฌ๋
์ ๋ณด. Subscriber ๋ Publisher ๋ฅผ ๊ตฌ๋
ํ์ฌ ๋ฐ์ดํฐ(n) ์์ฒญํ ์ ์๋ค.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Publisher
package org.reactivestreams ;
public interface Publisher < T > {
void subscribe ( Subscriber <? super T > var1 );
}
// Subscriber
package org.reactivestreams ;
public interface Subscriber < T > {
void onSubscribe ( Subscription var1 );
void onNext ( T var1 );
void onError ( Throwable var1 );
void onComplete ();
}
// Subscription
package org.reactivestreams ;
public interface Subscription {
void request ( long var1 );
void cancel ();
}
Publisher
void subscribe(Subscriber<? super T> var1);
: Subscriber ๊ฐ์ฒด๋ฅผ ๋งค๊ฐ๋ณ์๋ก ์
๋ ฅ๋ฐ์ Publisher ๊ฐ Subscriber ๊ด๋ฆฌ ํ ์ ์๋ค. ๊ทธ๋ฆฌ๊ณ Subscriber ์ ๋ฉ์๋๋ฅผ ์ฌ์ฉํ ์ ์๋ค.
Subscriber
onSubscribe()
: Publisher ๋ฅผ ๊ตฌ๋
ํ๊ฒ ๋๋ฉด ํธ์ถ๋๋ค.
onNext()
: ๋น๋๊ธฐ ์์ผ๋ก Executor
๋ฅผ ํตํด ๊ตฌ๋
์์๊ฒ ๋ฉ์ธ์ง๋ฅผ ๊ฒ์ํ ์ ์๋ค.
onError()
: ์๋ฌ๊ฐ ๋ฐ์ ์ ํธ์ถ๋๋ค.
onComplete()
: ๋ ๊ฒ์๋ ๋ฉ์ธ์ง ์์๊ฐ ์์ ๊ฒฝ์ฐ ๋ค์ ์์
์ ์ํํ๋ค.
Subscription
request()
: Publisher ์๊ฒ ๊ฒ์๋ ๋ฐ์ดํฐ๋ฅผ ์์ฒญํ๋ค.
cancel()
: ๋ฐ์ดํฐ์ ๋ํ ์์ฒญ์ ์ทจ์ํ๋ค.
๋ฆฌ์กํฐ
๋ฆฌ์กํฐ๋ ์คํ๋ง ํผ๋ณดํ ํ์ ๋ฆฌ์กํฐ๋ธ ํ๋ ์์ํฌ์ด๋ฉฐ, ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ ๊ธฐ๋ฐ์ผ๋ก ํ๋ค.
์์กด์ฑ ์ถ๊ฐ
build.gradle
1
2
implementation ( 'io.projectreactor:reactor-core:3.2.5.RELEASE' )
testImplementation ( 'io.projectreactor:reactor-test:3.2.5.RELEASE' )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
plugins {
id 'java'
id 'war'
id 'org.springframework.boot' version '2.1.1.RELEASE'
}
apply plugin: 'java'
apply plugin: 'io.spring.dependency-management'
group 'com.mastering.spring'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.11
repositories {
mavenCentral ()
}
dependencies {
implementation ( 'org.springframework.boot:spring-boot-starter' )
implementation ( 'org.reactivestreams:reactive-streams:1.0.2' )
implementation ( 'org.reactivestreams:reactive-streams-tck:1.0.2' )
implementation ( 'io.projectreactor:reactor-core:3.2.5.RELEASE' )
testImplementation ( 'io.projectreactor:reactor-test:3.2.5.RELEASE' )
testImplementation ( 'org.springframework.boot:spring-boot-starter-test:2.1.2.RELEASE' )
testImplementation ( 'junit:junit:4.12' )
}
Mono ์ Flux
Mono ์ Flux ๋ชจ๋ Reactive Stream ์ Publisher ์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ๊ณ ์์ผ๋ฉฐ, Reactor ์์ ์ ๊ณตํ๋ ์ฌ๋ฌ ์ฐ์ฐ์(operators) ์ ์กฐํฉ์ ํตํด ์คํธ๋ฆผ์ ๊ฐ๊ณตํ ์ ์๋ค.
Mono
: ์์๊ฐ ์์ ์๊ฑฐ๋ ํ๋์ ๊ฒฐ๊ณผ๋ง์ ์ฒ๋ฆฌํ๊ธฐ ์ํ Reactor ๊ฐ์ฒด
Flux
: ๊ฒฐ๊ณผ๊ฐ 0-N ๊ฐ์ธ ์ฌ๋ฌ ๊ฐ์ ๊ฒฐ๊ณผ๋ฅผ ์ฒ๋ฆฌํ๋ Reactor ๊ฐ์ฒด
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.mastering.spring.reactive ;
import org.junit.Test ;
import reactor.core.publisher.Mono ;
import java.time.Duration ;
public class SpringReactiveTest {
@Test
public void monoExample () throws InterruptedException {
// 5์ด ํ์ ํ๋์ ์์๋ฅผ ๋ฐฉ์ถํ๋ค.
Mono < String > stubMonoWithADelay = Mono . just ( "Ranga" ). delayElement ( Duration . ofSeconds ( 5 ));
// ๋ชจ๋
ธ ์ด๋ฒคํธ๋ฅผ ์์ ํ๊ณ ์ฝ์์ ๊ธฐ๋กํ๋ค.
stubMonoWithADelay . subscribe ( System . out :: println );
// ํ
์คํธ ์คํ์๊ฐ์ ์ง์ฐ์ํจ๋ค.
Thread . sleep ( 10000 );
}
}
1
2
3
4
5
6
7
8
9
/**
* Consumer ๋ฅผ ๋ช
์์ ์ผ๋ก ์ ์
*/
class SystemOutConsumer implements Consumer < String > {
@Override
public void accept ( String s ) {
System . out . println ( "Received " + s + " at" + new Date ());
}
}
1
2
3
// ๋ชจ๋
ธ ์ด๋ฒคํธ๋ฅผ ์์ ํ๊ณ ์ฝ์์ ๊ธฐ๋กํ๋ค.
// stubMonoWithADelay.subscribe(System.out::println);
stubMonoWithADelay . subscribe ( new SystemOutConsumer ());
1
2
3
4
5
6
7
class WelcomeConsumer implements Consumer < String > {
@Override
public void accept ( String s ) {
System . out . println ( "Welcome " + s );
}
}
1
2
3
4
// ๋ชจ๋
ธ ์ด๋ฒคํธ๋ฅผ ์์ ํ๊ณ ์ฝ์์ ๊ธฐ๋กํ๋ค.
// stubMonoWithADelay.subscribe(System.out::println);
stubMonoWithADelay . subscribe ( new SystemOutConsumer ());
stubMonoWithADelay . subscribe ( new WelcomeConsumer ());
1
2
3
4
5
6
7
@Test
public void simpleFluxStream () {
Flux < String > stubFluxStream = Flux . just ( "Jane" , "Joe" );
stubFluxStream . subscribe ( new SystemOutConsumer ());
}
}
์คํ๋ง ์น ๋ฆฌ์กํฐ๋ธ
์คํ๋ง ์น ๋ฆฌ์กํฐ๋ธ๋ ์คํ๋ง ํ๋ ์์ํฌ 5 ์ ์๋ก์ด ๊ธฐ๋ฅ ์ค ํ๋์ด๋ค. ์ด ๊ธฐ๋ฅ์ ์น ์ ํ๋ฆฌ์ผ์ด์
์ ๋ฆฌ์กํฐ๋ธ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ค. ์คํ๋ง ์น ๋ฆฌ์กํฐ๋ธ๋ ์คํ๋ง MVC ์ ๋์ผํ ๊ธฐ๋ณธ ํ๋ก๊ทธ๋๋ฐ ๋ชจ๋ธ์ ๊ธฐ๋ฐ์ผ๋ก ํ๋ค.
์น ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ ์์กด์ฑ ์ถ๊ฐ
1
implementation ( 'org.springframework.boot:spring-boot-starter-webflux' )
๋ฆฌ์กํฐ๋ธ ์ปจํธ๋กค๋ฌ ์์ฑ
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@RestController
public class StockPriceEventController {
@GetMapping ( "/stocks/price/{stockCode}" )
Flux < String > retrieveStockPriceHardcoded ( @PathVariable ( "stockCode" ) String stockCode ) {
return Flux . interval ( Duration . ofSeconds ( 5 ))
. map ( l -> getCurrentDate () + " : " + getRandomNumber ( 100 , 125 )). log ();
}
private String getCurrentDate () {
return ( new Date ()). toString ();
}
private int getRandomNumber ( int min , int max ) {
return ThreadLocalRandom . current (). nextInt ( min , max + 1 );
}
}
HTML ๋ทฐ ์์ฑ
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<!DOCTYPE html>
< html lang = "en" >
< head >
< meta charset = "UTF-8" >
< title > Title</ title >
</ head >
< body >
< p >
< button id = "subscribe-button" > Get Latest IBM Price</ button >
< ul id = "display" ></ ul >
</ p >
< script >
addEvent ( "click" , document . getElementById ( 'subscribe-button' ),
() => registerEventSourceAndAddResponseTo ( "/stocks/price/IBM" , "display" ));
function registerEventSourceAndAddResponseTo ( uri , elementId ) {
let stringEvents = document . getElementById ( elementId );
let stringEventSource = new EventSource ( uri )
stringEventSource . onmessage = ( e ) => {
let newElement = document . createElement ( "li" );
newElement . innerHTML = e . data ;
stringEvents . appendChild ( newElement );
}
}
function addEvent ( event , elem , func ) {
if ( typeof ( EventSource ) !== 'undefined' ) {
elem . addEventListener ( event , func , false );
} else {
elem [ event ] = func ;
}
}
</ script >
</ body >
</ html >
์ถ์ฒ ๋ฐ ์ฐธ๊ณ ์ฌ์ดํธ
Java9 Reactive Stream API
Java9 Reactive Stream ์ ๋ํ ์์ธํ ์ค๋ช
๋ฐ ๋ด์ฉ ์ฐธ๊ณ