scrapinghub / exporters

Exporters is an extensible export pipeline library that supports filters, transforms, and several sources and destinations
BSD 3-Clause "New" or "Revised" License

Add Avro formatter #356

Open manugarri opened 7 years ago

manugarri commented 7 years ago

This PR adds a new formatter to format data in the Avro format, using fastavro as the backend.

The Avro container format is designed for parallel processing: sync markers between blocks let readers split a file. To allow streaming writes (which seem to be the only interface available to formatters, given their header and footer APIs), the formatter uses a sync_interval of 0. This forces fastavro's Writer to close a block, terminated by the file's 16-byte random sync marker, after every record write, which ensures that the output produced by the exporters Writers is valid.
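The block layout that a sync interval of 0 relies on can be sketched in plain Python. This is a simplified illustration of the Avro object container format as described in the Avro specification, not fastavro's implementation: it omits the file header (magic bytes, metadata holding the schema and codec, and the header's copy of the sync marker), and for brevity each "record" is a single Avro string. The helper names (`encode_long`, `encode_string`, `write_block`) are hypothetical. Each block is the record count, the byte size of the serialized records, the records themselves, and the file's 16-byte sync marker.

```python
import io
import os

SYNC_SIZE = 16  # Avro sync markers are 16 bytes long


def encode_long(n):
    """Encode an Avro int/long: zigzag, then little-endian base-128 varint."""
    n = (n << 1) ^ (n >> 63)  # zigzag: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
    out = bytearray()
    while n & ~0x7F:  # more than 7 significant bits left
        out.append((n & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        n >>= 7
    out.append(n)
    return bytes(out)


def encode_string(s):
    """Encode an Avro string: UTF-8 byte length as a long, then the bytes."""
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


def write_block(fo, encoded_records, sync_marker):
    """Write one data block: record count, byte size, records, sync marker."""
    payload = b"".join(encoded_records)
    fo.write(encode_long(len(encoded_records)))
    fo.write(encode_long(len(payload)))
    fo.write(payload)
    fo.write(sync_marker)


# One random marker per file; every block in the file ends with it.
sync_marker = os.urandom(SYNC_SIZE)

# Mimic sync_interval=0: each record is written out as its own block, so the
# bytes emitted for a record form a complete block on their own.
buf = io.BytesIO()
for city in ["alicante", "val\xe9ncia"]:
    write_block(buf, [encode_string(city)], sync_marker)
```

The trade-off is size: with one record per block, every record carries its own count, size, and 16-byte marker, whereas a larger sync interval amortizes that overhead across many records.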

manugarri commented 7 years ago

Not sure if Travis' failure has to do with my PR (tests passed locally).

./tests/test_formatters.py:172:71: E203 whitespace before ','
./tests/test_formatters.py:197:1: W391 blank line at end of file
make: *** [test] Error 1
ERROR: InvocationError: '/usr/bin/make test'

I see some PEP 8-related flags (E203 and W391) that I can fix right now.

eliasdorneles commented 7 years ago

I tried running it with the following config:

{
    "exporter_options": {
        "formatter": {
            "name": "exporters.export_formatter.AvroExportFormatter",
            "options": {
                "schema": "{\"namespace\": \"random.names\", \"type\": \"record\", \"name\": \"TestRecord\", \"fields\": [{\"type\": \"string\", \"name\": \"country_code\"}, {\"type\": \"string\", \"name\": \"city\"}, {\"default\": \"N/A\", \"type\": \"string\", \"name\": \"state\"}, {\"default\": 0, \"type\": \"int\", \"name\": \"value\"}]}"
            }
        }
    },
    "reader": {
        "name": "exporters.readers.random_reader.RandomReader",
        "options": {
            "number_of_items": 100,
            "batch_size": 10
        }
    },
    "writer": {
        "name": "exporters.writers.fs_writer.FSWriter",
        "options": {
            "filebase": "/tmp/avro_format_test_",
            "items_per_buffer_write": 100
        }
    }
}

And it crashed with:

ERROR:export-pipeline: -- EXPORTMANAGER -- 'dict' object has no attribute 'encode'
Traceback (most recent call last):
  File "bin/export.py", line 41, in <module>
    run(args)
  File "bin/export.py", line 36, in run
    exporter.export()
  File "/home/elias/hacking/scrapinghub/exporters/exporters/export_managers/base_exporter.py", line 224, in export
    self._run_pipeline()
  File "/home/elias/hacking/scrapinghub/exporters/exporters/export_managers/base_exporter.py", line 167, in _run_pipeline
    self._run_pipeline_iteration()
  File "/home/elias/hacking/scrapinghub/exporters/exporters/export_managers/base_exporter.py", line 76, in _run_pipeline_iteration
    self.writer.write_batch(batch=next_batch)
  File "/home/elias/hacking/scrapinghub/exporters/exporters/writers/base_writer.py", line 111, in write_batch
    self.write_buffer.buffer(item)
  File "/home/elias/hacking/scrapinghub/exporters/exporters/write_buffers/base.py", line 37, in buffer
    self.items_group_files.add_item_to_file(item, key)
  File "/home/elias/hacking/scrapinghub/exporters/exporters/write_buffers/grouping.py", line 115, in add_item_to_file
    buffer_file.add_item_to_file(item)
  File "/home/elias/hacking/scrapinghub/exporters/exporters/write_buffers/grouping.py", line 84, in add_item_to_file
    content = self.formatter.format(item)
  File "/home/elias/hacking/scrapinghub/exporters/exporters/export_formatter/avro_export_formatter.py", line 62, in format
    self.writer.write(item)
  File "/home/elias/.virtualenvs/exporters2/local/lib/python2.7/site-packages/fastavro/writer.py", line 537, in write
    write_data(self.io, record, self.schema)
  File "/home/elias/.virtualenvs/exporters2/local/lib/python2.7/site-packages/fastavro/writer.py", line 432, in write_data
    return fn(fo, datum, schema)
  File "/home/elias/.virtualenvs/exporters2/local/lib/python2.7/site-packages/fastavro/writer.py", line 363, in write_record
    name, field.get('default')), field['type'])
  File "/home/elias/.virtualenvs/exporters2/local/lib/python2.7/site-packages/fastavro/writer.py", line 432, in write_data
    return fn(fo, datum, schema)
  File "/home/elias/.virtualenvs/exporters2/local/lib/python2.7/site-packages/fastavro/writer.py", line 178, in write_utf8
    write_bytes(fo, utob(datum))
  File "fastavro/_six.py", line 48, in fastavro._six.py2_utob (fastavro/_six.c:2462)
  File "fastavro/_six.py", line 49, in fastavro._six.py2_utob (fastavro/_six.c:2320)
AttributeError: 'dict' object has no attribute 'encode'

I'm probably doing something wrong. Any help, @manugarri? :)
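Hand-escaping the schema inside the config above is error-prone. Since the `schema` option holds the Avro schema as a JSON-encoded string, one option is to build the config in Python and let `json.dumps` do the escaping. This is a sketch that assumes the exporter is driven by a plain JSON config file like the one above; all option names and values are copied from that config.

```python
import json

# The Avro schema from the config above, as a plain Python dict.
test_record_schema = {
    "namespace": "random.names",
    "type": "record",
    "name": "TestRecord",
    "fields": [
        {"type": "string", "name": "country_code"},
        {"type": "string", "name": "city"},
        {"default": "N/A", "type": "string", "name": "state"},
        {"default": 0, "type": "int", "name": "value"},
    ],
}

config = {
    "exporter_options": {
        "formatter": {
            "name": "exporters.export_formatter.AvroExportFormatter",
            # The schema option is a JSON *string*, so serialize the dict
            # instead of escaping every quote by hand.
            "options": {"schema": json.dumps(test_record_schema)},
        }
    },
    "reader": {
        "name": "exporters.readers.random_reader.RandomReader",
        "options": {"number_of_items": 100, "batch_size": 10},
    },
    "writer": {
        "name": "exporters.writers.fs_writer.FSWriter",
        "options": {
            "filebase": "/tmp/avro_format_test_",
            "items_per_buffer_write": 100,
        },
    },
}

config_json = json.dumps(config, indent=4)
print(config_json)  # save this output as the exporter's config file
```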

manugarri commented 7 years ago

@eliasdorneles the issue you are facing is that the schema you provided does not match the items that RandomReader produces.

The items produced by it look like this:

{'city': {'name': u'alicante', 'district': u'dist3'}, 'state': u'val\xe9ncia', 'value': 1301, 'country_code': u'us', 'key': 0}

So you would need the following Avro schema to describe those items:

[
  {
    "type": "record",
    "name": "City",
    "fields": [
      {
        "name": "name",
        "type": "string"
      },
      {
        "name": "district",
        "type": "string"
      }
    ]
  },
  {
    "type": "record",
    "name": "TestRecord",
    "fields": [
      {
        "name": "city",
        "type": "City"
      },
      {
        "name": "state",
        "type": "string"
      },
      {
        "name": "value",
        "type": "int",
        "default": 0
      },
      {
        "name": "key",
        "type": "int"
      },
      {
        "name": "country_code",
        "type": "string"
      }
    ]
  }
]

Basically, it is a City record type nested inside the top-level TestRecord record. I tested it and it works.
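The mismatch behind the traceback can be illustrated with a small checker written for Python 3. It is a hypothetical sketch, not part of exporters or fastavro, and it covers only the subset of Avro used in this thread: `string`, `int`, records, references to named records, and unions. Per the Avro specification, a schema written as a JSON array is a union, so the schema above is a union of `City` and `TestRecord`; the checker accepts an item if any branch matches. It ignores item keys that the schema does not mention, and accepts a missing field only when the field has a default.

```python
def collect_named(schema, named):
    """Register every named record type so that string references resolve."""
    if isinstance(schema, list):  # a JSON array is an Avro union
        for branch in schema:
            collect_named(branch, named)
    elif isinstance(schema, dict) and schema.get("type") == "record":
        named[schema["name"]] = schema
        for field in schema["fields"]:
            collect_named(field["type"], named)


def matches(datum, schema, named):
    """Return True if datum fits schema (simplified subset of Avro)."""
    if isinstance(schema, list):  # union: any branch may match
        return any(matches(datum, branch, named) for branch in schema)
    if isinstance(schema, str) and schema in named:  # named record reference
        return matches(datum, named[schema], named)
    if schema == "string":
        return isinstance(datum, str)
    if schema == "int":  # Avro int is a 32-bit signed integer
        return (isinstance(datum, int) and not isinstance(datum, bool)
                and -2**31 <= datum < 2**31)
    if isinstance(schema, dict) and schema.get("type") == "record":
        if not isinstance(datum, dict):
            return False
        for field in schema["fields"]:
            if field["name"] in datum:
                if not matches(datum[field["name"]], field["type"], named):
                    return False
            elif "default" not in field:
                return False  # missing field with no default
        return True
    raise ValueError("unsupported schema: %r" % (schema,))


def validate(datum, schema):
    named = {}
    collect_named(schema, named)
    return matches(datum, schema, named)


# Flat schema from the failing config: "city" is declared as a string.
flat_schema = {
    "type": "record", "name": "TestRecord",
    "fields": [
        {"name": "country_code", "type": "string"},
        {"name": "city", "type": "string"},
        {"name": "state", "type": "string", "default": "N/A"},
        {"name": "value", "type": "int", "default": 0},
    ],
}

# Schema suggested above: a City record plus a TestRecord that references it.
nested_schema = [
    {"type": "record", "name": "City", "fields": [
        {"name": "name", "type": "string"},
        {"name": "district", "type": "string"},
    ]},
    {"type": "record", "name": "TestRecord", "fields": [
        {"name": "city", "type": "City"},
        {"name": "state", "type": "string"},
        {"name": "value", "type": "int", "default": 0},
        {"name": "key", "type": "int"},
        {"name": "country_code", "type": "string"},
    ]},
]

# An item shaped like the RandomReader output shown above.
item = {"city": {"name": "alicante", "district": "dist3"},
        "state": "val\xe9ncia", "value": 1301, "country_code": "us", "key": 0}

print(validate(item, flat_schema))    # False: "city" is a dict, not a string
print(validate(item, nested_schema))  # True
```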

tsrdatatech commented 7 years ago

@manugarri we are planning to port exporters to Python 3 soon; if possible, could you make sure your code is compatible with it? Thanks

manugarri commented 7 years ago

@tsrdatatech I ran 2to3 on my code, and the only change it reports is in tests/test_formatters.py:

-        with self.assertRaisesRegexp(ConfigurationError, "requires at least one of"):
+        with self.assertRaisesRegex(ConfigurationError, "requires at least one of"):
             CSVExportFormatter({})

That is not really my code (it's the test for CSVExportFormatter).
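One way to keep this test running on both Python 2.7 and Python 3 during the port is to alias the method in a shared base class. In Python 2.7 only `assertRaisesRegexp` exists; Python 3 renamed it to `assertRaisesRegex`, and the old alias was removed in Python 3.12. This is a sketch only: `ConfigurationError` and `make_csv_formatter` below are hypothetical stand-ins for the exporters classes, and the error message is made up apart from the `requires at least one of` fragment the test matches. The `six` library's `six.assertRaisesRegex` helper is an alternative.

```python
import unittest


class ConfigurationError(Exception):
    """Hypothetical stand-in for exporters' ConfigurationError."""


def make_csv_formatter(options):
    """Hypothetical stand-in that mimics CSVExportFormatter's validation."""
    if not options:
        raise ConfigurationError(
            "requires at least one of the expected options (made-up message)")
    return options


class CompatTestCase(unittest.TestCase):
    # Python 2.7 only has assertRaisesRegexp; expose it under the Python 3
    # name so tests can use assertRaisesRegex on both versions.
    if not hasattr(unittest.TestCase, "assertRaisesRegex"):
        assertRaisesRegex = unittest.TestCase.assertRaisesRegexp


class CSVFormatterTest(CompatTestCase):
    def test_requires_options(self):
        with self.assertRaisesRegex(ConfigurationError,
                                    "requires at least one of"):
            make_csv_formatter({})
```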

Running the tests on Python 3 raises all kinds of errors from parts of the library unrelated to this PR.

tsrdatatech commented 7 years ago

No worries, we will fix this.

manugarri commented 7 years ago

Awesome, thanks @tsrdatatech! Let me know if you need anything else from my side. I'd rather not have to maintain my own fork when this PR can benefit the community :)

tsrdatatech commented 7 years ago

You're welcome, @manugarri. We should have the Python 3 version ready by early next week, and I agree this PR should be mergeable then. Thanks for your contribution.

manugarri commented 7 years ago

Awesome, thanks @tsrdatatech and @eliasdorneles :)

tribeclickgh commented 6 years ago

Any progress on this, guys?

tsrdatatech commented 6 years ago

@tribeclickgh There is a Python 3 branch at https://github.com/scrapinghub/exporters/tree/python3 that is closer to being complete, but we have not had time to finish it; you're more than welcome to do that if you want. We are not sure when it will be complete.