secretflow / psi

The repo of Private Set Intersection(PSI) and Private Information Retrieval(PIR) from SecretFlow.
https://www.secretflow.org.cn/docs/psi
Apache License 2.0
26 stars 19 forks source link

关于运行基于cache的ECDH_OPRF_UB_PSI时遇到的问题 #134

Closed winnylyc closed 4 months ago

winnylyc commented 4 months ago

您好,打扰您了。 我在尝试实现基于cache的ECDH_OPRF_UB_PSI遇到了问题。 我的方式是使用ECDH_OPRF_UB_PSI_2PC_GEN_CACHE来作为offline阶段,然后使用ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE作为online阶段。目前在运行ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE时产生报错。下面时两方运行的代码以及产生的错误。 Sender(Server):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'bob'
}
sf.shutdown
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

alice, bob = sf.PYU('alice'), sf.PYU('bob')
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:'/root/project/psi1/alice_exactpsi_1e6_unique.csv', bob:'/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv'}, 
    output_path={alice:'/root/project/psi1/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv', bob:'/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_GEN_CACHE', 
    preprocess_path='preprocess_cache',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete offline phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:'/root/project/psi1/alice_exactpsi_1e6_unique.csv', bob:'/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv'}, 
    output_path={alice:'/root/project/psi1/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv', bob:'/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE', 
    preprocess_path='preprocess_cache',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete online phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

输出:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-23 15:49:58,963 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-23 15:49:59.719 INFO api.py:233 [alice] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '0.0.0.0:59179', 'bob': '127.0.0.1:53341'}, 'CURRENT_PARTY_NAME': 'alice', 'TLS_CONFIG': {}}
2024-05-23 15:50:00.424 INFO barriers.py:284 [alice] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=19280) 2024-05-23 15:50:00.419 INFO grpc_proxy.py:359 [alice] -- [Anonymous_job] ReceiverProxy binding port 59179, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=19280) 2024-05-23 15:50:00.422 INFO grpc_proxy.py:379 [alice] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-23 15:50:01.120 INFO barriers.py:333 [alice] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-23 15:50:01.121 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 0 attemp, up to 3600 attemps.
2024-05-23 15:50:04.124 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 1 attemp, up to 3600 attemps.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Complete offline phase
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
(SPURuntime(device_id=None, party=alice) pid=19752) [2024-05-23 15:50:08.337] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE","receiver_rank":1,"input_params":{"path":"/root/project/psi1/alice_exactpsi_1e6_unique.csv","select_fields":["name"],"precheck":true},"output_params":{"path":"/root/project/psi1/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv","need_sort":true},"curve_type":"CURVE_FOURQ","bucket_size":1048576}
(SPURuntime(device_id=None, party=alice) pid=19752) [2024-05-23 15:50:08.337] [info] [bucket_psi.cc:400] bucket size set to 1048576
(SPURuntime(device_id=None, party=alice) pid=19752) [2024-05-23 15:50:08.337] [info] [bucket_psi.cc:425] Run psi protocol=8, self_items_count=0
(SPURuntime(device_id=None, party=alice) pid=19752) [2024-05-23 15:50:08.338] [info] [bucket_ub_psi.cc:93] input file path:/root/project/psi1/alice_exactpsi_1e6_unique.csv
(SPURuntime(device_id=None, party=alice) pid=19752) [2024-05-23 15:50:08.338] [info] [bucket_ub_psi.cc:94] output file path:/root/project/psi1/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv
(SPURuntime(device_id=None, party=alice) pid=19752) [2024-05-23 15:50:08.338] [info] [ecdh_oprf_selector.cc:76] use fourq
Traceback (most recent call last):
  File "/root/project/psi1/sf_connect_ub.py", line 60, in <module>
    spu.psi_csv(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1848, in psi_csv
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 321, in psi_csv
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(MemoryError): ray::SPURuntime.psi_csv() (pid=19752, ip=192.168.15.7, actor_id=8c7c48f35d3db7ae1ba590c601000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc
2024-05-23 15:50:08.424 WARNING cleanup.py:154 [alice] -- [Anonymous_job] Failed to send ObjectRef(85748392bcd969ccc2dc0ecdcc67afbe6255b5ff0100000001000000) to bob, error: ray::SenderProxyActor.send() (pid=19322, ip=192.168.15.7, actor_id=c2dc0ecdcc67afbe6255b5ff01000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7fb88c2b5ed0>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=19752, ip=192.168.15.7, actor_id=8c7c48f35d3db7ae1ba590c601000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc,upstream_seq_id: 8#0, downstream_seq_id: 10.
2024-05-23 15:50:08.424 INFO cleanup.py:161 [alice] -- [Anonymous_job] Sending error std::bad_alloc to bob.
2024-05-23 15:50:08.425 WARNING cleanup.py:127 [alice] -- [Anonymous_job] Signal SIGINT to exit.
2024-05-23 15:50:08.425 WARNING api.py:60 [alice] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-23 15:50:08.426 WARNING api.py:325 [alice] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-23 15:50:08.426 ERROR api.py:330 [alice] -- [Anonymous_job] Cross-silo sending error occured. ray::SenderProxyActor.send() (pid=19322, ip=192.168.15.7, actor_id=c2dc0ecdcc67afbe6255b5ff01000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7fb88c2b5ed0>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=19752, ip=192.168.15.7, actor_id=8c7c48f35d3db7ae1ba590c601000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc
2024-05-23 15:50:08.426 INFO api.py:337 [alice] -- [Anonymous_job] No wait for data sending.
2024-05-23 15:50:08.427 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[DataSendingQueueThread] to exit.
2024-05-23 15:50:08.427 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-23 15:50:08.427 INFO api.py:352 [alice] -- [Anonymous_job] Shutdowned rayfed.
2024-05-23 15:50:08.427 CRITICAL api.py:356 [alice] -- [Anonymous_job] Exit now due to the previous error.
Exception ignored in: <module 'threading' from '/root/anaconda3/envs/psi/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/root/anaconda3/envs/psi/lib/python3.10/threading.py", line 1567, in _shutdown
    lock.acquire()
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 65, in _signal_handler
    _shutdown(intended=False)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 357, in _shutdown
    sys.exit(1)
SystemExit: 1

Receiver(Client):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'bob'
}
sf.shutdown
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

alice, bob = sf.PYU('alice'), sf.PYU('bob')
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:'/root/project/psi1/alice_exactpsi_1e6_unique.csv', bob:'/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv'}, 
    output_path={alice:'/root/project/psi1/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv', bob:'/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_GEN_CACHE', 
    preprocess_path='preprocess_cache',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete offline phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:'/root/project/psi1/alice_exactpsi_1e6_unique.csv', bob:'/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv'}, 
    output_path={alice:'/root/project/psi1/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv', bob:'/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE', 
    preprocess_path='preprocess_cache',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete online phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

输出:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-23 15:50:01,526 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-23 15:50:02.104 INFO api.py:233 [bob] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '127.0.0.1:59179', 'bob': '0.0.0.0:53341'}, 'CURRENT_PARTY_NAME': 'bob', 'TLS_CONFIG': {}}
2024-05-23 15:50:02.743 INFO barriers.py:284 [bob] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=19639) 2024-05-23 15:50:02.740 INFO grpc_proxy.py:359 [bob] -- [Anonymous_job] ReceiverProxy binding port 53341, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=19639) 2024-05-23 15:50:02.742 INFO grpc_proxy.py:379 [bob] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-23 15:50:03.339 INFO barriers.py:333 [bob] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-23 15:50:03.339 INFO barriers.py:520 [bob] -- [Anonymous_job] Try ping ['alice'] at 0 attemp, up to 3600 attemps.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Complete offline phase
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.320] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_OPRF_UB_PSI_2PC_GEN_CACHE","receiver_rank":1,"input_params":{"path":"/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv","select_fields":["name"],"precheck":true},"output_params":{"path":"/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv","need_sort":true},"curve_type":"CURVE_FOURQ","bucket_size":1048576,"ecdh_secret_key_path":"/root/project/psi1/alice_oprf_key"}
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.320] [info] [bucket_psi.cc:425] Run psi protocol=7, self_items_count=0
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.321] [info] [bucket_ub_psi.cc:93] input file path:/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.321] [info] [bucket_ub_psi.cc:94] output file path:/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.321] [info] [ecdh_oprf_selector.cc:33] use fourq
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.321] [info] [batch_provider.cc:328] ReadAndShuffle start, idx:0, provider_batch_size:1048576
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.321] [info] [batch_provider.cc:350] ReadAndShuffle end, idx:0 , size:100
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.321] [info] [ecdh_oprf_psi.cc:108] omp_get_num_threads:1 cpus:8
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.325] [info] [batch_provider.cc:318] cursor_index_:0, bucket_index_:0, 0-0
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.325] [info] [batch_provider.cc:240] cursor_index_:0, bucket_index_:0, 0-0
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.327] [info] [ecdh_oprf_psi.cc:192] FullEvaluate finished, batch_count=1 items_count=100
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.331] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE","receiver_rank":1,"input_params":{"path":"/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv","select_fields":["name"],"precheck":true},"output_params":{"path":"/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv","need_sort":true},"curve_type":"CURVE_FOURQ","bucket_size":1048576,"preprocess_path":"preprocess_cache"}
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.331] [info] [bucket_psi.cc:400] bucket size set to 1048576
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.337] [info] [bucket_psi.cc:425] Run psi protocol=8, self_items_count=0
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.337] [info] [bucket_ub_psi.cc:93] input file path:/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.337] [info] [bucket_ub_psi.cc:94] output file path:/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv
(SPURuntime(device_id=None, party=bob) pid=19724) [2024-05-23 15:50:08.337] [info] [bucket_ub_psi.cc:186] Start Sync
2024-05-23 15:50:08.430 WARNING api.py:607 [bob] -- [Anonymous_job] Encounter RemoteError happend in other parties, error message: FedRemoteError occurred at alice
Traceback (most recent call last):
  File "/root/project/psi2/sf_connect_ub.py", line 60, in <module>
    spu.psi_csv(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1848, in psi_csv
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 321, in psi_csv
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 613, in get
    raise e
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(FedRemoteError): ray::ReceiverProxyActor.get_data() (pid=19639, ip=192.168.15.7, actor_id=d23873c80f4140328261fbb701000000, repr=<fed.proxy.barriers.ReceiverProxyActor object at 0x7ff0b8729c90>)
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/proxy/barriers.py", line 236, in get_data
    raise data
fed.exceptions.FedRemoteError: FedRemoteError occurred at alice

感谢您的帮助!

6fj commented 4 months ago

hi @winnylyc

我看到了 'MemoryError: std::bad_alloc'的报错,请问你现在的配置是否满足单边至少8c16g的最低要求?

winnylyc commented 4 months ago

您好,非常感谢您的回答。 上面的问题产生的环境确实不满足这个最低要求,我是两方都在同一个8c32g的WSL环境中运行。 但是我尝试了换成两方都在同一个16c32g的WSL环境中运行,依然会报和上面一样的问题 Sender:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-23 16:22:09,571 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-23 16:22:10.316 INFO api.py:233 [alice] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '0.0.0.0:59179', 'bob': '127.0.0.1:53341'}, 'CURRENT_PARTY_NAME': 'alice', 'TLS_CONFIG': {}}
2024-05-23 16:22:10.992 INFO barriers.py:284 [alice] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=1237) 2024-05-23 16:22:10.986 INFO grpc_proxy.py:359 [alice] -- [Anonymous_job] ReceiverProxy binding port 59179, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=1237) 2024-05-23 16:22:10.990 INFO grpc_proxy.py:379 [alice] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-23 16:22:11.629 INFO barriers.py:333 [alice] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-23 16:22:11.629 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 0 attemp, up to 3600 attemps.
2024-05-23 16:22:14.633 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 1 attemp, up to 3600 attemps.
2024-05-23 16:22:17.636 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 2 attemp, up to 3600 attemps.
2024-05-23 16:22:20.638 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 3 attemp, up to 3600 attemps.
2024-05-23 16:22:23.641 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 4 attemp, up to 3600 attemps.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Complete offline phase
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Traceback (most recent call last):
  File "/root/project/psi1/sf_connect_ub.py", line 60, in <module>
    spu.psi_csv(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1848, in psi_csv
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 321, in psi_csv
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(MemoryError): ray::SPURuntime.psi_csv() (pid=2078, ip=192.168.15.7, actor_id=dece172e6245582cd500f73501000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc
(SPURuntime(device_id=None, party=alice) pid=2078) [2024-05-23 16:22:28.792] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE","receiver_rank":1,"input_params":{"path":"/root/project/psi1/alice_exactpsi_1e6_unique.csv","select_fields":["name"],"precheck":true},"output_params":{"path":"/root/project/psi1/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv","need_sort":true},"curve_type":"CURVE_FOURQ","bucket_size":1048576}
(SPURuntime(device_id=None, party=alice) pid=2078) [2024-05-23 16:22:28.792] [info] [bucket_psi.cc:400] bucket size set to 1048576
(SPURuntime(device_id=None, party=alice) pid=2078) [2024-05-23 16:22:28.793] [info] [bucket_psi.cc:425] Run psi protocol=8, self_items_count=0
(SPURuntime(device_id=None, party=alice) pid=2078) [2024-05-23 16:22:28.794] [info] [bucket_ub_psi.cc:93] input file path:/root/project/psi1/alice_exactpsi_1e6_unique.csv
(SPURuntime(device_id=None, party=alice) pid=2078) [2024-05-23 16:22:28.794] [info] [bucket_ub_psi.cc:94] output file path:/root/project/psi1/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv
(SPURuntime(device_id=None, party=alice) pid=2078) [2024-05-23 16:22:28.794] [info] [ecdh_oprf_selector.cc:76] use fourq
2024-05-23 16:22:28.875 WARNING cleanup.py:154 [alice] -- [Anonymous_job] Failed to send ObjectRef(71b133a11e1c461c696f04c5cb75ea847a480d960100000001000000) to bob, error: ray::SenderProxyActor.send() (pid=1290, ip=192.168.15.7, actor_id=696f04c5cb75ea847a480d9601000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7f06f2b7dfc0>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=2078, ip=192.168.15.7, actor_id=dece172e6245582cd500f73501000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc,upstream_seq_id: 8#0, downstream_seq_id: 10.
2024-05-23 16:22:28.875 INFO cleanup.py:161 [alice] -- [Anonymous_job] Sending error std::bad_alloc to bob.
2024-05-23 16:22:28.876 WARNING cleanup.py:127 [alice] -- [Anonymous_job] Signal SIGINT to exit.
2024-05-23 16:22:28.876 WARNING api.py:60 [alice] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-23 16:22:28.876 WARNING api.py:325 [alice] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-23 16:22:28.876 ERROR api.py:330 [alice] -- [Anonymous_job] Cross-silo sending error occured. ray::SenderProxyActor.send() (pid=1290, ip=192.168.15.7, actor_id=696f04c5cb75ea847a480d9601000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7f06f2b7dfc0>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=2078, ip=192.168.15.7, actor_id=dece172e6245582cd500f73501000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc
2024-05-23 16:22:28.877 INFO api.py:337 [alice] -- [Anonymous_job] No wait for data sending.
2024-05-23 16:22:28.878 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[DataSendingQueueThread] to exit.
2024-05-23 16:22:28.878 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-23 16:22:28.878 INFO api.py:352 [alice] -- [Anonymous_job] Shutdowned rayfed.
2024-05-23 16:22:28.878 CRITICAL api.py:356 [alice] -- [Anonymous_job] Exit now due to the previous error.
Exception ignored in: <module 'threading' from '/root/anaconda3/envs/psi/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/root/anaconda3/envs/psi/lib/python3.10/threading.py", line 1567, in _shutdown
    lock.acquire()
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 65, in _signal_handler
    _shutdown(intended=False)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 357, in _shutdown
    sys.exit(1)
SystemExit: 1

Receiver:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-23 16:22:16,732 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-23 16:22:17.484 INFO api.py:233 [bob] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '127.0.0.1:59179', 'bob': '0.0.0.0:53341'}, 'CURRENT_PARTY_NAME': 'bob', 'TLS_CONFIG': {}}
2024-05-23 16:22:18.110 INFO barriers.py:284 [bob] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=1911) 2024-05-23 16:22:18.106 INFO grpc_proxy.py:359 [bob] -- [Anonymous_job] ReceiverProxy binding port 53341, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=1911) 2024-05-23 16:22:18.109 INFO grpc_proxy.py:379 [bob] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-23 16:22:18.743 INFO barriers.py:333 [bob] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-23 16:22:18.743 INFO barriers.py:520 [bob] -- [Anonymous_job] Try ping ['alice'] at 0 attemp, up to 3600 attemps.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Complete offline phase
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.770] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_OPRF_UB_PSI_2PC_GEN_CACHE","receiver_rank":1,"input_params":{"path":"/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv","select_fields":["name"],"precheck":true},"output_params":{"path":"/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv","need_sort":true},"curve_type":"CURVE_FOURQ","bucket_size":1048576,"ecdh_secret_key_path":"/root/project/psi1/alice_oprf_key"}
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.770] [info] [bucket_psi.cc:425] Run psi protocol=7, self_items_count=0
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.778] [info] [bucket_ub_psi.cc:93] input file path:/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.778] [info] [bucket_ub_psi.cc:94] output file path:/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.778] [info] [ecdh_oprf_selector.cc:33] use fourq
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.779] [info] [batch_provider.cc:328] ReadAndShuffle start, idx:0, provider_batch_size:1048576
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.780] [info] [batch_provider.cc:350] ReadAndShuffle end, idx:0 , size:100
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.781] [info] [ecdh_oprf_psi.cc:108] omp_get_num_threads:1 cpus:16
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.781] [info] [ecdh_oprf_psi.cc:119] tid:0 omp_get_num_threads:16
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.781] [info] [batch_provider.cc:318] cursor_index_:0, bucket_index_:0, 0-0
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.781] [info] [batch_provider.cc:240] cursor_index_:0, bucket_index_:0, 0-0
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.783] [info] [ecdh_oprf_psi.cc:192] FullEvaluate finished, batch_count=1 items_count=100
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.786] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE","receiver_rank":1,"input_params":{"path":"/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv","select_fields":["name"],"precheck":true},"output_params":{"path":"/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv","need_sort":true},"curve_type":"CURVE_FOURQ","bucket_size":1048576,"preprocess_path":"preprocess_cache"}
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.786] [info] [bucket_psi.cc:400] bucket size set to 1048576
2024-05-23 16:22:28.882 WARNING api.py:607 [bob] -- [Anonymous_job] Encounter RemoteError happend in other parties, error message: FedRemoteError occurred at alice
Traceback (most recent call last):
  File "/root/project/psi2/sf_connect_ub.py", line 60, in <module>
    spu.psi_csv(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1848, in psi_csv
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 321, in psi_csv
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 613, in get
    raise e
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(FedRemoteError): ray::ReceiverProxyActor.get_data() (pid=1911, ip=192.168.15.7, actor_id=d22b5730c77662ce804d8aba01000000, repr=<fed.proxy.barriers.ReceiverProxyActor object at 0x7f24816c9d80>)
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/proxy/barriers.py", line 236, in get_data
    raise data
fed.exceptions.FedRemoteError: FedRemoteError occurred at alice
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.793] [info] [bucket_psi.cc:425] Run psi protocol=8, self_items_count=0
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.793] [info] [bucket_ub_psi.cc:93] input file path:/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique.csv
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.793] [info] [bucket_ub_psi.cc:94] output file path:/root/project/psi2/bob_exactpsi_1e6_to_1e2_unique_output_ubpsi_cache.csv
(SPURuntime(device_id=None, party=bob) pid=2014) [2024-05-23 16:22:28.793] [info] [bucket_ub_psi.cc:186] Start Sync
2024-05-23 16:24:28.818 WARNING cleanup.py:154 [bob] -- [Anonymous_job] Failed to send ObjectRef(85748392bcd969ccdc36c0b6c411c57d2e34cc490100000001000000) to alice, error: ray::SenderProxyActor.send() (pid=1964, ip=192.168.15.7, actor_id=dc36c0b6c411c57d2e34cc4901000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7fde80cd9f60>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=2014, ip=192.168.15.7, actor_id=ea3f375776b2bf62a5995c0c01000000, repr=SPURuntime(device_id=None, party=bob))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
RuntimeError: what:
        [external/yacl/yacl/link/transport/channel.cc:411] Get data timeout, key=root:4:ALLGATHER
stacktrace:
#0 yacl::link::Context::RecvInternal()+0x7f973f4d2657
#1 yacl::link::AllGatherImpl<>()+0x7f973f4cd1b1
#2 yacl::link::AllGather()+0x7f973f4cd643
#3 psi::AllGatherItemsSize()+0x7f973f4cb295
#4 psi::UbPsiClientTransferCache()+0x7f973e1fc1ac
#5 psi::UbPsi()+0x7f973e2007ed
#6 psi::BucketPsi::RunPsi()+0x7f973e1f4528
#7 psi::BucketPsi::Run()+0x7f973e1f6960
#8 psi::RunLegacyPsi()+0x7f973e0aac04
#9 psi::BindLibs()::{lambda()#2}::operator()()+0x7f973e0a0640
#10 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f973e0a0baa
#11 pybind11::cpp_function::dispatcher()+0x7f973e082fed
#12 cfunction_call+0x4fd907,upstream_seq_id: 9#0, downstream_seq_id: 10.
2024-05-23 16:24:28.818 INFO cleanup.py:161 [bob] -- [Anonymous_job] Sending error what:
        [external/yacl/yacl/link/transport/channel.cc:411] Get data timeout, key=root:4:ALLGATHER
stacktrace:
#0 yacl::link::Context::RecvInternal()+0x7f973f4d2657
#1 yacl::link::AllGatherImpl<>()+0x7f973f4cd1b1
#2 yacl::link::AllGather()+0x7f973f4cd643
#3 psi::AllGatherItemsSize()+0x7f973f4cb295
#4 psi::UbPsiClientTransferCache()+0x7f973e1fc1ac
#5 psi::UbPsi()+0x7f973e2007ed
#6 psi::BucketPsi::RunPsi()+0x7f973e1f4528
#7 psi::BucketPsi::Run()+0x7f973e1f6960
#8 psi::RunLegacyPsi()+0x7f973e0aac04
#9 psi::BindLibs()::{lambda()#2}::operator()()+0x7f973e0a0640
#10 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f973e0a0baa
#11 pybind11::cpp_function::dispatcher()+0x7f973e082fed
#12 cfunction_call+0x4fd907

 to alice.
2024-05-23 16:24:28.819 WARNING cleanup.py:127 [bob] -- [Anonymous_job] Signal SIGINT to exit.
2024-05-23 16:24:28.819 WARNING api.py:60 [bob] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-23 16:24:28.819 WARNING api.py:325 [bob] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-23 16:24:28.819 ERROR api.py:330 [bob] -- [Anonymous_job] Cross-silo sending error occured. ray::SenderProxyActor.send() (pid=1964, ip=192.168.15.7, actor_id=dc36c0b6c411c57d2e34cc4901000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7fde80cd9f60>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=2014, ip=192.168.15.7, actor_id=ea3f375776b2bf62a5995c0c01000000, repr=SPURuntime(device_id=None, party=bob))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
RuntimeError: what:
        [external/yacl/yacl/link/transport/channel.cc:411] Get data timeout, key=root:4:ALLGATHER
stacktrace:
#0 yacl::link::Context::RecvInternal()+0x7f973f4d2657
#1 yacl::link::AllGatherImpl<>()+0x7f973f4cd1b1
#2 yacl::link::AllGather()+0x7f973f4cd643
#3 psi::AllGatherItemsSize()+0x7f973f4cb295
#4 psi::UbPsiClientTransferCache()+0x7f973e1fc1ac
#5 psi::UbPsi()+0x7f973e2007ed
#6 psi::BucketPsi::RunPsi()+0x7f973e1f4528
#7 psi::BucketPsi::Run()+0x7f973e1f6960
#8 psi::RunLegacyPsi()+0x7f973e0aac04
#9 psi::BindLibs()::{lambda()#2}::operator()()+0x7f973e0a0640
#10 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f973e0a0baa
#11 pybind11::cpp_function::dispatcher()+0x7f973e082fed
#12 cfunction_call+0x4fd907
2024-05-23 16:24:28.819 INFO api.py:337 [bob] -- [Anonymous_job] No wait for data sending.
2024-05-23 16:24:28.820 INFO message_queue.py:70 [bob] -- [Anonymous_job] Notify message polling thread[DataSendingQueueThread] to exit.
2024-05-23 16:24:28.820 INFO message_queue.py:70 [bob] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-23 16:24:28.820 INFO api.py:352 [bob] -- [Anonymous_job] Shutdowned rayfed.
2024-05-23 16:24:28.820 CRITICAL api.py:356 [bob] -- [Anonymous_job] Exit now due to the previous error.
Exception ignored in: <module 'threading' from '/root/anaconda3/envs/psi/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/root/anaconda3/envs/psi/lib/python3.10/threading.py", line 1567, in _shutdown
    lock.acquire()
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 65, in _signal_handler
    _shutdown(intended=False)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 357, in _shutdown
    sys.exit(1)
SystemExit: 1

这次我截取了所有的log信息,不知道能不能给您帮助。

6fj commented 4 months ago

hi @winnylyc

可以尝试一下小数据量的情况吗,比如1k的量级,看看是否仍然有 std::bad_alloc 的问题呢

winnylyc commented 4 months ago

我这边尝试了Sender方1000的数据,Receiver方100的数据,依然有std::bad_alloc的问题。 下面是详细的代码及报错 Sender(Server):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'alice'
}
sf.shutdown
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

alice, bob = sf.PYU('alice'), sf.PYU('bob')
server_data_mount = '1e3'
query_data_mount = '1e2'
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_GEN_CACHE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete offline phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete online phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

输出:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-27 10:58:01,308 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-27 10:58:02.112 INFO api.py:233 [alice] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '0.0.0.0:59179', 'bob': '127.0.0.1:53341'}, 'CURRENT_PARTY_NAME': 'alice', 'TLS_CONFIG': {}}
2024-05-27 10:58:02.789 INFO barriers.py:284 [alice] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=6577) 2024-05-27 10:58:02.783 INFO grpc_proxy.py:359 [alice] -- [Anonymous_job] ReceiverProxy binding port 59179, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=6577) 2024-05-27 10:58:02.786 INFO grpc_proxy.py:379 [alice] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-27 10:58:03.935 INFO barriers.py:333 [alice] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-27 10:58:03.935 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 0 attemp, up to 3600 attemps.
2024-05-27 10:58:06.940 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 1 attemp, up to 3600 attemps.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Complete offline phase
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Traceback (most recent call last):
  File "/root/project/psi1/sf_connect_ub.py", line 62, in <module>
    spu.psi_csv(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1848, in psi_csv
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 321, in psi_csv
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(MemoryError): ray::SPURuntime.psi_csv() (pid=7302, ip=192.168.15.7, actor_id=684fee1a8bf31d7850793d0301000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc
2024-05-27 10:58:10.182 WARNING cleanup.py:154 [alice] -- [Anonymous_job] Failed to send ObjectRef(85748392bcd969ccea7d0273a5056e9b0a95e4a50100000001000000) to bob, error: ray::SenderProxyActor.send() (pid=6690, ip=192.168.15.7, actor_id=ea7d0273a5056e9b0a95e4a501000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7f5d28319ff0>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=7302, ip=192.168.15.7, actor_id=684fee1a8bf31d7850793d0301000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc,upstream_seq_id: 8#0, downstream_seq_id: 10.
2024-05-27 10:58:10.182 INFO cleanup.py:161 [alice] -- [Anonymous_job] Sending error std::bad_alloc to bob.
2024-05-27 10:58:10.183 WARNING cleanup.py:127 [alice] -- [Anonymous_job] Signal SIGINT to exit.
2024-05-27 10:58:10.183 WARNING api.py:60 [alice] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-27 10:58:10.183 WARNING api.py:325 [alice] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-27 10:58:10.183 ERROR api.py:330 [alice] -- [Anonymous_job] Cross-silo sending error occured. ray::SenderProxyActor.send() (pid=6690, ip=192.168.15.7, actor_id=ea7d0273a5056e9b0a95e4a501000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7f5d28319ff0>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=7302, ip=192.168.15.7, actor_id=684fee1a8bf31d7850793d0301000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc
2024-05-27 10:58:10.183 INFO api.py:337 [alice] -- [Anonymous_job] No wait for data sending.
2024-05-27 10:58:10.186 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[DataSendingQueueThread] to exit.
2024-05-27 10:58:10.186 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-27 10:58:10.186 INFO api.py:352 [alice] -- [Anonymous_job] Shutdowned rayfed.
2024-05-27 10:58:10.186 CRITICAL api.py:356 [alice] -- [Anonymous_job] Exit now due to the previous error.
Exception ignored in: <module 'threading' from '/root/anaconda3/envs/psi/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/root/anaconda3/envs/psi/lib/python3.10/threading.py", line 1567, in _shutdown
(SPURuntime(device_id=None, party=alice) pid=7302) [2024-05-27 10:58:10.090] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE","receiver_rank":1,"input_params":{"path":"/root/project/psi1/alice_exactpsi_1e3_unique.csv","select_fields":["name"],"precheck":true},"output_params":{"path":"/root/project/psi1/bob_exactpsi_1e3_to_1e2_unique_output_psi_ub.csv","need_sort":true},"curve_type":"CURVE_FOURQ","bucket_size":1048576}
(SPURuntime(device_id=None, party=alice) pid=7302) [2024-05-27 10:58:10.090] [info] [bucket_psi.cc:400] bucket size set to 1048576
(SPURuntime(device_id=None, party=alice) pid=7302) [2024-05-27 10:58:10.091] [info] [bucket_psi.cc:425] Run psi protocol=8, self_items_count=0
(SPURuntime(device_id=None, party=alice) pid=7302) [2024-05-27 10:58:10.093] [info] [bucket_ub_psi.cc:93] input file path:/root/project/psi1/alice_exactpsi_1e3_unique.csv
(SPURuntime(device_id=None, party=alice) pid=7302) [2024-05-27 10:58:10.093] [info] [bucket_ub_psi.cc:94] output file path:/root/project/psi1/bob_exactpsi_1e3_to_1e2_unique_output_psi_ub.csv
(SPURuntime(device_id=None, party=alice) pid=7302) [2024-05-27 10:58:10.093] [info] [ecdh_oprf_selector.cc:76] use fourq
    lock.acquire()
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 65, in _signal_handler
    _shutdown(intended=False)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 357, in _shutdown
    sys.exit(1)
SystemExit: 1

Receiver(Client):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'bob'
}
sf.shutdown
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

alice, bob = sf.PYU('alice'), sf.PYU('bob')
server_data_mount = '1e3'
query_data_mount = '1e2'
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_GEN_CACHE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete offline phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete online phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

输出:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-27 10:58:02,931 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-27 10:58:03.746 INFO api.py:233 [bob] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '127.0.0.1:59179', 'bob': '0.0.0.0:53341'}, 'CURRENT_PARTY_NAME': 'bob', 'TLS_CONFIG': {}}
2024-05-27 10:58:04.427 INFO barriers.py:284 [bob] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=7121) 2024-05-27 10:58:04.423 INFO grpc_proxy.py:359 [bob] -- [Anonymous_job] ReceiverProxy binding port 53341, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=7121) 2024-05-27 10:58:04.425 INFO grpc_proxy.py:379 [bob] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-27 10:58:05.064 INFO barriers.py:333 [bob] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-27 10:58:05.064 INFO barriers.py:520 [bob] -- [Anonymous_job] Try ping ['alice'] at 0 attemp, up to 3600 attemps.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Complete offline phase
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.072] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_OPRF_UB_PSI_2PC_GEN_CACHE","receiver_rank":1,"input_params":{"path":"/root/project/psi2/bob_exactpsi_1e3_to_1e2_unique.csv","select_fields":["name"],"precheck":true},"output_params":{"path":"/root/project/psi2/bob_exactpsi_1e3_to_1e2_unique_output_psi_ub.csv","need_sort":true},"curve_type":"CURVE_FOURQ","bucket_size":1048576,"ecdh_secret_key_path":"/root/project/psi1/alice_oprf_key"}
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.072] [info] [bucket_psi.cc:425] Run psi protocol=7, self_items_count=0
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.073] [info] [bucket_ub_psi.cc:93] input file path:/root/project/psi2/bob_exactpsi_1e3_to_1e2_unique.csv
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.073] [info] [bucket_ub_psi.cc:94] output file path:/root/project/psi2/bob_exactpsi_1e3_to_1e2_unique_output_psi_ub.csv
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.073] [info] [ecdh_oprf_selector.cc:33] use fourq
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.073] [info] [batch_provider.cc:328] ReadAndShuffle start, idx:0, provider_batch_size:1048576
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.073] [info] [batch_provider.cc:350] ReadAndShuffle end, idx:0 , size:100
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.073] [info] [ecdh_oprf_psi.cc:108] omp_get_num_threads:1 cpus:16
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.077] [info] [ecdh_oprf_psi.cc:119] tid:0 omp_get_num_threads:16
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.077] [info] [batch_provider.cc:318] cursor_index_:0, bucket_index_:0, 0-0
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.077] [info] [batch_provider.cc:240] cursor_index_:0, bucket_index_:0, 0-0
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.080] [info] [ecdh_oprf_psi.cc:192] FullEvaluate finished, batch_count=1 items_count=100
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.085] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE","receiver_rank":1,"input_params":{"path":"/root/project/psi2/bob_exactpsi_1e3_to_1e2_unique.csv","select_fields":["name"],"precheck":true},"output_params":{"path":"/root/project/psi2/bob_exactpsi_1e3_to_1e2_unique_output_psi_ub.csv","need_sort":true},"curve_type":"CURVE_FOURQ","bucket_size":1048576,"preprocess_path":"preprocess_1e3"}
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.085] [info] [bucket_psi.cc:400] bucket size set to 1048576
2024-05-27 10:58:10.188 WARNING api.py:607 [bob] -- [Anonymous_job] Encounter RemoteError happend in other parties, error message: FedRemoteError occurred at alice
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.091] [info] [bucket_psi.cc:425] Run psi protocol=8, self_items_count=0
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.092] [info] [bucket_ub_psi.cc:93] input file path:/root/project/psi2/bob_exactpsi_1e3_to_1e2_unique.csv
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.092] [info] [bucket_ub_psi.cc:94] output file path:/root/project/psi2/bob_exactpsi_1e3_to_1e2_unique_output_psi_ub.csv
(SPURuntime(device_id=None, party=bob) pid=7250) [2024-05-27 10:58:10.092] [info] [bucket_ub_psi.cc:186] Start Sync
Traceback (most recent call last):
  File "/root/project/psi2/sf_connect_ub.py", line 62, in <module>
    spu.psi_csv(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1848, in psi_csv
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 321, in psi_csv
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 613, in get
    raise e
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(FedRemoteError): ray::ReceiverProxyActor.get_data() (pid=7121, ip=192.168.15.7, actor_id=d0e052dffaa2950908c0562001000000, repr=<fed.proxy.barriers.ReceiverProxyActor object at 0x7fdfa94edde0>)
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/proxy/barriers.py", line 236, in get_data
    raise data
fed.exceptions.FedRemoteError: FedRemoteError occurred at alice
2024-05-27 11:00:10.104 WARNING cleanup.py:154 [bob] -- [Anonymous_job] Failed to send ObjectRef(85748392bcd969cc02815ae281fdd273dc240ded0100000001000000) to alice, error: ray::SenderProxyActor.send() (pid=7200, ip=192.168.15.7, actor_id=02815ae281fdd273dc240ded01000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7f52a419df90>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=7250, ip=192.168.15.7, actor_id=37a37c5efe770003561c337501000000, repr=SPURuntime(device_id=None, party=bob))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
RuntimeError: what:
        [external/yacl/yacl/link/transport/channel.cc:411] Get data timeout, key=root:4:ALLGATHER
stacktrace:
#0 yacl::link::Context::RecvInternal()+0x7f4f967fc657
#1 yacl::link::AllGatherImpl<>()+0x7f4f967f71b1
#2 yacl::link::AllGather()+0x7f4f967f7643
#3 psi::AllGatherItemsSize()+0x7f4f967f5295
#4 psi::UbPsiClientTransferCache()+0x7f4f955261ac
#5 psi::UbPsi()+0x7f4f9552a7ed
#6 psi::BucketPsi::RunPsi()+0x7f4f9551e528
#7 psi::BucketPsi::Run()+0x7f4f95520960
#8 psi::RunLegacyPsi()+0x7f4f953d4c04
#9 psi::BindLibs()::{lambda()#2}::operator()()+0x7f4f953ca640
#10 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f4f953cabaa
#11 pybind11::cpp_function::dispatcher()+0x7f4f953acfed
#12 cfunction_call+0x4fd907,upstream_seq_id: 9#0, downstream_seq_id: 10.
2024-05-27 11:00:10.105 INFO cleanup.py:161 [bob] -- [Anonymous_job] Sending error what:
        [external/yacl/yacl/link/transport/channel.cc:411] Get data timeout, key=root:4:ALLGATHER
stacktrace:
#0 yacl::link::Context::RecvInternal()+0x7f4f967fc657
#1 yacl::link::AllGatherImpl<>()+0x7f4f967f71b1
#2 yacl::link::AllGather()+0x7f4f967f7643
#3 psi::AllGatherItemsSize()+0x7f4f967f5295
#4 psi::UbPsiClientTransferCache()+0x7f4f955261ac
#5 psi::UbPsi()+0x7f4f9552a7ed
#6 psi::BucketPsi::RunPsi()+0x7f4f9551e528
#7 psi::BucketPsi::Run()+0x7f4f95520960
#8 psi::RunLegacyPsi()+0x7f4f953d4c04
#9 psi::BindLibs()::{lambda()#2}::operator()()+0x7f4f953ca640
#10 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f4f953cabaa
#11 pybind11::cpp_function::dispatcher()+0x7f4f953acfed
#12 cfunction_call+0x4fd907

 to alice.
2024-05-27 11:00:10.105 WARNING cleanup.py:127 [bob] -- [Anonymous_job] Signal SIGINT to exit.
2024-05-27 11:00:10.106 WARNING api.py:60 [bob] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-27 11:00:10.106 WARNING api.py:325 [bob] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-27 11:00:10.106 ERROR api.py:330 [bob] -- [Anonymous_job] Cross-silo sending error occured. ray::SenderProxyActor.send() (pid=7200, ip=192.168.15.7, actor_id=02815ae281fdd273dc240ded01000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7f52a419df90>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=7250, ip=192.168.15.7, actor_id=37a37c5efe770003561c337501000000, repr=SPURuntime(device_id=None, party=bob))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
RuntimeError: what:
        [external/yacl/yacl/link/transport/channel.cc:411] Get data timeout, key=root:4:ALLGATHER
stacktrace:
#0 yacl::link::Context::RecvInternal()+0x7f4f967fc657
#1 yacl::link::AllGatherImpl<>()+0x7f4f967f71b1
#2 yacl::link::AllGather()+0x7f4f967f7643
#3 psi::AllGatherItemsSize()+0x7f4f967f5295
#4 psi::UbPsiClientTransferCache()+0x7f4f955261ac
#5 psi::UbPsi()+0x7f4f9552a7ed
#6 psi::BucketPsi::RunPsi()+0x7f4f9551e528
#7 psi::BucketPsi::Run()+0x7f4f95520960
#8 psi::RunLegacyPsi()+0x7f4f953d4c04
#9 psi::BindLibs()::{lambda()#2}::operator()()+0x7f4f953ca640
#10 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f4f953cabaa
#11 pybind11::cpp_function::dispatcher()+0x7f4f953acfed
#12 cfunction_call+0x4fd907
2024-05-27 11:00:10.106 INFO api.py:337 [bob] -- [Anonymous_job] No wait for data sending.
2024-05-27 11:00:10.107 INFO message_queue.py:70 [bob] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-27 11:00:10.107 INFO api.py:352 [bob] -- [Anonymous_job] Shutdowned rayfed.
2024-05-27 11:00:10.107 CRITICAL api.py:356 [bob] -- [Anonymous_job] Exit now due to the previous error.
Exception ignored in: <module 'threading' from '/root/anaconda3/envs/psi/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/root/anaconda3/envs/psi/lib/python3.10/threading.py", line 1567, in _shutdown
    lock.acquire()
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 65, in _signal_handler
    _shutdown(intended=False)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 357, in _shutdown
    sys.exit(1)
SystemExit: 1
winnylyc commented 4 months ago

我这边对ECDH_OPRF_UB_PSI_2PC_GEN_CACHE的理解似乎有问题。我理解ECDH_OPRF_UB_PSI_2PC_GEN_CACHE应该和ECDH_OPRF_UB_PSI_2PC_OFFLINE一样是准备offline阶段,只是将在硬盘上的读写操作改到cashe上。 如果是按照这个理解ECDH_OPRF_UB_PSI_2PC_GEN_CACHE应该和ECDH_OPRF_UB_PSI_2PC_OFFLINE操作的数据一样,但是我用secretnote单个调用ECDH_OPRF_UB_PSI_2PC_GEN_CACHE和ECDH_OPRF_UB_PSI_2PC_OFFLINE的输出不一样。 ECDH_OPRF_UB_PSI_2PC_GEN_CACHE: image ECDH_OPRF_UB_PSI_2PC_OFFLINE: image 这里的场景是,alice作为sender有1000个数据,bob作为receiver有100个数据。ECDH_OPRF_UB_PSI_2PC_OFFLINE这里的输出是对alice的1000个数据做操作,这是符合预期的。但是ECDH_OPRF_UB_PSI_2PC_GEN_CACHE是对bob的100个数据做操作,我就没有搞清楚这里ECDH_OPRF_UB_PSI_2PC_GEN_CACHE的作用是什么😓。

这里想请教您两个问题:

  1. ECDH_OPRF_UB_PSI_2PC_GEN_CACHE的作用是什么?
  2. 怎样减少运行ECDH_OPRF_UB_PSI_2PC_OFFLINE和ECDH_OPRF_UB_PSI_2PC_ONLINE时产生的对preprocess file的硬盘上的读写操作。
6fj commented 4 months ago

hi @winnylyc

ECDH OPRF UB PSI 大致分为两个阶段:离线阶段和在线阶段,离线阶段可以分为大数据方产生cache和大数据方将cache发送给小数据方。

因此你可以用多种方式调用:

  1. ECDH_OPRF_UB_PSI_2PC_GEN_CACHE + ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE + ECDH_OPRF_UB_PSI_2PC_ONLINE
  2. ECDH_OPRF_UB_PSI_2PC_OFFLINE + ECDH_OPRF_UB_PSI_2PC_ONLINE

如果将 ECDH_OPRF_UB_PSI_2PC_ONLINE 替换为 ECDH_OPRF_UB_PSI_2PC_SHUFFLE_ONLINE,大数据方将获取结果而不是小数据方。

winnylyc commented 4 months ago

感谢您的回答! 我理解了ECDH OPRF UB PSI的中四个协议的作用。

那似乎ECDH_OPRF_UB_PSI_2PC_GEN_CACHE的输出确实有问题? 既然它和ECDH_OPRF_UB_PSI_2PC_OFFLINE一样都是对大数据方(sender)的数据进行操作,那为什么输出会不同呢?从我上面的测试来看,ECDH_OPRF_UB_PSI_2PC_GEN_CACHE看上去是对小数据方(receiver)的数据进行操作。

6fj commented 4 months ago

receiver 你需要设成大数据方。你现在设成小数据方了。

winnylyc commented 4 months ago

可是https://www.secretflow.org.cn/zh-CN/docs/secretflow/v1.6.1b0/source/secretflow.device.device.device#secretflow.device.device.spu.SPURuntime.psi_csv 中receiver参数中的描述是receiver is client(small dataset party)。 请问是ECDH_OPRF_UB_PSI_2PC_OFFLINE和ECDH_OPRF_UB_PSI_2PC_GEN_CACHE的引用场景不同吗?即ECDH_OPRF_UB_PSI_2PC_OFFLINE适用于receiver是小数据方,ECDH_OPRF_UB_PSI_2PC_GEN_CACHE适用于receiver是大数据方?

6fj commented 4 months ago

这里的api确实比较混乱:

ECDH_OPRF_UB_PSI_2PC_GEN_CACHE:receiver填大数据方 ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE:receiver 填小数据方 ECDH_OPRF_UB_PSI_2PC_ONLINE:receiver 填小数据方

winnylyc commented 4 months ago

您好, 将ECDH_OPRF_UB_PSI_2PC_GEN_CACHE部分的receiver改为大数据方依然会在ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE这一步出问题。 我这里就提供Sender方和Receiver方的代码和server方在执行ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE的报错了(似乎client方的报错没什么信息) Sender(大数据方,server方)代码:

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'alice'
}
sf.shutdown
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

alice, bob = sf.PYU('alice'), sf.PYU('bob')
server_data_mount = '1e3'
query_data_mount = '1e2'
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='alice', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_GEN_CACHE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete gen cache phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete transfer cache phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_ONLINE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete online phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

Receiver(小数据方,client方)代码:

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'bob'
}
sf.shutdown
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

alice, bob = sf.PYU('alice'), sf.PYU('bob')
server_data_mount = '1e3'
query_data_mount = '1e2'
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='alice', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_GEN_CACHE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete gen cache phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete transfer cache phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
spu.psi_csv(
    key={alice:['name'], bob:['name']}, 
    input_path={alice:f'/root/project/psi1/alice_exactpsi_{server_data_mount}_unique.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique.csv'}, 
    output_path={alice:f'/root/project/psi1/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv', bob:f'/root/project/psi2/bob_exactpsi_{server_data_mount}_to_{query_data_mount}_unique_output_psi_ub.csv'}, 
    receiver='bob', 
    broadcast_result=False, 
    protocol='ECDH_OPRF_UB_PSI_2PC_ONLINE', 
    preprocess_path=f'preprocess_{server_data_mount}',
    ecdh_secret_key_path="/root/project/psi1/alice_oprf_key",
    curve_type='CURVE_FOURQ', 
)
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("Complete online phase")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

Sender(大数据方,server方)报错:

Traceback (most recent call last):
  File "/root/project/psi1/sf_connect_ub.py", line 62, in <module>
    spu.psi_csv(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1848, in psi_csv
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 321, in psi_csv
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(MemoryError): ray::SPURuntime.psi_csv() (pid=56359, ip=192.168.15.7, actor_id=452e9282d3675266f0bdc2a301000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc
2024-05-28 11:46:04.429 WARNING cleanup.py:154 [alice] -- [Anonymous_job] Failed to send ObjectRef(c54e76759b2a0c1009459f9c5de8d181d6ef602f0100000001000000) to bob, error: ray::SenderProxyActor.send() (pid=55543, ip=192.168.15.7, actor_id=09459f9c5de8d181d6ef602f01000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7fbf1e161f60>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=56359, ip=192.168.15.7, actor_id=452e9282d3675266f0bdc2a301000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc,upstream_seq_id: 8#0, downstream_seq_id: 10.
2024-05-28 11:46:04.430 INFO cleanup.py:161 [alice] -- [Anonymous_job] Sending error std::bad_alloc to bob.
2024-05-28 11:46:04.430 WARNING cleanup.py:127 [alice] -- [Anonymous_job] Signal SIGINT to exit.
2024-05-28 11:46:04.430 WARNING api.py:60 [alice] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-28 11:46:04.431 WARNING api.py:325 [alice] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-28 11:46:04.431 ERROR api.py:330 [alice] -- [Anonymous_job] Cross-silo sending error occured. ray::SenderProxyActor.send() (pid=55543, ip=192.168.15.7, actor_id=09459f9c5de8d181d6ef602f01000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7fbf1e161f60>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.psi_csv() (pid=56359, ip=192.168.15.7, actor_id=452e9282d3675266f0bdc2a301000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 866, in psi_csv
    report = psi.bucket_psi(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 69, in bucket_psi
    report_str = libpsi.libs.bucket_psi(
MemoryError: std::bad_alloc
2024-05-28 11:46:04.431 INFO api.py:337 [alice] -- [Anonymous_job] No wait for data sending.
2024-05-28 11:46:04.432 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[DataSendingQueueThread] to exit.
2024-05-28 11:46:04.432 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-28 11:46:04.432 INFO api.py:352 [alice] -- [Anonymous_job] Shutdowned rayfed.
2024-05-28 11:46:04.432 CRITICAL api.py:356 [alice] -- [Anonymous_job] Exit now due to the previous error.
Exception ignored in: <module 'threading' from '/root/anaconda3/envs/psi/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/root/anaconda3/envs/psi/lib/python3.10/threading.py", line 1567, in _shutdown
    lock.acquire()
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 65, in _signal_handler
    _shutdown(intended=False)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 357, in _shutdown
    sys.exit(1)
SystemExit: 1
6fj commented 4 months ago

hi @winnylyc , 我无法复现你报告的错误,以下为我的代码:

server:

import spu

import secretflow as sf

cluster_config = {
    "parties": {
        "alice": {"address": "127.0.0.1:59179", "listen_addr": "0.0.0.0:59179"},
        "bob": {"address": "127.0.0.1:53341", "listen_addr": "0.0.0.0:53341"},
    },
    "self_party": "alice",
}
sf.shutdown
sf.init(address="local", cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {"party": "alice", "address": "127.0.0.1:45413"},
        {"party": "bob", "address": "127.0.0.1:47480"},
    ],
    "runtime_config": {"protocol": spu.spu_pb2.SEMI2K, "field": spu.spu_pb2.FM128},
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    },
)

alice, bob = sf.PYU("alice"), sf.PYU("bob")

spu.psi_csv(
    key=["id_0", "id_1"],
    input_path="/tmp/server_input.csv",
    output_path="/tmp/server_cache",
    receiver="alice",
    broadcast_result=False,
    protocol="ECDH_OPRF_UB_PSI_2PC_GEN_CACHE",
    ecdh_secret_key_path="/tmp/alice_oprf_key.bin",
    curve_type="CURVE_FOURQ",
)

spu.psi_csv(
    key=[],
    input_path="/tmp/server_cache",
    output_path="",
    receiver="bob",
    broadcast_result=False,
    protocol="ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE",
    preprocess_path="/tmp/client_cache",
    ecdh_secret_key_path="",
    curve_type="CURVE_FOURQ",
)

spu.psi_csv(
    key={alice: [], bob: ["id_0", "id_1"]},
    input_path={
        alice: "",
        bob: "/tmp/client_input.csv",
    },
    output_path={
        alice: "",
        bob: "/tmp/client_output.csv",
    },
    receiver="bob",
    broadcast_result=False,
    protocol="ECDH_OPRF_UB_PSI_2PC_ONLINE",
    preprocess_path={
        alice: "/tmp/server_cache",
        bob: "/tmp/client_cache",
    },
    ecdh_secret_key_path="/tmp/alice_oprf_key.bin",
    curve_type="CURVE_FOURQ",
)

client:

import spu

import secretflow as sf

cluster_config = {
    "parties": {
        "alice": {"address": "127.0.0.1:59179", "listen_addr": "0.0.0.0:59179"},
        "bob": {"address": "127.0.0.1:53341", "listen_addr": "0.0.0.0:53341"},
    },
    "self_party": "bob",
}
sf.shutdown
sf.init(address="local", cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {"party": "alice", "address": "127.0.0.1:45413"},
        {"party": "bob", "address": "127.0.0.1:47480"},
    ],
    "runtime_config": {"protocol": spu.spu_pb2.SEMI2K, "field": spu.spu_pb2.FM128},
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    },
)

alice, bob = sf.PYU("alice"), sf.PYU("bob")

spu.psi_csv(
    key=["id_0", "id_1"],
    input_path="/tmp/server_input.csv",
    output_path="/tmp/server_cache",
    receiver="alice",
    broadcast_result=False,
    protocol="ECDH_OPRF_UB_PSI_2PC_GEN_CACHE",
    ecdh_secret_key_path="/tmp/alice_oprf_key.bin",
    curve_type="CURVE_FOURQ",
)

spu.psi_csv(
    key=[],
    input_path="/tmp/server_cache",
    output_path="",
    receiver="bob",
    broadcast_result=False,
    protocol="ECDH_OPRF_UB_PSI_2PC_TRANSFER_CACHE",
    preprocess_path="/tmp/client_cache",
    ecdh_secret_key_path="",
    curve_type="CURVE_FOURQ",
)

spu.psi_csv(
    key={alice: [], bob: ["id_0", "id_1"]},
    input_path={
        alice: "",
        bob: "/tmp/client_input.csv",
    },
    output_path={
        alice: "",
        bob: "/tmp/client_output.csv",
    },
    receiver="bob",
    broadcast_result=False,
    protocol="ECDH_OPRF_UB_PSI_2PC_ONLINE",
    preprocess_path={
        alice: "/tmp/server_cache",
        bob: "/tmp/client_cache",
    },
    ecdh_secret_key_path="/tmp/alice_oprf_key.bin",
    curve_type="CURVE_FOURQ",
)

请注意,ub psi 的 API 目前比较混乱,我们推荐使用新的 API v2,但是目前 API v2 ub psi 不支持将结果发给大数据方

winnylyc commented 4 months ago

非常感谢您的帮助,已将整个流程跑通!