[io] Propagate cancellation in _HttpOutgoing.addStream

If HttpResponse is being closed prematurally (e.g. because client
decided to close its request) we need to propagate cancellation
to the stream which is being piped into the response. Otherwise
we will keep that stream forever hanging around and leak underlying
resources.

Fixes https://github.com/dart-lang/sdk/issues/55886

TEST=tests/standalone/io/regress_55886_test.dart
R=kustermann@google.com

Change-Id: I7c294ed19cc7c350fd101b078bd650ce8a6526a2
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/369061
Reviewed-by: Martin Kustermann <kustermann@google.com>
Commit-Queue: Martin Kustermann <kustermann@google.com>
This commit is contained in:
Vyacheslav Egorov 2024-06-02 20:33:26 +00:00 committed by Commit Queue
parent a4d3ee054d
commit fc4bcab2ef
2 changed files with 89 additions and 0 deletions

View file

@ -1793,6 +1793,7 @@ class _HttpOutgoing implements StreamConsumer<List<int>> {
onError: controller.addError,
onDone: controller.close,
cancelOnError: true);
controller.onCancel = sub.cancel;
controller.onPause = sub.pause;
controller.onResume = sub.resume;
// Write headers now that we are listening to the stream.

View file

@ -0,0 +1,88 @@
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// Regression test for https://dartbug.com/55886: [HttpResponse.addStream]
// should cancel subscription to the stream which is being added if
// [HttpResponse] itself is being closed.
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:async_helper/async_helper.dart';
import 'package:expect/expect.dart';
Future<void> pipeStream(Stream<List<int>> from, IOSink to) async {
bool wasCancelled = false;
StreamSubscription<List<int>>? subscription;
late final StreamController<List<int>> streamController;
streamController = StreamController<List<int>>(
onPause: () {
subscription?.pause();
},
onResume: () {
subscription?.resume();
},
onCancel: () {
wasCancelled = true;
subscription?.cancel();
subscription = null;
},
onListen: () {
subscription = from.listen(
(data) {
streamController.add(data);
},
onDone: () {
streamController.close();
subscription?.cancel();
subscription = null;
},
onError: (e, st) {
streamController.addError(e, st);
subscription?.cancel();
subscription = null;
},
);
},
);
await streamController.stream.pipe(to);
Expect.isTrue(wasCancelled);
}
Stream<List<int>> generateSlowly() async* {
for (var i = 0; i < 100; i++) {
yield utf8.encode("item $i");
await Future.delayed(Duration(milliseconds: 100));
}
}
Future<void> serve(HttpServer server) async {
await for (var rq in server) {
rq.response.bufferOutput = false;
await pipeStream(generateSlowly(), rq.response);
break;
}
}
void main() async {
asyncStart();
final server = await HttpServer.bind('localhost', 0);
serve(server).then((_) => asyncEnd());
// Send request and then cancel response stream subscription after
// the first chunk. This should cause server to close the connection
// and cancel subscription to the stream which is being piped into
// the response.
final client = HttpClient();
final rq = await client.get('localhost', server.port, '/');
final rs = await rq.close();
late StreamSubscription sub;
sub = rs.map(utf8.decode).listen((msg) {
sub.cancel();
});
}