sodadata / soda-core

:zap: Data quality testing for the modern data stack (SQL, Spark, and Pandas) https://www.soda.io
https://go.soda.io/core-docs
Apache License 2.0

Yaml emitter error while executing scans using multithreading approach #2107

Closed fasidd911-dengg closed 4 days ago

fasidd911-dengg commented 5 days ago

Hi Soda team.

Background

We are using the soda-core-athena library. The following code works when running a single thread at a time. However, when we run multiple threads, we sometimes run into the YAML emitter error shown in the stacktrace below. I'm wondering whether there is a thread-safety issue within the ruamel.yaml emitter, or in the way it is being used.

    class DQConfig:
        def __init__(
            self, name: str, configuration: str, checks: str, data_source_name: str = None
        ):
            self.name = name
            self.configuration = configuration
            self.variables = {}
            self.execution = None

more lines

    def runNewScan(config: DQConfig):
        scan = Scan()
        scan.add_configuration_yaml_str(config.configuration)
        scan.set_data_source_name(config.data_source_name)
        scan.add_variables(variables=config.variables)
        scan.add_sodacl_yaml_str(config.checks)
        scan.execute()
        result = scan.get_scan_results()
        result["dataProductName"] = config.name
        return result

    def run_scans(configs):
        with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:
            futures = [executor.submit(runNewScan, config) for config in configs]
            results = [future.result() for future in concurrent.futures.as_completed(futures)]
        return results

    due_configs = get_due_dq_checks(configs)
    results_list = run_scans(due_configs)

Stacktrace

    Traceback (most recent call last):
      File "/home/devusr/app/main.py", line 352, in <module>
        results_list = run_scans(due_configs)
      File "/home/devusr/app/main.py", line 99, in run_scans
        results = [future.result() for future in concurrent.futures.as_completed(futures)]
      File "/home/devusr/app/main.py", line 99, in <listcomp>
        results = [future.result() for future in concurrent.futures.as_completed(futures)]
      File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 439, in result
        return self.__get_result()
      File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
        raise self._exception
      File "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 58, in run
        result = self.fn(*self.args, **self.kwargs)
      File "/home/devusr/app/main.py", line 91, in runNewScan
        scan.execute()
      File "/home/devusr/app/venv/lib/python3.9/site-packages/soda/scan.py", line 638, in execute
        self.scan_results = self.build_scan_results()
      File "/home/devusr/app/venv/lib/python3.9/site-packages/soda/scan.py", line 72, in build_scan_results
        checks = [check.get_dict() for check in self._checks if check.outcome is not None and check.archetype is None]
      File "/home/devusr/app/venv/lib/python3.9/site-packages/soda/scan.py", line 72, in <listcomp>
        checks = [check.get_dict() for check in self._checks if check.outcome is not None and check.archetype is None]
      File "/home/devusr/app/venv/lib/python3.9/site-packages/soda/execution/check/check.py", line 366, in get_dict
        "definition": self.create_definition(),
      File "/home/devusr/app/venv/lib/python3.9/site-packages/soda/execution/check/check.py", line 197, in create_definition
        return to_yaml_str({check_cfg.source_header: [{check_cfg.source_line: check_cfg.source_configurations}]})
      File "/home/devusr/app/venv/lib/python3.9/site-packages/soda/common/yaml_helper.py", line 13, in to_yaml_str
        return YamlHelper.to_yaml(yaml_object)
      File "/home/devusr/app/venv/lib/python3.9/site-packages/soda/common/yaml_helper.py", line 24, in to_yaml
        cls.__yaml.dump(yaml_object, stream)
      File "/home/devusr/app/venv/lib/python3.9/site-packages/ruamel/yaml/main.py", line 563, in dump
        self._context_manager.dump(data)
      File "/home/devusr/app/venv/lib/python3.9/site-packages/ruamel/yaml/main.py", line 913, in dump
        self._yaml.representer.represent(data)
      File "/home/devusr/app/venv/lib/python3.9/site-packages/ruamel/yaml/representer.py", line 82, in represent
        self.serializer.serialize(node)
      File "/home/devusr/app/venv/lib/python3.9/site-packages/ruamel/yaml/serializer.py", line 109, in serialize
        self.serialize_node(node, None, None)
      File "/home/devusr/app/venv/lib/python3.9/site-packages/ruamel/yaml/serializer.py", line 213, in serialize_node
        self.emitter.emit(
      File "/home/devusr/app/venv/lib/python3.9/site-packages/ruamel/yaml/emitter.py", line 257, in emit
        self.state()
      File "/home/devusr/app/venv/lib/python3.9/site-packages/ruamel/yaml/emitter.py", line 705, in expect_block_mapping_key
        self.expect_node(mapping=True)
      File "/home/devusr/app/venv/lib/python3.9/site-packages/ruamel/yaml/emitter.py", line 467, in expect_node
        raise EmitterError('expected NodeEvent, but got {self.event!s}')
    ruamel.yaml.emitter.EmitterError: expected NodeEvent, but got {self.event!s}

Let me know your thoughts, or what else I could do to help. Thank you!
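While narrowing this down, it may help to record failures per future rather than letting the first exception raised by `future.result()` abort the whole batch, as happens in the traceback above. The following is only a sketch: `run_all` is a hypothetical helper name, and it assumes the worker (for example `runNewScan`) takes a single argument.

```python
import concurrent.futures


def run_all(tasks, worker, max_workers=15):
    """Run worker(task) for each task and return (results, failures).

    A failure in one task is recorded instead of aborting the batch.
    run_all is a hypothetical helper, not part of soda-core.
    """
    results, failures = [], []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the task that produced it.
        future_to_task = {executor.submit(worker, task): task for task in tasks}
        for future in concurrent.futures.as_completed(future_to_task):
            task = future_to_task[future]
            try:
                results.append(future.result())
            except Exception as exc:  # e.g. ruamel.yaml's EmitterError
                failures.append((task, exc))
    return results, failures
```

With this in place, `results, failures = run_all(due_configs, runNewScan)` would show which configs hit the emitter error while the other scans still return their results.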

tools-soda commented 5 days ago

SAS-3764

m1n0 commented 4 days ago

Hi, this is a duplicate of #2064, which also contains some potential advice for resolving this. Also, please use code formatting next time for easier readability. Closing; please continue in the linked issue.
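For readers landing here: the traceback shows `YamlHelper.to_yaml` calling `cls.__yaml.dump(...)`, which suggests a single class-level serializer shared by all threads. This is not necessarily the advice given in #2064, but one generic pattern for a shared serializer that keeps state on the instance is to guard each dump with a lock. The sketch below uses only the standard library; `SharedDumper`, `to_yaml_str_locked`, and `dump_many` are hypothetical stand-ins, not soda-core or ruamel.yaml APIs, and applying the same idea to soda-core itself would mean patching or wrapping the library.

```python
import io
import threading
from concurrent.futures import ThreadPoolExecutor


class SharedDumper:
    """Hypothetical stand-in for a serializer that keeps per-call state on the instance."""

    def __init__(self):
        self._stream = None

    def dump(self, data, stream):
        # Instance state: unsafe if two threads interleave inside dump().
        self._stream = stream
        for key, value in data.items():
            self._stream.write(f"{key}: {value}\n")
        self._stream = None


_dumper = SharedDumper()       # one instance shared by every thread
_dump_lock = threading.Lock()  # serializes access to the shared instance


def to_yaml_str_locked(data):
    """Serialize data with the shared dumper, one thread at a time."""
    stream = io.StringIO()
    with _dump_lock:
        _dumper.dump(data, stream)
    return stream.getvalue()


def dump_many(items, max_workers=8):
    """Serialize many items concurrently; only the dump itself is locked."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(to_yaml_str_locked, items))
```

The lock only covers the serialization step, so the rest of each worker's work would still run in parallel. Giving each thread its own serializer instance (for example via `threading.local`) is an alternative that avoids the lock.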