felangel / mason

Tools which allow developers to create and consume reusable templates called bricks.
https://docs.brickhub.dev

fix: Crash with "StreamSink is bound to stream" #1280

Open matthew-carroll opened 6 months ago

matthew-carroll commented 6 months ago

Description

I have a static site generator that watches a local directory for changes and runs a callback every time a file in the directory changes. I'm using Mason Logger to log output throughout the static site generator, including inside that callback.

When the file system reports multiple file changes in quick succession, the logger seems to blow up with the error "Bad state: StreamSink is bound to a stream".

Steps To Reproduce

I don't have an isolated reproduction. However, I can provide some snippets of the implementation.

I watch a couple directories:

    // Rebuild the website whenever a source file changes.
    DirectoryWatcher("${Directory.current.absolute.path}${Platform.pathSeparator}bin") //
        .events
        .listen(_onSourceFileChange);
    DirectoryWatcher("${Directory.current.absolute.path}${Platform.pathSeparator}source") //
        .events
        .listen(_onSourceFileChange);

Then I have an _onSourceFileChange method where the error originates:

    Future<void> _onSourceFileChange(WatchEvent event) async {
      _log.detail("File system change (${event.type}): ${event.path}.");
    }

That log message at the very beginning of the callback method is all that seems to be needed to trigger the error.

Expected Behavior

No error occurs and the message is logged.

Additional Context

Version: 0.2.12

Example error message:

      // Bad state: StreamSink is bound to a stream
      // #0      _StreamSinkImpl._controller (dart:io/io_sink.dart:234:7)
      // #1      _StreamSinkImpl.add (dart:io/io_sink.dart:154:5)
      // #2      _IOSinkImpl.write (dart:io/io_sink.dart:287:5)
      // #3      _StdSink._write (dart:io/stdio.dart:401:13)
      // #4      _StdSink.writeln (dart:io/stdio.dart:419:5)
      // #5      Logger.detail (package:mason_logger/src/mason_logger.dart:152:13)
      // <-- This is where control flow moves from my code to the logger -->
      // #6      StaticShockDevServer._onSourceFileChange (package:static_shock_cli/src/dev_server.dart:171:10)
      // #7      _RootZone.runUnaryGuarded (dart:async/zone.dart:1594:10)
      // #8      _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:365:11)
      // #9      _BufferingStreamSubscription._add (dart:async/stream_impl.dart:297:7)
      // #10     _SyncBroadcastStreamController._sendData (dart:async/broadcast_stream_controller.dart:377:25)
      // #11     _BroadcastStreamController.add (dart:async/broadcast_stream_controller.dart:244:5)
      // #12     _RootZone.runUnaryGuarded (dart:async/zone.dart:1594:10)
      // #13     _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:365:11)
      // #14     _DelayedData.perform (dart:async/stream_impl.dart:541:14)
      // #15     _PendingEvents.handleNext (dart:async/stream_impl.dart:646:11)
      // #16     _PendingEvents.schedule.<anonymous closure> (dart:async/stream_impl.dart:617:7)
      // #17     _microtaskLoop (dart:async/schedule_microtask.dart:40:21)
      // #18     _startMicrotaskLoop (dart:async/schedule_microtask.dart:49:5)
      // #19     _runPendingImmediateCallback (dart:isolate-patch/isolate_patch.dart:118:13)
      // #20     _Timer._runTimers (dart:isolate-patch/timer_impl.dart:405:11)
      // #21     _Timer._handleMessage (dart:isolate-patch/timer_impl.dart:429:5)
      // #22     _RawReceivePort._handleMessage (dart:isolate-patch/isolate_patch.dart:184:12)
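
For reference, frame #0 (`_StreamSinkImpl._controller`) is where `dart:io` throws this `StateError` when a sink is written to while an earlier `addStream` call on the same sink has not yet completed. The sketch below triggers the same message in isolation. It is only an assumption, not a confirmed diagnosis, that something in the site generator binds `stdout` this way; the helper name `writeWhileBound` is hypothetical.

```dart
import 'dart:async';
import 'dart:io';

/// Writes to stdout while an addStream call on it is still pending.
/// Returns the StateError message, or null if no error was thrown.
Future<String?> writeWhileBound() async {
  final source = StreamController<List<int>>();
  // stdout stays bound to `source` until that stream is closed.
  final pending = stdout.addStream(source.stream);
  String? message;
  try {
    stdout.writeln('logged while bound');
  } on StateError catch (e) {
    message = e.message;
  }
  // Close the source so the binding is released, then wait for both.
  await Future.wait([source.close(), pending]);
  return message;
}

Future<void> main() async {
  final message = await writeWhileBound();
  stdout.writeln('Caught: $message');
}
```

Once the `addStream` future completes, the sink is unbound again and normal writes succeed, which would be consistent with the error only appearing when several things happen at nearly the same time.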

felangel commented 6 months ago

Hey 👋 Thanks for filing an issue! I’ll take a closer look and see if I can reproduce later today.

felangel commented 6 months ago

I spent some time on this and was unable to reproduce it locally on macOS using:

Dart SDK version: 3.3.3 (stable) (Tue Mar 26 14:21:33 2024 +0000) on "macos_arm64"

With the following sample:

    import 'dart:io';

    import 'package:mason_logger/mason_logger.dart';
    import 'package:watcher/watcher.dart';

    Future<void> main() async {
      final logger = Logger()..level = Level.verbose;

      logger.info('Watching ${Directory.current.absolute.path} for changes');
      DirectoryWatcher(Directory.current.absolute.path).events.listen((event) {
        logger.detail('File system change (${event.type}): ${event.path}.');
      });
    }

matthew-carroll commented 6 months ago

Hm, I'm also seeing this error when I try to log things from a web socket callback function.

Do you know if there are any weird aspects to stdout and stderr when running a shelf server, or a websocket, or a directory watcher? Or all three?

The issue is somehow related to timing. When I make multiple changes to source files quickly, it results in the following series of desired events:

  1. Start website rebuild 1
  2. Queue build 2 (due to second change)
  3. Queue build 3 (due to third change)
  4. Build 1 is done, send a message to all websockets telling them new data is available
  5. Each connected browser reloads itself
  6. Start website rebuild 2
  7. etc...

Steps 5 and 6 happen at essentially the same time. I don't know when the browsers actually request updated versions of their files, so I fire off that notification and then immediately start rebuilding the website again if a build is queued up.

Through experimentation, I've found that if I wait for 1 second after telling the websites to refresh, and then I start the next build, this error doesn't seem to happen any more.

I'll make some kind of adjustment on my end to try to avoid this error in general, but I'd really like to understand the root cause in the hopes that I might fully solve it.

Maybe it's two things trying to access the same file at the same time? I would think the OS would transparently solve that problem. Also, if that's the problem, I would expect a very different error in a different place in the code.

felangel commented 5 months ago

> Do you know if there are any weird aspects to stdout and stderr when running a shelf server, or a websocket, or a directory watcher? Or all three?

I'm not aware of any, unfortunately 😞

> I'll make some kind of adjustment on my end to try to avoid this error in general, but I'd really like to understand the root cause in the hopes that I might fully solve it.

I'd love to better understand the root cause as well. Let me know if you get to the bottom of it.

> Maybe it's two things trying to access the same file at the same time? I would think the OS would transparently solve that problem. Also, if that's the problem, I would expect a very different error in a different place in the code.

Yeah, that doesn't seem likely. I would also expect a very different error if that were the case. Is the issue OS-specific? Can you reproduce it on both macOS and Windows, for example?

matthew-carroll commented 5 months ago

I haven't tried Windows because I've only got Macs.

I still haven't found the root cause, but I do think it has something to do with the file system.

The static site generator I'm building has to do two things at the same time to be useful. It has to rebuild the static site when you make a change. It also has to serve the static site through a web server. As far as I can tell, the sequence is: I run a build for change A, tell the local site to refresh, the local site requests its files again, and at the same time I'm already running another build for change B.

Why the FS couldn't do something reasonable in that case, I'm not sure. But I changed the build behavior so that it completes all queued builds before telling the local site to refresh and that has eliminated the vast majority of such crashes. My guess is, if the user were to manually refresh during a build, the same problem would occur.
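
A minimal sketch of that change, assuming a single in-process scheduler: change events only increment a counter, one loop runs the queued builds back to back, and connected clients are notified once the queue is empty. `BuildScheduler`, `build`, and `notifyClients` are hypothetical names for illustration, not the actual static_shock_cli code.

```dart
import 'dart:async';

/// Hypothetical scheduler: runs every queued build, then notifies once.
class BuildScheduler {
  BuildScheduler({required this.build, required this.notifyClients});

  final Future<void> Function() build;
  final void Function() notifyClients;

  int _pending = 0;
  bool _running = false;

  /// Call this from the file-watcher callback.
  Future<void> requestBuild() async {
    _pending++;
    if (_running) return; // The active loop will pick this request up.
    _running = true;
    try {
      while (_pending > 0) {
        _pending--;
        await build();
      }
      // Only tell browsers to reload after all queued builds are done.
      notifyClients();
    } finally {
      _running = false;
    }
  }
}

Future<void> main() async {
  var builds = 0;
  var notifications = 0;
  final scheduler = BuildScheduler(
    build: () async {
      await Future<void>.delayed(const Duration(milliseconds: 10));
      builds++;
    },
    notifyClients: () => notifications++,
  );
  // Three changes in quick succession.
  await Future.wait([
    scheduler.requestBuild(),
    scheduler.requestBuild(),
    scheduler.requestBuild(),
  ]);
  print('builds: $builds, notifications: $notifications');
}
```

With three rapid requests this runs three builds and sends one notification. As noted above, it does not protect against a browser that is refreshed manually while a build is still in progress.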