ReadableStream

Streamed data starts with a ReadableStream “source” and ends in a WritableStream “sink”.

+----------------+              +----------------+
| ReadableStream | ---pipeTo--> | WritableStream |
+----------------+              +----------------+
const reader = new ReadableStream<number>({
  start(controller) {
    controller.enqueue(1)
    controller.enqueue(2)
    controller.enqueue(3)
    controller.close()
  },
})

const writer = new WritableStream<number>({
  write(chunk) {
    console.info(chunk)
  },
})

reader.pipeTo(writer).then(() => console.info('done'))
// 1
// 2
// 3
// done

Pulling from a source

The above example immediately queues 3 items in to a “source” that is then piped in to a “sink”. Each item is consumed, with the WritableStream.prototype.write() function. Although perfectly acceptable, the above approach can be much improved, especially when dealing with large amounts of data, by registering a pull function. The registered pull function will be called after a queued item has moved through the stream. It will also be called whether there is room in the queue or not.

let i = 0

const reader = new ReadableStream<number>({
  pull(controller) {
    controller.enqueue(i++)
  },
})

An important feature to note about the pull function is that when it returns a Promise it will not be called again until the Promise has resolved or when a queued item has just passed through the stream. One may want to implement back/forward pressure by checking the desiredSize property.

const asyncIterator = generateResults() // an async generator

const reader = new ReadableStream<number>({
  async pull(controller) {
    while (controller.desiredSize) {
      let result: IteratorResult<number>

      try {
        result = await asyncIterator.next()
      } catch (error) {
        return controller.error(error)
      }

      if (result.done) return controller.close()

      controller.enqueue(result.value)
    }
  },
})