chooblarin’s blog

RxJSのforkJoinはPromise.all相当

October 10, 2018

  • #RxJS

以前(ずいぶんと前だけど)に,RxJavaについてのエントリで,「RxJavaに Promise.all()が欲しい」という話をしました.この時に紹介したRxJavaのconcatMapEagerは,当時はExperimentalでしたが現在はStableになっています.


今回はRxJavaではなくRxJSについてです.

先のエントリの中で,私が求めいたのは

  1. 処理が並行して実行される
  2. 順番が保存されている

という特徴を持ったオペレータでした.そして最近,RxJSのforkJoinを知りました.

RxJSのforkJoin

RxJSにforkJoinというAPIがあります.これはObservableのリストを受け取り,それらが最後にemitした値のリストのObservableをつくります.

例えば

forkJoin(
  of('君がッ', '泣くまで'),
  of('殴るのを', 'やめないッ!'),
).subscribe(console.log); // ["泣くまで", "やめないッ!"]

という具合です.forkJoin(...)Promise.all(...)のObservable版なのです.

次に,こんなモノを用意しました.

const rand = () => Math.floor(Math.random() * 3) * 1000;
const delayedEcho = (message) => of(message).pipe(delay(rand()));

ランダム時間後にメッセージをそのまま返してくれる,その名もdelayedEchoです.

const ids = ['1', '2', '3', '4', '5'];
const observables = ids.map(delayedEcho);
forkJoin(observables).subscribe(console.log);
// ["1", "2", "3", "4", "5"]

はい,完全に思い通りに動いてくれました.

xxxMapなオペレータ達

ここから先はおまけです.先ほどのdelayedEchoで遊んでみます.

mergeMap

from(ids)
  .pipe(
    mergeMap(delayedEcho),
    toArray()
  )
  .subscribe(console.log);

// ["3", "5", "1", "2", "4"]

mergeMapflatMapです (flatMapmergeMapのエイリアス).処理は並行に走りますが,購読される値は先着順です.

concatMap

from(ids)
  .pipe(
    concatMap(delayedEcho),
    toArray()
  )
  .subscribe(console.log);

// ["1", "2", "3", "4", "5"]

concatMapは,処理が直列です.現在の処理が完了するまで次の処理を開始しません.したがって,上記の例はmergeMapの平均およそ5倍の時間が掛かります.

switchMap

from(ids)
  .pipe(
    switchMap(delayedEcho),
    toArray()
  )
  .subscribe(console.log);

// ["5"]

switchMapは最新のObservableに関心を切り替えます.

ここで,delayedEchoに少し小細工をします.

const delayedEcho = (message) => of(message)
  .pipe(
    tap(console.log), // <= これを追加
    delay(rand())
  );

そして先ほどのswitchMapのコードをもう一度実行すると...

from(ids)
  .pipe(
    switchMap(delayedEcho),
    toArray()
  )
  .subscribe(console.log);

// "1"
// "2"
// "3"
// "4"
// "5"
// ["5"]

"1"から"4"は無視されたかと思いきや,全てのObservableの処理は実行を開始していたことがわかります.

exhaustMap

最後がexhaustMapです.このオペレータは重要です.

from(ids)
  .pipe(
    exhaustMap(delayedEcho),
    toArray()
  )
  .subscribe(console.log);

// "1"
// ["1"]

exhaustMapは,前の処理が完了していなかい場合には後続の処理を無視します.この挙動は,非同期処理が不整合を起こさないための様々なケースに使えます.

参考