본문 바로가기

프론트엔드/JavaScript

RxJS

반응형

기초 개념 공부

Reactive X

반응형 프로그래밍

함수형 프로그래밍은 선언형 프로그래밍의 성질을 가진다.

반응형 프로그래밍은 함수형 프로그래밍의 골격을 갖추고 있다.

Reactive X는 크게 3 요소로 구성된다.

  1. Observable
    1. 일련의 값들을 발행한다.
    2. 관찰될 수 있는 것, 관찰되는 대상이란 의미이다.
    3. Stream은 Observable이 발행하는 연속된 값들의 흐름이다.
    4. pipe라는 배관을 타고 흐른다.
  2. Operators
    1. 연산자
    2. 순수함수들
  3. Observer
    1. 관찰자
    2. 파이프만 쳐다보며 값을 기다리다가 나온 결과로 최종 작업을 실행한다.

반응형 프로그래밍에서 중요한것은 소프트웨어에서 구현하고자 하는 것을 이 스트림 즉, 흐름들로 재해석할 수 있어야 한다.

흐름들을 Observable로 발행하고 나온 데이터를 Operators로 정제해서 그 최종값에 구독자가 어떻게 반응할지를 프로그래밍하는 것이다.

Observables - 스트림 생성기 만들기

1. 배열형태의 스트림

const { of, from, range, generate } = rxjs

const obs1$ = of(1, 2, 3, 4, 5)
const obs2$ = from([6, 7, 8, 9, 10])
const obs3$ = range(11, 5)
const obs4$ = generate(
  15, x => x < 30, x => x + 2
)

//obs1$.subscribe(item => console.log(`of: ${item}`)) => 1, 2, 3, 4, 5
//obs2$.subscribe(item => console.log(`from: ${item}`)) => 6, 7, 8, 9, 10
//obs3$.subscribe(item => console.log(`range: ${item}`)) => 11, 12, 13, 14, 15
//obs4$.subscribe(item => console.log(`generate: ${item}`)) => 15, 17, 19, 21, 23, 25, 27, 29
  • of : 인자들 자체를 하나씩 값으로 평가한다.
  • from : Array를 인자로 받아서 요소를 하나씩 꺼내서 값으로 평가한다.
  • range : 시작하는 숫자와 갯수를 인자로 받아서 시작 숫자부터 인자로 들어온 갯수만큼 값으로 평가한다.
  • generate : js의 for문과 비슷하다. 첫 번째 인자로 시작값을 주고 두 번재 인자로 조건을주고 세 번째 인자로 실행할 명령 로직을 인자로 준다.

2. 시간에 의한 스트림

const { interval, timer } = rxjs

const obs1$ = interval(1000)
const obs2$ = timer(3000)

//obs1$.subscribe(item => console.log(`interval: ${item}`))
//obs2$.subscribe(item => console.log(`timer: ${item}`))
  • interval : 인자로 들어간 시간 마다 0부터 1씩 증가한 값들이 출력. 브라우저를 종료하거나 따로 종료 처리를 안하면 계속 실행 된다.
  • timer : 인자로 들어간 시간이 지나고 딱 한 번 값이 출력 (0부터 시작)

3. 이벤트에 의한 스트림

<input id="myInput" type="text"/>

const { fromEvent } = rxjs

const obs1$ = fromEvent(document, 'click')
const obs2$ = fromEvent(document.getElementById('myInput'), 'keypress')

obs1$.subscribe(item => console.log(item))
obs2$.subscribe(item => console.log(item))
  • fromEvent : 특정 이벤트로부터 값을 발행한다.
    • 첫 번째 인자로 요소를 넣어준다.
    • 두 번째 인자로 이벤트 종류를 넣어준다.

4. Ajax를 통한 스트림

const { ajax } = rxjs.ajax

const obs$ = ajax(`http://127.0.0.1:3000/people/1`)
obs$.subscribe(result => console.log(result.response))
  • ajax : 인자로 들어온 주소로 서버 요청을 보내서 받아온 결과를 값으로 평가한다.

5. 직접 만드는 스트림

const { Observable } = rxjs

const obs$ = new Observable(subscriber => {
  subscriber.next(1)
  subscriber.next(2)
  subscriber.next(3)

  // 값을 다 발행한 뒤에는 compelte를 실행하여 메모리 해제 
  subscriber.complete()
})

obs$.subscribe(item => console.log(item))
  • Observable : 커스텀으로 스트림을 발행할 때 사용한다.
    • next 함수 : 발행되는 값으로 subscribe에 주어진 함수를 실행한다.

Observable은 lazy(게으르다)

  • 미리 값을 발행하지 않는다.
  • 누군가 구독을 해야 발행을 시작한다.
  • 각 구독자에게 따로 발행한다.
const { of, interval, fromEvent } = rxjs

const obs1$ = of('a', 'b', 'c')
const obs2$ = interval(1000)
const obs3$ = fromEvent(document, 'click')

setTimeout(_ => {
  console.log('of 구독 시작')
  obs1$.subscribe(item => console.log(item))
}, 5000)
setTimeout(_ => {
  console.log('interval 구독 시작')
  obs2$.subscribe(item => console.log(item))
}, 10000)
setTimeout(_ => {
  console.log('fromEvent 구독 시작')
  obs3$.subscribe(_ => console.log('click!'))
}, 15000)
setTimeout(_ => {
  console.log('interval 구독 시작 2')
  obs2$.subscribe(item => console.log(item))
}, 20000)

 

Observer - 구독자 만들기

const { from } = rxjs
const observable$ = from([1, 2, 3, 4, 5])
  • Observer는 발행된 스트림을 가지고 어떤 동작을하는 주체이다.

구독자 생성

const observer = {
  next: console.log,
  error: err => console.error('발행중 오류', err),
  complete: () => console.log('발행물 완결'),
}

observable$.subscribe(observer)
  • js 객체 형태로 되어있다.
  • next, error, complete로 구성되어 있다.
  • next : 스트림에서 들어온 값들 하나하나에 적용할 함수
  • error : 에러가 났을 떄의 로직을 입력
  • complete : 완료 됐을 때의 로직을 입력 (스트림의 값이 모두 발행되어서 스트림이 종료되면 실행)
  • error, complete는 생략가능하다.

부분 지정

const observer_1 = {
  next: console.log,
  error: err => console.error('발행중 오류', err),
}

const observer_2 = {
  next: console.log
}
observable$.subscribe(
  console.log,
  err => console.error('발행중 오류', err),
  _ => console.log('발행물 완결')
)
  • observer 객체를 따로 만들지 않고 subscribe 함수에 3개의 인자로 넣어도 동일하게 작동한다.(단 순서는 지켜서 넣어줘야 한다,)

Error와 Complete 살펴보기

Error

const { Observable } = rxjs

const obs$ = new Observable(subscriber => {
  subscriber.next(1)
  subscriber.next(2)
  subscriber.next(3)
  (null)[0]
  subscriber.next(4)
})

obs$.subscribe(
  console.log,
  err => console.error('발행중 오류', err),
  _ => console.log('발행물 완결')
)
  • 값을 발행하는 중에 오류가 발생하면 발행을 중단한다.

Complete

const { Observable } = rxjs

const obs$ = new Observable(subscriber => {
  subscriber.next(1)
  subscriber.next(2)
  subscriber.next(3)
  subscriber.complete()
  subscriber.next(4)
})

obs$.subscribe(
  console.log,
  err => console.error('발행중 오류', err),
  _ => console.log('발행물 완결')
)
  • subscriber.complete() 다음에는 실행되지 않는다.

구독 해제하기

  • 구독을 변수/상수로 지정한 뒤 unsubscribe() 사용
const { interval } = rxjs

const obs$ = interval(1000)
const subscription = obs$.subscribe(console.log)

setTimeout(_ => subscription.unsubscribe(), 5500)

Creation operators:

  • Observable을 생성하는 operators
    • of, from, range, fronEvent, timeout, interval...
  • rxjs에서 로드

Pipable operators:

  • Observable의 데이터를 순수함수로 가공
    • (현존하는 데이터를 수정하지 않음 = 부수 효과를 없앰)
  • rxjs.operators에서 로드
  • pipe 함수에 하나 이상 넣어 연결
const { range } = rxjs

const { filter } = rxjs.operators
const observable$ = range(1, 10)

const observer = {
  next: x => console.log(x + ' 발행'),
  error: err => console.error('발행중 오류', err),
  complete: () => console.log('발행물 완결'),
}

observable$.pipe(
  filter(x => x % 2 === 0)
).subscribe(observer)

파이프에는 하나 이상의 operator들이 쉽표로 구분되어 들어갈 수 있습니다.

// map(x => x * x) 추가
observable$.pipe(
  filter(x => x % 2 === 0),
	map(x => x * x),
	map(x => x + 10)
).subscribe(observer)

시간, 이벤트에 의한 발행물에 적용해보기

const { interval } = rxjs

const { tap, filter, map } = rxjs.operators
const observable$ = interval(1000) 

const observer = {
  next: x => console.log(x + ' 발행'),
  error: err => console.error('발행중 오류', err),
  complete: () => console.log('발행물 완결'),
}

observable$.pipe(
  tap(console.log),
  filter(x => x % 2 === 0),
  map(x => x * x)
).subscribe(observer)
const { fromEvent } = rxjs

const { map } = rxjs.operators
const observable$ = fromEvent(document, 'click') 

const observer = {
  next: x => console.log(x + ' 발행'),
  error: err => console.error('발행중 오류', err),
  complete: () => console.log('발행물 완결'),
}

observable$.pipe(
  map(e => e.x + ' ' + e.y),
).subscribe(observer)

Subject

  • Observable의 특성때문에 Subject가 필요할 때가 있다.
  • Observable처럼 subscribe사용해서 구독하여 사용한다.
const { Subject } = rxjs
const subject = new Subject()

subject.subscribe(console.log)

subject.next(1)
subject.next(3)
subject.next(5)

Observable과의 차이

Observable

  • 누군가 구독을 해야 발행을 시작
  • 각 구독자에게 따로 발행
  • unicast
  • 🧊 cold 발행

Subject

  • 개발자가 원하는 때에 발행
  • 모든 구독자에게 똑같이 발행
  • multicast
  • 🔥 hot 발행
const { Subject } = rxjs
const subject = new Subject()

setTimeout(_ => {
    let x = 0
    setInterval (_ => {
        subject.next(x++)
    }, 2000)
}, 5000)

subject.subscribe(x => console.log('바로구독: ' + x))
setTimeout(_ => {
    subject.subscribe(x => console.log('3초 후 구독: ' + x))
}, 3000)
setTimeout(_ => {
    subject.subscribe(x => console.log('10초 후 구독: ' + x))
}, 10000)
setTimeout(_ => {
    subject.subscribe(x => console.log('14초 후 구독: ' + x))
}, 14000)
  • 구독 시점이 다른 모든 구독자에게 같은 값을 발행한다.

일반 Observable에 결합하기

  • Subject를 observable에 subscriber로서 넘겨줄 수 있습니다.
const { interval, Subject } = rxjs

const subject = new Subject()
const obs$ = interval(1000)

obs$.subscribe(subject)

subject.subscribe(x => console.log('바로구독: ' + x))
setTimeout(_ => {
    subject.subscribe(x => console.log('3초 후 구독: ' + x))
}, 3000)
setTimeout(_ => {
    subject.subscribe(x => console.log('5초 후 구독: ' + x))
}, 5000)
setTimeout(_ => {
    subject.subscribe(x => console.log('10초 후 구독: ' + x))
}, 10000)
  • 다른 시기에 구독을 시작한 observer들이 같은 값을 발행받도록 할 때 **Subject**를 사용할 수 있습니다.

추가 기능이 있는 Subject

BehaviorSubject

  • 마지막 값을 저장 후 추가 구독자에게 발행
  • 초기값을 인자로 정할 수 있다.
const { BehaviorSubject } = rxjs
const subject = new BehaviorSubject(0) // 초기값이 있음
 
subject.subscribe((x) => console.log('A: ' + x))
 
subject.next(1)
subject.next(2)
subject.next(3)
 
subject.subscribe((x) => console.log('B: ' + x))
 
subject.next(4)
subject.next(5)

const lastValue = subject.getValue() // 서브젝트가 마지막으로 발행한 값을 얻을 수 있다.
  • B는 A구독의 마지막 값인 3부터 시작한다. (B 콘솔은 3, 4, 5가 차례로 출력된다,)

ReplaySubject

  • 마지막 N개 값을 저장 후 추가 구독자에게 발행
  • 마지막 몇개를 저장할 것인지 인자로 넣어준다.
const { ReplaySubject } = rxjs
const subject = new ReplaySubject(3) // 마지막 3개 값 저장
 
subject.subscribe((x) => console.log('A: ' + x))
 
subject.next(1)
subject.next(2)
subject.next(3)
subject.next(4)
subject.next(5)
 
subject.subscribe((x) => console.log('B: ' + x))
 
subject.next(6)
subject.next(7)
  • B는 A구독의 마지막 값 3개를 먼저 출력하고 다음 값들을 출력한다 (B 콘솔은 3, 4, 5, 6, 7가 차례로 출력된다,)

AsyncSubject

  • Complete 후의 마지막 값만 발행
const { AsyncSubject } = rxjs
const subject = new AsyncSubject()
 
subject.subscribe((x) => console.log('A: ' + x))
 
subject.next(1)
subject.next(2)
subject.next(3)
 
subject.subscribe((x) => console.log('B: ' + x))
 
subject.next(4)
subject.next(5)

subject.subscribe((x) => console.log('C: ' + x))

subject.next(6)
subject.next(7)
subject.complete()

// "A : 7"
// "B : 7"
// "C : 7"
  • 구독을 언제 시작했든 모든 구독자에게 complete가 됐을때의 마지막 값만 발행한다.
반응형

'프론트엔드 > JavaScript' 카테고리의 다른 글

RxJS - 연산자 4  (0) 2022.09.12
RxJS - 연산자 3  (0) 2022.09.12
RxJS - 연산자 2  (0) 2022.09.11
RxJS - 연산자 1  (0) 2022.09.09