Contents

[Mastering Spring 5.0] 11.1 Reactive Programming

μŠ€ν”„λ§ 5.0 λ§ˆμŠ€ν„° μŠ€ν„°λ””
μŠ€ν”„λ§ 5.0 λ§ˆμŠ€ν„° μŠ€ν„°λ”” ν•™μŠ΅ λ‚΄μš© μ •λ¦¬μž…λ‹ˆλ‹€.

λ¦¬μ•‘ν‹°λΈŒ μ‹œμŠ€ν…œ

μƒˆλ‘œμš΄ λ””λ°”μ΄μŠ€ (λͺ¨λ°”일, νƒœλΈ”λ¦Ώ λ“±) real-time data 에 λŒ€ν•œ μˆ˜μš” 증가

  • λŒ€λŸ‰μ˜ ν”„λ‘œμ„ΈμŠ€ 처리 λ‘œλ“œ λ°œμƒ
  • 데이터 λ³Όλ₯¨μ΄ κΈ°ν•˜κΈ‰μˆ˜μ μœΌλ‘œ 증가
  • 인프라 μœ μ§€ 보수 λΉ„μš© 증가

Reactive μ‹œμŠ€ν…œ νŠΉμ§•

Reactive manifesto : https://www.reactivemanifesto.org/ko

Reative Manifesto λŠ” λ‹€μŒ λ„€ 가지 핡심 원칙에 따라 Reactive System 의 νŠΉμ„±μ„ 개랡적으둜 μ„€λͺ…ν•˜κ³  μžˆλ‹€.

  • λ°˜μ‘μ„± (Responsive) : λͺ¨λ“  응닡은 μ μ‹œμ— λΉ λ₯΄κ³  μΌκ΄€λœ λŒ€μ‘μ„ μ œκ³΅ν•˜λ©° μ‹ λ’°ν• μˆ˜ 있으며 μΌκ΄€λœ μ„œλΉ„μŠ€ ν’ˆμ§ˆμ„ μ œκ³΅ν•œλ‹€.
  • 회볡λ ₯ (Resilient) : 각각의 κ΅¬μ„±μš”μ†Œ 듀이 λΆ„λ¦¬λ˜μ–΄ 있기 λ•Œλ¬Έμ— κ΅¬μ„±μš”μ†Œ 쀑 ν•˜λ‚˜μ— λ¬Έμ œκ°€ λ°œμƒν•˜λ”λΌλ„ 전체 μ‹œμŠ€ν…œμ΄ λ‹€μš΄λ˜λŠ” 것을 λ°©μ§€ν•˜κ³  볡ꡬ ν•  수 μžˆλ„λ‘ 보μž₯ν•œλ‹€. λ˜ν•œ μž₯μ• μ˜ λ°œμƒμ΄ μ˜ˆμ™Έμ μΈ ν˜„μƒμ΄ μ•„λ‹Œ 정상적인 κΈ°λŠ₯의 μΌλΆ€λ‘œ μ²˜λ¦¬ν•œλ‹€.
  • μœ μ—°μ„± (Elastic) : μš”μ²­μ„ μ²˜λ¦¬ν•˜κΈ° μœ„ν•΄ ν• λ‹Ήλœ λ¦¬μ†ŒμŠ€λ₯Ό λŠ˜λ¦¬κ±°λ‚˜ μ€„μž„μœΌλ‘œμ¨ μš”μ²­ λ‘œλ“œμ˜ 변화에 λŒ€μ‘ν•  수 μžˆλ‹€. ( auto scale )
  • λ©”μ‹œμ§€ 기반(Message-driven) : μ‹œμŠ€ν…œμ˜ κ΅¬μ„±μš”μ†Œλ“€μ΄ 메세지(λ˜λŠ” 이벀트) λ₯Ό 톡해 이루어진닀. μ—¬κΈ°μ—μ„œ κ΅¬μ„±μš”μ†Œλ“€μ€ μ»΄ν¬λ„ŒνŠΈ, μ„œλΉ„μŠ€, 객체, API 무엇이라도 될 수 있으며 과거처럼 λ©”μ„œλ“œ ν˜ΈμΆœμ΄λ‚˜ RPC 같은 λΈ”λ‘œν‚Ή λ°©μ‹μœΌλ‘œ μ˜μ‚¬μ†Œν†΅ ν•˜μ§€ μ•Šκ³ , 보내고 μžŠλŠ”(fire-and-forget) λ°©μ‹μœΌλ‘œ λ©”μ‹œμ§€λ₯Ό μ£Όκ³  λ°›μœΌλ©° μ†Œν†΅ν•œλ‹€.

Reactive Keyworld

  • A reactive stream should be non-blocking
  • It should be a stream of data
  • It should work asynchronously
  • And it should be able to handle back pressure.

Non-Bloking

Bloking

λΈ”λ‘œν‚Ήμ€ μš”μ²­μ΄ λ°œμƒν•˜κ³  μž‘μ—…μ„ μ§„ν–‰ν•˜λŠ” λ™μ•ˆ ν”„λ‘œκ·Έλž¨μ˜ 진행을 λ©ˆμΆ”κ³ (block) μ™„λ£Œλ  λ•ŒκΉŒμ§€ λͺ¨λ“  일을 μ€‘λ‹¨ν•œ μƒνƒœλ‘œ λŒ€κΈ°ν•΄μ•Ό ν•˜λŠ” 것을 λΈ”λ‘œν‚Ή 방식이라고 ν•œλ‹€.

Blocking Java Socket 톡신 예제

ν΄λΌμ΄μ–ΈνŠΈκ°€ 접속할 λ•Œ κΉŒμ§€ accept() λ©”μ„œλ“œλŠ” 항상 λΈ”λ‘œν‚Ή λœλ‹€.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
Socket clientSocket = serverSocket.accept();
...
String request, response;
while ((request = in.readLine()) != null) {
    response = processRequest(request);
    out.println(response);
    if ("Done".equals(request)) {
        break;
    }
}
  • λ˜ν•œ ν΄λΌμ΄μ–ΈνŠΈκ°€ 보낸 슀트림이 μ™„λ£Œ λ˜λŠ” μž…λ ₯ 끝 문자 ( Ctrl + C μž…λ ₯ μ‹œ) λ₯Ό 보낼 λ•Œ κΉŒμ§€ λ°œμƒν•œλ‹€.
  • ν΄λΌμ΄μ–ΈνŠΈκ°€ λ§Žμ„ 수둝 μŠ€λ ˆλ“œ μˆ˜κ°€ μ¦κ°€ν•˜κ³  μ„œλ²„μ— μ‹¬κ°ν•œ μ„±λŠ₯μ €ν•˜ λ°œμƒ -> λŒ€μ•ˆμœΌλ‘œ μŠ€λ ˆλ“œ 풀을 μ‚¬μš©

Non-Blocking

μ–΄λ–€ μŠ€λ ˆλ“œμ—μ„œ 였λ₯˜κ°€ λ°œμƒν•˜κ±°λ‚˜ λ©ˆμΆ”μ—ˆμ„ λ•Œ λ‹€λ₯Έ μ“°λ ˆλ“œμ—κ²Œ 영ν–₯을 λΌμΉ˜μ§€ μ•Šλ„λ‘ ν•˜λŠ” 방법이닀. μš”μ²­ν•œ μž‘μ—…(μŠ€λ ˆλ“œ) 이 진행 λ˜λŠ” λ™μ•ˆ μ¦‰μ‹œ λ‹€μŒ μž‘μ—…μ„ μ²˜λ¦¬ν•¨μœΌλ‘œμ¨ μ‹œμŠ€ν…œ μžμ›μ„ 더 효율적으둜 ν™œμšœμ΄ κ°€λŠ₯ν•˜λ‹€. κ·ΈλŸ¬λ‚˜ μš”μ²­ν•œ μž‘μ—… 이후 후속 μž‘μ—…μ„ μ΄μ–΄μ„œ 진행할 수 μžˆλ„λ‘ λ³„λ„μ˜ 약속(Polling, Callback Function λ“±) 이 ν•„μš”ν•˜λ‹€.

1
2
3
4
5
6
7
8

this.selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);

// bind server socket channel to port
serverChannel.socket().bind(listenAddress);
serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);    

accept() λ©”μ„œλ“œκ°€ λΈ”λ‘œν‚Ή λ˜μ§€ μ•Šκ³  λ°”λ‘œ λ¦¬ν„΄λ˜κΈ° λ•Œλ¬Έμ—, ν΄λΌμ΄μ–ΈνŠΈκ°€ μ—°κ²° μš”μ²­μ„ 보내기 μ „κΉŒμ§€ while 블둝 μ½”λ“œκ°€ μ‰΄μƒˆ 없이 λ°˜λ³΅λ˜μ–΄ CPUκ°€ κ³Όλ„ν•˜κ²Œ μ†ŒλΉ„λ˜λŠ” 문제점이 λ°œμƒν•œλ‹€. κ·Έλž˜μ„œ λ„ŒλΈ”λ‘œν‚Ήμ€ 이벀트 λ¦¬μŠ€λ„ˆ 역할을 ν•˜λŠ” μ…€λ ‰ν„°(Selector) λ₯Ό μ‚¬μš©ν•œλ‹€. 이 Selector λ₯Ό λ„ŒλΈ”λ‘œν‚Ή 채널에 등둝해 λ†“μœΌλ©΄ ν΄λΌμ΄μ–ΈνŠΈμ˜ μ—°κ²° μš”μ²­μ΄ λ“€μ–΄μ˜€κ±°λ‚˜ 데이터가 도착할 경우 채널은 Selector 에 ν†΅λ³΄ν•œλ‹€. SelectorλŠ” ν†΅λ³΄ν•œ 채널듀을 선택해 μž‘μ—… μŠ€λ ˆλ“œκ°€ accept() λ˜λŠ” read() λ©”μ†Œλ“œλ₯Ό μ‹€ν–‰ν•œλ‹€.

1
2
3
4
5
6
while (true) {
    int readyCount = selector.select();
        if (readyCount == 0) {
            continue;
        }
}

Backpressure

비동기식 데이터 슀트림 μ²˜λ¦¬μ— λŒ€ν•œ 이슈

  • λ„ˆλ¬΄ 큰 데이터 슀트림 -> busy waiting
  • λ„ˆλ¬΄ λΉ λ₯Έ 데이터 슀트림 전솑 속도 -> out of memory exception

Backpressure λŠ” μ•ˆμ •μ μœΌλ‘œ μ²˜λ¦¬ν•˜κΈ°μ— λ„ˆλ¬΄ 큰 데이터 ν˜Ήμ€ 데이터λ₯Ό μ‹ λ’° 있게 처리 ν•  수 μžˆλŠ” μ†λ„λ‘œ κ΅¬λ…μžμ—κ²Œ μ œκ³΅ν•˜λŠ” 방법이닀. Reactive 데이터 슀트림의 Push 와 Pull 을 κ΄€λ¦¬ν•˜λŠ” Buffer λ₯Ό μ‚¬μš©ν•˜μ—¬ κ΅¬λ…μžλŠ” νŠΉμ • μ–‘μ˜ 데이터λ₯Ό μš”μ²­ν•˜κ³  μ†ŒμŠ€λŠ” κ΅¬μ„±λœ 데이터λ₯Ό ν•΄λ‹Ή λ°μ΄ν„°λ‘œ μœ λ™μ μœΌλ‘œ Push 와 Pull 을 ν•˜λŠ” 방식이닀.

좜처 및 μ°Έκ³  μ‚¬μ΄νŠΈ