1. はじめに
この節は参考情報です。
ウェブプラットフォームの多くはストリーミングデータに基づいて構築されています。つまり、データが順次生成・処理・消費され、すべてをメモリに読み込むことなく扱われています。ストリーム現行標準は、そのようなストリーミングデータを作成し、やり取りするための共通APIセットを提供します。これらはReadableストリーム、Writableストリーム、およびTransformストリームとして具体化されています。
これらのAPIは、低レベルのI/Oプリミティブに効率的にマッピングできるよう設計されており、必要に応じてバイトストリームへの特化も含みます。複数のストリームをパイプチェーンとして容易に合成したり、リーダーやライターを通して直接利用できます。さらに、バックプレッシャーやキューイングも自動的に提供されるよう設計されています。
この標準は、ウェブプラットフォームの他の部分が自身のストリーミングデータを公開するための基本的なストリームプリミティブを提供します。例えば、[FETCH]はResponse
のbodyを
ReadableStream
インスタンスとして公開します。より一般的には、プラットフォーム上にはストリームとして表現できるストリーミング抽象が多く存在します。マルチメディアストリーム、ファイルストリーム、グローバル間通信などは、すべてのデータを一度にバッファせずに順次処理できることで恩恵を受けます。これらのストリームを開発者に公開する基盤を提供することで、Streams現行標準は以下のようなユースケースを可能にします:
-
動画エフェクト: 読み取り可能な動画ストリームをリアルタイムでエフェクトを適用するTransformストリームにパイプする。
-
展開処理: ファイルストリームをTransformストリームにパイプし、.tgzアーカイブからファイルを選択的に展開して、画像ギャラリーをスクロールする際に
img
要素に変換する。 -
画像デコード: HTTPレスポンスストリームをTransformストリームにパイプし、バイト列をビットマップデータへデコードし、さらに別のTransformでビットマップをPNGへ変換する。これをサービスワーカーの
fetch
フック内に組み込むことで、新しい画像フォーマットを透過的にポリフィルできる。[SERVICE-WORKERS]
ウェブ開発者は、ここで説明されているAPIを使って自身のストリームを作成することもでき、プラットフォームが提供するものと同じAPIを利用できます。他の開発者も、プラットフォーム提供のストリームとライブラリ提供のストリームを透過的に合成できます。このように、ここで説明するAPIはすべてのストリームの統一抽象を提供し、共有・合成可能なインターフェースの周りにエコシステムの成長を促します。
2. モデル
チャンクは、ストリームに書き込まれる、またはストリームから読み取られる単一のデータ片です。型は任意であり、ストリーム内に異なる型のチャンクを含むことも可能です。チャンクは、ストリームごとに最も細かい単位とは限りません。例えば、バイトストリームの場合、16
KiBのUint8Array
が1チャンクとなることがあり、単一バイトではない場合もあります。
2.1. Readableストリーム
Readableストリーム
は、データのソースを表し、そこからデータを読み取ることができます。つまり、データはReadableストリームから出てきます。具体的には、Readableストリームは
ReadableStream
クラスのインスタンスです。
Readableストリームは任意の動作で作成できますが、ほとんどのReadableストリームは基礎のソースと呼ばれる低レベルのI/Oソースをラップします。基礎のソースには、プッシュソースとプルソースの2種類があります。
プッシュソースは、リスナーの有無に関係なくデータを送り出します。データの流れを一時停止・再開する仕組みを持つ場合もあります。例としてはTCPソケットがあり、OSレベルから継続的にデータがプッシュされ、TCPウィンドウサイズの変更で速度調整が可能です。
プルソースは、データ要求があった時のみデータを返します。データはOSのメモリバッファにあれば同期的に取得できますし、ディスクから読み込む場合は非同期的になります。例としてはファイルハンドルがあり、特定の位置にシークして一定量を読み取ります。
Readableストリームは、これら両方のソースを単一の統一インターフェースでラップするよう設計されています。ウェブ開発者が作るストリームの場合、ソースの実装詳細は特定のメソッドやプロパティを持つオブジェクトとして
ReadableStream()
コンストラクタに渡されます。
チャンクは、ストリームの基礎のソースによってストリームにエンキューされます。これらはストリームの公開インターフェースを使って1つずつ読み取ることができ、特にReadableストリームリーダーを
getReader()
メソッドで取得することで利用できます。
公開インターフェースを使ってReadableストリームから読み取るコードはコンシューマと呼ばれます。
コンシューマは、Readableストリームのキャンセルもできます。
これはcancel()
メソッドで実行でき、ストリームへの関心を失ったことを示します。ストリームは即座に閉じられ、キューされたチャンクは破棄され、基礎のソースのキャンセル機構が実行されます。
また、コンシューマはReadableストリームの分岐(ティー)もできます。
tee()
メソッドで実行でき、ストリームはロックされ直接利用できなくなりますが、分岐と呼ばれる2つの新しいストリームが作られ、それぞれ独立して消費できます。
バイトを扱うストリームについては、コピーを最小化するなど効率的にバイトを扱うための拡張版Readableストリームが提供されます。こうしたReadableストリームの基礎のソースは基礎のバイトソースと呼ばれます。基礎のソースが基礎のバイトソースであるReadableストリームは、Readableバイトストリームと呼ばれることがあります。Readableバイトストリームの利用者は、ストリームの
BYOBリーダーを
getReader()
メソッドで取得できます。
2.2. Writableストリーム
Writableストリーム
は、データの書き込み先を表します。つまり、データはWritableストリームに入ります。具体的には、Writableストリームは
WritableStream
クラスのインスタンスです。
Readableストリームと同様に、ほとんどのWritableストリームは基礎のシンクと呼ばれる低レベルI/Oシンクをラップします。Writableストリームは、基礎のシンクへの書き込みをキューに入れ、順番に一つずつ基礎のシンクへ渡すことで、その複雑さを抽象化します。
チャンクは、ストリームの公開インターフェースを通じて書き込まれ、1つずつストリームの基礎のシンクに渡されます。ウェブ開発者が作成するストリームの場合、シンクの実装詳細は特定のメソッドを持つオブジェクトとして
WritableStream()
コンストラクタに渡されます。
公開インターフェースを使ってWritableストリームに書き込むコードはプロデューサと呼ばれます。
プロデューサは、Writableストリームを中断することもできます。
abort()
メソッドで実行でき、何らかの問題が発生したと判断した場合に今後の書き込みを中止します。ストリームはエラー状態となり、基礎のシンクからのシグナルがなくても、ストリームの内部キュー内のすべての書き込みが破棄されます。
2.3. Transformストリーム
Transformストリーム は、Writableストリーム(Writable側と呼ばれる)と、Readableストリーム(Readable側と呼ばれる)、2つのストリームのペアで構成されます。Transformストリームごとに定められた方法で、Writable側への書き込みは新たなデータがReadable側から読み取れるようになります。
具体的には、writable
プロパティとreadable
プロパティを持つオブジェクトはTransformストリームとして機能します。しかし、標準のTransformStream
クラスを使えば、このペアを適切に結びつけて容易に作成できます。これはトランスフォーマをラップし、変換処理のアルゴリズムを定義します。ウェブ開発者が作成するストリームの場合、トランスフォーマの実装詳細は特定のメソッドやプロパティを持つオブジェクトとして
TransformStream()
コンストラクタに渡されます。他の仕様ではGenericTransformStream
ミックスインを使って、同じwritable
/readable
ペアのプロパティを持つが独自APIを追加したクラスを作成する場合もあります。
アイデンティティTransformストリームは、Writable側に書き込まれたチャンクを、そのままReadable側に転送する変換を行わないTransformストリームです。これは様々な場面で有用です。デフォルトでは、TransformStream
コンストラクタにtransform()
メソッドがトランスフォーマオブジェクト上に存在しない場合、アイデンティティTransformストリームが作成されます。
Transformストリームの例として、以下のようなものがあります:
-
GZIP圧縮: 非圧縮バイトを書き込み、圧縮バイトを読み取る。
-
動画デコーダ: エンコードされたバイトを書き込み、非圧縮の動画フレームを読み取る。
-
テキストデコーダ: バイトを書き込み、文字列を読み取る。
-
CSV→JSON変換: CSVファイルの各行を表す文字列を書き込み、対応するJavaScriptオブジェクトを読み取る。
2.4. パイプチェーンとバックプレッシャー
ストリームは主に、パイプ処理によって相互接続して使われます。ReadableストリームはpipeTo()
メソッドで直接Writableストリームにパイプできますし、pipeThrough()
メソッドで1つ以上のTransformストリームを経由してパイプすることもできます。
このように複数のストリームを接続したものをパイプチェーンと呼びます。パイプチェーンでは、元のソースはチェーン内最初のReadableストリームの基礎のソース、最終シンクはチェーン最後のWritableストリームの基礎のシンクです。
パイプチェーンが構築されると、どのチャンクをどの速度で流すかに関するシグナルを伝播します。チェーン内のどこかでチャンクをまだ受け付けられない場合、シグナルはチェーンを逆方向に伝播し、最終的に元のソースに「チャンクの生成を急がないように」と通知されます。このような元のソースのデータ生成速度をチェーンの処理速度に合わせて調整する仕組みをバックプレッシャーと呼びます。
具体的には、元のソースは
controller.desiredSize
(または
byteController.desiredSize
)の値を受け取り、それに応じてデータ生成速度を調整します。この値は最終シンクに対応する
writer.desiredSize
から導出され、最終シンクがチャンクを書き込み終えるたびに更新されます。チェーン構築時に使うpipeTo()
メソッドは、この情報がパイプチェーン全体で自動的に逆伝播することを保証します。
ティー処理されたReadableストリームでは、2つのバックプレッシャーシグナルが分岐それぞれから集約され、どちらの分岐も読み取られていない場合は元のストリームの基礎のソースにバックプレッシャーシグナルが送られます。
パイプ処理はReadableストリームとWritableストリームをロックし、パイプ動作中は操作できなくなります。これによって実装は重要な最適化を行えます。例えば、基礎のソースから基礎のシンクへ直接データを受け渡し、中間キューをバイパスすることが可能です。
2.5. 内部キューとキュー戦略
ReadableストリームとWritableストリームはどちらも内部キューを持ち、類似の目的で利用します。Readableストリームの場合、内部キューにはチャンクが、基礎のソースによってエンキューされ、まだコンシューマに読まれていないものが入っています。Writableストリームの場合は、内部キューにチャンクがプロデューサによって書き込まれ、まだ基礎のシンクで処理・確定されていないものが入っています。
キュー戦略は、ストリームが内部キューの状態に基づいて、どのようにバックプレッシャーをシグナルすべきかを決定するオブジェクトです。キュー戦略は各チャンクにサイズを割り当て、キュー内のチャンク合計サイズと、ハイウォーターマークと呼ばれる指定値を比較します。ハイウォーターマークから合計サイズを引いた差分が、キューを満たすための望ましいサイズとして使われます。
Readableストリームでは、基礎のソースがこの望ましいサイズをバックプレッシャーのシグナルとして使い、チャンク生成を減速し望ましいサイズが0以上になるように努めます。Writableストリームでは、プロデューサが同様に、望ましいサイズが負にならないよう書き込みを避けることができます。
具体的には、ウェブ開発者作成のストリーム用キュー戦略は、highWaterMark
プロパティを持つ任意のJavaScriptオブジェクトです。バイトストリームではhighWaterMark
の単位は常にバイトです。それ以外のストリームではデフォルト単位はチャンクですが、strategyオブジェクトにsize()
関数を含めることで、任意のチャンクに対してサイズを返せます。これによりhighWaterMark
を任意の浮動小数点単位で指定できます。
JavaScriptでは、例えば
のように手動で戦略を記述できますし、組み込みのCountQueuingStrategy
クラスを使えば
と記述できます。
2.6. ロック
Readableストリームリーダー(単にリーダー)は、Readableストリームからチャンクを直接読み取るためのオブジェクトです。リーダーがない場合、コンシューマはストリームの高水準操作(キャンセルや、パイプ処理など)しか行えません。リーダーはストリームのgetReader()
メソッドで取得します。
Readableバイトストリームは、2種類のリーダー(デフォルトリーダーとBYOBリーダー)を提供できます。BYOB("bring your own
buffer")リーダーは、開発者が用意したバッファに読み込むことでコピーを最小化します。非バイトストリームはデフォルトリーダーのみ提供します。デフォルトリーダーはReadableStreamDefaultReader
クラスのインスタンス、BYOBリーダーは
ReadableStreamBYOBReader
クラスのインスタンスです。
同様に、Writableストリームライター(単にライター)は、Writableストリームにチャンクを直接書き込むためのオブジェクトです。ライターがない場合、プロデューサはストリームの高水準操作(中断や、パイプ処理など)しか行えません。ライターはWritableStreamDefaultWriter
クラスで表されます。
内部的には、これらの高水準操作も実際にはリーダーやライターを利用しています。
ReadableストリームまたはWritableストリームには、同時に最大1つのリーダーまたはライターしか存在できません。この場合、ストリームはロック状態であり、リーダーやライターはアクティブです。この状態はreadableStream.locked
や
writableStream.locked
プロパティで判定できます。
リーダーやライターは、ロック解除も可能です。ロック解除すると非アクティブになり、次のリーダーやライターの取得が可能になります。これは
defaultReader.releaseLock()
、
byobReader.releaseLock()
、
writer.releaseLock()
メソッドで実行します。
3. 規約
本仕様はInfra現行標準に依存します。[INFRA]
本仕様では、JavaScript仕様から抽象演算の概念を内部アルゴリズムに利用しています。これには、戻り値を完了レコードとして扱うこと、および!や?の接頭辞による完了レコードの展開が含まれます。[ECMASCRIPT]
また、本仕様ではJavaScript仕様から内部スロットの概念と記法も使用します。(ただし、内部スロットはWeb IDLのプラットフォームオブジェクト上にあり、JavaScriptオブジェクト上にはありません。)
これら外部のJavaScript仕様由来の規約を使っている理由は主に歴史的なものです。独自のウェブ仕様を書く際は、ここでの例を安易に模倣しないよう推奨します。
本仕様では、すべての数値は倍精度64ビットIEEE 754浮動小数点値(JavaScriptのNumber型やWeb IDLunrestricted double
型と同様)として表現され、それらに対する算術演算は標準的な方法で行う必要があります。これは特に§ 8.1
Queue-with-sizesで説明されるデータ構造に関して重要です。[IEEE-754]
4. Readableストリーム
4.1. Readableストリームの利用
readableStream. pipeTo( writableStream) . then(() => console. log( "すべてのデータが正常に書き込まれました!" )) . catch ( e=> console. error( "何らかのエラーが発生しました!" , e));
readableStream. pipeTo( new WritableStream({ write( chunk) { console. log( "チャンク受信" , chunk); }, close() { console. log( "すべてのデータが正常に読み込まれました!" ); }, abort( e) { console. error( "何らかのエラーが発生しました!" , e); } }));
read()
メソッドで順次チャンクを直接読み取ることもできます。例えば次のコードは、ストリーム内の次のチャンクがあればログに出力します。
const reader= readableStream. getReader(); reader. read(). then( ({ value, done}) => { if ( done) { console. log( "ストリームは既に閉じられています!" ); } else { console. log( value); } }, e=> console. error( "ストリームがエラー状態となり、読み込みできません!" , e) );
const reader= readableStream. getReader({ mode: "byob" }); let startingAB= new ArrayBuffer( 1024 ); const buffer= await readInto( startingAB); console. log( "最初の1024バイト:" , buffer); async function readInto( buffer) { let offset= 0 ; while ( offset< buffer. byteLength) { const { value: view, done} = await reader. read( new Uint8Array( buffer, offset, buffer. byteLength- offset)); buffer= view. buffer; if ( done) { break ; } offset+= view. byteLength; } return buffer; }
ここで重要なのは、最終的なbuffer
の値はstartingAB
とは異なりますが、(すべての中間バッファも含めて)同じメモリ領域を共有している点です。各ステップでバッファは転送され、新しいArrayBuffer
オブジェクトになります。view
は、Uint8Array
の読み取り値から分割代入され、そのArrayBuffer
オブジェクトはbuffer
プロパティ、バイトが書き込まれたオフセットはbyteOffset
プロパティ、書き込まれたバイト数はbyteLength
プロパティとなります。
この例は主に教育用です。実用面では、min
オプションを使えば、read()
でバイト数を直接指定して簡単に読み込めます:
const reader= readableStream. getReader({ mode: "byob" }); const { value: view, done} = await reader. read( new Uint8Array( 1024 ), { min: 1024 }); console. log( "最初の1024バイト:" , view);
4.2. ReadableStream
クラス
ReadableStream
クラスは、一般的なReadableストリーム概念の具象インスタンスです。任意のチャンク型に適応でき、基礎のソースから供給され、まだコンシューマに読まれていないデータを管理するための内部キューを保持します。
4.2.1. インターフェイス定義
ReadableStream
クラスのWeb IDL定義は以下の通りです:
[Exposed=*,Transferable ]interface {
ReadableStream constructor (optional object ,
underlyingSource optional QueuingStrategy = {});
strategy static ReadableStream from (any );
asyncIterable readonly attribute boolean locked ;Promise <undefined >cancel (optional any );
reason ReadableStreamReader getReader (optional ReadableStreamGetReaderOptions = {});
options ReadableStream pipeThrough (ReadableWritablePair ,
transform optional StreamPipeOptions = {});
options Promise <undefined >pipeTo (WritableStream ,
destination optional StreamPipeOptions = {});
options sequence <ReadableStream >tee ();async_iterable <any >(optional ReadableStreamIteratorOptions = {}); };
options typedef (ReadableStreamDefaultReader or ReadableStreamBYOBReader );
ReadableStreamReader enum {
ReadableStreamReaderMode };
"byob" dictionary {
ReadableStreamGetReaderOptions ReadableStreamReaderMode ; };
mode dictionary {
ReadableStreamIteratorOptions boolean =
preventCancel false ; };dictionary {
ReadableWritablePair required ReadableStream ;
readable required WritableStream ; };
writable dictionary {
StreamPipeOptions boolean =
preventClose false ;boolean =
preventAbort false ;boolean =
preventCancel false ;AbortSignal ; };
signal
4.2.2. 内部スロット
ReadableStream
のインスタンスは、以下の表で説明する内部スロットと共に生成されます:
内部スロット | 説明(参考情報) |
---|---|
[[controller]] | ReadableStreamDefaultController
または
ReadableByteStreamController
で、このストリームの状態やキューを制御するために生成されたもの
|
[[Detached]] | ストリームが転送されたときにtrueとなるブールフラグ |
[[disturbed]] | ストリームが読み取られた、またはキャンセルされたときにtrueとなるブールフラグ |
[[reader]] | ReadableStreamDefaultReader
または ReadableStreamBYOBReader
のインスタンス。ストリームがリーダーにロックされている場合はそのインスタンス、そうでなければundefined
|
[[state]] |
ストリームの現在の状態を示す内部的な文字列。"readable "、"closed "、"errored "のいずれか
|
[[storedError]] | ストリームが失敗した理由を示す値。エラー状態のストリームに対して操作しようとしたときに例外や失敗理由として返される |
4.2.3. 基礎のソースAPI
ReadableStream()
コンストラクタは、最初の引数として基礎のソースを表すJavaScriptオブジェクトを受け付けます。これらのオブジェクトは以下のいずれかのプロパティを含むことができます:
dictionary {
UnderlyingSource UnderlyingSourceStartCallback start ;UnderlyingSourcePullCallback pull ;UnderlyingSourceCancelCallback cancel ;ReadableStreamType type ; [EnforceRange ]unsigned long long autoAllocateChunkSize ; };typedef (ReadableStreamDefaultController or ReadableByteStreamController );
ReadableStreamController callback =
UnderlyingSourceStartCallback any (ReadableStreamController );
controller callback =
UnderlyingSourcePullCallback Promise <undefined > (ReadableStreamController );
controller callback =
UnderlyingSourceCancelCallback Promise <undefined > (optional any );
reason enum {
ReadableStreamType "bytes" };
start(controller)
, 型 UnderlyingSourceStartCallback-
ReadableStream
の作成時に即座に呼び出される関数です。通常は、プッシュソースを適応するため、該当するイベントリスナーの設定(§ 10.1 基礎のpushソース(バックプレッシャーなし)のReadableストリームの例など)や、 プルソースへのアクセス取得(§ 10.4 基礎のpullソースのReadableストリームなど)に使われます。
このセットアップ処理が非同期の場合は、Promiseを返して成功・失敗を通知できます。Promiseがrejectされた場合はストリームがエラーになり、例外が投げられた場合は
ReadableStream()
コンストラクタで再スローされます。 pull(controller)
, 型 UnderlyingSourcePullCallback-
ストリームの内部キューが満杯でなくなったとき(つまりキューの望ましいサイズが正になったとき)に呼び出される関数です。通常はキューがハイウォーターマークに達するまで繰り返し呼ばれます(すなわち望ましいサイズが0以下になるまで)。
プッシュソースでは、§ 10.2 基礎のpushソース+バックプレッシャー付きのReadableストリームのように一時停止したデータフローを再開するために使われます。プルソースでは、新しいチャンクをストリームにエンキューするために利用されます(§ 10.4 基礎のpullソースのReadableストリーム参照)。
この関数は
start()
が正常に完了するまでは呼ばれません。また、1つでもチャンクをエンキューするかBYOBリクエストを満たさない限り、繰り返し呼ばれることはありません。noopなpull()
実装は連続して呼ばれません。関数がPromiseを返した場合、そのPromiseがfulfilledになるまで次は呼ばれません(rejectされた場合はストリームがエラーになります)。これは主にプルソースの場合に使われ、Promiseのfulfilledが新しいチャンク取得処理の完了を示します。例外を投げることは、rejectされたPromiseを返すのと同じ扱いになります。
cancel(reason)
, 型 UnderlyingSourceCancelCallback-
コンシューマが
stream.cancel()
またはreader.cancel()
でストリームをキャンセルしたときに呼び出される関数です。引数にはコンシューマがそのメソッドに渡した値が渡されます。さらに、パイプ処理中にも特定条件下でキャンセルされる場合があります。詳細は
pipeTo()
メソッドの定義を参照してください。すべてのストリームで、基礎リソースへのアクセス解放に一般的に使われます(例:§ 10.1 基礎のpushソース(バックプレッシャーなし)のReadableストリーム参照)。
シャットダウン処理が非同期の場合は、Promiseで成功・失敗を通知できます。結果は呼び出された
cancel()
メソッドの戻り値で伝達されます。例外を投げることは、rejectされたPromiseを返すのと同じ扱いとなります。キャンセル処理が失敗しても、ストリームは必ず閉じられ、エラー状態にはなりません。これは、コンシューマがキャンセルによってストリームへの関心を失った時点で、キャンセル処理の失敗がコンシューマの視点では問題にならないためです。失敗は対応するメソッドの直近の呼び出し元にのみ伝達されます。
これは、
close
やabort
オプションを持つWritableStream
の基礎のシンクの挙動とは異なり、そちらは失敗時に対応するWritableStream
をエラー状態にします。これらはプロデューサが要求する特定操作に対応し、その操作が失敗した場合はより永続的な問題を示します。 type
(バイトストリームのみ)、 型 ReadableStreamType-
"
bytes
" を設定すると、作成されるReadableStream
がReadableバイトストリームであることを示します。これにより、生成されたReadableStream
がgetReader()
メソッドを介してBYOBリーダーを提供できるようになります。また、start()
やpull()
に渡されるcontroller引数にも影響します(下記参照)。バイトストリームのセットアップ例、コントローラインターフェースの違いを含む例については、§ 10.3 基礎のpushソース(バックプレッシャーなし)のReadableバイトストリームを参照してください。
"
bytes
" 以外またはundefined以外の値を設定するとReadableStream()
コンストラクタは例外を投げます。 autoAllocateChunkSize
(バイトストリームのみ)、 型 unsigned long long-
正の整数値を設定すると、実装が基礎ソースコード用のバッファを自動的に割り当てるようになります。この場合、コンシューマが
デフォルトリーダー
を使う際、指定サイズのArrayBuffer
が自動割当され、controller.byobRequest
が常に利用できる状態となります(BYOBリーダーを使う場合と同様)。これは、デフォルトリーダーを使うコンシューマ向けの処理を簡素化したい場合に利用されます。例として、§ 10.3 基礎のpushソース(バックプレッシャーなし)のReadableバイトストリーム(自動割当なし)と § 10.5 基礎のpullソースのReadableバイトストリーム(自動割当あり)を比較してください。
start()
や
pull()
に渡されるcontroller引数の型は、type
オプションの値によって異なります。type
がundefined(省略含む)の場合は
controllerはReadableStreamDefaultController
となり、
"bytes
"
の場合はcontrollerはReadableByteStreamController
となります。
4.2.4. コンストラクタ、メソッド、およびプロパティ
stream = new
ReadableStream
(underlyingSource[, strategy])-
指定された基礎のソースをラップする新しい
ReadableStream
を作成します。underlyingSource引数の詳細は§ 4.2.3 基礎のソースAPIを参照してください。strategy引数はストリームのキュー戦略を表します。詳細は§ 7.1 キュー戦略API参照。未指定の場合、デフォルト動作は
CountQueuingStrategy
でハイウォーターマークが1となります。 stream =
ReadableStream.from
(asyncIterable)-
指定されたiterableやasync iterableをラップする新しい
ReadableStream
を作成します。配列、async generator、Node.jsのReadableストリームなど、様々なオブジェクトをReadableストリームへ適応できます。
isLocked = stream.
locked
-
Readableストリームがリーダーにロックされているかどうかを返します。
await stream.
cancel
([ reason ])-
コンシューマによるストリームへの関心喪失を示してキャンセルします。与えられたreasonは基礎のソースの
cancel()
メソッドに渡されます。戻り値のPromiseは、ストリームのシャットダウンが正常ならfulfillし、基礎のソースがエラーを通知した場合はrejectされます。また、ストリームがロックされている場合は
TypeError
でrejectされ、キャンセル処理は実行されません。 reader = stream.
getReader
()-
ReadableStreamDefaultReader
を作成し、ストリームを新しいリーダーにロックします。ストリームがロックされている間は、このリーダーが解除されるまで、他のリーダーを取得できません。この機能は、ストリーム全体を消費したい抽象化を作成する際に特に有用です。リーダーを取得することで、他者があなたの読み取りに割り込んだり、ストリームをキャンセルすることを防ぎ、抽象化の処理が妨害されるのを防げます。
reader = stream.
getReader
({mode
: "byob
" })-
ReadableStreamBYOBReader
を作成し、ストリームを新しいリーダーにロックします。引数なしのバリアントと動作は同じですが、これはReadableバイトストリーム("bring your own buffer"読み取り対応ストリーム)でのみ動作します。返されるBYOBリーダーは、開発者が用意したバッファに
read()
メソッドで直接チャンクを読み取ることができ、バッファ割り当てを細かく制御できます。 readable = stream.
pipeThrough
({writable
,readable
}[, {preventClose
,preventAbort
,preventCancel
,signal
}])-
このReadableストリームをTransformストリーム(または任意の
{ writable, readable }
ペア)にパイプ処理する便利なチェイン可能メソッドです。ストリームをペアのWritable側にパイプし、Readable側を返します。ストリームをパイプ処理すると、その間ロックされ、他のコンシューマがリーダーを取得できなくなります。
await stream.
pipeTo
(destination[, {preventClose
,preventAbort
,preventCancel
,signal
}])-
このReadableストリームを指定されたWritableストリーム destinationにパイプ処理します。エラー条件下の動作はオプションでカスタマイズでき、正常終了時はPromiseがfulfillされ、エラー発生時はrejectされます。
ストリームをパイプ処理するとその間ロックされ、他のコンシューマがリーダーを取得できなくなります。
ソース・デスティネーション両ストリームのエラーやクローズ時の伝播は以下の通りです:
-
このソースReadableストリームにエラーが発生した場合、destinationは中断されます。ただし
preventAbort
がtruthyの場合は除きます。戻り値のPromiseはソースのエラー、またはデスティネーション中断時のエラーでrejectされます。 -
destinationにエラーが発生した場合、このソースReadableストリームはキャンセルされます。ただし
preventCancel
がtruthyの場合は除きます。Promiseはデスティネーションのエラー、またはソースキャンセル時のエラーでrejectされます。 -
このソースReadableストリームが閉じられると、destinationも閉じられます。ただし
preventClose
がtruthyの場合は除きます。クローズ処理が正常終了すればPromiseはfulfillされ、エラーがあればrejectされます。 -
destinationが最初から閉じている場合や閉じつつある場合、このソースReadableストリームはキャンセルされます。ただし
preventCancel
がtrueの場合は除きます。Promiseは閉じたストリームへのパイプ失敗エラー、またはソースキャンセル時のエラーでrejectされます。
signal
オプションにはAbortSignal
を設定でき、対応するAbortController
でパイプ操作を中断できます。この場合、ソースReadableストリームはキャンセルされ、destinationは中断されます。各オプションpreventCancel
やpreventAbort
が設定されている場合は除外されます。 -
[branch1, branch2] = stream.
tee
()-
このReadableストリームを分岐(ティー)し、2つの新しい
ReadableStream
インスタンスとして返します。ティー処理されたストリームはロックされ、他のコンシューマがリーダーを取得できなくなります。ストリームをキャンセルするには両方の分岐をキャンセルし、複合キャンセル理由が基礎のソースに伝達されます。
このストリームがReadableバイトストリームであれば、各分岐はそれぞれのチャンクのコピーを受け取ります。そうでなければ分岐ごとに同じオブジェクトのチャンクを受け取ります。チャンクが不変でない場合、分岐間で相互干渉が起こる可能性があります。
new ReadableStream(underlyingSource, strategy)
コンストラクタの手順は次の通り:
-
underlyingSourceが省略された場合、nullに設定する。
-
underlyingSourceDictを、underlyingSourceを
UnderlyingSource
型のIDL値に変換したものとする。引数underlyingSourceを
UnderlyingSource
型として宣言できない理由は、元のオブジェクト参照が失われるためです。元のオブジェクトを保持して、各メソッド呼び出し時にコールバック関数を呼び出す必要があります。 -
! InitializeReadableStream(this)を実行する。
-
underlyingSourceDict["
type
"] が"bytes
"の場合:-
strategy["
size
"] が存在する場合、RangeError
例外をスローする。 -
highWaterMarkを ? ExtractHighWaterMark(strategy, 0)とする。
-
? SetUpReadableByteStreamControllerFromUnderlyingSource(this, underlyingSource, underlyingSourceDict, highWaterMark)を実行する。
-
-
それ以外の場合、
-
sizeAlgorithmを ! ExtractSizeAlgorithm(strategy)とする。
-
highWaterMarkを ? ExtractHighWaterMark(strategy, 1)とする。
-
? SetUpReadableStreamDefaultControllerFromUnderlyingSource(this, underlyingSource, underlyingSourceDict, highWaterMark, sizeAlgorithm)を実行する。
from(asyncIterable)
メソッドの手順は:
-
? ReadableStreamFromIterable(asyncIterable)を返す。
locked
ゲッターの手順:
-
! IsReadableStreamLocked(this)を返す。
cancel(reason)
メソッドの手順:
-
! IsReadableStreamLocked(this)がtrueなら、TypeError例外でrejectされたPromiseを返す。
-
! ReadableStreamCancel(this, reason)を返す。
getReader(options)
メソッドの手順:
-
options["
mode
"] が存在しない場合、? AcquireReadableStreamDefaultReader(this)を返す。
function readAllChunks( readableStream) { const reader= readableStream. getReader(); const chunks= []; return pump(); function pump() { return reader. read(). then(({ value, done}) => { if ( done) { return chunks; } chunks. push( value); return pump(); }); } }
最初にリーダーを取得し、その後は専用で使い続けている点に注目してください。他のコンシューマがチャンクを読んだり、キャンセルしたりしてストリームに干渉できなくなります。
pipeThrough(transform, options)
メソッドの手順:
-
! IsReadableStreamLocked(this)がtrueなら、
TypeError
例外を投げる。 -
! IsWritableStreamLocked(transform["
writable
"]) がtrueなら、TypeError
例外を投げる。 -
promiseを ! ReadableStreamPipeTo(this, transform["
writable
"], options["preventClose
"], options["preventAbort
"], options["preventCancel
"], signal)とする。 -
promise.[[PromiseIsHandled]]をtrueに設定する。
-
transform["
readable
"]を返す。
pipeThrough(transform, options)
を使ったパイプチェーン構築の典型例:
httpResponseBody. pipeThrough( decompressorTransform) . pipeThrough( ignoreNonImageFilesTransform) . pipeTo( mediaGallery);
pipeTo(destination, options)
メソッドの手順:
-
! IsReadableStreamLocked(this)がtrueなら、TypeError例外でrejectされたPromiseを返す。
-
! IsWritableStreamLocked(destination) がtrueなら、TypeError例外でrejectされたPromiseを返す。
-
! ReadableStreamPipeTo(this, destination, options["
preventClose
"], options["preventAbort
"], options["preventCancel
"], signal)を返す。
AbortSignal
で停止できます:
const controller= new AbortController(); readable. pipeTo( writable, { signal: controller. signal}); // ... 時間が経過した後 ... controller. abort();
(上記ではpipeTo()
のPromiseのエラー処理は省略しています。また、preventAbort
やpreventCancel
オプションの挙動も考慮が必要です。)
WritableStream
に書き込みつつ、パイプするReadableStream
を切り替える用途でも使えます:
const controller= new AbortController(); const pipePromise= readable1. pipeTo( writable, { preventAbort: true , signal: controller. signal}); // ... 時間が経過した後 ... controller. abort(); // パイプ完了を待ってから新しいパイプ開始: try { await pipePromise; } catch ( e) { // "AbortError" DOMExceptionは想定通りなので無視し、その他の失敗は再スローする。 if ( e. name!== "AbortError" ) { throw e; } } // 新しいパイプ開始! readable2. pipeTo( writable);
tee()
メソッドの手順:
-
? ReadableStreamTee(this, false)を返す。
cacheEntry
と、リモートサーバへのアップロードを表すWritableストリームhttpRequestBody
があれば、同じReadableストリームを両方に同時にパイプできます:
const [ forLocal, forRemote] = readableStream. tee(); Promise. all([ forLocal. pipeTo( cacheEntry), forRemote. pipeTo( httpRequestBody) ]) . then(() => console. log( "ストリームをキャッシュに保存し、同時にアップロードしました!" )) . catch ( e=> console. error( "キャッシュまたはアップロードのどちらかが失敗: " , e));
4.2.5. 非同期イテレーション
for await (const chunk of stream) { ... }
for await (const chunk of stream.values({
preventCancel
: true })) { ... }-
ストリームの内部キュー内のチャンクを非同期でイテレートします。
ストリームを非同期イテレートすると、ロックされ、他のコンシューマがリーダーを取得できなくなります。async iteratorの
return()
メソッドが呼ばれると(例えばbreak
でループを抜けた場合など)、ロックが解除されます。デフォルトでは、async iteratorの
return()
メソッドを呼ぶとストリームもキャンセルされます。これを防ぐには、ストリームのvalues()
メソッドを使い、preventCancel
オプションに true を渡します。
ReadableStream
、
stream、iterator、args)は次の通り:
-
readerを ? AcquireReadableStreamDefaultReader(stream)とする。
-
iteratorのreaderをreaderに設定する。
-
preventCancelをargs[0]["
preventCancel
"]とする。 -
iteratorのprevent cancelをpreventCancelに設定する。
ReadableStream
、stream、iterator)の手順:
-
readerをiteratorのreaderとする。
-
Assert: reader.[[stream]]はundefinedではない。
-
promiseを新しいPromiseとする。
-
readRequestを次のread requestとして作成する:
- chunk steps(chunkを受け取る)
-
-
Resolve promise with chunk.
-
- close steps
-
-
! ReadableStreamDefaultReaderRelease(reader)を実行する。
-
- error steps(eを受け取る)
-
-
! ReadableStreamDefaultReaderRelease(reader)を実行する。
-
Reject promise with e。
-
-
! ReadableStreamDefaultReaderRead(this, readRequest)を実行する。
-
promiseを返す。
ReadableStream
、stream、iterator、arg)の手順:
-
readerをiteratorのreaderとする。
-
Assert: reader.[[stream]]はundefinedではない。
-
Assert: reader.[[readRequests]]は空である。async iteratorの仕組みで
next()
の前回呼び出しが確定してから呼ばれることが保証されているため。 -
iteratorのprevent cancelがfalseの場合:
-
resultを ! ReadableStreamReaderGenericCancel(reader, arg)とする。
-
! ReadableStreamDefaultReaderRelease(reader)を実行する。
-
resultを返す。
-
-
! ReadableStreamDefaultReaderRelease(reader)を実行する。
4.2.6. postMessage()
による転送
destination.postMessage(rs, { transfer: [rs] });
-
ReadableStream
を他のフレーム、ウィンドウ、またはワーカーに送信します。転送されたストリームは元のストリームと同様に利用できます。元のストリームはロックされ、直接利用できなくなります。
ReadableStream
オブジェクトは転送可能オブジェクトです。その転送手順(valueとdataHolder)は以下の通り:
-
! IsReadableStreamLocked(value) がtrueなら、"
DataCloneError
"のDOMException
をスローする。 -
port1を新しい
MessagePort
(現在のRealm)とする。 -
port2を新しい
MessagePort
(現在のRealm)とする。 -
writableを新しい
WritableStream
(現在のRealm)とする。 -
! SetUpCrossRealmTransformWritable(writable, port1)を実行する。
-
promiseを ! ReadableStreamPipeTo(value, writable, false, false, false)とする。
-
promise.[[PromiseIsHandled]]をtrueに設定する。
-
dataHolder.[[port]]に ! StructuredSerializeWithTransfer(port2, « port2 »)を設定する。
-
deserializedRecordを ! StructuredDeserializeWithTransfer(dataHolder.[[port]], 現在のRealm)とする。
-
portをdeserializedRecord.[[Deserialized]]とする。
-
! SetUpCrossRealmTransformReadable(value, port)を実行する。
4.3. ReadableStreamGenericReader
ミックスイン
ReadableStreamGenericReader
ミックスインは、ReadableStreamDefaultReader
およびReadableStreamBYOBReader
オブジェクト間で共通する内部スロット、ゲッター、メソッドを定義します。
4.3.1. ミックスイン定義
ReadableStreamGenericReader
ミックスインのWeb IDL定義は以下の通りです:
interface mixin {
ReadableStreamGenericReader readonly attribute Promise <undefined >closed ;Promise <undefined >cancel (optional any ); };
reason
4.3.2. 内部スロット
ReadableStreamGenericReader
ミックスインを含むクラスのインスタンスは、以下の表に示される内部スロットと共に生成されます:
内部スロット | 説明(参考情報) |
---|---|
[[closedPromise]] | リーダーのclosed
ゲッターが返すPromise
|
[[stream]] | ReadableStream
インスタンス(このリーダーの所有者)
|
4.3.3. メソッドおよびプロパティ
closed
ゲッター手順:
cancel(reason)
メソッド手順:
-
this.[[stream]]がundefinedなら、TypeError例外でrejectされたPromiseを返す。
-
! ReadableStreamReaderGenericCancel(this, reason)を返す。
4.4. ReadableStreamDefaultReader
クラス
ReadableStreamDefaultReader
クラスは、デフォルトリーダーを表し、ReadableStream
インスタンスから取得されることを想定しています。
4.4.1. インターフェイス定義
ReadableStreamDefaultReader
クラスのWeb IDL定義は以下の通りです:
[Exposed=*]interface {
ReadableStreamDefaultReader constructor (ReadableStream );
stream Promise <ReadableStreamReadResult >read ();undefined releaseLock (); };ReadableStreamDefaultReader includes ReadableStreamGenericReader ;dictionary {
ReadableStreamReadResult any ;
value boolean ; };
done
4.4.2. 内部スロット
ReadableStreamDefaultReader
のインスタンスはReadableStreamGenericReader
で定義された内部スロットと、以下の表で説明する内部スロットを持って生成されます。
内部スロット | 説明(参考情報) |
---|---|
[[readRequests]] | リスト(read requestのリスト)。コンシューマが チャンクの取得を、利用可能になる前に要求した時に利用される |
read requestは、構造体であり、 Readableストリームの内部キューが満たされたり状態が変化した時に実行される3つのアルゴリズムを含みます。以下の 項目を持ちます:
- chunk steps
-
チャンクを受け取るアルゴリズム。チャンクが読み取り可能になったときに呼ばれる。
- close steps
-
引数なしのアルゴリズム。ストリームが閉じていてチャンクが取得できない時に呼ばれる。
- error steps
-
JavaScript値を受け取るアルゴリズム。ストリームがエラー状態でチャンクが取得できない時に呼ばれる。
4.4.3. コンストラクタ、メソッド、およびプロパティ
reader = new
ReadableStreamDefaultReader
(stream)-
これは
stream.
を呼び出すのと同等です。getReader()
await reader.
closed
-
ストリームが閉じられたときにfulfillされるPromiseを返します。ストリームがエラーになったり、リーダーのロックがストリームの完了前に解除された場合はrejectされます。
await reader.
cancel
([ reason ]){ value, done } = await reader.
read
()-
ストリームの内部キューから次のチャンクにアクセスできるPromiseを返します(利用可能な場合)。
- チャンクが利用可能になった場合、Promiseは
の形のオブジェクトでfulfillされます。{ value: theChunk, done: false } - ストリームが閉じられると、Promiseは
の形のオブジェクトでfulfillされます。{ value: undefined , done: true } - ストリームがエラーになると、Promiseはそのエラーでrejectされます。
チャンクを読み取ってキューが空になると、基礎のソースから追加データがプルされます。
- チャンクが利用可能になった場合、Promiseは
reader.
releaseLock
()-
対応するストリームへのリーダーのロックを解除します。ロック解除後、リーダーはアクティブではなくなります。関連するストリームがエラー状態のまま解除された場合、以降リーダーも同様にエラー状態となり、それ以外の場合はクローズ状態となります。
ロック解除時に未処理のreadリクエストが存在する場合、
read()
メソッドが返すPromiseは即座にTypeError
でrejectされます。未読チャンクはストリームの内部キューに残り、新しいリーダーを取得すれば後で読むことができます。
new ReadableStreamDefaultReader(stream)
コンストラクタ手順:
-
? SetUpReadableStreamDefaultReader(this, stream)を実行する。
read()
メソッド手順:
-
this.[[stream]]がundefinedなら、TypeError例外でrejectされたPromiseを返す。
-
promiseを新しいPromiseとする。
-
readRequestを次のread requestとして作成する(以下の項目を持つ):
- chunk steps(chunkを受け取る)
- close steps
- error steps(eを受け取る)
-
-
Reject promise with e。
-
-
! ReadableStreamDefaultReaderRead(this, readRequest)を実行する。
-
promiseを返す。
releaseLock()
メソッド手順:
-
this.[[stream]]がundefinedなら、return。
-
! ReadableStreamDefaultReaderRelease(this)を実行する。
4.5. ReadableStreamBYOBReader
クラス
ReadableStreamBYOBReader
クラスは、BYOBリーダーを表し、
ReadableStream
インスタンスから取得されることを想定しています。
4.5.1. インターフェイス定義
ReadableStreamBYOBReader
クラスのWeb IDL定義は以下の通りです:
[Exposed=*]interface {
ReadableStreamBYOBReader constructor (ReadableStream );
stream Promise <ReadableStreamReadResult >read (ArrayBufferView ,
view optional ReadableStreamBYOBReaderReadOptions = {});
options undefined releaseLock (); };ReadableStreamBYOBReader includes ReadableStreamGenericReader ;dictionary { [
ReadableStreamBYOBReaderReadOptions EnforceRange ]unsigned long long = 1; };
min
4.5.2. 内部スロット
ReadableStreamBYOBReader
のインスタンスはReadableStreamGenericReader
で定義された内部スロットと、以下の表で説明する内部スロットを持って生成されます。
内部スロット | 説明(参考情報) |
---|---|
[[readIntoRequests]] | リスト(read-into requestのリスト)。コンシューマが チャンクの取得を、利用可能になる前に要求した時に利用される |
read-into requestは、構造体であり、 Readableバイトストリームの内部キューが満たされたり状態が変化した時に実行される3つのアルゴリズムを含みます。以下の 項目を持ちます:
- chunk steps
-
チャンクを受け取るアルゴリズム。チャンクが読み取り可能になったときに呼ばれる。
- close steps
-
チャンクまたはundefinedを受け取るアルゴリズム。ストリームが閉じていてチャンクが取得できない時に呼ばれる。
- error steps
-
JavaScript値を受け取るアルゴリズム。ストリームがエラー状態でチャンクが取得できない時に呼ばれる。
close
stepsはチャンクを受け取ることで、可能ならバッファのメモリを呼び出し元に返すことができます。例えば、
byobReader.read(chunk)
は、ストリームが閉じている場合
でfulfillされます。ストリームが
キャンセルされた場合、バッファのメモリは破棄され、
byobReader.read(chunk)
は従来通り
でfulfillされます。
4.5.3. コンストラクタ、メソッド、およびプロパティ
reader = new
ReadableStreamBYOBReader
(stream)await reader.
closed
-
ストリームが閉じられたときにfulfillされるPromiseを返します。ストリームがエラーになったり、リーダーのロックがストリームの完了前に解除された場合はrejectされます。
await reader.
cancel
([ reason ]){ value, done } = await reader.
read
(view[, {min
}])-
viewにバイトを読み込もうとし、その結果でresolveされるPromiseを返します:
- チャンクが利用可能になった場合、Promiseは
の形のオブジェクトでfulfillされます。このとき、viewはデタッチされ利用不可となり、{ value: newView, done: false } newView
は同じメモリ領域を指す新しいビュー(型も同じ)で、チャンクのデータが書き込まれます。 - ストリームが閉じられると、Promiseは
の形のオブジェクトでfulfillされます。このとき、viewはデタッチされ利用不可となり、{ value: newView, done: true } newView
は同じメモリ領域を指す新しいビュー(型も同じ)で、データは変更されません(メモリを呼び出し元に返すため)。 - リーダーがキャンセルされた場合、Promiseは
の形のオブジェクトでfulfillされます。この場合、viewのメモリ領域は破棄され、呼び出し元に返されません。{ value: undefined , done: true } - ストリームがエラーになると、Promiseはそのエラーでrejectされます。
チャンクを読み取ってキューが空になると、基礎のソースから追加データがプルされます。
min
が指定された場合、Promiseは指定された最小要素数が利用可能になった時点でfulfillされます。"要素数"はnewView
のlength
(Typed Arrayの場合)、またはnewView
のbyteLength
(DataView
の場合)で表されます。ストリームが閉じた場合は、Promiseは残った要素でfulfillされ、要求数より少ない場合もあります。minが指定されていなければ、少なくとも1要素利用可能になればresolveされます。 - チャンクが利用可能になった場合、Promiseは
reader.
releaseLock
()-
対応するストリームへのリーダーのロックを解除します。ロック解除後、リーダーはアクティブではなくなります。関連するストリームがエラー状態のまま解除された場合、以降リーダーも同様にエラー状態となり、それ以外の場合はクローズ状態となります。
ロック解除時に未処理のreadリクエストが存在する場合、
read()
メソッドが返すPromiseは即座にTypeError
でrejectされます。未読チャンクはストリームの内部キューに残り、新しいリーダーを取得すれば後で読むことができます。
new ReadableStreamBYOBReader(stream)
コンストラクタ手順:
-
? SetUpReadableStreamBYOBReader(this, stream)を実行する。
read(view, options)
メソッド手順:
-
view.[[ByteLength]]が0の場合、TypeError例外でrejectされたPromiseを返す。
-
view.[[ViewedArrayBuffer]].[[ByteLength]]が0の場合、TypeError例外でrejectされたPromiseを返す。
-
! IsDetachedBuffer(view.[[ViewedArrayBuffer]])がtrueの場合、 TypeError例外でrejectされたPromiseを返す。
-
options["
min
"]が0の場合、 TypeError例外でrejectされたPromiseを返す。 -
viewが[[TypedArrayName]]内部スロットを持つ場合、
-
options["
min
"] > view.[[ArrayLength]]の場合、 RangeError例外でrejectされたPromiseを返す。
-
-
それ以外の場合(
DataView
の場合)、-
options["
min
"] > view.[[ByteLength]]の場合、 RangeError例外でrejectされたPromiseを返す。
-
-
this.[[stream]]がundefinedなら、TypeError例外でrejectされたPromiseを返す。
-
promiseを新しいPromiseとする。
-
readIntoRequestを次のread-into requestとして作成する(以下の項目を持つ):
- chunk steps(chunkを受け取る)
- close steps(chunkを受け取る)
- error steps(eを受け取る)
-
-
Reject promise with e。
-
-
! ReadableStreamBYOBReaderRead(this, view, options["
min
"], readIntoRequest)を実行する。 -
promiseを返す。
releaseLock()
メソッド手順:
-
this.[[stream]]がundefinedなら、return。
-
! ReadableStreamBYOBReaderRelease(this)を実行する。
4.6.
ReadableStreamDefaultController
クラス
ReadableStreamDefaultController
クラスは、ReadableStream
の状態や内部キューを制御するメソッドを持っています。
ReadableStream
をReadableバイトストリームではない形で構築する場合、
underlying
sourceには対応するReadableStreamDefaultController
インスタンスが与えられ、操作が可能になります。
4.6.1. インターフェイス定義
ReadableStreamDefaultController
クラスのWeb IDL定義は以下の通りです:
[Exposed=*]interface {
ReadableStreamDefaultController readonly attribute unrestricted double ?desiredSize ;undefined close ();undefined enqueue (optional any );
chunk undefined error (optional any ); };
e
4.6.2. 内部スロット
ReadableStreamDefaultController
のインスタンスは、以下の表で説明される内部スロットを持って生成されます。
内部スロット | 説明(参考情報) |
---|---|
[[cancelAlgorithm]] | Promiseを返すアルゴリズム(1つの引数:キャンセル理由を受け取る)で、underlying sourceにキャンセル要求を伝達する |
[[closeRequested]] | ストリームがunderlying sourceによって閉じられたかどうかを示すブール値。ただし内部キューに未読のチャンクがまだ残っている場合もある |
[[pullAgain]] | ストリームの仕組みがunderlying sourceのpullアルゴリズム呼び出しを要求したが、前回呼び出しがまだ処理中だった場合にtrueとなるフラグ |
[[pullAlgorithm]] | underlying sourceからデータをプルするPromiseを返すアルゴリズム |
[[pulling]] | underlying sourceのpullアルゴリズムが実行中で、戻り値のPromiseがまだfulfillされていない間trueとなるフラグ。再入呼び出し防止用 |
[[queue]] | リスト。ストリームの内部キューで、チャンクを保持する |
[[queueTotalSize]] | [[queue]]に格納されている全チャンクの合計サイズ(§ 8.1 Queue-with-sizes参照) |
[[started]] | underlying sourceの初期化が完了したかどうかを示すブール値 |
[[strategyHWM]] | ストリームのキュー戦略の一部としてコンストラクタに渡される数値。ストリームがバックプレッシャーをunderlying sourceに適用する閾値 |
[[strategySizeAlgorithm]] | ストリームのキュー戦略の一部として、追加されたチャンクのサイズを計算するアルゴリズム |
[[stream]] | 制御対象のReadableStream
インスタンス
|
4.6.3. メソッドおよびプロパティ
desiredSize = controller.
desiredSize
-
制御対象ストリームの内部キューを満たすためのdesired sizeを返します。キューが過剰に満たされている場合は負の値になることもあります。underlying sourceはこの値を使ってバックプレッシャーのタイミングや方法を決定できます。
controller.
close
()-
制御対象のReadableStreamを閉じます。コンシューマは、キューにすでに追加されているチャンクは引き続き読み取れますが、それらを全て読み取るとストリームは閉じられます。
controller.
enqueue
(chunk)-
指定したチャンク chunkを制御対象のReadableStreamに追加します。
controller.
error
(e)-
制御対象のReadableStreamをエラー状態にし、今後の操作は全て指定したエラーeで失敗するようにします。
desiredSize
ゲッター手順:
close()
メソッド手順:
-
! ReadableStreamDefaultControllerCanCloseOrEnqueue(this)がfalseなら、
TypeError
例外をスローする。 -
! ReadableStreamDefaultControllerClose(this)を実行する。
enqueue(chunk)
メソッド手順:
-
! ReadableStreamDefaultControllerCanCloseOrEnqueue(this)がfalseなら、
TypeError
例外をスローする。 -
? ReadableStreamDefaultControllerEnqueue(this, chunk)を実行する。
error(e)
メソッド手順:
-
! ReadableStreamDefaultControllerError(this, e)を実行する。
4.6.4. 内部メソッド
以下は各ReadableStreamDefaultController
インスタンスが実装する内部メソッドです。
ReadableStreamの実装は、現行標準の§ 4.9.2
コントローラーとのインターフェースで説明されている通り、これらもしくはBYOBコントローラー用の対応メソッドのいずれかをポリモーフィックに呼び出します。
-
! ResetQueue(this)を実行する。
-
resultを、this.[[cancelAlgorithm]]にreasonを渡して実行した結果とする。
-
! ReadableStreamDefaultControllerClearAlgorithms(this)を実行する。
-
resultを返す。
-
streamをthis.[[stream]]とする。
-
-
chunkを! DequeueValue(this)とする。
-
this.[[closeRequested]] がtrueかつ this.[[queue]]が空の場合、
-
! ReadableStreamDefaultControllerClearAlgorithms(this)を実行する。
-
! ReadableStreamClose(stream)を実行する。
-
-
それ以外の場合、! ReadableStreamDefaultControllerCallPullIfNeeded(this)を実行する。
-
readRequestのchunk stepsをchunkを渡して実行する。
-
-
それ以外の場合、
-
! ReadableStreamAddReadRequest(stream, readRequest)を実行する。
-
! ReadableStreamDefaultControllerCallPullIfNeeded(this)を実行する。
-
-
return。
4.7. ReadableByteStreamController
クラス
ReadableByteStreamController
クラスは、ReadableStream
の状態や内部キューを制御するメソッドを持っています。
ReadableStream
をReadableバイトストリームとして構築する場合、
underlying
sourceには対応するReadableByteStreamController
インスタンスが与えられ、操作が可能になります。
4.7.1. インターフェイス定義
ReadableByteStreamController
クラスのWeb IDL定義は以下の通りです:
[Exposed=*]interface {
ReadableByteStreamController readonly attribute ReadableStreamBYOBRequest ?byobRequest ;readonly attribute unrestricted double ?desiredSize ;undefined close ();undefined enqueue (ArrayBufferView );
chunk undefined error (optional any ); };
e
4.7.2. 内部スロット
ReadableByteStreamController
のインスタンスは、以下の表で説明される内部スロットを持って生成されます。
内部スロット | 説明(参考情報) |
---|---|
[[autoAllocateChunkSize]] | 自動バッファ割り当て機能が有効な場合の正の整数値。この値がバッファ割り当てサイズを指定する。それ以外の場合はundefined。 |
[[byobRequest]] | 現在のBYOBプルリクエストを表すReadableStreamBYOBRequest
インスタンス。保留中リクエストがない場合はnull。
|
[[cancelAlgorithm]] | Promiseを返すアルゴリズム(1つの引数:キャンセル理由を受け取る)。underlying byte sourceにキャンセル要求を伝達する。 |
[[closeRequested]] | ストリームがunderlying byte sourceによって閉じられたかどうかを示すブール値。ただし内部キューに未読のチャンクがまだ残っている場合もある。 |
[[pullAgain]] | ストリームの仕組みがunderlying byte sourceのpullアルゴリズム呼び出しを要求したが、前回呼び出しがまだ処理中だった場合にtrueとなるフラグ。 |
[[pullAlgorithm]] | underlying byte sourceからデータをプルするPromiseを返すアルゴリズム。 |
[[pulling]] | underlying byte sourceのpullアルゴリズムが実行中で、戻り値のPromiseがまだfulfillされていない間trueとなるフラグ。再入呼び出し防止用。 |
[[pendingPullIntos]] | リスト。保留中のBYOBプルリクエストを表すpull-into descriptorのリスト。 |
[[queue]] | リスト。ストリームの内部キューで、readable byte stream queue entryを保持し、チャンクを表す。 |
[[queueTotalSize]] | [[queue]]に格納されている全チャンクの合計サイズ(バイト単位、§ 8.1 Queue-with-sizes参照) |
[[started]] | underlying byte sourceの初期化が完了したかどうかを示すブール値。 |
[[strategyHWM]] | ストリームのキュー戦略の一部としてコンストラクタに渡される数値。ストリームがバックプレッシャーをunderlying byte sourceに適用する閾値。 |
[[stream]] | 制御対象のReadableStream
インスタンス。
|
ReadableByteStreamController
インスタンスには
[[queue]]と[[queueTotalSize]]
スロットがありますが、他の仕様のキュー操作とは異なる方法でキュー操作を行うため、§ 8.1
Queue-with-sizesのほとんどの抽象操作は使われません。代わりに、2つのスロットは手動で一緒に更新されます。
この点は将来の仕様リファクタリングで整理される可能性があります。
readable byte stream queue entryは、構造体であり、チャンクに関する重要な要素をReadableバイトストリーム向けにカプセル化します。以下の項目を持ちます:
- buffer
-
ArrayBuffer
。もともと転送された underlying byte sourceから渡されたもの。 - byte offset
-
もともとunderlying byte sourceから渡されたviewに由来するバイトオフセット(非負整数)。
- byte length
-
もともとunderlying byte sourceから渡されたviewに由来するバイト長(非負整数)。
pull-into descriptorは、構造体であり、保留中のBYOBプルリクエストを表すために使われます。以下の項目を持ちます:
- buffer
- buffer byte length
-
bufferの初期バイト長(正の整数)。
- byte offset
-
bufferのバイトオフセット(非負整数)。underlying byte sourceが書き込みを開始する位置。
- byte length
-
bufferに書き込み可能なバイト数(正の整数)。
- bytes filled
-
これまでにbufferに書き込まれたバイト数(非負整数)。
- minimum fill
-
bufferに書き込みが完了するまでに最低限必要なバイト数(正の整数)。関連する
read()
リクエストが fulfillされる前に必要となる。デフォルトではelement sizeと同じ値。 - element size
-
bufferに一度に書き込めるバイト数(正の整数)。view constructorで記述される型のビューを使う。
- view constructor
-
TypedArrayコンストラクタまたは
%DataView%
。bufferに書き込みを行う際のビュー生成に使用される。 - reader type
-
"
default
"または"byob
"のいずれか。どのReadableStream readerがこのリクエストを開始したかを示す。開始したreaderがreleaseされた場合は"none
"となる。
4.7.3. メソッドおよびプロパティ
byobRequest = controller.
byobRequest
-
現在のBYOBプルリクエストを返します。存在しなければnullを返します。
desiredSize = controller.
desiredSize
-
制御対象ストリームの内部キューを満たすためのdesired sizeを返します。キューが過剰に満たされている場合は負の値になることもあります。underlying byte sourceはこの値を使ってバックプレッシャーのタイミングや方法を決定できます。
controller.
close
()-
制御対象のReadableStreamを閉じます。コンシューマは、キューにすでに追加されているチャンクは引き続き読み取れますが、それらを全て読み取るとストリームは閉じられます。
controller.
enqueue
(chunk)-
指定したチャンク chunkを制御対象のReadableStreamに追加します。チャンクは
ArrayBufferView
インスタンスでなければならず、そうでなければTypeError
がスローされます。 controller.
error
(e)-
制御対象のReadableStreamをエラー状態にし、今後の操作は全て指定したエラーeで失敗するようにします。
byobRequest
ゲッター手順:
desiredSize
ゲッター手順:
close()
メソッド手順:
-
this.[[closeRequested]]がtrueなら、
TypeError
例外をスローする。 -
this.[[stream]].[[state]]が"
readable
"でない場合、TypeError
例外をスローする。 -
? ReadableByteStreamControllerClose(this)を実行する。
enqueue(chunk)
メソッド手順:
-
chunk.[[ByteLength]]が0の場合、
TypeError
例外をスローする。 -
chunk.[[ViewedArrayBuffer]].[[ByteLength]]が0の場合、
TypeError
例外をスローする。 -
this.[[closeRequested]]がtrueなら、
TypeError
例外をスローする。 -
this.[[stream]].[[state]]が"
readable
"でない場合、TypeError
例外をスローする。 -
Return ? ReadableByteStreamControllerEnqueue(this, chunk)。
error(e)
メソッド手順:
-
! ReadableByteStreamControllerError(this, e)を実行する。
4.7.4. 内部メソッド
以下は各ReadableByteStreamController
インスタンスが実装する内部メソッドです。
ReadableStreamの実装は、現行標準の§ 4.9.2
コントローラーとのインターフェースで説明されている通り、これらもしくはデフォルトコントローラー用の対応メソッドのいずれかをポリモーフィックに呼び出します。
-
! ReadableByteStreamControllerClearPendingPullIntos(this)を実行する。
-
! ResetQueue(this)を実行する。
-
resultを、this.[[cancelAlgorithm]]にreasonを渡して実行した結果とする。
-
resultを返す。
-
streamをthis.[[stream]]とする。
-
Assert: ! ReadableStreamHasDefaultReader(stream) がtrueであることを確認する。
-
this.[[queueTotalSize]] > 0の場合、
-
Assert: ! ReadableStreamGetNumReadRequests(stream) が0であることを確認する。
-
! ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest)を実行する。
-
return。
-
-
autoAllocateChunkSizeを this.[[autoAllocateChunkSize]]とする。
-
autoAllocateChunkSizeがundefinedでない場合、
-
bufferをConstruct(
%ArrayBuffer%
, « autoAllocateChunkSize »)で生成する。 -
bufferがabrupt completionの場合、
-
readRequestのerror stepsをbuffer.[[Value]]で実行する。
-
return。
-
-
pullIntoDescriptorを新しいpull-into descriptorとして以下の値で生成する:
- buffer
- buffer.[[Value]]
- buffer byte length
- autoAllocateChunkSize
- byte offset
- 0
- byte length
- autoAllocateChunkSize
- bytes filled
- 0
- minimum fill
- 1
- element size
- 1
- view constructor
%Uint8Array%
- reader type
- "
default
"
-
Append pullIntoDescriptorを this.[[pendingPullIntos]]に追加する。
-
-
! ReadableStreamAddReadRequest(stream, readRequest)を実行する。
-
this.[[pendingPullIntos]]が空でない場合、
-
firstPendingPullIntoをthis.[[pendingPullIntos]][0]とする。
-
firstPendingPullIntoのreader typeを"
none
"に設定する。 -
this.[[pendingPullIntos]]をリスト « firstPendingPullInto »に設定する。
-
4.8. ReadableStreamBYOBRequest
クラス
ReadableStreamBYOBRequest
クラスは、
ReadableByteStreamController
内のpull-intoリクエストを表します。
4.8.1. インターフェイス定義
ReadableStreamBYOBRequest
クラスのWeb IDL定義は以下の通りです:
[Exposed=*]interface {
ReadableStreamBYOBRequest readonly attribute ArrayBufferView ?view ;undefined respond ([EnforceRange ]unsigned long long );
bytesWritten undefined respondWithNewView (ArrayBufferView ); };
view
4.8.2. 内部スロット
ReadableStreamBYOBRequest
のインスタンスは、以下の表で説明される内部スロットを持って生成されます。
内部スロット | 説明(参考情報) |
---|---|
[[controller]] | 親のReadableByteStreamController
インスタンス
|
[[view]] | TypedArray。コントローラーが生成したデータを書き込む先領域を表し、BYOBリクエストが無効化された後はnullになる。 |
4.8.3. メソッドおよびプロパティ
view = byobRequest.
view
-
書き込み先のviewを返します。BYOBリクエストにすでに応答済みの場合はnullが返されます。
byobRequest.
respond
(bytesWritten)-
関連するReadableバイトストリームに、bytesWrittenバイトが
view
に書き込まれたことを通知し、 結果がコンシューマに渡されます。 byobRequest.
respondWithNewView
(view)-
関連するReadableバイトストリームに対し、
view
への書き込みの代わりに、 underlying byte sourceから新しいArrayBufferView
が提供されたことを通知します。 この新しいビューはコンシューマに渡されます。新しいviewは、
view
と同じメモリ領域(バッファ)へのビューでなければなりません。 つまり、そのbufferはview
のbufferと等しいか、 転送されたものでなければなりません。 また、byteOffsetはview
のbyteOffsetと一致し、 byteLength(書き込まれたバイト数)はview
のbyteLength以下でなければなりません。このメソッド呼び出し後、新しいviewは転送され、以後変更できなくなります。
respond(bytesWritten)
メソッド手順:
-
this.[[controller]]がundefinedの場合、
TypeError
例外をスローする。 -
! IsDetachedBuffer(this.[[view]].[[ArrayBuffer]])がtrueの場合、
TypeError
例外をスローする。 -
Assert: this.[[view]].[[ViewedArrayBuffer]].[[ByteLength]] > 0。
-
? ReadableByteStreamControllerRespond(this.[[controller]], bytesWritten)を実行する。
respondWithNewView(view)
メソッド手順:
-
this.[[controller]]がundefinedの場合、
TypeError
例外をスローする。 -
! IsDetachedBuffer(view.[[ViewedArrayBuffer]])がtrueの場合、
TypeError
例外をスローする。 -
? ReadableByteStreamControllerRespondWithNewView(this.[[controller]], view)を実行する。
4.9. 抽象操作
4.9.1. ReadableStreamの操作
以下の抽象操作はReadableStream
インスタンスに対して高レベルで動作します。
-
readerを新規の
ReadableStreamBYOBReader
とする。 -
? SetUpReadableStreamBYOBReader(reader, stream)を実行する。
-
readerを返す。
-
readerを新規の
ReadableStreamDefaultReader
とする。 -
? SetUpReadableStreamDefaultReader(reader, stream)を実行する。
-
readerを返す。
-
highWaterMarkが渡されていなければ、1を設定する。
-
sizeAlgorithmが渡されていなければ、1を返すアルゴリズムを設定する。
-
Assert: ! IsNonNegativeNumber(highWaterMark) はtrue。
-
streamを新規の
ReadableStream
とする。 -
! InitializeReadableStream(stream)を実行する。
-
controllerを新規の
ReadableStreamDefaultController
とする。 -
? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm)を実行する。
-
streamを返す。
この抽象操作は、与えられたstartAlgorithmが例外をスローした場合のみ例外をスローします。
-
streamを新規の
ReadableStream
とする。 -
! InitializeReadableStream(stream)を実行する。
-
controllerを新規の
ReadableByteStreamController
とする。 -
? SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, undefined)を実行する。
-
streamを返す。
この抽象操作は、与えられたstartAlgorithmが例外をスローした場合のみ例外をスローします。
-
stream.[[state]] に"
readable
"を設定する。 -
stream.[[reader]]およびstream.[[storedError]]にundefinedを設定する。
-
stream.[[disturbed]]にfalseを設定する。
-
stream.[[reader]]がundefinedなら、falseを返す。
-
trueを返す。
-
streamをundefinedとする。
-
iteratorRecordを? GetIterator(asyncIterable, async)で取得する。
-
startAlgorithmをundefinedを返すアルゴリズムとする。
-
pullAlgorithmを以下の手順とする:
-
nextResultをIteratorNext(iteratorRecord)で取得する。
-
nextResultがabrupt completionなら、a promise rejected with nextResult.[[Value]]を返す。
-
nextPromiseをa promise resolved with nextResult.[[Value]]で取得する。
-
nextPromiseのfulfillment時の手順(iterResult)として、以下を実行する:
-
doneを? IteratorComplete(iterResult)で取得する。
-
doneがtrueなら:
-
! ReadableStreamDefaultControllerClose(stream.[[controller]])を実行する。
-
-
それ以外なら:
-
valueを? IteratorValue(iterResult)で取得する。
-
! ReadableStreamDefaultControllerEnqueue(stream.[[controller]], value)を実行する。
-
-
-
cancelAlgorithmを以下の手順(reasonを受け取る)とする:
-
iteratorをiteratorRecord.[[Iterator]]とする。
-
returnMethodをGetMethod(iterator, "
return
")で取得する。 -
returnMethodがabrupt completionなら、a promise rejected with returnMethod.[[Value]]を返す。
-
returnMethod.[[Value]]がundefinedなら、a promise resolved with undefinedを返す。
-
returnResultをCall(returnMethod.[[Value]], iterator, « reason »)で取得する。
-
returnResultがabrupt completionなら、a promise rejected with returnResult.[[Value]]を返す。
-
returnPromiseをa promise resolved with returnResult.[[Value]]で取得する。
-
returnPromiseのfulfillment時の手順(iterResult)として、以下を実行する:
-
-
streamを! CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, 0)で取得する。
-
streamを返す。
-
Assert: sourceが
ReadableStream
をimplementsしていること。 -
Assert: destが
WritableStream
をimplementsしていること。 -
Assert: preventClose, preventAbort, preventCancelはすべてbooleanであること。
-
signalが与えられていなければ、signalをundefinedとする。
-
Assert: signalがundefinedまたはsignalが
AbortSignal
をimplementsしていること。 -
Assert: ! IsReadableStreamLocked(source)はfalseであること。
-
Assert: ! IsWritableStreamLocked(dest)はfalseであること。
-
source.[[controller]]が
ReadableByteStreamController
をimplementsしている場合、readerを ! AcquireReadableStreamBYOBReader(source) または ! AcquireReadableStreamDefaultReader(source)とし、どちらを使うかはUAの裁量とする。 -
それ以外の場合は、readerを! AcquireReadableStreamDefaultReader(source)とする。
-
writerを! AcquireWritableStreamDefaultWriter(dest)で取得する。
-
source.[[disturbed]]にtrueを設定する。
-
shuttingDownをfalseとする。
-
promiseを新しいPromiseとする。
-
signalがundefinedでない場合、
-
abortAlgorithmを以下の手順とする:
-
errorをsignalのabort reasonとする。
-
actionsを空の順序付き集合とする。
-
preventAbortがfalseなら、以下のactionをactionsにappendする:
-
dest.[[state]]が"
writable
"なら、! WritableStreamAbort(dest, error)を返す。 -
それ以外の場合、a promise resolved with undefinedを返す。
-
-
preventCancelがfalseなら、以下のactionをactionsにappendする:
-
source.[[state]]が"
readable
"なら、! ReadableStreamCancel(source, error)を返す。 -
それ以外の場合、a promise resolved with undefinedを返す。
-
-
Shutdown with an actionとして、 actions内の全てのactionの全て完了を待つpromiseを使い、errorとともにshutdownする。
-
-
signalがabortedなら、abortAlgorithmを実行し、promiseを返す。
-
Add abortAlgorithmをsignalに追加する。
-
-
並行処理で ただし実際は違う。#905参照、readerとwriterを使い、sourceから全てのチャンクを読み取りdestに書き込む。readerとwriterのロックによって、この処理の厳密な方法は著者コードから観測できないため、実装方法に柔軟性がある。ただし、以下の制約はどの実装でも適用される:
-
Public APIは使ってはならない: 読み書きや下記操作の最中は、JSから修正可能なreader/writer/streamのAPI(prototype上のメソッド等)は使わず、ストリームを直接操作すること。
-
バックプレッシャーを厳守:
-
! WritableStreamDefaultWriterGetDesiredSize(writer) が0以下またはnullの間は、readerから読み取ってはならない。
-
readerがBYOBリーダーなら、WritableStreamDefaultWriterGetDesiredSize(writer) を基準にチャンクのサイズを決めること。
小さすぎたり大きすぎたりするチャンクを読むのは効率が悪いことが多い。他の情報も考慮して最適なサイズを決定してもよい。
-
バックプレッシャー以外の理由で読み書きを遅延させてはならない。
各writeの完了を待ってから次のread/writeを行う実装は、この推奨に違反する。この場合、destの内部キューが無意味になり、destには常に1つ以下のチャンクしか溜まらない。
-
-
シャットダウン時は活動を停止: shuttingDownがtrueになったら、readerから追加で読み取ってはならず、既に読んだチャンクの書き込みのみ行う。下記条件によって即座にシャットダウンする場合もあるので、各読み書き前にチェックすること。
-
エラー・クローズ状態の伝播: 下記の条件を順序通り適用する:
-
エラーの前方伝播: source.[[state]] が"
errored
"またはそうなった場合:-
preventAbortがfalseなら、! WritableStreamAbort(dest, source.[[storedError]])をactionとしてshutdown with an action、source.[[storedError]]とともにshutdownする。
-
それ以外の場合、shutdown with source.[[storedError]]。
-
-
エラーの後方伝播: dest.[[state]] が"
errored
"またはそうなった場合:-
preventCancelがfalseなら、! ReadableStreamCancel(source, dest.[[storedError]])をactionとしてshutdown with an action、dest.[[storedError]]とともにshutdownする。
-
それ以外の場合、shutdown with dest.[[storedError]]。
-
-
クローズの前方伝播: source.[[state]] が"
closed
"またはそうなった場合:-
preventCloseがfalseなら、! WritableStreamDefaultWriterCloseWithErrorPropagation(writer)をactionとしてshutdown with an action。
-
それ以外の場合、shutdown。
-
-
クローズの後方伝播: ! WritableStreamCloseQueuedOrInFlight(dest) がtrue、またはdest.[[state]] が"
closed
"なら:-
Assert: まだチャンクは読まれておらず書き込まれてもいない。
-
destClosedを新しい
TypeError
とする。 -
preventCancelがfalseなら、! ReadableStreamCancel(source, destClosed)をactionとしてshutdown with an action、destClosedとともにshutdownする。
-
それ以外の場合、shutdown with destClosed。
-
-
-
Shutdown with an action:上記要件でaction actionと、場合によってはoriginalErrorでshutdownする場合:
-
shuttingDownがtrueなら、これ以降の手順を中止。
-
shuttingDownをtrueに設定。
-
dest.[[state]]が"
writable
"かつ ! WritableStreamCloseQueuedOrInFlight(dest) がfalseの場合、 -
pをactionの実行結果とする。
-
pのfulfillment時に、finalizeをoriginalError付きで実行。
-
pのrejection時にはnewErrorでfinalizeを実行。
-
-
Shutdown:上記要件または手順で、error errorとともにshutdownする場合:
-
shuttingDownがtrueなら、これ以降の手順を中止。
-
shuttingDownをtrueに設定。
-
dest.[[state]]が"
writable
"かつ ! WritableStreamCloseQueuedOrInFlight(dest) がfalseの場合、 -
finalizeをerror付きで実行。
-
-
Finalize:両方のshutdownは最終的にfinalizeに至る。error error付きの場合は以下の手順:
-
! WritableStreamDefaultWriterRelease(writer)を実行。
-
readerが
ReadableStreamBYOBReader
をimplementsしている場合は ! ReadableStreamBYOBReaderRelease(reader)を実行。 -
それ以外の場合は! ReadableStreamDefaultReaderRelease(reader)を実行。
-
signalがundefinedでなければ、remove abortAlgorithmをsignalから外す。
-
errorが与えられていれば、reject promise with error。
-
それ以外の場合は、resolve promise with undefined。
-
-
-
promiseを返す。
ここで実行される各種抽象操作には(多くがpromise等の)オブジェクト生成が含まれるが、ロックのため著者コードからこれらのオブジェクトは観測できない。そのため、生成時のrealmは問わない。
第2引数cloneForBranch2は、元のストリームのデータを2番目の分岐に現れる前に(HTMLのシリアライズ可能オブジェクトフレームワークを使って)クローンするかどうかを制御します。これは両分岐がそれぞれの消費方法で互いに干渉する可能性がある場合(たとえばtransferされるチャンクなど)に便利ですが、2つの分岐間に非対称性を導入し、チャンクはシリアライズ可能なものに限定されます。[HTML]
streamがReadableバイトストリームである場合、cloneForBranch2は無視され、チャンクは必ずクローンされます。
現行標準では、ReadableStreamTeeは常にcloneForBranch2をfalseで呼び出します。他の仕様はteeラッパーアルゴリズム経由でtrueを渡します。
この抽象操作の手順:
-
Assert: streamが
ReadableStream
をimplementsしていること。 -
Assert: cloneForBranch2はbooleanであること。
-
stream.[[controller]]が
ReadableByteStreamController
をimplementsしている場合、? ReadableByteStreamTee(stream)を返す。 -
? ReadableStreamDefaultTee(stream, cloneForBranch2)を返す。
-
Assert: streamが
ReadableStream
をimplementsしていること。 -
Assert: cloneForBranch2はbooleanであること。
-
readerを? AcquireReadableStreamDefaultReader(stream)で取得する。
-
readingをfalseとする。
-
readAgainをfalseとする。
-
canceled1をfalseとする。
-
canceled2をfalseとする。
-
reason1をundefinedとする。
-
reason2をundefinedとする。
-
branch1をundefinedとする。
-
branch2をundefinedとする。
-
cancelPromiseを新しいPromiseとする。
-
pullAlgorithmを以下の手順とする:
-
readingがtrueなら、
-
readAgainをtrueに設定する。
-
a promise resolved with undefinedを返す。
-
-
readingをtrueに設定する。
-
readRequestをread requestとして次の項目を持つ:
- chunk steps(chunkを受け取る)
-
-
マイクロタスクをキューに追加し、以下を実行:
-
readAgainをfalseに設定する。
-
chunk1とchunk2をchunkとする。
-
canceled2がfalseかつcloneForBranch2がtrueの場合、
-
cloneResultをStructuredClone(chunk2)で取得する。
-
cloneResultがabrupt completionなら、
-
! ReadableStreamDefaultControllerError(branch1.[[controller]], cloneResult.[[Value]])を実行。
-
! ReadableStreamDefaultControllerError(branch2.[[controller]], cloneResult.[[Value]])を実行。
-
resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]])を実行。
-
return。
-
-
それ以外の場合、chunk2をcloneResult.[[Value]]にする。
-
-
canceled1がfalseなら、! ReadableStreamDefaultControllerEnqueue(branch1.[[controller]], chunk1)を実行。
-
canceled2がfalseなら、! ReadableStreamDefaultControllerEnqueue(branch2.[[controller]], chunk2)を実行。
-
readingをfalseに設定する。
-
readAgainがtrueなら、pullAlgorithmを実行する。
-
ここでのマイクロタスク遅延は、エラー検出に最低1マイクロタスク必要なのと、下記のreader.[[closedPromise]]で使うためです。 stream内のエラーが両分岐に即座に反映されるよう、同期的にreadできる場合でも非同期エラーが先に処理されるようにします。
-
- close steps
-
-
readingをfalseに設定する。
-
canceled1がfalseなら、! ReadableStreamDefaultControllerClose(branch1.[[controller]])を実行。
-
canceled2がfalseなら、! ReadableStreamDefaultControllerClose(branch2.[[controller]])を実行。
-
canceled1かcanceled2がfalseなら、resolve cancelPromise with undefined。
-
- error steps
-
-
readingをfalseに設定する。
-
-
! ReadableStreamDefaultReaderRead(reader, readRequest)を実行する。
-
a promise resolved with undefinedを返す。
-
-
cancel1Algorithmを(reasonを受け取る)手順として:
-
canceled1をtrueに設定する。
-
reason1をreasonに設定する。
-
canceled2がtrueなら、
-
compositeReasonを! CreateArrayFromList(« reason1, reason2 »)で取得する。
-
cancelResultを! ReadableStreamCancel(stream, compositeReason)で取得する。
-
resolve cancelPromise with cancelResult。
-
-
cancelPromiseを返す。
-
-
cancel2Algorithmを(reasonを受け取る)手順として:
-
canceled2をtrueに設定する。
-
reason2をreasonに設定する。
-
canceled1がtrueなら、
-
compositeReasonを! CreateArrayFromList(« reason1, reason2 »)で取得する。
-
cancelResultを! ReadableStreamCancel(stream, compositeReason)で取得する。
-
resolve cancelPromise with cancelResult。
-
-
cancelPromiseを返す。
-
-
startAlgorithmをundefinedを返すアルゴリズムとする。
-
branch1を! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel1Algorithm)で取得する。
-
branch2を! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel2Algorithm)で取得する。
-
upon rejection of reader.[[closedPromise]] with reasonr:
-
! ReadableStreamDefaultControllerError(branch1.[[controller]], r)を実行。
-
! ReadableStreamDefaultControllerError(branch2.[[controller]], r)を実行。
-
canceled1かcanceled2がfalseなら、resolve cancelPromise with undefined。
-
-
« branch1, branch2 »を返す。
-
Assert: streamが
ReadableStream
をimplementsしていること。 -
Assert: stream.[[controller]]が
ReadableByteStreamController
をimplementsしていること。 -
readerを? AcquireReadableStreamDefaultReader(stream)で取得する。
-
readingをfalseとする。
-
readAgainForBranch1をfalseとする。
-
readAgainForBranch2をfalseとする。
-
canceled1をfalseとする。
-
canceled2をfalseとする。
-
reason1をundefinedとする。
-
reason2をundefinedとする。
-
branch1をundefinedとする。
-
branch2をundefinedとする。
-
cancelPromiseを新しいPromiseとする。
-
forwardReaderErrorを次の手順(thisReaderを受け取る)とする:
-
upon rejection of thisReader.[[closedPromise]] with reason r:
-
thisReaderがreaderでない場合、return。
-
! ReadableByteStreamControllerError(branch1.[[controller]], r)を実行。
-
! ReadableByteStreamControllerError(branch2.[[controller]], r)を実行。
-
canceled1かcanceled2がfalseなら、resolve cancelPromise with undefined。
-
-
-
pullWithDefaultReaderを以下の手順とする:
-
readerが
ReadableStreamBYOBReader
をimplementsしている場合、-
Assert: reader.[[readIntoRequests]]が空であること。
-
! ReadableStreamBYOBReaderRelease(reader)を実行。
-
readerを! AcquireReadableStreamDefaultReader(stream)で更新する。
-
forwardReaderErrorをreaderで実行。
-
-
readRequestをread requestとして次の項目を持つ:
- chunk steps(chunkを受け取る)
-
-
マイクロタスクをキューに追加し、以下を実行:
-
readAgainForBranch1をfalseに設定する。
-
readAgainForBranch2をfalseに設定する。
-
chunk1とchunk2をchunkとする。
-
canceled1とcanceled2がfalseなら、
-
cloneResultをCloneAsUint8Array(chunk)で取得する。
-
cloneResultがabrupt completionなら、
-
! ReadableByteStreamControllerError(branch1.[[controller]], cloneResult.[[Value]])を実行。
-
! ReadableByteStreamControllerError(branch2.[[controller]], cloneResult.[[Value]])を実行。
-
resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]])を実行。
-
return。
-
-
それ以外の場合、chunk2をcloneResult.[[Value]]にする。
-
-
canceled1がfalseなら、! ReadableByteStreamControllerEnqueue(branch1.[[controller]], chunk1)を実行。
-
canceled2がfalseなら、! ReadableByteStreamControllerEnqueue(branch2.[[controller]], chunk2)を実行。
-
readingをfalseに設定する。
-
readAgainForBranch1がtrueなら、pull1Algorithmを実行。
-
それ以外でreadAgainForBranch2がtrueなら、pull2Algorithmを実行。
-
ここでのマイクロタスク遅延は、エラー検出に最低1マイクロタスク必要なのと、下記のreader.[[closedPromise]]で使うためです。 stream内のエラーが両分岐に即座に反映されるよう、同期的にreadできる場合でも非同期エラーが先に処理されるようにします。
-
- close steps
-
-
readingをfalseに設定する。
-
canceled1がfalseなら、! ReadableByteStreamControllerClose(branch1.[[controller]])を実行。
-
canceled2がfalseなら、! ReadableByteStreamControllerClose(branch2.[[controller]])を実行。
-
branch1.[[controller]].[[pendingPullIntos]]が空でない場合、! ReadableByteStreamControllerRespond(branch1.[[controller]], 0)を実行。
-
branch2.[[controller]].[[pendingPullIntos]]が空でない場合、! ReadableByteStreamControllerRespond(branch2.[[controller]], 0)を実行。
-
canceled1かcanceled2がfalseなら、resolve cancelPromise with undefined。
-
- error steps
-
-
readingをfalseに設定する。
-
-
! ReadableStreamDefaultReaderRead(reader, readRequest)を実行。
-
-
pullWithBYOBReaderを次の手順(viewとforBranch2を受け取る)とする:
-
readerが
ReadableStreamDefaultReader
をimplementsしている場合、-
Assert: reader.[[readRequests]]が空であること。
-
! ReadableStreamDefaultReaderRelease(reader)を実行。
-
readerを! AcquireReadableStreamBYOBReader(stream)で更新する。
-
forwardReaderErrorをreaderで実行。
-
-
byobBranchをforBranch2がtrueならbranch2、falseならbranch1とする。
-
otherBranchをforBranch2がfalseならbranch2、trueならbranch1とする。
-
readIntoRequestをread-into requestとして次の項目を持つ:
- chunk steps(chunkを受け取る)
-
-
マイクロタスクをキューに追加し、以下を実行:
-
readAgainForBranch1をfalseに設定する。
-
readAgainForBranch2をfalseに設定する。
-
byobCanceledをforBranch2がtrueならcanceled2、falseならcanceled1とする。
-
otherCanceledをforBranch2がfalseならcanceled2、trueならcanceled1とする。
-
otherCanceledがfalseなら、
-
cloneResultをCloneAsUint8Array(chunk)で取得する。
-
cloneResultがabrupt completionなら、
-
! ReadableByteStreamControllerError(byobBranch.[[controller]], cloneResult.[[Value]])を実行。
-
! ReadableByteStreamControllerError(otherBranch.[[controller]], cloneResult.[[Value]])を実行。
-
resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]])を実行。
-
return。
-
-
それ以外の場合、clonedChunkをcloneResult.[[Value]]とする。
-
byobCanceledがfalseなら、! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk)を実行。
-
! ReadableByteStreamControllerEnqueue(otherBranch.[[controller]], clonedChunk)を実行。
-
-
それ以外でbyobCanceledがfalseなら、! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk)を実行。
-
readingをfalseに設定する。
-
readAgainForBranch1がtrueなら、pull1Algorithmを実行。
-
それ以外でreadAgainForBranch2がtrueなら、pull2Algorithmを実行。
-
ここでのマイクロタスク遅延は、エラー検出に最低1マイクロタスク必要なのと、下記のreader.[[closedPromise]]で使うためです。 stream内のエラーが両分岐に即座に反映されるよう、同期的にreadできる場合でも非同期エラーが先に処理されるようにします。
-
- close steps(chunkを受け取る)
-
-
readingをfalseに設定する。
-
byobCanceledをforBranch2がtrueならcanceled2、falseならcanceled1とする。
-
otherCanceledをforBranch2がfalseならcanceled2、trueならcanceled1とする。
-
byobCanceledがfalseなら、! ReadableByteStreamControllerClose(byobBranch.[[controller]])を実行。
-
otherCanceledがfalseなら、! ReadableByteStreamControllerClose(otherBranch.[[controller]])を実行。
-
chunkがundefinedでない場合、
-
Assert: chunk.[[ByteLength]]は0。
-
byobCanceledがfalseなら、! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk)を実行。
-
otherCanceledがfalseかつotherBranch.[[controller]].[[pendingPullIntos]]が空でない場合、! ReadableByteStreamControllerRespond(otherBranch.[[controller]], 0)を実行。
-
-
byobCanceledかotherCanceledがfalseなら、resolve cancelPromise with undefined。
-
- error steps
-
-
readingをfalseに設定する。
-
-
! ReadableStreamBYOBReaderRead(reader, view, 1, readIntoRequest)を実行。
-
-
pull1Algorithmを以下の手順とする:
-
readingがtrueなら、
-
readAgainForBranch1をtrueに設定。
-
a promise resolved with undefinedを返す。
-
-
readingをtrueに設定。
-
byobRequestを! ReadableByteStreamControllerGetBYOBRequest(branch1.[[controller]])で取得。
-
byobRequestがnullなら、pullWithDefaultReaderを実行。
-
それ以外の場合、pullWithBYOBReaderをbyobRequest.[[view]]とfalseで実行。
-
a promise resolved with undefinedを返す。
-
-
pull2Algorithmを以下の手順とする:
-
readingがtrueなら、
-
readAgainForBranch2をtrueに設定。
-
a promise resolved with undefinedを返す。
-
-
readingをtrueに設定。
-
byobRequestを! ReadableByteStreamControllerGetBYOBRequest(branch2.[[controller]])で取得。
-
byobRequestがnullなら、pullWithDefaultReaderを実行。
-
それ以外の場合、pullWithBYOBReaderをbyobRequest.[[view]]とtrueで実行。
-
a promise resolved with undefinedを返す。
-
-
cancel1Algorithmを(reasonを受け取る)手順として:
-
canceled1をtrueに設定。
-
reason1をreasonに設定。
-
canceled2がtrueなら、
-
compositeReasonを! CreateArrayFromList(« reason1, reason2 »)で取得。
-
cancelResultを! ReadableStreamCancel(stream, compositeReason)で取得。
-
resolve cancelPromise with cancelResult。
-
-
cancelPromiseを返す。
-
-
cancel2Algorithmを(reasonを受け取る)手順として:
-
canceled2をtrueに設定。
-
reason2をreasonに設定。
-
canceled1がtrueなら、
-
compositeReasonを! CreateArrayFromList(« reason1, reason2 »)で取得。
-
cancelResultを! ReadableStreamCancel(stream, compositeReason)で取得。
-
resolve cancelPromise with cancelResult。
-
-
cancelPromiseを返す。
-
-
startAlgorithmをundefinedを返すアルゴリズムとする。
-
branch1を! CreateReadableByteStream(startAlgorithm, pull1Algorithm, cancel1Algorithm)で取得。
-
branch2を! CreateReadableByteStream(startAlgorithm, pull2Algorithm, cancel2Algorithm)で取得。
-
forwardReaderErrorをreaderで実行。
-
« branch1, branch2 »を返す。
4.9.2. コントローラーとのインターフェース
仕様の整理という観点では、ReadableStream
クラスが単純なReadableStreamとReadableバイトストリームの両方の挙動を単一クラスでカプセル化する方法は、
可変的なロジックのほとんどを2つのコントローラークラス
ReadableStreamDefaultController
と ReadableByteStreamController
に集中させることです。
これらのクラスがストリームの内部キューの管理方法や、underlying sourceやunderlying byte
sourceとのインターフェース方法のための、ほとんどの状態スロット・抽象操作を定義しています。
各コントローラークラスは、ReadableStream
のアルゴリズムから呼ばれる3つの内部メソッドを定義します:
- [[CancelSteps]](reason)
- ストリームがキャンセルされた際に実行されるコントローラーの手順。コントローラーに保存されている状態のクリーンアップや underlying sourceへの通知に利用されます。
- [[PullSteps]](readRequest)
- default readerからの読み取り時に実行されるコントローラーの手順。キューされたチャンクや underlying sourceからの新たなチャンク取得を行います。
- [[ReleaseSteps]]()
- readerがreleaseされた際に実行されるコントローラーの手順。コントローラーに保存されているreader固有のリソースをクリーンアップします。
(これらは抽象操作ではなく内部メソッドとして定義されているため、ReadableStream
のアルゴリズムから型分岐せずポリモーフィックに呼び出せます。)
この節の残りは逆方向の抽象操作に関するものです。これらはコントローラー実装が対応するReadableStream
オブジェクトに影響を及ぼすために使います。
コントローラーの内部状態変化を、ReadableStream
のpublic
APIから見える開発者向けの結果へ変換します。
-
Assert: stream.[[reader]]が
ReadableStreamBYOBReader
をimplementsしていること。 -
Assert: stream.[[state]]が"
readable
"または"closed
"であること。 -
Append readRequestを stream.[[reader]].[[readIntoRequests]]に追加する。
-
Assert: stream.[[reader]]が
ReadableStreamDefaultReader
をimplementsしていること。 -
Assert: stream.[[state]]が"
readable
"であること。 -
Append readRequestを stream.[[reader]].[[readRequests]]に追加する。
-
stream.[[disturbed]]にtrueを設定する。
-
stream.[[state]]が"
closed
"なら、a promise resolved with undefinedを返す。 -
stream.[[state]]が"
errored
"なら、a promise rejected with stream.[[storedError]]を返す。 -
! ReadableStreamClose(stream)を実行。
-
readerをstream.[[reader]]とする。
-
readerがundefinedでなく、かつreaderが
ReadableStreamBYOBReader
をimplementsしている場合、-
readIntoRequestsをreader.[[readIntoRequests]]とする。
-
reader.[[readIntoRequests]]に空のlistを設定する。
-
For each readIntoRequest of readIntoRequestsについて、
-
readIntoRequestのclose steps(引数undefined)を実行。
-
-
-
sourceCancelPromiseを! stream.[[controller]].[[CancelSteps]](reason)の結果とする。
-
reacting to sourceCancelPromiseのfulfillment時にundefinedを返す。
-
Assert: stream.[[state]]が"
readable
"であること。 -
stream.[[state]]に"
closed
"を設定する。 -
readerをstream.[[reader]]とする。
-
readerがundefinedならreturn。
-
resolve reader.[[closedPromise]] with undefined。
-
readerが
ReadableStreamDefaultReader
をimplementsしている場合、-
readRequestsをreader.[[readRequests]]とする。
-
reader.[[readRequests]]に空のlistを設定する。
-
For each readRequest of readRequestsについて、
-
readRequestのclose stepsを実行。
-
-
-
Assert: stream.[[state]]が"
readable
"であること。 -
stream.[[state]]に"
errored
"を設定する。 -
stream.[[storedError]]にeを設定する。
-
readerをstream.[[reader]]とする。
-
readerがundefinedならreturn。
-
reject reader.[[closedPromise]] with e。
-
reader.[[closedPromise]].[[PromiseIsHandled]]にtrueを設定する。
-
readerが
ReadableStreamDefaultReader
をimplementsしている場合、-
! ReadableStreamDefaultReaderErrorReadRequests(reader, e)を実行。
-
-
それ以外の場合、
-
Assert: readerが
ReadableStreamBYOBReader
をimplementsしていること。 -
! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e)を実行。
-
-
Assert: ! ReadableStreamHasBYOBReader(stream) がtrueであること。
-
readerをstream.[[reader]]とする。
-
Assert: reader.[[readIntoRequests]]が空でないこと。
-
readIntoRequestをreader.[[readIntoRequests]][0]とする。
-
Remove readIntoRequestを reader.[[readIntoRequests]]から除去する。
-
doneがtrueなら、readIntoRequestのclose steps(chunk渡し)を実行。
-
それ以外の場合、readIntoRequestのchunk steps(chunk渡し)を実行。
-
Assert: ! ReadableStreamHasDefaultReader(stream) がtrueであること。
-
readerをstream.[[reader]]とする。
-
Assert: reader.[[readRequests]]が空でないこと。
-
readRequestをreader.[[readRequests]][0]とする。
-
Remove readRequestをreader.[[readRequests]]から除去する。
-
doneがtrueなら、readRequestのclose stepsを実行。
-
それ以外の場合、readRequestのchunk steps(chunk渡し)を実行。
-
Assert: ! ReadableStreamHasBYOBReader(stream) がtrueであること。
-
返す:stream.[[reader]].[[readIntoRequests]]のsize。
-
Assert: ! ReadableStreamHasDefaultReader(stream) がtrueであること。
-
返す:stream.[[reader]].[[readRequests]]のsize。
-
readerをstream.[[reader]]とする。
-
readerがundefinedならfalseを返す。
-
readerが
ReadableStreamBYOBReader
をimplementsしている場合、trueを返す。 -
falseを返す。
-
readerをstream.[[reader]]とする。
-
readerがundefinedならfalseを返す。
-
readerが
ReadableStreamDefaultReader
をimplementsしている場合、trueを返す。 -
falseを返す。
4.9.3. リーダー
以下の抽象操作は
ReadableStreamDefaultReader
および ReadableStreamBYOBReader
インスタンスの実装・操作をサポートします。
-
streamをreader.[[stream]]とする。
-
Assert: streamはundefinedでない。
-
! ReadableStreamCancel(stream, reason)を返す。
-
reader.[[stream]]にstreamを設定。
-
stream.[[reader]]にreaderを設定。
-
stream.[[state]]が"
readable
"の場合、-
reader.[[closedPromise]]に新しいPromiseを設定。
-
-
それ以外で、stream.[[state]]が"
closed
"の場合、-
reader.[[closedPromise]]にa promise resolved with undefinedを設定。
-
-
それ以外の場合、
-
Assert: stream.[[state]]は"
errored
"。 -
reader.[[closedPromise]]にa promise rejected with stream.[[storedError]]を設定。
-
reader.[[closedPromise]].[[PromiseIsHandled]]にtrueを設定。
-
-
streamをreader.[[stream]]とする。
-
Assert: streamはundefinedでない。
-
Assert: stream.[[reader]]はreaderである。
-
stream.[[state]]が"
readable
"なら、reject reader.[[closedPromise]] withTypeError
例外。 -
それ以外の場合、reader.[[closedPromise]]にa promise rejected with
TypeError
例外を設定。 -
reader.[[closedPromise]].[[PromiseIsHandled]]にtrueを設定。
-
! stream.[[controller]].[[ReleaseSteps]]()を実行。
-
stream.[[reader]]をundefinedに設定。
-
reader.[[stream]]をundefinedに設定。
-
readIntoRequestsをreader.[[readIntoRequests]]とする。
-
reader.[[readIntoRequests]]に新しい空のlistを設定。
-
For each readIntoRequest of readIntoRequestsについて、
-
readIntoRequestのerror steps(e渡し)を実行。
-
-
streamをreader.[[stream]]とする。
-
Assert: streamはundefinedでない。
-
stream.[[disturbed]]にtrueを設定。
-
stream.[[state]]が"
errored
"なら、readIntoRequestのerror steps(stream.[[storedError]]渡し)を実行。 -
それ以外の場合、! ReadableByteStreamControllerPullInto(stream.[[controller]], view, min, readIntoRequest)を実行。
-
! ReadableStreamReaderGenericRelease(reader)を実行。
-
eを新しい
TypeError
例外とする。 -
! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e)を実行。
-
readRequestsをreader.[[readRequests]]とする。
-
reader.[[readRequests]]に新しい空のlistを設定。
-
For each readRequest of readRequestsについて、
-
readRequestのerror steps(e渡し)を実行。
-
-
streamをreader.[[stream]]とする。
-
Assert: streamはundefinedでない。
-
stream.[[disturbed]]にtrueを設定。
-
stream.[[state]]が"
closed
"なら、readRequestのclose stepsを実行。 -
それ以外でstream.[[state]]が"
errored
"なら、readRequestの error steps(stream.[[storedError]]渡し)を実行。 -
それ以外の場合、
-
Assert: stream.[[state]]は"
readable
"。 -
! stream.[[controller]].[[PullSteps]](readRequest)を実行。
-
-
! ReadableStreamReaderGenericRelease(reader)を実行。
-
eを新しい
TypeError
例外とする。 -
! ReadableStreamDefaultReaderErrorReadRequests(reader, e)を実行。
-
! IsReadableStreamLocked(stream)がtrueなら、
TypeError
例外をスロー。 -
stream.[[controller]]が
ReadableByteStreamController
をimplementしていなければ、TypeError
例外をスロー。 -
! ReadableStreamReaderGenericInitialize(reader, stream)を実行。
-
reader.[[readIntoRequests]]に新しい空のlistを設定。
-
! IsReadableStreamLocked(stream)がtrueなら、
TypeError
例外をスロー。 -
! ReadableStreamReaderGenericInitialize(reader, stream)を実行。
-
reader.[[readRequests]]に新しい空のlistを設定。
4.9.4. デフォルトコントローラー
以下の抽象操作は
ReadableStreamDefaultController
クラスの実装をサポートします。
-
shouldPullを! ReadableStreamDefaultControllerShouldCallPull(controller)で取得する。
-
shouldPullがfalseならreturn。
-
controller.[[pulling]]がtrueなら、
-
controller.[[pullAgain]]にtrueを設定する。
-
return。
-
-
Assert: controller.[[pullAgain]]はfalse。
-
controller.[[pulling]]にtrueを設定する。
-
pullPromiseをcontroller.[[pullAlgorithm]]の実行結果とする。
-
-
controller.[[pulling]]にfalseを設定する。
-
controller.[[pullAgain]]がtrueなら、
-
controller.[[pullAgain]]にfalseを設定する。
-
! ReadableStreamDefaultControllerCallPullIfNeeded(controller)を実行する。
-
-
-
-
! ReadableStreamDefaultControllerError(controller, e)を実行する。
-
-
streamをcontroller.[[stream]]とする。
-
! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller)がfalseならfalseを返す。
-
controller.[[started]]がfalseならfalseを返す。
-
! IsReadableStreamLocked(stream)がtrueかつ ! ReadableStreamGetNumReadRequests(stream) > 0 ならtrueを返す。
-
desiredSizeを! ReadableStreamDefaultControllerGetDesiredSize(controller)で取得する。
-
Assert: desiredSizeはnullではない。
-
desiredSize > 0ならtrueを返す。
-
falseを返す。
ReadableStream
自身が参照されていてもガベージコレクト可能になります。
これはWeakRefで観測できます。詳細はtc39/proposal-weakrefs#31参照。
手順:
-
controller.[[pullAlgorithm]]にundefinedを設定。
-
controller.[[cancelAlgorithm]]にundefinedを設定。
-
controller.[[strategySizeAlgorithm]]にundefinedを設定。
-
! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller)がfalseならreturn。
-
streamをcontroller.[[stream]]とする。
-
controller.[[closeRequested]]にtrueを設定。
-
-
! ReadableStreamDefaultControllerClearAlgorithms(controller)を実行。
-
! ReadableStreamClose(stream)を実行。
-
-
! ReadableStreamDefaultControllerCanCloseOrEnqueue(controller)がfalseならreturn。
-
streamをcontroller.[[stream]]とする。
-
! IsReadableStreamLocked(stream)がtrueかつ ! ReadableStreamGetNumReadRequests(stream) > 0 なら、! ReadableStreamFulfillReadRequest(stream, chunk, false)を実行。
-
それ以外の場合:
-
resultをcontroller.[[strategySizeAlgorithm]]にchunkを渡して実行し、completion recordとして解釈する。
-
resultがabrupt completionなら、
-
! ReadableStreamDefaultControllerError(controller, result.[[Value]])を実行。
-
resultを返す。
-
-
chunkSizeをresult.[[Value]]とする。
-
enqueueResultをEnqueueValueWithSize(controller, chunk, chunkSize)で取得する。
-
enqueueResultがabrupt completionなら、
-
! ReadableStreamDefaultControllerError(controller, enqueueResult.[[Value]])を実行。
-
enqueueResultを返す。
-
-
-
! ReadableStreamDefaultControllerCallPullIfNeeded(controller)を実行。
-
streamをcontroller.[[stream]]とする。
-
stream.[[state]]が"
readable
"でなければreturn。 -
! ResetQueue(controller)を実行。
-
! ReadableStreamDefaultControllerClearAlgorithms(controller)を実行。
-
! ReadableStreamError(stream, e)を実行。
-
stateを controller.[[stream]].[[state]]とする。
-
stateが"
errored
"ならnullを返す。 -
stateが"
closed
"なら0を返す。 -
controller.[[strategyHWM]] − controller.[[queueTotalSize]]を返す。
TransformStream
の実装に使われます。
次の手順を実行します:
-
! ReadableStreamDefaultControllerShouldCallPull(controller) がtrueならfalseを返す。
-
それ以外の場合trueを返す。
-
stateを controller.[[stream]].[[state]]とする。
-
controller.[[closeRequested]]がfalseかつstateが"
readable
"ならtrueを返す。 -
それ以外の場合falseを返す。
controller.[[closeRequested]]
がfalseでstateが"readable
"でない場合は、
controller.error()
によりストリームがerroredになった場合や、
controllerのcontroller.close()
が呼ばれないまま
stream.cancel()
などでストリームが閉じられた場合に発生します。
-
Assert: stream.[[controller]]はundefinedである。
-
controller.[[stream]]にstreamを設定する。
-
! ResetQueue(controller)を実行する。
-
controller.[[started]]、 controller.[[closeRequested]]、 controller.[[pullAgain]]、 controller.[[pulling]]をfalseに設定する。
-
controller.[[strategySizeAlgorithm]]にsizeAlgorithmを、 controller.[[strategyHWM]]にhighWaterMarkを設定する。
-
controller.[[pullAlgorithm]]にpullAlgorithmを設定する。
-
controller.[[cancelAlgorithm]]にcancelAlgorithmを設定する。
-
stream.[[controller]]にcontrollerを設定する。
-
startResultをstartAlgorithmの実行結果とする(例外をスローする場合もある)。
-
startPromiseをa promise resolved with startResultとする。
-
-
controller.[[started]]をtrueに設定する。
-
Assert: controller.[[pulling]]はfalse。
-
Assert: controller.[[pullAgain]]はfalse。
-
! ReadableStreamDefaultControllerCallPullIfNeeded(controller)を実行する。
-
-
-
! ReadableStreamDefaultControllerError(controller, r)を実行する。
-
-
controllerを新規
ReadableStreamDefaultController
とする。 -
startAlgorithmをundefinedを返すアルゴリズムとする。
-
pullAlgorithmをa promise resolved with undefinedを返すアルゴリズムとする。
-
cancelAlgorithmをa promise resolved with undefinedを返すアルゴリズムとする。
-
underlyingSourceDict["
start
"] existsなら、startAlgorithmを invoking underlyingSourceDict["start
"] に引数« controller »とcallback this value underlyingSourceを使って呼び出すアルゴリズムに設定する。 -
underlyingSourceDict["
pull
"] existsなら、pullAlgorithmを invoking underlyingSourceDict["pull
"] に引数« controller »とcallback this value underlyingSourceを使って呼び出すアルゴリズムに設定する。 -
underlyingSourceDict["
cancel
"] existsなら、cancelAlgorithmを「引数reasonを受け取り、 invoking underlyingSourceDict["cancel
"] に引数« reason »とcallback this value underlyingSourceを使って呼び出すアルゴリズム」に設定する。 -
? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm)を実行する。
4.9.5. バイトストリームコントローラー
-
shouldPull に ! ReadableByteStreamControllerShouldCallPull(controller) の結果を格納する。
-
shouldPull が false なら、return する。
-
controller.[[pulling]] が true の場合、
-
controller.[[pullAgain]] を true に設定する。
-
return する。
-
-
アサーション: controller.[[pullAgain]] は false である。
-
controller.[[pulling]] を true に設定する。
-
pullPromise に controller.[[pullAlgorithm]] の実行結果を格納する。
-
pullPromise が fulfill されたとき、
-
controller.[[pulling]] を false に設定する。
-
controller.[[pullAgain]] が true の場合、
-
controller.[[pullAgain]] を false に設定する。
-
! ReadableByteStreamControllerCallPullIfNeeded(controller) を実行する。
-
-
-
pullPromise が e という理由で reject されたとき、
-
! ReadableByteStreamControllerError(controller, e) を実行する。
-
ReadableStream
自体が参照されていてもガーベジコレクトされることが可能となる。
これは 弱参照 を使って観測できる。詳細は tc39/proposal-weakrefs#31 を参照。
次の手順を実行する:
-
controller.[[pullAlgorithm]] を undefined に設定する。
-
controller.[[cancelAlgorithm]] を undefined に設定する。
-
! ReadableByteStreamControllerInvalidateBYOBRequest(controller) を実行する。
-
controller.[[pendingPullIntos]] を新しい空の リスト に設定する。
-
stream に controller.[[stream]] を格納する。
-
controller.[[closeRequested]] が true または stream.[[state]] が "
readable
" でない場合、return する。 -
controller.[[queueTotalSize]] > 0 の場合、
-
controller.[[closeRequested]] を true に設定する。
-
return する。
-
-
controller.[[pendingPullIntos]] が空でない場合、
-
firstPendingPullInto に controller.[[pendingPullIntos]][0] を格納する。
-
firstPendingPullInto の bytes filled を firstPendingPullInto の element size で割った余りが 0 でない場合、
-
e に新しい
TypeError
例外を格納する。 -
! ReadableByteStreamControllerError(controller, e) を実行する。
-
e を throw する。
-
-
-
! ReadableByteStreamControllerClearAlgorithms(controller) を実行する。
-
! ReadableStreamClose(stream) を実行する。
-
アサーション: stream.[[state]] は "
errored
" でない。 -
アサーション: pullIntoDescriptor.reader type は "
none
" でない。 -
done に false を格納する。
-
stream.[[state]] が "
closed
" の場合、-
アサーション: pullIntoDescriptor の bytes filled を pullIntoDescriptor の element size で割った余りは 0。
-
done を true に設定する。
-
-
filledView に ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor) を格納する。
-
pullIntoDescriptor の reader type が "
default
" の場合、-
! ReadableStreamFulfillReadRequest(stream, filledView, done) を実行する。
-
-
それ以外の場合、
-
アサーション: pullIntoDescriptor の reader type は "
byob
"。 -
! ReadableStreamFulfillReadIntoRequest(stream, filledView, done) を実行する。
-
-
bytesFilled に pullIntoDescriptor の bytes filled を格納する。
-
elementSize に pullIntoDescriptor の element size を格納する。
-
アサーション: bytesFilled ≤ pullIntoDescriptor の byte length。
-
アサーション: bytesFilled を elementSize で割った余りは 0。
-
buffer に ! TransferArrayBuffer(pullIntoDescriptor の buffer) を格納する。
-
! Construct(pullIntoDescriptor の view constructor, « buffer, pullIntoDescriptor の byte offset, bytesFilled ÷ elementSize ») を返す。
-
stream に controller.[[stream]] を格納する。
-
controller.[[closeRequested]] が true または stream.[[state]] が "
readable
" でない場合、return する。 -
buffer に chunk.[[ViewedArrayBuffer]] を格納する。
-
byteOffset に chunk.[[ByteOffset]] を格納する。
-
byteLength に chunk.[[ByteLength]] を格納する。
-
! IsDetachedBuffer(buffer) が true の場合、
TypeError
例外をthrowする。 -
transferredBuffer に ? TransferArrayBuffer(buffer) を格納する。
-
controller.[[pendingPullIntos]] が 空でない場合、
-
firstPendingPullInto に controller.[[pendingPullIntos]][0] を格納する。
-
! IsDetachedBuffer(firstPendingPullInto の buffer) が true の場合、
TypeError
例外をthrowする。 -
! ReadableByteStreamControllerInvalidateBYOBRequest(controller) を実行する。
-
firstPendingPullInto の buffer を ! TransferArrayBuffer(firstPendingPullInto の buffer) に設定する。
-
firstPendingPullInto の reader type が "
none
" の場合、 ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, firstPendingPullInto) を実行する。
-
-
! ReadableStreamHasDefaultReader(stream) が true の場合、
-
! ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller) を実行する。
-
! ReadableStreamGetNumReadRequests(stream) が 0 の場合、
-
アサーション: controller.[[pendingPullIntos]] は 空である。
-
! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength) を実行する。
-
-
それ以外の場合、
-
controller.[[pendingPullIntos]] が 空でない場合、
-
アサーション: controller.[[pendingPullIntos]][0] の reader type は "
default
"。 -
! ReadableByteStreamControllerShiftPendingPullInto(controller) を実行する。
-
-
transferredView に ! Construct(
%Uint8Array%
, « transferredBuffer, byteOffset, byteLength ») を格納する。 -
! ReadableStreamFulfillReadRequest(stream, transferredView, false) を実行する。
-
-
それ以外の場合、! ReadableStreamHasBYOBReader(stream) が true の場合、
-
! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength) を実行する。
-
filledPullIntos に ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller) の結果を格納する。
-
filledPullIntos の各 filledPullInto について、
-
! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto) を実行する。
-
-
-
それ以外の場合、
-
アサーション: ! IsReadableStreamLocked(stream) は false。
-
! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength) を実行する。
-
-
! ReadableByteStreamControllerCallPullIfNeeded(controller) を実行する。
-
新しい readable byte stream queue entry(buffer buffer、byte offset byteOffset、 byte length byteLength)を controller.[[queue]]に追加する。
-
controller.[[queueTotalSize]] を controller.[[queueTotalSize]] + byteLength に設定する。
-
cloneResult に CloneArrayBuffer(buffer, byteOffset, byteLength,
%ArrayBuffer%
) の結果を格納する。 -
cloneResult がabrupt completionの場合、
-
! ReadableByteStreamControllerError(controller, cloneResult.[[Value]]) を実行する。
-
cloneResult を返す。
-
-
! ReadableByteStreamControllerEnqueueChunkToQueue(controller, cloneResult.[[Value]], 0, byteLength) を実行する。
-
アサーション: pullIntoDescriptor の reader type は "
none
"。 -
pullIntoDescriptor の bytes filled > 0 の場合、? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller, pullIntoDescriptor の buffer, pullIntoDescriptor の byte offset, pullIntoDescriptor の bytes filled) を実行する。
-
! ReadableByteStreamControllerShiftPendingPullInto(controller) を実行する。
-
stream に controller.[[stream]] を格納する。
-
stream.[[state]] が "
readable
" でない場合、return する。 -
! ReadableByteStreamControllerClearPendingPullIntos(controller) を実行する。
-
! ResetQueue(controller) を実行する。
-
! ReadableByteStreamControllerClearAlgorithms(controller) を実行する。
-
! ReadableStreamError(stream, e) を実行する。
-
アサーション: controller.[[pendingPullIntos]] が空であるか、または controller.[[pendingPullIntos]][0] が pullIntoDescriptor である。
-
アサーション: controller.[[byobRequest]] は null である。
-
pullIntoDescriptor の bytes filled を bytes filled + size に設定する。
-
maxBytesToCopy に min(controller.[[queueTotalSize]], pullIntoDescriptor の byte length − pullIntoDescriptor の bytes filled) を格納する。
-
maxBytesFilled に pullIntoDescriptor の bytes filled + maxBytesToCopy を格納する。
-
totalBytesToCopyRemaining に maxBytesToCopy を格納する。
-
ready に false を格納する。
-
アサーション: ! IsDetachedBuffer(pullIntoDescriptor の buffer) は false。
-
アサーション: pullIntoDescriptor の bytes filled < pullIntoDescriptor の minimum fill。
-
remainderBytes に maxBytesFilled を pullIntoDescriptor の element size で割った余りを格納する。
-
maxAlignedBytes に maxBytesFilled − remainderBytes を格納する。
-
maxAlignedBytes ≥ pullIntoDescriptor の minimum fill の場合、
-
totalBytesToCopyRemaining を maxAlignedBytes − pullIntoDescriptor の bytes filled に設定する。
-
ready を true に設定する。
read()
リクエストのためのディスクリプタがまだ最小長まで埋まっていない場合は、キューの先頭に残り、下層ソースが引き続き埋めることができる。
-
-
queue に controller.[[queue]] を格納する。
-
While totalBytesToCopyRemaining > 0 の間、
-
headOfQueue に queue[0] を格納する。
-
bytesToCopy に min(totalBytesToCopyRemaining, headOfQueue の byte length) を格納する。
-
destStart に pullIntoDescriptor の byte offset + pullIntoDescriptor の bytes filled を格納する。
-
descriptorBuffer に pullIntoDescriptor の buffer を格納する。
-
queueBuffer に headOfQueue の buffer を格納する。
-
queueByteOffset に headOfQueue の byte offset を格納する。
-
アサーション: ! CanCopyDataBlockBytes(descriptorBuffer, destStart, queueBuffer, queueByteOffset, bytesToCopy) は true。
このアサーションが失敗した場合(仕様または実装のバグによる)、次のステップで不正なメモリ領域の読み書きが行われる可能性がある。 ユーザーエージェントは常にこのアサーションをチェックし、失敗した場合は実装定義の方法で停止すべきである(例:プロセスをクラッシュさせる、または ストリームをエラーにするなど)。
-
! CopyDataBlockBytes(descriptorBuffer.[[ArrayBufferData]], destStart, queueBuffer.[[ArrayBufferData]], queueByteOffset, bytesToCopy) を実行する。
-
headOfQueue の byte length が bytesToCopy の場合、
-
queue[0] を削除する。
-
-
それ以外の場合、
-
headOfQueue の byte offset を headOfQueue の byte offset + bytesToCopy に設定する。
-
headOfQueue の byte length を headOfQueue の byte length − bytesToCopy に設定する。
-
-
controller.[[queueTotalSize]] を controller.[[queueTotalSize]] − bytesToCopy に設定する。
-
! ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesToCopy, pullIntoDescriptor) を実行する。
-
totalBytesToCopyRemaining を totalBytesToCopyRemaining − bytesToCopy に設定する。
-
-
ready が false の場合、
-
アサーション: controller.[[queueTotalSize]] は 0。
-
アサーション: pullIntoDescriptor の bytes filled > 0。
-
アサーション: pullIntoDescriptor の bytes filled < pullIntoDescriptor の minimum fill。
-
-
ready を返す。
-
アサーション: controller.[[queueTotalSize]] > 0。
-
entry に controller.[[queue]][0] を格納する。
-
controller.[[queueTotalSize]] を controller.[[queueTotalSize]] − entry の byte length に設定する。
-
! ReadableByteStreamControllerHandleQueueDrain(controller) を実行する。
-
view に ! Construct(
%Uint8Array%
, « entry の buffer, entry の byte offset, entry の byte length ») を格納する。 -
readRequest の chunk steps を view を与えて実行する。
-
controller.[[byobRequest]] が null かつ controller.[[pendingPullIntos]] が 空でない場合、
-
firstDescriptor に controller.[[pendingPullIntos]][0] を格納する。
-
view に ! Construct(
%Uint8Array%
, « firstDescriptor の buffer, firstDescriptor の byte offset + firstDescriptor の bytes filled, firstDescriptor の byte length − firstDescriptor の bytes filled ») を格納する。 -
byobRequest に 新しい
ReadableStreamBYOBRequest
を格納する。 -
byobRequest.[[controller]] を controller に設定する。
-
byobRequest.[[view]] を view に設定する。
-
controller.[[byobRequest]] を byobRequest に設定する。
-
-
controller.[[byobRequest]] を返す。
-
state に controller.[[stream]].[[state]] を格納する。
-
state が "
errored
" の場合、null を返す。 -
state が "
closed
" の場合、0 を返す。 -
controller.[[strategyHWM]] − controller.[[queueTotalSize]] を返す。
-
アサーション: controller.[[stream]].[[state]] は "
readable
"。 -
controller.[[queueTotalSize]] が 0 かつ controller.[[closeRequested]] が true の場合、
-
! ReadableByteStreamControllerClearAlgorithms(controller) を実行する。
-
! ReadableStreamClose(controller.[[stream]]) を実行する。
-
-
それ以外の場合、
-
! ReadableByteStreamControllerCallPullIfNeeded(controller) を実行する。
-
-
controller.[[byobRequest]] が null の場合、return する。
-
controller.[[byobRequest]].[[controller]] を undefined に設定する。
-
controller.[[byobRequest]].[[view]] を null に設定する。
-
controller.[[byobRequest]] を null に設定する。
-
アサーション: controller.[[closeRequested]] は false。
-
filledPullIntos に新しい空の リスト を格納する。
-
While controller.[[pendingPullIntos]] が 空でない場合、
-
controller.[[queueTotalSize]] が 0 の場合、break。
-
pullIntoDescriptor に controller.[[pendingPullIntos]][0] を格納する。
-
! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) が true の場合、
-
! ReadableByteStreamControllerShiftPendingPullInto(controller) を実行する。
-
pullIntoDescriptor を filledPullIntos に追加する。
-
-
-
filledPullIntos を返す。
-
reader に controller.[[stream]].[[reader]] を格納する。
-
アサーション: reader implements
ReadableStreamDefaultReader
である。 -
reader.[[readRequests]] が 空でない間、
-
controller.[[queueTotalSize]] が 0 の場合、return する。
-
readRequest に reader.[[readRequests]][0] を格納する。
-
readRequest を reader.[[readRequests]] から削除する。
-
! ReadableByteStreamControllerFillReadRequestFromQueue(controller, readRequest) を実行する。
-
-
stream に controller.[[stream]] を格納する。
-
elementSize に 1 を格納する。
-
ctor に
%DataView%
を格納する。 -
view が [[TypedArrayName]] 内部スロットを持つ場合(すなわち、
DataView
でない場合)、-
elementSize を typed array constructors table で view.[[TypedArrayName]] に指定された要素サイズに設定する。
-
ctor を typed array constructors table で view.[[TypedArrayName]] に指定されたコンストラクタに設定する。
-
-
minimumFill に min × elementSize を格納する。
-
アサーション: minimumFill ≥ 0 かつ minimumFill ≤ view.[[ByteLength]] である。
-
アサーション: minimumFill を elementSize で割った余りは 0。
-
byteOffset に view.[[ByteOffset]] を格納する。
-
byteLength に view.[[ByteLength]] を格納する。
-
bufferResult に TransferArrayBuffer(view.[[ViewedArrayBuffer]]) の結果を格納する。
-
bufferResult が abrupt completion の場合、
-
readIntoRequest の error steps を bufferResult.[[Value]] を与えて実行する。
-
return する。
-
-
buffer に bufferResult.[[Value]] を格納する。
-
pullIntoDescriptor に新しい pull-into descriptor(下記の各値を持つ)を格納する:
- buffer
- buffer
- buffer byte length
- buffer.[[ArrayBufferByteLength]]
- byte offset
- byteOffset
- byte length
- byteLength
- bytes filled
- 0
- minimum fill
- minimumFill
- element size
- elementSize
- view constructor
- ctor
- reader type
- "
byob
"
-
controller.[[pendingPullIntos]] が空でない場合、
-
pullIntoDescriptor を controller.[[pendingPullIntos]] に追加する。
-
! ReadableStreamAddReadIntoRequest(stream, readIntoRequest) を実行する。
-
return する。
-
-
stream.[[state]] が "
closed
" の場合、-
emptyView に ! Construct(ctor, « pullIntoDescriptor の buffer, pullIntoDescriptor の byte offset, 0 ») を格納する。
-
readIntoRequest の close steps を emptyView を与えて実行する。
-
return する。
-
-
controller.[[queueTotalSize]] > 0 の場合、
-
! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) が true の場合、
-
filledView に ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor) を格納する。
-
! ReadableByteStreamControllerHandleQueueDrain(controller) を実行する。
-
readIntoRequest の chunk steps を filledView を与えて実行する。
-
return する。
-
-
controller.[[closeRequested]] が true の場合、
-
e に新しい
TypeError
例外を格納する。 -
! ReadableByteStreamControllerError(controller, e) を実行する。
-
readIntoRequest の error steps を e を与えて実行する。
-
return する。
-
-
-
pullIntoDescriptor を controller.[[pendingPullIntos]] に追加する。
-
! ReadableStreamAddReadIntoRequest(stream, readIntoRequest) を実行する。
-
! ReadableByteStreamControllerCallPullIfNeeded(controller) を実行する。
-
アサーション: controller.[[pendingPullIntos]] は空ではない。
-
firstDescriptor に controller.[[pendingPullIntos]][0] を格納する。
-
state に controller.[[stream]].[[state]] を格納する。
-
state が "
closed
" の場合、-
bytesWritten が 0 でない場合、
TypeError
例外をthrowする。
-
-
それ以外の場合、
-
アサーション: state は "
readable
"。 -
bytesWritten が 0 の場合、
TypeError
例外をthrowする。 -
firstDescriptor の bytes filled + bytesWritten > firstDescriptor の byte length の場合、
RangeError
例外をthrowする。
-
-
firstDescriptor の buffer を TransferArrayBuffer(firstDescriptor の buffer) の結果に設定する。
-
? ReadableByteStreamControllerRespondInternal(controller, bytesWritten) を実行する。
-
アサーション: firstDescriptor の bytes filled を firstDescriptor の element size で割った余りは 0。
-
firstDescriptor の reader type が "
none
" の場合、 ! ReadableByteStreamControllerShiftPendingPullInto(controller) を実行する。 -
stream に controller.[[stream]] を格納する。
-
! ReadableStreamHasBYOBReader(stream) が true の場合、
-
filledPullIntos に新しい空の リスト を格納する。
-
While filledPullIntos の size が ! ReadableStreamGetNumReadIntoRequests(stream) より小さい間、
-
pullIntoDescriptor に ! ReadableByteStreamControllerShiftPendingPullInto(controller) を格納する。
-
pullIntoDescriptor を filledPullIntos に追加する。
-
-
filledPullIntos の各 filledPullInto について、
-
! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto) を実行する。
-
-
-
アサーション: pullIntoDescriptor の bytes filled + bytesWritten ≤ pullIntoDescriptor の byte length。
-
! ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesWritten, pullIntoDescriptor) を実行する。
-
pullIntoDescriptor の reader type が "
none
" の場合、-
? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor) を実行する。
-
filledPullIntos に ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller) の結果を格納する。
-
filledPullIntos の各 filledPullInto について、
-
! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto) を実行する。
-
-
return する。
-
-
pullIntoDescriptor の bytes filled が pullIntoDescriptor の minimum fill より小さい場合、return する。
read()
リクエスト用ディスクリプタがまだ最小長まで埋まっていない場合は、キューの先頭に残り、下層ソースが引き続き埋めることができる。 -
! ReadableByteStreamControllerShiftPendingPullInto(controller) を実行する。
-
remainderSize に pullIntoDescriptor の bytes filled を pullIntoDescriptor の element size で割った余りを格納する。
-
remainderSize > 0 の場合、
-
end に pullIntoDescriptor の byte offset + pullIntoDescriptor の bytes filled を格納する。
-
? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller, pullIntoDescriptor の buffer, end − remainderSize, remainderSize) を実行する。
-
-
pullIntoDescriptor の bytes filled を pullIntoDescriptor の bytes filled − remainderSize に設定する。
-
filledPullIntos に ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller) の結果を格納する。
-
! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], pullIntoDescriptor) を実行する。
-
filledPullIntos の各 filledPullInto について、
-
! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto) を実行する。
-
-
firstDescriptor に controller.[[pendingPullIntos]][0] を格納する。
-
アサーション: ! CanTransferArrayBuffer(firstDescriptor の buffer) は true。
-
! ReadableByteStreamControllerInvalidateBYOBRequest(controller) を実行する。
-
state に controller.[[stream]].[[state]] を格納する。
-
state が "
closed
" の場合、-
アサーション: bytesWritten は 0。
-
! ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor) を実行する。
-
-
それ以外の場合、
-
アサーション: state は "
readable
"。 -
アサーション: bytesWritten > 0。
-
? ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor) を実行する。
-
-
! ReadableByteStreamControllerCallPullIfNeeded(controller) を実行する。
-
アサーション: controller.[[pendingPullIntos]] が 空でない。
-
アサーション: ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) は false。
-
firstDescriptor に controller.[[pendingPullIntos]][0] を格納する。
-
state に controller.[[stream]].[[state]] を格納する。
-
state が "
closed
" の場合、-
view.[[ByteLength]] が 0 でない場合、
TypeError
例外をthrowする。
-
-
それ以外の場合、
-
アサーション: state は "
readable
"。 -
view.[[ByteLength]] が 0 の場合、
TypeError
例外をthrowする。
-
-
firstDescriptor の byte offset + firstDescriptor の bytes filled が view.[[ByteOffset]] でない場合、
RangeError
例外をthrowする。 -
firstDescriptor の buffer byte length が view.[[ViewedArrayBuffer]].[[ByteLength]] でない場合、
RangeError
例外をthrowする。 -
firstDescriptor の bytes filled + view.[[ByteLength]] > firstDescriptor の byte length の場合、
RangeError
例外をthrowする。 -
viewByteLength に view.[[ByteLength]] を格納する。
-
firstDescriptor の buffer を TransferArrayBuffer(view.[[ViewedArrayBuffer]]) の結果に設定する。
-
? ReadableByteStreamControllerRespondInternal(controller, viewByteLength) を実行する。
-
アサーション: controller.[[byobRequest]] は null。
-
descriptor に controller.[[pendingPullIntos]][0] を格納する。
-
descriptor を controller.[[pendingPullIntos]] から削除する。
-
descriptor を返す。
-
stream に controller.[[stream]] を格納する。
-
stream.[[state]] が "
readable
" でない場合、false を返す。 -
controller.[[closeRequested]] が true の場合、 false を返す。
-
controller.[[started]] が false の場合、false を返す。
-
! ReadableStreamHasDefaultReader(stream) が true かつ ReadableStreamGetNumReadRequests(stream) > 0 の場合、true を返す。
-
! ReadableStreamHasBYOBReader(stream) が true かつ ReadableStreamGetNumReadIntoRequests(stream) > 0 の場合、true を返す。
-
desiredSize に ! ReadableByteStreamControllerGetDesiredSize(controller) の結果を格納する。
-
アサーション: desiredSize は null でない。
-
desiredSize > 0 の場合、true を返す。
-
false を返す。
-
アサーション: stream.[[controller]] は未定義である。
-
autoAllocateChunkSize が未定義でない場合、
-
アサーション: ! IsInteger(autoAllocateChunkSize) は true である。
-
アサーション: autoAllocateChunkSize は正である。
-
-
controller.[[stream]] を stream に設定する。
-
controller.[[pullAgain]] および controller.[[pulling]] を false に設定する。
-
controller.[[byobRequest]] を null に設定する。
-
! ResetQueue(controller) を実行する。
-
controller.[[closeRequested]] および controller.[[started]] を false に設定する。
-
controller.[[strategyHWM]] を highWaterMark に設定する。
-
controller.[[pullAlgorithm]] を pullAlgorithm に設定する。
-
controller.[[cancelAlgorithm]] を cancelAlgorithm に設定する。
-
controller.[[autoAllocateChunkSize]] を autoAllocateChunkSize に設定する。
-
controller.[[pendingPullIntos]] を新しい空の リスト に設定する。
-
stream.[[controller]] を controller に設定する。
-
startResult に startAlgorithm を実行した結果を格納する。
-
startPromise に a promise resolved with startResult を格納する。
-
startPromise が fulfill されたとき、
-
controller.[[started]] を true に設定する。
-
アサーション: controller.[[pulling]] は false。
-
アサーション: controller.[[pullAgain]] は false。
-
! ReadableByteStreamControllerCallPullIfNeeded(controller) を実行する。
-
-
startPromise が reason r で reject されたとき、
-
! ReadableByteStreamControllerError(controller, r) を実行する。
-
-
controller に 新しい
ReadableByteStreamController
を格納する。 -
startAlgorithm に undefined を返すアルゴリズムを格納する。
-
pullAlgorithm に a promise resolved with undefined を返すアルゴリズムを格納する。
-
cancelAlgorithm に a promise resolved with undefined を返すアルゴリズムを格納する。
-
underlyingSourceDict["
start
"] が存在する場合、 startAlgorithm を、invoking underlyingSourceDict["start
"] に引数リスト « controller » と callback this value underlyingSource を渡して実行した結果を返すアルゴリズムに設定する。 -
underlyingSourceDict["
pull
"] が存在する場合、 pullAlgorithm を invoking underlyingSourceDict["pull
"] に引数リスト « controller » と callback this value underlyingSource を渡して実行した結果を返すアルゴリズムに設定する。 -
underlyingSourceDict["
cancel
"] が存在する場合、 cancelAlgorithm を、引数 reason を受け取り invoking underlyingSourceDict["cancel
"] に引数リスト « reason » と callback this value underlyingSource を渡して実行した結果を返すアルゴリズムに設定する。 -
autoAllocateChunkSize に underlyingSourceDict["
autoAllocateChunkSize
"] が 存在する場合その値、存在しない場合は undefined を格納する。 -
autoAllocateChunkSize が 0 の場合、
TypeError
例外をthrowする。 -
? SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize) を実行する。
5. Writableストリーム
5.1. Writableストリームの利用方法
readableStream. pipeTo( writableStream) . then(() => console. log( "すべてのデータが正常に書き込まれました!" )) . catch ( e=> console. error( "何か問題が発生しました!" , e));
write()
やclose()
メソッドを使うことで、Writableストリームに直接書き込むこともできます。Writableストリームは受けた書き込みをキューに入れ、内部で順にunderlying
sinkへ転送してくれるため、特別な手順無しで書き込みできます。
function writeArrayToStream( array, writableStream) { const writer= writableStream. getWriter(); array. forEach( chunk=> writer. write( chunk). catch (() => {})); return writer. close(); } writeArrayToStream([ 1 , 2 , 3 , 4 , 5 ], writableStream) . then(() => console. log( "完了しました!" )) . catch ( e=> console. error( "ストリームでエラー: " + e));
.catch(() => {})
でwrite()
の拒否を握りつぶしている点に注目してください。致命的なエラーはclose()
の拒否で通知されるため、未処理のままにするとunhandledrejection
イベントや警告が出てしまいます。
close()
メソッドのPromiseで確認していました。このPromiseは、ストリームの初期化・書き込み・クローズのいずれかで問題が発生した場合にrejectされ、正常に閉じられたときにfulfillされます。多くの場合これだけで十分です。
しかし、特定のチャンクの書き込み成功を知りたい場合は、writerのwrite()
メソッドが返すPromiseを使えます:
writer. write( "私はデータのチャンクです" ) . then(() => console. log( "チャンクの書き込みに成功!" )) . catch ( e=> console. error( e));
「成功」の定義はストリーム(正確にはunderlying sink)ごとに異なります。例えばファイルストリームならOSが書き込みを受け付けた時点で成功とみなされ、必ずしもディスクに書き込まれたとは限りません。即座にPromiseが解決されるストリームもあります。
desiredSize
やready
プロパティを使うことで、Writableストリームwriterはプロデューサがストリームからのフロー制御信号により細かく対応でき、メモリ使用量を高水位マーク以下に保てます。以下の例は、desiredSize
で生成バイト数を決め、ready
でバックプレッシャーが解消するのを待ちながら、無限にランダムバイトを書き込みます。
async function writeRandomBytesForever( writableStream) { const writer= writableStream. getWriter(); while ( true ) { await writer. ready; const bytes= new Uint8Array( writer. desiredSize); crypto. getRandomValues( bytes); // 敢えてawaitしない; writer.readyのawaitだけで十分です。 writer. write( bytes). catch (() => {}); } } writeRandomBytesForever( myWritableStream). catch ( e=> console. error( "何かが壊れました" , e));
write()
のPromiseはawaitせず、ready
のawaitだけで十分です。またひとつ前の例と同様、.catch(() => {})
パターンでwrite()
のPromiseを処理しています。この場合、失敗はready
のawaitで通知されます。
write()
が返すpromiseをawait
するのがなぜ悪いアイデアなのか、さらに強調するために、上記の例を修正してみましょう。この場合でもWritableStreamDefaultWriter
インターフェースを直接使い続けますが、その時点で書き込むバイト数を自分で制御しない場合です。その場合、バックプレッシャーを尊重するコードは同じようになります:
async function writeSuppliedBytesForever( writableStream, getBytes) { const writer= writableStream. getWriter(); while ( true ) { await writer. ready; const bytes= getBytes(); writer. write( bytes). catch (() => {}); } }
前の例では毎回writer.desiredSize
バイトを書き込んでいたため、write()
とready
のPromiseが同期的に動いていましたが、この例ではready
のPromiseがwrite()
より先に解決される場合もあります。ready
のPromiseはdesired
sizeが正になった時に解決されますが、これは書き込みの成功よりも前の場合があります(特に高水位マークが大きい場合)。
つまり、write()
の戻り値をawaitすると、ストリームの内部キューに書き込みをためることができず、前回の書き込み成功後に次の書き込みを実行することになるため、スループットが低下します。
5.2. WritableStream
クラス
WritableStream
はWritableストリームを表します。
5.2.1. インターフェース定義
WritableStream
クラスのWeb IDL定義は以下の通りです:
[Exposed=*,Transferable ]interface {
WritableStream constructor (optional object ,
underlyingSink optional QueuingStrategy = {});
strategy readonly attribute boolean locked ;Promise <undefined >abort (optional any );
reason Promise <undefined >close ();WritableStreamDefaultWriter getWriter (); };
5.2.2. 内部スロット
WritableStream
のインスタンスは、次の表に記載された内部スロットとともに作成されます:
内部スロット | 説明(非規範的) |
---|---|
[[backpressure]] | コントローラーによって設定されたバックプレッシャー信号を示すブール値 |
[[closeRequest]] | writerのclose()
メソッドから返されるPromise
|
[[controller]] | このストリームの状態とキューを制御するために作成されたWritableStreamDefaultController
|
[[Detached]] | ストリームが転送されたときにtrueとなるブールフラグ |
[[inFlightWriteRequest]] | 現在実行中の書き込み操作のPromiseが設定されるスロット。underlying sinkの書き込みアルゴリズムが実行中で未完了の場合に使用され、リエントラント呼び出しを防ぐ |
[[inFlightCloseRequest]] | 現在実行中のクローズ操作のPromiseが設定されるスロット。underlying
sinkのクローズアルゴリズムが実行中で未完了の場合に使用され、abort()
メソッドがクローズを中断するのを防ぐ
|
[[pendingAbortRequest]] | 保留中の中止リクエスト |
[[state]] |
ストリームの現在の状態を示す文字列(内部用途);"writable "、"closed "、"erroring "、"errored "のいずれか
|
[[storedError]] | ストリームが失敗した際の値。"errored "状態でストリームを操作しようとしたときの失敗理由や例外として渡される
|
[[writer]] | ストリームがwriterにロックされている場合はWritableStreamDefaultWriter
インスタンス。ロックされていない場合はundefined
|
[[writeRequests]] | リスト。underlying sinkでまだ処理されていないストリームの内部書き込みリクエストキューを表すPromiseの一覧 |
[[inFlightCloseRequest]]スロットと [[closeRequest]]スロットは排他的です。同様に、[[inFlightWriteRequest]]がundefinedでない間は、[[writeRequests]]から要素は削除されません。実装はこれらの不変条件に基づきストレージを最適化できます。
保留中の中止リクエストは、ストリームの中止リクエストが最終的に処理されるまで追跡するための構造体であり、次の項目を持ちます:
- promise
-
WritableStreamAbortから返されるPromise
- reason
-
WritableStreamAbortに中止理由として渡されたJavaScript値
- was already erroring
-
ストリームが
erroring
状態でWritableStreamAbortが呼び出されたかどうかを示すブール値であり、中止リクエストの結果に影響します
5.2.3. underlying sink API
WritableStream()
のコンストラクターは、最初の引数としてunderlying
sinkを表すJavaScriptオブジェクトを受け取ります。これらのオブジェクトには次のいずれかのプロパティを含めることができます:
dictionary {
UnderlyingSink UnderlyingSinkStartCallback start ;UnderlyingSinkWriteCallback write ;UnderlyingSinkCloseCallback close ;UnderlyingSinkAbortCallback abort ;any type ; };callback =
UnderlyingSinkStartCallback any (WritableStreamDefaultController );
controller callback =
UnderlyingSinkWriteCallback Promise <undefined > (any ,
chunk WritableStreamDefaultController );
controller callback =
UnderlyingSinkCloseCallback Promise <undefined > ();callback =
UnderlyingSinkAbortCallback Promise <undefined > (optional any );
reason
start(controller)
, 型 UnderlyingSinkStartCallback-
この関数は
WritableStream
の作成時に直ちに呼び出されます。通常、これは表現されているunderlying sinkリソースへのアクセスを取得するために利用されます。
セットアップ処理が非同期の場合、promiseを返すことで成功・失敗を通知できます。promiseがrejectされるとストリームはエラーとなります。例外が投げられた場合は
WritableStream()
コンストラクターによって再スローされます。 write(chunk, controller)
, 型 UnderlyingSinkWriteCallback-
この関数は新しいチャンクのデータをunderlying sinkへ書き込む準備ができた時に呼び出されます。ストリームの実装はこの関数が前回の書き込みが成功した後のみ呼ばれること、および
start()
が成功する前や、close()
・abort()
が呼ばれた後には呼ばれないことを保証します。この関数はunderlying sinkが提示するリソースへデータを送るために使われます。たとえば下位APIを呼び出すなどです。
データの書き込みが非同期で、成功・失敗の通知ができる場合は、promiseを返すことで通知できます。このpromiseの返り値は
writer.write()
の呼び出し元に伝えられ、個々の書き込みを監視できます。例外を投げることはrejectされたpromiseを返すことと同じ扱いです。こうした通知が常に利用できるとは限りません。例:§ 10.6 バックプレッシャーや成功通知のないWritableストリームと§ 10.7 バックプレッシャーと成功通知のあるWritableストリームを比較してください。その場合、何も返さないのが最善です。
この関数が返す可能性があるpromiseは、指定したチャンクがストリーム内部キューを満たすための希望サイズの計算上、書き込まれたとみなされるかどうかにも影響します。つまり、promiseが解決するまでの間、
writer.desiredSize
の値は変化せず、書き込みが成功した後にのみチャンク追加の要求が出されます。さらに、この関数が返すpromiseは良い振る舞いをするプロデューサが、チャンクが完全に処理されるまでミューテートしないよう保証するためにも使われます(これは仕様上保証されるものではなく、プロデューサとunderlying sink間の非公式な契約です)。
close()
, 型 UnderlyingSinkCloseCallback-
この関数はプロデューサが
writer.close()
を通じて書き込み完了を通知し、キュー内の書き込みが全て成功した後に呼び出されます。この関数は、underlying sinkへの書き込みの完了・フラッシュのための処理や、保持していたリソースの解放などを行うことができます。
シャットダウン処理が非同期の場合、promiseを返すことで成功・失敗を通知できます。結果は
writer.close()
の返り値となり、promiseがrejectされた場合はストリームがエラーとなり、正常に閉じられません。例外を投げることはrejectされたpromiseを返すことと同じ扱いです。 abort(reason)
, 型 UnderlyingSinkAbortCallback-
この関数はプロデューサが
stream.abort()
またはwriter.abort()
を通じてストリームの中止を希望したときに呼び出されます。引数にはこれらのメソッドに渡された値が使われます。Writableストリームは、パイピング中の特定条件下でも中止されることがあります。詳細は
pipeTo()
メソッドの定義を参照してください。この関数は、保持していたリソースのクリーンアップ処理を
close()
と同様に行えますが、独自の処理も可能です。シャットダウン処理が非同期の場合、promiseを返すことで成功・失敗を通知できます。結果は
writer.abort()
の返り値となります。例外を投げることはrejectされたpromiseを返すことと同じ扱いです。いずれにせよストリームは新しいTypeError
によって中止されたことが示され、エラーとなります。 type
, 型 any-
このプロパティは将来の利用を目的として予約されており、値を指定しようとすると例外が投げられます。
start()
やwrite()
に渡されるcontroller
引数はWritableStreamDefaultController
のインスタンスであり、ストリームをエラーにする能力を持ちます。これは主にpromiseベースでないAPIとの橋渡しに使われます。例:§ 10.6 バックプレッシャーや成功通知のないWritableストリームなど。
5.2.4. コンストラクター・メソッド・プロパティ
stream = new
WritableStream
(underlyingSink[, strategy)-
指定したunderlying sinkをラップする新しい
WritableStream
を作成します。underlyingSink引数の詳細は§ 5.2.3 underlying sink APIを参照してください。strategy引数はストリームのキュー戦略を表します。詳細は§ 7.1 キュー戦略APIを参照。指定しない場合は、
CountQueuingStrategy
(高水位マークが1)の動作になります。 isLocked = stream.
locked
-
Writableストリームがwriterにロックされているかどうかを返します。
await stream.
abort
([ reason ])-
ストリームを中止します。これによりプロデューサはこれ以上正常に書き込みできなくなり、ストリームは即座にエラー状態となり、キューに入っていた書き込みも破棄されます。underlying sinkの中止処理も実行されます。
返されるpromiseは、ストリームのシャットダウンが正常に完了すればfulfillし、 underlying sinkからエラーが通知された場合はrejectされます。また、ストリームが現在ロックされている場合は
TypeError
でrejectされます(ストリームのキャンセルは試みません)。 await stream.
close
()-
ストリームを閉じます。underlying sinkは、これまでに書き込まれたチャンクの処理が完了した後、クローズ処理を呼び出します。この間、追加の書き込みは失敗します(ストリームがエラー状態になることはありません)。
このメソッドは、残りのチャンクが全て正常に書き込まれ、ストリームが正常に閉じられればfulfillするpromiseを返し、処理中にエラーが発生した場合はrejectします。また、ストリームが現在ロックされている場合は
TypeError
でrejectされます(ストリームのキャンセルは試みません)。 writer = stream.
getWriter
()-
writer(
WritableStreamDefaultWriter
のインスタンス)を作成し、ストリームを新しいwriterにロックします。ストリームがロックされている間は他のwriterを取得できません。このwriterが解放されるまで独占的に利用されます。この機能は、割り込みやインターリーブなしにストリームに書き込みしたい抽象化を作る際に特に有用です。writerを取得すれば、他の誰かが同時にストリームへ書き込むことを防ぎ、書き込まれるデータの不確定性や破損を防げます。
new WritableStream(underlyingSink, strategy)
コンストラクターの手順は以下の通りです:
-
underlyingSinkが省略された場合、nullに設定する。
-
underlyingSinkDictをunderlyingSinkのIDL値への変換結果(型
UnderlyingSink
)とする。引数underlyingSinkを型
UnderlyingSink
として宣言できないのは、元のオブジェクトへの参照を保持する必要があるためです。これにより、各メソッドの呼び出しで元のオブジェクトを使えます。 -
underlyingSinkDict["
type
"] が存在する場合、RangeError
例外を投げる。これは将来的な型追加を後方互換性の懸念なしで行えるようにするためです。
-
! InitializeWritableStream(this)を実行する。
-
sizeAlgorithmを! ExtractSizeAlgorithm(strategy)の結果とする。
-
highWaterMarkを? ExtractHighWaterMark(strategy, 1)の結果とする。
-
? SetUpWritableStreamDefaultControllerFromUnderlyingSink(this, underlyingSink, underlyingSinkDict, highWaterMark, sizeAlgorithm)を実行する。
locked
ゲッターの手順は以下の通りです:
-
! IsWritableStreamLocked(this)を返す。
abort(reason)
メソッドの手順は以下の通りです:
-
! IsWritableStreamLocked(this)がtrueなら、TypeErrorでrejectされたpromiseを返す。
-
! WritableStreamAbort(this, reason)を返す。
close()
メソッドの手順は以下の通りです:
-
! IsWritableStreamLocked(this)がtrueなら、TypeErrorでrejectされたpromiseを返す。
-
! WritableStreamCloseQueuedOrInFlight(this)がtrueなら、TypeErrorでrejectされたpromiseを返す。
-
! WritableStreamClose(this)を返す。
getWriter()
メソッドの手順は以下の通りです:
5.2.5. postMessage()
による転送
destination.postMessage(ws, { transfer: [ws] });
-
WritableStream
を他のフレーム・ウィンドウ・ワーカーへ送信します。転送されたストリームは元のストリームと同様に利用できます。元のストリームはロックされ、直接操作できなくなります。
WritableStream
オブジェクトは転送可能オブジェクトです。転送手順(valueおよびdataHolder)は以下の通りです:
-
! IsWritableStreamLocked(value)がtrueなら、"
DataCloneError
"DOMException
を投げる。 -
port1を新規
MessagePort
(現在のRealm内)とする。 -
port2を新規
MessagePort
(現在のRealm内)とする。 -
port1とport2をエンタングルする。
-
readableを新規
ReadableStream
(現在のRealm内)とする。 -
! SetUpCrossRealmTransformReadable(readable, port1)を実行する。
-
promiseを! ReadableStreamPipeTo(readable, value, false, false, false)とする。
-
promise.[[PromiseIsHandled]]をtrueに設定する。
-
dataHolder.[[port]]を! StructuredSerializeWithTransfer(port2, « port2 »)に設定する。
-
deserializedRecordを! StructuredDeserializeWithTransfer(dataHolder.[[port]], 現在のRealm)とする。
-
portをdeserializedRecord.[[Deserialized]]とする。
-
! SetUpCrossRealmTransformWritable(value, port)を実行する。
5.3. WritableStreamDefaultWriter
クラス
WritableStreamDefaultWriter
クラスはWritableストリームwriterを表し、WritableStream
インスタンスから提供されるよう設計されています。
5.3.1. インターフェース定義
WritableStreamDefaultWriter
クラスのWeb IDL定義は以下の通りです:
[Exposed=*]interface {
WritableStreamDefaultWriter constructor (WritableStream );
stream readonly attribute Promise <undefined >closed ;readonly attribute unrestricted double ?desiredSize ;readonly attribute Promise <undefined >ready ;Promise <undefined >abort (optional any );
reason Promise <undefined >close ();undefined releaseLock ();Promise <undefined >write (optional any ); };
chunk
5.3.2. 内部スロット
WritableStreamDefaultWriter
のインスタンスは、以下の表に記載された内部スロットとともに生成されます:
内部スロット | 説明(非規範的) |
---|---|
[[closedPromise]] | writerのclosed
ゲッターから返されるPromise
|
[[readyPromise]] | writerのready
ゲッターから返されるPromise
|
[[stream]] | このreaderを所有するWritableStream
インスタンス
|
5.3.3. コンストラクター・メソッド・プロパティ
writer = new
WritableStreamDefaultWriter
(stream)-
これは
stream.
を呼ぶのと同等です。getWriter()
await writer.
closed
-
ストリームが閉じられるとfulfilledとなり、ストリームがエラーになるかwriterのロックが解放される前にストリームのクローズが完了しなかった場合はrejectedとなるpromiseを返します。
desiredSize = writer.
desiredSize
-
ストリームの内部キューを埋めるためのdesired sizeを返します。キューが満杯の場合は負の値になることがあります。プロデューサはこの値を使って最適な書き込み量を決定できます。
ストリームがエラー状態、または中止キュー済みの場合はnullとなります。ストリームが閉じている場合は0を返します。また、writerのロックが解放されている場合は例外を投げます。
await writer.
ready
-
ストリーム内部キューのdesired sizeが非正から正に遷移し、バックプレッシャーが解除されたことを示すときfulfilledとなるpromiseを返します。desired sizeが再び0以下になると、新しいpendingのpromiseを返します。
ストリームがエラーまたは中止された場合、writerのロックが解放された場合はpromiseがrejectedになります。
await writer.
abort
([ reason ])await writer.
close
()writer.
releaseLock
()-
writerのストリームへのロックを解放します。ロック解放後、writerはアクティブでなくなります。関連ストリームがエラー状態で解放された場合、writerは今後も同様にエラー状態を示し、それ以外の場合は閉じられた状態を示します。
ロックは、以前の
write()
呼び出しのpromiseが未解決でも解放できる点に注意してください。書き込みの間ずっとロックを維持する必要はなく、ロックは他のプロデューサによるインターリーブ書き込みを防ぐ役割です。 await writer.
write
(chunk)-
与えられたチャンクをWritableストリームに書き込みます。前回の書き込みが完了するまで待ってから、underlying sinkの
write()
メソッドへ渡します。成功時はundefinedでfulfilledとなるpromiseを返し、失敗や書き込み前にストリームがエラー状態になった場合はrejectedとなります。「成功」の定義はunderlying sinkによって異なり、チャンクが受理された時点で成功とされる場合もありますが、必ずしも最終保存先に安全に保存されたとは限りません。
chunkがミュータブルな場合、プロデューサは
write()
に渡した後、promiseが解決されるまでミューテートしないよう推奨されます。これにより、underlying sinkが渡された値そのものを受け取り処理できることが保証されます。
new WritableStreamDefaultWriter(stream)
コンストラクターの手順は以下の通りです:
-
? SetUpWritableStreamDefaultWriter(this, stream)を実行する。
closed
ゲッターの手順は以下の通りです:
desiredSize
ゲッターの手順は以下の通りです:
-
this.[[stream]]がundefinedなら、
TypeError
例外を投げる。
ready
ゲッターの手順は以下の通りです:
-
this.[[readyPromise]]を返す。
abort(reason)
メソッドの手順は以下の通りです:
-
this.[[stream]]がundefinedなら、TypeErrorでrejectされたpromiseを返す。
-
! WritableStreamDefaultWriterAbort(this, reason)を返す。
close()
メソッドの手順は以下の通りです:
-
streamをthis.[[stream]]とする。
-
streamがundefinedなら、TypeErrorでrejectされたpromiseを返す。
-
! WritableStreamCloseQueuedOrInFlight(stream) がtrueなら、TypeErrorでrejectされたpromiseを返す。
releaseLock()
メソッドの手順は以下の通りです:
-
streamをthis.[[stream]]とする。
-
streamがundefinedなら、return。
-
断言: stream.[[writer]]はundefinedではない。
-
! WritableStreamDefaultWriterRelease(this)を実行する。
write(chunk)
メソッドの手順は以下の通りです:
-
this.[[stream]]がundefinedなら、TypeErrorでrejectされたpromiseを返す。
-
! WritableStreamDefaultWriterWrite(this, chunk)を返す。
5.4.
WritableStreamDefaultController
クラス
WritableStreamDefaultController
クラスにはWritableStream
の状態を制御するためのメソッドがあります。WritableStream
の構築時、underlying
sinkに対応するWritableStreamDefaultController
インスタンスが渡され、操作できるようになります。
5.4.1. インターフェース定義
WritableStreamDefaultController
クラスのWeb IDL定義は以下の通りです:
[Exposed=*]interface {
WritableStreamDefaultController readonly attribute AbortSignal signal ;undefined error (optional any ); };
e
5.4.2. 内部スロット
WritableStreamDefaultController
のインスタンスは、以下の表に記載された内部スロットとともに生成されます:
内部スロット | 説明(非規範的) |
---|---|
[[abortAlgorithm]] | 1つの引数(中止理由)を受け取り、underlying sinkに中止要求を伝えるPromiseを返すアルゴリズム |
[[abortController]] | AbortController
。ストリームが中止された時、保留中の書き込みやクローズ操作を中止するために使える
|
[[closeAlgorithm]] | underlying sinkにクローズ要求を伝えるPromiseを返すアルゴリズム |
[[queue]] | リスト。ストリーム内部キューに格納されたチャンクを表す |
[[queueTotalSize]] | [[queue]]に格納されたすべてのチャンクの合計サイズ(§ 8.1 サイズ付きキュー参照) |
[[started]] | underlying sinkが開始処理を完了したかを示すブール値 |
[[strategyHWM]] | ストリーム作成者がキュー戦略の一部として指定した数値。ストリームがバックプレッシャーをunderlying sinkに適用する閾値 |
[[strategySizeAlgorithm]] | キューに入れるチャンクのサイズを計算するアルゴリズム。ストリームのキュー戦略の一部 |
[[stream]] | 制御対象のWritableStream インスタンス
|
[[writeAlgorithm]] | 1つの引数(書き込むチャンク)を受け取り、underlying sinkへデータを書き込むPromiseを返すアルゴリズム |
クローズ・センチネルは [[queue]]にチャンクの代わりとしてエンキューされる一意な値で、ストリームが閉じたことを示します。これは内部用途でのみ使われ、Web開発者に公開されることはありません。
5.4.3. メソッド・プロパティ
controller.
signal
-
ストリームが中止された時、保留中の書き込みやクローズ操作を中止するために使えるAbortSignal。
controller.
error
(e)-
制御対象のWritableストリームを閉じ、今後の操作がすべて指定したエラーeで失敗するようにします。
このメソッドはほとんど使われません。通常はunderlying sinkのいずれかのメソッドからrejected promiseを返すだけで十分です。ただし、underlying sinkの通常のライフサイクル外イベントで、ストリームを突然シャットダウンする必要がある場合などに便利です。
signal
ゲッターの手順は以下の通りです:
error(e)
メソッドの手順は以下の通りです:
-
stateをthis.[[stream]].[[state]]とする。
-
stateが"
writable
"でない場合、return。 -
! WritableStreamDefaultControllerError(this, e)を実行する。
5.4.4. 内部メソッド
以下は各WritableStreamDefaultController
インスタンスで実装される内部メソッドです。
Writableストリームの実装はこれらを呼び出します。
これらがメソッド形式になっている理由は、抽象演算としてではなく、Writableストリームの実装がコントローラー実装から分離されていることを示すためです。将来的には、これらの内部メソッドを実装する他のコントローラーで拡張可能となります。Readableストリームでも同様のシナリオが存在します(§ 4.9.2 コントローラーとのインターフェース参照)。実際複数のコントローラー型があり、対応する内部メソッドはポリモーフィックに使用されています。
-
resultを this.[[abortAlgorithm]]をreasonで実行した結果とする。
-
! WritableStreamDefaultControllerClearAlgorithms(this)を実行する。
-
resultを返す。
-
! ResetQueue(this)を実行する。
5.5. 抽象演算
5.5.1. Writableストリームの操作
以下の抽象演算は、より高いレベルでWritableStream
インスタンスを操作します。
-
writerを新規
WritableStreamDefaultWriter
とする。 -
? SetUpWritableStreamDefaultWriter(writer, stream)を実行する。
-
writerを返す。
-
断言: ! IsNonNegativeNumber(highWaterMark)がtrueである。
-
streamを新規
WritableStream
とする。 -
! InitializeWritableStream(stream)を実行する。
-
controllerを新規
WritableStreamDefaultController
とする。 -
? SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm)を実行する。
-
streamを返す。
この抽象演算は、startAlgorithmがthrowした場合のみ例外を投げます。
-
stream.[[state]]を"
writable
"に設定する。 -
stream.[[storedError]]、stream.[[writer]]、 stream.[[controller]]、 stream.[[inFlightWriteRequest]]、 stream.[[closeRequest]]、 stream.[[inFlightCloseRequest]]、 stream.[[pendingAbortRequest]]をundefinedに設定する。
-
stream.[[writeRequests]]を新しい空のリストに設定する。
-
stream.[[backpressure]]をfalseに設定する。
-
stream.[[writer]]がundefinedなら、falseを返す。
-
trueを返す。
-
! IsWritableStreamLocked(stream)がtrueなら、
TypeError
例外を投げる。 -
writer.[[stream]]をstreamに設定する。
-
stream.[[writer]]をwriterに設定する。
-
stateをstream.[[state]]とする。
-
stateが"
writable
"の場合、-
! WritableStreamCloseQueuedOrInFlight(stream) がfalseかつ stream.[[backpressure]]がtrueなら、writer.[[readyPromise]]を新しいpromiseに設定する。
-
それ以外の場合、writer.[[readyPromise]]をundefinedで解決されたpromiseに設定する。
-
writer.[[closedPromise]]を新しいpromiseに設定する。
-
-
それ以外でstateが"
erroring
"の場合、-
writer.[[readyPromise]]をpromise rejected with stream.[[storedError]]に設定する。
-
writer.[[readyPromise]].[[PromiseIsHandled]] をtrueに設定する。
-
writer.[[closedPromise]]を新しいpromiseに設定する。
-
-
それ以外でstateが"
closed
"の場合、-
writer.[[readyPromise]]をundefinedで解決されたpromiseに設定する。
-
writer.[[closedPromise]]をundefinedで解決されたpromiseに設定する。
-
-
それ以外の場合、
-
断言: stateは"
errored
"である。 -
storedErrorをstream.[[storedError]]とする。
-
writer.[[readyPromise]]をpromise rejected with storedErrorに設定する。
-
writer.[[readyPromise]].[[PromiseIsHandled]] をtrueに設定する。
-
writer.[[closedPromise]]をpromise rejected with storedErrorに設定する。
-
writer.[[closedPromise]].[[PromiseIsHandled]] をtrueに設定する。
-
-
stream.[[state]]が"
closed
"または"errored
"の場合、 undefinedで解決されたpromiseを返す。 -
abortシグナルを stream.[[controller]].[[abortController]] に対してreasonで発行する。
-
stateをstream.[[state]]とする。
-
stateが"
closed
"または"errored
"の場合、undefinedで解決されたpromiseを返す。状態を再確認する理由は、abortシグナルが著者コードを実行し、状態が変化する可能性があるためです。
-
stream.[[pendingAbortRequest]]がundefinedでない場合、stream.[[pendingAbortRequest]]のpromiseを返す。
-
断言: stateは"
writable
"または"erroring
"である。 -
wasAlreadyErroringをfalseに設定する。
-
stateが"
erroring
"の場合、-
wasAlreadyErroringをtrueに設定する。
-
reasonをundefinedに設定する。
-
-
promiseを新しいpromiseとする。
-
stream.[[pendingAbortRequest]]を新しいpending abort request(promiseがpromise、reasonがreason、was already erroringがwasAlreadyErroring)に設定する。
-
wasAlreadyErroringがfalseなら、! WritableStreamStartErroring(stream, reason)を実行する。
-
promiseを返す。
-
stateをstream.[[state]]とする。
-
stateが"
closed
"または"errored
"の場合、TypeErrorでrejectされたpromiseを返す。 -
断言: stateは"
writable
"または"erroring
"である。 -
断言: ! WritableStreamCloseQueuedOrInFlight(stream) がfalseである。
-
promiseを新しいpromiseとする。
-
stream.[[closeRequest]]をpromiseに設定する。
-
writerをstream.[[writer]]とする。
-
writerがundefinedでなく、かつstream.[[backpressure]]がtrueであり、stateが"
writable
"なら、resolve writer.[[readyPromise]] をundefinedで解決する。 -
! WritableStreamDefaultControllerClose(stream.[[controller]])を実行する。
-
promiseを返す。
5.5.2. コントローラーとのインターフェース
将来的に異なるWritableストリームの挙動(デフォルトのReadableストリームとReadableバイトストリームの違いのような)を追加できるよう、Writableストリームの内部状態の多くはWritableStreamDefaultController
クラスによってカプセル化されています。
各コントローラークラスは内部メソッドを2つ定義しており、WritableStream
のアルゴリズムから呼び出されます:
- [[AbortSteps]](reason)
- ストリームが中止された際に実行されるコントローラーの手順で、コントローラー内の状態をクリーンアップし、underlying sinkへ通知します。
- [[ErrorSteps]]()
- ストリームがエラー状態になった際に実行されるコントローラーの手順で、コントローラー内の状態をクリーンアップします。
(これらは抽象演算ではなく内部メソッドとして定義されています。これは、WritableStream
のアルゴリズムがコントローラーの型に依存せずポリモーフィックに呼び出せるようにするためです。現時点ではWritableStreamDefaultController
しか存在しませんが、理論的には他のコントローラー型でも同様です。)
このセクションの残りは逆方向の抽象演算についてです。コントローラーの実装が対応するWritableStream
オブジェクトに影響を与えるために使います。
これはコントローラーの内部状態変更を、WritableStream
の公開APIを通じて開発者に見える結果に変換します。
-
断言: ! IsWritableStreamLocked(stream)がtrueである。
-
断言: stream.[[state]]が"
writable
"である。 -
promiseを新しいpromiseとする。
-
Append promiseをstream.[[writeRequests]]に追加する。
-
promiseを返す。
-
stream.[[closeRequest]]がundefined、かつ stream.[[inFlightCloseRequest]]がundefinedなら、falseを返す。
-
trueを返す。
-
stateをstream.[[state]]とする。
-
stateが"
writable
"の場合、-
! WritableStreamStartErroring(stream, error)を実行する。
-
return。
-
-
断言: stateは"
erroring
"である。 -
! WritableStreamFinishErroring(stream)を実行する。
-
断言: stream.[[state]]は"
erroring
"である。 -
断言: ! WritableStreamHasOperationMarkedInFlight(stream) がfalseである。
-
stream.[[state]]を"
errored
"に設定する。 -
! stream.[[controller]].[[ErrorSteps]]()を実行する。
-
storedErrorをstream.[[storedError]]とする。
-
For each writeRequest of stream.[[writeRequests]]について:
-
Reject writeRequestをstoredErrorでrejectする。
-
-
stream.[[writeRequests]]を空のリストに設定する。
-
stream.[[pendingAbortRequest]]がundefinedの場合、
-
! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream)を実行する。
-
return。
-
-
abortRequestをstream.[[pendingAbortRequest]]とする。
-
stream.[[pendingAbortRequest]]をundefinedに設定する。
-
abortRequestのwas already erroringがtrueの場合、
-
! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream)を実行する。
-
return。
-
promiseを! stream.[[controller]].[[AbortSteps]](abortRequestのreason)の結果とする。
-
Upon fulfillment of promiseについて、
-
! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream)を実行する。
-
Upon rejection of promise(理由reason)について、
-
! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream)を実行する。
-
断言: stream.[[inFlightCloseRequest]]はundefinedではない。
-
resolve stream.[[inFlightCloseRequest]]をundefinedで解決する。
-
stream.[[inFlightCloseRequest]]をundefinedに設定する。
-
stateをstream.[[state]]とする。
-
断言: stream.[[state]]は"
writable
"または"erroring
"である。 -
stateが"
erroring
"の場合、-
stream.[[storedError]]をundefinedに設定する。
-
stream.[[pendingAbortRequest]]がundefinedでない場合、
-
resolve stream.[[pendingAbortRequest]]のpromiseをundefinedで解決する。
-
stream.[[pendingAbortRequest]]をundefinedに設定する。
-
-
-
stream.[[state]]を"
closed
"に設定する。 -
writerをstream.[[writer]]とする。
-
writerがundefinedでない場合、resolve writer.[[closedPromise]]をundefinedで解決する。
-
断言: stream.[[pendingAbortRequest]]はundefinedである。
-
断言: stream.[[storedError]]はundefinedである。
-
断言: stream.[[inFlightCloseRequest]]はundefinedではない。
-
reject stream.[[inFlightCloseRequest]]をerrorでrejectする。
-
stream.[[inFlightCloseRequest]]をundefinedに設定する。
-
断言: stream.[[state]]は"
writable
"または"erroring
"である。 -
stream.[[pendingAbortRequest]]がundefinedでない場合、
-
reject stream.[[pendingAbortRequest]]のpromiseをerrorでrejectする。
-
stream.[[pendingAbortRequest]]をundefinedに設定する。
-
-
! WritableStreamDealWithRejection(stream, error)を実行する。
-
断言: stream.[[inFlightWriteRequest]]はundefinedではない。
-
resolve stream.[[inFlightWriteRequest]]をundefinedで解決する。
-
stream.[[inFlightWriteRequest]]をundefinedに設定する。
-
断言: stream.[[inFlightWriteRequest]]はundefinedではない。
-
reject stream.[[inFlightWriteRequest]]をerrorでrejectする。
-
stream.[[inFlightWriteRequest]]をundefinedに設定する。
-
断言: stream.[[state]]は"
writable
"または"erroring
"である。 -
! WritableStreamDealWithRejection(stream, error)を実行する。
-
stream.[[inFlightWriteRequest]]がundefined、かつ stream.[[inFlightCloseRequest]]がundefinedなら、falseを返す。
-
trueを返す。
-
断言: stream.[[inFlightCloseRequest]]はundefinedである。
-
断言: stream.[[closeRequest]]はundefinedではない。
-
stream.[[inFlightCloseRequest]]を stream.[[closeRequest]]に設定する。
-
stream.[[closeRequest]]をundefinedに設定する。
-
断言: stream.[[inFlightWriteRequest]]はundefinedである。
-
断言: stream.[[writeRequests]]は空ではない。
-
writeRequestをstream.[[writeRequests]][0]とする。
-
Remove writeRequestをstream.[[writeRequests]]から除去する。
-
stream.[[inFlightWriteRequest]]を writeRequestに設定する。
-
断言: stream.[[state]]は"
errored
"である。 -
stream.[[closeRequest]]がundefinedでない場合、
-
断言: stream.[[inFlightCloseRequest]]はundefinedである。
-
reject stream.[[closeRequest]]をstream.[[storedError]]でrejectする。
-
stream.[[closeRequest]]をundefinedに設定する。
-
-
writerをstream.[[writer]]とする。
-
writerがundefinedでない場合、
-
reject writer.[[closedPromise]]をstream.[[storedError]]でrejectする。
-
writer.[[closedPromise]].[[PromiseIsHandled]] をtrueに設定する。
-
-
断言: stream.[[storedError]]はundefinedである。
-
断言: stream.[[state]]は"
writable
"である。 -
controllerをstream.[[controller]]とする。
-
断言: controllerはundefinedではない。
-
stream.[[state]]を"
erroring
"に設定する。 -
stream.[[storedError]]をreasonに設定する。
-
writerをstream.[[writer]]とする。
-
writerがundefinedでない場合、! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason)を実行する。
-
! WritableStreamHasOperationMarkedInFlight(stream) がfalseかつ controller.[[started]]がtrueの場合、! WritableStreamFinishErroring(stream)を実行する。
-
断言: stream.[[state]]は"
writable
"である。 -
断言: ! WritableStreamCloseQueuedOrInFlight(stream) がfalseである。
-
writerをstream.[[writer]]とする。
-
writerがundefinedでなく、かつbackpressureが stream.[[backpressure]]と異なる場合、
-
backpressureがtrueなら、writer.[[readyPromise]]を新しいpromiseに設定する。
-
それ以外の場合、
-
断言: backpressureはfalseである。
-
resolve writer.[[readyPromise]]をundefinedで解決する。
-
-
-
stream.[[backpressure]]をbackpressureに設定する。
5.5.3. Writer
以下の抽象演算は
WritableStreamDefaultWriter
インスタンスの実装および操作に利用されます。
-
streamをwriter.[[stream]]とする。
-
断言: streamはundefinedではない。
-
! WritableStreamAbort(stream, reason)を返す。
-
streamをwriter.[[stream]]とする。
-
断言: streamはundefinedではない。
-
! WritableStreamClose(stream)を返す。
-
streamをwriter.[[stream]]とする。
-
断言: streamはundefinedではない。
-
stateをstream.[[state]]とする。
-
! WritableStreamCloseQueuedOrInFlight(stream) がtrue、またはstateが"
closed
"の場合、 undefinedで解決されたpromiseを返す。 -
stateが"
errored
"の場合、promise rejected with stream.[[storedError]]を返す。 -
断言: stateは"
writable
"または"erroring
"である。 -
! WritableStreamDefaultWriterClose(writer)を返す。
この抽象演算は
ReadableStream
の
pipeTo()
のエラー伝播のセマンティクス実装を支援します。
-
writer.[[closedPromise]].[[PromiseState]] が"
pending
"の場合、 reject writer.[[closedPromise]]をerrorでrejectする。 -
それ以外の場合、writer.[[closedPromise]]をpromise rejected with errorに設定する。
-
writer.[[closedPromise]].[[PromiseIsHandled]] をtrueに設定する。
-
writer.[[readyPromise]].[[PromiseState]] が"
pending
"の場合、 reject writer.[[readyPromise]]をerrorでrejectする。 -
それ以外の場合、writer.[[readyPromise]]をpromise rejected with errorに設定する。
-
writer.[[readyPromise]].[[PromiseIsHandled]] をtrueに設定する。
-
streamをwriter.[[stream]]とする。
-
stateをstream.[[state]]とする。
-
stateが"
errored
"または"erroring
"の場合、nullを返す。 -
stateが"
closed
"の場合、0を返す。 -
! WritableStreamDefaultControllerGetDesiredSize(stream.[[controller]])を返す。
-
streamをwriter.[[stream]]とする。
-
断言: streamはundefinedではない。
-
断言: stream.[[writer]]はwriterである。
-
releasedErrorを新しい
TypeError
とする。 -
! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError)を実行する。
-
! WritableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError)を実行する。
-
stream.[[writer]]をundefinedに設定する。
-
writer.[[stream]]をundefinedに設定する。
-
streamをwriter.[[stream]]とする。
-
断言: streamはundefinedではない。
-
controllerをstream.[[controller]]とする。
-
chunkSizeを! WritableStreamDefaultControllerGetChunkSize(controller, chunk)の結果とする。
-
streamがwriter.[[stream]]と等しくない場合、TypeErrorでrejectされたpromiseを返す。
-
stateをstream.[[state]]とする。
-
stateが"
errored
"の場合、promise rejected with stream.[[storedError]]を返す。 -
! WritableStreamCloseQueuedOrInFlight(stream) がtrue、またはstateが"
closed
"の場合、TypeErrorでrejectされたpromise(ストリームがクローズ/クローズ中であることを示す)を返す。 -
stateが"
erroring
"の場合、promise rejected with stream.[[storedError]]を返す。 -
断言: stateは"
writable
"である。 -
promiseを! WritableStreamAddWriteRequest(stream)の結果とする。
-
! WritableStreamDefaultControllerWrite(controller, chunk, chunkSize)を実行する。
-
promiseを返す。
5.5.4. デフォルトコントローラー
以下の抽象演算は
WritableStreamDefaultController
クラスの実装を支援します。
-
断言: stream implements
WritableStream
である。 -
断言: stream.[[controller]] はundefinedである。
-
controller.[[stream]]をstreamに設定する。
-
stream.[[controller]]をcontrollerに設定する。
-
! ResetQueue(controller)を実行する。
-
controller.[[abortController]]を新しい
AbortController
に設定する。 -
controller.[[started]]をfalseに設定する。
-
controller.[[strategySizeAlgorithm]] を sizeAlgorithmに設定する。
-
controller.[[strategyHWM]]を highWaterMarkに設定する。
-
controller.[[writeAlgorithm]]を writeAlgorithmに設定する。
-
controller.[[closeAlgorithm]]を closeAlgorithmに設定する。
-
controller.[[abortAlgorithm]]を abortAlgorithmに設定する。
-
backpressureを! WritableStreamDefaultControllerGetBackpressure(controller)の結果とする。
-
! WritableStreamUpdateBackpressure(stream, backpressure)を実行する。
-
startResultをstartAlgorithmの実行結果とする。(例外が投げられる場合がある)
-
startPromiseをstartResultで解決されたpromiseとする。
-
-
断言: stream.[[state]]は "
writable
"または"erroring
"である。 -
controller.[[started]]をtrueに設定する。
-
! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller)を実行する。
-
-
startPromiseがrejectedになったとき(理由r)、
-
断言: stream.[[state]]は "
writable
"または"erroring
"である。 -
controller.[[started]]をtrueに設定する。
-
! WritableStreamDealWithRejection(stream, r)を実行する。
-
-
controllerを新規
WritableStreamDefaultController
とする。 -
startAlgorithmをundefinedを返すアルゴリズムとする。
-
writeAlgorithmをundefinedで解決されたpromiseを返すアルゴリズムとする。
-
closeAlgorithmをundefinedで解決されたpromiseを返すアルゴリズムとする。
-
abortAlgorithmをundefinedで解決されたpromiseを返すアルゴリズムとする。
-
underlyingSinkDict["
start
"] が存在する場合、startAlgorithmを invoke underlyingSinkDict["start
"] に引数リスト« controller »で渡し、 例外処理は"rethrow
"、callback this valueはunderlyingSinkとするアルゴリズムに設定する。 -
underlyingSinkDict["
write
"] が存在する場合、writeAlgorithmを 引数chunkを受け取り、invoke underlyingSinkDict["write
"] に引数リスト« chunk, controller »で渡し、callback this valueはunderlyingSinkとするアルゴリズムに設定する。 -
underlyingSinkDict["
close
"] が存在する場合、closeAlgorithmを invoke underlyingSinkDict["close
"] に引数リスト«»で渡し、callback this valueはunderlyingSinkとするアルゴリズムに設定する。 -
underlyingSinkDict["
abort
"] が存在する場合、abortAlgorithmを 引数reasonを受け取り、invoke underlyingSinkDict["abort
"] に引数リスト« reason »で渡し、 callback this valueはunderlyingSinkとするアルゴリズムに設定する。 -
? SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm)を実行する。
-
streamをcontroller.[[stream]]とする。
-
controller.[[started]]がfalseなら、return。
-
stream.[[inFlightWriteRequest]]がundefinedでなければ、return。
-
stateをstream.[[state]]とする。
-
断言: stateは"
closed
"でも"errored
"でもない。 -
stateが"
erroring
"の場合、-
! WritableStreamFinishErroring(stream)を実行する。
-
return。
-
-
controller.[[queue]]が空なら、return。
-
valueを! PeekQueueValue(controller)の結果とする。
-
valueがクローズ・センチネルなら、! WritableStreamDefaultControllerProcessClose(controller)を実行する。
-
それ以外の場合、! WritableStreamDefaultControllerProcessWrite(controller, value)を実行する。
WritableStream
自体が参照されていても
ガベージコレクトできるようになります。
これは弱参照を使って観測できます。詳細はtc39/proposal-weakrefs#31を参照してください。
以下の手順を実行します:
-
controller.[[writeAlgorithm]]をundefinedに設定する。
-
controller.[[closeAlgorithm]]をundefinedに設定する。
-
controller.[[abortAlgorithm]]をundefinedに設定する。
-
controller.[[strategySizeAlgorithm]]をundefinedに設定する。
このアルゴリズムは一部の例外ケースで複数回実行されます。最初の1回以降は何もしません。
-
! EnqueueValueWithSize(controller, クローズ・センチネル, 0)を実行する。
-
! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller)を実行する。
-
streamをcontroller.[[stream]]とする。
-
断言: stream.[[state]]は"
writable
"である。 -
! WritableStreamDefaultControllerClearAlgorithms(controller)を実行する。
-
! WritableStreamStartErroring(stream, error)を実行する。
-
controller.[[stream]].[[state]]が"
writable
"なら、! WritableStreamDefaultControllerError(controller, error)を実行する。
-
desiredSizeを! WritableStreamDefaultControllerGetDesiredSize(controller)の結果とする。
-
desiredSize ≤ 0 なら true、そうでなければ false を返す。
-
controller.[[strategySizeAlgorithm]]がundefinedなら:
-
断言: controller.[[stream]].[[state]]は"
writable
"ではない。 -
1を返す。
-
-
returnValueを controller.[[strategySizeAlgorithm]]をchunkで実行した結果(completion recordとして解釈)とする。
-
returnValueがabrupt completionなら、
-
! WritableStreamDefaultControllerErrorIfNeeded(controller, returnValue.[[Value]])を実行する。
-
1を返す。
-
-
returnValue.[[Value]]を返す。
-
controller.[[strategyHWM]] − controller.[[queueTotalSize]]を返す。
-
streamをcontroller.[[stream]]とする。
-
! WritableStreamMarkCloseRequestInFlight(stream)を実行する。
-
! DequeueValue(controller)を実行する。
-
断言: controller.[[queue]]は空である。
-
sinkClosePromiseを controller.[[closeAlgorithm]]の実行結果とする。
-
! WritableStreamDefaultControllerClearAlgorithms(controller)を実行する。
-
sinkClosePromiseがfulfilledになったとき、
-
! WritableStreamFinishInFlightClose(stream)を実行する。
-
-
sinkClosePromiseがrejectedになったとき(理由reason)、
-
! WritableStreamFinishInFlightCloseWithError(stream, reason)を実行する。
-
-
streamをcontroller.[[stream]]とする。
-
! WritableStreamMarkFirstWriteRequestInFlight(stream)を実行する。
-
sinkWritePromiseを controller.[[writeAlgorithm]]をchunkで実行した結果とする。
-
sinkWritePromiseがfulfilledになったとき、
-
! WritableStreamFinishInFlightWrite(stream)を実行する。
-
stateをstream.[[state]]とする。
-
断言: stateは"
writable
"または"erroring
"である。 -
! DequeueValue(controller)を実行する。
-
! WritableStreamCloseQueuedOrInFlight(stream) がfalseかつstateが"
writable
"の場合、-
backpressureを! WritableStreamDefaultControllerGetBackpressure(controller)の結果とする。
-
! WritableStreamUpdateBackpressure(stream, backpressure)を実行する。
-
-
! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller)を実行する。
-
-
sinkWritePromiseがrejectedになったとき(reason)、
-
stream.[[state]]が"
writable
"なら、! WritableStreamDefaultControllerClearAlgorithms(controller)を実行する。 -
! WritableStreamFinishInFlightWriteWithError(stream, reason)を実行する。
-
-
enqueueResultをEnqueueValueWithSize(controller, chunk, chunkSize)の結果とする。
-
enqueueResultがabrupt completionなら、
-
! WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueResult.[[Value]])を実行する。
-
return。
-
-
streamをcontroller.[[stream]]とする。
-
! WritableStreamCloseQueuedOrInFlight(stream) がfalseかつ stream.[[state]]が"
writable
"の場合、-
backpressureを! WritableStreamDefaultControllerGetBackpressure(controller)の結果とする。
-
! WritableStreamUpdateBackpressure(stream, backpressure)を実行する。
-
-
! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller)を実行する。
6. 変換ストリーム
6.1. 変換ストリームの利用方法
readableStream. pipeThrough( transformStream) . pipeTo( writableStream) . then(() => console. log( "全てのデータが正常に変換されました!" )) . catch ( e=> console. error( "何か問題が発生しました!" , e));
readable
や
writable
プロパティを使って、変換ストリームのReadableStreamやWritableStreamのインターフェースを直接利用することもできます。この例では、ストリームの書き込み側にデータをwriter
インターフェースを使って供給しています。読み取り側は別のanotherWritableStream
にパイプされます。
const writer= transformStream. writable. getWriter(); writer. write( "入力チャンク" ); transformStream. readable. pipeTo( anotherWritableStream);
fetch()
APIはReadableStreamの
リクエストボディを受け付けますが、アップロードするデータをWritableStreamインターフェースで書き込む方が便利な場合もあります。このような場合、同一変換ストリームが役立ちます。
const { writable, readable} = new TransformStream(); fetch( "..." , { body: readable}). then( response=> /* ... */ ); const writer= writable. getWriter(); writer. write( new Uint8Array([ 0x73 , 0x74 , 0x72 , 0x65 , 0x61 , 0x6D , 0x73 , 0x21 ])); writer. close();
同一変換ストリームのもう一つの用途は、パイプに追加バッファリングを加えることです。次の例では、readableStream
とwritableStream
の間に余分なバッファリングを加えています。
const writableStrategy= new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 }); readableStream. pipeThrough( new TransformStream( undefined , writableStrategy)) . pipeTo( writableStream);
6.2. TransformStream
クラス
TransformStream
クラスは、一般的な変換ストリーム概念の具体的なインスタンスです。
6.2.1. インターフェース定義
TransformStream
クラスのWeb IDL定義は以下の通りです:
[Exposed=*,Transferable ]interface {
TransformStream constructor (optional object ,
transformer optional QueuingStrategy = {},
writableStrategy optional QueuingStrategy = {});
readableStrategy readonly attribute ReadableStream readable ;readonly attribute WritableStream writable ; };
6.2.2. 内部スロット
TransformStream
のインスタンスは、以下の表に記載された内部スロットを持って生成されます:
内部スロット | 説明(非規範的) |
---|---|
[[backpressure]] | [[readable]]に対して最後に観測されたバックプレッシャーの有無 |
[[backpressureChangePromise]] | [[backpressure]]の値が変化するたびにfulfilledになり、置き換えられるpromise |
[[controller]] | TransformStreamDefaultController
オブジェクト。[[readable]]と[[writable]]を制御する能力を持つ
|
[[Detached]] | ストリームが転送されたときtrueになるブール値 |
[[readable]] | このオブジェクトが制御するReadableStream インスタンス
|
[[writable]] | このオブジェクトが制御するWritableStream インスタンス
|
6.2.3. トランスフォーマーAPI
TransformStream()
のコンストラクターは、最初の引数としてトランスフォーマーを表すJavaScriptオブジェクトを受け付けます。このオブジェクトは以下のいずれかのメソッドを含むことができます:
dictionary {
Transformer TransformerStartCallback start ;TransformerTransformCallback transform ;TransformerFlushCallback flush ;TransformerCancelCallback cancel ;any readableType ;any writableType ; };callback =
TransformerStartCallback any (TransformStreamDefaultController );
controller callback =
TransformerFlushCallback Promise <undefined > (TransformStreamDefaultController );
controller callback =
TransformerTransformCallback Promise <undefined > (any ,
chunk TransformStreamDefaultController );
controller callback =
TransformerCancelCallback Promise <undefined > (any );
reason
start(controller)
, 型 TransformerStartCallback-
TransformStream
の生成直後に呼び出される関数です。通常これは、
controller.enqueue()
を用いてプレフィックスチャンクをエンキューするために使われます。これらのチャンクは読み取り側から読み取られますが、書き込み側への書き込みには依存しません。この初期処理が非同期の場合(例えば、プレフィックスチャンクの取得に手間がかかる場合)、成功・失敗を示すpromiseを返すことができます。rejectされたpromiseはストリームをエラーにします。例外が投げられた場合は
TransformStream()
コンストラクターによって再スローされます。 transform(chunk, controller)
, 型 TransformerTransformCallback-
新しいチャンクが書き込み側に書き込まれ、変換可能になったときに呼び出される関数です。ストリームの実装は、この関数が前回の変換が成功した後にのみ呼び出されること、
start()
の完了前やflush()
の呼び出し後には決して呼び出されないことを保証します。この関数は変換ストリームの実際の変換処理を行います。
controller.enqueue()
を使って結果をエンキューできます。これにより、書き込み側に書き込まれた1つのチャンクが、読み取り側で0個以上のチャンクとなることも可能です。§ 10.9 テンプレートタグを置換する変換ストリームでは0チャンクの場合の例を示しています。変換処理が非同期の場合、この関数はpromiseを返して変換の成否を示すことができます。rejectされたpromiseは、変換ストリームの読み取り側・書き込み側両方をエラーにします。
この関数が返す可能性があるpromiseは、良い振る舞いをするプロデューサが、チャンクの完全な変換が終わる前にミューテートしないようにする契約に利用されます(これは仕様の機構による保証ではなく、プロデューサとトランスフォーマーとの非公式な契約です)。
transform()
メソッドが指定されていない場合は、同一変換が行われ、書き込み側から読み取り側へチャンクが変更されずにエンキューされます。 flush(controller)
, 型 TransformerFlushCallback-
全てのチャンクが書き込み側で変換され、
transform()
を経由して処理された後、書き込み側が閉じられる直前に呼び出される関数です。通常これは、読み取り側にサフィックスチャンクをエンキューするために使われます。その後読み取り側も閉じられます。§ 10.9 テンプレートタグを置換する変換ストリームで例を確認できます。
フラッシュ処理が非同期の場合は、成功・失敗を示すpromiseを返すことができます。その結果は
stream.writable.write()
の呼び出し元に通知されます。rejectされたpromiseはストリームの読み取り側・書き込み側両方をエラーにします。例外を投げた場合もrejectされたpromiseと同様に扱われます。(なお、
controller.terminate()
をflush()
内で呼び出す必要はありません。ストリームは既に正常にクローズ処理中であり、terminateの呼び出しは逆効果です。) cancel(reason)
, 型 TransformerCancelCallback-
読み取り側がキャンセルされたとき、または書き込み側が中止されたときに呼び出される関数です。
通常これは、ストリームが中止・キャンセルされた場合にトランスフォーマーのリソースをクリーンアップするために使われます。
キャンセル処理が非同期の場合、成功・失敗を示すpromiseを返すことができます。その結果は
stream.writable.abort()
やstream.readable.cancel()
の呼び出し元に通知されます。例外が投げられた場合もrejectされたpromiseと同様に扱われます。(なお、
controller.terminate()
をcancel()
内で呼び出す必要はありません。ストリームは既にキャンセル/中止処理中であり、terminateの呼び出しは逆効果です。) readableType
, 型 any-
このプロパティは将来のために予約されており、値を指定しようとすると例外が投げられます。
writableType
, 型 any-
このプロパティは将来のために予約されており、値を指定しようとすると例外が投げられます。
controller
オブジェクトは start()
,
transform()
,
flush()
の各関数に渡されます。
このオブジェクトは
TransformStreamDefaultController
のインスタンスであり、読み取り側へのチャンクのエンキューやストリームの終了・エラー化が可能です。
6.2.4. コンストラクターとプロパティ
stream = new
TransformStream
([transformer[, writableStrategy[, readableStrategy]]])-
指定されたtransformerをラップした新しい
TransformStream
を作成します。transformer引数の詳細は § 6.2.3 トランスフォーマーAPIを参照してください。transformer引数が指定されていない場合、結果は同一変換ストリームになります。便利なケースについてはこの例を参照してください。
writableStrategyとreadableStrategy引数は、それぞれ書き込み側・読み取り側のキュー戦略オブジェクトです。これらは
WritableStream
やReadableStream
の構築時に使用され、変換ストリームにバッファを追加して変換速度のばらつきを緩和したり、パイプのバッファ量を増やすことができます。指定しない場合、デフォルトの挙動はそれぞれCountQueuingStrategy
(high water markが1および0)と同じになります。 readable = stream.
readable
-
この変換ストリームの読み取り側を表す
ReadableStream
を返します。 writable = stream.
writable
-
この変換ストリームの書き込み側を表す
WritableStream
を返します。
new TransformStream(transformer, writableStrategy,
readableStrategy)
コンストラクターの手順は以下の通りです:
-
transformerが指定されていなければ、nullに設定する。
-
transformerDictをtransformerを
Transformer
型のIDL値に変換したものとする。引数transformerを
Transformer
型で宣言できない理由は、元のオブジェクト参照を失うためです。オブジェクトを保持して、各種メソッドをinvokeできるようにする必要があります。 -
transformerDict["
readableType
"]が存在する場合、RangeError
例外を投げる。 -
transformerDict["
writableType
"]が存在する場合、RangeError
例外を投げる。 -
readableHighWaterMarkを? ExtractHighWaterMark(readableStrategy, 0)の結果とする。
-
readableSizeAlgorithmを! ExtractSizeAlgorithm(readableStrategy)の結果とする。
-
writableHighWaterMarkを? ExtractHighWaterMark(writableStrategy, 1)の結果とする。
-
writableSizeAlgorithmを! ExtractSizeAlgorithm(writableStrategy)の結果とする。
-
startPromiseを新しいpromiseとする。
-
! InitializeTransformStream(this, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm)を実行する。
-
? SetUpTransformStreamDefaultControllerFromTransformer(this, transformer, transformerDict)を実行する。
-
transformerDict["
start
"]が存在する場合、resolve startPromise をinvoke transformerDict["start
"]に引数 « this.[[controller]] » とcallback this value transformerで呼び出した結果で解決する。 -
それ以外の場合、resolve startPromiseをundefinedで解決する。
readable
ゲッターの手順は以下の通りです:
-
this.[[readable]]を返す。
writable
ゲッターの手順は以下の通りです:
-
this.[[writable]]を返す。
6.2.5. postMessage()
による転送
destination.postMessage(ts, { transfer: [ts] });
-
TransformStream
を他のフレーム、ウィンドウ、またはワーカーへ送信します。転送されたストリームは元のものと同様に利用できます。ただし、読み取り側と書き込み側はロックされ、直接利用できなくなります。
TransformStream
オブジェクトは転送可能オブジェクトです。転送手順(valueおよびdataHolder)は以下の通りです:
-
readableをvalue.[[readable]]とする。
-
writableをvalue.[[writable]]とする。
-
! IsReadableStreamLocked(readable)がtrueなら、"
DataCloneError
"DOMException
を投げる。 -
! IsWritableStreamLocked(writable)がtrueなら、"
DataCloneError
"DOMException
を投げる。 -
dataHolder.[[readable]]を! StructuredSerializeWithTransfer(readable, « readable »)の結果に設定する。
-
dataHolder.[[writable]]を! StructuredSerializeWithTransfer(writable, « writable »)の結果に設定する。
-
readableRecordを! StructuredDeserializeWithTransfer(dataHolder.[[readable]], 現在のRealm)の結果とする。
-
writableRecordを! StructuredDeserializeWithTransfer(dataHolder.[[writable]], 現在のRealm)の結果とする。
-
value.[[readable]]を readableRecord.[[Deserialized]]に設定する。
-
value.[[writable]]を writableRecord.[[Deserialized]]に設定する。
-
value.[[backpressure]], value.[[backpressureChangePromise]], value.[[controller]]をundefinedに設定する。
[[backpressure]]、
[[backpressureChangePromise]]、
[[controller]]スロットは
転送済みTransformStream
では使用されません。
6.3.
TransformStreamDefaultController
クラス
TransformStreamDefaultController
クラスは、関連するReadableStream
やWritableStream
の操作を可能にするメソッドを持っています。
TransformStream
の構築時、
transformerオブジェクトには対応するTransformStreamDefaultController
インスタンスが渡され、操作することができます。
6.3.1. インターフェース定義
TransformStreamDefaultController
クラスのWeb IDL定義は以下の通りです:
[Exposed=*]interface {
TransformStreamDefaultController readonly attribute unrestricted double ?desiredSize ;undefined enqueue (optional any );
chunk undefined error (optional any );
reason undefined terminate (); };
6.3.2. 内部スロット
TransformStreamDefaultController
のインスタンスは、以下の表に記載された内部スロットとともに生成されます:
内部スロット | 説明(非規範的) |
---|---|
[[cancelAlgorithm]] | 1つの引数(キャンセル理由)を受け取り、transformerにキャンセル要求を伝えるPromiseを返すアルゴリズム |
[[finishPromise]] | [[cancelAlgorithm]]または[[flushAlgorithm]]どちらかの完了時にresolveされるpromise。このフィールドが未設定(undefined)の場合は、どちらのアルゴリズムもまだinvokeされていないことを示す |
[[flushAlgorithm]] | クローズ要求をtransformerに伝えるPromiseを返すアルゴリズム |
[[stream]] | 制御対象のTransformStream インスタンス
|
[[transformAlgorithm]] | 1つの引数(変換するチャンク)を受け取り、transformerに変換要求を伝えるPromiseを返すアルゴリズム |
6.3.3. メソッド・プロパティ
desiredSize = controller.
desiredSize
-
読み取り側の内部キューを満たすための希望サイズを返します。キューが過剰に満たされている場合は負の値にもなります。
controller.
enqueue
(chunk)controller.
error
(e)-
制御対象の変換ストリームの読み取り側・書き込み側の両方をエラー状態にし、以降の操作はすべて指定されたエラーeで失敗します。変換待ちのチャンクはすべて破棄されます。
controller.
terminate
()-
制御対象の変換ストリームの読み取り側を閉じ、書き込み側をエラー状態にします。transformerがチャンクの一部のみ消費すればよい場合などに有用です。
desiredSize
ゲッターの手順は以下の通りです:
-
readableControllerをthis.[[stream]].[[readable]].[[controller]]とする。
-
! ReadableStreamDefaultControllerGetDesiredSize(readableController)を返す。
enqueue(chunk)
メソッドの手順は以下の通りです:
-
? TransformStreamDefaultControllerEnqueue(this, chunk)を実行する。
error(e)
メソッドの手順は以下の通りです:
-
? TransformStreamDefaultControllerError(this, e)を実行する。
terminate()
メソッドの手順は以下の通りです:
6.4. 抽象演算
6.4.1. 変換ストリームの操作
以下の抽象演算は、より高いレベルでTransformStream
インスタンスを操作します。
-
startAlgorithmをstartPromiseを返すアルゴリズムとする。
-
writeAlgorithmを以下の手順(chunk引数を取る)とする:
-
! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk)を返す。
-
-
abortAlgorithmを以下の手順(reason引数を取る)とする:
-
! TransformStreamDefaultSinkAbortAlgorithm(stream, reason)を返す。
-
-
closeAlgorithmを以下の手順とする:
-
! TransformStreamDefaultSinkCloseAlgorithm(stream)を返す。
-
-
stream.[[writable]]を! CreateWritableStream(startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm)に設定する。
-
pullAlgorithmを以下の手順とする:
-
! TransformStreamDefaultSourcePullAlgorithm(stream)を返す。
-
-
cancelAlgorithmを以下の手順(reason引数を取る)とする:
-
! TransformStreamDefaultSourceCancelAlgorithm(stream, reason)を返す。
-
-
stream.[[readable]]を! CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, readableHighWaterMark, readableSizeAlgorithm)に設定する。
-
stream.[[backpressure]]および stream.[[backpressureChangePromise]]を undefinedに設定する。
[[backpressure]]スロットは、TransformStreamSetBackpressureによって初期化できるようundefinedに設定します。実装によってはboolean型で初期化してもかまいませんが、
start()
メソッド呼び出し前に適切に初期化されていればユーザーコードからは見えません。 -
! TransformStreamSetBackpressure(stream, true)を実行する。
-
stream.[[controller]]をundefinedに設定する。
-
! ReadableStreamDefaultControllerError(stream.[[readable]].[[controller]], e)を実行する。
-
! TransformStreamErrorWritableAndUnblockWrite(stream, e)を実行する。
この演算は、片方または両方がすでにエラー状態でも正しく動作します。そのため、呼び出しアルゴリズムはエラー時にストリーム状態を事前にチェックする必要はありません。
-
! TransformStreamDefaultControllerClearAlgorithms(stream.[[controller]])を実行する。
-
! WritableStreamDefaultControllerErrorIfNeeded(stream.[[writable]].[[controller]], e)を実行する。
-
! TransformStreamUnblockWrite(stream)を実行する。
-
断言: stream.[[backpressure]]は backpressureではない。
-
stream.[[backpressureChangePromise]]が undefinedでなければ、resolve stream.[[backpressureChangePromise]] をundefinedで解決する。
-
stream.[[backpressureChangePromise]]を 新しいpromiseに設定する。
-
stream.[[backpressure]]をbackpressureに設定する。
-
stream.[[backpressure]]がtrueなら、! TransformStreamSetBackpressure(stream, false)を実行する。
TransformStreamDefaultSinkWriteAlgorithm 抽象演算は[[backpressureChangePromise]]に格納されたpromiseがresolveされるのを待っている可能性があります。TransformStreamSetBackpressureの呼び出しによって、promiseが必ずresolveされることが保証されます。
6.4.2. デフォルトコントローラー
以下の抽象演算は
TransformStreamDefaultController
クラスの実装を支援します。
-
断言: stream implements
TransformStream
である。 -
断言: stream.[[controller]]はundefinedである。
-
controller.[[stream]]をstreamに設定する。
-
stream.[[controller]]をcontrollerに設定する。
-
controller.[[transformAlgorithm]] を transformAlgorithmに設定する。
-
controller.[[flushAlgorithm]]を flushAlgorithmに設定する。
-
controller.[[cancelAlgorithm]]を cancelAlgorithmに設定する。
-
controllerを新規
TransformStreamDefaultController
とする。 -
transformAlgorithmを以下の手順(chunk引数を取る)とする:
-
resultをTransformStreamDefaultControllerEnqueue(controller, chunk)の結果とする。
-
resultがabrupt completionなら、promise rejected with result.[[Value]]を返す。
-
それ以外の場合、undefinedで解決されたpromiseを返す。
-
-
flushAlgorithmをundefinedで解決されたpromiseを返すアルゴリズムとする。
-
cancelAlgorithmをundefinedで解決されたpromiseを返すアルゴリズムとする。
-
transformerDict["
transform
"]が存在する場合、transformAlgorithmを引数chunkを受け取り、invoke transformerDict["transform
"] に引数リスト« chunk, controller »で渡し、callback this valueはtransformerとするアルゴリズムに設定する。 -
transformerDict["
flush
"]が存在する場合、flushAlgorithmをinvoke transformerDict["flush
"] に引数リスト« controller »で渡し、callback this valueはtransformerとするアルゴリズムに設定する。 -
transformerDict["
cancel
"]が存在する場合、cancelAlgorithmを引数reasonを受け取り、invoke transformerDict["cancel
"] に引数リスト« reason »で渡し、 callback this valueはtransformerとするアルゴリズムに設定する。 -
! SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm, cancelAlgorithm)を実行する。
TransformStream
自体が参照されていても
ガベージコレクトできるようになります。
これは弱参照を使って観測できます。詳細はtc39/proposal-weakrefs#31を参照してください。
以下の手順を実行します:
-
controller.[[transformAlgorithm]]をundefinedに設定する。
-
controller.[[flushAlgorithm]]をundefinedに設定する。
-
controller.[[cancelAlgorithm]]をundefinedに設定する。
-
streamをcontroller.[[stream]]とする。
-
readableControllerを stream.[[readable]].[[controller]]とする。
-
! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController) がfalseなら、
TypeError
例外を投げる。 -
enqueueResultをReadableStreamDefaultControllerEnqueue(readableController, chunk)の結果とする。
-
enqueueResultがabrupt completionなら、
-
! TransformStreamErrorWritableAndUnblockWrite(stream, enqueueResult.[[Value]])を実行する。
-
stream.[[readable]].[[storedError]]を投げる。
-
-
backpressureを! ReadableStreamDefaultControllerHasBackpressure(readableController)の結果とする。
-
backpressureがstream.[[backpressure]]と異なる場合、
-
断言: backpressureはtrueである。
-
! TransformStreamSetBackpressure(stream, true)を実行する。
-
-
! TransformStreamError(controller.[[stream]], e)を実行する。
-
transformPromiseを controller.[[transformAlgorithm]]をchunkで実行した結果とする。
-
reactingでtransformPromiseに対し、reject時は以下の手順(引数r)で返す:
-
! TransformStreamError(controller.[[stream]], r)を実行する。
-
rを投げる。
-
-
streamをcontroller.[[stream]]とする。
-
readableControllerを stream.[[readable]].[[controller]]とする。
-
! ReadableStreamDefaultControllerClose(readableController)を実行する。
-
errorを
TypeError
(ストリームがterminateされたことを示す例外)とする。 -
! TransformStreamErrorWritableAndUnblockWrite(stream, error)を実行する。
6.4.3. デフォルトシンク
以下の抽象演算は、変換ストリームの書き込み側のunderlying sinkの実装に使われます。
-
断言: stream.[[writable]].[[state]]は"
writable
"である。 -
controllerをstream.[[controller]]とする。
-
stream.[[backpressure]]がtrueの場合、
-
backpressureChangePromiseをstream.[[backpressureChangePromise]]とする。
-
断言: backpressureChangePromiseはundefinedではない。
-
ReactでbackpressureChangePromiseがfulfilledになったときの手順として以下を返す:
-
writableをstream.[[writable]]とする。
-
stateをwritable.[[state]]とする。
-
stateが"
erroring
"の場合、writable.[[storedError]]を投げる。 -
断言: stateは"
writable
"である。 -
! TransformStreamDefaultControllerPerformTransform(controller, chunk)を返す。
-
-
-
! TransformStreamDefaultControllerPerformTransform(controller, chunk)を返す。
-
controllerをstream.[[controller]]とする。
-
controller.[[finishPromise]]がundefinedでなければ、 controller.[[finishPromise]]を返す。
-
readableをstream.[[readable]]とする。
-
controller.[[finishPromise]]を新しいpromiseに設定する。
-
cancelPromiseを controller.[[cancelAlgorithm]]をreasonで実行した結果とする。
-
! TransformStreamDefaultControllerClearAlgorithms(controller)を実行する。
-
ReactでcancelPromiseに対し:
-
cancelPromiseがfulfilledなら:
-
readable.[[state]]が"
errored
"なら、 reject controller.[[finishPromise]] をreadable.[[storedError]]でrejectする。 -
それ以外の場合:
-
! ReadableStreamDefaultControllerError(readable.[[controller]], reason)を実行する。
-
resolve controller.[[finishPromise]] をundefinedで解決する。
-
-
-
cancelPromiseがrejected(理由r)なら:
-
! ReadableStreamDefaultControllerError(readable.[[controller]], r)を実行する。
-
reject controller.[[finishPromise]] をrでrejectする。
-
-
-
controller.[[finishPromise]]を返す。
-
controllerをstream.[[controller]]とする。
-
controller.[[finishPromise]]がundefinedでなければ、 controller.[[finishPromise]]を返す。
-
readableをstream.[[readable]]とする。
-
controller.[[finishPromise]]を新しいpromiseに設定する。
-
flushPromiseを controller.[[flushAlgorithm]]の実行結果とする。
-
! TransformStreamDefaultControllerClearAlgorithms(controller)を実行する。
-
ReactでflushPromiseに対し:
-
flushPromiseがfulfilledなら:
-
readable.[[state]]が"
errored
"なら、 reject controller.[[finishPromise]] をreadable.[[storedError]]でrejectする。 -
それ以外の場合:
-
! ReadableStreamDefaultControllerClose(readable.[[controller]])を実行する。
-
resolve controller.[[finishPromise]] をundefinedで解決する。
-
-
-
flushPromiseがrejected(理由r)なら:
-
! ReadableStreamDefaultControllerError(readable.[[controller]], r)を実行する。
-
reject controller.[[finishPromise]] をrでrejectする。
-
-
-
controller.[[finishPromise]]を返す。
6.4.4. デフォルトソース
以下の抽象演算は、変換ストリームの読み取り側のunderlying sourceの実装に使われます。
-
controllerをstream.[[controller]]とする。
-
controller.[[finishPromise]]がundefinedでなければ、 controller.[[finishPromise]]を返す。
-
writableをstream.[[writable]]とする。
-
controller.[[finishPromise]]を新しいpromiseに設定する。
-
cancelPromiseを controller.[[cancelAlgorithm]]をreasonで実行した結果とする。
-
! TransformStreamDefaultControllerClearAlgorithms(controller)を実行する。
-
ReactでcancelPromiseに対し:
-
cancelPromiseがfulfilledなら:
-
writable.[[state]]が"
errored
"なら、 reject controller.[[finishPromise]] をwritable.[[storedError]]でrejectする。 -
それ以外の場合:
-
! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], reason)を実行する。
-
! TransformStreamUnblockWrite(stream)を実行する。
-
resolve controller.[[finishPromise]] をundefinedで解決する。
-
-
-
cancelPromiseがrejected(理由r)なら:
-
! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], r)を実行する。
-
! TransformStreamUnblockWrite(stream)を実行する。
-
reject controller.[[finishPromise]] をrでrejectする。
-
-
-
controller.[[finishPromise]]を返す。
-
断言: stream.[[backpressure]]はtrueである。
-
断言: stream.[[backpressureChangePromise]]はundefinedではない。
-
! TransformStreamSetBackpressure(stream, false)を実行する。
-
stream.[[backpressureChangePromise]]を返す。
7. キュー戦略
7.1. キュー戦略API
ReadableStream()
、
WritableStream()
、
TransformStream()
の各コンストラクターは、作成されるストリームに適したキュー戦略を表す引数を少なくとも1つ受け付けます。これらのオブジェクトは以下のプロパティを持ちます:
dictionary {
QueuingStrategy unrestricted double highWaterMark ;QueuingStrategySize size ; };callback =
QueuingStrategySize unrestricted double (any );
chunk
highWaterMark
, 型 unrestricted double-
このキュー戦略を使うストリームのハイウォーターマークを示す、0以上の数値です。
size(chunk)
(非バイトストリーム専用)、 型 QueuingStrategySize-
指定したチャンク値の有限かつ非負なサイズを計算して返す関数です。
結果はバックプレッシャーの判定に使われ、該当する
desiredSize
プロパティ(defaultController.desiredSize
、byteController.desiredSize
、writer.desiredSize
)に現れます。読み取りストリームの場合はunderlying sourceのpull()
メソッド呼び出しタイミングも制御します。この関数は冪等かつ副作用を持たない必要があります。そうでない場合は非常に奇妙な結果になることがあります。
Readable byte streamでは使用されません。チャンクは常にバイト単位で計測されます。
これらのプロパティを持つオブジェクトは、キュー戦略オブジェクトとして利用できますが、特定用途のために2つの組み込みキュー戦略クラス(ByteLengthQueuingStrategy
とCountQueuingStrategy
)も提供されています。両者とも以下のWeb
IDLフラグメントをコンストラクターで利用します:
[Exposed=*]dictionary {
QueuingStrategyInit required unrestricted double ; };
highWaterMark
7.2. ByteLengthQueuingStrategy
クラス
バイトを扱う際によく使われるキュー戦略は、入ってくるチャンクのbyteLength
プロパティの合計が指定したハイウォーターマークに達するまで待つことです。そのため、ストリーム構築時に利用できる組み込みのキュー戦略として提供されています。
const stream= new ReadableStream( { ... }, new ByteLengthQueuingStrategy({ highWaterMark: 16 * 1024 }) );
この場合、16KiB分のチャンクをunderlying sourceがReadableStreamにエンキューできます。これを超えるとReadableStreamの実装はバックプレッシャーシグナルをunderlying sourceに送信し始めます。
const stream= new WritableStream( { ... }, new ByteLengthQueuingStrategy({ highWaterMark: 32 * 1024 }) );
この場合、32KiB分のチャンクがWritableStreamの内部キューに蓄積され、以前の書き込みがunderlying sinkで完了するのを待ちます。それを超えると、WritableStreamはバックプレッシャーシグナルをプロデューサーに送信し始めます。
ByteLengthQueuingStrategy
はReadable byte
streamには不要です。チャンクは常にバイト単位で計測されるためです。バイトストリームに対してByteLengthQueuingStrategy
を使って構築しようとすると失敗します。
7.2.1. インターフェース定義
ByteLengthQueuingStrategy
クラスのWeb IDL定義は以下の通りです:
[Exposed=*]interface {
ByteLengthQueuingStrategy constructor (QueuingStrategyInit );
init readonly attribute unrestricted double highWaterMark ;readonly attribute Function size ; };
7.2.2. 内部スロット
ByteLengthQueuingStrategy
のインスタンスは、コンストラクターで指定した値を格納する[[highWaterMark]]内部スロットを持ちます。
Function
型で、初期化手順は次の通りです:
-
stepsを以下の手順(chunkを受け取る)とする:
-
? GetV(chunk, "
byteLength
")を返す。
-
-
Fを! CreateBuiltinFunction(steps, 1, "
size
", « », globalObjectの関連Realm)の結果とする。 -
globalObjectのバイト長キュー戦略サイズ関数を
Function
型でF参照を持つように設定し、コールバックコンテキストはglobalObjectの関連設定オブジェクトとする。
この設計は歴史的な事情によるものです。size
がメソッドではなく関数(this値を参照しない)であることを保証するためです。詳細はwhatwg/streams#1005とheycam/webidl#819を参照してください。
7.2.3. コンストラクターとプロパティ
strategy = new
ByteLengthQueuingStrategy
({highWaterMark
})-
指定したハイウォーターマークで新しい
ByteLengthQueuingStrategy
を作成します。指定したハイウォーターマークは事前検証されません。負値やNaN、数値以外の場合、生成された
ByteLengthQueuingStrategy
によってストリームのコンストラクターが例外を投げます。 highWaterMark = strategy.
highWaterMark
-
コンストラクターに指定したハイウォーターマーク値を返します。
strategy.
size
(chunk)-
chunkの
byteLength
プロパティの値を返してサイズを計測します。
new ByteLengthQueuingStrategy(init)
コンストラクターの手順は以下の通りです:
-
this.[[highWaterMark]]を init["
highWaterMark
"]に設定する。
highWaterMark
ゲッターの手順は以下の通りです:
size
ゲッターの手順は以下の通りです:
7.3. CountQueuingStrategy
クラス
一般的なオブジェクトのストリームを扱う場合のキュー戦略として、これまでに蓄積されたチャンク数を単純にカウントし、その数が指定したハイウォーターマークに達するまで待つ方法がよく使われます。この戦略も標準で提供されています。
const stream= new ReadableStream( { ... }, new CountQueuingStrategy({ highWaterMark: 10 }) );
この場合、任意の種類のチャンク10個分をunderlying sourceがReadableStreamにエンキューできます。これを超えるとReadableStreamの実装はバックプレッシャーシグナルをunderlying sourceに送信し始めます。
const stream= new WritableStream( { ... }, new CountQueuingStrategy({ highWaterMark: 5 }) );
この場合、任意の種類のチャンク5個分がWritableStreamの内部キューに蓄積され、以前の書き込みがunderlying sinkで完了するのを待ちます。それを超えると、WritableStreamはバックプレッシャーシグナルをプロデューサーに送信し始めます。
7.3.1. インターフェース定義
CountQueuingStrategy
クラスのWeb IDL定義は以下の通りです:
[Exposed=*]interface {
CountQueuingStrategy constructor (QueuingStrategyInit );
init readonly attribute unrestricted double highWaterMark ;readonly attribute Function size ; };
7.3.2. 内部スロット
CountQueuingStrategy
のインスタンスは、コンストラクターで指定した値を格納する[[highWaterMark]]内部スロットを持ちます。
Function
型で、初期化手順は次の通りです:
-
stepsを以下の手順とする:
-
1を返す。
-
-
Fを! CreateBuiltinFunction(steps, 0, "
size
", « », globalObjectの関連Realm)の結果とする。 -
globalObjectのカウントキュー戦略サイズ関数を
Function
型でF参照を持つように設定し、コールバックコンテキストはglobalObjectの関連設定オブジェクトとする。
この設計は歴史的な事情によるものです。size
がメソッドではなく関数(this値を参照しない)であることを保証するためです。詳細はwhatwg/streams#1005とheycam/webidl#819を参照してください。
7.3.3. コンストラクターとプロパティ
strategy = new
CountQueuingStrategy
({highWaterMark
})-
指定したハイウォーターマークで新しい
CountQueuingStrategy
を作成します。指定したハイウォーターマークは事前検証されません。負値やNaN、数値以外の場合、生成された
CountQueuingStrategy
によってストリームのコンストラクターが例外を投げます。 highWaterMark = strategy.
highWaterMark
-
コンストラクターに指定したハイウォーターマーク値を返します。
strategy.
size
(chunk)-
chunkのサイズを常に1として計測します。これによりキュー全体のサイズはキュー内チャンク数となります。
new CountQueuingStrategy(init)
コンストラクターの手順は以下の通りです:
-
this.[[highWaterMark]]を init["
highWaterMark
"]に設定する。
highWaterMark
ゲッターの手順は以下の通りです:
size
ゲッターの手順は以下の通りです:
7.4. 抽象演算
以下のアルゴリズムは、ストリームのコンストラクターがQueuingStrategy
辞書から関連する部分を抽出するために使用します。
-
strategy["
highWaterMark
"]が存在しなければ、defaultHWMを返す。 -
highWaterMarkをstrategy["
highWaterMark
"]とする。 -
highWaterMarkがNaNまたはhighWaterMark < 0なら、
RangeError
例外を投げる。 -
highWaterMarkを返す。
+∞は有効なハイウォーターマークとして明示的に許可されています。これによりバックプレッシャーは決して適用されません。
8. 補助的な抽象演算
以下の抽象演算は複数種類のストリームの実装を支援するためのもので、上記の主要セクションには含めていません。
8.1. サイズ付きキュー
この仕様のストリームは「サイズ付きキュー」データ構造を使い、値とそのサイズを格納します。各仕様オブジェクトは、必ず[[queue]]と[[queueTotalSize]]というペアの内部スロットを持ちます。[[queue]]はリスト型のサイズ付き値のリストで、[[queueTotalSize]]はJavaScriptのNumber
(倍精度浮動小数点数)です。
以下の抽象演算は、サイズ付きキューを含むオブジェクト操作時に、2つの内部スロットの同期を維持するために使われます。
浮動小数点演算の精度制限のため、[[queueTotalSize]]スロットに合計値を記録するこの方式は、[[queue]]内のチャンクサイズをすべて加算する方式とは等価ではありません(ただしチャンク間のサイズの差が極端に大きい場合(~1015)や、兆単位でエンキューした場合のみ差が出ます)。
以下で、サイズ付き値は、2つの構造体要素(valueとsize)を持つものです。
-
断言: containerは[[queue]]と[[queueTotalSize]]の内部スロットを持つ。
-
断言: container.[[queue]]は空ではない。
-
valueWithSizeをcontainer.[[queue]][0]とする。
-
Remove valueWithSizeをcontainer.[[queue]]から取り除く。
-
container.[[queueTotalSize]]をcontainer.[[queueTotalSize]] − valueWithSizeのsizeに設定する。
-
container.[[queueTotalSize]] < 0なら、container.[[queueTotalSize]]を0に設定する(丸め誤差による)。
-
valueWithSizeのvalueを返す。
-
断言: containerは[[queue]]と[[queueTotalSize]]の内部スロットを持つ。
-
! IsNonNegativeNumber(size)がfalseなら、
RangeError
例外を投げる。 -
sizeが+∞なら、
RangeError
例外を投げる。 -
Append新しいサイズ付き値(valuevalue、sizesize)をcontainer.[[queue]]に追加する。
-
container.[[queueTotalSize]]をcontainer.[[queueTotalSize]] + sizeに設定する。
-
断言: containerは[[queue]]と[[queueTotalSize]]の内部スロットを持つ。
-
container.[[queue]]を新しい空のリストに設定する。
-
container.[[queueTotalSize]]を0に設定する。
8.2. 転送可能ストリーム
転送可能ストリームは、特殊なアイデンティティ変換を用いて実装されます。これは書き込み側があるrealmに、読み取り側が別のrealmにある場合です。以下の抽象演算は、この「クロスrealm変換」の実装に使われます。
-
PackAndPostMessage(port, "
error
", error)を実行し、結果は破棄する。
この抽象演算が実行される時点で既にエラー状態なので、さらなるエラーは処理できず破棄されます。
-
messageをOrdinaryObjectCreate(null)で作成する。
-
! CreateDataProperty(message, "
type
", type)を実行する。 -
! CreateDataProperty(message, "
value
", value)を実行する。 -
targetPortをportがエンタングルされているポート(なければnull)とする。
-
optionsを«[ "
transfer
" → « » ]»とする。 -
message port post message stepsをtargetPort、message、optionsを指定して実行する。
JavaScriptオブジェクトを転送に使うことで、message port post message
stepsを重複せずに済みます。オブジェクトのprototypeはnullにして%Object.prototype%
による干渉を防ぎます。
-
resultをPackAndPostMessage(port, type, value)の結果とする。
-
resultがabrupt completionなら、
-
! CrossRealmTransformSendError(port, result.[[Value]])を実行する。
-
-
resultをcompletion recordとして返す。
-
! InitializeReadableStream(stream)を実行する。
-
controllerを新規
ReadableStreamDefaultController
とする。 -
portの
message
イベントに以下のハンドラを追加する:-
dataをメッセージのデータとする。
-
断言: dataはオブジェクトである。
-
typeを! Get(data, "
type
")とする。 -
valueを! Get(data, "
value
")とする。 -
断言: typeは文字列である。
-
typeが"
chunk
"なら、-
! ReadableStreamDefaultControllerEnqueue(controller, value)を実行する。
-
-
それ以外でtypeが"
close
"なら、-
! ReadableStreamDefaultControllerClose(controller)を実行する。
-
portをディセンタングルする。
-
-
それ以外でtypeが"
error
"なら、-
! ReadableStreamDefaultControllerError(controller, value)を実行する。
-
portをディセンタングルする。
-
-
-
portの
messageerror
イベントに以下のハンドラを追加する:-
errorを新しい"
DataCloneError
"DOMException
とする。 -
! CrossRealmTransformSendError(port, error)を実行する。
-
! ReadableStreamDefaultControllerError(controller, error)を実行する。
-
portをディセンタングルする。
-
-
portのport message queueを有効化する。
-
startAlgorithmをundefinedを返すアルゴリズムとする。
-
pullAlgorithmを以下の手順とする:
-
! PackAndPostMessage(port, "
pull
", undefined)を実行する。
-
-
cancelAlgorithmを以下の手順(reason引数を取る)とする:
-
resultをPackAndPostMessageHandlingError(port, "
error
", reason)の結果とする。 -
portをディセンタングルする。
-
resultがabrupt completionなら、promise rejected with result.[[Value]]を返す。
-
それ以外なら、undefinedで解決されたpromiseを返す。
-
-
sizeAlgorithmを常に1を返すアルゴリズムとする。
-
! SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, sizeAlgorithm)を実行する。
このアルゴリズム内のassert失敗は、入力が信頼できないコンテキストから来る可能性があるため、明示的に処理することが推奨されます。未処理だとセキュリティ上の問題に繋がる可能性があります。
-
! InitializeWritableStream(stream)を実行する。
-
controllerを新規
WritableStreamDefaultController
とする。 -
backpressurePromiseを新しいpromiseとする。
-
portの
message
イベントに以下のハンドラを追加する:-
dataをメッセージのデータとする。
-
断言: dataはオブジェクトである。
-
typeを! Get(data, "
type
")とする。 -
valueを! Get(data, "
value
")とする。 -
断言: typeは文字列である。
-
typeが"
pull
"なら、-
backpressurePromiseがundefinedでなければ、
-
resolve backpressurePromise をundefinedで解決する。
-
backpressurePromiseをundefinedに設定する。
-
-
-
それ以外でtypeが"
error
"なら、-
! WritableStreamDefaultControllerErrorIfNeeded(controller, value)を実行する。
-
backpressurePromiseがundefinedでなければ、
-
resolve backpressurePromise をundefinedで解決する。
-
backpressurePromiseをundefinedに設定する。
-
-
-
-
portの
messageerror
イベントに以下のハンドラを追加する:-
errorを新しい"
DataCloneError
"DOMException
とする。 -
! CrossRealmTransformSendError(port, error)を実行する。
-
! WritableStreamDefaultControllerErrorIfNeeded(controller, error)を実行する。
-
portをディセンタングルする。
-
-
portのport message queueを有効化する。
-
startAlgorithmをundefinedを返すアルゴリズムとする。
-
writeAlgorithmを以下の手順(chunk引数を受け取る)とする:
-
backpressurePromiseがundefinedなら、backpressurePromiseをundefinedで解決されたpromiseに設定する。
-
ReactでbackpressurePromiseがfulfilledになったときの手順として以下を返す:
-
backpressurePromiseを新しいpromiseに設定する。
-
resultをPackAndPostMessageHandlingError(port, "
chunk
", chunk)の結果とする。 -
resultがabrupt completionなら、
-
portをディセンタングルする。
-
promise rejected with result.[[Value]]を返す。
-
-
それ以外なら、undefinedで解決されたpromiseを返す。
-
-
-
closeAlgorithmを以下の手順とする:
-
! PackAndPostMessage(port, "
close
", undefined)を実行する。 -
portをディセンタングルする。
-
-
abortAlgorithmを以下の手順(reason引数を取る)とする:
-
resultをPackAndPostMessageHandlingError(port, "
error
", reason)の結果とする。 -
portをディセンタングルする。
-
resultがabrupt completionなら、promise rejected with result.[[Value]]を返す。
-
それ以外なら、undefinedで解決されたpromiseを返す。
-
-
sizeAlgorithmを常に1を返すアルゴリズムとする。
-
! SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, 1, sizeAlgorithm)を実行する。
このアルゴリズム内のassert失敗は、入力が信頼できないコンテキストから来る可能性があるため、明示的に処理することが推奨されます。未処理だとセキュリティ上の問題に繋がる可能性があります。
8.3. その他
以下の抽象演算はユーティリティの寄せ集めです。
-
断言: Oはオブジェクトである。
-
断言: Oは[[ArrayBufferData]]内部スロットを持つ。
-
! IsDetachedBuffer(O)がtrueなら、falseを返す。
-
SameValue(O.[[ArrayBufferDetachKey]], undefined) がfalseなら、falseを返す。
-
trueを返す。
-
vが数値でなければ、falseを返す。
-
vがNaNなら、falseを返す。
-
v < 0なら、falseを返す。
-
trueを返す。
-
断言: ! IsDetachedBuffer(O)はfalse。
-
arrayBufferDataをO.[[ArrayBufferData]]とする。
-
arrayBufferByteLengthをO.[[ArrayBufferByteLength]]とする。
-
? DetachArrayBuffer(O)を実行する。
これはOの[[ArrayBufferDetachKey]]がundefinedでない場合(例:
WebAssembly.Memory
のbuffer
など)、例外を投げます。 [WASM-JS-API-1] -
新しい
ArrayBuffer
オブジェクト(現在のRealmで作成)で、[[ArrayBufferData]]内部スロット値がarrayBufferData、[[ArrayBufferByteLength]]内部スロット値がarrayBufferByteLengthのものを返す。
-
断言: Oはオブジェクトである。
-
断言: Oは[[ViewedArrayBuffer]]内部スロットを持つ。
-
断言: ! IsDetachedBuffer(O.[[ViewedArrayBuffer]])がfalseである。
-
bufferを? CloneArrayBuffer(O.[[ViewedArrayBuffer]], O.[[ByteOffset]], O.[[ByteLength]],
%ArrayBuffer%
)とする。 -
arrayを! Construct(
%Uint8Array%
, « buffer »)とする。 -
arrayを返す。
-
serializedを? StructuredSerialize(v)とする。
-
? StructuredDeserialize(serialized, 現在のRealm)を返す。
-
断言: toBufferはオブジェクトである。
-
断言: toBufferは[[ArrayBufferData]]内部スロットを持つ。
-
断言: fromBufferはオブジェクトである。
-
断言: fromBufferは[[ArrayBufferData]]内部スロットを持つ。
-
toBufferがfromBufferと同じなら、falseを返す。
-
! IsDetachedBuffer(toBuffer)がtrueなら、falseを返す。
-
! IsDetachedBuffer(fromBuffer)がtrueなら、falseを返す。
-
toIndex + count > toBuffer.[[ArrayBufferByteLength]]なら、falseを返す。
-
fromIndex + count > fromBuffer.[[ArrayBufferByteLength]]なら、falseを返す。
-
trueを返す。
9. 他の仕様でのストリーム利用
この標準の多くはストリームの内部動作に関するものです。他の仕様は通常これらの詳細を気にする必要はありません。代わりに、各種IDL型や以下の定義を通じて現行標準とインターフェースすべきです。
仕様はこの標準で定義された内部スロットを直接参照・操作すべきではありません。同様に、ここで定義されている抽象演算も使用すべきではありません。こうした直接利用は、この標準が保証する不変条件を破壊する可能性があります。
もしあなたの仕様がここでサポートされていない形でストリームとインターフェースしたい場合は、issueを提出してください。このセクションは必要に応じて有機的に拡張される予定です。
9.1. ReadableStream
9.1.1. 作成と操作
ReadableStream
オブジェクトstreamに、オプションのアルゴリズムpullAlgorithm、オプションのアルゴリズムcancelAlgorithm、オプションの数値highWaterMark(デフォルト1)、オプションのアルゴリズムsizeAlgorithmを与え、以下の手順を実行する。pullAlgorithmとcancelAlgorithmはpromiseを返してもよい。sizeAlgorithmはチャンクオブジェクトを受けて数値を返すアルゴリズムでなければならない。highWaterMarkは非負かつNaNでない数値でなければならない。
-
startAlgorithmをundefinedを返すアルゴリズムとする。
-
pullAlgorithmWrapperを次の手順を実行するアルゴリズムとする:
-
resultをpullAlgorithmを実行した結果(未指定ならnull)とする。例外eが投げられたら、eでrejectされたpromiseを返す。
-
resultが
Promise
なら、resultを返す。
-
-
cancelAlgorithmWrapperを次の手順(reason引数を取る)を実行するアルゴリズムとする:
-
resultをcancelAlgorithmにreasonを渡して実行した結果(未指定ならnull)とする。例外eが投げられたら、eでrejectされたpromiseを返す。
-
resultが
Promise
なら、resultを返す。
-
-
sizeAlgorithmが未指定なら、1を返すアルゴリズムに設定する。
-
! InitializeReadableStream(stream)を実行する。
-
controllerを新規
ReadableStreamDefaultController
とする。 -
! SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, pullAlgorithmWrapper, cancelAlgorithmWrapper, highWaterMark, sizeAlgorithm)を実行する。
ReadableStream
オブジェクトstreamに、オプションのアルゴリズムpullAlgorithm、
オプションのアルゴリズムcancelAlgorithm、
オプションの数値highWaterMark
(デフォルト0)を与え、以下の手順を実行する。pullAlgorithmとcancelAlgorithmはpromiseを返してもよい。highWaterMarkは非負かつNaNでない数値でなければならない。
-
startAlgorithmをundefinedを返すアルゴリズムとする。
-
pullAlgorithmWrapperを次の手順を実行するアルゴリズムとする:
-
resultをpullAlgorithmを実行した結果(未指定ならnull)とする。例外eが投げられたら、eでrejectされたpromiseを返す。
-
resultが
Promise
なら、resultを返す。
-
-
cancelAlgorithmWrapperを次の手順を実行するアルゴリズムとする:
-
resultをcancelAlgorithmを実行した結果(未指定ならnull)とする。例外eが投げられたら、eでrejectされたpromiseを返す。
-
resultが
Promise
なら、resultを返す。
-
-
! InitializeReadableStream(stream)を実行する。
-
controllerを新規
ReadableByteStreamController
とする。 -
! SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithmWrapper, cancelAlgorithmWrapper, highWaterMark, undefined)を実行する。
ReadableStream
を作成する手順は2段階です:
-
readableStreamを新規
ReadableStream
とする。 -
セットアップにreadableStreamと各種引数を与えて実行する。
ReadableStream
のサブクラスは、コンストラクターの手順内でセットアップや
バイト読取サポート付きセットアップ
をthis値に対して直接呼び出します。
以下のアルゴリズムは、上述のセットアップまたはバイト読取サポート付きセットアップアルゴリズムで初期化されたReadableStream
インスタンスにのみ使用してください(例えば、Web開発者が作成したインスタンスには使用不可)。
ReadableStream
streamのハイウォーターマークまで埋めるための希望サイズは、次の手順で決まります:
-
streamが読み取り可能でなければ、0を返す。
-
stream.[[controller]]が
ReadableByteStreamController
を実装していれば、 !ReadableByteStreamControllerGetDesiredSize(stream.[[controller]])を返す。 -
それ以外の場合、!ReadableStreamDefaultControllerGetDesiredSize(stream.[[controller]])を返す。
ReadableStream
は、ハイウォーターマークまで埋めるための希望サイズが0より大きい場合、追加データが必要と判定されます。
ReadableStream
streamをクローズするには:
-
stream.[[controller]]が
ReadableByteStreamController
を実装していれば、-
!ReadableByteStreamControllerClose(stream.[[controller]])を実行する。
-
stream.[[controller]].[[pendingPullIntos]] が空でなければ、!ReadableByteStreamControllerRespond(stream.[[controller]], 0)を実行する。
-
-
それ以外の場合、!ReadableStreamDefaultControllerClose(stream.[[controller]])を実行する。
ReadableStream
streamにJavaScript値eでエラーを発生させるには:
-
stream.[[controller]]が
ReadableByteStreamController
を実装していれば、!ReadableByteStreamControllerError(stream.[[controller]], e)を実行する。 -
それ以外の場合、!ReadableStreamDefaultControllerError(stream.[[controller]], e)を実行する。
ReadableStream
streamにエンキューするには:
-
stream.[[controller]]が
ReadableStreamDefaultController
を実装していれば、-
!ReadableStreamDefaultControllerEnqueue(stream.[[controller]], chunk)を実行する。
-
-
それ以外の場合:
-
断言: stream.[[controller]]は
ReadableByteStreamController
を実装している。 -
断言: chunkは
ArrayBufferView
である。 -
byobViewをstreamの現在のBYOBリクエストビューとする。
-
byobViewがnullでなく、chunk.[[ViewedArrayBuffer]]がbyobView.[[ViewedArrayBuffer]]なら:
-
断言: chunk.[[ByteOffset]]はbyobView.[[ByteOffset]]である。
-
断言: chunk.[[ByteLength]] ≤ byobView.[[ByteLength]]である。
これらの断言は呼び出し元が現在のBYOBリクエストビューの要求範囲外に書き込まないことを保証します。
-
?ReadableByteStreamControllerRespond(stream.[[controller]], chunk.[[ByteLength]])を実行する。
-
-
それ以外の場合、?ReadableByteStreamControllerEnqueue(stream.[[controller]], chunk)を実行する。
-
以下のアルゴリズムは、上述のバイト読取サポート付きセットアップアルゴリズムで初期化されたReadableStream
インスタンスにのみ使用してください:
ReadableStream
streamの現在のBYOBリクエストビューは、ArrayBufferView
またはnullであり、次の手順で決定します:
-
断言: stream.[[controller]]は
ReadableByteStreamController
を実装している。 -
byobRequestを!ReadableByteStreamControllerGetBYOBRequest(stream.[[controller]])とする。
-
byobRequestがnullなら、nullを返す。
-
byobRequest.[[view]]を返す。
仕様は転送や切り離しを、現在のBYOBリクエストビューの基礎バッファに対して行ってはいけません。
実装は、例えば他スレッドからメモリを書き込む目的で転送と同等のことをする場合もあり得ますが、エンキューやクローズのアルゴリズムを調整し、同じ可観測な結果を保つ必要があります。仕様上は、転送および切り離しは禁止されています。
仕様は可能な限り、現在の BYOB リクエストの view が null でない場合、その view に 書き込むべきであり、その後その view を使って enqueue
を呼び出すべきです。
新たな ArrayBufferView
を
enqueue
に渡すために 作成するのは、
現在の BYOB リクエストの view が null の場合や、
手元のバイト数が 現在の BYOB リクエストの view の
byte length より多い場合のみにすべきです。これにより不要なコピーを避け、ストリームの利用者の意図をよりよく尊重できます。
以下のバイトからプルアルゴリズムは、バイト列を仕様レベルの基礎バイトソースの表現として使う一般的なケースを満たします。これは保守的であり、バイト列を積極的にエンキューせずに残しますので、呼び出し元は残りバイト数をバックプレッシャーのシグナルとして使うことができます。
ReadableStream
streamへバイトからプルするには:
-
断言: stream.[[controller]]は
ReadableByteStreamController
を実装している。 -
availableをbytesの長さとする。
-
desiredSizeをavailableとする。
-
streamの現在のBYOBリクエストビューがnullでなければ、desiredSize をstreamの現在のBYOBリクエストビューのバイト長に設定する。
-
pullSizeをavailableとdesiredSizeの小さい方とする。
-
pulledをbytesの先頭pullSizeバイト分とする。
-
bytesから先頭pullSizeバイトを削除する。
-
streamの現在のBYOBリクエストビューがnullでなければ:
-
pulledを書き込みstreamの現在のBYOBリクエストビューに格納する。
-
?ReadableByteStreamControllerRespond(stream.[[controller]], pullSize)を実行する。
-
-
それ以外の場合:
-
viewをpulledからstreamの関連Realmで
Uint8Array
として作成したものとする。 -
?ReadableByteStreamControllerEnqueue(stream.[[controller]], view)を実行する。
-
仕様は、対応するReadableStream
をクローズした後は、現在のBYOBリクエストビューやバイトからプルへの書き込み操作を行ってはいけません。
9.1.2. 読取
以下のアルゴリズムは、Web開発者が作成したものを含む任意のReadableStream
インスタンスで使用できます。いずれも操作ごとに特有の失敗があり、呼び出し元の仕様で適切に処理してください。
ReadableStream
streamに対してリーダー取得するには、? AcquireReadableStreamDefaultReader(stream)を返します。
結果はReadableStreamDefaultReader
となります。
これは、streamが既にロック済みの場合、例外を投げます。
ReadableStreamDefaultReader
readerからチャンク読取を行うには、read request
readRequestを与えて! ReadableStreamDefaultReaderRead(reader,
readRequest)を実行します。
ReadableStreamDefaultReader
readerから全バイト読取するには、
successSteps(バイト列を受け取るアルゴリズム)と
failureSteps(JavaScript値を受け取るアルゴリズム)を与えて、read-loopをreader、新規バイト列、successSteps、failureStepsで呼び出します。
-
readRequestを新しいread request(以下の要素を持つ)とする:
- chunk steps(chunkを受け取る)
-
-
chunkが
Uint8Array
オブジェクトでなければ、failureStepsをTypeError
で呼び出し、これ以降の手順を中止する。 -
chunkが表すバイトをbytesに追加する。
-
read-loopをreader、bytes、successSteps、failureStepsで再帰呼び出しする。
この再帰は直接実装するとスタックオーバーフローの原因になる可能性があります。実装では非再帰的な手法やマイクロタスクのキューイング、あるいは下記のより直接的バイト読取手法の利用等で緩和が必要です。
-
- close steps
-
-
successStepsをbytesで呼び出す。
-
- error steps(eを受け取る)
-
-
failureStepsをeで呼び出す。
-
-
! ReadableStreamDefaultReaderRead(reader, readRequest)を実行する。
readerは対応するReadableStream
への排他的アクセス権を持つため、実際の読取メカニズムは観測できません。実装によっては、より直接的なメカニズム(例えばReadableStreamBYOBReader
の取得・利用や、チャンクへの直接アクセスなど)を使用できます。
ReadableStreamDefaultReader
readerのリリースは
!ReadableStreamDefaultReaderRelease(reader)を実行します。
ReadableStreamDefaultReader
readerのキャンセルは
reasonを与えて! ReadableStreamReaderGenericCancel(reader,
reason)を実行します。返り値はpromiseで、undefinedでfulfillまたは失敗理由でrejectされます。
ReadableStream
streamのキャンセルは
reasonを与えて! ReadableStreamCancel(stream,
reason)を返します。
返り値はpromiseで、undefinedでfulfillまたは失敗理由でrejectされます。
ReadableStream
streamの二又化(tee)は、? ReadableStreamTee(stream, true)を返します。
第二引数にtrueを渡すことで、ReadableStreamTeeの戻り値の2番目のブランチのチャンクは、1番目のブランチからHTMLのシリアライズ可能オブジェクトフレームワークでクローンされます。これにより、一方のブランチの消費が他方に干渉しません。
9.1.3. 内省
以下の述語は任意のReadableStream
オブジェクトに対して使用できます。ただし、ストリームがロックされているかどうかの判定以外は、公開JavaScript API経由では直接内省できませんので、仕様は§ 9.1.2 読取のアルゴリズムを使うべきです。(例えば、ストリームが読み取り可能かテストする代わりに、リーダー取得を試みて例外を処理します。)
ReadableStream
streamが読み取り可能なのは
stream.[[state]]が"readable
"の場合です。
ReadableStream
streamがクローズなのは
stream.[[state]]が"closed
"の場合です。
ReadableStream
streamがエラー状態なのは
stream.[[state]]が"errored
"の場合です。
ReadableStream
streamがロックされているのは、! IsReadableStreamLocked(stream)がtrueの場合です。
ReadableStream
streamが攪乱(disturbed)されているのは、stream.[[disturbed]]がtrueの場合です。
これはそのストリームが一度でも読取またはキャンセルされたかを示します。この節の他の述語以上に、Web開発者が間接的にもアクセスできない情報なので、分岐によるプラットフォーム挙動の制御は好ましくありません。
9.2. WritableStream
9.2.1. 作成と操作
WritableStream
オブジェクトstreamに、アルゴリズムwriteAlgorithm、オプションのアルゴリズムcloseAlgorithm、オプションのアルゴリズムabortAlgorithm、オプションの数値highWaterMark(デフォルト1)、オプションのアルゴリズムsizeAlgorithmを与え、以下の手順を実行します。
writeAlgorithmはチャンクオブジェクトを受け取りpromiseを返すアルゴリズムでなければなりません。closeAlgorithmとabortAlgorithmはpromiseを返してもよい。sizeAlgorithmはチャンクを受けて数値を返すアルゴリズムでなければならない。highWaterMarkは非負かつNaNでない数値でなければなりません。
-
startAlgorithmをundefinedを返すアルゴリズムとする。
-
closeAlgorithmWrapperを次の手順を実行するアルゴリズムとする:
-
resultをcloseAlgorithmを実行した結果(未指定ならnull)とする。例外eが投げられたら、eでrejectされたpromiseを返す。
-
resultが
Promise
なら、resultを返す。
-
-
abortAlgorithmWrapperを次の手順(reason引数を取る)を実行するアルゴリズムとする:
-
resultをabortAlgorithmにreasonを渡して実行した結果(未指定ならnull)とする。例外eが投げられたら、eでrejectされたpromiseを返す。
-
resultが
Promise
なら、resultを返す。
-
-
sizeAlgorithmが未指定なら、1を返すアルゴリズムに設定する。
-
! InitializeWritableStream(stream)を実行する。
-
controllerを新規
WritableStreamDefaultController
とする。 -
! SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithmWrapper, abortAlgorithmWrapper, highWaterMark, sizeAlgorithm)を実行する。
他の仕様は、writeAlgorithmの構築時に、与えられたチャンクの並列読み取りを避けるよう注意すべきです。こうした並列読取はJavaScriptのrun-to-completion意味論を破壊する可能性があります。これを避けるには、与えられた値を同期的にコピーまたは転送する(StructuredSerializeWithTransfer、buffer sourceのバイトコピー取得、ArrayBufferの転送等)を使います。例外は、チャンクがSharedArrayBuffer
の場合で、この場合は並列変更が許容されます。
WritableStream
を作成する手順は2段階です:
-
writableStreamを新規
WritableStream
とする。 -
セットアップにwritableStreamと各種引数を与えて実行する。
WritableStream
のサブクラスは、コンストラクター手順内でセットアップをthis値に対して直接呼び出します。
以下の定義は、上述のセットアップアルゴリズムで初期化されたWritableStream
インスタンスにのみ使用してください:
WritableStream
streamにJavaScript値eでエラーを発生させるには
!WritableStreamDefaultControllerErrorIfNeeded(stream.[[controller]], e)を実行します。
WritableStream
streamのsignalは
stream.[[controller]].[[abortController]]の
signalです。仕様はこのAbortSignal
に
追加や削除アルゴリズムを行ったり、
中断判定や
中断理由を参照できます。
通常の使い方は、WritableStreamのsignalにアルゴリズムを追加し、基礎sinkへの書き込み操作を中断できるようにします。次にwriteAlgorithm内で、基礎sinkの応答後、signalが中断済みかを確認し、もしそうならsignalの中断理由でpromiseをrejectします。
9.2.2. 書き込み
以下のアルゴリズムは、Web開発者が作成したものを含む任意のWritableStream
インスタンスで使用できます。いずれも操作ごとに特有の失敗があり、呼び出し元の仕様で適切に処理してください。
WritableStream
streamに対してライター取得するには、? AcquireWritableStreamDefaultWriter(stream)を返します。
結果はWritableStreamDefaultWriter
となります。
これは、streamが既にロック済みの場合、例外を投げます。
WritableStreamDefaultWriter
writerに値chunkをチャンク書き込みするには、! WritableStreamDefaultWriterWrite(writer,
chunk)を返します。
WritableStreamDefaultWriter
writerのリリースは、!WritableStreamDefaultWriterRelease(writer)を実行します。
WritableStream
streamのクローズは、! WritableStreamClose(stream)を返します。返り値はpromiseで、undefinedでfulfillまたは失敗理由でrejectされます。
WritableStream
streamの中断(abort)は、reasonを与えて! WritableStreamAbort(stream,
reason)を返します。返り値はpromiseで、undefinedでfulfillまたは失敗理由でrejectされます。
9.3. TransformStream
9.3.1. 作成と操作
TransformStream
オブジェクトstreamに、アルゴリズムtransformAlgorithm、オプションのアルゴリズムflushAlgorithm、オプションのアルゴリズム
cancelAlgorithmを与え、以下の手順を実行します。
transformAlgorithmや、指定されていればflushAlgorithmやcancelAlgorithmはpromiseを返してもよい。
-
writableHighWaterMarkを1とする。
-
writableSizeAlgorithmを1を返すアルゴリズムとする。
-
readableHighWaterMarkを0とする。
-
readableSizeAlgorithmを1を返すアルゴリズムとする。
-
transformAlgorithmWrapperを次の手順(値chunkを受け取る)とする:
-
resultをtransformAlgorithmにchunkを渡して実行した結果とする。例外eが投げられたら、eでrejectされたpromiseを返す。
-
resultが
Promise
なら、resultを返す。
-
-
flushAlgorithmWrapperを次の手順とする:
-
resultをflushAlgorithmを実行した結果(未指定ならnull)とする。例外eが投げられたら、eでrejectされたpromiseを返す。
-
resultが
Promise
なら、resultを返す。
-
-
cancelAlgorithmWrapperを次の手順(値reasonを受け取る)とする:
-
resultをcancelAlgorithmにreasonを渡して実行した結果(未指定ならnull)とする。例外eが投げられたら、eでrejectされたpromiseを返す。
-
resultが
Promise
なら、resultを返す。
-
-
startPromiseをundefinedで解決されたpromiseとする。
-
! InitializeTransformStream(stream, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm)を実行する。
-
controllerを新規
TransformStreamDefaultController
とする。 -
! SetUpTransformStreamDefaultController(stream, controller, transformAlgorithmWrapper, flushAlgorithmWrapper, cancelAlgorithmWrapper)を実行する。
他の仕様は、transformAlgorithmの構築時に、与えられたチャンクの並列読み取りを避けるよう注意すべきです。こうした並列読取はJavaScriptのrun-to-completion意味論を破壊する可能性があります。これを避けるには、与えられた値を同期的にコピーまたは転送する(StructuredSerializeWithTransfer、buffer sourceのバイトコピー取得、ArrayBufferの転送等)を使います。例外は、チャンクがSharedArrayBuffer
の場合で、この場合は並列変更が許容されます。
TransformStream
を作成する手順は2段階です:
-
transformStreamを新規
TransformStream
とする。 -
セットアップにtransformStreamと各種引数を与えて実行する。
TransformStream
のサブクラスは、コンストラクター手順内でセットアップをthis値に対して直接呼び出します。
TransformStream
の作成は:
-
transformStreamを新規
TransformStream
とする。 -
セットアップにtransformStreamを与え、transformAlgorithmは、chunkが与えられたとき transformStreamにエンキューするアルゴリズムとする。
-
transformStreamを返す。
以下のアルゴリズムは、上述のセットアップアルゴリズムで初期化されたTransformStream
インスタンスにのみ使用してください。通常はtransformAlgorithmや
flushAlgorithmの一部として呼び出されます。
TransformStream
streamにJavaScript値chunkをエンキューするには、
!TransformStreamDefaultControllerEnqueue(stream.[[controller]], chunk)を実行します。
TransformStream
streamの終了(terminate)は、
!TransformStreamDefaultControllerTerminate(stream.[[controller]])を実行します。
TransformStream
streamにJavaScript値eでエラーを発生させるには、
!TransformStreamDefaultControllerError(stream.[[controller]], e)を実行します。
9.3.2. 独自クラスへのラッピング
現行標準以外の仕様で、独自の変換ストリームを定義したい場合、TransformStream
インターフェースを直接サブクラス化しない選択もできます。独自クラスが必要な場合は、独立したWeb IDLインターフェースを定義し、以下のMixinを利用できます:
interface mixin {
GenericTransformStream readonly attribute ReadableStream readable ;readonly attribute WritableStream writable ; };
プラットフォームオブジェクトがGenericTransformStream
mixinを含む場合、そのオブジェクトには
transformが関連付けられます。これは実際のTransformStream
です。
readable
ゲッターの手順は、
thisの
transform.[[readable]]を返します。
writable
ゲッターの手順は、
thisの
transform.[[writable]]を返します。
GenericTransformStream
mixinを含めることで、IDLインターフェースは適切な
readable
およびwritable
プロパティを持ちます。挙動をカスタマイズしたい場合は、コンストラクター(または他の初期化コード)で各インスタンスのtransformを新規TransformStream
にセットし、
セットアップ時に
transformAlgorithmや
flushAlgorithm引数で適切にカスタマイズしてください。
注: このパターンの既存例として、CompressionStream
や
TextDecoderStream
などがあります。
[COMPRESSION] [ENCODING]
ベースのTransformStream
クラス以上のAPIが不要な場合はラッパークラスを作る必要はありません。ラッパーが必要になる典型的な理由はカスタムなコンストラクター手順ですが、もし概念的な変換ストリームが構築されることを意図していないなら、直接TransformStream
を使って問題ありません。
9.4. その他のストリームペア
変換ストリーム以外にも、仕様ではReadableとWritableストリームのペアを作ることが多いです。この節ではそういった場合の指針を示します。
その際はプロパティ名にreadable
とwritable
を使い、input
/output
やreadableStream
/writableStream
など他の名前は使わないこと、
メソッドやその他の手段でアクセスさせてはいけないことが推奨されます。
9.4.1. 複合(duplex)ストリーム
最も一般的なReadable/Writableペアは複合ストリームであり、readable/writable両方が一つのリソース(ソケットや接続、デバイス等)の両端を表します。
複合ストリームで最も注意すべきなのは、Readable側のキャンセルやWritable側のクローズ/中断 などの操作をどう扱うかです。一方のみへの操作で「半開き」にするか、両側に影響を波及させるか(たとえばReadable側の cancelAlgorithmでWritable側をクローズする等)を仕様で決める必要があります。
JavaScriptで作る典型的な複合ストリームの例は、§ 10.8 同じ基礎リソースをラップする{readable, writable}ストリームペアにあります。この例では両側への影響の伝播を示しています。
もう一つは非同期で複合ストリームを取得する場合(例えば接続確立後など)の扱いです。推奨パターンは、構築可能なクラスを作り、promiseを返すプロパティで複合ストリームオブジェクトを返すことです。この複合ストリームは非同期でしか得られない情報(接続データ等)も公開できます。コンテナクラス側で、両端をまとめて閉じるAPIなども持たせられます。
この複合ストリームのより複雑な例は、まだ仕様策定中ですがWebSocketStream
です。詳しくはexplainerや設計ノートを参照してください。
複合ストリームはreadable
/writable
プロパティ契約を守るため、pipeThrough()
で利用できます。すべてのケースで意味があるわけではありませんが、基礎リソースが実際に変換処理をしている場合は有用です。
任意のWebSocketではpipeThroughは意味がありませんが、WebSocketサーバーが受信したメッセージに対して何らかの変換後のデータを返す設計なら便利に使えます。
9.4.2. エンドポイントペア
もう一つのReadable/Writableペアはエンドポイントペアです。この場合、readable/writableストリームはより長いパイプラインの両端であり、Web開発者コードが間に変換ストリームを挿入することを目的としています。
createEndpointPair()
という関数があるとすると、Web開発者は以下のように書くでしょう:
const { readable, writable} = createEndpointPair(); await readable. pipeThrough( new TransformStream(...)). pipeTo( writable);
WebRTC Encoded Transform
はこの技法の例であり、RTCRtpScriptTransformer
インターフェースは
readable
とwritable
属性を持ちます。
このようなエンドポイントペアもreadable
/writable
契約を守りますが、pipeThrough()
に渡すのは常に意味があるわけではありません。
9.5. パイピング
ReadableStream
readableをWritableStream
writableへパイプする結果は、オプションのboolean
preventClose
(デフォルトfalse)、オプションのboolean preventAbort(デフォルト
false)、オプションのboolean preventCancel(デフォルト
false)、オプションのAbortSignal
signalを受け取り、以下の手順を実行します。
これらはパイプ処理が完了するとfulfilled、失敗すると例外でrejectedになるPromise
を返します。
-
断言: ! IsReadableStreamLocked(readable)はfalse。
-
断言: ! IsWritableStreamLocked(writable)はfalse。
-
signalArgをsignalが与えられていればその値、そうでなければundefinedとする。
-
! ReadableStreamPipeTo(readable, writable, preventClose, preventAbort, preventCancel, signalArg)を返す。
返されたpromiseが不要な場合、この概念を参照するのは少し不便です。最善の表現は「readableをwritableへパイプする」です。
ReadableStream
readableをTransformStream
transformへパイプスルーする結果は、
オプションのboolean preventClose
(デフォルトfalse)、オプションのboolean preventAbort
(デフォルトfalse)、オプションのboolean preventCancel(デフォルトfalse)、オプションのAbortSignal
signalを受け取り、以下の手順を実行します。結果はtransformの読み取り側となります。
-
断言: ! IsReadableStreamLocked(readable)はfalse。
-
断言: ! IsWritableStreamLocked(transform.[[writable]])はfalse。
-
signalArgをsignalが与えられていればその値、そうでなければundefinedとする。
-
promiseを! ReadableStreamPipeTo(readable, transform.[[writable]], preventClose, preventAbort, preventCancel, signalArg)とする。
-
promise.[[PromiseIsHandled]]をtrueにする。
-
transform.[[readable]]を返す。
ReadableStream
streamのプロキシ作成は以下の手順で行います。結果は、streamからデータを取得する新しいReadableStream
オブジェクトです。このときstream自身はすぐにロックされ攪乱状態となります。
-
identityTransformをアイデンティティTransformStreamの作成結果とする。
-
streamをidentityTransformへパイプスルーした結果を返す。
10. ストリームの作成例
このセクションおよびそのすべてのサブセクションは規範的ではありません。
これまでの現行標準全体の例では、ストリームの利用方法に焦点を当てていました。ここでは、ReadableStream
、
WritableStream
、
TransformStream
の各コンストラクターを使って、ストリームを作成する方法を示します。
10.1. 基盤となるプッシュソース(バックプレッシャー非対応)のリーダブルストリーム
次の関数は、リーダブルストリームを作成し、WebSocket
インスタンス [WEBSOCKETS]
をラップします。
これは、プッシュソースであり、
バックプレッシャー信号をサポートしません。プッシュソースの適応時、通常ほとんどの処理は start()
メソッド内で行われることを示しています。
function makeReadableWebSocketStream( url, protocols) { const ws= new WebSocket( url, protocols); ws. binaryType= "arraybuffer" ; return new ReadableStream({ start( controller) { ws. onmessage= event=> controller. enqueue( event. data); ws. onclose= () => controller. close(); ws. onerror= () => controller. error( new Error ( "The WebSocket errored!" )); }, cancel() { ws. close(); } }); }
この関数を使って、ウェブソケット用のリーダブルストリームを作成し、そのストリームを任意のライタブルストリームにパイプできます:
const webSocketStream= makeReadableWebSocketStream( "wss://example.com:443/" , "protocol" ); webSocketStream. pipeTo( writableStream) . then(() => console. log( "全てのデータが正常に書き込まれました!" )) . catch ( e=> console. error( "何か問題が発生しました!" , e));
しかし、多くの場合「ウェブソケットにストリーム対応を追加したい」という話は、個々のウェブソケットメッセージ自体をストリーミングで送信できる新しい機能を期待しています。つまり、例えばファイルを一つのメッセージで転送できるが、クライアント側で全内容をメモリに保持せずに済むようにすることです。この目的のためには、個々のウェブソケットメッセージ自体が
ReadableStream
インスタンスになる必要があります。上の例ではそれは示していません。
詳細は この議論 を参照してください。
10.2. 基盤となるプッシュソース(バックプレッシャー対応)のリーダブルストリーム
次の関数は、「バックプレッシャーソケット」と呼ばれる仮想オブジェクトをラップしたリーダブルストリームを返します。これらはウェブソケットと同じAPIを持ちますが、readStop
とreadStart
メソッドによりデータの流れを一時停止・再開できます。この例は、バックプレッシャーを基盤ソースに適用する方法を示しています。
function makeReadableBackpressureSocketStream( host, port) { const socket= createBackpressureSocket( host, port); return new ReadableStream({ start( controller) { socket. ondata= event=> { controller. enqueue( event. data); if ( controller. desiredSize<= 0 ) { // 内部キューが満杯なので、 // バックプレッシャー信号を基盤ソースに伝播します。 socket. readStop(); } }; socket. onend= () => controller. close(); socket. onerror= () => controller. error( new Error ( "The socket errored!" )); }, pull() { // 内部キューが空になったが、ストリームの利用者がまだデータを欲している場合に呼ばれます。 // この場合、データの流れが以前に停止されていれば再開します。 socket. readStart(); }, cancel() { socket. close(); } }); }
この関数を使って同様に「バックプレッシャーソケット」用のリーダブルストリームを作成できます。今度は、送信先がソケットの生成速度ほど早くデータを受け取れない場合や、しばらくストリームを放置した場合に、バックプレッシャー信号がソケットへ送られるようになります。
10.3. 基盤となるプッシュソース(バックプレッシャー非対応)のリーダブルバイトストリーム
次の関数は、仮想UDPソケットAPIをラップしたリーダブルバイトストリームを返します。
ここでは、POSIXのselect(2)システムコールを想起させるPromise返却型select2()
メソッドを含んでいます。
UDPプロトコルはバックプレッシャー機能を持たないため、desiredSize
によるバックプレッシャー信号は無視されます。ストリームは、ソケットからデータが利用可能でもまだ開発者に要求されていない場合、そのデータをストリームの内部キューにエンキューし、カーネル空間キューのオーバーフローとデータ損失を防ぎます。
これは、利用者がストリームとどのようにやり取りするかに興味深い影響を与えます。利用者がソケットの生成速度ほど早くデータを読み取らない場合、チャンクはストリームの内部キューに無期限に残ります。この場合、BYOBリーダーを使うと、内部キューから開発者提供のバッファへデータを移動するため追加コピーが発生します。しかし、利用者が十分速くデータを消費する場合、BYOBリーダーは開発者提供バッファにゼロコピーで直接読み込めます。
(より複雑な例として、desiredSize
で帯域外バックプレッシャー信号を通知し、例えばソケットにメッセージを送って送信速度を調整する方法も考えられます。これは読者への課題です。)
const DEFAULT_CHUNK_SIZE= 65536 ; function makeUDPSocketStream( host, port) { const socket= createUDPSocket( host, port); return new ReadableStream({ type: "bytes" , start( controller) { readRepeatedly(). catch ( e=> controller. error( e)); function readRepeatedly() { return socket. select2(). then(() => { // ソケットが読み取り可能になってもBYOBリクエストがない場合もあるため、両ケースに対応する必要があります。 let bytesRead; if ( controller. byobRequest) { const v= controller. byobRequest. view; bytesRead= socket. readInto( v. buffer, v. byteOffset, v. byteLength); if ( bytesRead=== 0 ) { controller. close(); } controller. byobRequest. respond( bytesRead); } else { const buffer= new ArrayBuffer( DEFAULT_CHUNK_SIZE); bytesRead= socket. readInto( buffer, 0 , DEFAULT_CHUNK_SIZE); if ( bytesRead=== 0 ) { controller. close(); } else { controller. enqueue( new Uint8Array( buffer, 0 , bytesRead)); } } if ( bytesRead=== 0 ) { return ; } return readRepeatedly(); }); } }, cancel() { socket. close(); } }); }
ReadableStream
インスタンスは、BYOBリーダーを提供でき、上記の利点と注意点が適用されます。
10.4. 基盤となるプルソースのリーダブルストリーム
次の関数は、リーダブルストリームを返し、Node.jsファイルシステムAPIの一部をラップします(これらはC言語のfopen
、fread
、fclose
にほぼ対応します)。ファイルは典型的なプルソースの例です。プッシュソース例とは対照的に、ここではほとんどの処理が pull()
関数内でオンデマンドで実行され、start()
関数ではありません。
const fs= require( "fs" ). promises; const CHUNK_SIZE= 1024 ; function makeReadableFileStream( filename) { let fileHandle; let position= 0 ; return new ReadableStream({ async start() { fileHandle= await fs. open( filename, "r" ); }, async pull( controller) { const buffer= new Uint8Array( CHUNK_SIZE); const { bytesRead} = await fileHandle. read( buffer, 0 , CHUNK_SIZE, position); if ( bytesRead=== 0 ) { await fileHandle. close(); controller. close(); } else { position+= bytesRead; controller. enqueue( buffer. subarray( 0 , bytesRead)); } }, cancel() { return fileHandle. close(); } }); }
この関数を使えば、ソケットと同様にファイル用のリーダブルストリームを作成・利用できます。
10.5. 基盤となるプルソースのリーダブルバイトストリーム
次の関数は、リーダブルバイトストリームを返し、効率的なゼロコピーでファイル読み込みを可能にします。ここでもNode.jsファイルシステムAPIを利用します。 固定サイズ(1024バイト)でなく、開発者提供バッファを満たす形でデータを読み込み、制御を完全に委ねます。
const fs= require( "fs" ). promises; const DEFAULT_CHUNK_SIZE= 1024 ; function makeReadableByteFileStream( filename) { let fileHandle; let position= 0 ; return new ReadableStream({ type: "bytes" , async start() { fileHandle= await fs. open( filename, "r" ); }, async pull( controller) { // 利用者がデフォルトリーダーを使う場合でも、auto-allocation機能がバッファを割り当ててbyobRequest経由で渡します。 const v= controller. byobRequest. view; const { bytesRead} = await fileHandle. read( v, 0 , v. byteLength, position); if ( bytesRead=== 0 ) { await fileHandle. close(); controller. close(); controller. byobRequest. respond( 0 ); } else { position+= bytesRead; controller. byobRequest. respond( bytesRead); } }, cancel() { return fileHandle. close(); }, autoAllocateChunkSize: DEFAULT_CHUNK_SIZE}); }
これで返されたReadableStream
に対しBYOBリーダーを使えます。
また、デフォルトリーダーも作成でき、従来通りシンプルで汎用的な方法で利用できます。
ここで示した基盤バイトソースの低レベルなバイト管理と、
デフォルトリーダーの高レベルなチャンク消費との適応は、ストリーム実装が自動的に処理します。
autoAllocateChunkSize
オプションの自動割り当て機能により、§ 10.3
基盤となるプッシュソース(バックプレッシャー非対応)のリーダブルバイトストリームでの分岐処理よりもコード量が少なくなります。
10.6. バックプレッシャーも成功信号もないライタブルストリーム
次の関数は、ライタブルストリームを返し、WebSocket
[WEBSOCKETS]
をラップします。
ウェブソケットは、特定のデータチャンクが正常に送信されたかどうか簡単には判定できません(bufferedAmount
のポーリングなど、煩雑な方法以外は。これは読者への課題です)。
そのため、このライタブルストリームはバックプレッシャー
信号や書き込み成功/失敗をプロデューサーに正しく通知できません。つまり、writerのwrite()
メソッドや
ready
ゲッターの返すPromiseは常に即座に解決されます。
function makeWritableWebSocketStream( url, protocols) { const ws= new WebSocket( url, protocols); return new WritableStream({ start( controller) { ws. onerror= () => { controller. error( new Error ( "The WebSocket errored!" )); ws. onclose= null ; }; ws. onclose= () => controller. error( new Error ( "The server closed the connection unexpectedly!" )); return new Promise( resolve=> ws. onopen= resolve); }, write( chunk) { ws. send( chunk); // 送信完了が判定できないため、即座に返します。 }, close() { return closeWS( 1000 ); }, abort( reason) { return closeWS( 4000 , reason&& reason. message); }, }); function closeWS( code, reasonString) { return new Promise(( resolve, reject) => { ws. onclose= e=> { if ( e. wasClean) { resolve(); } else { reject( new Error ( "The connection was not closed cleanly" )); } }; ws. close( code, reasonString); }); } }
この関数を使って、ウェブソケット用のライタブルストリームを作成し、任意のリーダブルストリームをパイプできます:
const webSocketStream= makeWritableWebSocketStream( "wss://example.com:443/" , "protocol" ); readableStream. pipeTo( webSocketStream) . then(() => console. log( "全てのデータが正常に書き込まれました!" )) . catch ( e=> console. error( "何か問題が発生しました!" , e));
このウェブソケットラップ方式については前述の注意も参照してください。
10.7. バックプレッシャーと成功信号のあるライタブルストリーム
次の関数は、ライタブルストリームを返し、Node.jsファイルシステムAPIの一部をラップします(これらはC言語のfopen
、fwrite
、fclose
にほぼ対応します)。ラップ対象APIが書き込み成功を判定できるため、このストリームはバックプレッシャー信号や個々の書き込み成功/失敗も通知できます。
const fs= require( "fs" ). promises; function makeWritableFileStream( filename) { let fileHandle; return new WritableStream({ async start() { fileHandle= await fs. open( filename, "w" ); }, write( chunk) { return fileHandle. write( chunk, 0 , chunk. length); }, close() { return fileHandle. close(); }, abort() { return fileHandle. close(); } }); }
この関数を使うことで、ファイル用のライタブルストリームを作成し、個々のチャンクを順次書き込めます:
const fileStream= makeWritableFileStream( "/example/path/on/fs.txt" ); const writer= fileStream. getWriter(); writer. write( "To stream, or not to stream\n" ); writer. write( "That is the question\n" ); writer. close() . then(() => console. log( "チャンクを書き込み、ストリームを正常に閉じました!" )) . catch ( e=> console. error( e));
特定のfileHandle.write
呼び出しの完了が遅い場合、返却Promiseも遅れて解決されます。その間、追加の書き込みはキューに溜まり、ストリームの内部キューに保存されます。キュー内チャンクの蓄積によって、ready
ゲッターがpending状態のPromiseを返すことがあり、これはプロデューサーに書き込みを控えるべき合図となります。
ライタブルストリームのキュー処理は、このケースでは特に重要です。公式ドキュメント
にもある通り、「Promiseを待たずに同じファイルに複数回filehandle.writeを使うのは安全ではありません」。しかし
makeWritableFileStream
関数の実装時は心配不要で、ストリーム実装が基盤シンクのwrite()
メソッドを、前回Promiseが解決するまで呼ばないことを保証します!
10.8. 同じ基盤リソースをラップする { readable, writable } ストリームペア
次の関数は{ readable, writable }
形式のオブジェクトを返します。readable
プロパティはリーダブルストリーム、writable
プロパティはライタブルストリームで、両方が同じウェブソケットリソースをラップします。これは、§ 10.1 基盤となるプッシュソース(バックプレッシャー非対応)のリーダブルストリームと§ 10.6 バックプレッシャーも成功信号もないライタブルストリームを組み合わせたものです。
同時に、JavaScriptクラスを使って再利用可能な基盤シンク・基盤ソース抽象化を作成する方法も示しています。
function streamifyWebSocket( url, protocol) { const ws= new WebSocket( url, protocols); ws. binaryType= "arraybuffer" ; return { readable: new ReadableStream( new WebSocketSource( ws)), writable: new WritableStream( new WebSocketSink( ws)) }; } class WebSocketSource{ constructor ( ws) { this . _ws= ws; } start( controller) { this . _ws. onmessage= event=> controller. enqueue( event. data); this . _ws. onclose= () => controller. close(); this . _ws. addEventListener( "error" , () => { controller. error( new Error ( "The WebSocket errored!" )); }); } cancel() { this . _ws. close(); } } class WebSocketSink{ constructor ( ws) { this . _ws= ws; } start( controller) { this . _ws. onclose= () => controller. error( new Error ( "The server closed the connection unexpectedly!" )); this . _ws. addEventListener( "error" , () => { controller. error( new Error ( "The WebSocket errored!" )); this . _ws. onclose= null ; }); return new Promise( resolve=> this . _ws. onopen= resolve); } write( chunk) { this . _ws. send( chunk); } close() { return this . _closeWS( 1000 ); } abort( reason) { return this . _closeWS( 4000 , reason&& reason. message); } _closeWS( code, reasonString) { return new Promise(( resolve, reject) => { this . _ws. onclose= e=> { if ( e. wasClean) { resolve(); } else { reject( new Error ( "The connection was not closed cleanly" )); } }; this . _ws. close( code, reasonString); }); } }
この関数で作成したオブジェクトを使って、標準ストリームAPIでリモートウェブソケットと通信できます:
const streamyWS= streamifyWebSocket( "wss://example.com:443/" , "protocol" ); const writer= streamyWS. writable. getWriter(); const reader= streamyWS. readable. getReader(); writer. write( "Hello" ); writer. write( "web socket!" ); reader. read(). then(({ value, done}) => { console. log( "ウェブソケットからの応答: " , value); });
この仕組みではreadable
側をキャンセルするとwritable
側も暗黙的に閉じられ、逆にwritable
側を閉じたり中断した場合もreadable
側が自動で閉じられます。
このウェブソケットラップ方式については前述の注意も参照してください。
10.9. テンプレートタグを置換する変換ストリーム
データストリームの中で、タグを変数で置き換えるのはよくある便利な操作です。置換が必要な部分が全体のデータサイズに比べて小さい場合、この例のようなシンプルな方法が有効です。これは、文字列を文字列にマッピングし、例えば
"Time: {{time}} Message: {{message}}"
というテンプレートを "Time: 15:36 Message: hello"
のように変換します。LipFuzzTransformer
の substitutions
パラメータに
{ time: "15:36", message: "hello" }
を渡していると仮定します。
またこの例では、チャンクが部分的なデータを含み、さらにデータを受信するまで変換できない状況をどう扱うかも示しています。この場合、部分テンプレートタグは partialChunk
プロパティに蓄積され、タグの終端が見つかるかストリームの終わりに達するまで保持されます。
class LipFuzzTransformer{ constructor ( substitutions) { this . substitutions= substitutions; this . partialChunk= "" ; this . lastIndex= undefined ; } transform( chunk, controller) { chunk= this . partialChunk+ chunk; this . partialChunk= "" ; // lastIndexは最後の置換の直後の文字の位置です。 this . lastIndex= 0 ; chunk= chunk. replace( /\{\{([a-zA-Z0-9_-]+)\}\}/g , this . replaceTag. bind( this )); // 文字列末尾に不完全なテンプレートがある場合の正規表現。 const partialAtEndRegexp= /\{(\{([a-zA-Z0-9_-]+(\})?)?)?$/g ; // 既に置換済みの文字は無視する。 partialAtEndRegexp. lastIndex= this . lastIndex; this . lastIndex= undefined ; const match= partialAtEndRegexp. exec( chunk); if ( match) { this . partialChunk= chunk. substring( match. index); chunk= chunk. substring( 0 , match. index); } controller. enqueue( chunk); } flush( controller) { if ( this . partialChunk. length> 0 ) { controller. enqueue( this . partialChunk); } } replaceTag( match, p1, offset) { let replacement= this . substitutions[ p1]; if ( replacement=== undefined ) { replacement= "" ; } this . lastIndex= offset+ replacement. length; return replacement; } }
この場合、transformer を TransformStream
コンストラクターに渡すクラスとして定義しています。インスタンスデータの管理が必要な場合に有用です。
クラスは以下のように使います:
const data= { userName, displayName, icon, date}; const ts= new TransformStream( new LipFuzzTransformer( data)); fetchEvent. respondWith( fetch( fetchEvent. request. url). then( response=> { const transformedBody= response. body// バイナリエンコードされたレスポンスを文字列にデコード . pipeThrough( new TextDecoderStream()) // LipFuzzTransformerを適用 . pipeThrough( ts) // 変換後の文字列をエンコード . pipeThrough( new TextEncoderStream()); return new Response( transformedBody); }) );
簡単のため、LipFuzzTransformer
はエスケープなしでテキスト置換を行っています。実際のアプリケーションでは、セキュリティと堅牢性のため、文脈に応じてエスケープ処理を行うテンプレートシステムの利用が推奨されます。
10.10. 同期マッパー関数から作る変換ストリーム
次の関数は、同期「マッパー」関数から新しい TransformStream
インスタンスを作成できます。通常 Array.prototype.map
に渡すような関数型です。
APIが些細な変換でも簡潔に使えることが分かります。
function mapperTransformStream( mapperFunction) { return new TransformStream({ transform( chunk, controller) { controller. enqueue( mapperFunction( chunk)); } }); }
この関数を使い、すべての入力を大文字化する TransformStream
を作成できます:
const ts= mapperTransformStream( chunk=> chunk. toUpperCase()); const writer= ts. writable. getWriter(); const reader= ts. readable. getReader(); writer. write( "No need to shout" ); // "NO NEED TO SHOUT" とログ出力: reader. read(). then(({ value}) => console. log( value));
同期変換は自身でバックプレッシャーを引き起こすことはありませんが、バックプレッシャーが無い限りのみチャンク変換を行うため、リソースが無駄遣いされることはありません。
例外は自然にストリームエラーとなります:
const ts= mapperTransformStream( chunk=> JSON. parse( chunk)); const writer= ts. writable. getWriter(); const reader= ts. readable. getReader(); writer. write( "[1, " ); // 2回ともSyntaxErrorをログ出力: reader. read(). catch ( e=> console. error( e)); writer. write( "{}" ). catch ( e=> console. error( e));
10.11. 新しいリーダブルストリームを作る原始としての恒等変換ストリームの利用例
恒等変換ストリームと pipeTo()
を組み合わせることで、ストリーム操作の強力な手法となります。このセクションではその一般的な使い方例をいくつか示します。
Promiseで リーダブルストリーム を返す場合でも、それをリーダブルストリームのように扱いたくなることがあります。シンプルなアダプター関数だけで対応できます:
function promiseToReadable( promiseForReadable) { const ts= new TransformStream(); promiseForReadable. then( readable=> readable. pipeTo( ts. writable)) . catch ( reason=> ts. writable. abort( reason)) . catch (() => {}); return ts. readable; }
ここでは、データを 書き込み側にパイプし、その読み取り側を返します。パイプ中にエラーが起きた場合は書き込み側を abort
し、返された読み取り側にもエラーが自動伝搬します。既に pipeTo()
によって書き込み側がエラーになっていた場合、abort()
呼び出しはrejectを返しますが、これも安全に無視できます。
これを拡張し、複数のリーダブルストリームを一つに連結する関数例:
function concatenateReadables( readables) { const ts= new TransformStream(); let promise= Promise. resolve(); for ( const readableof readables) { promise= promise. then( () => readable. pipeTo( ts. writable, { preventClose: true }), reason=> { return Promise. all([ ts. writable. abort( reason), readable. cancel( reason) ]); } ); } promise. then(() => ts. writable. close(), reason=> ts. writable. abort( reason)) . catch (() => {}); return ts. readable; }
エラー処理は微妙で、連結したストリームのキャンセル時はすべての入力ストリームもキャンセルする必要があります。しかし正常系はシンプルで、readables
イテラブルの各ストリームを順番に恒等変換ストリームの 書き込み側にパイプし、最後にクローズします。読み取り側はすべてのストリームのチャンクを連結したものとなり、関数から返します。バックプレッシャーも通常通り働きます。
謝辞
編集者は、以下の方々に感謝します: Anne van Kesteren、 AnthumChris、 Arthur Langereis、 Ben Kelly、 Bert Belder、 Brian di Palma、 Calvin Metcalf、 Dominic Tarr、 Ed Hager、 Eric Skoglund、 Forbes Lindesay、 Forrest Norvell、 Gary Blackwood、 Gorgi Kosev、 Gus Caplan、 贺师俊 (hax)、 Isaac Schlueter、 isonmad、 Jake Archibald、 Jake Verbaten、 James Pryor、 Janessa Det、 Jason Orendorff、 Jeffrey Yasskin、 Jeremy Roman、 Jens Nockert、 Lennart Grahl、 Luca Casonato、 Mangala Sadhu Sangeet Singh Khalsa、 Marcos Caceres、 Marvin Hagemeister、 Mattias Buelens、 Michael Mior、 Mihai Potra、 Nidhi Jaju、 Romain Bellessort、 Simon Menke、 Stephen Sugden、 Surma、 Tab Atkins、 Tanguy Krotoff、 Thorsten Lorenz、 Till Schneidereit、 Tim Caswell、 Trevor Norris、 tzik、 Will Chan、 Youenn Fablet、 平野裕 (Yutaka Hirano)、 そして Xabier Rodríguez ― 本仕様への貢献に感謝します。コミュニティによる本仕様への関与は非常に素晴らしく、皆様なしでは成し得ませんでした。
この標準は Adam Rice (Google, ricea@chromium.org)、Domenic Denicola (Google, d@domenic.me)、 Mattias Buelens、および 吉野剛史 (Takeshi Yoshino, tyoshino@chromium.org) によって執筆されました。
知的財産権
Copyright © WHATWG (Apple、Google、Mozilla、Microsoft)。本作業は クリエイティブ・コモンズ 表示 4.0 国際ライセンス の下でライセンスされています。その一部がソースコードに組み込まれる場合、その部分は BSD 3-Clause ライセンス の下でライセンスされます。
これは現行標準です。 特許審査版に関心のある方は 現行標準審査ドラフト をご覧ください。