Open jqmp opened 4 years ago
Just noting that we've also seen a google.resumable_media.common.DataCorruption
error in the wild; however, we don't know if this was a problem on Google's end or ours (I suspect Google's).
I recreated this error by dropping the internet connection while the GCS cache was being updated. Here is the stack trace:
BrokenPipeError Traceback (most recent call last)
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
676 headers=headers,
--> 677 chunked=chunked,
678 )
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
391 else:
--> 392 conn.request(method, url, **httplib_request_kw)
393
~/.pyenv/versions/3.7.6/lib/python3.7/http/client.py in request(self, method, url, body, headers, encode_chunked)
1251 """Send a complete request to the server."""
-> 1252 self._send_request(method, url, body, headers, encode_chunked)
1253
~/.pyenv/versions/3.7.6/lib/python3.7/http/client.py in _send_request(self, method, url, body, headers, encode_chunked)
1297 body = _encode(body, 'body')
-> 1298 self.endheaders(body, encode_chunked=encode_chunked)
1299
~/.pyenv/versions/3.7.6/lib/python3.7/http/client.py in endheaders(self, message_body, encode_chunked)
1246 raise CannotSendHeader()
-> 1247 self._send_output(message_body, encode_chunked=encode_chunked)
1248
~/.pyenv/versions/3.7.6/lib/python3.7/http/client.py in _send_output(self, message_body, encode_chunked)
1064 + b'\r\n'
-> 1065 self.send(chunk)
1066
~/.pyenv/versions/3.7.6/lib/python3.7/http/client.py in send(self, data)
986 try:
--> 987 self.sock.sendall(data)
988 except TypeError:
~/.pyenv/versions/3.7.6/lib/python3.7/ssl.py in sendall(self, data, flags)
1033 while count < amount:
-> 1034 v = self.send(byte_view[count:])
1035 count += v
~/.pyenv/versions/3.7.6/lib/python3.7/ssl.py in send(self, data, flags)
1002 self.__class__)
-> 1003 return self._sslobj.write(data)
1004 else:
BrokenPipeError: [Errno 32] Broken pipe
During handling of the above exception, another exception occurred:
ProtocolError Traceback (most recent call last)
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/requests/adapters.py in send(self, request, stream, timeout, verify, cert, proxies)
448 retries=self.max_retries,
--> 449 timeout=timeout
450 )
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
724 retries = retries.increment(
--> 725 method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
726 )
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/urllib3/util/retry.py in increment(self, method, url, response, error, _pool, _stacktrace)
402 if read is False or not self._is_method_retryable(method):
--> 403 raise six.reraise(type(error), error, _stacktrace)
404 elif read is not None:
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/urllib3/packages/six.py in reraise(tp, value, tb)
733 if value.__traceback__ is not tb:
--> 734 raise value.with_traceback(tb)
735 raise value
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
676 headers=headers,
--> 677 chunked=chunked,
678 )
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
391 else:
--> 392 conn.request(method, url, **httplib_request_kw)
393
~/.pyenv/versions/3.7.6/lib/python3.7/http/client.py in request(self, method, url, body, headers, encode_chunked)
1251 """Send a complete request to the server."""
-> 1252 self._send_request(method, url, body, headers, encode_chunked)
1253
~/.pyenv/versions/3.7.6/lib/python3.7/http/client.py in _send_request(self, method, url, body, headers, encode_chunked)
1297 body = _encode(body, 'body')
-> 1298 self.endheaders(body, encode_chunked=encode_chunked)
1299
~/.pyenv/versions/3.7.6/lib/python3.7/http/client.py in endheaders(self, message_body, encode_chunked)
1246 raise CannotSendHeader()
-> 1247 self._send_output(message_body, encode_chunked=encode_chunked)
1248
~/.pyenv/versions/3.7.6/lib/python3.7/http/client.py in _send_output(self, message_body, encode_chunked)
1064 + b'\r\n'
-> 1065 self.send(chunk)
1066
~/.pyenv/versions/3.7.6/lib/python3.7/http/client.py in send(self, data)
986 try:
--> 987 self.sock.sendall(data)
988 except TypeError:
~/.pyenv/versions/3.7.6/lib/python3.7/ssl.py in sendall(self, data, flags)
1033 while count < amount:
-> 1034 v = self.send(byte_view[count:])
1035 count += v
~/.pyenv/versions/3.7.6/lib/python3.7/ssl.py in send(self, data, flags)
1002 self.__class__)
-> 1003 return self._sslobj.write(data)
1004 else:
ProtocolError: ('Connection aborted.', BrokenPipeError(32, 'Broken pipe'))
During handling of the above exception, another exception occurred:
ConnectionError Traceback (most recent call last)
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/bionic-0.8.2-py3.7.egg/bionic/persistence.py in _blob_from_file(self, file_path)
317 try:
--> 318 self._cloud.upload(file_path, blob_url)
319 except Exception as e:
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/bionic-0.8.2-py3.7.egg/bionic/persistence.py in upload(self, path, url)
743 assert path.is_file()
--> 744 self._tool.blob_from_url(url).upload_from_filename(str(path))
745
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/google/cloud/storage/blob.py in upload_from_filename(self, filename, content_type, client, predefined_acl)
1341 size=total_bytes,
-> 1342 predefined_acl=predefined_acl,
1343 )
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/google/cloud/storage/blob.py in upload_from_file(self, file_obj, rewind, size, content_type, num_retries, client, predefined_acl)
1286 created_json = self._do_upload(
-> 1287 client, file_obj, content_type, size, num_retries, predefined_acl
1288 )
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/google/cloud/storage/blob.py in _do_upload(self, client, stream, content_type, size, num_retries, predefined_acl)
1196 response = self._do_resumable_upload(
-> 1197 client, stream, content_type, size, num_retries, predefined_acl
1198 )
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/google/cloud/storage/blob.py in _do_resumable_upload(self, client, stream, content_type, size, num_retries, predefined_acl)
1143 while not upload.finished:
-> 1144 response = upload.transmit_next_chunk(transport)
1145
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/google/resumable_media/requests/upload.py in transmit_next_chunk(self, transport)
424 headers=headers,
--> 425 retry_strategy=self._retry_strategy,
426 )
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/google/resumable_media/requests/_helpers.py in http_request(transport, method, url, data, headers, retry_strategy, **transport_kwargs)
135 )
--> 136 return _helpers.wait_and_retry(func, RequestsMixin._get_status_code, retry_strategy)
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/google/resumable_media/_helpers.py in wait_and_retry(func, get_status_code, retry_strategy)
149 """
--> 150 response = func()
151 if get_status_code(response) not in RETRYABLE:
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/google/auth/transport/requests.py in request(self, method, url, data, headers, max_allowed_time, timeout, **kwargs)
453 timeout=timeout,
--> 454 **kwargs
455 )
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/requests/sessions.py in request(self, method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, hooks, stream, verify, cert, json)
529 send_kwargs.update(settings)
--> 530 resp = self.send(prep, **send_kwargs)
531
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/requests/sessions.py in send(self, request, **kwargs)
642 # Send the request
--> 643 r = adapter.send(request, **kwargs)
644
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/requests/adapters.py in send(self, request, stream, timeout, verify, cert, proxies)
497 except (ProtocolError, socket.error) as err:
--> 498 raise ConnectionError(err, request=request)
499
ConnectionError: ('Connection aborted.', BrokenPipeError(32, 'Broken pipe'))
During handling of the above exception, another exception occurred:
InternalCacheStateError Traceback (most recent call last)
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/bionic-0.8.2-py3.7.egg/bionic/persistence.py in save_result(self, result)
199 try:
--> 200 self._save_or_reregister_result(result)
201 except InternalCacheStateError as e:
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/bionic-0.8.2-py3.7.egg/bionic/persistence.py in _save_or_reregister_result(self, result)
265 else:
--> 266 blob_url = self._blob_from_file(file_path)
267 self._cloud.inventory.register_url(self.query, blob_url, value_hash)
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/bionic-0.8.2-py3.7.egg/bionic/persistence.py in _blob_from_file(self, file_path)
319 except Exception as e:
--> 320 raise InternalCacheStateError.from_failure("artifact file", file_path, e)
321
InternalCacheStateError: Unable to read artifact file PosixPath('/Users/zaki/Development/bionic/tests/bndata/test_cach_workflow/artifacts/raw_frame/5c880906-899e-43f8-bbe4-b55716ed617d/raw_frame.pq') in cache: ('Connection aborted.', BrokenPipeError(32, 'Broken pipe'))
The above exception was the direct cause of the following exception:
InvalidCacheStateError Traceback (most recent call last)
<ipython-input-14-24a5fe28f92c> in <module>
----> 1 f2.get("print_stats")
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/bionic-0.8.2-py3.7.egg/bionic/flow.py in __call__(self, *args, **kwargs)
1645
1646 def __call__(self, *args, **kwargs):
-> 1647 return self._wrapped_method(*args, **kwargs)
1648
1649 def __dir__(self):
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/bionic-0.8.2-py3.7.egg/bionic/flow.py in get(self, name, collection, fmt, mode)
1195
1196 dnode = entity_dnode_from_descriptor(name)
-> 1197 orig_result_group = self._deriver.derive(dnode)
1198 # Remove all results with missing values.
1199 result_group = ResultGroup(
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/bionic-0.8.2-py3.7.egg/bionic/deriver.py in derive(self, dnode)
71 """
72 self.get_ready()
---> 73 return self._compute_result_group_for_dnode(dnode)
74
75 def export_dag(self, include_core=False):
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/bionic-0.8.2-py3.7.egg/bionic/deriver.py in _compute_result_group_for_dnode(self, dnode)
447 task_key_logger=task_key_logger,
448 )
--> 449 results_by_dnode_by_task_key = task_runner.run(requested_task_states)
450
451 for state in requested_task_states:
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/bionic-0.8.2-py3.7.egg/bionic/core/flow_execution.py in run(self, states)
92 continue
93
---> 94 self._process_entry(entry)
95
96 assert len(self._pending_entries) == 0
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/bionic-0.8.2-py3.7.egg/bionic/core/flow_execution.py in _process_entry(self, entry)
129 or state.task.is_simple_lookup
130 ):
--> 131 entry.compute(self.task_key_logger)
132 self._mark_entry_completed(entry)
133
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/bionic-0.8.2-py3.7.egg/bionic/core/task_execution.py in compute(self, task_key_logger)
128 if state.should_persist:
129 accessor = state._cache_accessors[ix]
--> 130 accessor.save_result(result)
131
132 value_hash = accessor.load_result_value_hash()
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/bionic-0.8.2-py3.7.egg/bionic/persistence.py in save_result(self, result)
200 self._save_or_reregister_result(result)
201 except InternalCacheStateError as e:
--> 202 self._raise_state_error_with_explanation(e)
203
204 def update_provenance(self):
~/.pyenv/versions/3.7.6/envs/bionic/lib/python3.7/site-packages/bionic-0.8.2-py3.7.egg/bionic/persistence.py in _raise_state_error_with_explanation(self, source_exc)
373 {inventory_root_urls}."""
374 )
--> 375 ) from source_exc
376
377
InvalidCacheStateError: Cached data may be in an invalid state; this should be impossible but could have resulted from either a bug or a change to the cached files. You should be able to repair the problem by removing all cached files under ('file:///Users/zaki/Development/bionic/tests/bndata/test_cach_workflow/inventory and gs://seller-ds-production/zaki/bionic_cache_test/inventory',).
My approach here will be to catch the BrokenPipeError if it occurs while the GCS cache is being written to, and re-raise it with a message like "GCS connection dropped, try rerunning flow".
In general, almost any exception while reading a cached file will currently be reported as an InvalidCacheStateError, which means the user will be told the cache is corrupted and needs to be cleared. However, this includes things like GCS timing out while copying a local cached file into a bucket (like here), which are not cache corruption problems at all. We may need to be more picky about which exceptions we report this way.