secretflow / psi

The repository for Private Set Intersection (PSI) and Private Information Retrieval (PIR) from SecretFlow.
https://www.secretflow.org.cn/docs/psi
Apache License 2.0

[Bug]: PSI hangs in production mode #142

Closed XiaoLazi closed 3 weeks ago

XiaoLazi commented 4 weeks ago

Describe the bug

PSI hangs when running in production mode. The program gets stuck at the line below, neither proceeding nor reporting an error:

(SPURuntime(device_id=None, party=bob) pid=876) [2024-06-26 03:17:24.581] [info] [thread_pool.cc:30] Create a fixed thread pool with size 7

Steps To Reproduce

Run in production mode with the following PSI parameters:

reports = spu.psi_csv(
    key=select_keys,
    input_path=input_path,
    output_path=output_path,
    receiver='alice',
    protocol='KKRT_PSI_2PC',
    precheck_input=False,
    sort=False,
    broadcast_result=False,
)

Run log:

[root@localhost psi]# python bob_testdata.py
2024-06-26 03:17:18,353 INFO worker.py:1540 -- Connecting to existing Ray cluster at address: 192.168.11.130:7751...
2024-06-26 03:17:18,367 INFO worker.py:1724 -- Connected to Ray cluster.
2024-06-26 03:17:18.396 INFO api.py:233 [bob] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '192.168.11.131:10420', 'bob': '0.0.0.0:10430'}, 'CURRENT_PARTY_NAME': 'bob', 'TLS_CONFIG': {}}
2024-06-26 03:17:19.196 INFO barriers.py:284 [bob] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=780) 2024-06-26 03:17:19.186 INFO grpc_proxy.py:359 [bob] -- [Anonymous_job] ReceiverProxy binding port 10430, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=780) 2024-06-26 03:17:19.194 INFO grpc_proxy.py:379 [bob] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-06-26 03:17:19.960 INFO barriers.py:333 [bob] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-06-26 03:17:19.961 INFO barriers.py:520 [bob] -- [Anonymous_job] Try ping ['alice'] at 0 attemp, up to 3600 attemps.
/root/psi/bob_testdata.py:68: UserWarning: pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.
  data = pd.read_sql(sql,conn).sample(frac=sample_param)
psi start_time: 1719371840.0284116
(SPURuntime(device_id=None, party=bob) pid=876) [2024-06-26 03:17:24.552] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"KKRT_PSI_2PC","receiver_rank":1,"input_params":{"path":"./data/psi_input_bob_test.csv","select_fields":["id"]},"output_params":{"path":"./data/psi_output_test.csv"},"curve_type":"CURVE_25519","bucket_size":1048576}
(SPURuntime(device_id=None, party=bob) pid=876) [2024-06-26 03:17:24.553] [info] [bucket_psi.cc:400] bucket size set to 1048576
(SPURuntime(device_id=None, party=bob) pid=876) [2024-06-26 03:17:24.560] [info] [bucket_psi.cc:252] Begin sanity check for input file: ./data/psi_input_bob_test.csv, precheck_switch:false
(SPURuntime(device_id=None, party=bob) pid=876) [2024-06-26 03:17:24.568] [info] [bucket_psi.cc:265] End sanity check for input file: ./data/psi_input_bob_test.csv, size=2
(SPURuntime(device_id=None, party=bob) pid=876) [2024-06-26 03:17:24.570] [info] [bucket_psi.cc:425] Run psi protocol=2, self_items_count=2
(SPURuntime(device_id=None, party=bob) pid=876) [2024-06-26 03:17:24.572] [info] [bucket_psi.cc:514] psi protocol=2, rank=0 item_size=4
(SPURuntime(device_id=None, party=bob) pid=876) [2024-06-26 03:17:24.572] [info] [bucket_psi.cc:514] psi protocol=2, rank=1 item_size=2
(SPURuntime(device_id=None, party=bob) pid=876) [2024-06-26 03:17:24.572] [info] [bucket_psi.cc:539] psi protocol=2, bucket_count=1
(SPURuntime(device_id=None, party=bob) pid=876) [2024-06-26 03:17:24.575] [info] [arrow_csv_batch_provider.cc:75] Reach the end of csv file ./data/psi_input_bob_test.csv.
(SPURuntime(device_id=None, party=bob) pid=876) [2024-06-26 03:17:24.575] [info] [arrow_csv_batch_provider.cc:75] Reach the end of csv file ./data/psi_input_bob_test.csv.
(SPURuntime(device_id=None, party=bob) pid=876) [2024-06-26 03:17:24.575] [info] [bucket_psi.cc:551] run psi bucket_idx=0, bucket_item_size=2
(SPURuntime(device_id=None, party=bob) pid=876) [2024-06-26 03:17:24.581] [info] [memory_psi.cc:68] psi protocol=2, rank=0, inputs_size=4
(SPURuntime(device_id=None, party=bob) pid=876) [2024-06-26 03:17:24.581] [info] [memory_psi.cc:68] psi protocol=2, rank=1, inputs_size=2
(SPURuntime(device_id=None, party=bob) pid=876) [2024-06-26 03:17:24.581] [info] [thread_pool.cc:30] Create a fixed thread pool with size 7

Expected behavior

I suspected a data issue, so I tested with only a few rows of data, but it still hangs at the same place. Could you advise what the problem might be?

Version

Secretflow 1.6.1b0

Operating system

CentOS 7 x64

Hardware Resources

8C80G

lq0404510 commented 4 weeks ago

Does the other party's log also get stuck at this position?

XiaoLazi commented 4 weeks ago

Yes, it's stuck at the same place on the other side too, and neither side reports an error.

6fj commented 4 weeks ago
  1. Please paste the code of bob_testdata.py.
  2. Is this 8C80G machine running both the alice and bob sides' code at the same time?

XiaoLazi commented 3 weeks ago

@6fj bob_testdata.py:

# init log

logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# sf setting

cluster_config = {
    'parties': {
        'alice': {
            'address': '192.168.11.131:10420',
            'listen_addr': '0.0.0.0:10420'
        },
        'bob': {
            'address': '192.168.11.130:10430',
            'listen_addr': '0.0.0.0:10430'
        }
    },
    'self_party': 'bob'
}

cluster_def = {
    'nodes': [
        {'party': 'alice', 'address': '192.168.11.131:10720', 'listen_addr': '0.0.0.0:10720'},
        {'party': 'bob', 'address': '192.168.11.130:10730', 'listen_addr': '0.0.0.0:10730'}
    ],
    'runtime_config': {
        'protocol': spu.spu_pb2.SEMI2K,
        'field': spu.spu_pb2.FM128,
        'sigmoid_mode': spu.spu_pb2.RuntimeConfig.SIGMOID_REAL
    }
}

def main(_):

# sf init

sf.shutdown()
sf.init( address='192.168.11.130:7751', cluster_config=cluster_config,log_to_driver=True)
alice = sf.PYU('alice')
bob = sf.PYU('bob')

# get data

connect=Connect('10.3.0.12',3306,'root','root','prac')
conn=get_conn(connect)
sql='select * from psi_bob_test'
inputpath='./data/psi_input_bob_test'
get_data(conn,sql,inputpath,0.8)

input_path = {
    alice: './data/psi_input_alice_test.csv',
    bob: './data/psi_input_bob_test.csv',
}
output_path = {
    alice: './data/psi_output_test.csv',
    bob: './data/psi_output_test.csv',
}
select_keys = {
    alice: ['id'],
    bob: ['id'],
}
spu = sf.SPU(cluster_def)

# psi

start = time.time()
print(f"psi start_time: {start}")
reports = spu.psi_csv(
    key=select_keys,
    input_path=input_path,
    output_path=output_path,
    receiver='bob',
    protocol='KKRT_PSI_2PC',
    precheck_input=False,
    sort=False,
    broadcast_result=False,
)
print(f"psi reports: {reports}")
logging.info(f"cost time: {time.time() - start}")

sf.shutdown()

# write mysql

path='./data/psi_output_test.csv'
table_name='psi_output_test'
write_data(connect,path,table_name)

alice_testdata.py:

# init log

logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# sf setting

cluster_config = {
    'parties': {
        'alice': {
            'address': '192.168.11.131:10420',
            'listen_addr': '0.0.0.0:10420'
        },
        'bob': {
            'address': '192.168.11.130:10430',
            'listen_addr': '0.0.0.0:10430'
        }
    },
    'self_party': 'alice'
}

cluster_def = {
    'nodes': [
        {'party': 'alice', 'address': '192.168.11.131:10720', 'listen_addr': '0.0.0.0:10720'},
        {'party': 'bob', 'address': '192.168.11.130:10730', 'listen_addr': '0.0.0.0:10730'}
    ],
    'runtime_config': {
        'protocol': spu.spu_pb2.SEMI2K,
        'field': spu.spu_pb2.FM128,
        'sigmoid_mode': spu.spu_pb2.RuntimeConfig.SIGMOID_REAL
    }
}

def main():

# sf init

sf.shutdown()
sf.init( address='192.168.11.131:7751', cluster_config=cluster_config,log_to_driver=True)
alice = sf.PYU('alice')
bob = sf.PYU('bob')

# get data

connect=Connect('10.3.0.12',3306,'root','root','prac')
conn=get_conn(connect)
sql='select * from psi_alice_test'
inputpath='./data/psi_input_alice_test'
get_data(conn,sql,inputpath,0.9)

input_path = {
    alice: './data/psi_input_alice_test.csv',
    bob: './data/psi_input_bob_test.csv',
}
output_path = {
    alice: './data/psi_output_test.csv',
    bob: './data/psi_output_test.csv',
}
select_keys = {
    alice: ['id'],
    bob: ['id'],
}
spu = sf.SPU(cluster_def)

# psi

start = time.time()
print(f"psi start_time: {start}")
reports = spu.psi_csv(
    key=select_keys,
    input_path=input_path,
    output_path=output_path,
    receiver='alice',
    protocol='KKRT_PSI_2PC',
    precheck_input= False,
    sort=False,
    broadcast_result=False,
)
print(f"psi reports: {reports}")
logging.info(f"cost time: {time.time() - start}")

sf.shutdown()

# write mysql

path='./data/psi_output_test.csv'
table_name='psi_output_test'
write_data(connect,path,table_name)

No. The alice machine has 80G and the bob machine has 20G. I'll expand the bob machine's memory shortly and try again.

aokaokd commented 3 weeks ago

What is your secretflow version?

6fj commented 3 weeks ago

Are there duplicate values in the data on either side? I suggest first setting precheck_input to True on both sides.
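
For example, a minimal sketch of that change, reusing the psi_csv arguments already shown in the scripts above (only precheck_input differs):

# Sketch only: same call as in bob_testdata.py / alice_testdata.py, with the input precheck enabled.
reports = spu.psi_csv(
    key=select_keys,
    input_path=input_path,
    output_path=output_path,
    receiver='alice',        # as in alice_testdata.py above
    protocol='KKRT_PSI_2PC',
    precheck_input=True,     # was False; checks the selected keys (e.g. for duplicates) before the protocol runs
    sort=False,
    broadcast_result=False,
)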

XiaoLazi commented 3 weeks ago

@aokaokd Secretflow 1.6.1b0

XiaoLazi commented 3 weeks ago

@6fj Both sides have only a few rows of data, with no duplicates. After setting precheck_input to True on both sides, the program still hangs at the same place.

6fj commented 3 weeks ago

I really can't spot the problem from this. If it's convenient, you can contact us to debug. @Chrisdehe

XiaoLazi commented 3 weeks ago

@6fj OK, thank you very much. What would be the most convenient way to get in touch with @Chrisdehe?

Chrisdehe commented 3 weeks ago

@XiaoLazi Hey, please add the technical support WeChat: secretflow02

XiaoLazi commented 3 weeks ago

I've sent the contact request, thanks!!

XiaoLazi commented 3 weeks ago

The problem has been solved. Thanks to the SecretFlow team for their technical support.

Problem: the receiver argument of the psi_csv() call was inconsistent between the scripts on the two machines.

Usage of the receiver argument: it specifies which party obtains the result data, and both parties must pass the same value. receiver can be combined with broadcast_result: if broadcast_result is True, both parties obtain the intersection result.
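
For reference, a minimal sketch of the corrected call under the configuration used in this thread (select_keys, input_path, output_path and spu as defined in the scripts above); the receiver value chosen here is arbitrary, the point is that both scripts pass the same one:

# Sketch only: this identical call must appear in both bob_testdata.py and alice_testdata.py.
reports = spu.psi_csv(
    key=select_keys,
    input_path=input_path,
    output_path=output_path,
    receiver='bob',          # must be the same value on both parties
    protocol='KKRT_PSI_2PC',
    precheck_input=True,
    sort=False,
    broadcast_result=True,   # True: both parties obtain the intersection result
)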