kordk / torch-ecpg

(GPU accelerated) eCpG mapper
BSD 3-Clause "New" or "Revised" License
2 stars 0 forks source link

Regression optimization: fastest way to save tensors #20

Closed liamgd closed 1 year ago

liamgd commented 1 year ago

The regression_full algorithm utilizes the power of the CUDA device much better than regression_single and has a large number of optimizations that drastically reduce the time it takes to compute the regression result tensors. However, the algorithm is severely bottlenecked by the saving of these tensors, particularly on the GPU because of the tensor conversion from GPU to CPU.

liamgd commented 1 year ago
out = pandas.DataFrame(
    torch.cat(results),
    index=index_chunk,
    columns=columns,
).astype(float)
Tensor to dataframe as type float: 33.2069 s ``` [INFO] Initializing regression variables [INFO] Use CPU not supplied. Checking if CUDA is available. [INFO] Using CUDA [INFO] Running with 96 degrees of freedom [INFO] Running regression_full... [INFOTIMER] Converted C to tensor in 0.0004 seconds [INFOTIMER] Converted G to tensor in 0.0011 seconds [INFOTIMER] Created ones in 0.0001 seconds [INFOTIMER] Created X in 0.0003 seconds [INFOTIMER] Transposed X in 0.0001 seconds [INFOTIMER] Calculated XtXi in 0.6234 seconds [INFOTIMER] Calculated XtXi_diag in 0.0002 seconds [INFOTIMER] Calculated XtXi_Xt in 0.0001 seconds [INFOTIMER] Calculated X constants in 0.6256 seconds [INFOTIMER] Looped over methylation loci in 0.9293 seconds [INFOTIMER] Calculated regression_full in 1.555 seconds [INFO] Generating dataframe from results... [INFOTIMER] Finished creating indices in 0.0112 seconds [INFOTIMER] Finished creating preliminary dataframe in 30.7853 seconds [INFOTIMER] Created output dataframe in 30.7965 total seconds const_p mt_p age_p sex_p gt_site mt_site ILMN_001 cg001 0.004421 0.333175 0.280666 0.342481 ILMN_004 cg001 0.002836 0.390539 0.273210 0.342764 ILMN_005 cg001 0.013886 0.312036 0.256995 0.323502 ILMN_008 cg001 0.004311 0.367510 0.273985 0.341991 ILMN_011 cg001 0.004960 0.392216 0.272195 0.338011 ... ... ... ... ... ILMN_991 cg1000 0.000358 0.397814 0.296329 0.028715 ILMN_995 cg1000 0.000356 0.362612 0.296870 0.027926 ILMN_996 cg1000 0.001099 0.397293 0.295435 0.028507 ILMN_998 cg1000 0.001028 0.391571 0.296948 0.028234 ILMN_1000 cg1000 0.000242 0.377790 0.286980 0.029150 [544752 rows x 4 columns] Wrote profile results to regression_full.py.lprof Timer unit: 1e-06 s Total time: 33.2069 s File: tecpg/regression_full.py Function: regression_full at line 15 Line # Hits Time Per Hit % Time Line Contents ============================================================== 15 @profile 16 def regression_full( 17 M: pandas.DataFrame, 18 G: pandas.DataFrame, 19 C: pandas.DataFrame, 20 loci_per_chunk: Optional[int] = None, 21 p_thresh: Optional[float] = None, 22 output_dir: Optional[str] = None, 23 *, 24 logger: Logger = Logger(), 25 ) -> Optional[pandas.DataFrame]: 26 1 1.0 1.0 0.0 if (output_dir is None) != (loci_per_chunk is None): 27 error = 'Output dir and chunk size must be defined together.' 28 logger.error(error) 29 raise ValueError(error) 30 31 1 28.4 28.4 0.0 logger.info('Initializing regression variables') 32 1 10404.6 10404.6 0.0 device = get_device(**logger) 33 1 1.3 1.3 0.0 dtype = torch.float32 34 1 13.8 13.8 0.0 nrows, ncols = C.shape[0], C.shape[1] + 1 35 1 4.6 4.6 0.0 gt_count, mt_count = len(G), len(M) 36 1 11.9 11.9 0.0 mt_site_names = numpy.array(M.index.values) 37 1 5.1 5.1 0.0 gt_site_names = numpy.array(G.index.values) 38 1 0.6 0.6 0.0 df = nrows - ncols - 1 39 1 11.2 11.2 0.0 logger.info('Running with {0} degrees of freedom', df) 40 1 853867.3 853867.3 2.6 dft_sqrt = torch.tensor(df, device=device, dtype=dtype).sqrt() 41 1 343.3 343.3 0.0 log_prob = torch.distributions.studentT.StudentT(df).log_prob 42 1 36.9 36.9 0.0 M_np = M.to_numpy() 43 1 0.6 0.6 0.0 index_names = ['gt_site', 'mt_site'] 44 1 13.4 13.4 0.0 columns = ['const_p', 'mt_p'] + [val + '_p' for val in C.columns] 45 1 0.3 0.3 0.0 last_index = 0 46 1 0.2 0.2 0.0 results = [] 47 1 0.3 0.3 0.0 if loci_per_chunk: 48 chunk_count = math.ceil(len(M) / loci_per_chunk) 49 logger.info('Initializing output directory') 50 initialize_dir(output_dir, **logger) 51 1 0.7 0.7 0.0 if p_thresh is not None: 52 1 0.2 0.2 0.0 output_sizes = [] 53 1 0.2 0.2 0.0 indices_list = [] 54 55 1 27.7 27.7 0.0 logger.start_timer('info', 'Running regression_full...') 56 1 124.5 124.5 0.0 Ct: torch.Tensor = torch.tensor( 57 1 190.5 190.5 0.0 C.to_numpy(), device=device, dtype=dtype 58 1 60.5 60.5 0.0 ).repeat(gt_count, 1, 1) 59 1 34.3 34.3 0.0 logger.time('Converted C to tensor in {l} seconds') 60 1 1033.7 1033.7 0.0 Gt: torch.Tensor = torch.tensor( 61 1 18.4 18.4 0.0 G.to_numpy(), device=device, dtype=dtype 62 1 29.2 29.2 0.0 ).unsqueeze(2) 63 1 38.1 38.1 0.0 logger.time('Converted G to tensor in {l} seconds') 64 1 67.3 67.3 0.0 ones = torch.ones((gt_count, nrows, 1), device=device, dtype=dtype) 65 1 21.9 21.9 0.0 logger.time('Created ones in {l} seconds') 66 1 228.2 228.2 0.0 X: torch.Tensor = torch.cat((ones, Gt, Ct), 2) 67 1 34.7 34.7 0.0 logger.time('Created X in {l} seconds') 68 1 23.5 23.5 0.0 Xt = X.mT 69 1 29.4 29.4 0.0 logger.time('Transposed X in {l} seconds') 70 1 623321.9 623321.9 1.9 XtXi = Xt.bmm(X).inverse() 71 1 64.6 64.6 0.0 logger.time('Calculated XtXi in {l} seconds') 72 1 96.6 96.6 0.0 XtXi_diag_sqrt = torch.diagonal(XtXi, dim1=1, dim2=2).sqrt() 73 1 31.5 31.5 0.0 logger.time('Calculated XtXi_diag in {l} seconds') 74 1 74.3 74.3 0.0 XtXi_Xt = XtXi.bmm(Xt) 75 1 36.6 36.6 0.0 logger.time('Calculated XtXi_Xt in {l} seconds') 76 1 33.7 33.7 0.0 logger.time('Calculated X constants in {t} seconds') 77 1 65277.1 65277.1 0.2 with Pool() as pool: 78 1000 2619.7 2.6 0.0 for index, M_row in enumerate(M_np, 1): 79 1000 52116.5 52.1 0.2 Y = torch.tensor(M_row, device=device, dtype=dtype) 80 1000 42083.3 42.1 0.1 B = XtXi_Xt.matmul(Y) 81 1000 63120.0 63.1 0.2 E = (Y.unsqueeze(1) - X.bmm(B.unsqueeze(2))).squeeze(2) 82 1000 89935.9 89.9 0.3 scalars = (torch.sum(E * E, 1)).view((-1, 1)).sqrt() / dft_sqrt 83 1000 20774.0 20.8 0.1 S = XtXi_diag_sqrt * scalars 84 1000 17721.2 17.7 0.1 T = B / S 85 1000 347824.5 347.8 1.0 P = torch.exp(log_prob(T)) 86 1000 691.2 0.7 0.0 if p_thresh is None: 87 results.append(P) 88 else: 89 1000 35074.8 35.1 0.1 indices = P[:, 1] >= p_thresh 90 1000 80705.7 80.7 0.2 output_sizes.append(indices.count_nonzero().item()) 91 1000 668.1 0.7 0.0 indices_list.append(indices) 92 1000 100511.5 100.5 0.3 results.append(P[indices]) 93 1000 861.4 0.9 0.0 if loci_per_chunk and ( 94 index % loci_per_chunk == 0 or index == mt_count 95 ): 96 mt_site_name_chunk = mt_site_names[last_index:index] 97 last_index = index 98 if p_thresh is None: 99 mt_sites = mt_site_name_chunk.repeat(gt_count) 100 gt_sites = numpy.tile(gt_site_names, len(results)) 101 else: 102 mt_sites = mt_site_name_chunk.repeat(output_sizes) 103 mask = torch.cat(indices_list).cpu().numpy() 104 gt_sites = numpy.tile(gt_site_names, len(results))[mask] 105 index_chunk = [gt_sites, mt_sites] 106 107 file_name = str(logger.current_count + 1) + '.csv' 108 file_path = os.path.join(output_dir, file_name) 109 out = pandas.DataFrame( 110 torch.cat(results), 111 index=index_chunk, 112 columns=columns, 113 ).astype(float) 114 out.index.set_names(index_names, inplace=True) 115 logger.count( 116 'Saving part {i}/{0}:', 117 chunk_count, 118 ) 119 pool.apply_async( 120 save_dataframe_part, 121 (out, file_path, logger.current_count), 122 dict(logger), 123 ) 124 results.clear() 125 output_sizes.clear() 126 indices_list.clear() 127 128 1 37.4 37.4 0.0 logger.time('Looped over methylation loci in {l} seconds') 129 1 17.2 17.2 0.0 logger.time('Calculated regression_full in {t} seconds') 130 131 1 0.3 0.3 0.0 if loci_per_chunk: 132 logger.time('Waiting for chunks to save...') 133 pool.close() 134 pool.join() 135 logger.time('Finished waiting for chunks to save in {l} seconds') 136 return 137 138 1 12.5 12.5 0.0 logger.start_timer('info', 'Generating dataframe from results...') 139 1 0.4 0.4 0.0 if p_thresh is None: 140 mt_sites = mt_site_names.repeat(gt_count) 141 gt_sites = numpy.tile(gt_site_names, len(results)) 142 else: 143 1 2210.1 2210.1 0.0 mt_sites = mt_site_names.repeat(output_sizes) 144 1 975.5 975.5 0.0 mask = torch.cat(indices_list).cpu().numpy() 145 1 7952.2 7952.2 0.0 gt_sites = numpy.tile(gt_site_names, len(results))[mask] 146 1 1.6 1.6 0.0 index_chunk = [gt_sites, mt_sites] 147 1 55.1 55.1 0.0 logger.time('Finished creating indices in {l} seconds') 148 1 8723994.5 8723994.5 26.3 out = pandas.DataFrame( 149 1 419.4 419.4 0.0 torch.cat(results), 150 1 0.4 0.4 0.0 index=index_chunk, 151 1 0.2 0.2 0.0 columns=columns, 152 1 22060788.6 22060788.6 66.4 ).astype(float) 153 1 64.6 64.6 0.0 out.index.set_names(index_names, inplace=True) 154 1 43.5 43.5 0.0 logger.time('Finished creating preliminary dataframe in {l} seconds') 155 1 16.6 16.6 0.0 logger.time('Created output dataframe in {t} total seconds') 156 1 0.3 0.3 0.0 return out ```

out = pandas.DataFrame(
    torch.cat(results),
    index=index_chunk,
    columns=columns,
)
Tensor to dataframe: 10.8959 s ``` [INFO] Initializing regression variables [INFO] Use CPU not supplied. Checking if CUDA is available. [INFO] Using CUDA [INFO] Running with 96 degrees of freedom [INFO] Running regression_full... [INFOTIMER] Converted C to tensor in 0.0004 seconds [INFOTIMER] Converted G to tensor in 0.0012 seconds [INFOTIMER] Created ones in 0.0001 seconds [INFOTIMER] Created X in 0.0004 seconds [INFOTIMER] Transposed X in 0.0001 seconds [INFOTIMER] Calculated XtXi in 0.6199 seconds [INFOTIMER] Calculated XtXi_diag in 0.0001 seconds [INFOTIMER] Calculated XtXi_Xt in 0.0001 seconds [INFOTIMER] Calculated X constants in 0.6223 seconds [INFOTIMER] Looped over methylation loci in 0.8445 seconds [INFOTIMER] Calculated regression_full in 1.4669 seconds [INFO] Generating dataframe from results... [INFOTIMER] Finished creating indices in 0.0107 seconds [INFOTIMER] Finished creating preliminary dataframe in 8.5413 seconds [INFOTIMER] Created output dataframe in 8.552 total seconds const_p ... sex_p gt_site mt_site ... ILMN_003 cg001 tensor(1.1658e-05, device='cuda:0') ... tensor(0.3957, device='cuda:0') ILMN_006 cg001 tensor(3.2687e-05, device='cuda:0') ... tensor(0.3969, device='cuda:0') ILMN_007 cg001 tensor(1.5297e-05, device='cuda:0') ... tensor(0.3962, device='cuda:0') ILMN_010 cg001 tensor(7.3037e-06, device='cuda:0') ... tensor(0.3932, device='cuda:0') ILMN_011 cg001 tensor(2.1221e-05, device='cuda:0') ... tensor(0.3954, device='cuda:0') ... ... ... ... ILMN_993 cg1000 tensor(0.0001, device='cuda:0') ... tensor(0.3946, device='cuda:0') ILMN_994 cg1000 tensor(7.5990e-05, device='cuda:0') ... tensor(0.3946, device='cuda:0') ILMN_995 cg1000 tensor(8.7772e-05, device='cuda:0') ... tensor(0.3961, device='cuda:0') ILMN_997 cg1000 tensor(5.5967e-05, device='cuda:0') ... tensor(0.3958, device='cuda:0') ILMN_999 cg1000 tensor(0.0003, device='cuda:0') ... tensor(0.3954, device='cuda:0') [544500 rows x 4 columns] Wrote profile results to regression_full.py.lprof Timer unit: 1e-06 s Total time: 10.8959 s File: tecpg/regression_full.py Function: regression_full at line 15 Line # Hits Time Per Hit % Time Line Contents ============================================================== 15 @profile 16 def regression_full( 17 M: pandas.DataFrame, 18 G: pandas.DataFrame, 19 C: pandas.DataFrame, 20 loci_per_chunk: Optional[int] = None, 21 p_thresh: Optional[float] = None, 22 output_dir: Optional[str] = None, 23 *, 24 logger: Logger = Logger(), 25 ) -> Optional[pandas.DataFrame]: 26 1 0.9 0.9 0.0 if (output_dir is None) != (loci_per_chunk is None): 27 error = 'Output dir and chunk size must be defined together.' 28 logger.error(error) 29 raise ValueError(error) 30 31 1 28.1 28.1 0.0 logger.info('Initializing regression variables') 32 1 10681.8 10681.8 0.1 device = get_device(**logger) 33 1 1.3 1.3 0.0 dtype = torch.float32 34 1 13.8 13.8 0.0 nrows, ncols = C.shape[0], C.shape[1] + 1 35 1 4.7 4.7 0.0 gt_count, mt_count = len(G), len(M) 36 1 13.7 13.7 0.0 mt_site_names = numpy.array(M.index.values) 37 1 6.0 6.0 0.0 gt_site_names = numpy.array(G.index.values) 38 1 1.0 1.0 0.0 df = nrows - ncols - 1 39 1 12.5 12.5 0.0 logger.info('Running with {0} degrees of freedom', df) 40 1 873219.6 873219.6 8.0 dft_sqrt = torch.tensor(df, device=device, dtype=dtype).sqrt() 41 1 362.0 362.0 0.0 log_prob = torch.distributions.studentT.StudentT(df).log_prob 42 1 37.3 37.3 0.0 M_np = M.to_numpy() 43 1 0.5 0.5 0.0 index_names = ['gt_site', 'mt_site'] 44 1 12.6 12.6 0.0 columns = ['const_p', 'mt_p'] + [val + '_p' for val in C.columns] 45 1 0.3 0.3 0.0 last_index = 0 46 1 0.3 0.3 0.0 results = [] 47 1 0.3 0.3 0.0 if loci_per_chunk: 48 chunk_count = math.ceil(len(M) / loci_per_chunk) 49 logger.info('Initializing output directory') 50 initialize_dir(output_dir, **logger) 51 1 0.3 0.3 0.0 if p_thresh is not None: 52 1 0.5 0.5 0.0 output_sizes = [] 53 1 0.2 0.2 0.0 indices_list = [] 54 55 1 26.1 26.1 0.0 logger.start_timer('info', 'Running regression_full...') 56 1 122.4 122.4 0.0 Ct: torch.Tensor = torch.tensor( 57 1 183.1 183.1 0.0 C.to_numpy(), device=device, dtype=dtype 58 1 60.3 60.3 0.0 ).repeat(gt_count, 1, 1) 59 1 29.9 29.9 0.0 logger.time('Converted C to tensor in {l} seconds') 60 1 1123.9 1123.9 0.0 Gt: torch.Tensor = torch.tensor( 61 1 18.3 18.3 0.0 G.to_numpy(), device=device, dtype=dtype 62 1 33.8 33.8 0.0 ).unsqueeze(2) 63 1 49.0 49.0 0.0 logger.time('Converted G to tensor in {l} seconds') 64 1 91.1 91.1 0.0 ones = torch.ones((gt_count, nrows, 1), device=device, dtype=dtype) 65 1 36.3 36.3 0.0 logger.time('Created ones in {l} seconds') 66 1 325.1 325.1 0.0 X: torch.Tensor = torch.cat((ones, Gt, Ct), 2) 67 1 36.7 36.7 0.0 logger.time('Created X in {l} seconds') 68 1 27.9 27.9 0.0 Xt = X.mT 69 1 29.1 29.1 0.0 logger.time('Transposed X in {l} seconds') 70 1 619890.8 619890.8 5.7 XtXi = Xt.bmm(X).inverse() 71 1 51.7 51.7 0.0 logger.time('Calculated XtXi in {l} seconds') 72 1 71.6 71.6 0.0 XtXi_diag_sqrt = torch.diagonal(XtXi, dim1=1, dim2=2).sqrt() 73 1 24.0 24.0 0.0 logger.time('Calculated XtXi_diag in {l} seconds') 74 1 48.3 48.3 0.0 XtXi_Xt = XtXi.bmm(Xt) 75 1 20.1 20.1 0.0 logger.time('Calculated XtXi_Xt in {l} seconds') 76 1 14.7 14.7 0.0 logger.time('Calculated X constants in {t} seconds') 77 1 54596.2 54596.2 0.5 with Pool() as pool: 78 1000 1549.5 1.5 0.0 for index, M_row in enumerate(M_np, 1): 79 1000 46447.5 46.4 0.4 Y = torch.tensor(M_row, device=device, dtype=dtype) 80 1000 36865.2 36.9 0.3 B = XtXi_Xt.matmul(Y) 81 1000 57342.8 57.3 0.5 E = (Y.unsqueeze(1) - X.bmm(B.unsqueeze(2))).squeeze(2) 82 1000 82693.8 82.7 0.8 scalars = (torch.sum(E * E, 1)).view((-1, 1)).sqrt() / dft_sqrt 83 1000 19605.7 19.6 0.2 S = XtXi_diag_sqrt * scalars 84 1000 17636.0 17.6 0.2 T = B / S 85 1000 317211.1 317.2 2.9 P = torch.exp(log_prob(T)) 86 1000 540.1 0.5 0.0 if p_thresh is None: 87 results.append(P) 88 else: 89 1000 32748.0 32.7 0.3 indices = P[:, 1] >= p_thresh 90 1000 74561.7 74.6 0.7 output_sizes.append(indices.count_nonzero().item()) 91 1000 577.7 0.6 0.0 indices_list.append(indices) 92 1000 94037.5 94.0 0.9 results.append(P[indices]) 93 1000 757.0 0.8 0.0 if loci_per_chunk and ( 94 index % loci_per_chunk == 0 or index == mt_count 95 ): 96 mt_site_name_chunk = mt_site_names[last_index:index] 97 last_index = index 98 if p_thresh is None: 99 mt_sites = mt_site_name_chunk.repeat(gt_count) 100 gt_sites = numpy.tile(gt_site_names, len(results)) 101 else: 102 mt_sites = mt_site_name_chunk.repeat(output_sizes) 103 mask = torch.cat(indices_list).cpu().numpy() 104 gt_sites = numpy.tile(gt_site_names, len(results))[mask] 105 index_chunk = [gt_sites, mt_sites] 106 107 file_name = str(logger.current_count + 1) + '.csv' 108 file_path = os.path.join(output_dir, file_name) 109 out = pandas.DataFrame( 110 torch.cat(results), 111 index=index_chunk, 112 columns=columns, 113 ).astype(float) 114 out.index.set_names(index_names, inplace=True) 115 logger.count( 116 'Saving part {i}/{0}:', 117 chunk_count, 118 ) 119 pool.apply_async( 120 save_dataframe_part, 121 (out, file_path, logger.current_count), 122 dict(logger), 123 ) 124 results.clear() 125 output_sizes.clear() 126 indices_list.clear() 127 128 1 42.9 42.9 0.0 logger.time('Looped over methylation loci in {l} seconds') 129 1 18.0 18.0 0.0 logger.time('Calculated regression_full in {t} seconds') 130 131 1 0.4 0.4 0.0 if loci_per_chunk: 132 logger.time('Waiting for chunks to save...') 133 pool.close() 134 pool.join() 135 logger.time('Finished waiting for chunks to save in {l} seconds') 136 return 137 138 1 12.5 12.5 0.0 logger.start_timer('info', 'Generating dataframe from results...') 139 1 0.3 0.3 0.0 if p_thresh is None: 140 mt_sites = mt_site_names.repeat(gt_count) 141 gt_sites = numpy.tile(gt_site_names, len(results)) 142 else: 143 1 2061.9 2061.9 0.0 mt_sites = mt_site_names.repeat(output_sizes) 144 1 942.2 942.2 0.0 mask = torch.cat(indices_list).cpu().numpy() 145 1 7624.8 7624.8 0.1 gt_sites = numpy.tile(gt_site_names, len(results))[mask] 146 1 0.9 0.9 0.0 index_chunk = [gt_sites, mt_sites] 147 1 46.9 46.9 0.0 logger.time('Finished creating indices in {l} seconds') 148 1 8540527.2 8540527.2 78.4 out = pandas.DataFrame( 149 1 636.2 636.2 0.0 torch.cat(results), 150 1 0.4 0.4 0.0 index=index_chunk, 151 1 0.4 0.4 0.0 columns=columns, 152 ) 153 1 66.2 66.2 0.0 out.index.set_names(index_names, inplace=True) 154 1 40.5 40.5 0.0 logger.time('Finished creating preliminary dataframe in {l} seconds') 155 1 16.7 16.7 0.0 logger.time('Created output dataframe in {t} total seconds') 156 1 0.2 0.2 0.0 return out ```

out = pandas.DataFrame(
    torch.cat(results).cpu().numpy(),
    index=index_chunk,
    columns=columns,
)
Tensor to CPU to numpy to dataframe: 2.38555 s ``` [INFO] Initializing regression variables [INFO] Use CPU not supplied. Checking if CUDA is available. [INFO] Using CUDA [INFO] Running with 96 degrees of freedom [INFO] Running regression_full... [INFOTIMER] Converted C to tensor in 0.0004 seconds [INFOTIMER] Converted G to tensor in 0.0011 seconds [INFOTIMER] Created ones in 0.0001 seconds [INFOTIMER] Created X in 0.0003 seconds [INFOTIMER] Transposed X in 0.0001 seconds [INFOTIMER] Calculated XtXi in 0.6013 seconds [INFOTIMER] Calculated XtXi_diag in 0.0001 seconds [INFOTIMER] Calculated XtXi_Xt in 0.0001 seconds [INFOTIMER] Calculated X constants in 0.6035 seconds [INFOTIMER] Looped over methylation loci in 0.8541 seconds [INFOTIMER] Calculated regression_full in 1.4577 seconds [INFO] Generating dataframe from results... [INFOTIMER] Finished creating indices in 0.0117 seconds [INFOTIMER] Finished creating preliminary dataframe in 0.047 seconds [INFOTIMER] Created output dataframe in 0.0587 total seconds const_p mt_p age_p sex_p gt_site mt_site ILMN_001 cg001 2.451083e-04 0.387071 0.355624 0.140495 ILMN_002 cg001 2.829592e-04 0.385012 0.344798 0.130455 ILMN_003 cg001 6.615710e-04 0.329287 0.359290 0.114346 ILMN_005 cg001 2.676225e-04 0.390684 0.356240 0.131217 ILMN_007 cg001 2.470858e-04 0.393242 0.355970 0.133631 ... ... ... ... ... ILMN_992 cg1000 2.406522e-09 0.332719 0.020688 0.148668 ILMN_993 cg1000 1.178166e-08 0.369408 0.024876 0.133541 ILMN_995 cg1000 5.184020e-10 0.364143 0.026288 0.155803 ILMN_997 cg1000 2.185808e-07 0.345224 0.033190 0.148746 ILMN_1000 cg1000 2.835331e-10 0.395862 0.027974 0.136625 [545691 rows x 4 columns] Wrote profile results to regression_full.py.lprof Timer unit: 1e-06 s Total time: 2.38555 s File: tecpg/regression_full.py Function: regression_full at line 15 Line # Hits Time Per Hit % Time Line Contents ============================================================== 15 @profile 16 def regression_full( 17 M: pandas.DataFrame, 18 G: pandas.DataFrame, 19 C: pandas.DataFrame, 20 loci_per_chunk: Optional[int] = None, 21 p_thresh: Optional[float] = None, 22 output_dir: Optional[str] = None, 23 *, 24 logger: Logger = Logger(), 25 ) -> Optional[pandas.DataFrame]: 26 1 0.9 0.9 0.0 if (output_dir is None) != (loci_per_chunk is None): 27 error = 'Output dir and chunk size must be defined together.' 28 logger.error(error) 29 raise ValueError(error) 30 31 1 28.1 28.1 0.0 logger.info('Initializing regression variables') 32 1 10121.4 10121.4 0.4 device = get_device(**logger) 33 1 1.6 1.6 0.0 dtype = torch.float32 34 1 13.6 13.6 0.0 nrows, ncols = C.shape[0], C.shape[1] + 1 35 1 4.8 4.8 0.0 gt_count, mt_count = len(G), len(M) 36 1 11.5 11.5 0.0 mt_site_names = numpy.array(M.index.values) 37 1 5.2 5.2 0.0 gt_site_names = numpy.array(G.index.values) 38 1 0.6 0.6 0.0 df = nrows - ncols - 1 39 1 11.0 11.0 0.0 logger.info('Running with {0} degrees of freedom', df) 40 1 866319.7 866319.7 36.3 dft_sqrt = torch.tensor(df, device=device, dtype=dtype).sqrt() 41 1 333.4 333.4 0.0 log_prob = torch.distributions.studentT.StudentT(df).log_prob 42 1 36.8 36.8 0.0 M_np = M.to_numpy() 43 1 0.5 0.5 0.0 index_names = ['gt_site', 'mt_site'] 44 1 12.9 12.9 0.0 columns = ['const_p', 'mt_p'] + [val + '_p' for val in C.columns] 45 1 0.3 0.3 0.0 last_index = 0 46 1 0.2 0.2 0.0 results = [] 47 1 0.3 0.3 0.0 if loci_per_chunk: 48 chunk_count = math.ceil(len(M) / loci_per_chunk) 49 logger.info('Initializing output directory') 50 initialize_dir(output_dir, **logger) 51 1 0.4 0.4 0.0 if p_thresh is not None: 52 1 0.3 0.3 0.0 output_sizes = [] 53 1 0.2 0.2 0.0 indices_list = [] 54 55 1 26.9 26.9 0.0 logger.start_timer('info', 'Running regression_full...') 56 1 122.3 122.3 0.0 Ct: torch.Tensor = torch.tensor( 57 1 185.2 185.2 0.0 C.to_numpy(), device=device, dtype=dtype 58 1 59.4 59.4 0.0 ).repeat(gt_count, 1, 1) 59 1 29.9 29.9 0.0 logger.time('Converted C to tensor in {l} seconds') 60 1 1036.6 1036.6 0.0 Gt: torch.Tensor = torch.tensor( 61 1 17.5 17.5 0.0 G.to_numpy(), device=device, dtype=dtype 62 1 34.3 34.3 0.0 ).unsqueeze(2) 63 1 47.8 47.8 0.0 logger.time('Converted G to tensor in {l} seconds') 64 1 91.3 91.3 0.0 ones = torch.ones((gt_count, nrows, 1), device=device, dtype=dtype) 65 1 35.2 35.2 0.0 logger.time('Created ones in {l} seconds') 66 1 288.3 288.3 0.0 X: torch.Tensor = torch.cat((ones, Gt, Ct), 2) 67 1 35.9 35.9 0.0 logger.time('Created X in {l} seconds') 68 1 24.4 24.4 0.0 Xt = X.mT 69 1 29.7 29.7 0.0 logger.time('Transposed X in {l} seconds') 70 1 601244.9 601244.9 25.2 XtXi = Xt.bmm(X).inverse() 71 1 50.3 50.3 0.0 logger.time('Calculated XtXi in {l} seconds') 72 1 73.9 73.9 0.0 XtXi_diag_sqrt = torch.diagonal(XtXi, dim1=1, dim2=2).sqrt() 73 1 24.1 24.1 0.0 logger.time('Calculated XtXi_diag in {l} seconds') 74 1 44.4 44.4 0.0 XtXi_Xt = XtXi.bmm(Xt) 75 1 19.6 19.6 0.0 logger.time('Calculated XtXi_Xt in {l} seconds') 76 1 15.1 15.1 0.0 logger.time('Calculated X constants in {t} seconds') 77 1 52774.8 52774.8 2.2 with Pool() as pool: 78 1000 1841.1 1.8 0.1 for index, M_row in enumerate(M_np, 1): 79 1000 46865.6 46.9 2.0 Y = torch.tensor(M_row, device=device, dtype=dtype) 80 1000 38396.1 38.4 1.6 B = XtXi_Xt.matmul(Y) 81 1000 59218.5 59.2 2.5 E = (Y.unsqueeze(1) - X.bmm(B.unsqueeze(2))).squeeze(2) 82 1000 84889.6 84.9 3.6 scalars = (torch.sum(E * E, 1)).view((-1, 1)).sqrt() / dft_sqrt 83 1000 19503.3 19.5 0.8 S = XtXi_diag_sqrt * scalars 84 1000 17467.6 17.5 0.7 T = B / S 85 1000 320595.7 320.6 13.4 P = torch.exp(log_prob(T)) 86 1000 594.8 0.6 0.0 if p_thresh is None: 87 results.append(P) 88 else: 89 1000 33048.7 33.0 1.4 indices = P[:, 1] >= p_thresh 90 1000 74651.6 74.7 3.1 output_sizes.append(indices.count_nonzero().item()) 91 1000 745.4 0.7 0.0 indices_list.append(indices) 92 1000 94872.6 94.9 4.0 results.append(P[indices]) 93 1000 849.9 0.8 0.0 if loci_per_chunk and ( 94 index % loci_per_chunk == 0 or index == mt_count 95 ): 96 mt_site_name_chunk = mt_site_names[last_index:index] 97 last_index = index 98 if p_thresh is None: 99 mt_sites = mt_site_name_chunk.repeat(gt_count) 100 gt_sites = numpy.tile(gt_site_names, len(results)) 101 else: 102 mt_sites = mt_site_name_chunk.repeat(output_sizes) 103 mask = torch.cat(indices_list).cpu().numpy() 104 gt_sites = numpy.tile(gt_site_names, len(results))[mask] 105 index_chunk = [gt_sites, mt_sites] 106 107 file_name = str(logger.current_count + 1) + '.csv' 108 file_path = os.path.join(output_dir, file_name) 109 out = pandas.DataFrame( 110 torch.cat(results), 111 index=index_chunk, 112 columns=columns, 113 ).astype(float) 114 out.index.set_names(index_names, inplace=True) 115 logger.count( 116 'Saving part {i}/{0}:', 117 chunk_count, 118 ) 119 pool.apply_async( 120 save_dataframe_part, 121 (out, file_path, logger.current_count), 122 dict(logger), 123 ) 124 results.clear() 125 output_sizes.clear() 126 indices_list.clear() 127 128 1 39.1 39.1 0.0 logger.time('Looped over methylation loci in {l} seconds') 129 1 17.3 17.3 0.0 logger.time('Calculated regression_full in {t} seconds') 130 131 1 0.3 0.3 0.0 if loci_per_chunk: 132 logger.time('Waiting for chunks to save...') 133 pool.close() 134 pool.join() 135 logger.time('Finished waiting for chunks to save in {l} seconds') 136 return 137 138 1 12.5 12.5 0.0 logger.start_timer('info', 'Generating dataframe from results...') 139 1 0.4 0.4 0.0 if p_thresh is None: 140 mt_sites = mt_site_names.repeat(gt_count) 141 gt_sites = numpy.tile(gt_site_names, len(results)) 142 else: 143 1 2074.7 2074.7 0.1 mt_sites = mt_site_names.repeat(output_sizes) 144 1 888.6 888.6 0.0 mask = torch.cat(indices_list).cpu().numpy() 145 1 8721.5 8721.5 0.4 gt_sites = numpy.tile(gt_site_names, len(results))[mask] 146 1 1.9 1.9 0.0 index_chunk = [gt_sites, mt_sites] 147 1 59.7 59.7 0.0 logger.time('Finished creating indices in {l} seconds') 148 1 44480.0 44480.0 1.9 out = pandas.DataFrame( 149 1 2407.2 2407.2 0.1 torch.cat(results).cpu().numpy(), 150 1 0.8 0.8 0.0 index=index_chunk, 151 1 0.6 0.6 0.0 columns=columns, 152 ) 153 1 38.8 38.8 0.0 out.index.set_names(index_names, inplace=True) 154 1 34.5 34.5 0.0 logger.time('Finished creating preliminary dataframe in {l} seconds') 155 1 16.2 16.2 0.0 logger.time('Created output dataframe in {t} total seconds') 156 1 0.2 0.2 0.0 return out ```
liamgd commented 1 year ago

The fastest method:

out = pandas.DataFrame(
    torch.cat(results).cpu().numpy(),
    index=index_chunk,
    columns=columns,
)

Was implemented in 97c456d.