dart-lang / sdk

The Dart SDK, including the VM, dart2js, core libraries, and more.
https://dart.dev
BSD 3-Clause "New" or "Revised" License
9.98k stars 1.54k forks source link

Get Progress Event on Multipart Request #35427

Open awazgyawali opened 5 years ago

awazgyawali commented 5 years ago

Of course, I should have filed this request on http package repository, but I found this there dart-lang/http#153. Then asked a question on SO, added a bounty on it too, asked on gitter, but sadly I got no reply. Filed an issue https://github.com/flutter/flutter/issues/25444 on flutter SDK and I was directed here.

Is there any workaround for this behavior? If no, this might actually be a feature request.

SO Question: https://stackoverflow.com/questions/53727911/how-to-get-progress-event-while-uploading-file-on-http-multipartrequest-request.

Thanks in advance!!!

awazgyawali commented 5 years ago

After waiting for a week or soo for an answer, I created a plugin to get this behavior. Package https://pub.dartlang.org/packages/multipart_request .

Example to use it:

var request = MultipartRequest();

request.addFile("image", imagePath);

Response response = request.send();

response.onError = () {
  print("Error");
};

response.onComplete = (response) {
  print(response);
};

response.progress.listen((int progress) {
  print("progress from response object " + progress.toString());
});

Works only on android.
Hope it helps someone who's wondering around for the same question.

dru commented 5 years ago

For raw non-mulipart upload requests you can forward file stream to request's stream. By adding map function to it you can calculate progress.

Example: https://gist.github.com/dru/85975bf55151e7f160a10cdd3575e9be Minimal upload server on Rust: https://gist.github.com/dru/7e29eec44230aa6d455d1c52881116eb

1AlexFix1 commented 5 years ago

Any news?

sortie commented 5 years ago

No news right now. I'm currently triaging the dart:io issues, understanding what's important to people (bunch of interest in this issue for instance), and planning what to work on.

1AlexFix1 commented 5 years ago

No news right now. I'm currently triaging the dart:io issues, understanding what's important to people (bunch of interest in this issue for instance), and planning what to work on.

What do you think about the implementation of the unloading link below, there is a calculation of how many bytes were read from the file, but not sent to the network. The code works quite well, tell me what the minus of this solution is, the pitfalls project

Nico04 commented 4 years ago

Any news on this request ? I would be very usefull to me :)

dtaalbers commented 4 years ago

What do you think about the implementation of the unloading link below, there is a calculation of how many bytes were read from the file, but not sent to the network. The code works quite well, tell me what the minus of this solution is, the pitfalls project

This is not actually a solution though. Because reading the bytes of a file is done quite quickly, meaning the progress bar will skip from 0% - 100% within a ~second. After that it just stays on 100% for a while (depending on the file size) while it's actually uploading the file.

I really need that upload progress to be available so that I can smoothly report the upload progress to the user. I've listed the solutions that I've tried in this SO question (including your suggestion). They all come back to the HTTP package not supporting the upload progress natively.

Do we have any info yet if and when this will be added?

awazgyawali commented 4 years ago

Dio package now supports progress callback if I recall correctly. Try migrating to dio if thats possible. Ref: https://github.com/flutterchina/dio/issues/103

dtaalbers commented 4 years ago

@awazgyawali Thank you for the reply! As you can see in my SO question, I've tried the DIO solution already, together with my custom solution (which basically do the same thing).

They both read the file into a stream and report the progress of that, not the uploading progress it self. So basically I think the following is happening when you upload using DIO or my solution:

1) You select a file and start the upload. 2) The selected file is read into a stream. 3) While reading that file into a stream, the bytes read is reported as progress. This is done quite quickly, which makes the progress bar skip from 0% to 100% in within a ~second and in few steps. See below for my print out from a ~10 MB file, which is basically printed within one second.

flutter: progress: 0.000003035281907563734 (29/9554302)
flutter: progress: 0.000013187776563897604 (126/9554302)
flutter: progress: 0.999996546058519 (9554269/9554302)
flutter: progress: 0.9999967553883057 (9554271/9554302)
flutter: progress: 1.0 (9554302/9554302)

4) The request starts uploading the stream and you have to wait a certain amount of time for that to finish. In the meantime, the progress bar is already on 100%, so you're basically watching that 100% filled progress bar for a few seconds (depending on the file size). 5) After waiting a while, the 2xx response from the server is received and your upload is finished.

So no, I don't think my custom solution and that of DIO is the proper upload progress.

cachapa commented 4 years ago

From my own research into this, it appears you'll never get a very accurate progress monitoring because AFAIK no operating system reports the bytes leaving the network interface for each individual stream.

The way I've seen this solved in other languages and frameworks has always been to count the bytes going into the upper layers of the network model. This isn't perfect since there are buffers involved, causing the first few kb/mb to be "sent" immediately, as well the process to "hang" on 100% for a second or two before completing. But it does a good enough job for most applications.

sayhicoelho commented 4 years ago

The DIO solution does not solve the issue. With a slow connection, my upload is taking 15~20 secs to complete, but I can't show the upload percentage correctly, because it suddenly goes from 0% to 100%.

I/flutter ( 4238): 0 of 0 (0%)
I/flutter ( 4238): 29 of 4943262 (0%)
I/flutter ( 4238): 78 of 4943262 (0%)
I/flutter ( 4238): 102 of 4943262 (0%)
I/flutter ( 4238): 104 of 4943262 (0%)
I/flutter ( 4238): 133 of 4943262 (0%)
I/flutter ( 4238): 277 of 4943262 (0%)
I/flutter ( 4238): 4943229 of 4943262 (100%)
I/flutter ( 4238): 4943231 of 4943262 (100%)
I/flutter ( 4238): 4943262 of 4943262 (100%)
jigarfumakiya commented 3 years ago

Any update on this issue?

j0nscalet commented 3 years ago

This might be useful to someone that's been trying to figure this out so I wanted to share.

I ended up create my own Client to get access to HttpClientRequest and write to its stream. I had to dig into the io library in the sdk to ensure that by adding/writing via request.addStream() would in fact write to the open socket, which it should. See send method comments for more details.

@sayhicoelho I didn't run into the issue you were having. It looks like the chunks coming in properly in your case however it doesn't look like your calculating progress properly.

import 'dart:async';
import 'dart:convert';
// Avoid name conflicts by
// only showing what we need to filfull the contract with BaseClient which
// lives in the http package
import 'dart:io' show HttpClient, HttpException, HttpClientResponse;
import 'dart:typed_data';
import 'package:meta/meta.dart';
import 'package:http/http.dart'
    show BaseClient, BaseRequest, ClientException, Response, StreamedResponse;
import 'package:http/io_client.dart' show IOStreamedResponse;
import '../extensions/round_double_to_decimal_places.dart';

/// A http client that reports upload progress
/// What is the difference between HttpRequest v HttpClientRequest?
/// HttpRequest is class that lives in the http package which standardizes
/// http usage in the sdk in the browser and io.
class Client implements BaseClient {
  HttpClient inner;
  @visibleForTesting
  final StreamController<Map<String, dynamic>> uploadProgressController;

  Stream<Map<String, dynamic>> get uploadProgressStream =>
      uploadProgressController.stream;

  Client(this.inner, this.uploadProgressController)
      : assert(() {
          if (uploadProgressController == null) {
            return true;
          }
          return uploadProgressController.stream.isBroadcast;
        }()),
        assert(inner != null);

  /// Closes the client.
  ///
  /// Terminates all active connections. If a client remains unclosed, the Dart
  /// process may not terminate.
  @override
  void close() {
    if (inner != null) {
      inner.close(force: true);
      inner = null;
    }
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Response> delete(url, {Map<String, String> headers}) {
    throw UnsupportedError(
        'delete is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Response> get(url, {Map<String, String> headers}) {
    throw UnsupportedError(
        'get is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Response> head(url, {Map<String, String> headers}) {
    throw UnsupportedError(
        'head is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Response> patch(url,
      // ignore: type_annotate_public_apis
      {Map<String, String> headers,
      // ignore: type_annotate_public_apis
      body,
      Encoding encoding}) {
    throw UnsupportedError(
        'patch is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Response> post(url,
      // ignore: type_annotate_public_apis
      {Map<String, String> headers,
      // ignore: type_annotate_public_apis
      body,
      Encoding encoding}) {
    throw UnsupportedError(
        'post is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Response> put(url,
      // ignore: type_annotate_public_apis
      {Map<String, String> headers,
      // ignore: type_annotate_public_apis
      body,
      Encoding encoding}) {
    throw UnsupportedError(
        'put is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<String> read(url, {Map<String, String> headers}) {
    throw UnsupportedError(
        'read is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Uint8List> readBytes(url, {Map<String, String> headers}) {
    throw UnsupportedError(
        'readBytes is not supported please use the send method instead.');
  }

  @override
  Future<StreamedResponse> send(BaseRequest request) async {
    try {
      // Hypothesis on why this works:
      // Streams get added together down the call stack starting with the
      // BaseRequest's body's byte stream until the socket stream is added later
      //  down the call stack in _HttpOutgoing.
      // The stack looks like this:
      // HttpClientReqest.addStream(Stream<List<int>> s)
      // HttpClientRequest extends _HttpOutgoingMessage
      // _HttpOutgoingMessage.addStream(Stream<List<int>> s)
      // _HttpOutgoingMessage extends _StreamSinkImpl
      // _StreamSinkImpl.addStream(Stream<List<int>> s)
      // which adds the target stream which is an instance of _HttpOutgoing
      // HttpOutgoing.addStream(Stream<List<int>> stream)
      // inside here is where all the magic happens..
      // add the series of streams that have been building up above
      // to the socket's stream on line 1558:
      /*
        return socket.addStream(controller.stream).then((_) {
          // When this future returns all data should be added to stream        
          return outbound;
        }
      */

      // It would more accurate to keep track bytes as they are written to
      // socket, and once hitting content.length, the request be considered
      // uploaded.
      // This happens privately in _HttpOutgoing.addStream(Stream <List<int>> s)
      // unforunately it's not public accessible.
      // Take a look:
      /*
      void onData(List<int> data) {
      if (_socketError) return;
      if (data.length == 0) return;
      if (chunked) {
        if (_gzip) {
          _gzipAdd = controller.add;
          _addGZipChunk(data, _gzipSink!.add);
          _gzipAdd = null;
          return;
        }
        _addChunk(_chunkHeader(data.length), controller.add);
        _pendingChunkedFooter = 2;
      } else {
        var contentLength = this.contentLength;
        if (contentLength != null) {
          _bytesWritten += data.length;
          if (_bytesWritten > contentLength) {
            controller.addError(new HttpException(
                "Content size exceeds specified contentLength. "
                "$_bytesWritten bytes written while expected "
                "$contentLength. "
                "[${new String.fromCharCodes(data)}]"));
            return;
          }
        }
      }
      */

      // This needs to stay out as we need to the bytes stream before
      // we open a connection via openUrl
      var s = request.finalize();

      var clientRequest = (await inner.openUrl(request.method, request.url))
        ..followRedirects = request.followRedirects
        ..maxRedirects = request.maxRedirects
        ..contentLength = (request?.contentLength ?? -1)
        ..persistentConnection = request.persistentConnection;
      request.headers.forEach((name, value) {
        clientRequest.headers.set(name, value);
      });
      var clientResponse;

      if (request.method == "POST" && uploadProgressController != null) {
        var bytesUploaded = 0.0;
        var totalBytes = request.contentLength / 1000;
        var elaspedTime;
        double progress;
        double averageSpeed;
        var watch = Stopwatch();
        watch.start();
        Map<String, dynamic> getUploadProgressUpdate(String status) {
          var statusUpdate = {
            'status': status,
            'url': request.url.toString(),
            'progress': progress.isNaN ? 0.0 : progress.roundToPlaces(2),
            'totalBytes': totalBytes.roundToPlaces(3),
            'bytesUploaded': bytesUploaded.roundToPlaces(3),
            'elapsedTime': elaspedTime,
            'averageSpeed': averageSpeed.roundToPlaces(3)
          };

          return statusUpdate;
        }

        await clientRequest.addStream(s.map((chunk) {
          bytesUploaded += chunk.length / 1000;
          elaspedTime = watch.elapsedMilliseconds / 1000;
          progress = (bytesUploaded / totalBytes) * 100.0;
          averageSpeed = bytesUploaded / elaspedTime;

          uploadProgressController.add(getUploadProgressUpdate('uploading'));

          return chunk;
        }));
        watch.stop();
        uploadProgressController.add(getUploadProgressUpdate('uploaded'));
        clientResponse = await clientRequest.close();
      } else {
        clientResponse = await s.pipe(clientRequest) as HttpClientResponse;
      }

      var headers = <String, String>{};
      clientResponse.headers.forEach((key, values) {
        headers[key] = values.join(',');
      });

      return IOStreamedResponse(
          clientResponse.handleError(
              (error) => throw ClientException(error.message, error.uri),
              test: (error) => error is HttpException),
          clientResponse.statusCode,
          contentLength: clientResponse.contentLength == -1
              ? null
              : clientResponse.contentLength,
          request: request,
          headers: headers,
          isRedirect: clientResponse.isRedirect,
          persistentConnection: clientResponse.persistentConnection,
          reasonPhrase: clientResponse.reasonPhrase,
          inner: clientResponse);
    } on HttpException catch (error) {
      throw ClientException(error.message, error.uri);
    }
  }
}
dtaalbers commented 3 years ago

This might be useful to someone that's been trying to figure this out so I wanted to share.

I ended up create my own Client to get access to HttpClientRequest and write to its stream. I had to dig into the io library in the sdk to ensure that by adding/writing via request.addStream() would in fact write to the open socket, which it should. See send method comments for more details.

@sayhicoelho I didn't run into the issue you were having. It looks like the chunks coming in properly in your case however it doesn't look like your calculating progress properly.

import 'dart:async';
import 'dart:convert';
// Avoid name conflicts by
// only showing what we need to filfull the contract with BaseClient which
// lives in the http package
import 'dart:io' show HttpClient, HttpException, HttpClientResponse;
import 'dart:typed_data';
import 'package:meta/meta.dart';
import 'package:http/http.dart'
    show BaseClient, BaseRequest, ClientException, Response, StreamedResponse;
import 'package:http/io_client.dart' show IOStreamedResponse;
import '../extensions/round_double_to_decimal_places.dart';

/// A http client that reports upload progress
/// What is the difference between HttpRequest v HttpClientRequest?
/// HttpRequest is class that lives in the http package which standardizes
/// http usage in the sdk in the browser and io.
class Client implements BaseClient {
  HttpClient inner;
  @visibleForTesting
  final StreamController<Map<String, dynamic>> uploadProgressController;

  Stream<Map<String, dynamic>> get uploadProgressStream =>
      uploadProgressController.stream;

  Client(this.inner, this.uploadProgressController)
      : assert(() {
          if (uploadProgressController == null) {
            return true;
          }
          return uploadProgressController.stream.isBroadcast;
        }()),
        assert(inner != null);

  /// Closes the client.
  ///
  /// Terminates all active connections. If a client remains unclosed, the Dart
  /// process may not terminate.
  @override
  void close() {
    if (inner != null) {
      inner.close(force: true);
      inner = null;
    }
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Response> delete(url, {Map<String, String> headers}) {
    throw UnsupportedError(
        'delete is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Response> get(url, {Map<String, String> headers}) {
    throw UnsupportedError(
        'get is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Response> head(url, {Map<String, String> headers}) {
    throw UnsupportedError(
        'head is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Response> patch(url,
      // ignore: type_annotate_public_apis
      {Map<String, String> headers,
      // ignore: type_annotate_public_apis
      body,
      Encoding encoding}) {
    throw UnsupportedError(
        'patch is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Response> post(url,
      // ignore: type_annotate_public_apis
      {Map<String, String> headers,
      // ignore: type_annotate_public_apis
      body,
      Encoding encoding}) {
    throw UnsupportedError(
        'post is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Response> put(url,
      // ignore: type_annotate_public_apis
      {Map<String, String> headers,
      // ignore: type_annotate_public_apis
      body,
      Encoding encoding}) {
    throw UnsupportedError(
        'put is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<String> read(url, {Map<String, String> headers}) {
    throw UnsupportedError(
        'read is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Uint8List> readBytes(url, {Map<String, String> headers}) {
    throw UnsupportedError(
        'readBytes is not supported please use the send method instead.');
  }

  @override
  Future<StreamedResponse> send(BaseRequest request) async {
    try {
      // Hypothesis on why this works:
      // Streams get added together down the call stack starting with the
      // BaseRequest's body's byte stream until the socket stream is added later
      //  down the call stack in _HttpOutgoing.
      // The stack looks like this:
      // HttpClientReqest.addStream(Stream<List<int>> s)
      // HttpClientRequest extends _HttpOutgoingMessage
      // _HttpOutgoingMessage.addStream(Stream<List<int>> s)
      // _HttpOutgoingMessage extends _StreamSinkImpl
      // _StreamSinkImpl.addStream(Stream<List<int>> s)
      // which adds the target stream which is an instance of _HttpOutgoing
      // HttpOutgoing.addStream(Stream<List<int>> stream)
      // inside here is where all the magic happens..
      // add the series of streams that have been building up above
      // to the socket's stream on line 1558:
      /*
        return socket.addStream(controller.stream).then((_) {
          // When this future returns all data should be added to stream        
          return outbound;
        }
      */

      // It would more accurate to keep track bytes as they are written to
      // socket, and once hitting content.length, the request be considered
      // uploaded.
      // This happens privately in _HttpOutgoing.addStream(Stream <List<int>> s)
      // unforunately it's not public accessible.
      // Take a look:
      /*
      void onData(List<int> data) {
      if (_socketError) return;
      if (data.length == 0) return;
      if (chunked) {
        if (_gzip) {
          _gzipAdd = controller.add;
          _addGZipChunk(data, _gzipSink!.add);
          _gzipAdd = null;
          return;
        }
        _addChunk(_chunkHeader(data.length), controller.add);
        _pendingChunkedFooter = 2;
      } else {
        var contentLength = this.contentLength;
        if (contentLength != null) {
          _bytesWritten += data.length;
          if (_bytesWritten > contentLength) {
            controller.addError(new HttpException(
                "Content size exceeds specified contentLength. "
                "$_bytesWritten bytes written while expected "
                "$contentLength. "
                "[${new String.fromCharCodes(data)}]"));
            return;
          }
        }
      }
      */

      // This needs to stay out as we need to the bytes stream before
      // we open a connection via openUrl
      var s = request.finalize();

      var clientRequest = (await inner.openUrl(request.method, request.url))
        ..followRedirects = request.followRedirects
        ..maxRedirects = request.maxRedirects
        ..contentLength = (request?.contentLength ?? -1)
        ..persistentConnection = request.persistentConnection;
      request.headers.forEach((name, value) {
        clientRequest.headers.set(name, value);
      });
      var clientResponse;

      if (request.method == "POST" && uploadProgressController != null) {
        var bytesUploaded = 0.0;
        var totalBytes = request.contentLength / 1000;
        var elaspedTime;
        double progress;
        double averageSpeed;
        var watch = Stopwatch();
        watch.start();
        Map<String, dynamic> getUploadProgressUpdate(String status) {
          var statusUpdate = {
            'status': status,
            'url': request.url.toString(),
            'progress': progress.isNaN ? 0.0 : progress.roundToPlaces(2),
            'totalBytes': totalBytes.roundToPlaces(3),
            'bytesUploaded': bytesUploaded.roundToPlaces(3),
            'elapsedTime': elaspedTime,
            'averageSpeed': averageSpeed.roundToPlaces(3)
          };

          return statusUpdate;
        }

        await clientRequest.addStream(s.map((chunk) {
          bytesUploaded += chunk.length / 1000;
          elaspedTime = watch.elapsedMilliseconds / 1000;
          progress = (bytesUploaded / totalBytes) * 100.0;
          averageSpeed = bytesUploaded / elaspedTime;

          uploadProgressController.add(getUploadProgressUpdate('uploading'));

          return chunk;
        }));
        watch.stop();
        uploadProgressController.add(getUploadProgressUpdate('uploaded'));
        clientResponse = await clientRequest.close();
      } else {
        clientResponse = await s.pipe(clientRequest) as HttpClientResponse;
      }

      var headers = <String, String>{};
      clientResponse.headers.forEach((key, values) {
        headers[key] = values.join(',');
      });

      return IOStreamedResponse(
          clientResponse.handleError(
              (error) => throw ClientException(error.message, error.uri),
              test: (error) => error is HttpException),
          clientResponse.statusCode,
          contentLength: clientResponse.contentLength == -1
              ? null
              : clientResponse.contentLength,
          request: request,
          headers: headers,
          isRedirect: clientResponse.isRedirect,
          persistentConnection: clientResponse.persistentConnection,
          reasonPhrase: clientResponse.reasonPhrase,
          inner: clientResponse);
    } on HttpException catch (error) {
      throw ClientException(error.message, error.uri);
    }
  }
}

Got a use example of that? Looks interesting!

dtaalbers commented 3 years ago

@j0nscalet forgot to tag you 😁

jerryrt commented 3 years ago

This might be useful to someone that's been trying to figure this out so I wanted to share.

I ended up create my own Client to get access to HttpClientRequest and write to its stream. I had to dig into the io library in the sdk to ensure that by adding/writing via request.addStream() would in fact write to the open socket, which it should. See send method comments for more details.

@sayhicoelho I didn't run into the issue you were having. It looks like the chunks coming in properly in your case however it doesn't look like your calculating progress properly.

import 'dart:async';
import 'dart:convert';
// Avoid name conflicts by
// only showing what we need to filfull the contract with BaseClient which
// lives in the http package
import 'dart:io' show HttpClient, HttpException, HttpClientResponse;
import 'dart:typed_data';
import 'package:meta/meta.dart';
import 'package:http/http.dart'
    show BaseClient, BaseRequest, ClientException, Response, StreamedResponse;
import 'package:http/io_client.dart' show IOStreamedResponse;
import '../extensions/round_double_to_decimal_places.dart';

/// A http client that reports upload progress
/// What is the difference between HttpRequest v HttpClientRequest?
/// HttpRequest is class that lives in the http package which standardizes
/// http usage in the sdk in the browser and io.
class Client implements BaseClient {
  HttpClient inner;
  @visibleForTesting
  final StreamController<Map<String, dynamic>> uploadProgressController;

  Stream<Map<String, dynamic>> get uploadProgressStream =>
      uploadProgressController.stream;

  Client(this.inner, this.uploadProgressController)
      : assert(() {
          if (uploadProgressController == null) {
            return true;
          }
          return uploadProgressController.stream.isBroadcast;
        }()),
        assert(inner != null);

  /// Closes the client.
  ///
  /// Terminates all active connections. If a client remains unclosed, the Dart
  /// process may not terminate.
  @override
  void close() {
    if (inner != null) {
      inner.close(force: true);
      inner = null;
    }
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Response> delete(url, {Map<String, String> headers}) {
    throw UnsupportedError(
        'delete is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Response> get(url, {Map<String, String> headers}) {
    throw UnsupportedError(
        'get is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Response> head(url, {Map<String, String> headers}) {
    throw UnsupportedError(
        'head is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Response> patch(url,
      // ignore: type_annotate_public_apis
      {Map<String, String> headers,
      // ignore: type_annotate_public_apis
      body,
      Encoding encoding}) {
    throw UnsupportedError(
        'patch is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Response> post(url,
      // ignore: type_annotate_public_apis
      {Map<String, String> headers,
      // ignore: type_annotate_public_apis
      body,
      Encoding encoding}) {
    throw UnsupportedError(
        'post is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Response> put(url,
      // ignore: type_annotate_public_apis
      {Map<String, String> headers,
      // ignore: type_annotate_public_apis
      body,
      Encoding encoding}) {
    throw UnsupportedError(
        'put is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<String> read(url, {Map<String, String> headers}) {
    throw UnsupportedError(
        'read is not supported please use the send method instead.');
  }

  @override
  // ignore: type_annotate_public_apis
  Future<Uint8List> readBytes(url, {Map<String, String> headers}) {
    throw UnsupportedError(
        'readBytes is not supported please use the send method instead.');
  }

  @override
  Future<StreamedResponse> send(BaseRequest request) async {
    try {
      // Hypothesis on why this works:
      // Streams get added together down the call stack starting with the
      // BaseRequest's body's byte stream until the socket stream is added later
      //  down the call stack in _HttpOutgoing.
      // The stack looks like this:
      // HttpClientReqest.addStream(Stream<List<int>> s)
      // HttpClientRequest extends _HttpOutgoingMessage
      // _HttpOutgoingMessage.addStream(Stream<List<int>> s)
      // _HttpOutgoingMessage extends _StreamSinkImpl
      // _StreamSinkImpl.addStream(Stream<List<int>> s)
      // which adds the target stream which is an instance of _HttpOutgoing
      // HttpOutgoing.addStream(Stream<List<int>> stream)
      // inside here is where all the magic happens..
      // add the series of streams that have been building up above
      // to the socket's stream on line 1558:
      /*
        return socket.addStream(controller.stream).then((_) {
          // When this future returns all data should be added to stream        
          return outbound;
        }
      */

      // It would more accurate to keep track bytes as they are written to
      // socket, and once hitting content.length, the request be considered
      // uploaded.
      // This happens privately in _HttpOutgoing.addStream(Stream <List<int>> s)
      // unforunately it's not public accessible.
      // Take a look:
      /*
      void onData(List<int> data) {
      if (_socketError) return;
      if (data.length == 0) return;
      if (chunked) {
        if (_gzip) {
          _gzipAdd = controller.add;
          _addGZipChunk(data, _gzipSink!.add);
          _gzipAdd = null;
          return;
        }
        _addChunk(_chunkHeader(data.length), controller.add);
        _pendingChunkedFooter = 2;
      } else {
        var contentLength = this.contentLength;
        if (contentLength != null) {
          _bytesWritten += data.length;
          if (_bytesWritten > contentLength) {
            controller.addError(new HttpException(
                "Content size exceeds specified contentLength. "
                "$_bytesWritten bytes written while expected "
                "$contentLength. "
                "[${new String.fromCharCodes(data)}]"));
            return;
          }
        }
      }
      */

      // This needs to stay out as we need to the bytes stream before
      // we open a connection via openUrl
      var s = request.finalize();

      var clientRequest = (await inner.openUrl(request.method, request.url))
        ..followRedirects = request.followRedirects
        ..maxRedirects = request.maxRedirects
        ..contentLength = (request?.contentLength ?? -1)
        ..persistentConnection = request.persistentConnection;
      request.headers.forEach((name, value) {
        clientRequest.headers.set(name, value);
      });
      var clientResponse;

      if (request.method == "POST" && uploadProgressController != null) {
        var bytesUploaded = 0.0;
        var totalBytes = request.contentLength / 1000;
        var elaspedTime;
        double progress;
        double averageSpeed;
        var watch = Stopwatch();
        watch.start();
        Map<String, dynamic> getUploadProgressUpdate(String status) {
          var statusUpdate = {
            'status': status,
            'url': request.url.toString(),
            'progress': progress.isNaN ? 0.0 : progress.roundToPlaces(2),
            'totalBytes': totalBytes.roundToPlaces(3),
            'bytesUploaded': bytesUploaded.roundToPlaces(3),
            'elapsedTime': elaspedTime,
            'averageSpeed': averageSpeed.roundToPlaces(3)
          };

          return statusUpdate;
        }

        await clientRequest.addStream(s.map((chunk) {
          bytesUploaded += chunk.length / 1000;
          elaspedTime = watch.elapsedMilliseconds / 1000;
          progress = (bytesUploaded / totalBytes) * 100.0;
          averageSpeed = bytesUploaded / elaspedTime;

          uploadProgressController.add(getUploadProgressUpdate('uploading'));

          return chunk;
        }));
        watch.stop();
        uploadProgressController.add(getUploadProgressUpdate('uploaded'));
        clientResponse = await clientRequest.close();
      } else {
        clientResponse = await s.pipe(clientRequest) as HttpClientResponse;
      }

      var headers = <String, String>{};
      clientResponse.headers.forEach((key, values) {
        headers[key] = values.join(',');
      });

      return IOStreamedResponse(
          clientResponse.handleError(
              (error) => throw ClientException(error.message, error.uri),
              test: (error) => error is HttpException),
          clientResponse.statusCode,
          contentLength: clientResponse.contentLength == -1
              ? null
              : clientResponse.contentLength,
          request: request,
          headers: headers,
          isRedirect: clientResponse.isRedirect,
          persistentConnection: clientResponse.persistentConnection,
          reasonPhrase: clientResponse.reasonPhrase,
          inner: clientResponse);
    } on HttpException catch (error) {
      throw ClientException(error.message, error.uri);
    }
  }
}

Glad this piece of code is working, though I have to put two different Client/HttpClient impl in one project.

j0nscalet commented 3 years ago

@dtaalbers Sure do. Check out this gist.

dtaalbers commented 3 years ago

@j0nscalet Thanks. I've tried your solution but it still has the same problem explained in my SO question.

While your solution provides a progress event, it is the wrong progress event. This is the progress of the bytes (of the file) being read into memory (stream) and not the progress of the bytes being send to the server. See below the listener of the stream that prints outs the events. A 4MB file is read into memory and the progress value goes from 0% to 100% within a second. After that the process hangs a while on 100% while the file is being send to the server.

This causes any progress bar that uses the progress value in a view to shoot from 0% to 100% right after clicking 'upload' and then it stays on the 100% until the file is uploaded. How long it "hangs" depends on the file size of course.

Do you use your solution with a progress bar connected? Do you have this as well?

client.uploadProgressStream.listen((Map<String, dynamic> event) {
    print("${event['progress']} / ${event['bytesUploaded']} / ${event['totalBytes']} ");
});
flutter: 0.0018157922890662803 / 0.074 / 4075.356
flutter: 0.0042450279190333305 / 0.173 / 4075.356
flutter: 99.99808605677639 / 4075.278 / 4075.356
flutter: 99.99813513224365 / 4075.2799999999997 / 4075.356
flutter: 99.99999999999999 / 4075.3559999999998 / 4075.356
j0nscalet commented 3 years ago

@dtaalbers sorry it took me so long to respond it's been a busy few weeks.

Unforunately, we're not having the issue you're describing and yes we have this stream connected to a progress animation. I've a wrote a quck test comparing elapsed times reading and streaming a file.. Those times for me are very, very different: 0.013 sec v 8.412 sec. If you run this what happens?

Again, the whole idea here is that by adding the file's stream to the requests stream we're attaching to the socket that uploads the file in _HttpOutGoing. (See my 'Hypothesis on why this works' above for more details.)

j0nscalet commented 3 years ago

@dtaalbers meant to include a gist as an example. https://gist.github.com/j0nscalet/72c080a0ab8e2a1a2c6c3c267fd07a94

HTH.

KirillArtemenko commented 2 years ago

This problem (0% to 100% imediatly) is always appears when using MultipartFile.fromBytes() method (while using fromFile or fromFileSync all works fine).

As I assume, this is due to the work of the algorithm, which counts the transferred bytes when reading the file from disk (by filepath while using the fromFile method) instead of actually accounting for the bytes transferred by network. When using fromBytes, no chunks are read because all the bytes have already been read and allocated in memory.

I recently ran into this problem and just started looking for a solution and this is just my guess as to why this is happening. Hopefully someone can find a way to get the value of the actually transferred bytes over the network.

angwarati commented 10 months ago

Is there any solution for this yet?

Falynsky commented 5 months ago

Is there any solution for this yet? 👀