uber / petastorm

Petastorm library enables single machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as Tensorflow, Pytorch, and PySpark and can be used from pure Python code.
Apache License 2.0

Problem with HelloWorld Example on Front Page of Repo #475

Closed andrewredd closed 4 years ago

andrewredd commented 4 years ago

Hi, I'm running the following code:

from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row
from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from pyspark.sql.types import IntegerType
import numpy as np
from petastorm.fs_utils import FilesystemResolver

resolver = FilesystemResolver(output_url + 'test', spark.sparkContext._jsc.hadoopConfiguration(),
                              hdfs_driver='libhdfs')
fact = resolver.filesystem_factory()

HelloWorldSchema = Unischema('HelloWorldSchema', [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('other_data', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
])

def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    return {'id': x,
            'other_data': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}

def generate_hello_world_dataset(output_url, spark, sc):
    rows_count = 1000
    rowgroup_size_mb = 256

    # Wrap the dataset materialization portion. Takes care of setting up Spark environment
    # variables as well as saving petastorm-specific metadata.
    with materialize_dataset(spark, url, HelloWorldSchema, rowgroup_size_mb, filesystem_factory=fact):
        rows_rdd = sc.parallelize(range(rows_count)) \
            .map(row_generator) \
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

        spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
            .coalesce(10) \
            .write \
            .mode('overwrite') \
            .parquet(url)

generate_hello_world_dataset(url, spark, sc)

This is the only way I can get it to run with a libhdfs setup. I get the following error:

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/worker.py", line 377, in main
    process()
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/basedir/home/aredd/venvs/prometheus/lib64/python3.6/site-packages/petastorm/etl/dataset_metadata.py", line 216, in get_row_group_info
  File "/basedir/home/aredd/venvs/prometheus/lib64/python3.6/site-packages/petastorm/fs_utils.py", line 108, in <lambda>
  File "/basedir/tmp/mapred.tmp1/yarn/nm/usercache/username/appcache/application_1576215002453_189781/container_e15_1576215002453_189781_01_000003/PRO/pro/lib64/python3.6/site-packages/petastorm/hdfs/namenode.py", line 266, in hdfs_connect_namenode
    return pyarrow.hdfs.connect(hostname, url.port or 8020, driver=driver, user=user)
  File "/basedir/tmp/mapred.tmp1/yarn/nm/usercache/username/appcache/application_1576215002453_189781/container_e15_1576215002453_189781_01_000003/PRO/pro/lib64/python3.6/site-packages/pyarrow/hdfs.py", line 215, in connect
    extra_conf=extra_conf)
  File "/basedir/tmp/mapred.tmp1/yarn/nm/usercache/username/appcache/application_1576215002453_189781/container_e15_1576215002453_189781_01_000003/PRO/pro/lib64/python3.6/site-packages/pyarrow/hdfs.py", line 40, in __init__
    self._connect(host, port, user, kerb_ticket, driver, extra_conf)
  File "pyarrow/io-hdfs.pxi", line 105, in pyarrow.lib.HadoopFileSystem._connect
  File "pyarrow/error.pxi", line 80, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: HDFS connection failed

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Thanks in advance

selitvin commented 4 years ago

I'd be happy to help. In your example, I cannot find where the f in filesystem_factory=f is set. I would expect that line to look like: materialize_dataset(..., filesystem_factory=resolver.filesystem_factory()).

I took your example and fixed it up so I could run it. I don't have a way to check against a real HDFS setup right now; I can do that tomorrow, but this is the code I would expect to work if url is changed from file:/// to hdfs://....
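
For clarity, here is a minimal sketch of the shape I have in mind (assuming output_url is an hdfs:// path and that rows_count, rowgroup_size_mb, HelloWorldSchema and row_generator are defined as in your snippet):

resolver = FilesystemResolver(output_url, spark.sparkContext._jsc.hadoopConfiguration(),
                              hdfs_driver='libhdfs')

with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb,
                         filesystem_factory=resolver.filesystem_factory()):
    rows_rdd = sc.parallelize(range(rows_count)) \
        .map(row_generator) \
        .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))

    # Write to the same output_url that was used to build the resolver.
    spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
        .coalesce(10) \
        .write \
        .mode('overwrite') \
        .parquet(output_url)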

If it does not work, can you please paste the entire call stack and a runnable code snippet so I can try to reproduce it?

Regards!

andrewredd commented 4 years ago

Thanks @selitvin. I updated the thread with the full stack trace. output_url is the Hadoop directory where I want to generate the dataset. This error occurs when the job is sent out to the workers (not on the driver side).

selitvin commented 4 years ago

Hmmm. Originally you mentioned a PicklingError, which I cannot see in the new call stack. The new call stack does look like some HDFS connection/environment issue (are there any error lines before the call stack that could be relevant?).

Anyway, I copy-pasted your code into my environment (Spark 2.1, petastorm 0.8.0, pyarrow 0.11.1; I know, this is pretty old :)) and was able to run it successfully. (I think you have some unintentional mixup between url and output_url, but that does not affect the result.)

andrewredd commented 4 years ago

Apologies, you're correct; the error is now connection-related on the worker side. What should the output_url look like?


selitvin commented 4 years ago

I was setting output_url="hdfs:///some/path/".
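
Both common forms should work; a short sketch (the hostname here is just a placeholder):

# Host-less form: the namenode is taken from the Hadoop client configuration.
output_url = 'hdfs:///some/path/'
# Explicit form: name the namenode host and port (petastorm falls back to port 8020).
# output_url = 'hdfs://namenode.example.com:8020/some/path/'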

andrewredd commented 4 years ago

I have the same output URL structure. This seems likely to be an issue on our end. Thank you for the help! I'll post here once we find the root cause of the connection issue.

selitvin commented 4 years ago

Sure - good luck with the root cause!

andrewredd commented 4 years ago

Hi @selitvin

I've tried installing your versions and am now getting a different error:

[screenshot of the error message attached]

This seems related to versions of pyarrow. Have you run into this before? Thanks in advance

selitvin commented 4 years ago

I did not see this one in particular. From the error message, this looks like some sort of binary incompatibility in pyarrow, perhaps with the Python runtime? Something seems to be messed up in your environment. Maybe try reinstalling pyarrow?

andrewredd commented 4 years ago

I reinstalled the entire virtual environment with the versions that worked for you above and still got the same error. I got back to the "HDFS connection failed" error by reverting to the latest versions. I'm running Cloudera with Spark 2.4, so I imagine something interferes when the older versions are installed. Thanks!


selitvin commented 4 years ago

Ok. Closing the issue for now.

andrewredd commented 4 years ago

Hi

We've been able to resolve the issues above by wrapping the code in a conda env with libhdfs3 installed. We are currently running into this error:

PetastormMetadataError: Could not find _common_metadata file. Use materialize_dataset(..) in petastorm.etl.dataset_metadata.py to generate this file in your ETL code. You can generate it on an existing dataset using petastorm-generate-metadata.py

This is after running:

# Imports (same as in the first snippet, plus make_reader)
import numpy as np
from pyspark.sql.types import IntegerType

from petastorm import make_reader
from petastorm.codecs import ScalarCodec, NdarrayCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row

schema = [
    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('array_4d', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),
]
HelloWorldSchema = Unischema('HelloWorldSchema', schema)

def row_generator(x):
    """Returns a single entry in the generated dataset. Return a bunch of random values as an example."""
    return {'id': x,
            'array_4d': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}

def generate_petastorm_dataset(spark, sc, output_url='file:///tmp/hello_world_dataset'):
    rowgroup_size_mb = 256

    # Wrap dataset materialization portion. Will take care of setting up spark environment variables as
    # well as save petastorm specific metadata
    rows_count = 1000
    print('pre_materialize')
    with materialize_dataset(spark, output_url, HelloWorldSchema, rowgroup_size_mb):
        rows_rdd = sc.parallelize(range(rows_count)) \
            .map(row_generator) \
            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))
        print('pre_write')
        spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \
            .coalesce(10) \
            .write \
            .mode('overwrite') \
            .parquet(output_url)
        print('post_write')
    print('post_materialize')

print('pre-loading')
generate_petastorm_dataset(
    spark,
    sc,
    output_url=file_name)
print('post-loading')

print('pre-print')
with make_reader(file_name) as reader:
    for row in reader:
        print(row)
print('post-print')

The error is raised at the 'print' stage, after what seems to be a successful write. This is running petastorm 0.8.2, pyarrow 0.16.0, and pyspark 2.4.0. I've looked at the output directly with pyarrow.parquet; the data is there and I can .to_pandas() the full set, so the write appears to be successful. The only remaining issue is creating a reader and loading it into a PyTorch DataLoader. Any help would be greatly appreciated @selitvin
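
For reference, the check I mean is roughly the following (a sketch; the path is a stand-in for my actual dataset location):

import pyarrow.parquet as pq

# Read the written dataset back directly with pyarrow to confirm the write succeeded.
dataset = pq.ParquetDataset('/tmp/hello_world_dataset')  # placeholder path
df = dataset.read().to_pandas()
print(df.shape)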

selitvin commented 4 years ago

Is it possible that the _common_metadata file appears on disk later? Could it be an eventual consistency issue? Would adding a sleep(60) between the dataset generation and the make_reader call help?
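
Something along these lines, as a quick check (a sketch; file_name is the same dataset URL you pass to make_reader above):

import time
from petastorm import make_reader

# Give an eventually-consistent filesystem time to expose _common_metadata
# before opening the reader.
time.sleep(60)
with make_reader(file_name) as reader:
    for row in reader:
        print(row)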