RxJSはかなりボリュームがあるので何回かに分けて取り扱います。まずは、RxJSの概要、Observable
とObserver
について説明します。
RxJSとは?
RxJSは、ReactiveXプロジェクトの一部として開発されたJavaScript用のリアクティブプログラミングライブラリです。このライブラリを用いることで、非同期データストリームをシンプルかつ効果的に扱うことができます。Webアプリケーションでのユーザーインタラクションのハンドリングや外部データソースからのデータ更新の監視はもちろん、Node.jsベースのバックエンドでの開発においても有効です。
特に、Node.jsにおける利用ケースとしては、WebSocketメッセージのストリーム処理、外部システムやデータベースからのイベントドリブンアーキテクチャ、ストリームベースのファイル処理、外部APIのポーリングやマイクロサービス間の非同期通信など、非同期やデータストリーム処理が中心となるシナリオでの使用が挙げられます。
リアクティブプログラミングの概要
リアクティブプログラミングは、データストリームとそのデータストリームに対する変化の伝播に基づくプログラミングパラダイムの1つです。これは、変数の変更やイベント、ユーザーの操作など、さまざまな情報源からのデータを非同期的に扱うのに適しています。
例えば、ユーザーインターフェイスのイベント、APIからのレスポンス、または定期的に発生するイベントなど、様々なものがデータストリームとして表現されます。そして、これらのデータストリームに変更や操作を行うための処理を結合・合成することで、高度な動作やデータの流れを簡単に表現することができます。
ObservableとObserver
RxJSの中心となる概念は、Observable
とObserver
です。
Observable
データストリームの源となるオブジェクトです。非同期的にゼロ以上の値を生成し、それらの値をObserver
に通知します。例として、ユーザーのクリックイベントやHTTPリクエストのレスポンスなどが考えられます。
Observer
Observable
からのデータの通知を受け取るオブジェクトです。3つの主要なメソッドがあります。
next
: 新しいデータがObservable
から送られてきたときに呼び出される。error
: エラーが発生したときに呼び出される。complete
: データの送信が完了したときに呼び出される。
Observable
を購読することで、データストリームを監視し、新しいデータやエラー、完了の通知を受け取ることができます。この購読の過程でObserver
の各メソッドが関連付けられ、データの流れに応じて適切な処理が行われます。
RxJSの主要な概念
Observableの作成
Observable
は、非同期なデータや複数のデータの流れを表現するオブジェクトです。RxJSでは、様々な方法でObservable
を生成することができます。
of
静的な値からObservable
を作成します。
import { of } from 'rxjs';
const observable = of(1, 2, 3);
from
配列やPromise
、繰り返し可能なオブジェクトからObservable
を作成します。
import { from } from 'rxjs';
const observable = from([1, 2, 3]);
create
カスタムのObservable
を作成します。特定のロジックや外部のデータソースと結合したい場合に使用します。
import { Observable } from 'rxjs';
const observable = new Observable(observer => {
observer.next('Hello');
observer.next('World');
observer.complete();
});
pipe関数
pipe
関数 は、与えられたオペレータを左から右の順番で適用して、新しい Observable
を返すObservable
の関数です。このメカニズムにより、非同期データの変換、フィルタリング、エラーハンドリングなどの一連の操作を組み合わせることができます。
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
const data$ = of(1, 2, 3, 4, 5);
const processedData$ = data$.pipe(
filter(num => num % 2 === 0), // 偶数だけをフィルタリング
map(num => num * 10) // 値を10倍に変換
);
processedData$.subscribe(console.log); // 20, 40 が出力される
Operatorsの概要
Operators
は、Observable
のデータストリームに対して操作を行い、新しいObservable
を生成する関数です。これにより、非同期データのフィルタリング、変換、結合などの様々な操作を行うことができます。
map
オペレーター
map
オペレーターは、Observable
が発行する各アイテムに関数を適用します。入力として関数を受け取り、この関数をストリームの各アイテムに適用します。入力としての関数は、アイテムを別の形に変換するために使用されます。
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
const nums = of(1, 2, 3);
const squareValues = nums.pipe(map(val => val * val));
filter
オペレーター
filter
オペレーターは、指定された条件に基づいて Observable
のアイテムをフィルタリングします。入力として関数を受け取り、この関数は各アイテムに対して真偽値を返します。返された真偽値が true
の場合、そのアイテムは出力されるストリームに含まれます。false
の場合、アイテムは出力されるストリームから除外されます。
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';
const nums = from([1, 2, 3, 4, 5]);
const evenNums = nums.pipe(filter(val => val % 2 === 0));
tapオペレーター
tap
オペレータは、Observable
の各要素に対して副作用を持たせるためのものです。つまり、データストリーム自体には影響を及ぼさず、Observable
の要素に何らかの操作を適用するために使用します。これはデバッグやログ出力などの目的でよく使用されます。
import { of } from 'rxjs';
import { tap } from 'rxjs/operators';
of(1, 2, 3).pipe(
tap(val => console.log(`Before map: ${val}`)),
tap(val => val * 2),
tap(val => console.log(`After map: ${val}`)) // 値は変わらない
).subscribe();
これ以外にも、数多くのOperators
が提供されており、様々なデータ操作を容易に行えます。
まとめ
ObservableとObserverはリアクティブプログラミングの考え方に即した比較的わかりやすいクラスです。また、今回扱ったオペレータはごく一部ですが、とてもよく使用するものを挙げました。
次回はSubjectを扱います。