googledatalab / datalab

Interactive tools and developer experiences for Big Data on Google Cloud Platform.
Apache License 2.0
974 stars 249 forks source link

Converting from colab code to datalab with tfma and tfdv example #2095

Open OrielResearchCure opened 5 years ago

OrielResearchCure commented 5 years ago

Hi all,

I am working on the Chicago taxi example (chicago_taxi_tfma_local_playground) : https://github.com/tensorflow/model-analysis/tree/master/examples/chicago_taxi

As a first step, I converted the code to a Colab notebook, made a few adjustments, and have it working here: https://colab.research.google.com/drive/1pAlfpPM0SYd7xVbqQzyDVDSiQudGWmKg

Using that notebook as a reference for package versions and other installation details, I have started working on a Datalab version of the code (this will eventually serve as a reference for my own model code).

I have attached the ipynb from Datalab (renamed to .txt) and would appreciate help with executing the command: train_stats = tfdv.generate_statistics_from_csv(data_location=os.path.join(TRAIN_DATA_DIR, 'data.csv')). The error is:

` AttributeErrorTraceback (most recent call last)

in () 1 # Compute stats over training data. ----> 2 train_stats = tfdv.generate_statistics_from_csv(data_location=os.path.join(TRAIN_DATA_DIR, 'data.csv')) /usr/local/envs/py2env/lib/python2.7/site-packages/tensorflow_data_validation/utils/stats_gen_lib.pyc in generate_statistics_from_csv(data_location, column_names, delimiter, output_path, stats_options, pipeline_options) 156 shard_name_template='', 157 coder=beam.coders.ProtoCoder( --> 158 statistics_pb2.DatasetFeatureStatisticsList))) 159 return load_statistics(output_path) 160 /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/pipeline.pyc in __exit__(self, exc_type, exc_val, exc_tb) 421 def __exit__(self, exc_type, exc_val, exc_tb): 422 if not exc_type: --> 423 self.run().wait_until_finish() 424 425 def visit(self, visitor): /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/pipeline.pyc in run(self, test_runner_api) 401 if test_runner_api and self._verify_runner_api_compatible(): 402 return Pipeline.from_runner_api( --> 403 self.to_runner_api(), self.runner, self._options).run(False) 404 405 if self._options.view_as(TypeOptions).runtime_type_check: /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/pipeline.pyc in run(self, test_runner_api) 414 finally: 415 shutil.rmtree(tmpdir) --> 416 return self.runner.run_pipeline(self) 417 418 def __enter__(self): /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.pyc in run_pipeline(self, pipeline) 136 runner = BundleBasedDirectRunner() 137 --> 138 return runner.run_pipeline(pipeline) 139 140 /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_pipeline(self, pipeline) 227 from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner 228 pipeline.visit(DataflowRunner.group_by_key_input_visitor()) --> 229 return self.run_via_runner_api(pipeline.to_runner_api()) 230 231 def run_via_runner_api(self, pipeline_proto): 
/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_via_runner_api(self, pipeline_proto) 230 231 def run_via_runner_api(self, pipeline_proto): --> 232 return self.run_stages(*self.create_stages(pipeline_proto)) 233 234 def create_stages(self, pipeline_proto): /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_stages(self, pipeline_components, stages, safe_coders) 1013 metrics_by_stage[stage.name] = self.run_stage( 1014 controller, pipeline_components, stage, -> 1015 pcoll_buffers, safe_coders).process_bundle.metrics 1016 finally: 1017 controller.close() /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_stage(self, controller, pipeline_components, stage, pcoll_buffers, safe_coders) 1130 result = BundleManager( 1131 controller, get_buffer, process_bundle_descriptor, -> 1132 self._progress_frequency).process_bundle(data_input, data_output) 1133 1134 while True: /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.pyc in process_bundle(self, inputs, expected_outputs) 1386 process_bundle=beam_fn_api_pb2.ProcessBundleRequest( 1387 process_bundle_descriptor_reference=self._bundle_descriptor.id)) -> 1388 result_future = self._controller.control_handler.push(process_bundle) 1389 1390 with ProgressRequester( /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.pyc in push(self, request) 1258 request.instruction_id = 'control_%s' % self._uid_counter 1259 logging.debug('CONTROL REQUEST %s', request) -> 1260 response = self.worker.do_instruction(request) 1261 logging.debug('CONTROL RESPONSE %s', response) 1262 return ControlFuture(request.instruction_id, response) /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.pyc in do_instruction(self, request) 210 # E.g. 
if register is set, this will call self.register(request.register)) 211 return getattr(self, request_type)(getattr(request, request_type), --> 212 request.instruction_id) 213 else: 214 raise NotImplementedError /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.pyc in process_bundle(self, request, instruction_id) 232 try: 233 with state_handler.process_instruction_id(instruction_id): --> 234 processor.process_bundle(instruction_id) 235 finally: 236 del self.bundle_processors[instruction_id] /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.pyc in process_bundle(self, instruction_id) 417 input_op_by_target[ 418 data.target.primitive_transform_reference --> 419 ].process_encoded(data.data) 420 421 # Finish all operations. /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.pyc in process_encoded(self, encoded_windowed_values) 122 decoded_value = self.windowed_coder_impl.decode_from_stream( 123 input_stream, True) --> 124 self.output(decoded_value) 125 126 /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.Operation.output() /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.Operation.output() /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ConsumerSet.receive() /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.DoOperation.process() /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.DoOperation.process() /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner.receive() 
/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner.process() /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner._reraise_augmented() /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner.process() /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/common.so in apache_beam.runners.common.PerWindowInvoker.invoke_process() /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/common.so in apache_beam.runners.common.PerWindowInvoker._invoke_per_window() /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/io/iobase.pyc in process(self, element, init_result) 1052 writer = self.sink.open_writer(init_result, str(uuid.uuid4())) 1053 for e in bundle[1]: # values -> 1054 writer.write(e) 1055 return [window.TimestampedValue(writer.close(), window.MAX_TIMESTAMP)] 1056 /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/io/filebasedsink.pyc in write(self, value) 386 387 def write(self, value): --> 388 self.sink.write_record(self.temp_handle, value) 389 390 def close(self): /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/io/filebasedsink.pyc in write_record(self, file_handle, value) 135 this sink's Coder. 
136 """ --> 137 self.write_encoded_record(file_handle, self.coder.encode(value)) 138 139 def write_encoded_record(self, file_handle, encoded_value): /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/io/tfrecordio.pyc in write_encoded_record(self, file_handle, value) 278 279 def write_encoded_record(self, file_handle, value): --> 280 _TFRecordUtil.write_record(file_handle, value) 281 282 /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/io/tfrecordio.pyc in write_record(cls, file_handle, value) 99 file_handle.write('{}{}{}{}'.format( 100 encoded_length, --> 101 struct.pack(' 81 crc = crc32c_fn(value) 82 return (((crc >> 15) | (crc << 17)) + 0xa282ead8) & 0xffffffff 83 /usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/io/tfrecordio.pyc in _default_crc32c_fn(value) 45 try: 46 import snappy # pylint: disable=import-error ---> 47 _default_crc32c_fn.fn = snappy._snappy._crc32c # pylint: disable=protected-access 48 except ImportError: 49 logging.warning('Couldn\'t find python-snappy so the implementation of ' AttributeError: 'module' object has no attribute '_snappy' [while running 'WriteStatsOutput/Write/WriteImpl/WriteBundles'] ` [chicago_taxi_tfma_local_playground.ipynb.txt](https://github.com/googledatalab/datalab/files/2630332/chicago_taxi_tfma_local_playground.ipynb.txt) Looking forward to hearing back from you! Many thanks, eilalan
OrielResearchCure commented 5 years ago

I was able to make progress and now have most of the code running, including the TensorFlow Data Validation package. The only remaining issue is with the rendering of tensorflow-model-analysis, which is installed with jupyter nbextension as follows:

!jupyter nbextension enable --sys-prefix --py widgetsnbextension Returns: Enabling notebook extension jupyter-js-widgets/extension...

!jupyter nbextension install --py --symlink tensorflow_model_analysis --sys-prefix

Returns: /usr/local/envs/py2env/lib/python2.7/site-packages/h5py/__init__.py:36: FutureWarning: Conversion of the second argument of issubdtype from float to np.floating is deprecated. In future, it will be treated as np.float64 == np.dtype(float).type. from ._conv import register_converters as _register_converters /usr/local/envs/py2env/lib/python2.7/site-packages/scipy/spatial/__init__.py:96: ImportWarning: Not importing directory '/usr/local/envs/py2env/lib/python2.7/site-packages/scipy/spatial/qhull': missing __init__.py from .qhull import * /usr/local/envs/py2env/lib/python2.7/site-packages/scipy/optimize/_minimize.py:37: ImportWarning: Not importing directory '/usr/local/envs/py2env/lib/python2.7/site-packages/scipy/optimize/lbfgsb': missing __init__.py from .lbfgsb import _minimize_lbfgsb Installing /usr/local/envs/py2env/lib/python2.7/site-packages/tensorflow_model_analysis/static -> tfma_widget_js

Returns: /usr/local/envs/py2env/lib/python2.7/site-packages/h5py/__init__.py:36: FutureWarning: Conversion of the second argument of issubdtype from float to np.floating is deprecated. In future, it will be treated as np.float64 == np.dtype(float).type. from ._conv import register_converters as _register_converters /usr/local/envs/py2env/lib/python2.7/site-packages/scipy/spatial/__init__.py:96: ImportWarning: Not importing directory '/usr/local/envs/py2env/lib/python2.7/site-packages/scipy/spatial/qhull': missing __init__.py from .qhull import * /usr/local/envs/py2env/lib/python2.7/site-packages/scipy/optimize/_minimize.py:37: ImportWarning: Not importing directory '/usr/local/envs/py2env/lib/python2.7/site-packages/scipy/optimize/lbfgsb': missing __init__.py from .lbfgsb import _minimize_lbfgsb Enabling notebook extension tfma_widget_js/extension...

Please let me know how I should update these commands. Many thanks, eilalan