keedio / flume-ftp-source

An FTP network server as a source of events for Apache Flume

Configurable recursive searching, decompression on-the-fly and wait for in-use files #29

Closed opensorceror closed 6 years ago

opensorceror commented 6 years ago

Hi there! Let me start by saying that this is a very useful plugin! While trying to use it in a production environment, I had to make several enhancements:

  1. Made recursive searching (discoverElements) configurable using the boolean parameter search.recursive. This was needed because, for whatever reason, there was a single directory in the user's home directory on the remote server to which only root had access.
  2. Added configurable parameters - search.processInUse and search.processInUseTimeout - to temporarily skip files on the source server that are currently being written to, and revisit them on the next poll. This is useful for huge files (especially compressed ones) that should not be read until they are fully written.
  3. Added configurable parameter - compressed - that lets a user specify the compression format of the source files, if they are compressed. This causes the Flume agent to decompress source files on the fly and make the decompressed data available. I needed this because the source files were gzip-compressed and contained CSV files. This effectively enabled reading the CSV files within the compressed files line by line, and I plugged in an interceptor to process each line.
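
For readers unfamiliar with on-the-fly decompression, item 3 can be illustrated with a minimal, self-contained sketch. This is not the code from the PR: the class name `GzipLineReaderSketch`, the helper `maybeDecompress`, and the literal format string `"gzip"` are all hypothetical, and only standard `java.util.zip` classes are used. It shows the general idea of wrapping the raw stream in a decompressing stream only when a compression format is configured, then reading the result line by line.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipLineReaderSketch {

    // Hypothetical helper: wrap the raw stream only when a format is configured.
    // A null format means "not compressed", which also avoids dereferencing null.
    static InputStream maybeDecompress(InputStream raw, String compressionFormat) throws IOException {
        if (compressionFormat == null) {
            return raw;
        }
        if ("gzip".equalsIgnoreCase(compressionFormat)) {
            return new GZIPInputStream(raw);
        }
        throw new IllegalArgumentException("Unsupported compression format: " + compressionFormat);
    }

    // Reads decompressed lines. They are collected in a list here for simplicity;
    // a real source would more likely hand each line on as it is read.
    static List<String> readLines(InputStream raw, String compressionFormat) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(maybeDecompress(raw, compressionFormat), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    // Test helper: gzip a string in memory so the sketch needs no external file.
    static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buffer)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] compressed = gzip("id,name\n1,alpha\n2,beta\n");
        for (String line : readLines(new ByteArrayInputStream(compressed), "gzip")) {
            System.out.println(line);
        }
    }
}
```

In this sketch a null format simply passes the stream through unchanged, so the same read path serves both compressed and uncompressed files.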

Tests for the newly introduced methods have been added. The modified plugin has been tested in a production environment with huge amounts of log data, a memory channel and an HDFS sink. Please let me know if you have any questions.

Thanks!

lucarosellini commented 6 years ago

Remarkable job @opensorceror, thank you. Before merging this PR I'd like you to take a look at a few issues I've found.

  1. Tests do not initialize successfully because of an NPE at KeedioSource.java:625: wrapping that line inside an if (compressionFormat != null) would suffice.
  2. After applying the fix above, the tests initialize successfully, but I get several test failures:
    
    testProcessNewFileInNewFolder(org.keedio.flume.source.ftp.source.ftp.EmbeddedFtpSourceTest)  Time elapsed: 0.008 sec  <<< FAILURE!
    java.lang.AssertionError: expected:<1> but was:<0>
        at org.testng.Assert.fail(Assert.java:89)
        at org.testng.Assert.failNotEquals(Assert.java:489)
        at org.testng.Assert.assertEquals(Assert.java:118)
        at org.testng.Assert.assertEquals(Assert.java:260)
        at org.testng.Assert.assertEquals(Assert.java:270)
        at org.keedio.flume.source.ftp.source.ftp.EmbeddedFtpSourceTest.testProcessNewFileInNewFolder(EmbeddedFtpSourceTest.java:76)

    testProcessNewFileInNewFolder(org.keedio.flume.source.ftp.source.ftps.EmbeddedFtpsSourceTest)  Time elapsed: 0.014 sec  <<< FAILURE!
    java.lang.AssertionError: expected:<1> but was:<0>
        at org.testng.Assert.fail(Assert.java:89)
        at org.testng.Assert.failNotEquals(Assert.java:489)
        at org.testng.Assert.assertEquals(Assert.java:118)
        at org.testng.Assert.assertEquals(Assert.java:260)
        at org.testng.Assert.assertEquals(Assert.java:270)
        at org.keedio.flume.source.ftp.source.ftps.EmbeddedFtpsSourceTest.testProcessNewFileInNewFolder(EmbeddedFtpsSourceTest.java:80)



Could you please take a look at it?

opensorceror commented 6 years ago

Sorry, I had forgotten to add mocks. Those tests are passing now, can you check? I could not run all the tests because I'm currently using a Windows machine. Should we look at making our tests platform-independent?

lucarosellini commented 6 years ago

Hi @opensorceror, we also need to add mocks in org.keedio.flume.source.ftp.source.ftps.AbstractFtpsSourceTest.

With the two mocks added, the tests pass successfully; I've already checked that. Could you please add them to the PR?

Regarding making the tests multiplatform: I will open an issue for this, but it will be a low priority for us, since we currently do not have any use case to justify the development effort.

opensorceror commented 6 years ago

Thanks Luca. Added the mocks to:

  1. org.keedio.flume.source.ftp.source.ftps.AbstractFtpsSourceTest, and
  2. org.keedio.flume.source.ftp.source.ssh.AbstractSshSourceTest.

lucarosellini commented 6 years ago

Thanks @opensorceror, I've merged this into the develop branch; we'll issue a new release asap.

lazaromedina commented 6 years ago

Hi @opensorceror, thanks for the new features. I have made two small changes:

I have a question: in your environment, how did you deal with different timezones? Thanks again. Best, Luis

opensorceror commented 6 years ago

Thanks, @lazaromedina. That makes perfect sense. What do you mean by different timezones? Currently, all the data is collected in a single timezone, and any conversion (to Eastern Time) is performed by the source server. But if we get data from different timezones in the future, there will likely be a timestamp field that also indicates the timezone. We will then perform the conversion on our side (using an interceptor or Spark) before loading the data into Hive. Does that answer your question?
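
To make the conversion idea concrete, here is a minimal sketch of its core using only `java.time`. It rests on several assumptions that are not stated in this thread: that the timestamp field is ISO-8601 with an explicit offset, that "Eastern Time" maps to the zone ID `America/New_York`, and that the class and method names (`TimezoneSketch`, `toEastern`) are purely illustrative. Wiring it into an actual interceptor or Spark job is left out.

```java
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TimezoneSketch {

    // Assumption: "Eastern Time" is represented by this IANA zone ID.
    static final ZoneId EASTERN = ZoneId.of("America/New_York");

    // Parses an ISO-8601 timestamp that carries its own offset
    // (for example "2018-01-15T12:00:00+01:00") and re-expresses the
    // same instant in Eastern Time, daylight saving included.
    static String toEastern(String isoTimestampWithOffset) {
        OffsetDateTime parsed = OffsetDateTime.parse(isoTimestampWithOffset);
        return parsed.atZoneSameInstant(EASTERN)
                     .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
    }

    public static void main(String[] args) {
        System.out.println(toEastern("2018-01-15T12:00:00+01:00"));
    }
}
```

Because the input carries its own offset, the conversion preserves the instant and only changes how it is displayed, which is what makes records from several timezones comparable after loading.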

lazaromedina commented 6 years ago

Hi @opensorceror, yes, thank you very much for your answer. Best, Luis.