vaexio / vaex

Out-of-Core hybrid Apache Arrow/NumPy DataFrame for Python, ML, visualization and exploration of big tabular data at a billion rows per second 🚀
https://vaex.io
MIT License
8.23k stars 590 forks source link

Groupby Assertion Error #2326

Open ashsharma96 opened 1 year ago

ashsharma96 commented 1 year ago

While doing a groupby, it gives the error below:

  AssertionError                            Traceback (most recent call last)
  ~/.local/lib/python3.7/site-packages/vaex/dataframe.py in data_type(self, expression, array_type, internal, axis)
     2270             try:
  -> 2271                 data = self.evaluate(expression, 0, 1, filtered=False, array_type=array_type, parallel=False)
     2272             except:

  ~/.local/lib/python3.7/site-packages/vaex/dataframe.py in evaluate(self, expression, i1, i2, out, selection, filtered, array_type, parallel, chunk_size, progress)
     3092         else:
  -> 3093             return self._evaluate_implementation(expression, i1=i1, i2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, chunk_size=chunk_size, progress=progress)
     3094 

  ~/.local/lib/python3.7/site-packages/vaex/dataframe.py in _evaluate_implementation(self, expression, i1, i2, out, selection, filtered, array_type, parallel, chunk_size, raw, progress)
     6520                 deps |= filter_deps
  -> 6521             columns = {k: dataset[k][:] for k in deps if k in dataset}
     6522 

  ~/.local/lib/python3.7/site-packages/vaex/dataframe.py in <dictcomp>(.0)
     6520                 deps |= filter_deps
  -> 6521             columns = {k: dataset[k][:] for k in deps if k in dataset}
     6522 

  ~/.local/lib/python3.7/site-packages/vaex/dataset.py in __getitem__(self, item)
      581             ds = self.ds.__getitem__(item)
  --> 582             for chunk_start, chunk_end, chunks in ds.chunk_iterator([self.name]):
      583                 ar = chunks[self.name]

  ~/.local/lib/python3.7/site-packages/vaex/dataset.py in chunk_iterator(self, columns, chunk_size, reverse)
     1649             assert o1 == c1
  -> 1650             assert o2 == c2
     1651             yield o1, o2, {**ochunks, **cchunks}

  AssertionError: 

  During handling of the above exception, another exception occurred:

  AssertionError                            Traceback (most recent call last)
  /tmp/ipykernel_1477/1034858715.py in <module>
  ----> 1 df37 = remove_duplicates(df37, ['tm_cid', 'tm_sid'])

  /tmp/ipykernel_1477/1939058854.py in remove_duplicates(df, grouping_cols)
        4     df["index"] = vaex.vrange(0, df.shape[0])
        5 #     df = df.materialize()
  ----> 6     df_group = df.groupby(grouping_cols, agg=vaex.agg.min("index"))
        7     df = df.join(df_group[["index_min"]], left_on="index", right_on="index_min")
        8     df = df[df.index_min.notna()]

  ~/.local/lib/python3.7/site-packages/vaex/dataframe.py in groupby(self, by, agg, sort, ascending, assume_sparse, row_limit, copy, progress, delay)
     7114             else:
     7115                 return groupby.agg(agg, delay=delay, progress=progressbar_agg)
  -> 7116         return self._delay(delay, progressbar.exit_on(next(groupby._promise_by)))
     7117 
     7118     @docsubst

  ~/.local/lib/python3.7/site-packages/vaex/dataframe.py in _delay(self, delay, task, progressbar)
     1777         else:
     1778             self.execute()
  -> 1779             return task.get()
     1780 
     1781     @docsubst

  ~/.local/lib/python3.7/site-packages/aplus/__init__.py in get(self, timeout)
      168             return self._value
      169         else:
  --> 170             raise self._reason
      171 
      172     def wait(self, timeout=None):

  ~/.local/lib/python3.7/site-packages/vaex/promise.py in callAndReject(r)
      119             try:
      120                 if aplus._isFunction(failure):
  --> 121                     ret.fulfill(failure(r))
      122                 else:
      123                     ret.reject(r)

  ~/.local/lib/python3.7/site-packages/vaex/progress.py in error(arg)
       89         def error(arg):
       90             self.exit()
  ---> 91             raise arg
       92         return promise.then(ok, error)
       93         # return promise

  ~/.local/lib/python3.7/site-packages/vaex/promise.py in callAndReject(r)
      119             try:
      120                 if aplus._isFunction(failure):
  --> 121                     ret.fulfill(failure(r))
      122                 else:
      123                     ret.reject(r)

  ~/.local/lib/python3.7/site-packages/vaex/delayed.py in _wrapped(exc)
       36         # import vaex
       37         # vaex.utils.print_stack_trace()
  ---> 38         raise exc
       39     return _wrapped
       40 

  ~/.local/lib/python3.7/site-packages/vaex/promise.py in callAndReject(r)
      119             try:
      120                 if aplus._isFunction(failure):
  --> 121                     ret.fulfill(failure(r))
      122                 else:
      123                     ret.reject(r)

  ~/.local/lib/python3.7/site-packages/vaex/progress.py in error(arg)
       89         def error(arg):
       90             self.exit()
  ---> 91             raise arg
       92         return promise.then(ok, error)
       93         # return promise

  ~/.local/lib/python3.7/site-packages/vaex/promise.py in callAndFulfill(v)
      104             try:
      105                 if aplus._isFunction(success):
  --> 106                     ret.fulfill(success(v))
      107                 else:
      108                     ret.fulfill(v)

  ~/.local/lib/python3.7/site-packages/vaex/delayed.py in call(_)
       80             kwargs_real = {key: promise.get() for key, promise in key_promise}
       81             args_real = list([promise.get() for promise in arg_promises])
  ---> 82             return f(*args_real, **kwargs_real)
       83 
       84         return allarguments.then(call, _log_error("delayed decorator"))

  ~/.local/lib/python3.7/site-packages/vaex/groupby.py in possible_combine(*binner_promises)
      640             # we let it mutate *our* dataframe
      641             for binner in self.by:
  --> 642                 binner._create_binner(self.df)
      643             cells = product([grouper.N for grouper in self.by])
      644             @vaex.delayed

  ~/.local/lib/python3.7/site-packages/vaex/groupby.py in _create_binner(self, df)
      314         else:
      315             self.binby_expression = "_ordinal_values(%s, %s)" % (self.expression, self.hash_map_unique_name)
  --> 316             self.binner = self.df._binner_ordinal(self.binby_expression, self.N)
      317         self.combine_expression = self.binby_expression
      318 

  ~/.local/lib/python3.7/site-packages/vaex/dataframe.py in _binner_ordinal(self, expression, ordinal_count, min_value, invert)
     5582 
     5583     def _binner_ordinal(self, expression, ordinal_count, min_value=0, invert=False):
  -> 5584         dtype = self.data_type(expression)
     5585         return BinnerOrdinal(expression, min_value, ordinal_count, invert, dtype)
     5586 

  ~/.local/lib/python3.7/site-packages/vaex/dataframe.py in data_type(self, expression, array_type, internal, axis)
     2271                 data = self.evaluate(expression, 0, 1, filtered=False, array_type=array_type, parallel=False)
     2272             except:
  -> 2273                 data = self.evaluate(expression, 0, 1, filtered=True, array_type=array_type, parallel=False)
     2274         if data_type is None:
     2275             # means we have to determine it from the data

  ~/.local/lib/python3.7/site-packages/vaex/dataframe.py in evaluate(self, expression, i1, i2, out, selection, filtered, array_type, parallel, chunk_size, progress)
     3091             return self.evaluate_iterator(expression, s1=i1, s2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, chunk_size=chunk_size, progress=progress)
     3092         else:
  -> 3093             return self._evaluate_implementation(expression, i1=i1, i2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, chunk_size=chunk_size, progress=progress)
     3094 
     3095     @docsubst

  ~/.local/lib/python3.7/site-packages/vaex/dataframe.py in _evaluate_implementation(self, expression, i1, i2, out, selection, filtered, array_type, parallel, chunk_size, raw, progress)
     6519                 filter_deps = df.get_selection(vaex.dataframe.FILTER_SELECTION_NAME).dependencies(df)
     6520                 deps |= filter_deps
  -> 6521             columns = {k: dataset[k][:] for k in deps if k in dataset}
     6522 
     6523             if self.filtered and filtered:

  ~/.local/lib/python3.7/site-packages/vaex/dataframe.py in <dictcomp>(.0)
     6519                 filter_deps = df.get_selection(vaex.dataframe.FILTER_SELECTION_NAME).dependencies(df)
     6520                 deps |= filter_deps
  -> 6521             columns = {k: dataset[k][:] for k in deps if k in dataset}
     6522 
     6523             if self.filtered and filtered:

  ~/.local/lib/python3.7/site-packages/vaex/dataset.py in __getitem__(self, item)
      580             array_chunks = []
      581             ds = self.ds.__getitem__(item)
  --> 582             for chunk_start, chunk_end, chunks in ds.chunk_iterator([self.name]):
      583                 ar = chunks[self.name]
      584                 if isinstance(ar, pa.ChunkedArray):

  ~/.local/lib/python3.7/site-packages/vaex/dataset.py in chunk_iterator(self, columns, chunk_size, reverse)
     1648         for (o1, o2, ochunks), (c1, c2, cchunks) in zip(original_iter, cached_iter):
     1649             assert o1 == c1
  -> 1650             assert o2 == c2
     1651             yield o1, o2, {**ochunks, **cchunks}
     1652             for name in columns_to_cache:

  AssertionError: 
ashsharma96 commented 1 year ago

@JovanVeljanoski @maartenbreddels Anything on this one?

ashsharma96 commented 1 year ago

@JovanVeljanoski Can you please look at this one?

ashsharma96 commented 1 year ago

@JovanVeljanoski @maartenbreddels Could either of you let me know whether anyone can help with this one? Even after updating the library to the latest version, the error is the same.