geopython / stetl

Stetl, Streaming ETL, is a lightweight geospatial processing and ETL framework written in Python.
https://www.stetl.org
GNU General Public License v3.0

Strange behaviour XmlAssembler? #49

Open fsteggink opened 8 years ago

fsteggink commented 8 years ago

While writing unit tests for XmlAssembler, I ran into a couple of issues. First, I set up a chain that reads a single GML file with three FeatureMember elements. In my config I wanted to write an etree doc for every two elements. I expected two documents in this case: one with two elements, and one with only one element (the last one). I was surprised that no doc was written (to stdout). Here is my config:

# Config file for unit testing XmlAssembler.

[etl]
chains = input_glob_file|parse_xml_file|xml_assembler|output_std

[input_glob_file]
class = inputs.fileinput.GlobFileInput
file_path = tests/data/dummy.gml

# The source input file producing XML elements
[parse_xml_file]
class = filters.xmlelementreader.XmlElementReader
element_tags = FeatureMember

# Assembles etree docs from gml:featureMember elements, each with "max_elements" elements
[xml_assembler]
class = filters.xmlassembler.XmlAssembler
max_elements = 2
container_doc = <?xml version="1.0" encoding="UTF-8"?>
   <gml:FeatureCollectionT10NL
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:top10nl="http://www.kadaster.nl/schemas/imbrt/top10nl/1.2"
    xmlns:brt="http://www.kadaster.nl/schemas/imbrt/brt-alg/1.0"
    xmlns:gml="http://www.opengis.net/gml/3.2"
    xsi:schemaLocation="http://www.kadaster.nl/schemas/imbrt/top10nl/1.2 http://www.kadaster.nl/schemas/top10nl/vyyyymmdd/TOP10NL_1_2.xsd">
    </gml:FeatureCollectionT10NL >
element_container_tag = FeatureCollectionT10NL

[output_std]
class = outputs.standardoutput.StandardOutput

I suspected this check in XmlAssembler.consume_element: if element is None or packet.is_end_of_stream() is True: (Note that the is True is redundant, but that doesn't matter.) It indeed turned out that packet.is_end_of_stream() was true. I think this is already caused by the GlobFileInput, an input class I added just yesterday. It could be that I don't properly understand when is_end_of_stream should be set to true, but I'm wondering whether a filter that can return multiple packets for one input packet (for example when an XML file is parsed using XmlElementReader) should actually reset is_end_of_stream or is_end_of_doc.

When I skip this check, so that I'm only checking for element is None, a new XML document is generated for every XML element, so I got 3 documents instead of the expected 2.

When I read all GML files in my test data directory (currently 3 files), by setting file_path to tests/data/*.gml in input_glob_file, I get either 6 documents (while checking for packet.is_end_of_stream()) or 9 documents. With 3 files I do expect 6 documents (3 x 2), namely a doc with 2 elements followed by a doc with 1 element, three times. However, each document contains only one element, and the elements come only from the first 2 GML files. When I disable the aforementioned check I get 9 docs, each with one element.

So, my question is how packet.is_end_of_stream and packet.is_end_of_doc should actually behave. Should they be reset when one input packet results in multiple output packets for the particular component? Or is there more to it?

I've attached my unit test file. The method test_execute is just a work-in-progress. test_xml_assembler.zip

justb4 commented 8 years ago

In general end_of_doc is used to denote one unit for processing: an array of records, an etree. It is used to split a large input, like a huge GML file or an entire DB table (e.g. an Ogr input stream), into manageable chunks. There can be many consecutive end_of_docs. The input provider usually sets end_of_doc in the Packet, for example when parsing a list of files. end_of_stream denotes the end of the entire input and indicates that all input is done.
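To make the two flags concrete, here is a minimal sketch. The `Packet` dataclass and `packets_for_documents` are hypothetical stand-ins written for this illustration, not Stetl's actual classes, and the sketch assumes every document has at least one chunk.

```python
from dataclasses import dataclass
from typing import Any, Iterator, List


@dataclass
class Packet:
    """Simplified stand-in for a Stetl packet (hypothetical, not the real class)."""
    data: Any = None
    end_of_doc: bool = False
    end_of_stream: bool = False


def packets_for_documents(documents: List[List[Any]]) -> Iterator[Packet]:
    """Yield one packet per chunk.

    The last chunk of each document gets end_of_doc; the very last packet
    of the whole input gets both end_of_doc and end_of_stream.
    Assumes each document contains at least one chunk.
    """
    for doc_index, chunks in enumerate(documents):
        last_doc = doc_index == len(documents) - 1
        for chunk_index, chunk in enumerate(chunks):
            last_chunk = chunk_index == len(chunks) - 1
            yield Packet(
                data=chunk,
                end_of_doc=last_chunk,
                end_of_stream=last_chunk and last_doc,
            )


# Two "files": the first split into two chunks, the second into one.
flags = [(p.data, p.end_of_doc, p.end_of_stream)
         for p in packets_for_documents([["a1", "a2"], ["b1"]])]
```

Here `flags` shows several end_of_doc markers in a row and exactly one end_of_stream, on the final packet.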

In general my idea was that XmlElementReader would be a FileInput (the implementation is still there) and that Glob/ZipFileInputs would be a general way to assemble/read multiple files as a list of files within any FileInput class. For example, the base class FileInput can have a filename_pattern to deal with (expand) globbing. A separate GlobFileInput would not be required. The same possibly goes for Zip files. end_of_doc would then indicate a true end of document/file for each input file, and end_of_stream that all files are done, i.e. the very last Packet would have both end_of_doc and end_of_stream set.

This is the general idea. As XmlElementReader is a Filter here and with Glob/ZipFileInputs as Input for file names, things may get tricky. I have to look deeper into the implementation.

(I am running a bit out of time now. Will at least submit this comment and get back).

justb4 commented 8 years ago

Ok, went through the test_xml_assembler.zip tests and the implementation of the xmlelementreader.py Filter. I cannot directly find the reason for the test failure (I get a one-element doc instead of a two-element one: 1 != 2). It surely has to do with the end_of_doc setting. But on line 127:

            # If there is a next component, let it process
            if self.next:
                # Hand-over data (line, doc whatever) to the next component
                packet.format = self._output_format
                packet = self.next.process(packet)

I think I understand why this code is required: invoke() gets a single file path but needs to generate multiple Packets, one for each Element. However calling the next Component is what the Stetl ETL framework already does. Components are not supposed to even have knowledge of next.
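The division of responsibility described here can be sketched as follows. This is only an illustration under assumptions, not Stetl's actual Chain code: `Packet`, `make_source`, `upper_filter` and `run_chain` are hypothetical names. The point is that the driver loop, not the component, hands the packet to the next component and keeps invoking the chain until end_of_stream.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List


@dataclass
class Packet:
    """Simplified stand-in for a Stetl packet (hypothetical)."""
    data: Any = None
    end_of_stream: bool = False


# A component is just "packet in, packet out"; it never sees its neighbour.
Component = Callable[[Packet], Packet]


def make_source(items: Iterable[str]) -> Component:
    """Build an input component that emits one item per invocation."""
    remaining = list(items)

    def source(packet: Packet) -> Packet:
        if remaining:
            packet.data = remaining.pop(0)
        packet.end_of_stream = not remaining
        return packet

    return source


def upper_filter(packet: Packet) -> Packet:
    """A filter that only transforms the data it is handed."""
    if packet.data is not None:
        packet.data = packet.data.upper()
    return packet


def run_chain(components: List[Component]) -> List[Any]:
    """Driver: pass a fresh packet through all components until end_of_stream."""
    results = []
    while True:
        packet = Packet()
        for component in components:
            packet = component(packet)
        if packet.data is not None:
            results.append(packet.data)
        if packet.end_of_stream:
            return results


results = run_chain([make_source(["a", "b", "c"]), upper_filter])
```

With this shape a component that must emit several packets per input needs some help from the driver (for example an extra flag), which is the difficulty this thread is about.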

I still see no reason for XmlElementReader being a Filter. Just like we do with NLExtract BAG file reading/processing in https://github.com/nlextract/NLExtract/blob/master/bag/src/bagfilereader.py, we should be able to read/process any mixture of directories, .zip files and globbing within the FileInput class. Globbing is already possible; for .zip files we need to make some abstractions and changes in possibly all FileInput-derived classes, e.g. work with a (list of) File object(s) rather than a file path. If we used Filters we would need to duplicate all current FileInput classes, e.g. for, let's say, zipped JSON files. Also, we would not be able to handle mixtures of .zip and non-zipped files (like with BAG). I hope you agree. I know this will impact several NLExtract configs (BGT, BRK at least), but we can work on this together. I can e.g. adapt the FileInput classes to handle a mixture of dirs, filename lists, globs and .zip files, maybe even .tar files later. I can open tickets: first "FileInput classes should be able to handle .zip files" and later "FileInput classes should be able to handle any mixture of .zip/glob/dirs files".
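A rough sketch of what "work with File objects rather than file paths" could look like, using only the Python standard library. The function name `iter_input_files` and the `archive!member` naming are assumptions made for this illustration; nothing here is existing Stetl or NLExtract code.

```python
import glob
import os
import zipfile
from typing import BinaryIO, Iterable, Iterator, Tuple


def iter_input_files(specs: Iterable[str]) -> Iterator[Tuple[str, BinaryIO]]:
    """Expand a mixture of directories, glob patterns, plain files and .zip
    archives into (name, open binary file object) pairs.

    Each file object is only valid until the next item is requested,
    so the consumer should read it before advancing the iterator.
    """
    for spec in specs:
        if os.path.isdir(spec):
            # Directory: take every file below it.
            paths = sorted(
                os.path.join(root, name)
                for root, _dirs, names in os.walk(spec)
                for name in names
            )
        else:
            # Plain path or glob pattern; a plain path matches itself.
            paths = sorted(glob.glob(spec))

        for path in paths:
            if zipfile.is_zipfile(path):
                # Read members straight from the archive, no temp files.
                with zipfile.ZipFile(path) as archive:
                    for member in archive.namelist():
                        if member.endswith("/"):
                            continue  # skip directory entries
                        with archive.open(member) as handle:
                            yield f"{path}!{member}", handle
            else:
                with open(path, "rb") as handle:
                    yield path, handle
```

A format-specific input (XML, JSON, lines) could then parse whatever file object it is handed, without knowing whether it came from a directory, a glob or an archive.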

Thanks to your excellent work on the Unit tests we can also do any redesign/refactoring in a more controlled way!

justb4 commented 8 years ago

So XmlAssembler should set end_of_doc after each "flush" of an assembled document. The last doc also has end_of_stream true.
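As an illustration of that rule, here is a minimal sketch of an assembler that flushes after every `max_elements` elements and at the end of each input document. `Packet` and `assemble` are hypothetical names for this example, not the real XmlAssembler; the real filter wraps the elements in the container document, while this sketch simply collects them in a list.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Iterator, List


@dataclass
class Packet:
    """Simplified stand-in for a Stetl packet (hypothetical)."""
    data: Any = None
    end_of_doc: bool = False
    end_of_stream: bool = False


def assemble(packets: Iterable[Packet], max_elements: int) -> Iterator[Packet]:
    """Group incoming elements into documents of at most max_elements.

    Every flushed document has end_of_doc set; the last one also carries
    end_of_stream, taken over from the input packet that triggered the flush.
    """
    buffer: List[Any] = []
    for packet in packets:
        if packet.data is not None:
            buffer.append(packet.data)
        full = len(buffer) >= max_elements
        if buffer and (full or packet.end_of_doc or packet.end_of_stream):
            yield Packet(data=buffer, end_of_doc=True,
                         end_of_stream=packet.end_of_stream)
            buffer = []
        elif packet.end_of_stream:
            # Nothing left to flush, but downstream must still see the end.
            yield Packet(data=None, end_of_doc=True, end_of_stream=True)


# Three files with three elements each, as in the test data described above.
elements = [
    Packet(f"file{f}-elem{i}", end_of_doc=(i == 2),
           end_of_stream=(f == 2 and i == 2))
    for f in range(3) for i in range(3)
]
docs = list(assemble(elements, max_elements=2))
sizes = [len(d.data) for d in docs]
```

Under these assumptions the three files produce six documents, alternating two elements and one element, with end_of_stream only on the sixth.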

I've added some handy base code in Chain and a PacketBuffer Filter. This should make it easier to inspect ETL-results in unit-tests. Unfortunately the commit for XmlAssembler tests went with issue #40: this was commit 16e9646d4246e9b33d09f0064e1f474fdb97cf4c.

It shows an enhanced implementation for XmlAssembler Unit Test (FileInput version, see cfg):

fsteggink commented 8 years ago

I agree that in this specific test case an XmlElementStreamer file input is actually a more appropriate source. I also agree that the FileInput itself could become more generic (by adding globbing and optional unzip capabilities). A single file, multiple files (glob), a dir or one or more ZIP files should all be treated the same. Basically, all format-specific filters (like XmlElementReader, note, the "Reader" part in the name!) should move to inputs.

The same is also true for any possible outputs. We should not have a specific "Zipper" output, which only zips data, since compressing is a generic operation. Note that it doesn't exist, but what about the HttpOutput? What "format" does it output? And what if we want to write OGR output to an HTTP stream? Too many questions, beyond the scope of this issue. We should distinguish between the file format (GML, JSON, Shape, PostGIS, whatever) and the way the data is delivered (file, dir, HTTP stream, zipped, a combination of these).

However, regarding my original question, I can also imagine cases where a filter converts a single input package into multiple output packages. The input package could come from a Stetl input or a filter. That doesn't matter. If the input package has end_of_doc and/or end_of_stream set, I think these flags should only be set in the last package emitted by the filter.

The opposite is also true: when a filter combines multiple input packages into a single output package, and one of the input packages (the last one ideally!) has end_of_doc and/or end_of_stream set, the output package should also have these flags set.
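Both rules (one-to-many and many-to-one) can be sketched in a few lines. `Packet`, `split_packet` and `combine_packets` are hypothetical helpers for this illustration and not part of Stetl.

```python
from dataclasses import dataclass
from typing import Any, Iterable, List


@dataclass
class Packet:
    """Simplified stand-in for a Stetl packet (hypothetical)."""
    data: Any = None
    end_of_doc: bool = False
    end_of_stream: bool = False


def split_packet(packet: Packet, parts: List[Any]) -> List[Packet]:
    """One-to-many: only the last emitted packet inherits the input's flags."""
    if not parts:
        # Nothing to emit, but the end markers must not get lost.
        return [Packet(None, packet.end_of_doc, packet.end_of_stream)]
    last_index = len(parts) - 1
    return [
        Packet(
            data=part,
            end_of_doc=packet.end_of_doc and index == last_index,
            end_of_stream=packet.end_of_stream and index == last_index,
        )
        for index, part in enumerate(parts)
    ]


def combine_packets(packets: Iterable[Packet]) -> Packet:
    """Many-to-one: the output carries a flag if any input carried it."""
    packets = list(packets)
    return Packet(
        data=[p.data for p in packets],
        end_of_doc=any(p.end_of_doc for p in packets),
        end_of_stream=any(p.end_of_stream for p in packets),
    )


pieces = split_packet(Packet("doc", end_of_doc=True, end_of_stream=True),
                      ["x", "y", "z"])
merged = combine_packets(pieces)
```

Splitting and then combining again round-trips the flags: only the piece for "z" is flagged, and `merged` has both flags set.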

I agree that the extra call to self.next is likely part of this problem. I'll try to understand better, as I'm not very familiar with the Stetl code yet. Writing unit tests is a very good way to become more familiar, because then you're forced to think about such things :)

fsteggink commented 8 years ago

As for the original use of the XmlElementReader followed by the XmlAssembler, this is the current TOP10NL workflow in NLExtract. The BGT and BRK are based on that as well. I'm giving priority to the current workflows.

justb4 commented 8 years ago

On 08-08-16 14:11, Frank Steggink wrote:

> I agree that in this specific test case an XmlElementStreamer file input is actually a more appropriate source. I also agree that the FileInput itself could become more generic (by adding globbing and optional unzip capabilities). A single file, multiple files (glob), a dir or one or more ZIP files should all be treated the same. Basically, all format-specific filters (like XmlElementReader, note, the "Reader" part in the name!) should move to inputs.

Good to see you agree! For .zip files this may also be more efficient since unzipping to a temp file is not necessary.

I will open a separate issue for "generic file input". Some file inputs like XmlElementStreamerFileInput and LineStreamerFileInput will open/parse a file but pass file-content (lines, parsed elements) in chunks on each read(). Currently these classes implement this fully within their read() function, but the generic pattern is that they maintain a "context" for the open/parsed file. This will not be too hard to generalize. Also for Inputs we may apply the Strategy Design Pattern (many refs on the web).
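The "context" pattern mentioned here could look roughly like the sketch below: the input keeps the open stream between calls and hands out one chunk per read(). `ChunkedLineReader` is a hypothetical class for illustration only; it is not LineStreamerFileInput or any other Stetl class.

```python
import io
from typing import List, TextIO


class ChunkedLineReader:
    """Hypothetical input that keeps an open-file "context" between read() calls."""

    def __init__(self, stream: TextIO, max_lines: int) -> None:
        self._stream = stream          # the context: stays open across reads
        self._max_lines = max_lines
        self.end_of_stream = False

    def read(self) -> List[str]:
        """Return the next chunk of at most max_lines lines."""
        chunk: List[str] = []
        while len(chunk) < self._max_lines:
            line = self._stream.readline()
            if not line:
                # Stream exhausted: tell the driver to stop calling read().
                self.end_of_stream = True
                break
            chunk.append(line.rstrip("\n"))
        return chunk


reader = ChunkedLineReader(io.StringIO("a\nb\nc\n"), max_lines=2)
chunks = []
while not reader.end_of_stream:
    chunks.append(reader.read())
```

Note one simplification: when the number of lines is an exact multiple of `max_lines`, the final read() in this sketch returns an empty chunk, because the end is only detected on the next readline().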

Note: XmlElementStreamerFileInput and XmlAssembler were inspired from your initial straight Python implementation for NLExtract-Top10NL!

But in general the current ETL use-cases, like NLExtract should be leading in any new features.

> The same is also true for any possible outputs. We should not have a specific "Zipper" output, which only zips data, since compressing is a generic operation. Note that it doesn't exist, but what about the HttpOutput? What "format" does it output? And what if we want to write OGR output to an HTTP stream? Too many questions, beyond the scope of this issue. We should distinguish between the file format (GML, JSON, Shape, PostGIS, whatever) and the way the data is delivered (file, dir, HTTP stream, zipped, a combination of these).

Yes, Stetl is mainly "content-type" driven, i.e. a Packet Format may contain a record, an etree (element) etc. The source of the content should not matter. HttpOutput is mainly a base class for (OGC) specific protocols like WFS and SOS. We can tackle Outputs later.

> However, regarding my original question, I can also imagine cases where a filter converts a single input package into multiple output packages. The input package could come from a Stetl input or a filter. That doesn't matter. If the input package has end_of_doc and/or end_of_stream set, I think these flags should only be set in the last package emitted by the filter.

Yes, that is true. I guess you mean "Packets". This already works for the XmlElementStreamerFileInput and LineStreamerFileInput: instead of returning the entire file content/etree in a single Packet, they maintain a "context". As Stetl will call invoke() until end_of_stream, these classes can output multiple Packets. For Filters, usually an "array" Format such as etree_feature_array or record_array is used to pass multiple data chunks. Emitting multiple Packets is somewhat harder for Filters: we may need another Packet flag so that the ETL will call the Filter multiple times. Maybe end_of_doc is sufficient.

Splitting to multiple ETL streams is another common ETL-feature. There is an issue for that: https://github.com/geopython/stetl/issues/35. I actually have a use-case in a project for that: outputting sensor-data to multiple web-services (OGC SOS and OGC SensorThings API).

> The opposite is also true: when a filter combines multiple input packages into a single output package, and one of the input packages (the last one ideally!) has end_of_doc and/or end_of_stream set, the output package should also have these flags set.

Yes, combining is also a common ETL-feature (viz FME), but IMO less common. This would be more analysis-like.

> I agree that the extra call to self.next is likely part of this problem. I'll try to understand better, as I'm not very familiar with the Stetl code yet. Writing unit tests is a very good way to become more familiar, because then you're forced to think about such things :)

Agree fully! That is why I am so happy with the unit tests: they allow us to redesign/refactor our code with less chance of breaking functionality. In time we may even apply a test-first strategy, which I have applied in many Java projects...
