IntelPython / sdc

Numba extension for compiling Pandas data frames, Intel® Scalable Dataframe Compiler
https://intelpython.github.io/sdc-doc/
BSD 2-Clause "Simplified" License
646 stars 62 forks source link

Optimize DF.groupby.sum #829

Closed densmirn closed 4 years ago

densmirn commented 4 years ago

Example of generated code with such data as {'A': int, 'B': float, 'C': float, 'D': int, 'E': float, 'F': int, 'G': int, 'H': float, 'I': float, 'J': float, 'K': int} where group by 'A':

def _dataframe_groupby_sum_impl(self):
  # Generated groupby-sum kernel (quoted from the SDC code generator).
  # Phase 1: each parallel chunk builds per-label partial sums.
  # Phase 2: chunk-local results are merged by a pairwise tree reduction.
  # Phase 3: merged accumulators are scattered into per-column result arrays.
  # NOTE(review): parallel_chunks, prange, types, by_type, accums_dtypes,
  # subject_column_dtypes, Dict, List, float64, int32, _sdc_asarray,
  # sdc_arrays_argsort and _sdc_take are injected by the code-generation
  # context; they are not defined in this snippet.
  #
  # data[0] is the group-by key column; data[1..10] are the summed columns.
  by_column_data = self._parent._data[0]
  length = len(by_column_data)
  # Split the row range into per-thread chunks for the prange pass below.
  chunks = parallel_chunks(length)
  chunks_num = len(chunks)
  column_data_0 = self._parent._data[1]
  column_data_1 = self._parent._data[2]
  column_data_2 = self._parent._data[3]
  column_data_3 = self._parent._data[4]
  column_data_4 = self._parent._data[5]
  column_data_5 = self._parent._data[6]
  column_data_6 = self._parent._data[7]
  column_data_7 = self._parent._data[8]
  column_data_8 = self._parent._data[9]
  column_data_9 = self._parent._data[10]
  # Per-chunk two-way maps: insertion index -> label, label -> index.
  index_labels_parts = [Dict.empty(types.int64, by_type) for _ in range(chunks_num)]
  label_indices_parts = [Dict.empty(by_type, types.int64) for _ in range(chunks_num)]
  # Accumulators are grouped by dtype: group 0 is a float64[6] array
  # (columns 0, 1, 3, 6, 7, 8) and group 2 is an int32[4] array
  # (columns 2, 4, 5, 9) — see the per-column updates in the loop body.
  accums_dtype_0 = accums_dtypes[0]
  label_accums_parts_0 = [Dict.empty(by_type, accums_dtype_0) for _ in range(chunks_num)]
  accums_parts_0 = [List.empty_list(accums_dtype_0) for _ in range(chunks_num)]
  accums_dtype_2 = accums_dtypes[2]
  label_accums_parts_2 = [Dict.empty(by_type, accums_dtype_2) for _ in range(chunks_num)]
  accums_parts_2 = [List.empty_list(accums_dtype_2) for _ in range(chunks_num)]
  # Phase 1: parallel per-chunk accumulation.
  for i in prange(chunks_num):
    chunk = chunks[i]
    idx = 0
    index_labels = index_labels_parts[i]
    label_indices = label_indices_parts[i]
    label_accums_0 = label_accums_parts_0[i]
    accums_0 = accums_parts_0[i]
    # Seed element so the typed list is non-empty; slot 0 is overwritten
    # at the end of every row iteration below.
    accums_0.append(numpy.zeros(6, dtype=float64))
    label_accums_2 = label_accums_parts_2[i]
    accums_2 = accums_parts_2[i]
    accums_2.append(numpy.zeros(4, dtype=int32))
    for k in range(chunk.start, chunk.stop):
      label = by_column_data[k]
      _accums_0 = label_accums_0.get(label)
      _accums_2 = label_accums_2.get(label)
      # First occurrence of this label in the chunk: allocate fresh
      # accumulator arrays and record the label's insertion order.
      if _accums_0 is None:
        new_accums_0 = numpy.zeros(6, dtype=float64)
        _accums_0 = new_accums_0
        label_accums_0[label] = new_accums_0
        new_accums_2 = numpy.zeros(4, dtype=int32)
        _accums_2 = new_accums_2
        label_accums_2[label] = new_accums_2
        index_labels[idx] = label
        label_indices[label] = idx
        idx += 1
      # Float columns feed accumulator group 0, one fixed slot per column;
      # NaN values are skipped (matching pandas' skipna sum behavior).
      val_0 = column_data_0[k]
      if not numpy.isnan(val_0):
        _accums_0[0] += val_0
      val_1 = column_data_1[k]
      if not numpy.isnan(val_1):
        _accums_0[1] += val_1
      val_3 = column_data_3[k]
      if not numpy.isnan(val_3):
        _accums_0[2] += val_3
      val_6 = column_data_6[k]
      if not numpy.isnan(val_6):
        _accums_0[3] += val_6
      val_7 = column_data_7[k]
      if not numpy.isnan(val_7):
        _accums_0[4] += val_7
      val_8 = column_data_8[k]
      if not numpy.isnan(val_8):
        _accums_0[5] += val_8
      # Integer columns feed accumulator group 2.
      val_2 = column_data_2[k]
      if not numpy.isnan(val_2):
        _accums_2[0] += val_2
      val_4 = column_data_4[k]
      if not numpy.isnan(val_4):
        _accums_2[1] += val_4
      val_5 = column_data_5[k]
      if not numpy.isnan(val_5):
        _accums_2[2] += val_5
      val_9 = column_data_9[k]
      if not numpy.isnan(val_9):
        _accums_2[3] += val_9
      # NOTE(review): these writes appear to exist only to keep the typed
      # lists used inside the parallel region (the stored value is never
      # read back) — confirm against the generator's intent.
      accums_0[0] = _accums_0
      accums_2[0] = _accums_2
  def reduce_indices(main_labels, auxiliary_labels, main_indices):
    # Merge the auxiliary chunk's label ordering into the main chunk's
    # maps, appending only labels the main chunk has not yet seen.
    idx = len(main_indices)
    for _idx in range(len(auxiliary_labels)):
      label = auxiliary_labels[_idx]
      if label in main_indices:
        continue
      main_labels[idx] = label
      main_indices[label] = idx
      idx += 1
  def reduce_accums(main_accums, auxiliary_accums, accums):
    # Element-wise add the auxiliary per-label accumulators into main's;
    # a label unknown to main adopts the auxiliary array directly.
    for label, _acc in auxiliary_accums.items():
      _accums = main_accums.get(label)
      if _accums is None:
        _accums = _acc
        main_accums[label] = _accums
      else:
        for i in range(len(_accums)):
          _accums[i] += _acc[i]
      # NOTE(review): mirrors the phase-1 write; value is not read back.
      accums[0] = _accums
  # Phase 2: pairwise tree reduction over the chunks. In each round,
  # chunk (i*step + shift) is merged into chunk (i*step); shift doubles
  # every round, so chunk 0 ends up holding the global result.
  shift = 1
  while shift < chunks_num:
    step = shift * 2
    # Number of merge pairs this round (last chunk may have no partner).
    stop = int(math.ceil((chunks_num - shift) / step))
    for i in prange(stop):
      main_idx = i * step
      auxiliary_idx = main_idx + shift
      main_labels = index_labels_parts[main_idx]
      auxiliary_labels = index_labels_parts[auxiliary_idx]
      main_indices = label_indices_parts[main_idx]
      reduce_indices(main_labels, auxiliary_labels, main_indices)
      main_accums_0 = label_accums_parts_0[main_idx]
      auxiliary_accums_0 = label_accums_parts_0[auxiliary_idx]
      accums_0 = accums_parts_0[main_idx]
      reduce_accums(main_accums_0, auxiliary_accums_0, accums_0)
      main_accums_2 = label_accums_parts_2[main_idx]
      auxiliary_accums_2 = label_accums_parts_2[auxiliary_idx]
      accums_2 = accums_parts_2[main_idx]
      reduce_accums(main_accums_2, auxiliary_accums_2, accums_2)
    shift *= 2
  # After the reduction, slot 0 holds the global label set and sums.
  index_labels = index_labels_parts[0]
  res_size = len(index_labels)
  group_keys = _sdc_asarray([index_labels[i] for i in range(res_size)])
  _sort = self._sort
  # Stable sort of group keys when groupby(sort=True) was requested.
  if _sort:
    argsorted_index = sdc_arrays_argsort(group_keys, kind="mergesort")
  label_accums_0 = label_accums_parts_0[0]
  label_accums_2 = label_accums_parts_2[0]
  result_data_0 = numpy.empty(res_size, dtype=subject_column_dtypes[0])
  result_data_1 = numpy.empty(res_size, dtype=subject_column_dtypes[1])
  result_data_2 = numpy.empty(res_size, dtype=subject_column_dtypes[2])
  result_data_3 = numpy.empty(res_size, dtype=subject_column_dtypes[3])
  result_data_4 = numpy.empty(res_size, dtype=subject_column_dtypes[4])
  result_data_5 = numpy.empty(res_size, dtype=subject_column_dtypes[5])
  result_data_6 = numpy.empty(res_size, dtype=subject_column_dtypes[6])
  result_data_7 = numpy.empty(res_size, dtype=subject_column_dtypes[7])
  result_data_8 = numpy.empty(res_size, dtype=subject_column_dtypes[8])
  result_data_9 = numpy.empty(res_size, dtype=subject_column_dtypes[9])
  # Phase 3: scatter accumulator slots into the per-column result arrays,
  # in parallel over chunks of the result rows.
  res_chunks = parallel_chunks(res_size)
  for i in prange(len(res_chunks)):
    res_chunk = res_chunks[i]
    for k in range(res_chunk.start, res_chunk.stop):
      # argsorted_index is only evaluated when _sort is true, so the
      # conditional binding above is safe here.
      idx = argsorted_index[k] if _sort else k
      label = index_labels[idx]
      _accums_0 = label_accums_0[label]
      _accums_2 = label_accums_2[label]
      result_data_0[k] = _accums_0[0]
      result_data_1[k] = _accums_0[1]
      result_data_3[k] = _accums_0[2]
      result_data_6[k] = _accums_0[3]
      result_data_7[k] = _accums_0[4]
      result_data_8[k] = _accums_0[5]
      result_data_2[k] = _accums_2[0]
      result_data_4[k] = _accums_2[1]
      result_data_5[k] = _accums_2[2]
      result_data_9[k] = _accums_2[3]
  # The result index is the (optionally sorted) set of group keys.
  res_index = _sdc_take(group_keys, argsorted_index) if _sort else group_keys
  return pandas.DataFrame({'data_0': result_data_0, 'data_1': result_data_1, 'data_2': result_data_2, 'data_3': result_data_3, 'data_4': result_data_4, 'data_5': result_data_5, 'data_6': result_data_6, 'data_7': result_data_7, 'data_8': result_data_8, 'data_9': result_data_9}, index=res_index)
densmirn commented 4 years ago

In the above example, _accums_0 is an array of float accumulators and _accums_2 is an array of integer accumulators.

densmirn commented 4 years ago

Laptop numbers: Old implementation:

name nthreads type size median
DataFrame.groupby.sum 1 Python 2000000 0.216
DataFrame.groupby.sum 1 SDC 2000000 0.678
DataFrame.groupby.sum 4 SDC 2000000 0.606

New implementation:

name nthreads type size median
DataFrame.groupby.sum 1 Python 2000000 0.2
DataFrame.groupby.sum 1 SDC 2000000 0.285
DataFrame.groupby.sum 4 SDC 2000000 0.139

Python / NewSDC4 = 1.439 OldSDC4 / NewSDC4 = 4.36

There are 100 unique labels, only float data.