alteryx / Automated-Manual-Comparison

Automated vs Manual Feature Engineering Comparison. Implemented using Featuretools.
https://towardsdatascience.com/why-automated-feature-engineering-will-change-the-way-you-do-machine-learning-5c15bf188b96
BSD 3-Clause "New" or "Revised" License

AttributeError: 'functools.partial' object has no attribute '__name__' #3

Closed pjgao closed 6 years ago

pjgao commented 6 years ago

I ran the notebook Featuretools on Dask.ipynb on my local machine, but something went wrong when b.compute() ran. Ten of the feature matrices had been generated when the error occurred. Here is the error info:

tornado.application - ERROR - Exception in callback <bound method BokehTornado._keep_alive of <bokeh.server.tornado.BokehTornado object at 0x7f9488d69d68>>
Traceback (most recent call last):
  File "/home/lili/anaconda3/lib/python3.6/site-packages/tornado/ioloop.py", line 1208, in _run
    self._next_timeout = self.io_loop.time()
  File "/home/lili/anaconda3/lib/python3.6/site-packages/bokeh/server/tornado.py", line 514, in _keep_alive
    c.send_ping()
  File "/home/lili/anaconda3/lib/python3.6/site-packages/bokeh/server/connection.py", line 46, in send_ping
    self._socket.ping(codecs.encode(str(self._ping_count), "utf-8"))
  File "/home/lili/anaconda3/lib/python3.6/site-packages/tornado/websocket.py", line 367, in ping
    self.ws_connection.write_ping(data)
  File "/home/lili/anaconda3/lib/python3.6/site-packages/tornado/websocket.py", line 882, in write_ping
    self._write_frame(True, 0x9, data)
  File "/home/lili/anaconda3/lib/python3.6/site-packages/tornado/websocket.py", line 846, in _write_frame
    return self.stream.write(frame)
  File "/home/lili/anaconda3/lib/python3.6/site-packages/tornado/iostream.py", line 525, in write
    future = self._set_read_callback(callback)
  File "/home/lili/anaconda3/lib/python3.6/site-packages/tornado/iostream.py", line 1058, in _check_closed
    size = 128 * 1024
tornado.iostream.StreamClosedError: Stream is closed
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-34-82e469b60feb> in <module>()
      1 overall_start = timer()
----> 2 b.compute()
      3 overall_end = timer()
      4 
      5 print(f"Total Time Elapsed: {round(overall_end - overall_start, 2)} seconds.")

~/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

~/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    393     keys = [x.__dask_keys__() for x in collections]
    394     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 395     results = schedule(dsk, keys, **kwargs)
    396     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    397 

~/anaconda3/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, **kwargs)
   2198             try:
   2199                 results = self.gather(packed, asynchronous=asynchronous,
-> 2200                                       direct=direct)
   2201             finally:
   2202                 for f in futures.values():

~/anaconda3/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1567             return self.sync(self._gather, futures, errors=errors,
   1568                              direct=direct, local_worker=local_worker,
-> 1569                              asynchronous=asynchronous)
   1570 
   1571     @gen.coroutine

~/anaconda3/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    643             return future
    644         else:
--> 645             return sync(self.loop, func, *args, **kwargs)
    646 
    647     def __repr__(self):

~/anaconda3/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    275             e.wait(10)
    276     if error[0]:
--> 277         six.reraise(*error[0])
    278     else:
    279         return result[0]

~/anaconda3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/anaconda3/lib/python3.6/site-packages/distributed/utils.py in f()
    260             if timeout is not None:
    261                 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 262             result[0] = yield future
    263         except Exception as exc:
    264             error[0] = sys.exc_info()

~/anaconda3/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1097 
   1098     def set_result(self, key, result):
-> 1099         """Sets the result for ``key`` and attempts to resume the generator."""
   1100         self.results[key] = result
   1101         if self.yield_point is not None and self.yield_point.is_ready():

~/anaconda3/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1105             except:
   1106                 future_set_exc_info(self.future, sys.exc_info())
-> 1107             self.yield_point = None
   1108             self.run()
   1109 

~/anaconda3/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1443                             six.reraise(type(exception),
   1444                                         exception,
-> 1445                                         traceback)
   1446                     if errors == 'skip':
   1447                         bad_keys.add(key)

~/anaconda3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

~/anaconda3/lib/python3.6/site-packages/dask/bag/core.py in reify()
   1547 def reify(seq):
   1548     if isinstance(seq, Iterator):
-> 1549         seq = list(seq)
   1550     if seq and isinstance(seq[0], Iterator):
   1551         seq = list(map(list, seq))

~/anaconda3/lib/python3.6/site-packages/dask/bag/core.py in map_chunk()
   1707     else:
   1708         for a in zip(*args):
-> 1709             yield f(*a)
   1710 
   1711     # Check that all iterators are fully exhausted

<ipython-input-25-75ac088d04b8> in feature_matrix_from_entityset()
     11                                                  n_jobs = 1,
     12                                                  verbose = True,
---> 13                                                  chunk_size = es['app'].df.shape[0])
     14 
     15     feature_matrix.to_csv('data/fm/p%d_fm.csv' % es_dict['num'], index = True)

~/anaconda3/lib/python3.6/site-packages/featuretools/computational_backends/calculate_feature_matrix.py in calculate_feature_matrix()
    256                                                  cutoff_df_time_var=cutoff_df_time_var,
    257                                                  target_time=target_time,
--> 258                                                  pass_columns=pass_columns)
    259 
    260     feature_matrix = pd.concat(feature_matrix)

~/anaconda3/lib/python3.6/site-packages/featuretools/computational_backends/calculate_feature_matrix.py in linear_calculate_chunks()
    518                                           cutoff_df_time_var,
    519                                           target_time, pass_columns,
--> 520                                           backend=backend)
    521         feature_matrix.append(_feature_matrix)
    522         # Do a manual garbage collection in case objects from calculate_chunk

~/anaconda3/lib/python3.6/site-packages/featuretools/computational_backends/calculate_feature_matrix.py in calculate_chunk()
    340                                            ids,
    341                                            precalculated_features=precalculated_features,
--> 342                                            training_window=window)
    343 
    344             id_name = _feature_matrix.index.name

~/anaconda3/lib/python3.6/site-packages/featuretools/computational_backends/utils.py in wrapped()
     32         def wrapped(*args, **kwargs):
     33             if save_progress is None:
---> 34                 r = method(*args, **kwargs)
     35             else:
     36                 time = args[0].to_pydatetime()

~/anaconda3/lib/python3.6/site-packages/featuretools/computational_backends/calculate_feature_matrix.py in calc_results()
    314                                                     precalculated_features=precalculated_features,
    315                                                     ignored=all_approx_feature_set,
--> 316                                                     profile=profile)
    317             return matrix
    318 

~/anaconda3/lib/python3.6/site-packages/featuretools/computational_backends/pandas_backend.py in calculate_all_features()
    194 
    195                     handler = self._feature_type_handler(test_feature)
--> 196                     result_frame = handler(group, input_frames)
    197 
    198                     output_frames_type = self.feature_tree.output_frames_type(test_feature)

~/anaconda3/lib/python3.6/site-packages/featuretools/computational_backends/pandas_backend.py in _calculate_agg_features()
    421                 funcname = func
    422                 if callable(func):
--> 423                     funcname = func.__name__
    424 
    425                 to_agg[variable_id].append(func)

AttributeError: 'functools.partial' object has no attribute '__name__'
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:50460 remote=tcp://127.0.0.1:45867>
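For context, the failing line in pandas_backend.py (funcname = func.__name__) assumes every aggregation callable carries a __name__ attribute, which functools.partial objects do not. A minimal, standalone sketch of the underlying behavior:

```python
from functools import partial

def add(a, b):
    return a + b

inc = partial(add, 1)
print(inc.func.__name__)         # 'add': the wrapped function keeps its name
print(hasattr(inc, '__name__'))  # False: accessing inc.__name__ raises the
                                 # AttributeError seen in the traceback above
```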
WillKoehrsen commented 6 years ago

You may be using an older version of the feature definitions. With the latest release of Featuretools (v0.3.0), older saved feature definitions are no longer compatible. Make sure you are using the latest version of the feature definitions, which can be found here and are called features.txt.
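A quick sanity check along those lines (a sketch only; depending on the release, ft.load_features takes a file path or an open file handle, and some older versions also required an entityset argument):

```python
import featuretools as ft

print(ft.__version__)  # should print 0.3.0 or later

# Re-load the regenerated feature definitions from the repo.
with open('features.txt') as f:
    features = ft.load_features(f)
print(len(features), 'feature definitions loaded')
```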

pjgao commented 6 years ago

Hello, Will! With the new features.txt and after upgrading Featuretools to v0.3.0, I split the data into 120 partitions (48 GB RAM, 24 cores). Running b.compute() took only 1120.01 seconds (so it takes only about 18 minutes to execute dfs on this dataset, right?). I do get p1_fm.csv through p120_fm.csv, but some errors may have occurred. Besides, processing the first partition on a single core took only 126 s. Can I say that without Dask I would need 126 s * 120 partitions = 15120 s = 4.2 h on my machine? That seems different from your statement that it "takes about 25 hours on an AWS EC2 machine".

WillKoehrsen commented 6 years ago

Thanks for the update! We have noticed some of the same warning messages when we run dfs, but I don't think they affect the feature calculations. We are working on fixing these issues. One way to check that the calculation succeeded is to join the individual feature matrices into one and make sure the total size is as expected.
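A sketch of that check in pandas, using the partition output paths from the traceback above (the expected row count is the number of rows in the full app table):

```python
import pandas as pd

# Read all 120 partition outputs and stack them into one feature matrix.
parts = [pd.read_csv('data/fm/p%d_fm.csv' % i, index_col=0)
         for i in range(1, 121)]
fm = pd.concat(parts)

print(fm.shape)                          # rows should match the full app table
assert not fm.index.duplicated().any()   # no observation computed twice
```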

Regarding the amount of time: Featuretools v0.3.0 is much faster (at least 50% faster in most cases) than earlier versions. Below is a graph comparing the speedup of the newest version on a number of benchmarks we run for each release!

[graph: per-release speedup comparison across benchmark tests]

It might be possible that running on a single core would take only ~4 hours. However, I still think there is a significant speedup with Dask because you can use all the cores on your machine. Also, this dataset is not that large, so running the entire calculation at once is possible. For larger datasets that don't fit in memory, partitioning the data and running in parallel is the only way to complete the calculation. Learning Dask (or another parallelization framework such as Spark) is a good time investment if you want to work with large datasets and use your hardware efficiently.
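As a rough sketch of that partition-and-parallelize pattern with a Dask bag (process_partition here is a hypothetical stand-in for the notebook's feature_matrix_from_entityset):

```python
import dask.bag as db

def process_partition(part_num):
    # Stand-in for feature_matrix_from_entityset: load one data partition,
    # run calculate_feature_matrix on it, and write the result to disk.
    return part_num

# One task per partition; the Dask scheduler spreads them across all cores.
b = db.from_sequence(range(1, 121)).map(process_partition)
results = b.compute()
```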

pjgao commented 6 years ago

I am very surprised to hear this news. Thanks for your contribution!

pjgao commented 6 years ago

Another question, Will: can Featuretools extract features from a single table (like the Titanic dataset)? If so, what kinds of features can it extract?

WillKoehrsen commented 6 years ago

Yes, Featuretools can extract features from a single table using transform primitives. You can see a list of all transform primitives in the docs. Transform primitives combine different columns, for example through arithmetic operations, or extract additional information from columns, such as the time of day or the day of the week. An example of using transform primitives can be found in the Loan Repayment notebook.
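A minimal single-table sketch (the toy DataFrame is made up to stand in for Titanic-like data, and the calls follow the Featuretools API of roughly the v0.3 era):

```python
import pandas as pd
import featuretools as ft

# Toy stand-in for something like the Titanic data.
df = pd.DataFrame({'passenger_id': [1, 2, 3],
                   'pclass': [3, 1, 3],
                   'fare': [7.25, 71.28, 8.05],
                   'boarded': pd.to_datetime(['1912-04-10', '1912-04-10',
                                              '1912-04-11'])})

es = ft.EntitySet(id='titanic')
es = es.entity_from_dataframe(entity_id='passengers', dataframe=df,
                              index='passenger_id', time_index='boarded')

# With a single entity, only transform primitives apply.
fm, features = ft.dfs(entityset=es, target_entity='passengers',
                      agg_primitives=[],
                      trans_primitives=['day', 'weekday'])
```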

You can also create additional entities from a single table by normalizing the table. There is an example of this in the Retail Spending notebook. I haven't used Featuretools on the Titanic dataset, but it should be possible to create more entities using the passenger class (Pclass) or the cabin (Cabin). Once you have created more entities, you can use aggregation primitives to make additional features.
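Continuing the sketch above, normalize_entity splits out a new entity keyed on the pclass column, after which aggregation primitives become available:

```python
# Derive a 'classes' entity from the passenger class column.
es = es.normalize_entity(base_entity_id='passengers',
                         new_entity_id='classes',
                         index='pclass')

# Aggregations now flow across the classes -> passengers relationship,
# producing features such as classes.MEAN(passengers.fare).
fm, features = ft.dfs(entityset=es, target_entity='passengers',
                      agg_primitives=['mean', 'count'],
                      trans_primitives=[])
```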

As a side note, a good place to post general questions about Featuretools is Stack Overflow. Tag the question with featuretools so we'll see it! We enjoy answering questions, and answers on a larger forum like Stack Overflow can help more people.

WillKoehrsen commented 6 years ago

@pjgao Are you still having issues or did using the new features solve the problem?