secretflow / psi

The repo of Private Set Intersection (PSI) and Private Information Retrieval (PIR) from SecretFlow.
https://www.secretflow.org.cn/docs/psi
Apache License 2.0
23 stars 20 forks source link

[Bug]: libspu 测试过程中,会偶发出现错误。 #147

Closed notinghere closed 2 days ago

notinghere commented 1 month ago

Issue Type

Build/Install

Modules Involved

SPU runtime, PSI

Have you reproduced the bug with SPU HEAD?

Yes

Have you searched existing issues?

Yes

SPU Version

version = "0.9.1.dev$$DATE$$"

OS Platform and Distribution

Linux version 5.4.0-169-generic (buildd@lcy02-amd64-102) (gcc version 9.4.0 (Ubuntu 9.4.0-1ubuntu1~20.04.2)) #187-Ubuntu SMP Thu Nov 23 14:52:28 UTC 2023

Python Version

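python3.8 (corrected later in the thread: the tests actually ran on Python 3.11.4, which matches the `python3.11` paths in the traceback)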

Compiler Version

gcc 版本 11.2.0 (GCC)

Current Behavior?

问题描述

错误日志

2024-07-11 10:59:21.551 [warning] [channel.h:~Channel:163] Channel destructor is called before WaitLinkTaskFinish, try stop send thread


- bob

----------test_kkrt_2pc-------------
rank = 0
rank = 1
I0711 10:58:47.467434 905368 external/com_github_brpc_brpc/src/brpc/server.cpp:1181] Server[yacl::link::transport::internal::ReceiverServiceImpl] is serving on port=20223.
W0711 10:58:47.467641 905368 external/com_github_brpc_brpc/src/brpc/server.cpp:1187] Builtin services are disabled according to ServerOptions.has_builtin_services
I0711 10:58:47.570077 905377 external/com_github_brpc_brpc/src/brpc/socket.cpp:2506] Checking Socket{id=0 addr=127.0.0.1:20222} (0x278a0c0)
I0711 10:58:50.572697 905379 external/com_github_brpc_brpc/src/brpc/socket.cpp:2566] Revived Socket{id=0 addr=127.0.0.1:20222} (0x278a0c0) (Connectable)
[2024-07-11 10:58:51.506] [info] [launch.cc:164] LEGACY PSI config: {"psi_type":"ECDH_PSI_2PC","broadcast_result":true,"input_params":{"path":"./data/bob.csv","select_fields":["id","idx"]},"output_params":{"path":"./bob-kkrt.csv","need_sort":true},"curve_type":"CURVE_25519"}
[2024-07-11 10:58:51.506] [info] [bucket_psi.cc:400] bucket size set to 1048576
Fatal Python error: Aborted

Current thread 0x00007f4ebf735280 (most recent call first):
  File "/home/evan/src/test/spu/psi.py", line 69 in bucket_psi
  File "/home/evan/src/test/spu/bob.py", line 58 in wrap
  File "/home/evan/miniconda3/lib/python3.11/site-packages/multiprocess/process.py", line 108 in run
  File "/home/evan/miniconda3/lib/python3.11/site-packages/multiprocess/process.py", line 314 in _bootstrap
  File "/home/evan/miniconda3/lib/python3.11/site-packages/multiprocess/popen_fork.py", line 71 in _launch
  File "/home/evan/miniconda3/lib/python3.11/site-packages/multiprocess/popen_fork.py", line 19 in __init__
  File "/home/evan/miniconda3/lib/python3.11/site-packages/multiprocess/context.py", line 281 in _Popen
  File "/home/evan/miniconda3/lib/python3.11/site-packages/multiprocess/context.py", line 224 in _Popen
  File "/home/evan/miniconda3/lib/python3.11/site-packages/multiprocess/process.py", line 121 in start
  File "/home/evan/src/test/spu/bob.py", line 79 in run_streaming_psi
  File "/home/evan/src/test/spu/bob.py", line 112 in test_ecdh_2pc
  File "/home/evan/src/test/spu/bob.py", line 141 in run_psi
  File "/home/evan/miniconda3/lib/python3.11/site-packages/absl/app.py", line 254 in _run_main
  File "/home/evan/miniconda3/lib/python3.11/site-packages/absl/app.py", line 308 in run
  File "/home/evan/src/test/spu/bob.py", line 147 in <module>

Extension modules: google.protobuf.pyext._message, numpy.core._multiarray_umath, numpy.core._multiarray_tests, numpy.linalg._umath_linalg, numpy.fft._pocketfft_internal, numpy.random._common, numpy.random.bit_generator, numpy.random._bounded_integers, numpy.random._mt19937, numpy.random.mtrand, numpy.random._philox, numpy.random._pcg64, numpy.random._sfc64, numpy.random._generator (total: 14)


## 代码部分

- alice.py

import time
import unittest

import multiprocess
from absl import app, flags

import libspu.link as link
# NOTE: this alias shadows the stdlib ``logging`` module in this script.
import libspu.logging as logging
from spu.utils.simulation import PropagatingThread

import psi as psi
from utils import get_free_port, wc_count

class Test:
    """Alice-side (rank 0) driver for bucket-PSI tests over a brpc link.

    Each ``test_*`` method waits for its PSI job to finish. The peer
    script(s) must run the matching test at the same time, with the same
    link id, party ids and addresses.
    """

    def run_streaming_psi(self, wsize, self_rank, link_id, party_ids, addrs,
                          inputs, outputs, selected_fields, protocol):
        """Run one bucket-PSI job for ``self_rank`` in a child process.

        Args:
            wsize: number of parties; the first ``wsize`` entries of
                ``party_ids``/``addrs`` are registered on the link.
            self_rank: this party's rank, used to pick its entry of
                ``inputs``/``outputs``.
            link_id: link id; must be identical on every party.
            party_ids: party id of each rank.
            addrs: ``host:port`` address of each rank.
            inputs: input CSV path of each rank.
            outputs: output CSV path of each rank.
            selected_fields: columns used as the intersection key.
            protocol: ``psi.PsiType`` value selecting the PSI protocol.

        Raises:
            RuntimeError: if the child process exits with a non-zero code.
                A negative code ``-N`` means it was killed by signal ``N``
                (e.g. ``-6`` for SIGABRT). Raising here keeps a crashed
                PSI run from being mistaken for a successful one.
        """
        lctx_desc = link.Desc()
        lctx_desc.id = link_id
        lctx_desc.recv_timeout_ms = 30 * 1000  # receive timeout: 30 s

        log_options = logging.LogOptions()
        log_options.log_level = logging.LogLevel.DEBUG
        # log_options.enable_console_logger = False
        log_options.system_log_path = "./alice.log"
        # log_options.trace_log_path = "./alice_trace.log"
        logging.setup_logging(log_options)

        for rank in range(wsize):
            print(f"rank = {rank}")
            lctx_desc.add_party(party_ids[rank], addrs[rank])

        def wrap(rank, selected_fields, input_path, output_path, psi_type):
            # Child-process body: open the link, run PSI, print the counts.
            lctx = link.create_brpc(lctx_desc, rank)
            try:
                config = psi.BucketPsiConfig(
                    psi_type=psi_type,
                    broadcast_result=True,
                    input_params=psi.InputParams(
                        path=input_path, select_fields=selected_fields
                    ),
                    output_params=psi.OutputParams(path=output_path, need_sort=True),
                    curve_type=psi.CurveType.CURVE_25519,
                )

                if psi_type == psi.PsiType.DP_PSI_2PC:
                    # DP-PSI: bob's sub-sampling rate and the epsilon budget.
                    config.dppsi_params.bob_sub_sampling = 0.9
                    config.dppsi_params.epsilon = 3

                report = psi.bucket_psi(lctx, config)

                # wc_count comes from the local utils module (presumably a line count).
                source_count = wc_count(input_path)
                output_count = wc_count(output_path)
                print(
                    f"id:{lctx.id()}, psi_type: {psi_type}, original_count: {report.original_count}, intersection_count: {report.intersection_count}, source_count: {source_count}, output_count: {output_count}"
                )
            finally:
                # Release the link even when PSI or the counting step raises.
                lctx.stop_link()

        # launch with multiprocess
        job = multiprocess.Process(
            target=wrap,
            args=(
                self_rank,
                selected_fields,
                inputs[self_rank],
                outputs[self_rank],
                protocol,
            ),
        )
        job.start()
        job.join()
        if job.exitcode != 0:
            raise RuntimeError(
                f"PSI process for rank {self_rank} ({protocol}) exited with "
                f"code {job.exitcode}"
            )

    def _run_2pc(self, protocol, outputs):
        """Run a two-party ``protocol`` as rank 0 against rank 1 (bob)."""
        self.run_streaming_psi(
            wsize=2,
            self_rank=0,
            link_id="abc",
            party_ids=["9999", "10000"],
            addrs=["127.0.0.1:20222", "127.0.0.1:20223"],
            inputs=["./data/alice.csv", "./data/bob.csv"],
            outputs=outputs,
            selected_fields=["id", "idx"],
            protocol=protocol,
        )

    def test_kkrt_2pc(self):
        """Two-party KKRT PSI."""
        print("----------test_kkrt_2pc-------------")
        self._run_2pc(
            psi.PsiType.KKRT_PSI_2PC, ["./alice-kkrt.csv", "./bob-kkrt.csv"]
        )

    def test_ecdh_2pc(self):
        """Two-party ECDH PSI.

        Writes to its own ``*-ecdh2pc.csv`` files so it does not overwrite
        the KKRT test's ``*-kkrt.csv`` results.
        """
        print("----------test_ecdh_2pc-------------")
        self._run_2pc(
            psi.PsiType.ECDH_PSI_2PC, ["./alice-ecdh2pc.csv", "./bob-ecdh2pc.csv"]
        )

    def test_ecdh_3pc(self):
        """Three-party ECDH PSI as rank 0; ranks 1 and 2 must run concurrently."""
        print("----------test_ecdh_3pc-------------")
        self.run_streaming_psi(
            wsize=3,
            self_rank=0,
            link_id="abc",
            party_ids=["9999", "10000", "9998"],
            addrs=["127.0.0.1:20222", "127.0.0.1:20223", "127.0.0.1:20224"],
            # addrs=["127.0.0.1:30222", "127.0.0.1:30223", "127.0.0.1:30224"],
            inputs=["./data/alice.csv", "./data/bob.csv", "./data/carol.csv"],
            outputs=["./alice-ecdh3pc.csv", "./bob-ecdh3pc.csv", "./carol-ecdh3pc.csv"],
            selected_fields=["id", "idx"],
            protocol=psi.PsiType.ECDH_PSI_3PC,
        )

def run_psi(argv=None):
    """Entry point for ``absl.app.run``: run the enabled tests as alice.

    The peer script must enable the same tests in the same order, because
    the parties' PSI jobs have to run concurrently.

    Args:
        argv: remaining command-line arguments passed by absl; unused.
    """
    del argv  # Unused; accepted because absl.app.run calls main(argv).
    t = Test()
    # t.test_dppsi_2pc()  # Test has no test_dppsi_2pc method yet.
    t.test_ecdh_2pc()
    # t.test_ecdh_3pc()
    # t.test_kkrt_2pc()


# Backward-compatible alias for the original function name.
runpsi = run_psi


if __name__ == '__main__':
    app.run(run_psi)


- bob.py

import time
import unittest

import multiprocess
from absl import app, flags

import libspu.link as link
from spu.utils.simulation import PropagatingThread

import psi as psi
from utils import get_free_port, wc_count

class Test:
    """Bob-side (rank 1) driver for bucket-PSI tests over a brpc link.

    Each ``test_*`` method waits for its PSI job to finish. The peer
    script(s) must run the matching test at the same time, with the same
    link id, party ids and addresses.
    """

    def run_streaming_psi(self, wsize, self_rank, link_id, party_ids, addrs,
                          inputs, outputs, selected_fields, protocol):
        """Run one bucket-PSI job for ``self_rank`` in a child process.

        Args:
            wsize: number of parties; the first ``wsize`` entries of
                ``party_ids``/``addrs`` are registered on the link.
            self_rank: this party's rank, used to pick its entry of
                ``inputs``/``outputs``.
            link_id: link id; must be identical on every party.
            party_ids: party id of each rank.
            addrs: ``host:port`` address of each rank.
            inputs: input CSV path of each rank.
            outputs: output CSV path of each rank.
            selected_fields: columns used as the intersection key.
            protocol: ``psi.PsiType`` value selecting the PSI protocol.

        Raises:
            RuntimeError: if the child process exits with a non-zero code.
                A negative code ``-N`` means it was killed by signal ``N``
                (e.g. ``-6`` for SIGABRT). Raising here keeps a crashed
                PSI run from being mistaken for a successful one.
        """
        lctx_desc = link.Desc()
        lctx_desc.id = link_id
        lctx_desc.recv_timeout_ms = 30 * 1000  # receive timeout: 30 s

        for rank in range(wsize):
            print(f"rank = {rank}")
            lctx_desc.add_party(party_ids[rank], addrs[rank])

        def wrap(rank, selected_fields, input_path, output_path, psi_type):
            # Child-process body: open the link, run PSI, print the counts.
            lctx = link.create_brpc(lctx_desc, rank)
            try:
                config = psi.BucketPsiConfig(
                    psi_type=psi_type,
                    broadcast_result=True,
                    input_params=psi.InputParams(
                        path=input_path, select_fields=selected_fields
                    ),
                    output_params=psi.OutputParams(path=output_path, need_sort=True),
                    curve_type=psi.CurveType.CURVE_25519,
                )

                if psi_type == psi.PsiType.DP_PSI_2PC:
                    # DP-PSI: bob's sub-sampling rate and the epsilon budget.
                    config.dppsi_params.bob_sub_sampling = 0.9
                    config.dppsi_params.epsilon = 3

                report = psi.bucket_psi(lctx, config)

                # wc_count comes from the local utils module (presumably a line count).
                source_count = wc_count(input_path)
                output_count = wc_count(output_path)
                print(
                    f"id:{lctx.id()}, psi_type: {psi_type}, original_count: {report.original_count}, intersection_count: {report.intersection_count}, source_count: {source_count}, output_count: {output_count}"
                )
            finally:
                # Release the link even when PSI or the counting step raises.
                lctx.stop_link()

        # launch with multiprocess
        job = multiprocess.Process(
            target=wrap,
            args=(
                self_rank,
                selected_fields,
                inputs[self_rank],
                outputs[self_rank],
                protocol,
            ),
        )
        job.start()
        job.join()
        if job.exitcode != 0:
            raise RuntimeError(
                f"PSI process for rank {self_rank} ({protocol}) exited with "
                f"code {job.exitcode}"
            )

    def _run_2pc(self, protocol, outputs):
        """Run a two-party ``protocol`` as rank 1 against rank 0 (alice)."""
        self.run_streaming_psi(
            wsize=2,
            self_rank=1,
            link_id="abc",
            party_ids=["9999", "10000"],
            addrs=["127.0.0.1:20222", "127.0.0.1:20223"],
            inputs=["./data/alice.csv", "./data/bob.csv"],
            outputs=outputs,
            selected_fields=["id", "idx"],
            protocol=protocol,
        )

    def test_kkrt_2pc(self):
        """Two-party KKRT PSI."""
        print("----------test_kkrt_2pc-------------")
        self._run_2pc(
            psi.PsiType.KKRT_PSI_2PC, ["./alice-kkrt.csv", "./bob-kkrt.csv"]
        )

    def test_ecdh_2pc(self):
        """Two-party ECDH PSI.

        Writes to its own ``*-ecdh2pc.csv`` files so it does not overwrite
        the KKRT test's ``*-kkrt.csv`` results.
        """
        print("----------test_ecdh_2pc-------------")
        self._run_2pc(
            psi.PsiType.ECDH_PSI_2PC, ["./alice-ecdh2pc.csv", "./bob-ecdh2pc.csv"]
        )

    def test_ecdh_3pc(self):
        """Three-party ECDH PSI as rank 1; ranks 0 and 2 must run concurrently."""
        print("----------test_ecdh_3pc-------------")
        self.run_streaming_psi(
            wsize=3,
            self_rank=1,
            link_id="abc",
            party_ids=["9999", "10000", "9998"],
            addrs=["127.0.0.1:20222", "127.0.0.1:20223", "127.0.0.1:20224"],
            # addrs=["127.0.0.1:30222", "127.0.0.1:30223", "127.0.0.1:30224"],
            inputs=["./data/alice.csv", "./data/bob.csv", "./data/carol.csv"],
            outputs=["./alice-ecdh3pc.csv", "./bob-ecdh3pc.csv", "./carol-ecdh3pc.csv"],
            selected_fields=["id", "idx"],
            protocol=psi.PsiType.ECDH_PSI_3PC,
        )

def run_psi(argv=None):
    """Entry point for ``absl.app.run``: run the enabled tests as bob.

    The peer script must enable the same tests in the same order, because
    the parties' PSI jobs have to run concurrently.

    Args:
        argv: remaining command-line arguments passed by absl; unused.
    """
    del argv  # Unused; accepted because absl.app.run calls main(argv).
    t = Test()
    # t.test_dppsi_2pc()  # Test has no test_dppsi_2pc method yet.
    t.test_ecdh_2pc()
    # t.test_ecdh_3pc()
    # t.test_kkrt_2pc()


# Backward-compatible alias for the original function name.
runpsi = run_psi


if __name__ == '__main__':
    app.run(run_psi)


### Standalone code to reproduce the issue

见问题描述 (see the problem description above)

### Relevant log output

见问题描述 (see the problem description above)
aokaokd commented 1 month ago

你好,使用python3.10重试下看看

notinghere commented 1 month ago

> 你好,使用python3.10重试下看看

看错了,使用的是 Python 3.11.4

aokaokd commented 1 month ago

好的,

> > 你好,使用python3.10重试下看看
>
> 看错了,使用的是 Python 3.11.4

好的。推荐您使用3.10版本

github-actions[bot] commented 1 week ago

Stale issue message. Please comment to remove stale tag. Otherwise this issue will be closed soon.