RxJSとは Part 1

RxJSとは Part 1

RxJSはかなりボリュームがあるので何回かに分けて取り扱います。まずは、RxJSの概要、ObservableObserverについて説明します。

目次

RxJSとは?

RxJSは、ReactiveXプロジェクトの一部として開発されたJavaScript用のリアクティブプログラミングライブラリです。このライブラリを用いることで、非同期データストリームをシンプルかつ効果的に扱うことができます。Webアプリケーションでのユーザーインタラクションのハンドリングや外部データソースからのデータ更新の監視はもちろん、Node.jsベースのバックエンドでの開発においても有効です。

特に、Node.jsにおける利用ケースとしては、WebSocketメッセージのストリーム処理、外部システムやデータベースからのイベントドリブンアーキテクチャ、ストリームベースのファイル処理、外部APIのポーリングやマイクロサービス間の非同期通信など、非同期やデータストリーム処理が中心となるシナリオでの使用が挙げられます。

リアクティブプログラミングの概要

リアクティブプログラミングは、データストリームとそのデータストリームに対する変化の伝播に基づくプログラミングパラダイムの1つです。これは、変数の変更やイベント、ユーザーの操作など、さまざまな情報源からのデータを非同期的に扱うのに適しています。

例えば、ユーザーインターフェイスのイベント、APIからのレスポンス、または定期的に発生するイベントなど、様々なものがデータストリームとして表現されます。そして、これらのデータストリームに変更や操作を行うための処理を結合・合成することで、高度な動作やデータの流れを簡単に表現することができます。

ObservableとObserver

RxJSの中心となる概念は、ObservableObserverです。

Observable

データストリームの源となるオブジェクトです。非同期的にゼロ以上の値を生成し、それらの値をObserverに通知します。例として、ユーザーのクリックイベントやHTTPリクエストのレスポンスなどが考えられます。

Observer

Observableからのデータの通知を受け取るオブジェクトです。3つの主要なメソッドがあります。

  • next: 新しいデータがObservableから送られてきたときに呼び出される。
  • error: エラーが発生したときに呼び出される。
  • complete: データの送信が完了したときに呼び出される。

Observableを購読することで、データストリームを監視し、新しいデータやエラー、完了の通知を受け取ることができます。この購読の過程でObserverの各メソッドが関連付けられ、データの流れに応じて適切な処理が行われます。

RxJSの主要な概念

Observableの作成

Observableは、非同期なデータや複数のデータの流れを表現するオブジェクトです。RxJSでは、様々な方法でObservableを生成することができます。

of

静的な値からObservableを作成します。

import { of } from 'rxjs';
const observable = of(1, 2, 3);

from

配列やPromise、繰り返し可能なオブジェクトからObservableを作成します。

import { from } from 'rxjs';
const observable = from([1, 2, 3]);

create

カスタムのObservableを作成します。特定のロジックや外部のデータソースと結合したい場合に使用します。

import { Observable } from 'rxjs';
const observable = new Observable(observer => {
  observer.next('Hello');
  observer.next('World');
  observer.complete();
});

pipe関数

pipe関数 は、与えられたオペレータを左から右の順番で適用して、新しい Observable を返すObservableの関数です。このメカニズムにより、非同期データの変換、フィルタリング、エラーハンドリングなどの一連の操作を組み合わせることができます。

import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

const data$ = of(1, 2, 3, 4, 5);

const processedData$ = data$.pipe(
  filter(num => num % 2 === 0),  // 偶数だけをフィルタリング
  map(num => num * 10)           // 値を10倍に変換
);

processedData$.subscribe(console.log);  // 20, 40 が出力される

Operatorsの概要

Operatorsは、Observableのデータストリームに対して操作を行い、新しいObservableを生成する関数です。これにより、非同期データのフィルタリング、変換、結合などの様々な操作を行うことができます。

mapオペレーター

map オペレーターは、Observable が発行する各アイテムに関数を適用します。入力として関数を受け取り、この関数をストリームの各アイテムに適用します。入力としての関数は、アイテムを別の形に変換するために使用されます。

import { of } from 'rxjs';
import { map } from 'rxjs/operators';
const nums = of(1, 2, 3);
const squareValues = nums.pipe(map(val => val * val));

filterオペレーター

filter オペレーターは、指定された条件に基づいて Observable のアイテムをフィルタリングします。入力として関数を受け取り、この関数は各アイテムに対して真偽値を返します。返された真偽値が true の場合、そのアイテムは出力されるストリームに含まれます。false の場合、アイテムは出力されるストリームから除外されます。

import { from } from 'rxjs';
import { filter } from 'rxjs/operators';
const nums = from([1, 2, 3, 4, 5]);
const evenNums = nums.pipe(filter(val => val % 2 === 0));

tapオペレーター

tap オペレータは、Observable の各要素に対して副作用を持たせるためのものです。つまり、データストリーム自体には影響を及ぼさず、Observable の要素に何らかの操作を適用するために使用します。これはデバッグやログ出力などの目的でよく使用されます。

import { of } from 'rxjs';
import { tap } from 'rxjs/operators';

of(1, 2, 3).pipe(
  tap(val => console.log(`Before map: ${val}`)),
  tap(val => val * 2),
  tap(val => console.log(`After map: ${val}`)) // 値は変わらない
).subscribe();

これ以外にも、数多くのOperatorsが提供されており、様々なデータ操作を容易に行えます。

まとめ

ObservableとObserverはリアクティブプログラミングの考え方に即した比較的わかりやすいクラスです。また、今回扱ったオペレータはごく一部ですが、とてもよく使用するものを挙げました。

次回はSubjectを扱います。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!
目次