Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 96 additions & 11 deletions ext/node/polyfills/internal/webstreams/adapters.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@ import {
normalizeEncoding,
} from "ext:deno_node/internal/util.mjs";
import { AbortError } from "ext:deno_node/internal/errors.ts";
import { debuglog } from "ext:deno_node/internal/util/debuglog.ts";
import process from "node:process";
import { Buffer } from "node:buffer";
import { Duplex, Readable, Writable } from "node:stream";

let debug = debuglog("stream", (fn) => {
debug = fn;
});

function isWritableStream(object) {
return object instanceof WritableStream;
}
Expand Down Expand Up @@ -504,32 +509,77 @@ export function newReadableStreamFromStreamReadable(
let controller;

function onData(chunk) {
// Copy the Buffer to detach it from the pool.
if (Buffer.isBuffer(chunk) && !objectMode) {
chunk = new Uint8Array(chunk);
}
controller.enqueue(chunk);
if (controller.desiredSize <= 0) {
try {
// If the stream is destroyed, don't process any more data
if (isDestroyed(streamReadable)) {
return;
}

// Copy the Buffer to detach it from the pool.
if (Buffer.isBuffer(chunk) && !objectMode) {
chunk = new Uint8Array(chunk);
}

try {
controller.enqueue(chunk);
if (controller.desiredSize <= 0) {
streamReadable.pause();
}
} catch (err) {
// If enqueue fails (e.g., because the stream is closed),
// pause the stream to prevent further reads
streamReadable.pause();
// Don't rethrow to prevent unhandled promise rejections
debug(
"Ignoring controller.enqueue() error (stream closed during data processing):",
err
);
}
} catch (err) {
// Catch any synchronous errors to prevent unhandled exceptions
streamReadable.pause();
debug(
"Unexpected error in onData, pausing stream:",
err
);
}
}

streamReadable.pause();

let isCanceled = false;

const cleanup = finished(streamReadable, (error) => {
// If we've already been canceled, don't try to close the controller
if (isCanceled) {
return;
}

if (error?.code === "ERR_STREAM_PREMATURE_CLOSE") {
const err = new AbortError(undefined, { cause: error });
error = err;
}

cleanup();

// This is a protection against non-standard, legacy streams
// that happen to emit an error event again after finished is called.
streamReadable.on("error", () => {});
if (error) {
return controller.error(error);

try {
if (error) {
controller.error(error);
} else {
controller.close();
}
} catch (err) {
// Ignore errors from controller.close() or controller.error()
// as they typically mean the stream was already closed or errored
debug(
`Ignoring controller.${error ? "error()" : "close()"} error in cleanup (controller already terminated):`,
err
);
}
controller.close();
});

streamReadable.on("data", onData);
Expand All @@ -543,8 +593,43 @@ export function newReadableStreamFromStreamReadable(
streamReadable.resume();
},

cancel(reason) {
destroy(streamReadable, reason);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears that Node.js, keeps track of wasCanceled variable here and then later uses it to decide if the controller should be closed. Have you tried this approach? https://github.com/nodejs/node/blob/886e4b3b534a9f3ad2facbc99097419e06615900/lib/internal/webstreams/adapters.js#L499

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed!

async cancel(reason) {
// Mark as canceled to prevent the finished callback from closing the controller
isCanceled = true;

// If the stream is already destroyed, resolve immediately
if (isDestroyed(streamReadable)) {
return;
}

// Create a promise that resolves when the stream is destroyed
return new Promise((resolve) => {
// Check again in case destroy was called in the meantime
if (isDestroyed(streamReadable)) {
return resolve();
}

// Remove the data listener to prevent more reads
streamReadable.removeListener("data", onData);

// Destroy the stream
destroy(streamReadable, reason, (err) => {
if (err) {
// If there's an error, try to forward it to the controller
// but don't fail if the controller is already closed
try {
controller.error(err);
} catch (controllerErr) {
// Ignore errors from controller.error()
debug(
"Ignoring controller.error() in cancel (controller already terminated):",
controllerErr
);
}
}
resolve();
});
});
},
}, strategy);
}
Expand Down
1 change: 1 addition & 0 deletions tests/node_compat/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,7 @@
"parallel/test-stream-readable-setEncoding-existing-buffers.js" = {}
"parallel/test-stream-readable-setEncoding-null.js" = {}
"parallel/test-stream-readable-strategy-option.js" = {}
"parallel/test-stream-readable-to-web-termination.js" = {}
"parallel/test-stream-readable-unpipe-resume.js" = { flaky = true }
"parallel/test-stream-readable-unshift.js" = {}
"parallel/test-stream-readable-with-unimplemented-_read.js" = {}
Expand Down
Loading