drapostolos / rdp4j

Remote Directory Poller for Java
MIT License

Listeners and thread safety (counting events) #29

Closed: ilmirons closed this issue 1 year ago

ilmirons commented 1 year ago

Hi, I'd like to write an EventCounterListener that counts the files added/removed in each polled directory, for each cycle. Since a single listener instance is provided, I guess it is shared among the threads polling different directories, so I went the "synchronize everything" way.

Are there any known guarantees about the running threads that would let me simplify things or get better performance? Below is a snippet of my implementation.

public class EventCounterListener implements PollCycleListener, DirectoryListener, InitialContentListener {

    static final Hashtable<String, AtomicInteger> fileAdded = new Hashtable<>();
    static final Hashtable<String, AtomicInteger> fileRemoved = new Hashtable<>();

    @Override
    public void beforePollingCycle(@NotNull BeforePollingCycleEvent beforePollingCycleEvent) {
        var directories = beforePollingCycleEvent.getDirectoryPoller().getPolledDirectories();
        directories.forEach(pd -> {
            resetCounter(pd, fileAdded);
            resetCounter(pd, fileRemoved);
        });
    }

    private static void resetCounter(@NotNull PolledDirectory pd, @NotNull Hashtable<String,AtomicInteger> ht) {
        if (!ht.containsKey(pd.toString())) {
            ht.put(pd.toString(), new AtomicInteger()); // toString is overridden to return the full path
        } else {
            ht.get(pd.toString()).set(0);
        }
    }

    @Override
    public void afterPollingCycle(@NotNull AfterPollingCycleEvent afterPollingCycleEvent) {
        var directories = afterPollingCycleEvent.getDirectoryPoller().getPolledDirectories();
        directories.forEach(pd -> log.info("{}:\n\tAdded: {}\n\tRemoved: {}\n",
                pd,
                getAdded(pd),
                getRemoved(pd)));
    }

    static int getAdded(@NotNull PolledDirectory pd) {
        return fileAdded.get(pd.toString()).get();
    }

    @Override
    public void fileAdded(@NotNull FileAddedEvent fileAddedEvent) {
        var pd = fileAddedEvent.getPolledDirectory();
        incrementAdded(pd);
    }

    private static void incrementAdded(@NotNull PolledDirectory pd) {
        fileAdded.get(pd.toString()).incrementAndGet();
    }

    // fileRemoved/getRemoved follow the same pattern, and initialContent iterates over the files calling incrementAdded
   // ...
}

Thanks for sharing your great work.

Andrea

drapostolos commented 1 year ago

Javadoc mentions:

    /**
     * Enable parallel polling of the directories added in the {@link DirectoryPoller}.
     * <p>
     * NOTE!
     * This puts constraints on the added listeners to be thread safe.
     * <p>
     * Optional setting. Disabled by default.
     * 
     * @return {@link DirectoryPollerBuilder}
     */
    public DirectoryPollerBuilder enableParallelPollingOfDirectories()

I.e., once parallel polling is enabled, your listener implementations must be thread safe.
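To illustrate what "thread safe" means for a counting listener, here is a minimal sketch that does not use rdp4j at all. The class `ThreadSafeCounterSketch`, its methods, and the directory name `"dir1"` are hypothetical stand-ins; a plain thread pool plays the role of the parallel pollers. It only assumes that several threads may report events for the same key at the same time.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a counting listener; not part of rdp4j.
final class ThreadSafeCounterSketch {

    // One counter per directory name; both the map and the counters are thread safe.
    private final Map<String, AtomicInteger> counts = new ConcurrentHashMap<>();

    // Safe to call from several threads at once: computeIfAbsent is atomic on
    // ConcurrentHashMap, and incrementAndGet is atomic on AtomicInteger.
    void increment(String directory) {
        counts.computeIfAbsent(directory, k -> new AtomicInteger()).incrementAndGet();
    }

    // Read-only lookup that does not create an entry for unknown directories.
    int get(String directory) {
        AtomicInteger counter = counts.get(directory);
        return counter == null ? 0 : counter.get();
    }

    // Simulates `threads` workers, each reporting `eventsPerThread` events for "dir1".
    static int simulate(int threads, int eventsPerThread) {
        ThreadSafeCounterSketch counter = new ThreadSafeCounterSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < eventsPerThread; i++) {
                    counter.increment("dir1");
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get("dir1");
    }

    public static void main(String[] args) {
        // 4 workers x 10,000 events each; no increments are lost.
        System.out.println(simulate(4, 10_000)); // prints 40000
    }
}
```

With a plain `HashMap` and `int` counters the same simulation could lose updates, which is why no explicit `synchronized` blocks are needed here but the concurrent types are.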

drapostolos commented 1 year ago

Here is an example:

import java.io.File;
import java.util.concurrent.TimeUnit;

import com.github.drapostolos.rdp4j.DirectoryPoller;
import com.github.drapostolos.rdp4j.JavaIoFileAdapter;

public class Demo {

    public static void main(String[] args) {
        DirectoryPoller poller = DirectoryPoller.newBuilder()
        .addPolledDirectory(new JavaIoFileAdapter(new File("C:\\temp\\dir1")))
        .addPolledDirectory(new JavaIoFileAdapter(new File("C:\\temp\\dir2")))
        .addListener(new CounterListener())
        .enableFileAddedEventsForInitialContent()
        .setPollingInterval(10, TimeUnit.SECONDS)
        .enableParallelPollingOfDirectories()
        .start();

        // Do other things here...

        poller.awaitTermination();
    }
}

With the CounterListener looking something like this:

package _play;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import com.github.drapostolos.rdp4j.AfterPollingCycleEvent;
import com.github.drapostolos.rdp4j.BeforePollingCycleEvent;
import com.github.drapostolos.rdp4j.DirectoryListener;
import com.github.drapostolos.rdp4j.FileAddedEvent;
import com.github.drapostolos.rdp4j.FileModifiedEvent;
import com.github.drapostolos.rdp4j.FileRemovedEvent;
import com.github.drapostolos.rdp4j.PollCycleListener;
import com.github.drapostolos.rdp4j.spi.PolledDirectory;

public class CounterListener implements PollCycleListener, DirectoryListener {
    private Map<PolledDirectory, AtomicInteger> numFilesAddedPerCycle = new ConcurrentHashMap<>();
    private Map<PolledDirectory, AtomicInteger> numFilesRemovedPerCycle = new ConcurrentHashMap<>();

    @Override
    public void fileAdded(FileAddedEvent event) throws InterruptedException {
        System.out.println("+ " + event.getFileElement());
        incrementAdded(event.getPolledDirectory());
    }

    private void incrementAdded(PolledDirectory dir) {
        numFilesAddedPerCycle.computeIfAbsent(dir, k -> new AtomicInteger(0)).incrementAndGet();
    }

    @Override
    public void fileRemoved(FileRemovedEvent event) throws InterruptedException {
        System.out.println("- " + event.getFileElement());
        incrementRemoved(event.getPolledDirectory());
    }

    private void incrementRemoved(PolledDirectory dir) {
        numFilesRemovedPerCycle.computeIfAbsent(dir, k -> new AtomicInteger(0)).incrementAndGet();
    }

    @Override
    public void fileModified(FileModifiedEvent event) throws InterruptedException {
        // empty
    }

    @Override
    public void beforePollingCycle(BeforePollingCycleEvent event) throws InterruptedException {
        numFilesAddedPerCycle.clear();
        numFilesRemovedPerCycle.clear();
    }

    @Override
    public void afterPollingCycle(AfterPollingCycleEvent event) throws InterruptedException {
        if(numFilesAddedPerCycle.size() > 0 || numFilesRemovedPerCycle.size() > 0) {
            event.getDirectoryPoller().getPolledDirectories().forEach(dir -> {
                System.out.println(String.format("%s:\n\tAdded: %s\n\tRemoved: %s\n", dir, getAddedCount(dir), getRemovedCount(dir)));
            });
        }
    }

    private int getAddedCount(PolledDirectory dir) {
        return numFilesAddedPerCycle.computeIfAbsent(dir, k -> new AtomicInteger(0)).get();
    }

    private int getRemovedCount(PolledDirectory dir) {
        return numFilesRemovedPerCycle.computeIfAbsent(dir, k -> new AtomicInteger(0)).get();
    }

}

NOTE 1: The above uses Java's ConcurrentHashMap.

NOTE 2: Remember to have your PolledDirectory implementations implement the equals() and hashCode() methods, since they are used as map keys here.
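As a rough sketch of why equals() and hashCode() matter for the map keys: the class `PathDirectory` below is a hypothetical stand-in for a PolledDirectory implementation (a real one would also implement the rdp4j interface), and the path `/tmp/dir1` is made up for illustration. Two distinct instances that refer to the same path end up sharing one counter only because they compare equal.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class DirKeySketch {

    // Hypothetical stand-in for a PolledDirectory implementation, identified by its path.
    static final class PathDirectory {
        private final String path;

        PathDirectory(String path) {
            this.path = Objects.requireNonNull(path);
        }

        // Two instances are equal when they point at the same path.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof PathDirectory)) return false;
            return path.equals(((PathDirectory) o).path);
        }

        // Must be consistent with equals so that hash-based maps find the entry.
        @Override
        public int hashCode() {
            return path.hashCode();
        }

        @Override
        public String toString() {
            return path;
        }
    }

    // Records two events using two separate (but equal) key instances,
    // then reads the count back through a third instance.
    static int countAfterTwoEvents() {
        Map<PathDirectory, AtomicInteger> counts = new ConcurrentHashMap<>();
        counts.computeIfAbsent(new PathDirectory("/tmp/dir1"), k -> new AtomicInteger()).incrementAndGet();
        counts.computeIfAbsent(new PathDirectory("/tmp/dir1"), k -> new AtomicInteger()).incrementAndGet();
        return counts.get(new PathDirectory("/tmp/dir1")).get();
    }

    public static void main(String[] args) {
        System.out.println(countAfterTwoEvents()); // prints 2
    }
}
```

Without the two overrides, each `new PathDirectory(...)` would be a different key (identity-based equality), so each event would land in its own counter and the final lookup would find nothing.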

ilmirons commented 1 year ago

Much cleaner, thanks.