keedio / flume-ftp-source

FTP network server is source of events for Apache-flume

Zip file sourcing #12

Closed deepakbs closed 9 years ago

deepakbs commented 9 years ago

Hi, I am trying to poll an FTP location for a zip file. Since I need to unzip the file and send the XML file it contains, I wrote an interceptor to do the task. Although the interceptor works, the zip file does not get through properly from the FTPSource to the interceptor. If I write the file out at the beginning of FTPSource.readStream, before converting the input stream to a byte array, the file is created correctly. But if I write it out after the conversion to a byte array, it is not created properly. The scenario I am working on is: collect the zip file in Flume, unzip it, extract the XML file and send it to Kafka for further processing. Can you please suggest the best way of doing this?

Thanks Deepak
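
For reference, the unzip step described above can be sketched roughly as follows. This is only an illustration, not code from flume-ftp-source or from the interceptor in question: the class name `ZipXmlExtractor` and the method `extractFirstXml` are hypothetical, and it assumes the whole archive arrives as one byte array and that only the first `.xml` entry is wanted.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

// Hypothetical helper, not part of flume-ftp-source.
final class ZipXmlExtractor {

    private ZipXmlExtractor() {
    }

    /**
     * Returns the uncompressed bytes of the first ".xml" entry found in the
     * archive, or null if the archive contains no such entry.
     * Assumes zipBytes holds the complete zip file, not a fragment of it.
     */
    static byte[] extractFirstXml(byte[] zipBytes) throws IOException {
        try (ZipInputStream zis = new ZipInputStream(new ByteArrayInputStream(zipBytes))) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                // Skip directories and anything that is not an XML file.
                if (entry.isDirectory() || !entry.getName().toLowerCase().endsWith(".xml")) {
                    continue;
                }
                // Copy the current entry's decompressed content into memory.
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buffer = new byte[4096];
                int n;
                while ((n = zis.read(buffer)) != -1) {
                    out.write(buffer, 0, n);
                }
                return out.toByteArray();
            }
        }
        return null;
    }
}
```

Inside a Flume interceptor, something like this would presumably be applied to `event.getBody()`, with the result passed to `event.setBody(...)`. It can only work if the event body is the entire archive: a 1024-byte fragment of a zip file is not a valid zip stream on its own.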

deepakbs commented 9 years ago

Hi, I just found the issue. In the FTPSource.readStream() method, processMessage(data) is called for every CHUNKSIZE (1024) bytes read from the input file. For some reason this was causing the zip file not to be transferred properly to the interceptor. So I changed the code as below to read the complete file and then call processMessage, and it now works fine. Please let me know if I am on the right track.

    boolean successRead = true;
    try {
        inputStream.skip(position);
        byte[] bytesArray = new byte[CHUNKSIZE];
        int bytesRead = 0;
        ByteArrayOutputStream baostream = new ByteArrayOutputStream(CHUNKSIZE);
        // Accumulate the whole file in memory, CHUNKSIZE bytes at a time.
        while ((bytesRead = inputStream.read(bytesArray)) != -1) {
            baostream.write(bytesArray, 0, bytesRead);
        }

        // Hand the complete file to Flume as a single event.
        byte[] data = baostream.toByteArray();
        processMessage(data);
        baostream.close();
        inputStream.close();

lazaromedina commented 9 years ago

Hi, if I understand your setup correctly, flume-ftp-source is not the solution for what you need. Keep in mind that flume-ftp-source is not intended to move files from one site to another. It is intended to process the content of (huge) files hosted on an FTP server. The current content of a file, and the new content appended to it over time, become events for further processing through the Flume flow. To accomplish this, the file is retrieved, read and processed (in Flume terms) in pieces to minimize data loss. In your "solution", when you move processMessage out of the main loop in readStream, the file is still read in pieces, but in the end you allocate all the data in one byte array to be processed as a single event. A byte array has a maximum length of Integer.MAX_VALUE, so as long as the file does not exceed this natural limit there should be no problem moving your file to the interceptor. Off the top of my head: