A network server on port 21 (FTP) acts as the source of events for Apache Flume. Files in the server's user directory are discovered and processed. The source is implemented as a pollable source in Flume terms, since the polling interval is configurable in Flume's main configuration file. The agent configuration file must also specify whether security is required for the connection. The plugin supports two kinds of protocol security: FTPS (over SSL/TLS) and SFTP.
Files can be processed in two ways: line by line (for text files) or in binary chunks (see the flushlines and chunk.size parameters).
The names and sizes of processed files are tracked in a Map, which is persisted to an external file (parameter file.name) located in the directory given by the folder parameter of the configuration.
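As a sketch, the two processing modes can be selected in the agent configuration like this (the values shown are illustrative; both parameters are documented further below):

```properties
# Process files line by line (text files)
agent.sources.ftp1.flushlines = true

# Or process files as binary chunks of 1024 bytes
# agent.sources.ftp1.flushlines = false
# agent.sources.ftp1.chunk.size = 1024
```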
Clone the project:
git clone https://github.com/keedio/flume-ftp-source.git
Build with Maven:
mvn clean package
$ cd plugins.d
$ mkdir flume-ftp
$ cd flume-ftp
$ mkdir lib libext
$ cp jsch-0.1.54.jar libext/
$ cp commons-net-3.3.jar libext/
$ cp flume-ftp-source-X.Y.Z.jar lib/
Create a config file; examples are provided:
$ cp flume-ng-ftp-source-FTP.conf apache-flume-1.4.0-bin/conf/
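If starting from scratch instead of the shipped examples, a minimal agent configuration might look like the following sketch. The channel and sink wiring are illustrative standard Flume components; only the source properties belong to this plugin:

```properties
# FTP source (plugin parameters documented below)
agent.sources = ftp1
agent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source
agent.sources.ftp1.client.source = ftp
agent.sources.ftp1.name.server = 127.0.0.1
agent.sources.ftp1.user = username
agent.sources.ftp1.password = password
agent.sources.ftp1.port = 21

# Illustrative channel and sink: write events to local files
agent.channels = c1
agent.channels.c1.type = memory
agent.sinks = k1
agent.sinks.k1.type = file_roll
agent.sinks.k1.sink.directory = /var/log/flume-ftp

# Wire source and sink to the channel
agent.sources.ftp1.channels = c1
agent.sinks.k1.channel = c1
```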
Which files will be processed?
Files in the FTP user's home directory (the remote directory) will be processed.
For example, given this server and user:
agent.sources.ftp1.name.server = 192.168.0.2
agent.sources.ftp1.user = mortadelo
host:~ root# ftp 192.168.0.2
Connected to 192.168.0.2.
220 (vsFTPd 3.0.2)
Name (192.168.0.2:root): mortadelo
331 Please specify the password.
Password:
230 Login successful.
Remote system type is UNIX.
Using binary mode to transfer files.
ftp> dir
229 Entering Extended Passive Mode (|||29730|).
150 Here comes the directory listing.
-rw-r--r-- 1 0 0 60 Aug 18 06:48 file1.txt
-rw-r--r-- 1 0 0 60 Aug 18 06:48 file2.txt
226 Directory send OK.
ftp> pwd
Remote directory: /
ftp>
We want to process file1.txt and file2.txt.
Launch the Flume binary:
$ ./bin/flume-ng agent -c conf --conf-file conf/flume-ng-ftp-source-FTP.conf --name agent -Dflume.root.logger=INFO,console
[...]
2017-08-18 09:07:33,471 (PollableSourceRunner-Source-ftp1) [INFO - org.keedio.flume.source.ftp.source.Source.process(Source.java:89)] Actual dir: / files: 0
2017-08-18 09:07:33,503 (PollableSourceRunner-Source-ftp1) [INFO - org.keedio.flume.source.ftp.source.Source.discoverElements(Source.java:193)] Discovered: file1.txt ,size: 60
2017-08-18 09:07:33,516 (PollableSourceRunner-Source-ftp1) [INFO - org.keedio.flume.source.ftp.source.Source.discoverElements(Source.java:232)] Processed: file1.txt ,total files: 1
2017-08-18 09:07:33,518 (PollableSourceRunner-Source-ftp1) [INFO - org.keedio.flume.source.ftp.source.Source.discoverElements(Source.java:193)] Discovered: file2.txt ,size: 60
2017-08-18 09:07:33,521 (PollableSourceRunner-Source-ftp1) [INFO - org.keedio.flume.source.ftp.source.Source.discoverElements(Source.java:232)] Processed: file2.txt ,total files: 2
2017-08-18 09:07:38,526 (PollableSourceRunner-Source-ftp1) [INFO - org.keedio.flume.source.ftp.source.Source.process(Source.java:89)] Actual dir: / files: 2
2017-08-18 09:07:43,535 (PollableSourceRunner-Source-ftp1) [INFO - org.keedio.flume.source.ftp.source.Source.process(Source.java:89)] Actual dir: / files: 2
2017-08-18 09:07:48,547 (PollableSourceRunner-Source-ftp1) [INFO - org.keedio.flume.source.ftp.source.Source.process(Source.java:89)] Actual dir: / files: 2
[...]
Data processed.
For testing purposes set:
agent.sinks.k1.type = file_roll
agent.sinks.k1.sink.directory = /var/log/flume-ftp
In /var/log/flume-ftp, the file_roll sink will create a file named with a millisecond timestamp and a counter:
[host]# ls -ll
-rw-r--r-- 1 root root 120 Aug 18 09:07 1503040052934-1
tail -f 1503040052934-1
line from file1.txt Fri_Aug_18_06:48:40.1503038920_UTC_2017
line from file2.txt Fri_Aug_18_06:48:51.1503038931_UTC_2017
Stopping and restarting: processing resumes from the last unprocessed data.
In the config file, the parameters
agent.sources.ftp1.folder = /var/log/flume-ftp
agent.sources.ftp1.file.name = status-ftp1-file.ser
configure the path of the file that keeps track of the status of processed files. For example, if flume-ng is stopped and restarted, file1.txt and file2.txt will not be discovered again. With Flume stopped, a new line was appended to file1.txt; on restart, the source detects only the modification:
2017-08-18 09:48:50,633 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: SOURCE.ftp1 started
2017-08-18 09:48:50,638 (PollableSourceRunner-Source-ftp1) [INFO - org.keedio.flume.source.ftp.source.Source.process(Source.java:89)] Actual dir: / files: 2
2017-08-18 09:48:50,665 (PollableSourceRunner-Source-ftp1) [INFO - org.keedio.flume.source.ftp.source.Source.discoverElements(Source.java:200)] Modified: file1.txt ,size: 60
2017-08-18 09:48:50,672 (PollableSourceRunner-Source-ftp1) [INFO - org.keedio.flume.source.ftp.source.Source.discoverElements(Source.java:232)] Processed: file1.txt ,total files: 2
Whether a recursive listing should be performed
In the config file, the parameter agent.sources.sftp1.search.recursive = false (true by default) specifies that a recursive search should not be performed in agent.sources.sftp1.working.directory.
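For example, to restrict discovery to the top level of the working directory (the directory path is illustrative):

```properties
agent.sources.sftp1.working.directory = /home/user/directory_flume_files
agent.sources.sftp1.search.recursive = false
```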
Wait for files to be finalized before reading
This is useful when large files are being written to the source server, especially compressed files. To avoid reading them while they are still being written, specify the parameter agent.sources.sftp1.search.processInUse = false in the config file. This must be accompanied by another parameter, agent.sources.sftp1.search.processInUseTimeout, which is specified in seconds. To determine whether a file is still being written, the Flume agent checks the file's last-modified timestamp: if the file was modified within the last search.processInUseTimeout seconds, it is considered still being written. A value of 30 is usually sufficiently conservative.
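Putting the two parameters together (the 30-second timeout follows the suggestion above):

```properties
agent.sources.sftp1.search.processInUse = false
agent.sources.sftp1.search.processInUseTimeout = 30
```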
INFO Source: File testfile.csv.gz is still being written. Will skip for now and re-read when write is completed.
INFO Source: Actual dir: /home/mydir files: 24
INFO Source: Discovered: testfile.csv.gz ,size: 5441264
INFO HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
Decompress source files on the fly
In many cases, source files may be compressed with a codec such as GZIP. Reading such files in chunks or lines may not be useful. To decompress such files on the fly, provide the parameter agent.sources.sftp1.compressed in the config file, with its value set to the name of the compression codec used (e.g., agent.sources.sftp1.compressed = gzip). This causes the Flume agent to read and decompress the source files on the fly and make the decompressed data available in the specified channel.
INFO Source: Discovered: testfile.csv.gz ,size: 5441264
INFO Source: File testfile.csv.gz is GZIP compressed, and decompression has been requested by user. Will attempt to decompress.
INFO HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
agent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source
agent.sources.ftp1.client.source = ftp
agent.sources.ftp1.name.server = 127.0.0.1
agent.sources.ftp1.user = username
agent.sources.ftp1.password = password
agent.sources.ftp1.port = 21
agent.sources.ftps1.type = org.keedio.flume.source.ftp.source.Source
agent.sources.ftps1.client.source = ftps
agent.sources.ftps1.name.server = 127.0.0.1
agent.sources.ftps1.user = username
agent.sources.ftps1.password = password
agent.sources.ftps1.port = 21
agent.sources.ftps1.security.enabled = true
agent.sources.ftps1.security.cipher = TLS
agent.sources.ftps1.security.certificate.enabled = (false | true) (if false, the plugin will accept any certificate sent by the server, valid or not)
agent.sources.ftps1.path.keystore = /path/to/keystore
agent.sources.ftps1.store.pass = the_keyStore_password
agent.sources.sftp1.type = org.keedio.flume.source.ftp.source.Source
agent.sources.sftp1.client.source = sftp
agent.sources.sftp1.name.server = 127.0.0.1
agent.sources.sftp1.user = username
agent.sources.sftp1.password = password
agent.sources.sftp1.port = 22
agent.sources.sftp1.strictHostKeyChecking = no  // WARNING: for testing purposes only; default is yes
agent.sources.sftp1.knownHosts = /home/<user launching flume>/.ssh/known_hosts
working.directory is relative to the root directory returned by the FTP server:
agent.sources.<ftp1 | ftps1 | sftp1>.working.directory = [remote_directory]/directoryName
example 1:
agent.sources.ftp1.working.directory = /directory_flume_files
example 2:
agent.sources.sftp1.working.directory = /home/user/directory_flume_files
If the following parameter is omitted, the default value of 10000 ms is used.
agent.sources.<ftp1 | ftps1 | sftp1>.run.discover.delay = 5000
agent.sources.<ftp1 | ftps1 | sftp1>.flushlines = (true | false)
Customizing this option is intended for particular cases.
agent.sources.ftp1.chunk.size = 1024
If omitted, a default one will be created.
agent.sources.ftp1.file.name = status-ftp1-file.ser
agent.sources.ftp1.folder = /var/flume
Java regular expressions are supported for the FTP, FTPS and SFTP protocols.
example 1:
agent.sources.ftp1.filter.pattern = .+\\.csv ----> only process files ending with .csv
example 2:
agent.sources.sftp1.filter.pattern = flume_file.* ----> only process files whose names start with flume_file
m : the parameter is mandatory for that protocol
o : optional
x : not available
Parameter | Description | ftp | ftps | sftp |
---|---|---|---|---|
client.source | type of source from which to get data | m | m | m |
name.server | hostname or ipaddress | m | m | m |
user | username allowed to connect | m | m | m |
password | user's password | m | m | m |
port | server's port to connect | m | m | m |
security.enabled | enable cryptographic protocols | x | m | x |
security.cipher | Auth SSL or TLS | x | m | x |
security.certificate.enabled | accept or not server's certificate | x | o | x |
path.keystore | path to the keystore | x | o | x |
knownHosts | path to the known_hosts file holding the server's keys | x | x | m |
working.directory | custom directory to search for files | o | o | x |
folder | directory where to keep track status files | o | o | o |
discover.delay | polling time | o | o | o |
chunk.size | event size for binary files | o | o | o |
file.name | name of the track-status file kept in folder | o | o | o |
flushlines | true or false | m | m | m |
search.recursive | true or false | o | o | o |
search.processInUse | true or false | o | o | o |
search.processInUseTimeout | time in seconds to decide whether a file is still being written | o | o | o |
compressed | compression format of source files, if compressed | o | o | o |
filter.pattern | Java Regular Expression | o | o | o |
strictHostKeyChecking | disable verification of the server's SSH public key (for testing only) | x | x | o |
-- www.keedio.com