# GitHub issue (status: Open), opened by Bell-Fintech 1 year ago
import os from gemini import GeminiSession, GeminiApplication
# Address of the Gemini console that the application is submitted to.
CONSOLE_ADDRESS = 'dist://192.168.202.24:32391'

# A single session/application pair; every task and job below registers on `app`.
session = GeminiSession(CONSOLE_ADDRESS, name="mpc_xgb")
app = GeminiApplication(session)

# Base directory on each party's host; the plain task creates
# <workspace_base>/<app_id>/ underneath it.
workspace_base = dict.fromkeys(("Alice", "Bob"), "/opt/gemini")

# Party name -> data-source id, used both as the task's party and as the
# host part of the cipher:// URIs.
parties = dict(Alice="ds01", Bob="ds02")

# Plaintext input CSV of each party.
dataConfig = {
    "inputs": dict(
        # Alice's file also contains the label column y (column 0).
        Alice="/dataurl/b61520346ed92f33a40dd14a005b8cd2.csv",
        Bob="/dataurl/99f7f460127b94756991a6c5eff5bc5a.csv",
    )
}
def get_path(ds):
    """Build the plaintext data-preparation task for party ``ds``.

    The returned task runs on ``parties[ds]``. Inside a per-app workspace it:

    * links the party's input CSV as ``X_train.csv``;
    * writes the per-column stable argsort of the data as ``X_index.csv``;
    * writes a ``.info`` JSON file for each of the two tables.

    The cipher task later secret-shares these two CSVs in.

    Args:
        ds: Party name; a key of ``parties``, ``dataConfig["inputs"]`` and
            ``workspace_base`` (e.g. ``"Alice"``).

    Returns:
        The decorated ``get_file`` task, used as an ``mrstep`` mapper.
    """
    app_params = [
        app.param("data_url", dataConfig["inputs"][ds]),
        app.param("workspace_base", workspace_base[ds]),
    ]

    @app.task(task_type="plain", parties=[parties[ds]])
    @app.runner(engine="python", params=app_params)
    def get_file():
        # NOTE(review): all dependencies are imported inside this body and the
        # parameters are read with argparse, which suggests the runner executes
        # it apart from this module -- keep it self-contained; confirm.
        import os
        import json
        import argparse
        import numpy as np
        import pandas as pd
        from gemini_utils import GeminiUtils

        # Presumably filled from app_params (--data_url / --workspace_base).
        parser = argparse.ArgumentParser()
        parser.add_argument("--data_url", type=str)
        parser.add_argument("--workspace_base", type=str)
        args = parser.parse_args()

        def gen_info(setname, key, dtype, shape, workspace):
            # Write {workspace}/{key}.info: a JSON description of a 2-D table.
            info = {
                "setname": setname,
                "key": key,
                "type": dtype,
                "shape": f"{shape[0]},{shape[1]}",
                "description": "",
                "sample": "",
            }
            with open(f"{workspace}/{key}.info", 'w') as f_obj:
                json.dump(info, f_obj)

        app_id = GeminiUtils().get_app_id()
        workspace = os.path.join(args.workspace_base, app_id)
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(workspace, exist_ok=True)

        data_csv = f"{workspace}/X_train.csv"
        # lexists, not exists: exists() follows the link, so a dangling
        # symlink left by an earlier run would be reported as absent, never
        # removed, and the new link could not be created.
        if os.path.lexists(data_csv):
            os.remove(data_csv)
        file_path = args.data_url
        # os.symlink rather than `ln -s` through a shell: no quoting issues
        # with unusual paths, and a failure raises instead of being ignored.
        os.symlink(file_path, data_csv)

        data = pd.read_csv(file_path)
        gen_info(app_id, "X_train", "double", data.shape, workspace)

        # Per-column stable sort order, computed in plaintext here; the cipher
        # task uses these as the XGB training index.
        sort_index = pd.DataFrame(np.argsort(data.values, axis=0, kind="stable"))
        sort_index.to_csv(f"{workspace}/X_index.csv", index=False)
        gen_info(app_id, "X_index", "int32", data.shape, workspace)

    return get_file
# Secret-shared inputs of the cipher task: the files the plain tasks wrote on
# each party. "{APP_ID}" is kept literally (doubled braces inside the
# f-strings); presumably the framework substitutes it at run time.
cipher_inputs = {
    "x_train1": f"cipher://{parties['Alice']}/{{APP_ID}}/X_train.csv;type=double",
    "x_train2": f"cipher://{parties['Bob']}/{{APP_ID}}/X_train.csv;type=double",
    "x_index1": f"cipher://{parties['Alice']}/{{APP_ID}}/X_index.csv;type=int32",
    "x_index2": f"cipher://{parties['Bob']}/{{APP_ID}}/X_index.csv;type=int32",
}

# Results of the cipher task, both on Alice: [0] predictions, [1] metrics.
cipher_outputs = [
    f"cipher://{parties['Alice']}/{{APP_ID}}/y_pred.csv",
    f"cipher://{parties['Alice']}/{{APP_ID}}/score.csv",
]
# Cipher (MPC) task run by the "privpy" engine. It:
#   1. joins Alice's and Bob's secret-shared tables column-wise;
#   2. fits an XGBClassifier;
#   3. predicts on the same training rows;
#   4. reveals the probabilities and a metric vector to the party given by
#      --ds (Alice).
@app.task(inputs=cipher_inputs, outputs=cipher_outputs, task_type="cipher")
@app.runner(engine="privpy", params=[
    app.param("ds", parties['Alice']),
    # Only the file names (last URI segment) are passed; the reveal URIs
    # below rebuild the full location from ds and app_id.
    app.param("pred_res", cipher_outputs[0].split('/')[-1]),
    app.param("score_res", cipher_outputs[1].split('/')[-1])
])
def cipher_xgb():
    import privpy as pp
    from gemini_utils import GeminiUtils

    args = pp.FLAGS()
    args.DEFINE_string("ds", "", "")
    args.DEFINE_string("pred_res", "", "")
    args.DEFINE_string("score_res", "", "")
    # NOTE(review): the return value is discarded, yet args.app_id is read
    # below without being DEFINE'd -- presumably this call sets it; confirm.
    GeminiUtils(args).get_app_id()

    @pp.es
    def main():
        import pnumpy as pnp
        # NOTE(review): StandardScaler is imported but never used below.
        from pai.preprocessing import StandardScaler
        from pai.ensemble import XGBClassifier

        def score(y_true, y_score):
            # Binary-classification metrics on secret data.
            # Returns pp.farr([acc, precision, recall, f1, auc, ks]).
            # y_true: 0/1 labels. y_score: positive-class scores.
            # Hard predictions are pnp.round(y_score), i.e. a 0.5 threshold
            # for scores in [0, 1].
            import putil
            import numpy as np
            if isinstance(y_true, pp.FixedArr):
                y_true = pnp.round(y_true)
            y_pred = pnp.round(y_score)
            n = len(y_true)
            tp = pnp.dot(y_true, y_pred)
            tp_and_fp = pnp.sum(y_pred)
            tp_and_fn = pnp.sum(y_true)
            n_positive = tp_and_fn
            n_negative = n - n_positive
            # One batched reciprocal: [1/P, 1/N, 1/(TP+FP), 1/(2TP+FP+FN)].
            # NOTE(review): zero denominators (no positive labels, no
            # negatives, or no positive predictions) are not guarded.
            reciprocal_arr = pnp.reciprocal(
                pp.iarr(
                    [n_positive, n_negative, tp_and_fp, tp_and_fp + tp_and_fn]))
            acc = (pnp.dot(y_true, y_pred) + pnp.dot(1 - y_true, 1 - y_pred)) / n
            precision = tp * reciprocal_arr[2]
            recall = tp * reciprocal_arr[0]
            # F1 = 2TP / ((TP+FP) + (TP+FN)).
            f1 = 2 * tp * reciprocal_arr[3]
            # Reorder labels and scores by score. The [::-1] presumably turns
            # an ascending argsort into descending order -- confirm putil.
            idx = putil.odd_even_merge_argsort(y_score)[::-1]
            y_score = pp.oblivious_shuffle(y_score, idx)
            y_true = pp.oblivious_shuffle(y_true, idx)
            # Last index of each run of equal scores, plus the final row:
            # one ROC point per distinct threshold.
            # NOTE(review): these indices are combined with plain numpy, so
            # they appear to be revealed (leaking tie structure) -- confirm.
            distinct_value_indices = pnp.nonzero(pnp.diff(y_score))[0]
            threshold_idxs = np.append(distinct_value_indices, n - 1)
            tps = pnp.cumsum(y_true)[threshold_idxs]
            fps = 1 + threshold_idxs - tps
            tpr = tps * reciprocal_arr[0]
            fpr = fps * reciprocal_arr[1]
            ks = pnp.max(pnp.absolute(tpr - fpr))
            # Trapezoidal AUC over consecutive ROC points ...
            fpr_x = pnp.diff(fpr)
            tpr_y = 0.5 * (tpr[:-1] + tpr[1:])
            # ... plus the first segment from the ROC origin (0, 0) to
            # (fpr[0], tpr[0]), which the points above do not contain.
            # Without it AUC is underestimated whenever the top-scored group
            # includes negatives (e.g. all scores tied gave 0, not 0.5).
            auc_score = pnp.dot(fpr_x, tpr_y) + 0.5 * fpr[0] * tpr[0]
            return pp.farr([acc, precision, recall, f1, auc_score, ks])

        # Secret-shared inputs declared in cipher_inputs.
        x_train1 = pp.ss("x_train1")
        x_train2 = pp.ss("x_train2")
        x_index1 = pp.ss("x_index1")
        x_index2 = pp.ss("x_index2")
        # Join the parties' columns. Column 0 (from Alice's file) is the
        # label y; the remaining columns are features.
        train = pnp.hstack([x_train1, x_train2])
        x_train, y_train = train[:, 1:], train[:, 0:1]
        y_train = pnp.hstack([1-y_train, y_train])  # XGB expects y one-hot encoded
        del x_train1, x_train2, train
        # # XGB fit needs per-column sort indices of x_train. Computing them
        # # here is slow, so where possible do it in plaintext and
        # # secret-share the result in:
        # train_index = []
        # for i in range(x_train.shape[1]):
        #     train_index.append(pnp.argsort(x_train[:, i:i+1], axis=0))
        # train_index = pnp.hstack(train_index)
        # Plaintext-precomputed indices. Alice's first index column is
        # dropped because it belongs to the label column, not a feature.
        train_index = pnp.hstack([x_index1[:, 1:], x_index2])
        # Model hyper-parameters
        n_estimators = 3  # number of trees
        max_depth = 3  # maximum tree depth
        objective = "log"  # loss function (log/mse)
        # Build the model
        XGB_model = XGBClassifier(n_estimators=n_estimators, max_depth=max_depth, objective=objective)
        # Train
        XGB_model.fit(x_train, y_train, train_index)
        # Predict on the training rows
        y_pred = XGB_model.predict_proba(x_train)  # predicted probabilities (two columns)
        # Evaluate; because prediction used x_train, these are training-set metrics
        score_res = score(y_train[:, 1], y_pred[:, 1])
        # Reveal the results to party --ds
        pp.reveal(y_pred, f"cipher://{args.ds}/{args.app_id}/{args.pred_res}")
        pp.reveal(score_res, f"cipher://{args.ds}/{args.app_id}/{args.score_res}")

    pp.run()
@app.job()
def job():
    # A single map-reduce step. Each party's plain preparation task is a
    # mapper, and the cipher XGBoost task is the reducer.
    party_mappers = [get_path(party) for party in ("Alice", "Bob")]
    return [app.mrstep(mappers=party_mappers, reducers=[cipher_xgb])]
# Script entry point. The pasted `if name == 'main':` lost its dunders and
# would raise NameError at import time.
if __name__ == '__main__':
    # app.run() returns the application id and a status code.
    app_id, status = app.run()
    # NOTE(review): only status 1 is treated as failure -- confirm whether
    # other non-zero statuses are possible.
    if status == 1:
        # RuntimeError subclasses Exception, so existing handlers still match.
        raise RuntimeError(
            f"The task execution of gemini app {app_id} fails! Please check the "
            "failure information in tm console or the gemini pod log.")

    # Assumes Alice's cipher outputs (y_pred.csv, score.csv) are written to
    # <workspace_base["Alice"]>/<app_id>/ on her host.
    alice_workspace = os.path.join(workspace_base["Alice"], app_id)
    outputs = {
        "app_id": [app_id],
        "result_path": [
            os.path.join(alice_workspace, "y_pred.csv"),
            os.path.join(alice_workspace, "score.csv"),
        ],
    }
    print(outputs)
import os from gemini import GeminiSession, GeminiApplication
# NOTE(review): this repeats the configuration defined earlier in the file.
# It creates a second GeminiSession/GeminiApplication and rebinds `app`, so
# tasks registered on the earlier `app` are not part of this one. It looks
# like a duplicated paste -- confirm and remove one copy.

# Address of the Gemini console that the application is submitted to.
CONSOLE_ADDRESS = 'dist://192.168.202.24:32391'

# A single session/application pair for the definitions that follow.
session = GeminiSession(CONSOLE_ADDRESS, name="mpc_xgb")
app = GeminiApplication(session)

# Base directory on each party's host; per-app workspaces go underneath.
workspace_base = dict.fromkeys(("Alice", "Bob"), "/opt/gemini")

# Party name -> data-source id.
parties = dict(Alice="ds01", Bob="ds02")

# Plaintext input CSV of each party.
dataConfig = {
    "inputs": dict(
        # Alice's file also contains the label column y (column 0).
        Alice="/dataurl/b61520346ed92f33a40dd14a005b8cd2.csv",
        Bob="/dataurl/99f7f460127b94756991a6c5eff5bc5a.csv",
    )
}
def get_path(ds):
    """Build the plaintext data-preparation task for party ``ds``.

    Previously this copy only built ``app_params`` and returned None, so the
    job's mappers became ``[None, None]``. It now defines and returns the
    task.

    The task runs on ``parties[ds]``. Inside a per-app workspace it:

    * links the party's input CSV as ``X_train.csv``;
    * writes the per-column stable argsort as ``X_index.csv``;
    * writes a ``.info`` JSON file for each table.

    Args:
        ds: Party name; a key of ``parties``, ``dataConfig["inputs"]`` and
            ``workspace_base`` (e.g. ``"Alice"``).

    Returns:
        The decorated ``get_file`` task, used as an ``mrstep`` mapper.
    """
    app_params = [
        app.param("data_url", dataConfig["inputs"][ds]),
        app.param("workspace_base", workspace_base[ds]),
    ]

    @app.task(task_type="plain", parties=[parties[ds]])
    @app.runner(engine="python", params=app_params)
    def get_file():
        # NOTE(review): dependencies are imported inside this body and the
        # parameters are read with argparse, which suggests the runner executes
        # it apart from this module -- keep it self-contained; confirm.
        import os
        import json
        import argparse
        import numpy as np
        import pandas as pd
        from gemini_utils import GeminiUtils

        parser = argparse.ArgumentParser()
        parser.add_argument("--data_url", type=str)
        parser.add_argument("--workspace_base", type=str)
        args = parser.parse_args()

        def gen_info(setname, key, dtype, shape, workspace):
            # Write {workspace}/{key}.info: a JSON description of a 2-D table.
            info = {
                "setname": setname,
                "key": key,
                "type": dtype,
                "shape": f"{shape[0]},{shape[1]}",
                "description": "",
                "sample": "",
            }
            with open(f"{workspace}/{key}.info", 'w') as f_obj:
                json.dump(info, f_obj)

        app_id = GeminiUtils().get_app_id()
        workspace = os.path.join(args.workspace_base, app_id)
        os.makedirs(workspace, exist_ok=True)

        data_csv = f"{workspace}/X_train.csv"
        # lexists so a dangling symlink from an earlier run is also removed.
        if os.path.lexists(data_csv):
            os.remove(data_csv)
        file_path = args.data_url
        # No shell involved; a failure raises instead of being ignored.
        os.symlink(file_path, data_csv)

        data = pd.read_csv(file_path)
        gen_info(app_id, "X_train", "double", data.shape, workspace)

        # Per-column stable sort order, used as the XGB training index.
        sort_index = pd.DataFrame(np.argsort(data.values, axis=0, kind="stable"))
        sort_index.to_csv(f"{workspace}/X_index.csv", index=False)
        gen_info(app_id, "X_index", "int32", data.shape, workspace)

    return get_file
# Secret-shared inputs of the cipher task, one file per party and table.
# "{APP_ID}" is kept literally via doubled braces; presumably the framework
# substitutes it at run time.
cipher_inputs = {
    "x_train1": f"cipher://{parties['Alice']}/{{APP_ID}}/X_train.csv;type=double",
    "x_train2": f"cipher://{parties['Bob']}/{{APP_ID}}/X_train.csv;type=double",
    "x_index1": f"cipher://{parties['Alice']}/{{APP_ID}}/X_index.csv;type=int32",
    "x_index2": f"cipher://{parties['Bob']}/{{APP_ID}}/X_index.csv;type=int32",
}

# Results of the cipher task, both on Alice: [0] predictions, [1] metrics.
cipher_outputs = [
    f"cipher://{parties['Alice']}/{{APP_ID}}/y_pred.csv",
    f"cipher://{parties['Alice']}/{{APP_ID}}/score.csv",
]
# Cipher (MPC) task run by the "privpy" engine. Previously this copy's body
# only imported privpy/GeminiUtils, so it computed nothing and never called
# pp.run(); the full task is restored here.
# It:
#   1. joins the parties' secret-shared tables column-wise;
#   2. fits an XGBClassifier;
#   3. predicts on the training rows;
#   4. reveals the probabilities and a metric vector to the --ds party.
@app.task(inputs=cipher_inputs, outputs=cipher_outputs, task_type="cipher")
@app.runner(engine="privpy", params=[
    app.param("ds", parties['Alice']),
    # Only the file names (last URI segment) are passed along.
    app.param("pred_res", cipher_outputs[0].split('/')[-1]),
    app.param("score_res", cipher_outputs[1].split('/')[-1])
])
def cipher_xgb():
    import privpy as pp
    from gemini_utils import GeminiUtils

    args = pp.FLAGS()
    args.DEFINE_string("ds", "", "")
    args.DEFINE_string("pred_res", "", "")
    args.DEFINE_string("score_res", "", "")
    # NOTE(review): presumably sets args.app_id, which is read below without
    # being DEFINE'd -- confirm.
    GeminiUtils(args).get_app_id()

    @pp.es
    def main():
        import pnumpy as pnp
        # NOTE(review): StandardScaler is imported but never used below.
        from pai.preprocessing import StandardScaler
        from pai.ensemble import XGBClassifier

        def score(y_true, y_score):
            # Binary-classification metrics on secret data.
            # Returns pp.farr([acc, precision, recall, f1, auc, ks]).
            # Hard predictions are pnp.round(y_score).
            import putil
            import numpy as np
            if isinstance(y_true, pp.FixedArr):
                y_true = pnp.round(y_true)
            y_pred = pnp.round(y_score)
            n = len(y_true)
            tp = pnp.dot(y_true, y_pred)
            tp_and_fp = pnp.sum(y_pred)
            tp_and_fn = pnp.sum(y_true)
            n_positive = tp_and_fn
            n_negative = n - n_positive
            # [1/P, 1/N, 1/(TP+FP), 1/(2TP+FP+FN)].
            # NOTE(review): zero denominators are not guarded.
            reciprocal_arr = pnp.reciprocal(
                pp.iarr(
                    [n_positive, n_negative, tp_and_fp, tp_and_fp + tp_and_fn]))
            acc = (pnp.dot(y_true, y_pred) + pnp.dot(1 - y_true, 1 - y_pred)) / n
            precision = tp * reciprocal_arr[2]
            recall = tp * reciprocal_arr[0]
            f1 = 2 * tp * reciprocal_arr[3]
            # Presumably descending score order (reversed argsort) -- confirm.
            idx = putil.odd_even_merge_argsort(y_score)[::-1]
            y_score = pp.oblivious_shuffle(y_score, idx)
            y_true = pp.oblivious_shuffle(y_true, idx)
            # One ROC point per distinct score threshold.
            distinct_value_indices = pnp.nonzero(pnp.diff(y_score))[0]
            threshold_idxs = np.append(distinct_value_indices, n - 1)
            tps = pnp.cumsum(y_true)[threshold_idxs]
            fps = 1 + threshold_idxs - tps
            tpr = tps * reciprocal_arr[0]
            fpr = fps * reciprocal_arr[1]
            ks = pnp.max(pnp.absolute(tpr - fpr))
            fpr_x = pnp.diff(fpr)
            tpr_y = 0.5 * (tpr[:-1] + tpr[1:])
            # Include the segment from the ROC origin (0, 0) to the first
            # point; without it AUC is underestimated when the top-scored
            # group contains negatives.
            auc_score = pnp.dot(fpr_x, tpr_y) + 0.5 * fpr[0] * tpr[0]
            return pp.farr([acc, precision, recall, f1, auc_score, ks])

        x_train1 = pp.ss("x_train1")
        x_train2 = pp.ss("x_train2")
        x_index1 = pp.ss("x_index1")
        x_index2 = pp.ss("x_index2")
        # Column 0 (from Alice's file) is the label y; the rest are features.
        train = pnp.hstack([x_train1, x_train2])
        x_train, y_train = train[:, 1:], train[:, 0:1]
        y_train = pnp.hstack([1-y_train, y_train])  # XGB expects y one-hot encoded
        del x_train1, x_train2, train
        # Plaintext-precomputed sort indices; Alice's label column is dropped.
        train_index = pnp.hstack([x_index1[:, 1:], x_index2])
        # Model hyper-parameters
        n_estimators = 3  # number of trees
        max_depth = 3  # maximum tree depth
        objective = "log"  # loss function (log/mse)
        # Build and train the model
        XGB_model = XGBClassifier(n_estimators=n_estimators, max_depth=max_depth, objective=objective)
        XGB_model.fit(x_train, y_train, train_index)
        # Predict on the training rows (two probability columns)
        y_pred = XGB_model.predict_proba(x_train)
        # Training-set metrics
        score_res = score(y_train[:, 1], y_pred[:, 1])
        # Reveal the results to party --ds
        pp.reveal(y_pred, f"cipher://{args.ds}/{args.app_id}/{args.pred_res}")
        pp.reveal(score_res, f"cipher://{args.ds}/{args.app_id}/{args.score_res}")

    pp.run()
@app.job()
def job():
    # A single map-reduce step: the two parties' plain preparation tasks
    # are the mappers, and the cipher XGBoost task is the reducer.
    steps = [
        app.mrstep(mappers=[get_path("Alice"), get_path("Bob")],
                   reducers=[cipher_xgb])
    ]
    # Previously the list was built but never returned, so the job
    # returned None instead of its steps.
    return steps
# Script entry point. The pasted `if name == 'main':` lost its dunders and
# would raise NameError at import time.
if __name__ == '__main__':
    # app.run() returns the application id and a status code.
    app_id, status = app.run()
    # NOTE(review): only status 1 is treated as failure -- confirm.
    if status == 1:
        # RuntimeError subclasses Exception, so existing handlers still match.
        raise RuntimeError(
            f"The task execution of gemini app {app_id} fails! Please check the "
            "failure information in tm console or the gemini pod log.")