xref #396
Big improvement, but this still scales linearly with the number of cohorts: the per-cohort `subset_to_blocks` and `tree_reduce` calls dominate the line profile below.
```
Line #      Hits          Time   Per Hit   % Time  Line Contents
================================================================
  1736      3650      426000.0     116.7      0.0      reindexer = (
  1737      3650     1115000.0     305.5      0.0          partial(reindex_intermediates, agg=agg, unique_groups=cohort_index)
  1738      3650      609000.0     166.8      0.0          if do_simple_combine
  1739                                                     else identity
  1740                                                 )
  1741      3650   885572000.0  242622.5     20.6      reindexed = subset_to_blocks(intermediate, blks, block_shape, reindexer, chunks_as_array)
  1742                                                 # now that we have reindexed, we can set reindex=True explicitlly
  1743      7300     1385000.0     189.7      0.0      reduced_.append(
  1744      7300  2782229000.0  381127.3     64.6          tree_reduce(
  1745      3650      399000.0     109.3      0.0              reindexed,
  1746      3650     1490000.0     408.2      0.0              combine=partial(combine, agg=agg, reindex=do_simple_combine),
  1747      7300     1570000.0     215.1      0.0              aggregate=partial(
  1748      3650      339000.0      92.9      0.0                  aggregate,
  1749      3650      307000.0      84.1      0.0                  expected_groups=cohort_index,
  1750      3650      309000.0      84.7      0.0                  reindex=do_simple_combine,
  1751                                                         ),
  1752                                                     )
  1753                                                 )
  1754                                                 # This is done because pandas promotes to 64-bit types when an Index is created
  1755                                                 # So we use the index to generate the return value for consistency with "map-reduce"
  1756                                                 # This is important on windows
  1757      3650     3268000.0     895.3      0.1      groups_.append(cohort_index.values)
  1758
  1759         1   230072000.0     2e+08      5.3  reduced = dask.array.concatenate(reduced_, axis=-1)
  1760         1      368000.0  368000.0      0.0  groups = (np.concatenate(groups_),)
  1761         1      379000.0  379000.0      0.0  group_chunks = (tuple(len(cohort) for cohort in groups_),)
```
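For context, a minimal, self-contained sketch (not flox code) of the pattern the profile shows: one block subset plus one tree reduction per cohort, followed by a single concatenate. `per_cohort_reduce` and the strided-slice "subset" are hypothetical stand-ins for `subset_to_blocks`/`tree_reduce`; they only illustrate why graph construction grows roughly linearly with the number of cohorts.

```python
import dask.array as da


def per_cohort_reduce(arr, n_cohorts):
    reduced_ = []
    for i in range(n_cohorts):
        # stand-in for subset_to_blocks: pick out the columns this "cohort" touches
        subset = arr[:, i::n_cohorts]
        # stand-in for tree_reduce: an ordinary dask tree reduction over the subset
        reduced_.append(subset.sum(axis=-1, keepdims=True))
    # mirrors the final dask.array.concatenate(reduced_, axis=-1)
    return da.concatenate(reduced_, axis=-1)


arr = da.ones((10, 3650), chunks=(10, 10))
for n in (10, 100, 365):
    result = per_cohort_reduce(arr, n)
    # task count (and graph-construction time) grows roughly linearly with n
    print(n, len(dict(result.__dask_graph__())))
```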