Open cookiejoo opened 1 year ago
Hi @cookiejoo
The nodes in a SecretFlow cluster need to communicate with each other to stay in sync. You can take a look at Rayfed: https://github.com/ray-project/rayfed
Thanks to you both, I roughly understand the principle now.
@ian-huu Hi, one more question: is it possible to set things up so that machine A can reach B, C, ..., N, while B through N cannot reach one another? Aggregating everything at A is all we need, since that seems to be exactly the federated-learning idea. I never fully understood this from the SecretFlow example docs, so I am asking again here.
Single-controller mode can do this: run one copy of the code on machine A, have B through N train on their own data, then send the results back to A for aggregation.
Thanks.
In production mode the network communication depends only on what the algorithm requires; for horizontal federation your scenario is entirely feasible.
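As a toy illustration of that star topology in plain Python (not the SecretFlow API; `local_update` and `server_aggregate` are made-up names), one horizontal-FL round could look like:

```python
# Toy sketch of one horizontal-FL round in a star topology:
# clients talk only to the central server A, never to each other.

def local_update(weights, data):
    # stand-in for local training: nudge each weight toward the local mean
    mean = sum(data) / len(data)
    return [w + 0.1 * (mean - w) for w in weights]

def server_aggregate(client_weights):
    # FedAvg-style plain average computed at the central server
    n = len(client_weights)
    return [sum(ws) / n for ws in zip(*client_weights)]

global_w = [0.0, 0.0]
client_data = {'bob': [1.0, 3.0], 'carol': [2.0, 4.0]}  # local datasets
updates = [local_update(global_w, d) for d in client_data.values()]
global_w = server_aggregate(updates)  # only server<->client exchanges
```

Each client only ever exchanges model weights with the server, which matches the A-aggregates-from-B..N setup described above.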
Is there an example?
There is a simulation-mode horizontal federation example: https://www.secretflow.org.cn/docs/secretflow/zh_CN/tutorial/Federate_Learning_for_Image_Classification.html
You can set up a cluster in production mode and then run this tutorial (but for the "data preparation" part you will need to split the data yourself and place it on each machine).
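For that data-preparation step, a small helper along these lines can split one CSV by rows into per-party files (the function name and file layout are examples, not part of the tutorial):

```python
import csv

# Hypothetical helper: split a CSV horizontally (by rows) into one
# file per party, keeping the header in every output file.

def split_rows(src_path, dst_paths):
    with open(src_path, newline='') as f:
        reader = csv.reader(f)
        header = next(reader)
        rows = list(reader)
    k = len(dst_paths)
    for i, dst in enumerate(dst_paths):
        with open(dst, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(header)
            writer.writerows(rows[i::k])  # round-robin assignment of rows
```

For example, `split_rows('full.csv', ['alice.csv', 'bob.csv'])` before copying each output file to its machine.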
Note that sf.init must be called the production-mode way.

Hello, based on the scenario @cookiejoo described: the network between the central server and each client node is open, but the clients cannot reach each other. However, according to the production-mode deployment docs, each client node pings the addresses of the other clients declared in cluster_config. When we deployed 3 Ray clusters in production mode and tested horizontal federation, we did indeed see the corresponding logs on the bob side, where charlie is the central server and alice is another client. So the question is: how can horizontal federation be run so that the clients never communicate with each other and only talk to the central server?
sf.init pings the other parties by default; you can pass the parameter enable_waiting_for_other_parties_ready=False to init. See secretflow.init.
Thanks for the explanation. After adding that setting we indeed no longer see the logs pinging the other clients, but training still ultimately fails. Below is the error from the alice side:
PC: @ 0x7fa558461de2 (unknown) pthread_cond_timedwait@@GLIBC_2.3.2
@ 0x7fa558465630 (unknown) (unknown)
@ 0x7fa4d579a077 368 ray::core::CoreWorkerMemoryStore::GetImpl()
@ 0x7fa4d579a8fc 64 ray::core::CoreWorkerMemoryStore::Get()
@ 0x7fa4d579ab0d 208 ray::core::CoreWorkerMemoryStore::Get()
@ 0x7fa4d5744257 464 ray::core::CoreWorker::Get()
@ 0x7fa4d5623647 240 __pyx_pw_3ray_7_raylet_10CoreWorker_31get_objects()
@ 0x4e9f31 (unknown) method_vectorcall_VARARGS_KEYWORDS
@ 0x70f420 (unknown) (unknown)
[2023-05-06 11:00:00,644 E 26185 26185] logging.cc:361: *** SIGTERM received at time=1683342000 on cpu 22 ***
[2023-05-06 11:00:00,644 E 26185 26185] logging.cc:361: PC: @ 0x7fa558461de2 (unknown) pthread_cond_timedwait@@GLIBC_2.3.2
[2023-05-06 11:00:00,647 E 26185 26185] logging.cc:361: @ 0x7fa558465630 (unknown) (unknown)
[2023-05-06 11:00:00,647 E 26185 26185] logging.cc:361: @ 0x7fa4d579a077 368 ray::core::CoreWorkerMemoryStore::GetImpl()
[2023-05-06 11:00:00,647 E 26185 26185] logging.cc:361: @ 0x7fa4d579a8fc 64 ray::core::CoreWorkerMemoryStore::Get()
[2023-05-06 11:00:00,647 E 26185 26185] logging.cc:361: @ 0x7fa4d579ab0d 208 ray::core::CoreWorkerMemoryStore::Get()
[2023-05-06 11:00:00,647 E 26185 26185] logging.cc:361: @ 0x7fa4d5744257 464 ray::core::CoreWorker::Get()
[2023-05-06 11:00:00,647 E 26185 26185] logging.cc:361: @ 0x7fa4d5623647 240 __pyx_pw_3ray_7_raylet_10CoreWorker_31get_objects()
[2023-05-06 11:00:00,647 E 26185 26185] logging.cc:361: @ 0x4e9f31 (unknown) method_vectorcall_VARARGS_KEYWORDS
[2023-05-06 11:00:00,651 E 26185 26185] logging.cc:361: @ 0x70f420 (unknown) (unknown)
(SendProxyActor pid=26413) 2023-05-06 11:00:00 ERROR barriers.py:240 [alice] -- Failed to send data to seq_id 4 of bob from {upstream_seq_id}, error: <AioRpcError of RPC that terminated with:
(SendProxyActor pid=26413) status = StatusCode.UNAVAILABLE
(SendProxyActor pid=26413) details = "failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection timed out"
(SendProxyActor pid=26413) debug_error_string = "UNKNOWN:Failed to pick subchannel {created_time:"2023-05-06T11:00:00.625740535+08:00", children:[UNKNOWN:failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection timed out {created_time:"2023-05-06T11:00:00.62572915+08:00", grpc_status:14}]}"
(SendProxyActor pid=26413) >
2023-05-06 11:00:01 INFO cleanup.py:103 [alice] -- Notify check sending thread to exit.
It looks like the alice node still tries to communicate with the bob node; you can see it in this part of the error message: Failed to send data to seq_id 4 of bob from {upstream_seq_id}
Could you paste the execution code and the commands used to start the Ray nodes?
@dutyu Which aggregation strategy are you using? It has to be PlainAggregator.
If it is already PlainAggregator, please share your code so we can reproduce the issue.
charlie (central server):
# ray start --head --node-ip-address="xxx.206" --port="9000" --include-dashboard=False --disable-usage-stats
from secretflow.data.horizontal import read_csv
from secretflow.security.aggregation import PlainAggregator
from secretflow.security.compare import PlainComparator
# from secretflow.utils.simulation.datasets import dataset
# from secretflow.security.aggregation import SecureAggregator
# from secretflow.security.compare import SPUComparator
# from secretflow.utils.simulation.datasets import load_dermatology
from secretflow.ml.boost.homo_boost import SFXgboost
import secretflow as sf
# In case you have a running secretflow runtime already.
sf.shutdown()
cluster_config = {
'parties': {
'charlie': {
# replace with charlie's real address.
'address': 'xxx.206:9001',
'listen_addr': '0.0.0.0:9001'
},
'alice': {
# replace with alice's real address.
'address': 'xxx.207:9001',
'listen_addr': '0.0.0.0:9001'
},
'bob': {
# replace with bob's real address.
'address': 'xxx.208:9001',
'listen_addr': '0.0.0.0:9001'
},
},
'self_party': 'charlie'
}
sf.init(address='xxx.206:9000', cluster_config=cluster_config)
# sf.init(['alice', 'bob', 'charlie'], address='local')
alice, bob, charlie = sf.PYU('alice'), sf.PYU('bob'), sf.PYU('charlie')
# aggr = SecureAggregator(charlie, [alice, bob])
# spu = sf.SPU(sf.utils.testing.cluster_def(['alice', 'bob']))
# comp = SPUComparator(spu)
# data = load_dermatology(parts=[alice, bob], aggregator=aggr,
# comparator=comp)
data = read_csv({alice: 'alice.csv', bob: 'bob.csv'},
aggregator=PlainAggregator(alice),
comparator=PlainComparator(alice))
data.fillna(value=0, inplace=True)
params = {
# XGBoost parameter tutorial
# https://xgboost.readthedocs.io/en/latest/parameter.html
'max_depth': 4, # max depth
'eta': 0.3, # learning rate
'objective': 'multi:softmax',
# objective function; supported: "binary:logistic", "reg:logistic", "multi:softmax", "multi:softprob", "reg:squarederror"
'min_child_weight': 1, # The minimum value of weight
'lambda': 0.1, # L2 regularization term on weights (xgb's lambda)
'alpha': 0, # L1 regularization term on weights (xgb's alpha)
'max_bin': 10, # Max num of binning
'num_class': 6, # Only required in multi-class classification
'gamma': 0, # Similar to min_impurity_split; the minimum gain required to make a split
'subsample': 1.0, # Subsample rate by rows
'colsample_bytree': 1.0, # Feature selection rate by tree
'colsample_bylevel': 1.0, # Feature selection rate by level
'eval_metric': 'merror', # supported eval metric:
# 1. rmse
# 2. rmsle
# 3. mape
# 4. logloss
# 5. error
# 6. error@t
# 7. merror
# 8. mlogloss
# 9. auc
# 10. aucpr
# Special params in SFXgboost
# Required
'hess_key': 'hess', # Required; marks the hess column, choose a column name that is not in the data set
'grad_key': 'grad', # Required; marks the grad column, choose a column name that is not in the data set
'label_key': 'class', # Required; marks the label column
}
bst = SFXgboost(server=charlie, clients=[alice, bob])
bst.train(data, data, params=params, num_boost_round=6)
bst.save_model({alice: 'demo.model', bob: 'demo.model'})
alice (trainer):
# ray start --head --node-ip-address="xxx.207" --port="9000" --include-dashboard=False --disable-usage-stats
from secretflow.data.horizontal import read_csv
from secretflow.security.aggregation import PlainAggregator
from secretflow.security.compare import PlainComparator
# from secretflow.utils.simulation.datasets import dataset
# from secretflow.security.aggregation import SecureAggregator
# from secretflow.security.compare import SPUComparator
# from secretflow.utils.simulation.datasets import load_dermatology
from secretflow.ml.boost.homo_boost import SFXgboost
import secretflow as sf
# In case you have a running secretflow runtime already.
sf.shutdown()
cluster_config = {
'parties': {
'charlie': {
# replace with charlie's real address.
'address': 'xxx.206:9001',
'listen_addr': '0.0.0.0:9001'
},
'alice': {
# replace with alice's real address.
'address': 'xxx.207:9001',
'listen_addr': '0.0.0.0:9001'
},
'bob': {
# replace with bob's real address.
'address': 'xxx.208:9001',
'listen_addr': '0.0.0.0:9001'
},
},
'self_party': 'alice'
}
sf.init(address='xxx.207:9000', cluster_config=cluster_config, enable_waiting_for_other_parties_ready=False)
# sf.init(['alice', 'bob', 'charlie'], address='local')
alice, bob, charlie = sf.PYU('alice'), sf.PYU('bob'), sf.PYU('charlie')
# aggr = SecureAggregator(charlie, [alice, bob])
# spu = sf.SPU(sf.utils.testing.cluster_def(['alice', 'bob']))
# comp = SPUComparator(spu)
# data = load_dermatology(parts=[alice, bob], aggregator=aggr,
# comparator=comp)
data = read_csv({alice: 'alice.csv', bob: 'bob.csv'},
aggregator=PlainAggregator(alice),
comparator=PlainComparator(alice))
data.fillna(value=0, inplace=True)
params = {
# XGBoost parameter tutorial
# https://xgboost.readthedocs.io/en/latest/parameter.html
'max_depth': 4, # max depth
'eta': 0.3, # learning rate
'objective': 'multi:softmax',
# objective function; supported: "binary:logistic", "reg:logistic", "multi:softmax", "multi:softprob", "reg:squarederror"
'min_child_weight': 1, # The minimum value of weight
'lambda': 0.1, # L2 regularization term on weights (xgb's lambda)
'alpha': 0, # L1 regularization term on weights (xgb's alpha)
'max_bin': 10, # Max num of binning
'num_class': 6, # Only required in multi-class classification
'gamma': 0, # Similar to min_impurity_split; the minimum gain required to make a split
'subsample': 1.0, # Subsample rate by rows
'colsample_bytree': 1.0, # Feature selection rate by tree
'colsample_bylevel': 1.0, # Feature selection rate by level
'eval_metric': 'merror', # supported eval metric:
# 1. rmse
# 2. rmsle
# 3. mape
# 4. logloss
# 5. error
# 6. error@t
# 7. merror
# 8. mlogloss
# 9. auc
# 10. aucpr
# Special params in SFXgboost
# Required
'hess_key': 'hess', # Required; marks the hess column, choose a column name that is not in the data set
'grad_key': 'grad', # Required; marks the grad column, choose a column name that is not in the data set
'label_key': 'class', # Required; marks the label column
}
bst = SFXgboost(server=charlie, clients=[alice, bob])
bst.train(data, data, params=params, num_boost_round=6)
bst.save_model({alice: 'demo.model', bob: 'demo.model'})
bob (trainer):
# ray start --head --node-ip-address="xxx.208" --port="9000" --include-dashboard=False --disable-usage-stats
from secretflow.data.horizontal import read_csv
from secretflow.security.aggregation import PlainAggregator
from secretflow.security.compare import PlainComparator
# from secretflow.utils.simulation.datasets import dataset
# from secretflow.security.aggregation import SecureAggregator
# from secretflow.security.compare import SPUComparator
# from secretflow.utils.simulation.datasets import load_dermatology
from secretflow.ml.boost.homo_boost import SFXgboost
import secretflow as sf
# In case you have a running secretflow runtime already.
sf.shutdown()
cluster_config = {
'parties': {
'charlie': {
# replace with charlie's real address.
'address': 'xxx.206:9001',
'listen_addr': '0.0.0.0:9001'
},
'alice': {
# replace with alice's real address.
'address': 'xxx.207:9001',
'listen_addr': '0.0.0.0:9001'
},
'bob': {
# replace with bob's real address.
'address': 'xxx.208:9001',
'listen_addr': '0.0.0.0:9001'
},
},
'self_party': 'bob'
}
sf.init(address='xxx.208:9000', cluster_config=cluster_config, enable_waiting_for_other_parties_ready=False)
# sf.init(['alice', 'bob', 'charlie'], address='local')
alice, bob, charlie = sf.PYU('alice'), sf.PYU('bob'), sf.PYU('charlie')
# aggr = SecureAggregator(charlie, [alice, bob])
# spu = sf.SPU(sf.utils.testing.cluster_def(['alice', 'bob']))
# comp = SPUComparator(spu)
# data = load_dermatology(parts=[alice, bob], aggregator=aggr,
# comparator=comp)
data = read_csv({alice: 'alice.csv', bob: 'bob.csv'},
aggregator=PlainAggregator(alice),
comparator=PlainComparator(alice))
data.fillna(value=0, inplace=True)
params = {
# XGBoost parameter tutorial
# https://xgboost.readthedocs.io/en/latest/parameter.html
'max_depth': 4, # max depth
'eta': 0.3, # learning rate
'objective': 'multi:softmax',
# objective function; supported: "binary:logistic", "reg:logistic", "multi:softmax", "multi:softprob", "reg:squarederror"
'min_child_weight': 1, # The minimum value of weight
'lambda': 0.1, # L2 regularization term on weights (xgb's lambda)
'alpha': 0, # L1 regularization term on weights (xgb's alpha)
'max_bin': 10, # Max num of binning
'num_class': 6, # Only required in multi-class classification
'gamma': 0, # Similar to min_impurity_split; the minimum gain required to make a split
'subsample': 1.0, # Subsample rate by rows
'colsample_bytree': 1.0, # Feature selection rate by tree
'colsample_bylevel': 1.0, # Feature selection rate by level
'eval_metric': 'merror', # supported eval metric:
# 1. rmse
# 2. rmsle
# 3. mape
# 4. logloss
# 5. error
# 6. error@t
# 7. merror
# 8. mlogloss
# 9. auc
# 10. aucpr
# Special params in SFXgboost
# Required
'hess_key': 'hess', # Required; marks the hess column, choose a column name that is not in the data set
'grad_key': 'grad', # Required; marks the grad column, choose a column name that is not in the data set
'label_key': 'class', # Required; marks the label column
}
bst = SFXgboost(server=charlie, clients=[alice, bob])
bst.train(data, data, params=params, num_boost_round=6)
bst.save_model({alice: 'demo.model', bob: 'demo.model'})
Given your network setup, the provider for plaintext aggregation and comparison should be the server, i.e. charlie. Try changing to
PlainAggregator(charlie)
and PlainComparator(charlie).
Hello, I changed it as you suggested, but the alice node still exits abnormally. Error log below:
2023-05-06 11:18:11 WARNING cleanup.py:62 [alice] -- Signal self to exit.
*** SIGTERM received at time=1683343091 on cpu 12 ***
2023-05-06 11:18:11 INFO cleanup.py:66 [alice] -- Check sending thread was exited.
PC: @ 0x7f03ddc92de2 (unknown) pthread_cond_timedwait@@GLIBC_2.3.2
@ 0x7f03ddc96630 1172714944 (unknown)
@ 0x7f03598ee077 368 ray::core::CoreWorkerMemoryStore::GetImpl()
@ 0x7f03598ee8fc 64 ray::core::CoreWorkerMemoryStore::Get()
@ 0x7f03598eeb0d 208 ray::core::CoreWorkerMemoryStore::Get()
(SendProxyActor pid=46674) 2023-05-06 11:18:11 ERROR barriers.py:240 [alice] -- Failed to send data to seq_id 4 of bob from {upstream_seq_id}, error: <AioRpcError of RPC that terminated with:
(SendProxyActor pid=46674) status = StatusCode.UNAVAILABLE
(SendProxyActor pid=46674) details = "failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection timed out"
(SendProxyActor pid=46674) debug_error_string = "UNKNOWN:Failed to pick subchannel {created_time:"2023-05-06T11:18:11.591661177+08:00", children:[UNKNOWN:failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection timed out {grpc_status:14, created_time:"2023-05-06T11:18:11.591652831+08:00"}]}"
(SendProxyActor pid=46674) >
@ 0x7f0359898257 464 ray::core::CoreWorker::Get()
@ 0x7f0359777647 240 __pyx_pw_3ray_7_raylet_10CoreWorker_31get_objects()
@ 0x4e9f31 (unknown) method_vectorcal
The error looks the same as before the change.
When testing, we used iptables to block alice's and bob's nodes from reaching each other: iptables -I INPUT -s xx.xx.xx.xx -j DROP
read_csv validates that the schemas of all the input files are consistent, which makes alice and bob send schema information to each other. You can copy this code out on its own and keep only the first half:
def read_csv(
filepath: Dict[PYU, str],
aggregator: Aggregator = None,
comparator: Comparator = None,
**kwargs,
) -> HDataFrame:
"""Read a comma-separated values (csv) file into HDataFrame.
Args:
filepath: a dict {PYU: file path}.
aggregator: optional; the aggregator assigned to the dataframe.
comparator: optional; the comparator assigned to the dataframe.
kwargs: all other arguments are the same as :py:func:`pandas.read_csv`.
Returns:
HDataFrame
Examples:
>>> read_csv({PYU('alice'): 'alice.csv', PYU('bob'): 'bob.csv'})
"""
assert filepath, 'File path shall not be empty!'
df = HDataFrame(aggregator=aggregator, comparator=comparator)
for device, path in filepath.items():
df.partitions[device] = Partition(device(read_csv_wrapper)(path, **kwargs))
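For illustration, here is a dependency-free sketch of that "first half only" idea, using plain dicts instead of HDataFrame/Partition (so it is not a drop-in replacement): each party's file becomes its own partition and the cross-party schema comparison is skipped entirely, so nothing forces alice and bob to talk.

```python
import csv

# Sketch only: mirrors the loading loop above but with plain dicts,
# and with the schema-consistency check removed, so no party needs
# to exchange schema information with any other party.

def read_csv_no_check(filepath):
    partitions = {}
    for party, path in filepath.items():
        with open(path, newline='') as f:
            partitions[party] = list(csv.DictReader(f))
    return partitions
```

Column consistency across parties must then be guaranteed out of band (e.g. by agreeing on the schema beforehand).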
Thanks. I changed it per your instructions and rewrote read_csv,
removing all the trailing validation logic, but the same problem remains. Could you test it on your side as well?
Hello, our example uses PlainComparator; we'd like to ask whether this risks leaking data between the clients. Also, could you briefly explain why the secure aggregation built into sf involves communication between the clients?
- PlainComparator is insecure and intended for testing only.
- The secure aggregation currently implemented in SF requires pairwise key negotiation between the parties, which is why there is client-to-client communication.
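To illustrate why that pairwise negotiation matters, here is a toy demo of the standard pairwise-masking idea (not SF's actual protocol): each pair of clients derives a shared random mask, one adds it and the other subtracts it, so the masks cancel in the server-side sum while each individual upload stays hidden.

```python
import random

# Toy pairwise-masking demo (not SecretFlow's implementation).
# m_ij stands in for the mask a pair (i, j) would derive from their
# negotiated key; that negotiation is the client-to-client traffic.

def mask_inputs(values, seed=42):
    rng = random.Random(seed)
    masked = list(values)
    n = len(values)
    for i in range(n):
        for j in range(i + 1, n):
            m_ij = rng.uniform(-1, 1)
            masked[i] += m_ij   # party i adds the shared mask
            masked[j] -= m_ij   # party j subtracts the same mask
    return masked

values = [3.0, 5.0, 7.0]
uploads = mask_inputs(values)
# the server learns only the sum: the masks cancel pairwise,
# while each individual upload differs from the true input
assert abs(sum(uploads) - sum(values)) < 1e-9
```

Without a reachable peer to negotiate each m_ij with, this cancellation cannot be arranged, which is why SF's secure aggregation needs the clients to talk to each other.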
Understood, thanks for the explanation. Please also help with these two questions:
- Since SF's secure aggregation requires pairwise key negotiation, does running horizontal federation in production with non-communicating clients necessarily carry a data-leakage risk? In other words, can SF currently run horizontal federation in production with the training parties unable to reach one another while still guaranteeing data security?
- After rewriting read_csv and removing all the trailing validation logic, the alice side still fails, with the error below:
2023-05-06 11:18:11 WARNING cleanup.py:62 [alice] -- Signal self to exit.
*** SIGTERM received at time=1683343091 on cpu 12 ***
2023-05-06 11:18:11 INFO cleanup.py:66 [alice] -- Check sending thread was exited.
PC: @ 0x7f03ddc92de2 (unknown) pthread_cond_timedwait@@GLIBC_2.3.2
@ 0x7f03ddc96630 1172714944 (unknown)
@ 0x7f03598ee077 368 ray::core::CoreWorkerMemoryStore::GetImpl()
@ 0x7f03598ee8fc 64 ray::core::CoreWorkerMemoryStore::Get()
@ 0x7f03598eeb0d 208 ray::core::CoreWorkerMemoryStore::Get()
(SendProxyActor pid=46674) 2023-05-06 11:18:11 ERROR barriers.py:240 [alice] -- Failed to send data to seq_id 4 of bob from {upstream_seq_id}, error: <AioRpcError of RPC that terminated with:
(SendProxyActor pid=46674) status = StatusCode.UNAVAILABLE
(SendProxyActor pid=46674) details = "failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection timed out"
(SendProxyActor pid=46674) debug_error_string = "UNKNOWN:Failed to pick subchannel {created_time:"2023-05-06T11:18:11.591661177+08:00", children:[UNKNOWN:failed to connect to all addresses; last error: UNKNOWN: Failed to connect to remote host: Connection timed out {grpc_status:14, created_time:"2023-05-06T11:18:11.591652831+08:00"}]}"
(SendProxyActor pid=46674) >
@ 0x7f0359898257 464 ray::core::CoreWorker::Get()
@ 0x7f0359777647 240 __pyx_pw_3ray_7_raylet_10CoreWorker_31get_objects()
@ 0x4e9f31 (unknown) method_vectorcal
Issue Type
Others
Source
binary
Secretflow Version
0.8
OS Platform and Distribution
Linux
Python version
3.10
Bazel version
No response
GCC/Compiler version
No response
What happened and what you expected to happen.
Reproduction code to reproduce the issue.