typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io

[fs2-io] Watcher only remembers last file it was asked to watch #2375

Closed keynmol closed 3 years ago

keynmol commented 3 years ago

fs2 3.0.2

My understanding of how Watcher is meant to be used:

As long as the watcher resource is open, each call to w.watch(path) (where path is a file) should add that file to the set of watched paths, and w.events() should be a stream of events coming from all watched files.

What I think is happening instead is that only the last path added via w.watch(path) receives notifications.

Here's a reproduction as a self-contained ammonite script (alternatively, here it is in Scastie: https://scastie.scala-lang.org/KiUg7NdURt6J7ULU8R3CUw):

import $ivy.`co.fs2::fs2-core:3.0.2`
import $ivy.`co.fs2::fs2-io:3.0.2`

import cats.effect.kernel.Resource
import fs2.io.file.Files
import cats.effect._
import cats.syntax.all._
import fs2.io.Watcher
import java.nio.file.Paths
import scala.concurrent.duration._

@main def test() = {
  import cats.effect.unsafe.implicits.global

  val monster =
    (
      Files[IO].tempDirectory(),
      Watcher.default[IO],
      Resource.eval(IO.ref(List.empty[Watcher.Event]))
    ).parTupled
      .flatMap { case (temp, w, ref) =>
        val file1 = Paths.get(temp.toAbsolutePath().toString, "file1")
        val file2 = Paths.get(temp.toAbsolutePath().toString, "file2")

        val pump = w
          .events()
          .evalMap(ev => ref.update(ev +: _))
          .compile
          .drain
          .background
          .void

        pump.evalMap { _ =>
          w.watch(file2) *>
            w.watch(file1) *>
            IO.sleep(100.millis) *> // just in case
            IO.blocking(os.write(os.Path(file1), "hello file1")) *>
            IO.blocking(os.write(os.Path(file2), "hello file2")) *>
            IO.sleep(100.millis) *> // just in case
            ref.get.flatMap(IO.println)
        }

      }

  monster.use_.unsafeRunSync()
}

It outputs

List(Modified(/tmp/17267169841305025399/file1,1), Created(/tmp/17267169841305025399/file1,1))

whereas I would expect events from both paths.

If you switch the order of w.watch calls, you'll see notifications for the other file.

If you replace both calls with a single w.watch(temp) (the containing folder), you'll see notifications for both files.

mpilquist commented 3 years ago

Hm, this test passes (on OS X):

  test("supports watching multiple files".only) {
    Stream
      .resource((Watcher.default[IO], tempFile, tempFile).tupled)
      .flatMap { case (w, f1, f2) =>
        val events = Stream(f1, f2)
          .covary[IO]
          .foreach(f => w.watch(f, modifiers = modifiers).void) ++ w.events()
        events
          .scan(0) {
            case (cnt, Watcher.Event.Modified(_, _)) =>
              cnt + 1
            case (cnt, _) =>
              cnt
          }
          .takeWhile(_ < 2)
          .concurrently(
            smallDelay ++ Stream.eval(modify(f1)) ++ smallDelay ++ Stream.eval(modify(f2))
          )
      }
      .compile
      .drain
  }

mpilquist commented 3 years ago

This works too, which registers the watches after the event stream is opened:

    Stream
      .resource((Watcher.default[IO], tempFile, tempFile).tupled)
      .flatMap { case (w, f1, f2) =>
        w.events()
          .scan(0) {
            case (cnt, Watcher.Event.Modified(_, _)) =>
              cnt + 1
            case (cnt, _) =>
              cnt
          }
          .takeWhile(_ < 2)
          .concurrently(
            smallDelay ++ Stream
              .exec(List(f1, f2).traverse(f => w.watch(f, modifiers = modifiers)).void) ++
              smallDelay ++ Stream.eval(modify(f1)) ++ smallDelay ++ Stream.eval(modify(f2))
          )
      }
      .compile
      .drain

keynmol commented 3 years ago

hmm.

I'm on Linux. Given that file watching is filesystem-specific, could that be the cause? Scastie presumably runs on Linux as well.

Do you see anything obviously wrong in my snippet? In my case I open the event stream after registering all the watchers (like your first example).

Also, if you have this test in a branch - would be interesting to see if CI's ubuntu runner passes it as well.

keynmol commented 3 years ago

Right, I ran my snippet on my OS X machine; it printed out an empty List and the laptop crashed :D So I'd say inconclusive.

mpilquist commented 3 years ago

Too funny. I'll open a PR with the test above and see how GHA handles it.

mpilquist commented 3 years ago

#2378 passed on GHA. Can you give it a try? Then I guess let's see if we can reproduce what you saw using a port of your original test case.

keynmol commented 3 years ago

I checked - that branch also passes on my machine, but not if I apply this patch (test times out after 30 seconds):

diff --git a/io/src/test/scala/fs2/io/file/WatcherSuite.scala b/io/src/test/scala/fs2/io/file/WatcherSuite.scala
index b7ae13cb6..aa8e0cdef 100644
--- a/io/src/test/scala/fs2/io/file/WatcherSuite.scala
+++ b/io/src/test/scala/fs2/io/file/WatcherSuite.scala
@@ -71,10 +71,15 @@ class WatcherSuite extends Fs2Suite with BaseFileSuite {
   }

   test("supports watching multiple files") {
+    val setup = tempDirectory.flatMap {dir => 
+      val files = cats.effect.Resource.eval(aFile(dir).product(aFile(dir)))
+      Watcher.default[IO].product(files)
+    }
+
     Stream
-      .resource((Watcher.default[IO], tempFile, tempFile).tupled)
-      .flatMap { case (w, f1, f2) =>
-        w.events()
+      .resource(setup)
+      .flatMap { case (w, (f1, f2)) =>
+        w.events().debug()
           .scan(0) {
             case (cnt, Watcher.Event.Modified(_, _)) =>
               cnt + 1

I think the files being in the same folder is the key here, which is how my snippet differs from the current test.

mpilquist commented 3 years ago

OK the bug is roughly here: https://github.com/typelevel/fs2/blob/6a625c5c7f60857a5b9773c26692bd6c8ac2ba3a/io/src/main/scala/fs2/io/file/Watcher.scala#L294-L301

Events for both paths get fired here, but the earlier-registered path gets filtered out in that code.

mpilquist commented 3 years ago

Aha, because both paths have the same WatchKey
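
The shared key can be seen with the plain JDK API, without fs2. This is a minimal sketch, assuming that watching a file amounts to registering its parent directory with the WatchService; the object and method names are made up for illustration. Per the Path.register documentation, registering a directory that is already registered with the same watch service returns the existing key, so the comparison should come out true.

```scala
import java.nio.file.{Files, FileSystems, StandardWatchEventKinds}

object SameWatchKeyDemo {
  // Registers the parent directory of two sibling files (an assumption about
  // how a file watch is implemented) and reports whether both registrations
  // produced the same WatchKey.
  def siblingsShareKey(): Boolean = {
    val dir   = Files.createTempDirectory("watch-demo")
    val file1 = Files.createFile(dir.resolve("file1"))
    val file2 = Files.createFile(dir.resolve("file2"))
    val ws    = FileSystems.getDefault.newWatchService()
    try {
      val key1 = file1.getParent.register(ws, StandardWatchEventKinds.ENTRY_MODIFY)
      val key2 = file2.getParent.register(ws, StandardWatchEventKinds.ENTRY_MODIFY)
      key1 == key2 // same directory, same watch service
    } finally ws.close()
  }

  def main(args: Array[String]): Unit =
    println(s"same key for both files: ${siblingsShareKey()}")
}
```

With one key per directory, any bookkeeping that maps a key to a single watched path will keep only the most recently registered file.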

mpilquist commented 3 years ago

This issue is caused by WatchService only supporting watching directories, not individual files (https://stackoverflow.com/questions/16251273/can-i-watch-for-single-file-change-with-watchservice-not-the-whole-directory#comment37744531_16251273). We can support watching individual files but we'll need to change some internal structure of Watcher to store multiple watched paths for an individual WatchKey.
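
A rough sketch of what "multiple watched paths for an individual WatchKey" could look like. This is not the actual Watcher internals or the eventual fix: Registrations, add and accepts are hypothetical names, and the type parameter K stands in for java.nio.file.WatchKey so the example runs without a watch service.

```scala
import java.nio.file.{Path, Paths}

// Hypothetical bookkeeping: each key (one per watched directory) maps to the
// set of files registered under it, instead of to a single path.
final case class Registrations[K](byKey: Map[K, Set[Path]]) {
  // Adds a file under a key without discarding earlier registrations.
  def add(key: K, file: Path): Registrations[K] =
    copy(byKey = byKey.updated(key, byKey.getOrElse(key, Set.empty[Path]) + file))

  // An event for `changed` is kept only if that file was registered under `key`.
  def accepts(key: K, changed: Path): Boolean =
    byKey.get(key).exists(_.contains(changed))
}

object RegistrationsDemo {
  val file1: Path = Paths.get("/tmp/demo/file1")
  val file2: Path = Paths.get("/tmp/demo/file2")
  val other: Path = Paths.get("/tmp/demo/other")

  // Both files live in the same directory, so they share one (stand-in) key.
  val regs: Registrations[String] =
    Registrations(Map.empty[String, Set[Path]])
      .add("dir-key", file2)
      .add("dir-key", file1)

  def main(args: Array[String]): Unit = {
    println(regs.accepts("dir-key", file1))
    println(regs.accepts("dir-key", file2))
    println(regs.accepts("dir-key", other))
  }
}
```

Keeping a set per key means a second w.watch call on a sibling file extends the filter rather than replacing it, while unrelated files in the same directory are still filtered out.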

mpilquist commented 3 years ago

Fixed in #2382