RxJSとは Part 1

RxJSはかなりボリュームがあるので何回かに分けて取り扱います。まずは、RxJSの概要、ObservableObserverについて説明します。

RxJSとは?

RxJSは、ReactiveXプロジェクトの一部として開発されたJavaScript用のリアクティブプログラミングライブラリです。このライブラリを用いることで、非同期データストリームをシンプルかつ効果的に扱うことができます。Webアプリケーションでのユーザーインタラクションのハンドリングや外部データソースからのデータ更新の監視はもちろん、Node.jsベースのバックエンドでの開発においても有効です。

特に、Node.jsにおける利用ケースとしては、WebSocketメッセージのストリーム処理、外部システムやデータベースからのイベントドリブンアーキテクチャ、ストリームベースのファイル処理、外部APIのポーリングやマイクロサービス間の非同期通信など、非同期やデータストリーム処理が中心となるシナリオでの使用が挙げられます。

リアクティブプログラミングの概要

リアクティブプログラミングは、データストリームとそのデータストリームに対する変化の伝播に基づくプログラミングパラダイムの1つです。これは、変数の変更やイベント、ユーザーの操作など、さまざまな情報源からのデータを非同期的に扱うのに適しています。

例えば、ユーザーインターフェイスのイベント、APIからのレスポンス、または定期的に発生するイベントなど、様々なものがデータストリームとして表現されます。そして、これらのデータストリームに変更や操作を行うための処理を結合・合成することで、高度な動作やデータの流れを簡単に表現することができます。

ObservableとObserver

RxJSの中心となる概念は、ObservableObserverです。

Observable

データストリームの源となるオブジェクトです。非同期的にゼロ以上の値を生成し、それらの値をObserverに通知します。例として、ユーザーのクリックイベントやHTTPリクエストのレスポンスなどが考えられます。

Observer

Observableからのデータの通知を受け取るオブジェクトです。3つの主要なメソッドがあります。

  • next: 新しいデータがObservableから送られてきたときに呼び出される。
  • error: エラーが発生したときに呼び出される。
  • complete: データの送信が完了したときに呼び出される。

Observableを購読することで、データストリームを監視し、新しいデータやエラー、完了の通知を受け取ることができます。この購読の過程でObserverの各メソッドが関連付けられ、データの流れに応じて適切な処理が行われます。

RxJSの主要な概念

Observableの作成

Observableは、非同期なデータや複数のデータの流れを表現するオブジェクトです。RxJSでは、様々な方法でObservableを生成することができます。

of

静的な値からObservableを作成します。

import { of } from 'rxjs';
const observable = of(1, 2, 3);

from

配列やPromise、繰り返し可能なオブジェクトからObservableを作成します。

import { from } from 'rxjs';
const observable = from([1, 2, 3]);

create

カスタムのObservableを作成します。特定のロジックや外部のデータソースと結合したい場合に使用します。

import { Observable } from 'rxjs';
const observable = new Observable(observer => {
  observer.next('Hello');
  observer.next('World');
  observer.complete();
});

pipe関数

pipe関数 は、与えられたオペレータを左から右の順番で適用して、新しい Observable を返すObservableの関数です。このメカニズムにより、非同期データの変換、フィルタリング、エラーハンドリングなどの一連の操作を組み合わせることができます。

import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

const data$ = of(1, 2, 3, 4, 5);

const processedData$ = data$.pipe(
  filter(num => num % 2 === 0),  // 偶数だけをフィルタリング
  map(num => num * 10)           // 値を10倍に変換
);

processedData$.subscribe(console.log);  // 20, 40 が出力される

Operatorsの概要

Operatorsは、Observableのデータストリームに対して操作を行い、新しいObservableを生成する関数です。これにより、非同期データのフィルタリング、変換、結合などの様々な操作を行うことができます。

mapオペレーター

map オペレーターは、Observable が発行する各アイテムに関数を適用します。入力として関数を受け取り、この関数をストリームの各アイテムに適用します。入力としての関数は、アイテムを別の形に変換するために使用されます。

import { of } from 'rxjs';
import { map } from 'rxjs/operators';
const nums = of(1, 2, 3);
const squareValues = nums.pipe(map(val => val * val));

filterオペレーター

filter オペレーターは、指定された条件に基づいて Observable のアイテムをフィルタリングします。入力として関数を受け取り、この関数は各アイテムに対して真偽値を返します。返された真偽値が true の場合、そのアイテムは出力されるストリームに含まれます。false の場合、アイテムは出力されるストリームから除外されます。

import { from } from 'rxjs';
import { filter } from 'rxjs/operators';
const nums = from([1, 2, 3, 4, 5]);
const evenNums = nums.pipe(filter(val => val % 2 === 0));

tapオペレーター

tap オペレータは、Observable の各要素に対して副作用を持たせるためのものです。つまり、データストリーム自体には影響を及ぼさず、Observable の要素に何らかの操作を適用するために使用します。これはデバッグやログ出力などの目的でよく使用されます。

import { of } from 'rxjs';
import { tap } from 'rxjs/operators';

of(1, 2, 3).pipe(
  tap(val => console.log(`Before map: ${val}`)),
  tap(val => val * 2),
  tap(val => console.log(`After map: ${val}`)) // 値は変わらない
).subscribe();

これ以外にも、数多くのOperatorsが提供されており、様々なデータ操作を容易に行えます。

まとめ

ObservableとObserverはリアクティブプログラミングの考え方に即した比較的わかりやすいクラスです。また、今回扱ったオペレータはごく一部ですが、とてもよく使用するものを挙げました。

次回はSubjectを扱います。

リアクティブプログラミング

Angularのサービスの説明をする前にリアクティブプログラミングについて説明します。

リアクティブプログラミングとは

リアクティブプログラミングは、データの流れ(ストリーム)に着目し、データを受け取るたびに処理を処理するプログラミングパラダイムです。

リアクティブプログラミングは、用語が独特で概念の通常のプログラミングとは少し異なっているため、理解しづらい場合があります。また、実装系によっても違いがあります。今回は一般的なリアクティブプログラミングについて説明し、次回RxJSについて説明します。

リアクティブプログラミングの用語

リアクティブプログラミングには多くの用語があり、異なるライブラリやフレームワークによって少し異なる言い回しがされることもあります。しかし、以下はリアクティブプログラミングのコンセプトを理解する上で基本となる用語のリストです。これらの用語は、多くのリアクティブプログラミングの文脈で一般的に使用されています。

発行者(Publisher)

生産者(Producer)やオブザーバブル(Observable)とも呼ばれます。データやイベントを生み出し、それをストリームとして発行するエンティティです。発行者は、一つまたは複数の購読者にデータを非同期的に供給する役割を果たします。

購読者(Subscriber)

消費者(Consumer)やオブザーバー(Observer)とも呼ばれます。発行者からのデータやイベントを受け取り、それを処理するエンティティです。購読者は、発行者からデータを受け取ることを購読する(subscribe)と言われ、この購読関係を通じてデータが非同期的に供給されます。

操作(Operators)

購読者が購読しているデータストリームからデータを受け取る際に適用する操作です。フィルタ(filter)やマップ(map)など様々な操作があります。

バックプレッシャー(Back pressure)

データの発行速度と購読速度の不均衡を管理する概念。発行側が高速で、購読側がそれに追いつけない場合に対処するための機構を提供します。

ColdとHot

  • Cold:購読が開始されるまでデータを発行しない
  • Hot:購読の有無に関係なくデータを発行する

リアクティブプログラミングの概念

リアクティブプログラミングの概念について説明します。

発行者はデータを発行し、データストリームを形成します。データの発行は一般に非同期で行われますが、同期で行われることもあります。

購読者は発行者を購読し、データを待ち受けます。データを受け取るたびに処理を実行します。ただし、すべてのデータを処理するわけではなく、Operatorsによって受け取るかどうかをフィルタすることもできます。

購読者が処理できる量には限界があります。発行者がデータを発行するペースよりも購読者が処理できるペースの方が遅い場合、発行済み未処理のデータをどうするかという問題がでてきます。購読者の処理能力を発行者の発行能力が上回る状態に対する対処を行う機構をバックプレッシャーと言います。具体的な対処方法としては、

  • 購読者が購読可能なデータ量を発行者に伝え、発行者はそれに合わせてデータを発行する
  • 発行者の発行しているデータをバッファリングする
  • 受け取れないデータは捨てる

などがあります。どの戦略が適しているかはデータの性格(失われると困るのか、多少失われても問題ないのか)やデータ発行の処理(購読者の要求に応じて発行できるのか、とりあえず生成し続けるしかないのか)によって変わります。

リアクティブプログラミングのユースケース

リアクティブプログラミングを適用できるユースケースをいくつか挙げます。

  1. リアルタイムデータのストリーミング:フィンテック、株価のストリーミングや、ライブスポーツのスコア更新など
  2. リアルタイムのダッシュボード:ユーザーアクティビティ、システムメトリクス、eコマースの販売統計など、リアルタイムで更新する必要があるダッシュボード
  3. チャットアプリケーション:ユーザー間でリアルタイムにメッセージを交換するアプリ
  4. オンラインゲーム:特にマルチプレイヤーオンラインゲームで、プレイヤー間のリアルタイムのインタラクションが必要
  5. イベント駆動のマイクロサービスアーキテクチャ:サービスが非同期的にデータを交換する
  6. 非同期なバッチ処理:大量のデータを効率的に処理する必要がある場面で、特にデータの供給速度が不均一な場合
  7. リアルタイムの通知システム:ユーザーにリアルタイムで通知やアラートを提供するシステム
  8. センサーデータの処理:IoTデバイスからのリアルタイムデータを効率的に収集、分析、反応する
  9. 動的なネットワーク応答:ユーザーのリクエストに応じて、非同期的にコンテンツを動的に生成・配信するウェブサービス
  10. リソース制限のある環境:例えばモバイルデバイスや組み込みシステムなど、リソースが制限されている環境での非同期処理やリアルタイム処理

最近ではWebアプリケーションで使用することが多いため、サーバーが発行者、クライアントが購読者という組み合わせや画面のコンポーネント間での使用をイメージしやすいですが、サービス間でのやりとりや非同期バッチなどに使用することもあります。

まとめ

サービスを作成しようとするとRxJSを使うことになるため、先にリアクティブプログラミングについて説明しました。次回はRxJSについて説明します。

モバイルバージョンを終了