์คํ๋ง 5.0 ๋ง์คํฐ ์คํฐ๋ ํ์ต ๋ด์ฉ ์ ๋ฆฌ์
๋๋ค.
         
     
๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ 
์์กด์ฑ ์ถ๊ฐ 
1
 2
  
implementation  ( 'org.reactivestreams:reactive-streams:1.0.2' ) 
implementation  ( 'org.reactivestreams:reactive-streams-tck:1.0.2' ) 
 
 
๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ ๊ตฌ์ฑ์์ 
Publisher  : ๋ฐ์ดํฐ ์ ๊ณต์. ๊ตฌ๋
ํ ๊ตฌ๋
์๋ค์๊ฒ ๊ตฌ๋
 ์ ๋ณด๋ฅผ ํ ๋๋ก ๋ฐ์ดํฐ๋ฅผ ์ ๊ณตํ๋ค.Subscriber : ๋ฐ์ดํฐ ์๋ชจ์. ์ ๊ณต์๋ก๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ ์๋ชจํ๋ค.Subscription : ๊ตฌ๋
 ์ ๋ณด. Subscriber ๋ Publisher ๋ฅผ ๊ตฌ๋
ํ์ฌ ๋ฐ์ดํฐ(n) ์์ฒญํ  ์ ์๋ค. 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
  
// Publisher 
 package   org.reactivestreams ; 
 
 public   interface  Publisher < T >   { 
      void   subscribe ( Subscriber <?   super   T >   var1 ); 
 } 
 
 // Subscriber 
 package   org.reactivestreams ; 
 
 public   interface  Subscriber < T >   { 
      void   onSubscribe ( Subscription   var1 ); 
 
      void   onNext ( T   var1 ); 
 
      void   onError ( Throwable   var1 ); 
 
      void   onComplete (); 
 } 
 
 // Subscription 
 package   org.reactivestreams ; 
 
 public   interface  Subscription   { 
      void   request ( long   var1 ); 
 
      void   cancel (); 
 } 
  
 
Publisher 
void subscribe(Subscriber<? super T> var1); : Subscriber  ๊ฐ์ฒด๋ฅผ ๋งค๊ฐ๋ณ์๋ก ์
๋ ฅ๋ฐ์ Publisher  ๊ฐ Subscriber  ๊ด๋ฆฌ ํ  ์ ์๋ค. ๊ทธ๋ฆฌ๊ณ  Subscriber  ์ ๋ฉ์๋๋ฅผ ์ฌ์ฉํ  ์ ์๋ค. 
Subscriber 
onSubscribe() : Publisher  ๋ฅผ ๊ตฌ๋
ํ๊ฒ ๋๋ฉด ํธ์ถ๋๋ค.onNext() : ๋น๋๊ธฐ ์์ผ๋ก Executor ๋ฅผ ํตํด ๊ตฌ๋
์์๊ฒ ๋ฉ์ธ์ง๋ฅผ ๊ฒ์ํ  ์ ์๋ค.onError() : ์๋ฌ๊ฐ ๋ฐ์ ์ ํธ์ถ๋๋ค.onComplete() : ๋ ๊ฒ์๋ ๋ฉ์ธ์ง ์์๊ฐ ์์ ๊ฒฝ์ฐ ๋ค์ ์์
์ ์ํํ๋ค. 
Subscription 
request() :  Publisher  ์๊ฒ ๊ฒ์๋ ๋ฐ์ดํฐ๋ฅผ ์์ฒญํ๋ค.cancel() : ๋ฐ์ดํฐ์ ๋ํ ์์ฒญ์ ์ทจ์ํ๋ค. 
๋ฆฌ์กํฐ 
๋ฆฌ์กํฐ๋ ์คํ๋ง ํผ๋ณดํ ํ์ ๋ฆฌ์กํฐ๋ธ ํ๋ ์์ํฌ์ด๋ฉฐ, ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ์ ๊ธฐ๋ฐ์ผ๋ก ํ๋ค.
์์กด์ฑ ์ถ๊ฐ 
build.gradle
1
 2
  
implementation       ( 'io.projectreactor:reactor-core:3.2.5.RELEASE' ) 
testImplementation   ( 'io.projectreactor:reactor-test:3.2.5.RELEASE' ) 
 
 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
  
plugins  { 
    id  'java' 
     id  'war' 
     id  'org.springframework.boot'  version  '2.1.1.RELEASE' 
 } 
 apply  plugin:  'java' 
apply  plugin:  'io.spring.dependency-management' 
 group  'com.mastering.spring' 
version  '1.0-SNAPSHOT' 
 sourceCompatibility  =  1.11 
 repositories  { 
    mavenCentral () 
 } 
 dependencies  { 
     implementation  ( 'org.springframework.boot:spring-boot-starter' ) 
 
     implementation  ( 'org.reactivestreams:reactive-streams:1.0.2' ) 
     implementation  ( 'org.reactivestreams:reactive-streams-tck:1.0.2' ) 
 
     implementation       ( 'io.projectreactor:reactor-core:3.2.5.RELEASE' ) 
     testImplementation   ( 'io.projectreactor:reactor-test:3.2.5.RELEASE' ) 
 
     testImplementation   ( 'org.springframework.boot:spring-boot-starter-test:2.1.2.RELEASE' ) 
     testImplementation   ( 'junit:junit:4.12' ) 
 } 
 
 
Mono ์ Flux 
Mono ์ Flux ๋ชจ๋ Reactive Stream ์ Publisher ์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ๊ณ  ์์ผ๋ฉฐ, Reactor ์์ ์ ๊ณตํ๋ ์ฌ๋ฌ ์ฐ์ฐ์(operators) ์ ์กฐํฉ์ ํตํด ์คํธ๋ฆผ์ ๊ฐ๊ณตํ  ์ ์๋ค.
Mono : ์์๊ฐ ์์ ์๊ฑฐ๋ ํ๋์ ๊ฒฐ๊ณผ๋ง์ ์ฒ๋ฆฌํ๊ธฐ ์ํ Reactor ๊ฐ์ฒดFlux : ๊ฒฐ๊ณผ๊ฐ 0-N ๊ฐ์ธ ์ฌ๋ฌ ๊ฐ์ ๊ฒฐ๊ณผ๋ฅผ ์ฒ๋ฆฌํ๋ Reactor ๊ฐ์ฒด 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
  
package   com.mastering.spring.reactive ; 
 
 import   org.junit.Test ; 
 import   reactor.core.publisher.Mono ; 
 
 import   java.time.Duration ; 
 
 public   class  SpringReactiveTest   { 
 
      @Test 
      public   void   monoExample ()   throws   InterruptedException   { 
 
          // 5์ด ํ์ ํ๋์ ์์๋ฅผ ๋ฐฉ์ถํ๋ค. 
          Mono < String >   stubMonoWithADelay   =   Mono . just ( "Ranga" ). delayElement ( Duration . ofSeconds ( 5 )); 
 
          // ๋ชจ๋
ธ ์ด๋ฒคํธ๋ฅผ ์์ ํ๊ณ  ์ฝ์์ ๊ธฐ๋กํ๋ค. 
          stubMonoWithADelay . subscribe ( System . out :: println ); 
 
          // ํ
์คํธ ์คํ์๊ฐ์ ์ง์ฐ์ํจ๋ค. 
          Thread . sleep ( 10000 ); 
      } 
 
 } 
  
 
1
 2
 3
 4
 5
 6
 7
 8
 9
  
/**
  * Consumer ๋ฅผ ๋ช
์์ ์ผ๋ก ์ ์
  */ 
 class  SystemOutConsumer   implements   Consumer < String >   { 
      @Override 
      public   void   accept ( String   s )   { 
          System . out . println ( "Received "   +   s   +   " at"   +   new   Date ()); 
      } 
 } 
  
 
1
 2
 3
  
// ๋ชจ๋
ธ ์ด๋ฒคํธ๋ฅผ ์์ ํ๊ณ  ์ฝ์์ ๊ธฐ๋กํ๋ค. 
 // stubMonoWithADelay.subscribe(System.out::println); 
 stubMonoWithADelay . subscribe ( new   SystemOutConsumer ()); 
  
 
1
 2
 3
 4
 5
 6
 7
  
class  WelcomeConsumer   implements   Consumer < String >   { 
      @Override 
      public   void   accept ( String   s )   { 
          System . out . println ( "Welcome "   +   s ); 
 
      } 
 } 
  
 
1
 2
 3
 4
  
// ๋ชจ๋
ธ ์ด๋ฒคํธ๋ฅผ ์์ ํ๊ณ  ์ฝ์์ ๊ธฐ๋กํ๋ค. 
 // stubMonoWithADelay.subscribe(System.out::println); 
 stubMonoWithADelay . subscribe ( new   SystemOutConsumer ()); 
 stubMonoWithADelay . subscribe ( new   WelcomeConsumer ()); 
  
 
1
 2
 3
 4
 5
 6
 7
  
 @Test 
 public   void   simpleFluxStream ()   { 
          Flux < String >   stubFluxStream   =   Flux . just ( "Jane" ,   "Joe" ); 
          stubFluxStream . subscribe ( new   SystemOutConsumer ()); 
      } 
 } 
  
 
์คํ๋ง ์น ๋ฆฌ์กํฐ๋ธ 
์คํ๋ง ์น ๋ฆฌ์กํฐ๋ธ๋ ์คํ๋ง ํ๋ ์์ํฌ 5 ์ ์๋ก์ด ๊ธฐ๋ฅ ์ค ํ๋์ด๋ค. ์ด ๊ธฐ๋ฅ์ ์น ์ ํ๋ฆฌ์ผ์ด์
์ ๋ฆฌ์กํฐ๋ธ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ค. ์คํ๋ง ์น ๋ฆฌ์กํฐ๋ธ๋ ์คํ๋ง MVC ์ ๋์ผํ ๊ธฐ๋ณธ ํ๋ก๊ทธ๋๋ฐ ๋ชจ๋ธ์ ๊ธฐ๋ฐ์ผ๋ก ํ๋ค.
์น ๋ฆฌ์กํฐ๋ธ ์คํธ๋ฆผ ์์กด์ฑ ์ถ๊ฐ 
1
  
implementation  ( 'org.springframework.boot:spring-boot-starter-webflux' ) 
 
 
๋ฆฌ์กํฐ๋ธ ์ปจํธ๋กค๋ฌ ์์ฑ 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
  
@RestController 
 public   class  StockPriceEventController   { 
 
      @GetMapping ( "/stocks/price/{stockCode}" ) 
      Flux < String >   retrieveStockPriceHardcoded ( @PathVariable ( "stockCode" )   String   stockCode )   { 
          return   Flux . interval ( Duration . ofSeconds ( 5 )) 
                  . map ( l   ->   getCurrentDate ()   + " : "   +   getRandomNumber ( 100 ,   125 )). log (); 
 
      } 
 
      private   String   getCurrentDate ()   { 
          return   ( new   Date ()). toString (); 
      } 
 
      private   int   getRandomNumber ( int   min ,   int   max )   { 
          return   ThreadLocalRandom . current (). nextInt ( min ,   max   +   1 ); 
      } 
 
 } 
  
 
HTML ๋ทฐ ์์ฑ 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
  
<!DOCTYPE html> 
< html  lang = "en" > 
< head > 
    < meta  charset = "UTF-8" > 
     < title > Title</ title > 
 </ head > 
< body > 
    < p > 
         < button  id = "subscribe-button" > Get Latest IBM Price</ button > 
         < ul  id = "display" ></ ul > 
     </ p > 
 
     < script > 
         addEvent ( "click" ,  document . getElementById ( 'subscribe-button' ), 
             ()  =>  registerEventSourceAndAddResponseTo ( "/stocks/price/IBM" ,  "display" )); 
 
         function  registerEventSourceAndAddResponseTo ( uri ,  elementId )  { 
             let  stringEvents  =  document . getElementById ( elementId ); 
             let  stringEventSource  =  new  EventSource ( uri ) 
 
             stringEventSource . onmessage  =  ( e )  =>  { 
                 let  newElement  =  document . createElement ( "li" ); 
                 newElement . innerHTML  =  e . data ; 
                 stringEvents . appendChild ( newElement ); 
             } 
         } 
 
         function  addEvent ( event ,  elem ,  func )  { 
             if  ( typeof  ( EventSource )  !==  'undefined' )  { 
                 elem . addEventListener ( event ,  func ,  false ); 
             }  else  { 
                 elem [ event ]  =  func ; 
             } 
         } 
 
     </ script > 
 </ body > 
</ html > 
 
 
์ถ์ฒ ๋ฐ ์ฐธ๊ณ  ์ฌ์ดํธ 
Java9 Reactive Stream API
 
Java9 Reactive Stream ์ ๋ํ ์์ธํ ์ค๋ช
 ๋ฐ ๋ด์ฉ ์ฐธ๊ณ