Open nandwalritik opened 1 year ago
===========
I0403 19:07:50.640271 140107295605376 translations.py:710] ==================== <function fix_side_input_pcoll_coders at 0x7f6d1a716e50> ====================
I0403 19:07:50.640815 140107295605376 translations.py:710] ==================== <function pack_combiners at 0x7f6d1a71b3a0> ====================
I0403 19:07:50.641649 140107295605376 translations.py:710] ==================== <function lift_combiners at 0x7f6d1a71b430> ====================
I0403 19:07:50.642046 140107295605376 translations.py:710] ==================== <function expand_sdf at 0x7f6d1a71b5e0> ====================
I0403 19:07:50.642954 140107295605376 translations.py:710] ==================== <function expand_gbk at 0x7f6d1a71b670> ====================
I0403 19:07:50.643432 140107295605376 translations.py:710] ==================== <function sink_flattens at 0x7f6d1a71b790> ====================
I0403 19:07:50.643682 140107295605376 translations.py:710] ==================== <function greedily_fuse at 0x7f6d1a71b820> ====================
I0403 19:07:50.646259 140107295605376 translations.py:710] ==================== <function read_to_impulse at 0x7f6d1a71b8b0> ====================
I0403 19:07:50.646443 140107295605376 translations.py:710] ==================== <function impulse_to_input at 0x7f6d1a71b940> ====================
I0403 19:07:50.646646 140107295605376 translations.py:710] ==================== <function sort_stages at 0x7f6d1a71bb80> ====================
I0403 19:07:50.647075 140107295605376 translations.py:710] ==================== <function add_impulse_to_dangling_transforms at 0x7f6d1a71bca0> ====================
I0403 19:07:50.647224 140107295605376 translations.py:710] ==================== <function setup_timer_mapping at 0x7f6d1a71baf0> ====================
I0403 19:07:50.647531 140107295605376 translations.py:710] ==================== <function populate_data_channel_coders at 0x7f6d1a71bc10> ====================
I0403 19:07:50.652454 140107295605376 statecache.py:234] Creating state cache with size 104857600
I0403 19:07:50.652946 140107295605376 worker_handlers.py:903] Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f6d1a5f17f0> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
I0403 19:07:50.654591 140107295605376 environments.py:376] Default Python SDK image for environment is apache/beam_python3.9_sdk:2.46.0
I0403 19:07:50.721939 140107295605376 environments.py:376] Default Python SDK image for environment is apache/beam_python3.9_sdk:2.46.0
I0403 19:07:51.123130 140107295605376 environments.py:376] Default Python SDK image for environment is apache/beam_python3.9_sdk:2.46.0
I0403 19:07:51.159239 140107295605376 environments.py:376] Default Python SDK image for environment is apache/beam_python3.9_sdk:2.46.0
I0403 19:07:51.183309 140107295605376 environments.py:376] Default Python SDK image for environment is apache/beam_python3.9_sdk:2.46.0
I0403 19:08:32.564373 140107295605376 environments.py:376] Default Python SDK image for environment is apache/beam_python3.9_sdk:2.46.0
/home/nandwalritik/NSLSearch/amazon_dataset/MAVE/clean_amazon_product_metadata_main.py:155: MarkupResemblesLocatorWarning: The input looks more like a filename than markup. You may want to open this file and pass the filehandle into Beautiful Soup.
soup = bs4.BeautifulSoup(html, 'html.parser')
/home/nandwalritik/NSLSearch/amazon_dataset/MAVE/clean_amazon_product_metadata_main.py:155: MarkupResemblesLocatorWarning: The input looks more like a URL than markup. You may want to use an HTTP client like requests to get the document behind the URL, and feed that document to Beautiful Soup.
soup = bs4.BeautifulSoup(html, 'html.parser')
After running it for an hour, I only get these logs and two folders inside the reproduce folder, beam-temp-mave_positives_counts-bab671c8d22411ed90895da68932ab02 and beam-temp-mave_positives.jsonl-bab671c9d22411ed90895da68932ab02, and both of them are empty.
Hi,
It is normal that generating the full version of the dataset takes many hours on a single CPU thread.
You can try changing the pipeline options here to use multiple threads.
You can start with the Direct Runner with direct_num_workers > 0
to see if it works, then try other runners with more CPUs.
Please let me know if this works.
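For reference, the flags in question might look something like the following. This is only a sketch based on Beam's documented DirectRunner options; where exactly they plug into this script's option parsing is left to the reader:

```python
from apache_beam.options.pipeline_options import PipelineOptions

# DirectRunner parallelism flags; multi_processing mode sidesteps the GIL
# for CPU-bound work such as JSON parsing and HTML cleaning.
options = PipelineOptions([
    '--runner=DirectRunner',
    '--direct_num_workers=8',
    '--direct_running_mode=multi_processing',
])
```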
I tried with direct_num_workers = 8
for 2 hours and still got no output. Is there any way to check the percentage of completion of an Apache Beam pipeline, e.g. with tqdm?
Update: after approximately 3 hours I got this error.
Attaching the logs for your reference:
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1418, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 625, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1898, in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/json/__init__.py", line 346, in loads
return _default_decoder.decode(s)
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/json/decoder.py", line 353, in raw_decode
obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Unterminated string starting at: line 1 column 5864 (char 5863)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/ritik/work/bloomTrain/MAVE/clean_amazon_product_metadata_main.py", line 340, in <module>
app.run(main)
File "/home/ritik/.local/lib/python3.9/site-packages/absl/app.py", line 308, in run
_run_main(main, args)
File "/home/ritik/.local/lib/python3.9/site-packages/absl/app.py", line 254, in _run_main
sys.exit(main(argv))
File "/home/ritik/work/bloomTrain/MAVE/clean_amazon_product_metadata_main.py", line 336, in main
pipeline(p)
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/pipeline.py", line 600, in __exit__
self.result = self.run()
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/pipeline.py", line 577, in run
return self.runner.run_pipeline(self, self._options)
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 202, in run_pipeline
self._latest_run_result = self.run_via_runner_api(
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 224, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 455, in run_stages
bundle_results = self._execute_bundle(
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 783, in _execute_bundle
self._run_bundle(
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1012, in _run_bundle
result, splits = bundle_manager.process_bundle(
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1454, in process_bundle
for result, split_result in executor.map(execute, zip(part_inputs, # pylint: disable=bad-option-value
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/concurrent/futures/_base.py", line 609, in result_iterator
yield fs.pop().result()
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/concurrent/futures/_base.py", line 439, in result
return self.__get_result()
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1446, in execute
return bundle_manager.process_bundle(
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1348, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 379, in push
response = self.worker.do_instruction(request)
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 624, in do_instruction
return getattr(self, request_type)(
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 662, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1062, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 232, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 1021, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
File "apache_beam/runners/worker/operations.py", line 1030, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
File "apache_beam/runners/common.py", line 1433, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
File "apache_beam/runners/common.py", line 818, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 982, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1582, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1695, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1420, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1508, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1418, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 625, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1898, in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/json/__init__.py", line 346, in loads
return _default_decoder.decode(s)
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/home/ritik/.conda/envs/attrValEcom/lib/python3.9/json/decoder.py", line 353, in raw_decode
obj, end = self.scan_once(s, idx)
RuntimeError: json.decoder.JSONDecodeError: Unterminated string starting at: line 1 column 5864 (char 5863) [while running 'JSONLoads_AmazonMetadata']
I think the above error occurs at the "JSONLoads_AmazonMetadata" stage of the Beam pipeline, which simply loads the All_Amazon_Meta.json
file. Could you double-check that the All_Amazon_Meta.json
you downloaded is the correct version?
For monitoring, I wonder if you could try the Apache Flink runner, which has a dashboard for monitoring progress. Normally we expect users to have access to a cluster with many CPUs to process this data. Another way to monitor is to print counters in the def process(...)
of your beam.DoFn
s.
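A minimal local-run sketch of that idea follows (plain Python so it runs standalone; in the real pipeline the class would subclass beam.DoFn, and note that on distributed runners per-instance counts reset per bundle, so Beam's Metrics API is the robust choice there):

```python
class ProgressDoFn:  # in the pipeline this would be: class ProgressDoFn(beam.DoFn)
    """Passes elements through unchanged, logging every `every`-th one."""

    def __init__(self, every=100_000):
        self.every = every
        self.seen = 0

    def process(self, element):
        self.seen += 1
        if self.seen % self.every == 0:
            print(f'processed {self.seen} elements')
        yield element
```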
Unfortunately we cannot open source the full version of the dataset due to license issues. Another thing you can do is use the processing functions in clean_amazon_product_metadata_main.py
and join the mave_*_labels.jsonl
with All_Amazon_Meta.json
manually yourself, for example in a Colab. The resulting data is not too large: about 3.5 GB for positives and 1.6 GB for negatives.
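That manual join could be sketched roughly like this (field names such as 'asin' are assumptions about the two JSONL schemas, not taken from the repo; check them against the actual files):

```python
import json

def join_labels_with_metadata(label_lines, metadata_lines):
    """Join MAVE label records with Amazon metadata records by product id.

    Both arguments are iterables of JSONL strings. The 'asin' key is an
    assumed join key; adjust it to the real field names in the files.
    """
    meta_by_asin = {}
    for line in metadata_lines:
        try:
            rec = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip truncated/corrupt metadata records
        meta_by_asin[rec['asin']] = rec

    for line in label_lines:
        label = json.loads(line)
        meta = meta_by_asin.get(label['asin'])
        if meta is not None:
            yield {**label, 'metadata': meta}
```

For the full files this dict-based join needs the metadata to fit in memory; otherwise, sort both files by the join key and merge them streamingly.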
Ok, let me try. Thanks.
Hi, I tried running
clean_amazon_product_metadata_main.sh
to generate the full version of the dataset on an Amazon EC2 g3.4xlarge instance, and I also tried it on my local machine, but I am unable to generate the full version of the dataset. Can you share the full version of the dataset with paragraphs?