MarkEdmondson1234 / ga-bq-stream

Stream JSON data into BigQuery

Partial Data in BigQuery #4

Closed jerdeboer closed 5 years ago

jerdeboer commented 6 years ago

According to the logs everything works fine and records are inserted as designed, but for some strange reason old records are removed as new records are added: of over 2000 successful entries, only 30-odd are available in BQ.

As far as I can see, new entries are recorded and old ones replaced. There are about 10 requests per 5 seconds or so; I am not sure if that could be the issue.

MarkEdmondson1234 commented 6 years ago

Are you querying with caching turned off?

jerdeboer commented 6 years ago

Hi Mark, yes I have turned caching off.

As a test I turned off the data stream, hoping that the table would fix itself, but out of 3500 successful log entries only 48 have made it to BigQuery.

I have not modified any of your code

jerdeboer commented 6 years ago

This is some sample data that was collected:

https://docs.google.com/spreadsheets/d/1Svm6cDWzSvD0RHGo_O5J16UDvqFfDAK5irNki5nYtos/edit?usp=sharing

MarkEdmondson1234 commented 6 years ago

Hmm, that is mysterious; we have it running now and it's working. In the GCP logs for your App Engine, do you see the logging that occurs here?

https://github.com/MarkEdmondson1234/ga-bq-stream/blob/master/main.py#L82

MarkEdmondson1234 commented 6 years ago

Perhaps check your BigQuery Python client is the same version - this repo is 2 years old by now, so some library code may have changed. But if you are getting SOME data then I can't think why you can't get the rest, other than perhaps it's in a partition you are not querying: streamed rows sit in the streaming buffer with a NULL `_PARTITIONTIME` until they are flushed into a partition. See if your data is there when you query like this:

```sql
SELECT * FROM [your-table] WHERE _PARTITIONTIME IS NULL
```
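The same check from Python, as a minimal sketch - it assumes a modern google-cloud-bigquery client (newer than the one this repo was written against) and a hypothetical table name:

```python
from google.cloud import bigquery

client = bigquery.Client()

# Rows still in the streaming buffer have a NULL _PARTITIONTIME.
query = """
    SELECT COUNT(*) AS buffered_rows
    FROM `your-project.your_dataset.your_table`
    WHERE _PARTITIONTIME IS NULL
"""
for row in client.query(query).result():
    print(row.buffered_rows)
```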
jerdeboer commented 6 years ago

Hi Mark, with `SELECT * FROM [your-table] WHERE _PARTITIONTIME IS NULL` there are no results.

Below is a sample screenshot of the logging that occurs successfully; last night the total ran to 31, whereas the logs count over 2000.

Will try to downgrade the Python client, but you are right in saying it's odd that some data does come through.

[screenshot: App Engine logs showing the successful row inserts]

jerdeboer commented 6 years ago

Changed the Python client and still the same; I've run out of ideas. What's more confusing is that a search on the null partition is always empty.

jerdeboer commented 6 years ago

This seems to be the error:

```
Traceback (most recent call last):
  File "/base/alloc/tmpfs/dynamic_runtimes/python27g/90ff42587f3b5ce/python27/python27_lib/versions/1/google/appengine/runtime/wsgi.py", line 267, in Handle
    result = handler(dict(self._environ), self._StartResponse)
  File "/base/alloc/tmpfs/dynamic_runtimes/python27g/90ff42587f3b5ce/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1519, in __call__
    response = self._internal_error(e)
  File "/base/alloc/tmpfs/dynamic_runtimes/python27g/90ff42587f3b5ce/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1511, in __call__
    rv = self.handle_exception(request, response, e)
  File "/base/alloc/tmpfs/dynamic_runtimes/python27g/90ff42587f3b5ce/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1505, in __call__
    rv = self.router.dispatch(request, response)
  File "/base/alloc/tmpfs/dynamic_runtimes/python27g/90ff42587f3b5ce/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1253, in default_dispatcher
    return route.handler_adapter(request, response)
  File "/base/alloc/tmpfs/dynamic_runtimes/python27g/90ff42587f3b5ce/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 1077, in __call__
    return handler.dispatch()
  File "/base/alloc/tmpfs/dynamic_runtimes/python27g/90ff42587f3b5ce/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 547, in dispatch
    return self.handle_exception(e, self.app.debug)
  File "/base/alloc/tmpfs/dynamic_runtimes/python27g/90ff42587f3b5ce/python27/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 545, in dispatch
    return method(*args, **kwargs)
  File "/base/data/home/apps/e~stratos-221306/20181113t104230.413953938751831313/main.py", line 129, in post
    stream_data(datasetId, tableId, b, ts)
  File "/base/data/home/apps/e~stratos-221306/20181113t104230.413953938751831313/main.py", line 64, in stream_data
    data['ts'] = time_stamp
TypeError: list indices must be integers, not str
```

MarkEdmondson1234 commented 6 years ago

That `TypeError` says `data` is a list, not a dict - a list can't be indexed with the string key `'ts'`. What is your URL request?

It seems the JSON data is arriving as a list and not a dictionary - perhaps you are only sending in a JSON list and should change it to a JSON dict. The failing lines are:

```python
data = json_data
data['ts'] = time_stamp
```

No idea why it used to work and stopped working; I guess the data changed somewhere.
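A defensive sketch of that spot (a hypothetical helper, not the repo's code as written) that tolerates both a JSON dict and a one-element JSON list:

```python
import json
import time

def add_timestamp(raw_json, time_stamp=None):
    """Attach a 'ts' field to the payload, unwrapping a list wrapper if one arrives."""
    data = json.loads(raw_json) if isinstance(raw_json, str) else raw_json
    if isinstance(data, list):
        data = data[0]  # assumption: the dict arrives wrapped in a one-element list
    data['ts'] = time_stamp if time_stamp is not None else time.time()
    return data
```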

MarkEdmondson1234 commented 6 years ago

You can add a `logging.debug(json.dumps(data))` call before that line to see what you are trying to index.
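For instance (hypothetical placement inside `stream_data`, just before the failing line):

```python
import json
import logging

# Log the parsed payload and its type before indexing it.
logging.debug("payload type=%s body=%s", type(data), json.dumps(data))
data['ts'] = time_stamp  # raises TypeError when data is a list
```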

jerdeboer commented 6 years ago

This is the data dump; in terms of GTM, I have followed the directions as you suggested.

{"searched_adult_pax_count": "1", "searched_sector_type": "round-trip", "searched_departure_station_name": "CPT", "searched_arrival_station_name": "HLA", "searched_carrier_name_outbound": "MN", "searched_ATDSAST_time_outbound": "08:50", "UserID": "6645-739-493-63-1587538826", "searched_infant_pax_count": "0", "searched_flight_number_inbound": "475", "searched_ATDSAST_time_inbound": "20:00", "searched_departure_date": "01-17-2019", "searched_cabin": "Economy", "searched_carrier_name_inbound": "MN", "searched_child_pax_count": "0", "searched_flight_number_outbound": "456", "searched_flight_pace": "64", "searched_return_date": "01-18-2019"}

GTM code:

```js
var bqArray = {};

bqArray["UserID"] = "{{userID}}";
bqArray["searched_flight_pace"] = "{{DX-Flightpace}}";
bqArray["searched_sector_type"] = "{{DX-FlightType}}";
bqArray["searched_cabin"] = "{{DX-FlightCabin}}";
bqArray["searched_adult_pax_count"] = "{{DX-FlightADTPax}}";
bqArray["searched_child_pax_count"] = "{{DX-FlightCHDPax}}";
bqArray["searched_infant_pax_count"] = "{{DX-FlightINFPax}}";
bqArray["searched_departure_station_name"] = "{{DX-LookupDepartureCity}}";
bqArray["searched_departure_date"] = "{{DX - Departure Date}}";
bqArray["searched_ATDSAST_time_outbound"] = "{{DX - Inbound Time}}";
bqArray["searched_flight_number_outbound"] = "{{DX-FlightSegment1FlightNumber}}";
bqArray["searched_carrier_name_outbound"] = "{{DX-FlightSegment1OperatingAirline}}";
bqArray["searched_arrival_station_name"] = "{{DX-LookupArrivalCity}}";
bqArray["searched_return_date"] = "{{DX - Arrival Date}}";
bqArray["searched_ATDSAST_time_inbound"] = "{{DX - Outbound Time}}";
bqArray["searched_flight_number_inbound"] = "{{DX-FlightSegment2FlightNumber}}";
bqArray["searched_carrier_name_inbound"] = "{{DX-FlightSegment2OperatingAirline}}";

jQuery.post("https://stratos-221306.appspot.com/bq-streamer", {"bq": JSON.stringify(bqArray)});
```

MarkEdmondson1234 commented 6 years ago

If it's an error on this line: https://github.com/MarkEdmondson1234/ga-bq-stream/blob/1972a04e70bfa12689bbf752b6c97cadd972c0ac/main.py#L64

I took your data dump to try to replicate the error, but it's working OK for me:

```python
>>> import json
>>> import time

>>> data_s = '{"searched_adult_pax_count": "1", "searched_sector_type": "round-trip", "searched_departure_station_name": "CPT", "searched_arrival_station_name": "HLA", "searched_carrier_name_outbound": "MN", "searched_ATDSAST_time_outbound": "08:50", "UserID": "6645-739-493-63-1587538826", "searched_infant_pax_count": "0", "searched_flight_number_inbound": "475", "searched_ATDSAST_time_inbound": "20:00", "searched_departure_date": "01-17-2019", "searched_cabin": "Economy", "searched_carrier_name_inbound": "MN", "searched_child_pax_count": "0", "searched_flight_number_outbound": "456", "searched_flight_pace": "64", "searched_return_date": "01-18-2019"}'

>>> json.loads(data_s)
{u'searched_adult_pax_count': u'1', u'searched_sector_type': u'round-trip', u'searched_departure_station_name': u'CPT', u'searched_arrival_station_name': u'HLA', u'searched_carrier_name_outbound': u'MN', u'searched_ATDSAST_time_outbound': u'08:50', u'UserID': u'6645-739-493-63-1587538826', u'searched_infant_pax_count': u'0', u'searched_flight_number_inbound': u'475', u'searched_ATDSAST_time_inbound': u'20:00', u'searched_departure_date': u'01-17-2019', u'searched_cabin': u'Economy', u'searched_carrier_name_inbound': u'MN', u'searched_child_pax_count': u'0', u'searched_flight_number_outbound': u'456', u'searched_flight_pace': u'64', u'searched_return_date': u'01-18-2019'}

>>> data = json.loads(data_s)

# this is set in the function call, are you sure this is being set?
>>> time_stamp = time.time()
>>> time_stamp
1542184826.499915
>>> data['ts'] = time_stamp
>>> data
{u'searched_adult_pax_count': u'1', u'searched_sector_type': u'round-trip', u'searched_departure_station_name': u'CPT', u'searched_arrival_station_name': u'HLA', u'searched_carrier_name_outbound': u'MN', u'searched_ATDSAST_time_outbound': u'08:50', u'UserID': u'6645-739-493-63-1587538826', u'searched_infant_pax_count': u'0', 'ts': 1542184837.375784, u'searched_flight_number_inbound': u'475', u'searched_ATDSAST_time_inbound': u'20:00', u'searched_departure_date': u'01-17-2019', u'searched_cabin': u'Economy', u'searched_carrier_name_inbound': u'MN', u'searched_child_pax_count': u'0', u'searched_flight_number_outbound': u'456', u'searched_flight_pace': u'64', u'searched_return_date': u'01-18-2019'}
>>> type(data)
<type 'dict'>
```

It seems to fail when it tries to add 'ts' to the 'data' object - you can call type() on these objects before the buggy line to confirm they are what is expected (float and dict). You could try operating on data[0] instead, because possibly the JSON parsing is giving you a list of dicts.

Sorry I can't be more help - I need reproducible examples before I can fix any possible bugs in the code, but hopefully the above will put you on the right track.

jerdeboer commented 5 years ago

Solved the timestamp issue but still having the data ingestion issues. Went deeper into the issue and managed to get a response from the API:

```json
{
  "kind": "bigquery#tableDataInsertAllResponse",
  "insertErrors": [
    {
      "index": 0,
      "errors": [
        {
          "reason": "invalid",
          "location": "serid",
          "debugInfo": "",
          "message": "no such field."
        }
      ]
    }
  ]
}
```

MarkEdmondson1234 commented 5 years ago

Glad you had progress :)

That error says your schema does not match the data you are sending in (the `location` field names the column BigQuery couldn't find), so perhaps just a schema update for your table?
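One way to check, sketched with the modern google-cloud-bigquery API (the repo itself uses an older client) and a hypothetical table name:

```python
from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table("your-project.your_dataset.your_table")
schema_fields = {field.name for field in table.schema}

# Any payload key missing from schema_fields triggers the
# "no such field" insertError seen above.
payload_keys = {"UserID", "searched_flight_pace"}  # example keys from the data dump
print(sorted(payload_keys - schema_fields))
```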

jerdeboer commented 5 years ago

So despite the code running perfectly without any errors in the logs, BQ is not playing ball. My last attempt:

Rows are successfully inserted at approximately one per second without any issues.

So this is what we know: the JSON data that is fed into BQ is fine. I have included the hash as it's all test data anyway:

https://cauldron-223007.appspot.com/bq-get?hash=4cc20ca29afa8185a677c35a57baca9ce6e4c22ccf2833295e43c741

When running a query on `_PARTITIONTIME IS NULL` the results are empty; no data seems to reach that point.

Instead of appending data, it seems to overwrite existing data.

MarkEdmondson1234 commented 5 years ago

That the data is overwriting suggests to me the insert ID is the same, which should be a random string inserted each time by this bit of code:

```python
errors = table.insert_data(rows, row_ids = str(uuid.uuid4()))
```
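A likely explanation, from reading the legacy client (worth verifying against your installed version): `insert_data` expects `row_ids` to be a sequence with one ID per row and indexes into it per row. A plain string is also indexable, so each row's `insertId` collapses to a single character of the UUID; with only 16 possible hex characters, BigQuery's streaming deduplication treats rows that reuse an `insertId` as duplicates and silently drops them, which would produce exactly this overwriting symptom. A sketch of the per-row form:

```python
import uuid

# One unique insertId per row: pass a list the same length as rows,
# not a single string that gets indexed character by character.
errors = table.insert_data(rows, row_ids=[str(uuid.uuid4()) for _ in rows])
```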
jerdeboer commented 5 years ago

I have that exact code...

```
2018-11-19 10:52:01.775 SAST Loaded 1 row into soups:leeks$20181119 (/base/data/home/apps/e~cauldron-223007/20181119t104215.414093083898287797/main.py:84)
```

jerdeboer commented 5 years ago

Hi Mark

Solved

It seems `errors = table.insert_data(rows, row_ids = str(uuid.uuid4()))` is the actual problem child: if you remove the `row_ids = str(uuid.uuid4())` argument, BQ handles the rows' insert IDs itself without the need to specifically ID the rows.

Thanks again for the help

MarkEdmondson1234 commented 5 years ago

Great! Glad it’s sorted.