Post
EN

Reactive Streams 몇가지 사항만 기술

Reactor는 JVM위에서 효율적으로 요청을 관리할 수 있는 온전한 논블로킹 반응형 프로그램 입니다. 이것은 Java 8의 함수형 API에 특히 CompletableFuture, Stream, 과 Duration으로 통합되었습니다. 이것은 비동기 순차 API Flux( N개의 요소) 와 Mono(0또는1) 를 구성할 수 있도록 제공해준다.

Reactive programming은 data stream 고려한 비동기적인 프로그래밍 패러다임이다. 이것은 정적 또는 동적인 데이터 스트림을 쉽게 표현할 수 있음을 의미한다.

Reactive Programming Paradigm 는 종종 객체지향 언어의 Observer design pattern의 확장으로 표현되곤 한다. 이러한 모든 라이브러리의 Iterable-Iterator 쌍에 대한 연관이 있으므로 익숙한 iterator 디자인 패턴과 기본 Reactive Streams 패턴을 비교할 수도 있습니다.

값에 접근하는 방법은 전적으로 Iterable의 책임 임에도 불구하고 반복자를 사용하는 것은 필수적인 프로그래밍 패턴입니다. 사실, 개발자는 시퀀스의 next () 항목에 언제 액세스해야하는지 선택해야합니다. Reactive Streams에서 위의 쌍과 동일한 것은 게시자 - 구독자 형태입니다. 그러나 구독자에게 새로 사용 가능한 값이 올 때 이를 알리는 것은 게시자와 같은 것이고, 밀어내는 쪽과 받는 쪽은 서로 대응 되는 형태 입니다. 또한 푸시 된 값에 적용된 연산은 명령형이 아닌 선언적으로 표현됩니다. 프로그래머는 정확한 제어 흐름을 설명하기보다는 계산 논리를 표현합니다.

추가적으로 전달된 값들은 오류 핸들링이나 완료등이 잘 정의된 방식으로 다루어집니다. publisher는 subscriber에게 새로운 값을 넣을 수 있다. 또한 완료나 오류 처리를 알릴 수 있습니다. 또한 오류나 완료도 순차적으로 종료할 수 있습니다. 이것은 다음과 같이 요약될 수 있다.

onNext x 0..N [onError | onComplete]

명령형 프로그래밍에서 반응형 프로그래밍으로

Reactive 라이브러리들은 고전적인 비동기 접근 방식의 문제점들을 다루고 있다.

또한 몇가지 추가 측면에 초첨을 맞추고 있다.

  • 결합성과 가독성

  • 풍부한 Operator로 조작할 수 있는 데이터 흐름

  • 구독 할때까지 아무일도 발생하지 않는다.

  • backpressure(역압) 또는 생산자에게 생산 속도가 너무 높다는 신호를 보내는 능력

  • 동시성에 의존하지 않는 높은 수준, 높은 가치의 추상화

명령형 프로그래밍에서 선언형 프로그래밍으로 이야기 해도 괜찮을 것 같다.

Java 8 의 Stream 처럼 Collection 같은 데이터 처리 함에 있어 Reactive Streams에서 제공하는 Operator등을 이용하여 선언적으로 데이터를 제어 할 수 있다는 것이다.

여기서 명령형과 선언형을 좀더 설명하자면 다음과 같은 내용이다.

private static int sumIterator(List list) { Iterator it = list.iterator(); int sum = 0; while (it.hasNext()) { int num = it.next(); if (num > 10) { sum += num; } } return sum; }

코드 (알고리즘)을 명시하고 어떤 작업을 한다는 것을 명시하지 않음.

private static int sumStream(List list) { return list.stream() .filter(i -> i > 10) .mapToInt(i -> i) .sum(); }

코드(알고리즘)을 명시하지 않고 어떤 작업을 한다는 것을 명시함.

코드 예제 : https://okky.kr/article/329818

또한 결합성 및 가독성도 아래 처럼 가능하다.

  • 결합성과 가독성

  • 풍부한 Operator로 조작할 수 있는 데이터 흐름

(예제 언넝 고쳐야징 -_-);

class JoinRequest { private String name; private Integer age; public Person(String name, Integer age) { this.name = name; this.age = age; } // getter } class Member { private String name; private Integer age; public AdultPerson(JoinRequest request) { this.name = request.getName(); this.age = request.getAge(); } // getter } List requests = Arrays.asList(new JoinRequest("Ava", 18), new JoinRequest("Amelia", 20), new JoinRequest("Amy", 30)); Flux.fromIterable(requests) .filter( request -> request.getAge() > 17) // (1) .map(Member::new) // (2) .subscribe(() -> { // 회원 등록 });

(더 많은 예제로 설명하고 싶지만, 간단하게 작성하면 다음과 같다.)

여러개의 Request를 받아서 회원을 가입시키는데 18세 이상만 가입을 시킨다고 해보자.

이때 stream에 선언된 operator 등을 보고, 어떤 조건으로 필터링되고 어떤 형태로 형변환이 되어 최종적으로 가입이 된다는 것들을 알아볼 수 있다.

저런 하나의 흐름(stream)에 추가적인 작업이 들어간다고 하면, 그에 따른 operator만 추가해주면 되고, 이런 이점은 유지 보수에 용이하다는 것을 알 수 있다.

또한 비동기-논블록킹 코드를 단계별로, 쉽게 구현할 수 있으며 성능 향상도 얻을 수 있으니 더 이점이 있다고 생각된다.

(각 Operator 마다 저러한 마블 다이어그램을 document로 제공해주니 어떤 기능을 하는지 좀더 명확해진다는 장점? 이 있는 것 같다.)

![](/assets/images/posts/221607289750/1f87c5784d2c.png?type=w580)

  • 구독 할때까지 아무일도 발생하지 않는다.

class Fruit { private String name; private int weight; // getter } class Juice { private Fruit fruit; public Juice(Fruit fruit) { this.fruit = fruit; } // getter } class UnripeApple { private Fruit fruit; public UnripeApple(Fruit fruit) { this.fruit = fruit; } // constructor // getter } public Flux makeBananaJuice(List names) { Flux publisher = selectFruit(names, (s) -> s.getName().equals("banana")); return publisher.map(Juice::new); } public Flux (List names) { Flux publisher = selectFruit(names, s.getName().equals("apple")); return publisher.map(apple -> { notificationService.send(apple); return new UnripeApple(apple); }); } public Flux selectFruit(List fruits, Function fnc) { // function이 해당 메서드를 호출 하는 부분을 재사용하는 것 처럼 변한다고 해보자. return Flux.fromIterable(list) .filter(fnc::apply); }

퍼블리셔를 생성하는 createPublisher 메서드를 통해서 publisher를 생성한다고 해보자.

이때 각각 메서드를 참조하는 여러 메서드가 있다면 subscribe 하지 않는 한 return 받고 Operator를 추가적으로

결합하여 사용할 수 있다.

이러한 흐름(Stream)을 정의한다고 바로 실행되는 것이 아니라, 나중에 subscribe 하는 시점에서 처리가 됨으로

구독할 때까지 아무일도 발생하지 않는다고 할 수 있다.

  • backpressure(역압) 또는 생산자에게 생산 속도가 너무 높다는 신호를 보내는 능력

또한 배압 관련된 사항도 스스로 제어가 가능하다.

인터넷에서 떠도는 예제 처럼 1개의 흐름에 여러개의 publisher를 등록하게 되면 배압 설정이 256개로 기본적으로 가져와지는 것을 확인할 수 있는데, 이런 제어도 flatMap을 통해서 제어가 가능하다.

![](/assets/images/posts/221607289750/0950d8dc1fb7.png?type=w580)

함수형 언어 패러다임도 추가되어 있기 때문에 Reactive Streams 에 추가되는 Operation의 오류나 이상 처리 방법에 대하여 난감해 할 수 있다. Reactive Streams에서 오류 처리는 결국 Streams를 종료하는 형태로 동작하기 때문이다. 이러한 동작으로 인하여 흐름 처리 방법을 Return 타입의 다형성을 적용해서 이러한 문제를 풀어가는 방향으로 진행하였다. 무슨 말이냐면, 값에 의하여 상태를 표시하는 방법으로 진행된 것이다. Null이 발생되거나 오류가 발생될 때 Java에서 행해지던 Exception 처리가 아닌 값으로 상태를 표현하는 방식으로 진행한 것이다. ex) NullValue, ErrorValue 이러한 방법으로 하위 Stream에 영향 없이 진행될 수 있도록 구성하였다. 이러한 방법으로 하위 Stream 진입 전 filter를 걸어서 누락시키는 방법등 응용할 수 있는 방법들이 많이 생겨났다. 값에 의한 예외 처리 방법에 대해서는 scala를 접했을 때 들었던 내용이다.

이러한 oprator 결합을 자유롭게 활용하며 비동기/넌블록킹을 구성할 수 있는 것은 매우 매력적인 모습이다.

또한 함수형으로 작성된 코드를 이용하여 결합하고 구성하여 그동안 복잡하게 풀어갔던 내용들을 풀어갈 수 있어서 많은 장점이 있는 것 같다.

단점은 물론 런닝커브가 높다는 것이다. 제공해주는 많은 operator 중에 일부만 사용했을 뿐이고, 개념을 익히는데 여간 애를 먹은 것이 아니었다.

하지만 작성하고 나서 기존 코드와 차이점을 확인하는 순간, 그동안의 고생보다 값질 것이라는 변함 없을 듯 하다.

This article is licensed under CC BY 4.0 by the author.