Apache License 2.0

rxjava2-file

Status: released to Maven Central

Requires Java 8+.

Flowable utilities for files: tail a text or binary file, and stream WatchService events for a file.

Maven site reports are here including javadoc.

Getting started

Add this Maven dependency to your pom.xml:

<dependency>
  <groupId>com.github.davidmoten</groupId>
  <artifactId>rxjava2-file</artifactId>
  <version>VERSION_HERE</version>
</dependency>

How to build

git clone https://github.com/davidmoten/rxjava2-file
cd rxjava2-file
mvn clean install 

Examples

Tail a text file with NIO

Tail the lines of the text log file /var/log/server.log as a Flowable<String>:

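import java.util.concurrent.TimeUnit;

import com.github.davidmoten.rx2.file.Files;
import com.sun.nio.file.SensitivityWatchEventModifier;

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;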

Flowable<String> lines = 
     Files.tailLines("/var/log/server.log")
          .nonBlocking()
          .pollingInterval(500, TimeUnit.MILLISECONDS, Schedulers.io())
          // set a private sun modifier that improves OSX responsiveness
          .modifier(SensitivityWatchEventModifier.HIGH)
          .startPosition(0)
          .chunkSize(8192)
          .utf8()
          .build();

or, using defaults of startPosition 0, chunkSize 8192, charset UTF-8, scheduler Schedulers.io():

Flowable<String> items = 
     Files.tailLines("/var/log/server.log").nonBlocking().build();

Tail a text file without NIO

The above example uses a WatchService to generate WatchEvents to prompt rereads of the end of the file to perform the tail.
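The "reread the end of the file" step can be pictured with plain JDK classes. The following is a minimal sketch under stated assumptions, not the library's implementation: `TailSketch` and `readFrom` are hypothetical names, and the caller is assumed to remember the last position it read up to. Note that `Files` here is `java.nio.file.Files`, not the library's `Files` class.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TailSketch {

    // Hypothetical helper: returns the bytes between `position` and the
    // current end of the file, or an empty array if nothing new was written.
    static byte[] readFrom(Path file, long position) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            long length = raf.length();
            if (length <= position) {
                return new byte[0];
            }
            byte[] bytes = new byte[(int) (length - position)];
            raf.seek(position);
            raf.readFully(bytes);
            return bytes;
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("tail-sketch", ".log");
        long position = 0;

        // First write, then the reread that a file change event would prompt.
        Files.write(file, "first\n".getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
        byte[] chunk = readFrom(file, position);
        position += chunk.length;
        System.out.print(new String(chunk, StandardCharsets.UTF_8));

        // Second write: only the newly appended bytes are returned.
        Files.write(file, "second\n".getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
        chunk = readFrom(file, position);
        position += chunk.length;
        System.out.print(new String(chunk, StandardCharsets.UTF_8));

        Files.delete(file);
    }
}
```

Each event (from a WatchService or from a timer) only needs to trigger another `readFrom` call with the remembered position, which is why the event source can be swapped freely.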

To use polling without a WatchService (say every 5 seconds):

Flowable<String> items = 
  Files.tailLines("/var/log/server.log")
       .events(Flowable.interval(5, TimeUnit.SECONDS))
       .build();

Tail a binary file with NIO

Flowable<byte[]> items = 
  Files.tailBytes("/tmp/dump.bin").blocking().build();

Tail a binary file without NIO

Flowable<byte[]> items = 
  Files.tailBytes("/tmp/dump.bin")
       .events(Flowable.interval(5, TimeUnit.SECONDS))
       .build();

Stream WatchService events for a file

Flowable<WatchEvent<?>> events = 
  Files
    .watch(file)
    .nonBlocking()
    .scheduler(Schedulers.io())
    .pollInterval(1, TimeUnit.MINUTES)
    .build();

Backpressure

When tailLines or tailBytes is used, the WatchEvent stream is converted to a Flowable. This makes it possible to tail large amounts of data in combination with a slow downstream consumer (e.g. one that makes a network call). The default backpressure strategy is BUFFER, but the strategy can be specified in the tailLines and tailBytes builders.

Non-blocking and blocking

The library supports two ways of getting file change events from a WatchService. The nonBlocking() builder methods configure the stream to obtain events via WatchService.poll, which is a non-blocking call (though it may involve some I/O). The blocking() builder methods configure the stream to obtain events via WatchService.take, which is a blocking call.

So when you specify nonBlocking() you end up with an asynchronous stream, whereas blocking() gives you a synchronous stream (everything happens on the current thread unless, of course, you add asynchrony to the returned Flowable).
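The difference between the two underlying JDK calls can be seen without the library. The following is a small sketch using only `java.nio.file`; the class and method names (`PollVersusTake`, `nonBlockingCheck`, `blockingWait`) are hypothetical and chosen for illustration, and `Files` here is `java.nio.file.Files`.

```java
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;

public class PollVersusTake {

    // WatchService.poll() returns immediately; the result is null
    // when no event is queued.
    static WatchKey nonBlockingCheck(WatchService ws) {
        return ws.poll();
    }

    // WatchService.take() parks the calling thread until an event arrives.
    static WatchKey blockingWait(WatchService ws) throws InterruptedException {
        return ws.take();
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("watch-sketch");
        try (WatchService ws = FileSystems.getDefault().newWatchService()) {
            dir.register(ws, StandardWatchEventKinds.ENTRY_CREATE);

            // Nothing has happened in the directory yet, so poll() yields null.
            System.out.println("poll with no events: " + nonBlockingCheck(ws));

            // Create a file, then block until the service reports it.
            // How long this takes depends on the platform's WatchService.
            Files.createFile(dir.resolve("a.txt"));
            WatchKey key = blockingWait(ws);
            for (WatchEvent<?> event : key.pollEvents()) {
                System.out.println(event.kind() + " " + event.context());
            }
            key.reset();
        }
    }
}
```

Because `poll()` never waits, something has to call it repeatedly, which is why the nonBlocking() builders take a polling interval and a scheduler. `take()` needs neither, at the cost of tying up the thread that subscribes.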

OSX

Apparently the WatchService can be slow on OSX (see here). Note that the first example above shows how to pass a special WatchEvent.Modifier, which some find has a beneficial effect. Without it, the WatchService can take more than 10 seconds to detect changes to the file system.

Windows

Detecting changes to files on Windows also seems problematic. See https://stackoverflow.com/questions/24306875/watchservice-in-windows-7-does-not-work.