secretflow / psi

The repo of Private Set Intersection (PSI) and Private Information Retrieval (PIR) from SecretFlow.
https://www.secretflow.org.cn/docs/psi
Apache License 2.0

[Feature]: Question about PIR support for unlabeled PSI #123

Open winnylyc opened 2 months ago

winnylyc commented 2 months ago

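Hello, and sorry to bother you. I would like to ask whether two PIR-related features are implemented, or whether their overhead can be reduced.

  1. Unlabeled PSI. I noticed that the APSI library underlying the PIR feature supports unlabeled PSI, and that it is described as having lower overhead than labeled PSI. Is this feature supported here, and if so, how do I invoke it? So far I have tried setting label_columns to [] directly (that is, no label input), but it does not seem to run.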
    spu.pir_setup(
    server="alice",
    input_path="/root/project/psi1/alice_exactpsi_1e6.csv",
    key_columns=['name'],
    label_columns=[],
    oprf_key_path="/root/project/psi1/alice_oprf_key",
    setup_path="/root/project/psi1/alice_exactpsi_setup_npq20",
    num_per_query=20,
    label_max_len=80,
    bucket_size=1000000
    )

    The result of running it on SecretNote is as follows.

    
    Bob's Output:
    2024-05-09 14:55:07.545 WARNING api.py:607 [bob] -- [Anonymous_job] Encounter RemoteError happend in other parties, error message: FedRemoteError occurred at alice
    RayTaskError(FedRemoteError): ray::ReceiverProxyActor.get_data() (pid=201684, ip=192.168.15.7, actor_id=57475061dc167106e064d65801000000, repr=<fed.proxy.barriers.ReceiverProxyActor object at 0x7f08dc26df60>)
    File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
    File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
    File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/proxy/barriers.py", line 236, in get_data
    raise data
    fed.exceptions.FedRemoteError: FedRemoteError occurred at alice
    ---------------------------------------------------------------------------
    RayTaskError(FedRemoteError)              Traceback (most recent call last)
    Cell In[3], line 1
    ----> 1 spu.pir_setup(
      2     server="alice",
      3     input_path="/root/project/psi1/alice_exactpsi_1e6.csv",
      4     key_columns=['name'],
      5     # label_columns=['country', 'location'],
      6     label_columns=[],
      7     oprf_key_path="/root/project/psi1/alice_oprf_key",
      8     setup_path="/root/project/psi1/alice_exactpsi_setup_npq20",
      9     num_per_query=20,
     10     label_max_len=80,
     11     bucket_size=1000000
     12 )

    File ~/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py:1990, in SPU.pir_setup(self, server, input_path, key_columns, label_columns, oprf_key_path, setup_path, num_per_query, label_max_len, bucket_size)
       1961 def pir_setup(
       1962     self,
       1963     server: str,
       (...)
       1971     bucket_size: int,
       1972 ):
       1973     """Private information retrival offline setup.
       1974     Args:
       1975         server (str): Which party is pir server.
       (...)
       1988         Dict: PIR report output by SPU.
       1989     """
    -> 1990     return dispatch(
       1991         'pir_setup',
       1992         self,
       1993         server,
       1994         input_path,
       1995         key_columns,
       1996         label_columns,
       1997         oprf_key_path,
       1998         setup_path,
       1999         num_per_query,
       2000         label_max_len,
       2001         bucket_size,
       2002     )

    File ~/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py:111, in dispatch(name, self, *args, **kwargs)
        101 def dispatch(name: str, self, *args, **kwargs):
        102     """Dispatch device kernel.
        103
        104     Args:
        (...)
        109         Kernel execution result.
        110     """
    --> 111     return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)

    File ~/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py:80, in Registrar.dispatch(self, device_type, name, *args, **kwargs)
         78 if name not in self._ops[device_type]:
         79     raise KeyError(f'device: {device_type}, op: {name} not registered')
    ---> 80 return self._ops[device_type][name](*args, **kwargs)

    File ~/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py:521, in pir_setup(device, server, input_path, key_columns, label_columns, oprf_key_path, setup_path, num_per_query, label_max_len, bucket_size)
        506 res.append(
        507     actor.pir_setup.remote(
        508         server,
        (...)
        517     )
        518 )
        520 # wait for all tasks done
    --> 521 return sfd.get(res)

    File ~/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py:156, in get(object_refs)
        148 def get(
        149     object_refs: Union[
        150         Union[ray.ObjectRef, List[ray.ObjectRef]],
        (...)
        153     ]
        154 ):
        155     if get_distribution_mode() == DISTRIBUTION_MODE.PRODUCTION:
    --> 156         return fed.get(object_refs)
        157     elif get_distribution_mode() == DISTRIBUTION_MODE.SIMULATION:
        158         return ray.get(object_refs)

    File ~/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py:613, in get(fed_objects)
        611 if get_global_context() is not None:
        612     get_global_context().set_last_recevied_error(e)
    --> 613 raise e

    File ~/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py:602, in get(fed_objects)
        599     ray_refs.append(received_ray_object_ref)
        601 try:
    --> 602     values = ray.get(ray_refs)
        603     if is_individual_id:
        604         values = values[0]

    File ~/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py:24, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
         21 @wraps(fn)
         22 def auto_init_wrapper(*args, **kwargs):
         23     auto_init_ray()
    ---> 24     return fn(*args, **kwargs)

    File ~/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py:103, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
        101     if func.__name__ != "init" or is_client_mode_enabled_by_default:
        102         return getattr(ray, func.__name__)(*args, **kwargs)
    --> 103 return func(*args, **kwargs)

    File ~/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py:2524, in get(object_refs, timeout)
       2522     worker.core_worker.dump_object_store_memory_usage()
       2523 if isinstance(value, RayTaskError):
    -> 2524     raise value.as_instanceof_cause()
       2525 else:
       2526     raise value

    RayTaskError(FedRemoteError): ray::ReceiverProxyActor.get_data() (pid=201684, ip=192.168.15.7, actor_id=57475061dc167106e064d65801000000, repr=<fed.proxy.barriers.ReceiverProxyActor object at 0x7f08dc26df60>)
    File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
    File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
    File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/proxy/barriers.py", line 236, in get_data
    raise data
    fed.exceptions.FedRemoteError: FedRemoteError occurred at alice

Alice's Output:

KeyboardInterrupt


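  2. Return only a single result
    When I use the PIR feature now, the result always contains every match. If I only want the first match, would the overhead be lower, and is that supported? This seems to overlap partly with unlabeled PSI; my need for it arises in the same scenario as unlabeled PSI.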
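
To make the three result shapes discussed above concrete, here is a minimal plaintext sketch. It assumes a toy in-memory table and involves no cryptography at all; the function names (`build_table`, `labeled_query`, `first_match_query`, `unlabeled_query`) are hypothetical and are not SecretFlow or APSI APIs. It only illustrates what each kind of query would return, and says nothing about relative cost.

```python
from collections import defaultdict


def build_table(rows):
    """Group (key, label) rows into a plain dict: key -> list of labels."""
    table = defaultdict(list)
    for key, label in rows:
        table[key].append(label)
    return dict(table)


def labeled_query(table, keys):
    """Labeled lookup: return every label stored under each matching key."""
    return {k: list(table[k]) for k in keys if k in table}


def first_match_query(table, keys):
    """Return only the first label stored under each matching key."""
    return {k: table[k][0] for k in keys if k in table}


def unlabeled_query(table, keys):
    """Unlabeled lookup: return only which queried keys are present."""
    return {k for k in keys if k in table}


if __name__ == "__main__":
    rows = [("ann", "US"), ("ann", "UK"), ("bob", "CN")]
    table = build_table(rows)
    queries = ["ann", "carol"]
    print(labeled_query(table, queries))
    print(first_match_query(table, queries))
    print(unlabeled_query(table, queries))
```

In this toy model the unlabeled query carries no label payload at all, which is the intuition behind expecting it to be cheaper; whether that holds for the actual protocol depends on the implementation.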
6fj commented 2 months ago

Thanks for your input. Please keep following the status of this issue.

winnylyc commented 2 months ago

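My previous post probably did not contain enough information 😓. The error message from SecretNote is likely not accurate, and the data in that experiment contained duplicate keys, so it could not have run in any case. I have regenerated the data without duplicate keys and run it directly with Python. Below I again tried setting label_columns to [], but it still does not run. Does this mean unlabeled PSI is not implemented yet? The code and output are provided below.

Sender(Server):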

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'alice'
}
sf.shutdown()
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

npq = 1
spu.pir_setup(
    server="alice",
    input_path="/root/project/psi1/alice_exactpsi_1e6_unique.csv",
    key_columns=['name'],
    label_columns=[],
    oprf_key_path="/root/project/psi1/alice_oprf_key",
    setup_path=f"/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq{npq}",
    num_per_query=npq,
    label_max_len=55,
    bucket_size=1000000
)

Output:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-23 18:43:48,666 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-23 18:43:49.412 INFO api.py:233 [alice] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '0.0.0.0:59179', 'bob': '127.0.0.1:53341'}, 'CURRENT_PARTY_NAME': 'alice', 'TLS_CONFIG': {}}
2024-05-23 18:43:50.036 INFO barriers.py:284 [alice] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=2406) 2024-05-23 18:43:50.032 INFO grpc_proxy.py:359 [alice] -- [Anonymous_job] ReceiverProxy binding port 59179, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=2406) 2024-05-23 18:43:50.035 INFO grpc_proxy.py:379 [alice] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-23 18:43:50.674 INFO barriers.py:333 [alice] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-23 18:43:50.674 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 0 attemp, up to 3600 attemps.
2024-05-23 18:43:53.678 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 1 attemp, up to 3600 attemps.
(SPURuntime(device_id=None, party=alice) pid=3107) WARNING:root:config=mode: MODE_SERVER_SETUP
(SPURuntime(device_id=None, party=alice) pid=3107) pir_protocol: PIR_PROTOCOL_KEYWORD_PIR_APSI
(SPURuntime(device_id=None, party=alice) pid=3107) pir_server_config {
(SPURuntime(device_id=None, party=alice) pid=3107)   input_path: "/root/project/psi1/alice_exactpsi_1e6_unique.csv"
(SPURuntime(device_id=None, party=alice) pid=3107)   setup_path: "/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq1"
(SPURuntime(device_id=None, party=alice) pid=3107)   key_columns: "name"
(SPURuntime(device_id=None, party=alice) pid=3107)   label_max_len: 55
(SPURuntime(device_id=None, party=alice) pid=3107)   bucket_size: 1000000
(SPURuntime(device_id=None, party=alice) pid=3107)   apsi_server_config {
(SPURuntime(device_id=None, party=alice) pid=3107)     oprf_key_path: "/root/project/psi1/alice_oprf_key"
(SPURuntime(device_id=None, party=alice) pid=3107)     num_per_query: 1
(SPURuntime(device_id=None, party=alice) pid=3107)   }
(SPURuntime(device_id=None, party=alice) pid=3107) }
(SPURuntime(device_id=None, party=alice) pid=3107)
(SPURuntime(device_id=None, party=alice) pid=3107) [2024-05-23 18:43:58.445] [info] [pir.cc:245] table_params hash_func_count:1
(SPURuntime(device_id=None, party=alice) pid=3107) [2024-05-23 18:43:58.445] [info] [pir.cc:247] table_params max_items_per_bin:55
(SPURuntime(device_id=None, party=alice) pid=3107) [2024-05-23 18:43:58.445] [info] [pir.cc:250] seal_params poly_modulus_degree:2048
(SPURuntime(device_id=None, party=alice) pid=3107) [2024-05-23 18:43:58.445] [info] [pir.cc:252] query_params query_powers size:55
(SPURuntime(device_id=None, party=alice) pid=3107) [2024-05-23 18:43:58.455] [info] [pir.cc:267] bucket:0 bucket_setup_path:/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq1/bucket_0
2024-05-23 18:43:58.476 WARNING cleanup.py:154 [alice] -- [Anonymous_job] Failed to send ObjectRef(80e22aed7718a125377d0994e1b53538ecef50370100000001000000) to bob, error: ray::SenderProxyActor.send() (pid=2464, ip=192.168.15.7, actor_id=377d0994e1b53538ecef503701000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7f81b0409f00>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.pir_setup() (pid=3107, ip=192.168.15.7, actor_id=66e4abb0bac5fbda63689d7f01000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1245, in pir_setup
    report = psi.pir(config)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 148, in pir
    report_str = libpsi.libs.pir(config.SerializeToString(), link)
RuntimeError: what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f6d8952b828
#1 psi::apsi::Launch()+0x7f6d895332f9
#2 psi::RunPir()+0x7f6d8943c003
#3 psi::BindLibs()::{lambda()#5}::operator()()+0x7f6d894365d5
#4 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f6d894367e0
#5 pybind11::cpp_function::dispatcher()+0x7f6d89417fed
#6 cfunction_call+0x4fd907,upstream_seq_id: 5#0, downstream_seq_id: 6.
2024-05-23 18:43:58.476 INFO cleanup.py:161 [alice] -- [Anonymous_job] Sending error what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f6d8952b828
#1 psi::apsi::Launch()+0x7f6d895332f9
#2 psi::RunPir()+0x7f6d8943c003
#3 psi::BindLibs()::{lambda()#5}::operator()()+0x7f6d894365d5
#4 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f6d894367e0
#5 pybind11::cpp_function::dispatcher()+0x7f6d89417fed
#6 cfunction_call+0x4fd907

 to bob.
2024-05-23 18:43:58.477 WARNING cleanup.py:127 [alice] -- [Anonymous_job] Signal SIGINT to exit.
2024-05-23 18:43:58.477 WARNING api.py:60 [alice] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-23 18:43:58.477 WARNING api.py:325 [alice] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-23 18:43:58.477 ERROR api.py:330 [alice] -- [Anonymous_job] Cross-silo sending error occured. ray::SenderProxyActor.send() (pid=2464, ip=192.168.15.7, actor_id=377d0994e1b53538ecef503701000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7f81b0409f00>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.pir_setup() (pid=3107, ip=192.168.15.7, actor_id=66e4abb0bac5fbda63689d7f01000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1245, in pir_setup
    report = psi.pir(config)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 148, in pir
    report_str = libpsi.libs.pir(config.SerializeToString(), link)
RuntimeError: what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f6d8952b828
#1 psi::apsi::Launch()+0x7f6d895332f9
#2 psi::RunPir()+0x7f6d8943c003
#3 psi::BindLibs()::{lambda()#5}::operator()()+0x7f6d894365d5
#4 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f6d894367e0
#5 pybind11::cpp_function::dispatcher()+0x7f6d89417fed
#6 cfunction_call+0x4fd907
2024-05-23 18:43:58.477 INFO api.py:337 [alice] -- [Anonymous_job] No wait for data sending.
2024-05-23 18:43:58.478 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-23 18:43:58.478 INFO api.py:352 [alice] -- [Anonymous_job] Shutdowned rayfed.
2024-05-23 18:43:58.479 CRITICAL api.py:356 [alice] -- [Anonymous_job] Exit now due to the previous error.

Receiver(Client):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'bob'
}
sf.shutdown()
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

npq = 1
spu.pir_setup(
    server="alice",
    input_path="/root/project/psi1/alice_exactpsi_1e6_unique.csv",
    key_columns=['name'],
    label_columns=[],
    oprf_key_path="/root/project/psi1/alice_oprf_key",
    setup_path=f"/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq{npq}",
    num_per_query=npq,
    label_max_len=55,
    bucket_size=1000000
)

Output:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-23 18:43:51,091 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-23 18:43:51.803 INFO api.py:233 [bob] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '127.0.0.1:59179', 'bob': '0.0.0.0:53341'}, 'CURRENT_PARTY_NAME': 'bob', 'TLS_CONFIG': {}}
2024-05-23 18:43:52.553 INFO barriers.py:284 [bob] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=2981) 2024-05-23 18:43:52.548 INFO grpc_proxy.py:359 [bob] -- [Anonymous_job] ReceiverProxy binding port 53341, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=2981) 2024-05-23 18:43:52.551 INFO grpc_proxy.py:379 [bob] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-23 18:43:53.244 INFO barriers.py:333 [bob] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-23 18:43:53.244 INFO barriers.py:520 [bob] -- [Anonymous_job] Try ping ['alice'] at 0 attemp, up to 3600 attemps.
2024-05-23 18:43:58.482 WARNING api.py:607 [bob] -- [Anonymous_job] Encounter RemoteError happend in other parties, error message: FedRemoteError occurred at alice
Traceback (most recent call last):
  File "/root/project/psi2/sf_connect_unlabeled.py", line 46, in <module>
    spu.pir_setup(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1990, in pir_setup
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 521, in pir_setup
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 613, in get
    raise e
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(FedRemoteError): ray::ReceiverProxyActor.get_data() (pid=2981, ip=192.168.15.7, actor_id=4f32dff258c018cdbff1ac3001000000, repr=<fed.proxy.barriers.ReceiverProxyActor object at 0x7f98089b5d50>)
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/proxy/barriers.py", line 236, in get_data
    raise data
fed.exceptions.FedRemoteError: FedRemoteError occurred at alice
^C2024-05-23 18:45:37.998 WARNING api.py:60 [bob] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-23 18:45:37.998 WARNING api.py:325 [bob] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-23 18:45:37.999 INFO api.py:337 [bob] -- [Anonymous_job] No wait for data sending.
2024-05-23 18:45:38.000 INFO message_queue.py:70 [bob] -- [Anonymous_job] Notify message polling thread[DataSendingQueueThread] to exit.
2024-05-23 18:45:38.000 INFO message_queue.py:70 [bob] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-23 18:45:38.000 INFO api.py:352 [bob] -- [Anonymous_job] Shutdowned rayfed.
2024-05-23 18:45:38.000 CRITICAL api.py:356 [bob] -- [Anonymous_job] Exit now due to the previous error.
Exception ignored in: <module 'threading' from '/root/anaconda3/envs/psi/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/root/anaconda3/envs/psi/lib/python3.10/threading.py", line 1567, in _shutdown
    lock.acquire()
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 65, in _signal_handler
    _shutdown(intended=False)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 357, in _shutdown
    sys.exit(1)
SystemExit: 1
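
This comment notes that the earlier experiment contained duplicate keys. A pre-check along the following lines could catch that before calling pir_setup. It is a minimal sketch using only the Python standard library; `find_duplicate_keys` is a hypothetical helper, not part of SecretFlow, and it assumes the input is a UTF-8 CSV with a header row that contains the key columns (as `key_columns=['name']` suggests).

```python
import csv
from collections import Counter


def find_duplicate_keys(csv_path, key_columns):
    """Return {key_tuple: count} for every key that occurs more than once.

    A key is the tuple of values in `key_columns` for one row.
    """
    counts = Counter()
    with open(csv_path, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            counts[tuple(row[col] for col in key_columns)] += 1
    return {key: n for key, n in counts.items() if n > 1}
```

For example, `find_duplicate_keys(path, ['name'])` returns an empty dict when every value in the `name` column is unique, so a non-empty result is a signal to deduplicate before running the setup step.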
6fj commented 2 months ago

hi @winnylyc

Could you try also setting label_max_len to 0?

winnylyc commented 2 months ago

Thank you for your reply! I tried it, but it still does not seem to work 😓. The code and output are below.

Sender(Server):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'alice'
}
sf.shutdown()
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

npq = 1
spu.pir_setup(
    server="alice",
    input_path="/root/project/psi1/alice_exactpsi_1e6_unique.csv",
    key_columns=['name'],
    label_columns=[],
    oprf_key_path="/root/project/psi1/alice_oprf_key",
    setup_path=f"/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq{npq}",
    num_per_query=npq,
    label_max_len=0,
    bucket_size=1000000
)

Output:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-24 10:29:06,237 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-24 10:29:07.584 INFO api.py:233 [alice] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '0.0.0.0:59179', 'bob': '127.0.0.1:53341'}, 'CURRENT_PARTY_NAME': 'alice', 'TLS_CONFIG': {}}
2024-05-24 10:29:08.255 INFO barriers.py:284 [alice] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=1837) 2024-05-24 10:29:08.251 INFO grpc_proxy.py:359 [alice] -- [Anonymous_job] ReceiverProxy binding port 59179, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=1837) 2024-05-24 10:29:08.253 INFO grpc_proxy.py:379 [alice] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-24 10:29:08.940 INFO barriers.py:333 [alice] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-24 10:29:08.940 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 0 attemp, up to 3600 attemps.
(SPURuntime(device_id=None, party=alice) pid=2042) WARNING:root:config=mode: MODE_SERVER_SETUP
(SPURuntime(device_id=None, party=alice) pid=2042) pir_protocol: PIR_PROTOCOL_KEYWORD_PIR_APSI
(SPURuntime(device_id=None, party=alice) pid=2042) pir_server_config {
(SPURuntime(device_id=None, party=alice) pid=2042)   input_path: "/root/project/psi1/alice_exactpsi_1e6_unique.csv"
(SPURuntime(device_id=None, party=alice) pid=2042)   setup_path: "/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq1"
(SPURuntime(device_id=None, party=alice) pid=2042)   key_columns: "name"
(SPURuntime(device_id=None, party=alice) pid=2042)   bucket_size: 1000000
(SPURuntime(device_id=None, party=alice) pid=2042)   apsi_server_config {
(SPURuntime(device_id=None, party=alice) pid=2042)     oprf_key_path: "/root/project/psi1/alice_oprf_key"
(SPURuntime(device_id=None, party=alice) pid=2042)     num_per_query: 1
(SPURuntime(device_id=None, party=alice) pid=2042)   }
(SPURuntime(device_id=None, party=alice) pid=2042) }
(SPURuntime(device_id=None, party=alice) pid=2042)
2024-05-24 10:29:11.165 WARNING cleanup.py:154 [alice] -- [Anonymous_job] Failed to send ObjectRef(8849b62d89cb30f9c4aba1b6791ea8e6170e26cf0100000001000000) to bob, error: ray::SenderProxyActor.send() (pid=1945, ip=192.168.15.7, actor_id=c4aba1b6791ea8e6170e26cf01000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7fd04d2e9f90>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.pir_setup() (pid=2042, ip=192.168.15.7, actor_id=29ba65c288e8d8664bc7d8d201000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1245, in pir_setup
    report = psi.pir(config)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 148, in pir
    report_str = libpsi.libs.pir(config.SerializeToString(), link)
RuntimeError: what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f7a76196828
#1 psi::apsi::Launch()+0x7f7a7619e2f9
#2 psi::RunPir()+0x7f7a760a7003
#3 psi::BindLibs()::{lambda()#5}::operator()()+0x7f7a760a15d5
#4 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f7a760a17e0
#5 pybind11::cpp_function::dispatcher()+0x7f7a76082fed
#6 cfunction_call+0x4fd907,upstream_seq_id: 5#0, downstream_seq_id: 6.
2024-05-24 10:29:11.165 INFO cleanup.py:161 [alice] -- [Anonymous_job] Sending error what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f7a76196828
#1 psi::apsi::Launch()+0x7f7a7619e2f9
#2 psi::RunPir()+0x7f7a760a7003
#3 psi::BindLibs()::{lambda()#5}::operator()()+0x7f7a760a15d5
#4 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f7a760a17e0
#5 pybind11::cpp_function::dispatcher()+0x7f7a76082fed
#6 cfunction_call+0x4fd907

 to bob.
2024-05-24 10:29:11.166 WARNING cleanup.py:127 [alice] -- [Anonymous_job] Signal SIGINT to exit.
2024-05-24 10:29:11.166 WARNING api.py:60 [alice] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-24 10:29:11.166 WARNING api.py:325 [alice] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-24 10:29:11.167 ERROR api.py:330 [alice] -- [Anonymous_job] Cross-silo sending error occured. ray::SenderProxyActor.send() (pid=1945, ip=192.168.15.7, actor_id=c4aba1b6791ea8e6170e26cf01000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7fd04d2e9f90>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.pir_setup() (pid=2042, ip=192.168.15.7, actor_id=29ba65c288e8d8664bc7d8d201000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1245, in pir_setup
    report = psi.pir(config)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 148, in pir
    report_str = libpsi.libs.pir(config.SerializeToString(), link)
RuntimeError: what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f7a76196828
#1 psi::apsi::Launch()+0x7f7a7619e2f9
#2 psi::RunPir()+0x7f7a760a7003
#3 psi::BindLibs()::{lambda()#5}::operator()()+0x7f7a760a15d5
#4 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f7a760a17e0
#5 pybind11::cpp_function::dispatcher()+0x7f7a76082fed
#6 cfunction_call+0x4fd907
2024-05-24 10:29:11.167 INFO api.py:337 [alice] -- [Anonymous_job] No wait for data sending.
2024-05-24 10:29:11.168 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-24 10:29:11.169 INFO api.py:352 [alice] -- [Anonymous_job] Shutdowned rayfed.
2024-05-24 10:29:11.169 CRITICAL api.py:356 [alice] -- [Anonymous_job] Exit now due to the previous error.
(SPURuntime(device_id=None, party=alice) pid=2042) [2024-05-24 10:29:11.138] [info] [pir.cc:245] table_params hash_func_count:1
(SPURuntime(device_id=None, party=alice) pid=2042) [2024-05-24 10:29:11.138] [info] [pir.cc:247] table_params max_items_per_bin:55
(SPURuntime(device_id=None, party=alice) pid=2042) [2024-05-24 10:29:11.138] [info] [pir.cc:250] seal_params poly_modulus_degree:2048
(SPURuntime(device_id=None, party=alice) pid=2042) [2024-05-24 10:29:11.138] [info] [pir.cc:252] query_params query_powers size:55
(SPURuntime(device_id=None, party=alice) pid=2042) [2024-05-24 10:29:11.150] [info] [pir.cc:267] bucket:0 bucket_setup_path:/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq1/bucket_0

Receiver(Client):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'bob'
}
sf.shutdown()  # actually call shutdown; the bare name `sf.shutdown` is a no-op expression
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

npq = 1
spu.pir_setup(
    server="alice",
    input_path="/root/project/psi1/alice_exactpsi_1e6_unique.csv",
    key_columns=['name'],
    label_columns=[],
    oprf_key_path="/root/project/psi1/alice_oprf_key",
    setup_path=f"/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq{npq}",
    num_per_query=npq,
    label_max_len=0,
    bucket_size=1000000
)

Output:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-24 10:29:06,237 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-24 10:29:07.600 INFO api.py:233 [bob] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '127.0.0.1:59179', 'bob': '0.0.0.0:53341'}, 'CURRENT_PARTY_NAME': 'bob', 'TLS_CONFIG': {}}
2024-05-24 10:29:08.255 INFO barriers.py:284 [bob] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=1838) 2024-05-24 10:29:08.251 INFO grpc_proxy.py:359 [bob] -- [Anonymous_job] ReceiverProxy binding port 53341, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=1838) 2024-05-24 10:29:08.254 INFO grpc_proxy.py:379 [bob] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-24 10:29:08.936 INFO barriers.py:333 [bob] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-24 10:29:08.937 INFO barriers.py:520 [bob] -- [Anonymous_job] Try ping ['alice'] at 0 attemp, up to 3600 attemps.
2024-05-24 10:29:11.172 WARNING api.py:607 [bob] -- [Anonymous_job] Encounter RemoteError happend in other parties, error message: FedRemoteError occurred at alice
Traceback (most recent call last):
  File "/root/project/psi2/sf_connect_unlabeled.py", line 46, in <module>
    spu.pir_setup(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1990, in pir_setup
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 521, in pir_setup
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 613, in get
    raise e
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(FedRemoteError): ray::ReceiverProxyActor.get_data() (pid=1838, ip=192.168.15.7, actor_id=cb60c17fef717f98f61a8d5f01000000, repr=<fed.proxy.barriers.ReceiverProxyActor object at 0x7fd726c7dd80>)
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/proxy/barriers.py", line 236, in get_data
    raise data
fed.exceptions.FedRemoteError: FedRemoteError occurred at alice
6fj commented 2 months ago

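Hi @winnylyc

Thanks for your feedback. We are currently reworking the PIR code and will resolve these issues together as part of that work. We expect to finish by around the end of June.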

winnylyc commented 2 months ago

Thank you for the reply! Looking forward to the PIR updates!