オブザーバブル

コミュニティグループドラフトレポート,

このバージョン:
https://wicg.github.io/observable/
編集者:
Dominic Farolino (Google)
参加:
GitHub WICG/observable (新しい課題, オープン課題)
コミット履歴:
GitHub spec.bs コミット
テストスイート:
https://wpt.fyi/results/dom/observable/

概要

Observable APIは、非同期のイベントストリームを扱うための、構成可能で使いやすい方法を提供します。

この文書のステータス

本仕様書は Web Platform Incubator Community Group によって公開されました。 W3C標準でもなく、W3C標準化トラックにもありません。 W3Cコミュニティ貢献者ライセンス契約 (CLA) の下では、 限定的なオプトアウトやその他の条件が適用されることにご注意ください。 W3Cコミュニティおよびビジネスグループ について詳しくは、こちらをご覧ください。

1. はじめに

このセクションは規範的ではありません。

2. コアインフラストラクチャ

2.1. Subscriber インターフェース

[Exposed=*]
interface Subscriber {
  undefined next(any value);
  undefined error(any error);
  undefined complete();
  undefined addTeardown(VoidFunction teardown);

  // True after the Subscriber is created, up until either
  // complete()/error() are invoked, or the subscriber unsubscribes. Inside
  // complete()/error(), this attribute is true.
  readonly attribute boolean active;

  readonly attribute AbortSignal signal;
};

Subscriber順序付き集合内部オブザーバを持ち、初期状態では空です。

Subscriberティアダウンコールバックを持ち、これは リストであり、 VoidFunction のリストで、初期値は空です。

Subscriberサブスクリプションコントローラーを持ち、 AbortController です。

Subscriberactive ブール値を持ち、初期値は true です。

注: これは、Subscriberクローズされてから、 所有するコールバックを二度と呼び出さないようにするための管理用変数です。

active ゲッターの手順は、thisactive ブール値を返します。

signal ゲッターの手順は、thisサブスクリプションコントローラーsignal を返します。

next(value) メソッドの手順は以下の通りです:
  1. thisactive が false なら、return。

  2. this関連グローバルオブジェクトWindow オブジェクトであり、 その 関連付けられた Document完全にアクティブ でない場合、return。

  3. internal observers copythis内部オブザーバのコピーとする。

    注: 内部オブザーバリストのコピーを作成して反復処理することで、 もし1つの 内部オブザーバnext steps が サブスクリプションを引き起こした場合でも、イテレーション中にオブザーバリストが変化しないようにしています。

  4. internal observers copy の各 observer について:

    1. observernext stepsvalue で実行する。

      アサート例外は投げられなかった

      注: ここで例外が投げられないのは、内部オブザーバnext steps が スクリプト提供のコールバックのラッパーの場合、process observer 手順がこれらのコールバックを呼び出す際に例外をキャッチし、グローバルに報告するロジックでラップしているためです。

      next steps が仕様アルゴリズムの場合、 これらの手順はこのアサートを満たすため、例外を外部に投げません。

error(error) メソッドの手順は以下の通りです:
  1. thisactive が false なら、例外を報告し、errorthis関連グローバルオブジェクト で、return。

  2. this関連グローバルオブジェクトWindow オブジェクトであり、 その 関連付けられた Document完全にアクティブでない場合、return。

  3. クローズ this

  4. internal observers copythis内部オブザーバ のコピーとする。

  5. internal observers copy の各 observer について:

    1. observererror stepserror で実行する。

      アサート例外は投げられなかった

      注: これが正しい理由については、next() の説明を参照してください。

complete() メソッドの手順は以下の通りです:
  1. thisactive が false なら、return。

  2. this関連グローバルオブジェクトWindow オブジェクトであり、 その 関連付けられた Document完全にアクティブ でない場合、return。

  3. クローズ this

  4. internal observers copythis内部オブザーバ のコピーとする。

  5. internal observers copy の各 observer について:

    1. observercomplete steps を実行する。

      アサート例外は投げられなかった

      注: これが正しい理由については、next() の説明を参照してください。

addTeardown(teardown) メソッドの手順は以下の通りです:
  1. this関連グローバルオブジェクトWindow オブジェクトであり、 その 関連付けられた Document完全にアクティブ でない場合、return。

  2. thisactive が true なら、appendteardownthisティアダウンコールバックリストに追加する。

  3. それ以外の場合は、invoke teardown を «» と "report" で呼び出す。

サブスクリプションをクローズするために、 Subscriber subscriber と、オプションの any reason を受け取った場合、以下の手順を実行します:
  1. subscriberactive が false なら、return。

    これは再入呼び出しを防ぐためのガードです。これは「プロデューサー主導の」購読解除の場合に発生することがあります。以下の例を参照:

    const outerController = new AbortController();
    const observable = new Observable(subscriber => {
      subscriber.addTeardown(() => {
        // 2.) This teardown executes inside the "Close" algorithm, while it’s
        //     running. Aborting the downstream signal run its abort algorithms,
        //     one of which is the currently-running "Close" algorithm.
        outerController.abort();
      });
    
      // 1.) This immediately invokes the "Close" algorithm, which
      //     sets subscriber.active to false.
      subscriber.complete();
    });
    
    observable.subscribe({}, {signal: outerController.signal});
    
  2. subscriberactive ブール値を false に設定する。

  3. abort シグナルsubscriberサブスクリプションコントローラー に対して reason が与えられていればそれを使って実行する。

  4. subscriberティアダウンコールバック を 逆順で反復して各 teardown について:

    1. もし subscriber関連グローバルオブジェクトWindow オブジェクトであり、その 関連付けられた Document完全にアクティブ でない場合は、この手順を中止する。

      注: この手順は繰り返し実行されます。なぜなら各 teardown が上記の Document を非アクティブにする可能性があるためです。

    2. teardown を «» と "report" で呼び出す。

2.2. Observable インターフェース

// SubscribeCallbackは、Observableの「作成者」のコードが存在する場所です。
// subscribe()が呼び出されたときに呼び出され、新しい購読をセットアップします。
callback SubscribeCallback = undefined (Subscriber subscriber);
callback ObservableSubscriptionCallback = undefined (any value);

dictionary SubscriptionObserver {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;
};

callback ObservableInspectorAbortHandler = undefined (any value);

dictionary ObservableInspector {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;

  VoidFunction subscribe;
  ObservableInspectorAbortHandler abort;
};

typedef (ObservableSubscriptionCallback or SubscriptionObserver) ObserverUnion;
typedef (ObservableSubscriptionCallback or ObservableInspector) ObservableInspectorUnion;

dictionary SubscribeOptions {
  AbortSignal signal;
};

callback Predicate = boolean (any value, unsigned long long index);
callback Reducer = any (any accumulator, any currentValue, unsigned long long index);
callback Mapper = any (any value, unsigned long long index);
// Mapperとの違いは戻り値の型のみで、このコールバックはシーケンス内の各要素を訪問するためだけに使われ、変換はしません。
callback Visitor = undefined (any value, unsigned long long index);

// このコールバックは`any`を返し、`Observable`への変換セマンティクスを通じて`Observable`に変換される必要があります。
callback CatchCallback = any (any value);

[Exposed=*]
interface Observable {
  constructor(SubscribeCallback callback);
  undefined subscribe(optional ObserverUnion observer = {}, optional SubscribeOptions options = {});

  // valueが以下のいずれかの場合、ネイティブObservableを構築します:
  //   - Observable
  //   - AsyncIterable
  //   - Iterable
  //   - Promise
  static Observable from(any value);

  // Observableを返すオペレーター。仕様書の「Operators」セクション参照。
  //
  // takeUntil()はpromise, iterable, async iterable, 他のobservableを消費できます。
  Observable takeUntil(any value);
  Observable map(Mapper mapper);
  Observable filter(Predicate predicate);
  Observable take(unsigned long long amount);
  Observable drop(unsigned long long amount);
  Observable flatMap(Mapper mapper);
  Observable switchMap(Mapper mapper);
  Observable inspect(optional ObservableInspectorUnion inspectorUnion = {});
  Observable catch(CatchCallback callback);
  Observable finally(VoidFunction callback);

  // Promiseを返すオペレーター。
  Promise<sequence<any>> toArray(optional SubscribeOptions options = {});
  Promise<undefined> forEach(Visitor callback, optional SubscribeOptions options = {});
  Promise<boolean> every(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> first(optional SubscribeOptions options = {});
  Promise<any> last(optional SubscribeOptions options = {});
  Promise<any> find(Predicate predicate, optional SubscribeOptions options = {});
  Promise<boolean> some(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> reduce(Reducer reducer, optional any initialValue, optional SubscribeOptions options = {});
};

Observablesubscribe コールバックを持ち、 SubscribeCallback または Subscriber を受け取る一連の手順です。

Observable弱いsubscriberを持ち、 Subscriber への弱参照またはnullで、初期値はnullです。

注: これらの型の「ユニオン」は、 Observableが JavaScriptで作成される場合(必ずSubscribeCallbackで構築される)と、 ネイティブに構築されたObservableオブジェクト( subscribe コールバックが任意のネイティブ手順セットであり、JavaScriptコールバックではない場合) の両方をサポートするためです。when() の戻り値は後者の例です。

new Observable(callback) コンストラクターの手順は以下の通りです:
  1. thissubscribe コールバックcallback に設定する。

    注: このコールバックは、後でsubscribe() が呼ばれた時に呼び出されます。

subscribe(observer, options) メソッドの手順は:
  1. Observableを購読するthis に対して observeroptions を使う。

2.2.1. 補助概念

デフォルトのエラーアルゴリズムは、 any error を受け取り、以下の手順を実行します:
  1. 例外を報告error現在のrealmグローバルオブジェクト で。

注: このデフォルトを別途定義することで、 本仕様のすべてのネイティブな Observable購読(spec proseから購読、subscribe()メソッドを経由しない場合)で 冗長な手順定義が不要になります。

内部オブザーバは、 構造体であり、以下の 項目を持ちます:

nextステップ

anyの単一パラメータを取るアルゴリズム。 初期状態では何もしない手順です。

errorステップ

anyの単一パラメータを取るアルゴリズム。 初期状態ではデフォルトのエラーアルゴリズムです。

completeステップ

パラメータなしのアルゴリズム。初期状態では何もしない手順です。

内部オブザーバ構造体は、next, error, complete コールバック関数のミラー用です。 JavaScriptからObservablesubscribe()メソッドで購読した場合は、 これらのアルゴリズム「手順」はスクリプト提供のコールバックをラップしたものになります。

しかし、内部仕様プローズ(ユーザースクリプトではない)が Observableを購読するときは、 これらの「手順」は任意の仕様アルゴリズムとなり、 ObserverUnion でWeb IDL コールバック関数として提供されるものではありません。 例として§2.3.3 Promiseを返すオペレーター参照。

Observableに変換するために、 any value を受け取った場合、以下の手順を実行します:

注: このアルゴリズムはWeb IDLの from() メソッドから分離しています。仕様プローズからWeb IDLバインディングを経由せずに値を変換できるようにするためです。

  1. Type(value)が Objectでない場合は、TypeErrorTypeError で投げる。

    注: これはプリミティブ型がIterableに変換されること(例:String)を防ぎます。 WICG/observable#125の議論参照。

  2. Observableからの変換: valuespecific typeObservable の場合、value を返す。

  3. Async Iterableからの変換: asyncIteratorMethod? GetMethod(value, %Symbol.asyncIterator%) とする。

    注: GetMethodを使うのは、async iteratorプロトコル対応のみを調べたいからで、 実装されていない時にthrowしたくないためです。 GetIteratorは(a)プロトコル未実装、(b)実装済だがcallableでない・getterがthrowする、 両方でthrowします。GetMethodなら後者だけでthrowします。

  4. asyncIteratorMethod が undefined または null なら、Iterableからの変換へジャンプ。

  5. nextAlgorithm を、Subscriber subscriberIterator Record iteratorRecord を受け取る以下の手順とする:

    1. subscribersubscription controllersignalabortedならreturn。

    2. nextPromisePromiseまたはundefinedで初期値undefinedとする。

    3. nextCompletionIteratorNext(iteratorRecord)とする。

      注: IteratorNextを使うのは、IteratorStepValueが iteratorのnext()が即座に値を返すobjectを期待するためですが、 async iteratorの場合はnext()がPromise/thenableを返す想定なので、Promiseでラップして値取得に反応します。

    4. nextCompletionthrow completionの場合:

      1. Assert: iteratorRecordの[[Done]]はtrue。

      2. nextPromise拒否されたpromisenextRecordの[[Value]]を設定。

    5. それ以外でnextRecordnormal completionの場合は nextPromise解決されたpromisenextRecordの[[Value]]を設定。

      注: nextRecordの[[Value]]自体が すでにPromise でない場合に行います。

    6. Promiseがsettleしたら反応

      • nextPromiseが値iteratorResultでfulfillされたら:

        1. Type(iteratorResult) がObjectでない場合は、subscribererror() メソッドをTypeError で呼び出し、この手順を中止。

        2. doneIteratorComplete(iteratorResult)とする。

        3. donethrow completionなら subscribererror() メソッドをdoneの[[Value]]で呼び出し、この手順を中止。

        4. doneの[[Value]]がtrueなら、subscribercomplete() を呼び出し、この手順を中止。

        5. valueIteratorValue(iteratorResult)とする。

        6. valuethrow completionなら subscribererror() メソッドをvalueの[[Value]]で呼び出し、この手順を中止。

        7. subscribernext()valueの[[Value]]で呼び出す。

        8. nextAlgorithmsubscriberiteratorRecordで再実行。

      • nextPromiseが理由rでrejectされたら、subscribererror()rで呼び出す。

  6. 新しい Observable を返す。subscribe コールバックSubscriber subscriberを受け取り、以下の処理をするアルゴリズムとする:

    1. subscribersubscription controllersignalabortedならreturn。

    2. iteratorRecordCompletionGetIterator(value, async)とする。

      注: %Symbol.asyncIterator% のgetterが再度呼ばれる場合がありますが、これは極端なケースでありテスト期待値に合わせます。 詳細はissue#127の議論参照。 プロトコル自体を呼び出してIterator Recordを取得します。

    3. iteratorRecordCompletionthrow completionなら subscribererror()iteratorRecordCompletionの[[Value]]で呼び出し、手順中止。

      注: この場合だけ、購読に同期してerror() を呼びます。他ケースではPromiseでrejectしてmicrotaskで非同期に報告します。 この同期エラー伝播は言語構造―たとえばfor-await-ofループと同様の挙動です。

    4. iteratorRecord! iteratorRecordCompletionとする。

    5. Assert: iteratorRecordIterator Recordである。

    6. subscribersubscription controllersignalabortedならreturn。

    7. 以下のabortアルゴリズムを追加subscribersubscription controllersignalに追加:

      1. AsyncIteratorClose(iteratorRecord, NormalCompletion(subscribersubscription controllerabort reason))

    8. nextAlgorithmsubscriberiteratorRecordで実行。

  7. Iterableからの変換: iteratorMethod? GetMethod(value, %Symbol.iterator%) とする。

  8. iteratorMethodがundefinedなら、Promiseからの変換へジャンプ。

    そうでなければ、新しい Observable を返す。subscribe コールバックSubscriber subscriberを受け取り、以下の手順を行うアルゴリズムとする:

    1. subscribersubscription controllersignalabortedならreturn。

    2. iteratorRecordCompletionGetIterator(value, sync)とする。

    3. iteratorRecordCompletionthrow completionなら subscribererror()iteratorRecordCompletionの[[Value]]で呼び出し、手順中止。

    4. iteratorRecord! iteratorRecordCompletionとする。

    5. subscribersubscription controllersignalabortedならreturn。

    6. 以下のabortアルゴリズムを追加subscribersubscription controllersignalに追加:

      1. IteratorClose(iteratorRecord, NormalCompletion(UNUSED))

    7. true である間:

      1. nextIteratorStepValue(iteratorRecord) とする。

      2. もし nextthrow completion なら、 subscribererror() メソッドを next の [[Value]] で実行し、break する。

      3. next!next に設定する。

      4. もし next が done なら:

        1. アサート: iteratorRecord の [[Done]] は true である。

        2. subscribercomplete() を実行する。

        3. Return。

      5. subscribernext()next で実行する。

      6. もし subscribersubscription controllersignalaborted なら break する。

  9. Promiseから: IsPromise(value) が true なら、

    1. 新しい Observable を返す。その subscribe callback は、Subscriber subscriber を受け取り、以下を行うアルゴリズムである:

      1. value に反応する

        1. もし valuev で fulfilled された場合:

          1. subscribernext() メソッドを v で実行する。

          2. subscribercomplete() メソッドを実行する。

        2. もし valuer で rejected された場合は、 subscribererror() メソッドを r で実行する。

  10. TypeErrorthrow する。

テスト
Observable に subscribe する とき、ObserverUnion または internal observer observer と、SubscribeOptions options を受け取り、以下の手順を実行する:

注: このアルゴリズムは Web IDL の subscribe() メソッドから分離されており、 仕様のプローズが subscribe を Web IDL バインディングを通さずに呼び出せるようにしている。 JavaScript によってプロパティが変更されうるオブジェクトでは、「内部」プローズは Web IDL バインディングを経由してはいけない。 § 2.3.3 Promiseを返すオペレーター でこの使用例を参照。

  1. this関連グローバルオブジェクトWindow オブジェクトで、 その 関連付けられた Document完全にアクティブ でない場合は return。

  2. internal observer を新しい internal observer とする。

  3. observer を以下のように処理する:

    1. もし observerObservableSubscriptionCallback なら
      internal observernext steps を、any value を受け取る以下の手順に設定:
      1. observer を «value» と "report" で呼び出す。

      もし observerSubscriptionObserver なら
      1. もし observernext存在すれば、 internal observernext steps を、any value を受け取る以下の手順に設定:

        1. observer の next を «value» と "report" で呼び出す。

      2. もし observererror存在すれば、 internal observererror steps を、any error を受け取る以下の手順に設定:

        1. observer の error を «error» と "report" で呼び出す。

      3. もし observercomplete存在すれば、 internal observercomplete steps を以下の手順に設定:

        1. observer の complete を «» と "report" で呼び出す。

      もし observerinternal observer なら
      internal observerobserver に設定する。
  4. アサート: internal observererror stepsデフォルトの error アルゴリズム か、 error コールバック関数を呼び出すアルゴリズムである。

  5. thisweak subscriber が null でなく、かつ thisweak subscriberactive が true なら:

    1. subscriberthisweak subscriber とする。

    2. internal observersubscriberinternal observers に追加する。

    3. もし optionssignal存在すれば:

      1. もし optionssignalaborted なら、 internal observersubscriberinternal observers から削除する。

      2. それ以外の場合は、以下の abort アルゴリズムoptionssignal に追加する:

        1. もし subscriberactive が false なら、この手順を中止する。

        2. internal observersubscriberinternal observers から削除する。

        3. もし subscriberinternal observersなら、 subscriber を close するoptionssignalabort reason を使う。

    4. Return。

  6. subscriber新しい Subscriber とする。

  7. internal observersubscriberinternal observers に追加する。

  8. thisweak subscribersubscriber に設定する。

  9. もし optionssignal存在すれば:

    1. もし optionssignalaborted なら、 subscriber を close する optionssignalabort reason を使う。

    2. それ以外の場合は 以下の abort アルゴリズムoptionssignal に追加する:

      1. もし subscriberactive が false なら、この手順を中止する。

      2. internal observersubscriberinternal observers から削除する。

      3. もし subscriberinternal observersなら、 subscriber を close するoptionssignalabort reason を使う。

  10. もし thissubscribe callbackSubscribeCallback なら、 subscriber を «subscriber» と "rethrow" で呼び出す。

    もし 例外 E が投げられた場合は、 subscribererror()E で呼び出す。

  11. それ以外の場合は thissubscribe callback の手順を subscriber で実行する。

テスト

2.3. オペレーター

現時点では https://github.com/wicg/observable#operators を参照してください。

2.3.1. from()

from(value) メソッドの手順は以下の通りです:
  1. valueObservable変換した結果を返す。例外があれば再スローする。

2.3.2. Observableを返すオペレーター

takeUntil(value) メソッドの手順は以下の通りです:
  1. sourceObservablethis とする。

  2. notifiervalueObservable変換した結果とする。

  3. observable新しい Observable とし、その subscribe callbackSubscriber subscriber を受け取るアルゴリズムである:

    このメソッドは2つの Observablesubscribeすることを伴います。 (1) notifier、(2) sourceObservable。次の状況で両方の購読を解除します:
    1. notifier が値("next" または "error")の発行を開始した場合、この場合は notifier から購読解除します。 必要な情報を得たので、これ以上値を発行し続ける必要がありません。同時に sourceObservable からも購読解除します。 なぜなら、observable への購読を手動で終了するため、sourceObservable の値を observable に送る必要がなくなるためです。

    2. sourceObservableerror() または complete() を自ら実行した場合、この場合は notifier から購読解除します。 なぜなら observablesourceObservable の値をミラーする必要がなくなったためです。 sourceObservable の購読解除は不要です、なぜなら自ら完了したためです。

    1. notifierObserver を新しい internal observer とし、次のように初期化する:

      next steps

      subscribercomplete() メソッドを実行する。

      注: これにより sourceObservable の購読も解除されます。 これは sourceObservable が "外側" subscribersubscription controllersignal で購読されているためです。 この signal は上記または下記で subscribercomplete() が呼ばれると abort されます。

      error steps

      subscribercomplete() メソッドを実行する。

      注: notifier Observable が自ら complete した場合は subscriber を complete する必要はありません。 この場合 observablesourceObservable を継続してミラーし続けます。

    2. options を新しい SubscribeOptions とし、signalsubscribersubscription controllersignal とする。

    3. notifiernotifierObserveroptionssubscribeする。

    4. subscriberactive が false なら、return。

      注: notifier が同期的に値を発行した場合、sourceObservablesubscribe callback は一度も呼ばれません。 ただし notifier が同期的に complete のみした場合は、subscriberactive は true のままなので、sourceObservable への購読が行われ、observable は途切れることなくミラーします。

    5. sourceObserver を新しい internal observer とし、次のように初期化する:

      next steps

      渡された value を使って subscribernext() メソッドを実行する。

      error steps

      渡された error を使って subscribererror() メソッドを実行する。

      complete steps

      subscribercomplete() メソッドを実行する。

      注: sourceObserver は基本的にパススルーであり、sourceObservable の発行するすべてをミラーします。 ただし sourceObservablenotifier より先に完了した場合は notifier から購読を解除します。

    6. sourceObservablesourceObserveroptionssubscribeする。

  4. observable を返す。

テスト
map(mapper) メソッドの手順は以下の通りです:
  1. sourceObservablethis とする。

  2. observable新しい Observable とし、その subscribe callbackSubscriber subscriber を受け取るアルゴリズムである:

    1. idxunsigned long long、初期値0とする。

    2. sourceObserver を新しい internal observer とし、次のように初期化する:

      next steps
      1. mapper を «渡された value, idx» と "rethrow" で呼び出し、返り値を mappedValue とする。

        もし 例外 E が投げられた場合は、 subscribererror() メソッドを E で実行し、この手順を中止する。

      2. idx をインクリメントする。

      3. subscribernext() メソッドを mappedValue で実行する。

      error steps

      渡された errorsubscribererror() メソッドを実行する。

      complete steps

      subscribercomplete() メソッドを実行する。

    3. options を新しい SubscribeOptions とし、 signalsubscribersubscription controllersignal とする。

    4. sourceObservablesourceObserveroptions で subscribe する。

  3. observable を返す。

テスト
filter(predicate) メソッドの手順は以下の通りです:
  1. sourceObservablethis とする。

  2. observable新しい Observable とし、その subscribe callbackSubscriber subscriber を受け取るアルゴリズムである:

    1. idxunsigned long long、初期値0とする。

    2. sourceObserver を新しい internal observer とし、次のように初期化する:

      next steps
      1. predicate を «渡された value, idx» と "rethrow" で呼び出し、返り値を matches とする。

        もし 例外 E が投げられた場合は、 subscribererror() メソッドを E で実行し、この手順を中止する。

      2. idxidx + 1 に設定する。

      3. もし matches が true なら、subscribernext() メソッドを value で実行する。

      error steps

      渡された errorsubscribererror() メソッドを実行する。

      complete steps

      subscribercomplete() メソッドを実行する。

    3. options を新しい SubscribeOptions とし、 signalsubscribersubscription controllersignal とする。

    4. sourceObservablesourceObserveroptions で subscribe する。

  3. observable を返す。

テスト
take(amount) メソッドの手順は以下の通りです:
  1. sourceObservablethis とする。

  2. observable新しい Observable とし、その subscribe callbackSubscriber subscriber を受け取るアルゴリズムである:

    1. remainingamount とする。

    2. もし remaining が 0 なら、subscribercomplete() メソッドを実行し、この手順を中止する。

    3. sourceObserver を新しい internal observer とし、次のように初期化する:

      next steps
      1. 渡された valuesubscribernext() メソッドを実行する。

      2. remaining をデクリメントする。

      3. もし remaining が 0 なら、subscribercomplete() メソッドを実行する。

      error steps

      渡された errorsubscribererror() メソッドを実行する。

      complete steps

      subscribercomplete() メソッドを実行する。

    4. options を新しい SubscribeOptions とし、 signalsubscribersubscription controllersignal とする。

    5. sourceObservablesourceObserveroptions で subscribe する。

  3. observable を返す。

テスト
drop(amount) メソッドの手順は以下の通りです:
  1. sourceObservablethis とする。

  2. observable新しい Observable とし、その subscribe callbackSubscriber subscriber を受け取るアルゴリズムである:

    1. remainingamount とする。

    2. sourceObserver を新しい internal observer とし、次のように初期化する:

      next steps
      1. もし remaining が > 0 なら、remaining をデクリメントし、この手順を中止する。

      2. アサート: remaining は 0 である。

      3. 渡された valuesubscribernext() メソッドを実行する。

      error steps

      渡された errorsubscribererror() メソッドを実行する。

      complete steps

      subscribercomplete() メソッドを実行する。

    3. options を新しい SubscribeOptions とし、 signalsubscribersubscription controllersignal とする。

    4. sourceObservablesourceObserveroptions で subscribe する。

  3. observable を返す。

テスト
flatMap(mapper) メソッドの手順は以下の通りです:
  1. sourceObservablethis とする。

  2. observable新しい Observable とし、その subscribe callbackSubscriber subscriber を受け取るアルゴリズムである:

    1. idxunsigned long long、初期値0とする。

    2. outerSubscriptionHasCompletedboolean、初期値 false とする。

    3. queue を新しい listany型値)、初期値は空とする。

      注: queueobservable が直前に subscribe された Observable がまだ完了していない間に sourceObservable から発行される Observable を保存するために使われます。

    4. activeInnerSubscriptionboolean、初期値 false とする。

    5. sourceObserver を新しい internal observer とし、次のように初期化する:

      next steps
      1. もし activeInnerSubscription が true なら:

        1. queuevalue を追加する。

          注: この value は、現在 subscribe 中の Observable が 完了した後に処理されます。

      2. それ以外の場合:

        1. activeInnerSubscription を true に設定する。

        2. flatmap process next value stepsvalue, subscriber, mapper、そして 参照として queue, activeInnerSubscription, outerSubscriptionHasCompleted, idx で実行する。

          注: この flatmap process next value stepsvalue から導出された Observable に subscribe し、 その購読が非アクティブになるまで値を処理し続けます(エラーまたは完了)。"inner" の Observable が完了した場合、 処理手順は queue の次の値で再帰的に自分自身を呼び出します。

          値が 存在しない 場合、処理手順は終了し、 activeInnerSubscription の設定を解除し、 sourceObservable から発行される将来の値が正しく処理されるようにします。

      error steps

      渡された errorsubscribererror() メソッドを実行する。

      complete steps
      1. outerSubscriptionHasCompleted を true にする。

        注: activeInnerSubscription が true の場合、下記の手順では subscriber を complete しません。 その場合は flatmap process next value steps が、 "inner" の購読が非アクティブになった後、queueであれば subscriber を complete する責任を持ちます。

      2. activeInnerSubscription が false かつ queueなら、subscribercomplete() メソッドを実行する。

    6. options を新しい SubscribeOptions とし、 signalsubscribersubscription controllersignal とする。

    7. sourceObservablesourceObserveroptions で subscribe する。

  3. observable を返す。

flatmap process next value steps は、any valueSubscriber subscriberMapper mapper、および次の参照を受け取る:listany 値の queuebooleanactiveInnerSubscriptionbooleanouterSubscriptionHasCompletedunsigned long longidx の場合、次の手順を実行する:
  1. mappedResultmapper を «value, idx» と "rethrow" で呼び出した結果とする。

    もし 例外 E が投げられた場合は、 subscribererror() メソッドを E で実行し、この手順を中止する。

  2. idxidx + 1 に設定する。

  3. innerObservablefrom()mappedResult で呼び出した結果とする。

    もし 例外 E が投げられた場合は、 subscribererror() メソッドを E で実行し、この手順を中止する。

    from() を直接呼び出すべきではありません。例外を適切に処理する内部アルゴリズムを呼び出し、subscriber にパイプしたいです。

  4. innerObserver を新しい internal observer とし、次のように初期化する:

    next steps

    渡された valuesubscribernext() メソッドを実行する。

    error steps

    渡された errorsubscribererror() メソッドを実行する。

    complete steps
    1. queue が空でなければ:

      1. nextValuequeue の先頭要素とし、removequeue から削除する。

      2. flatmap process next value stepsnextValuesubscribermapper参照として queueactiveInnerSubscription で実行する。

    2. それ以外の場合:

      1. activeInnerSubscription を false に設定する。

        注: activeInnerSubscription は参照型なので、 以降 "outer" の ObservablesourceObservable)から発行される値が正しく処理されます。

      2. outerSubscriptionHasCompleted が true なら、 subscribercomplete() メソッドを実行する。

        注: "outer" Observable が完了しているが、 まだ subscriber を complete していません。これは、まだ完了していない "inner" の Observable(つまり innerObservable)が キューに存在していたためです。今まさに完了したということです。

  5. innerOptions を新しい SubscribeOptions とし、 signalsubscribersubscription controllersignal とする。

  6. innerObservableinnerObserverinnerOptions で subscribe する。

switchMap(mapper) メソッドの手順は以下の通りです:
  1. sourceObservablethis とする。

  2. observable新しい Observable とし、その subscribe callbackSubscriber subscriber を受け取るアルゴリズムである:

    1. idxunsigned long long、初期値0とする。

    2. outerSubscriptionHasCompletedboolean、初期値 false とする。

    3. activeInnerAbortControllerAbortController または null、初期値 null とする。

      注: AbortController はこのアルゴリズムの next steps のみで新しい値が代入され、 null になるのは switchmap process next value steps で "inner" Observable が完了またはエラーになった時です。現在 "inner" の購読があるかどうかの判定に使います。"complete steps" も参照します。

    4. sourceObserver を新しい internal observer とし、次のように初期化する:

      next steps
      1. activeInnerAbortController が null でなければ、 signal abort する。

        注: これにより前回の値から導出された "inner" Observable から購読解除される。 すぐに新しい値から導出された "inner" Observable に購読する。

      2. activeInnerAbortController新しい AbortController に設定する。

      3. switchmap process next value stepsvaluesubscribermapper参照として activeInnerAbortControllerouterSubscriptionHasCompletedidx で実行する。

        注: switchmap process next value steps は、 value から導出される Observable に subscribe し、 その購読が非アクティブになる(エラー/完了)か、activeInnerAbortController が abort されるまで値を処理します。

      error steps

      渡された errorsubscribererror() メソッドを実行する。

      complete steps
      1. outerSubscriptionHasCompleted を true に設定する。

        注: activeInnerAbortController が null でない場合、直ちに subscriber を complete しない。 その場合は switchmap process next value steps が inner の購読が完了した時点で subscriber を complete する。

      2. activeInnerAbortController が null なら、subscribercomplete() メソッドを実行する。

    5. options を新しい SubscribeOptions とし、 signalsubscribersubscription controllersignal とする。

    6. sourceObservablesourceObserveroptions で subscribe する。

  3. observable を返す。

switchmap process next value steps は、any valueSubscriber subscriberMapper mapper、および次の参照を受け取る:AbortController activeInnerAbortControllerbooleanouterSubscriptionHasCompletedunsigned long longidx の場合、次の手順を実行する:
  1. mappedResultmapper を «value, idx» と "rethrow" で呼び出した結果とする。

    もし 例外 E が投げられた場合は、 subscribererror() メソッドを E で実行し、この手順を中止する。

  2. idxidx + 1 に設定する。

  3. innerObservablefrom()mappedResult で呼び出した結果とする。

    もし 例外 E が投げられた場合は、 subscribererror() メソッドを E で実行し、この手順を中止する。

  4. innerObserver を新しい internal observer とし、次のように初期化する:

    next steps

    渡された valuesubscribernext() メソッドを実行する。

    error steps

    渡された errorsubscribererror() メソッドを実行する。

    注: この時点で activeInnerAbortController を null にする必要はありません。 なぜなら subscribererror() を呼び出すことで "outer" の Observable から購読解除されるため、以降値が push されることはありません。

    complete steps
    1. outerSubscriptionHasCompleted が true なら、 subscribercomplete() メソッドを実行する。

    2. それ以外の場合は activeInnerAbortController を null に設定する。

      注: この変数は参照型なので、 switchMap complete steps に "inner" の購読がないことを知らせます。

  5. innerOptions を新しい SubscribeOptions とし、 signal は、create a dependent abort signal を «activeInnerAbortControllersignalsubscribersubscription controllersignal»、 AbortSignalcurrent realm で生成したものとする。

  6. innerObservableinnerObserverinnerOptions で subscribe する。

inspect(inspectorUnion) メソッドの手順は以下の通りです:
  1. subscribe callbackVoidFunction もしくは null(初期値 null)とする。

  2. next callbackObservableSubscriptionCallback もしくは null(初期値 null)とする。

  3. error callbackObservableSubscriptionCallback もしくは null(初期値 null)とする。

  4. complete callbackVoidFunction もしくは null(初期値 null)とする。

  5. abort callbackObservableInspectorAbortHandler もしくは null(初期値 null)とする。

  6. inspectorUnion を次のように処理する:

    もし inspectorUnionObservableSubscriptionCallback なら
    1. next callbackinspectorUnion に設定する。

    もし inspectorUnionObservableInspector なら
    1. subscribe存在すれば、subscribe callback にそれを設定。

    2. next存在すれば、next callback にそれを設定。

    3. error存在すれば、error callback にそれを設定。

    4. complete存在すれば、complete callback にそれを設定。

    5. abort存在すれば、abort callback にそれを設定。

  7. sourceObservablethis とする。

  8. observable新しい Observable とし、 その subscribe callbackSubscriber subscriber を受け取るアルゴリズムである:

    1. subscribe callback が null でなければ、invoke で «» と "rethrow" で呼び出す。

      もし 例外 E が投げられた場合は、 subscribererror() メソッドを E で実行し、この手順を中止する。

      注: この結果、sourceObservable は決して subscribe されません。

    2. abort callback が null でなければ、subscribersubscription controllersignal に次の abort algorithm を追加する:

      1. abort callback を «subscribersubscription controllersignalabort reason» と "report" で呼び出す。

    3. sourceObserver を新しい internal observer とし、次のように初期化する:

      next steps
      1. next callback が null でなければ、 next callback を «渡された value» と "rethrow" で呼び出す。

        もし 例外 E が投げられた場合:

        1. abort callbacksubscribersubscription controllersignal から削除する。

          注: abort callback は consumer-initiated の解除だけに使うため、プロデューサーによるエラーや完了時は呼ばれないようにする必要があります。

          Chromiumの実装と一致していますが、もともと渡された SubscribeOptionssignal への参照を保持し、その abort 時だけ abort callback を呼び出すべきかも。要調査。

        2. subscribererror() メソッドを E で実行し、この手順を中止する。

      2. 渡された valuesubscribernext() メソッドを実行する。

      error steps
      1. abort callbacksubscribersubscription controllersignal から削除する。

      2. error callback が null でなければ、 error callback を «渡された error» と "rethrow" で呼び出す。

        もし 例外 E が投げられた場合は、 subscribererror() メソッドを E で実行し、この手順を中止する。

      3. 渡された errorsubscribererror() メソッドを実行する。

      complete steps
      1. abort callbacksubscribersubscription controllersignal から削除する。

      2. complete callback が null でなければ、 complete callback を «» と "rethrow" で呼び出す。

        もし 例外 E が投げられた場合は、 subscribererror() メソッドを E で実行し、この手順を中止する。

      3. subscribercomplete() メソッドを実行する。

    4. options を新しい SubscribeOptions とし、 signalsubscribersubscription controllersignal とする。

    5. sourceObservablesourceObserveroptions で subscribe する。

  9. observable を返す。

テスト
catch(callback) メソッドの手順は以下の通りです:
  1. sourceObservablethis とする。

  2. observable新しい Observable とし、 その subscribe callbackSubscriber subscriber を受け取るアルゴリズムである:

    1. sourceObserver を新しい internal observer とし、次のように初期化する:

      next steps

      渡された valuesubscribernext() メソッドを実行する。

      error steps
      1. callback を «渡された error» と "rethrow" で呼び出し、 返り値を result とする。

        もし 例外 E が投げられた場合は、 subscribererror()E で実行し、この手順を中止する。

      2. innerObservablefrom()result で呼び出した結果とする。

        もし 例外 E が投げられた場合は、 subscribererror() メソッドを E で実行し、この手順を中止する。

        from() を直接呼び出すべきではありません。例外を適切に処理する内部アルゴリズムを呼び出し、subscriber にパイプしたいです。

      3. innerObserver を新しい internal observer とし、次のように初期化する:

        next steps

        渡された valuesubscribernext() メソッドを実行する。

        error steps

        渡された errorsubscribererror() メソッドを実行する。

        complete steps

        subscribercomplete() メソッドを実行する。

      4. innerOptions を新しい SubscribeOptions とし、 signalsubscribersubscription controllersignal とする。

      5. innerObservableinnerObserverinnerOptions で subscribe する。

        注: innerObservableへのsubscribeは sourceObservableから購読解除せず安全に実施可能です。なぜなら error steps 内で呼ばれるので sourceObservable の購読は既に完了しています。

      complete steps

      subscribercomplete() メソッドを実行する。

    2. options を新しい SubscribeOptions とし、 signalsubscribersubscription controllersignal とする。

    3. sourceObservablesourceObserveroptions で subscribe する。

  3. observable を返す。

finally(callback) メソッドの手順は以下の通りです:
  1. sourceObservablethis とする。

  2. observable新しい Observable とし、 その subscribe callbackSubscriber subscriber を受け取るアルゴリズムである:

    1. subscriberaddTeardown() メソッドを callback で実行する。

    2. sourceObserver を新しい internal observer とし、次のように初期化する:

      next steps

      渡された valuesubscribernext() メソッドを実行する。

      error steps
      1. 渡された errorsubscribererror() メソッドを実行する。

      complete steps
      1. subscribercomplete() メソッドを実行する。

    3. options を新しい SubscribeOptions とし、 signalsubscribersubscription controllersignal とする。

    4. sourceObservablesourceObserveroptions で subscribe する。

  3. observable を返す。

2.3.3. Promiseを返すオペレーター

toArray(options) メソッドの手順は以下の通りです:
  1. p新しい promiseとする。

  2. optionssignal が null でない場合:

    1. optionssignalaborted なら:

      1. poptionssignalabort reason で reject する。

      2. p を返す。

    2. abort algorithmoptionssignal に追加:

      1. poptionssignalabort reason で reject する。

      注: ここでは p を reject するだけです。this Observable の購読も自動的にcloseされます。"inner" Subscriberが closeされるためです。

  3. values新しい listとする。

  4. observer を新しい internal observer とし、次のように初期化:

    next steps

    渡された valuevalues に追加する。

    error steps

    p を渡された error で reject する。

    complete steps

    pvalues で resolve する。

  5. thisobserveroptions で subscribe する。

  6. p を返す。

テスト
forEach(callback, options) メソッドの手順は以下の通りです:
  1. p新しい promiseとする。

  2. visitor callback controller新しい AbortControllerとする。

  3. internal options を新しい SubscribeOptions とし、 signaldependent abort signalvisitor callback controllersignaloptionssignal(nullでなければ))で生成する。 AbortSignalcurrent realm を使う。

    多くの trivial な internal observer は値をチェーンに流すだけで購読を制御しません。 このオペレーターでは下記 observernext steps が callback が例外を投げた場合、基礎となる購読を abort する責任を持ちます。なので "Observableへのsubscribe" に渡す SubscribeOptionssignal は options の signal と、かつ AbortSignal を持つ AbortController の dependent signal にする必要がある。

  4. internal optionssignalaborted なら:

    1. pinternal optionssignalabort reason で reject する。

    2. p を返す。

  5. abort algorithminternal optionssignal に追加:

    1. pinternal optionssignalabort reason で reject する。

      注: p の reject は internal options の signal に結び付けられています。 options の signal の abort時の microtaskが先にキューされ、p の reject handler より先に実行されます。

  6. idxunsigned long long(初期値0)とする。

  7. observer を新しい internal observer とし、次のように初期化:

    next steps
    1. callback を «渡された value, idx» と "rethrow" で呼び出す。

      もし 例外 E が投げられた場合は、 pE で reject し、visitor callback controllerE で abort する。

    2. idx をインクリメントする。

    error steps

    p を渡された error で reject する。

    complete steps

    pundefined で resolve する。

  8. thisobserverinternal options で subscribe する。

  9. p を返す。

テスト
every(predicate, options) メソッドの手順は以下の通りです:
  1. p新しい promiseとする。

  2. controller新しい AbortControllerとする。

  3. internal options を新しい SubscribeOptions とし、 signaldependent abort signalcontrollersignaloptionssignal(nullでなければ))で生成する。 AbortSignalcurrent realm を使う。

  4. internal optionssignalaborted なら:

    1. pinternal optionssignalabort reason で reject する。

    2. p を返す。

  5. abort algorithminternal optionssignal に追加:

    1. pinternal optionssignalabort reason で reject する。

  6. idxunsigned long long(初期値0)とする。

  7. observer を新しい internal observer とし、次のように初期化:

    next steps
    1. predicate を «渡された value, idx» と "rethrow" で呼び出し、返り値を passed とする。

      もし 例外 E が投げられた場合は、 pE で reject し、controllerE で abort する。

    2. idxidx + 1 に設定する。

    3. passed が false なら p を false で resolve し、controller を abort する。

    error steps

    p を渡された error で reject する。

    complete steps

    p を true で resolve する。

  8. thisobserverinternal options で subscribe する。

  9. p を返す。

first(options) メソッドの手順は以下の通りです:
  1. p新しい promiseとする。

  2. controller新しい AbortControllerとする。

  3. internal options を新しい SubscribeOptions とし、 signaldependent abort signalcontrollersignaloptionssignal(nullでなければ))で生成する。 AbortSignalcurrent realm を使う。

  4. internal optionssignalaborted なら:

    1. pinternal optionssignalabort reason で reject する。

    2. p を返す。

  5. abort algorithminternal optionssignal に追加:

    1. pinternal optionssignalabort reason で reject する。

  6. internal observer を新しい internal observer とし、次のように初期化:

    next steps
    1. p を渡された value で resolve する。

    2. controller を abort する。

    error steps

    Reject p を渡された error で拒否する。

    complete steps

    p を新しい RangeError で reject する。

    注: これは、ソース Observable が 1つも値を発行する前に complete した場合のみ到達します。

  7. thisinternal observerinternal options で subscribe する。

  8. p を返す。

last(options) メソッドの手順は以下の通りです:
  1. p新しい promiseとする。

  2. optionssignal が null でない場合:

    1. optionssignalaborted なら:

      1. poptionssignalabort reason で reject する。

      2. p を返す。

    2. abort algorithmoptionssignal に追加:

      1. poptionssignalabort reason で reject する。

  3. lastValueany または null(初期値 null)とする。

  4. hasLastValueboolean(初期値 false)とする。

  5. observer を新しい internal observer とし、次のように初期化:

    next steps
    1. hasLastValue を true に設定する。

    2. lastValue を渡された value に設定する。

    error steps

    p を渡された error で reject する。

    complete steps
    1. hasLastValue が true なら plastValue で resolve する。

      1. それ以外なら p を新しい RangeError で reject する。

        注: first() の注を参照。

  6. thisobserveroptions で subscribe する。

  7. p を返す。

find(predicate, options) メソッドの手順は以下の通りです:
  1. p新しい promiseとする。

  2. controller新しい AbortControllerとする。

  3. internal options を新しい SubscribeOptions とし、 signaldependent abort signalcontrollersignaloptionssignal(nullでなければ))で生成する。 AbortSignalcurrent realm を使う。

  4. internal optionssignalaborted なら:

    1. pinternal optionssignalabort reason で reject する。

    2. p を返す。

  5. abort algorithminternal optionssignal に追加:

    1. pinternal optionssignalabort reason で reject する。

  6. idxunsigned long long(初期値0)とする。

  7. observer を新しい internal observer とし、次のように初期化:

    next steps
    1. predicate を «渡された value, idx» と "rethrow" で呼び出し、返り値を passed とする。

      もし 例外 E が投げられた場合は、 pE で reject し、controllerE で abort する。

    2. idxidx + 1 に設定する。

    3. passed が true なら pvalue で resolve し、controller を abort する。

    error steps

    p を渡された error で reject する。

    complete steps

    pundefined で resolve する。

  8. thisobserverinternal options で subscribe する。

  9. p を返す。

some(predicate, options) メソッドの手順は以下の通りです:
  1. p新しい promiseとする。

  2. controller新しい AbortControllerとする。

  3. internal options を新しい SubscribeOptions とし、 signaldependent abort signalcontrollersignaloptionssignal(nullでなければ))で生成する。 AbortSignalcurrent realm を使う。

  4. internal optionssignalaborted なら:

    1. pinternal optionssignalabort reason で reject する。

    2. p を返す。

  5. abort algorithminternal optionssignal に追加:

    1. pinternal optionssignalabort reason で reject する。

  6. idxunsigned long long(初期値0)とする。

  7. observer を新しい internal observer とし、次のように初期化:

    next steps
    1. predicate を «渡された value, idx» と "rethrow" で呼び出し、返り値を passed とする。

      もし 例外 E が投げられた場合は、 pE で reject し、controllerE で abort する。

    2. idxidx + 1 に設定する。

    3. passed が true なら p を true で resolve し、controller を abort する。

    error steps

    p を渡された error で reject する。

    complete steps

    p を false で resolve する。

  8. thisobserverinternal options で subscribe する。

  9. p を返す。

reduce(reducer, initialValue, options) メソッドの手順は以下の通りです:
  1. p新しい promiseとする。

  2. controller新しい AbortControllerとする。

  3. internal options を新しい SubscribeOptions とし、 signaldependent abort signalcontrollersignaloptionssignal(nullでなければ))で生成する。 AbortSignalcurrent realm を使う。

  4. internal optionssignalaborted なら:

    1. pinternal optionssignalabort reason で reject する。

    2. p を返す。

  5. abort algorithminternal optionssignal に追加:

    1. pinternal optionssignalabort reason で reject する。

  6. idxunsigned long long(初期値0)とする。

  7. accumulatorinitialValue(与えられていれば)で初期化し、なければ未初期化とする。

  8. observer を新しい internal observer とし、次のように初期化:

    next steps
    1. accumulator が未初期化(つまり initialValue が渡されていない)なら、 accumulator を渡された value に設定し、idxidx + 1 にし、この手順を中止する。

      注: この場合 reducerthis が最初に出す currentValue では呼ばれません。 2つ目以降の値が発行されたとき、reducer をその値(currentValue)と 最初の値(accumulator)で呼び出します。

    2. reducer を «accumulatoraccumulator), 渡された valuecurrentValue), idxindex)»と "rethrow" で呼び出し、返り値を result とする。

      もし 例外 E が投げられた場合は、 pE で reject し、controllerE で abort する。

    3. idxidx + 1 に設定する。

    4. accumulatorresult に設定する。

    error steps

    p を渡された error で reject する。

    complete steps
    1. accumulator が "unset" 以外なら paccumulator で resolve する。

      それ以外なら pTypeError で reject する。

  9. thisobserverinternal options で subscribe する。

  10. p を返す。

3. EventTarget 統合

dictionary ObservableEventListenerOptions {
  boolean capture = false;
  boolean passive;
};

partial interface EventTarget {
  Observable when(DOMString type, optional ObservableEventListenerOptions options = {});
};
when(type, options) メソッドの手順は以下の通りです:
  1. this関連グローバルオブジェクトWindow オブジェクトで、その 関連付けられた Document完全にアクティブ でない場合は return。

  2. event targetthis とする。

  3. observable新しい Observable とし、次のように初期化する:

    subscribe callback

    Subscriber subscriber を受け取るアルゴリズムで、次の手順を実行する:

    1. event target が null なら、この手順を中止する。

      注: event target は購読時点で ガベージコレクションされている可能性を考慮しています。

    2. subscribersubscription controllersignalaborted なら、この手順を中止する。

    3. event listener を追加する。対象は event targetevent listener は次の通り:

      type

      type

      callback

      新しい Web IDL EventListener インスタンスを生成し、1引数の Eventevent を受け取る関数への参照とする。この関数は observable event listener invoke algorithmsubscriberevent で実行する。

      capture

      optionscapture

      passive

      optionspassive存在すればその値、なければ null。

      once

      false

      signal

      subscribersubscription controllersignal

      注: event listenersubscription controllersignalaborted になった時 必ずクリーンアップされます(エンジンの所有モデルによらず)。

  4. observable を返す。

observable event listener invoke algorithmSubscriber subscriberEvent event を受け取り、次の手順を実行する:
  1. subscribernext() メソッドを event で実行する。

テスト

4. セキュリティとプライバシーに関する考慮事項

この内容は explainer から本仕様へのアップストリーム中です。現時点では以下のリソースを参照できます:

5. 謝辞

特に Ben Lesh に感謝します。Observable API の設計に多大な貢献をいただき、長年にわたりユーザーランド Observable コードをメンテナンスしてきたことが Webプラットフォームへの貢献を可能にしました。

索引

本仕様で定義されている用語

参考文献で定義されている用語

参考文献

規範的参考文献

[DOM]
Anne van Kesteren. DOM 標準. リビングスタンダード. URL: https://dom.spec.whatwg.org/
[ECMASCRIPT]
ECMAScript 言語仕様. URL: https://tc39.es/ecma262/multipage/
[HTML]
Anne van Kesteren; et al. HTML 標準. リビングスタンダード. URL: https://html.spec.whatwg.org/multipage/
[INFRA]
Anne van Kesteren; Domenic Denicola. Infra 標準. リビングスタンダード. URL: https://infra.spec.whatwg.org/
[WEBIDL]
Edgar Chen; Timothy Gu. Web IDL 標準. リビング スタンダード. URL: https://webidl.spec.whatwg.org/

IDL索引

[Exposed=*]
interface Subscriber {
  undefined next(any value);
  undefined error(any error);
  undefined complete();
  undefined addTeardown(VoidFunction teardown);

  // True after the Subscriber is created, up until either
  // complete()/error() are invoked, or the subscriber unsubscribes. Inside
  // complete()/error(), this attribute is true.
  readonly attribute boolean active;

  readonly attribute AbortSignal signal;
};

// SubscribeCallback is where the Observable "creator's" code lives. It's
// called when subscribe() is called, to set up a new subscription.
callback SubscribeCallback = undefined (Subscriber subscriber);
callback ObservableSubscriptionCallback = undefined (any value);

dictionary SubscriptionObserver {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;
};

callback ObservableInspectorAbortHandler = undefined (any value);

dictionary ObservableInspector {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;

  VoidFunction subscribe;
  ObservableInspectorAbortHandler abort;
};

typedef (ObservableSubscriptionCallback or SubscriptionObserver) ObserverUnion;
typedef (ObservableSubscriptionCallback or ObservableInspector) ObservableInspectorUnion;

dictionary SubscribeOptions {
  AbortSignal signal;
};

callback Predicate = boolean (any value, unsigned long long index);
callback Reducer = any (any accumulator, any currentValue, unsigned long long index);
callback Mapper = any (any value, unsigned long long index);
// Differs from Mapper only in return type, since this callback is exclusively
// used to visit each element in a sequence, not transform it.
callback Visitor = undefined (any value, unsigned long long index);

// This callback returns an `any` that must convert into an `Observable`, via
// the `Observable` conversion semantics.
callback CatchCallback = any (any value);

[Exposed=*]
interface Observable {
  constructor(SubscribeCallback callback);
  undefined subscribe(optional ObserverUnion observer = {}, optional SubscribeOptions options = {});

  // Constructs a native Observable from value if it's any of the following:
  //   - Observable
  //   - AsyncIterable
  //   - Iterable
  //   - Promise
  static Observable from(any value);

  // Observable-returning operators. See "Operators" section in the spec.
  //
  // takeUntil() can consume promises, iterables, async iterables, and other
  // observables.
  Observable takeUntil(any value);
  Observable map(Mapper mapper);
  Observable filter(Predicate predicate);
  Observable take(unsigned long long amount);
  Observable drop(unsigned long long amount);
  Observable flatMap(Mapper mapper);
  Observable switchMap(Mapper mapper);
  Observable inspect(optional ObservableInspectorUnion inspectorUnion = {});
  Observable catch(CatchCallback callback);
  Observable finally(VoidFunction callback);

  // Promise-returning operators.
  Promise<sequence<any>> toArray(optional SubscribeOptions options = {});
  Promise<undefined> forEach(Visitor callback, optional SubscribeOptions options = {});
  Promise<boolean> every(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> first(optional SubscribeOptions options = {});
  Promise<any> last(optional SubscribeOptions options = {});
  Promise<any> find(Predicate predicate, optional SubscribeOptions options = {});
  Promise<boolean> some(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> reduce(Reducer reducer, optional any initialValue, optional SubscribeOptions options = {});
};

dictionary ObservableEventListenerOptions {
  boolean capture = false;
  boolean passive;
};

partial interface EventTarget {
  Observable when(DOMString type, optional ObservableEventListenerOptions options = {});
};

課題索引

from() を直接呼び出すべきではありません。例外を正しく処理できる内部アルゴリズムを呼び出し、ここで適切に例外を扱い、subscriber にパイプしたいです。
これは Chromium の実装と一致していますが、元々渡された SubscribeOptionssignal への参照を保持し、それが abort されたときのみ abort callback を呼ぶべきかもしれません。結果はほぼ同じですが、要調査です。
from() を直接呼び出すべきではありません。例外を正しく処理できる内部アルゴリズムを呼び出し、ここで適切に例外を扱い、subscriber にパイプしたいです。