nucypher / nufhe

NuCypher fully homomorphic encryption (NuFHE) library implemented in Python
https://nufhe.readthedocs.io/en/latest/
GNU General Public License v3.0

GPU concatenation raises an error after computation #23

Closed alheliou closed 4 years ago

alheliou commented 4 years ago

Hello,

We are trying to concatenate two ciphertexts on a GPU device, but we are running into several issues. We have tried different approaches:

  1. An implementation similar to https://github.com/nucypher/nufhe/blob/master/examples/multi_gpu.py, but the concatenation call raises an error when the Thread ends: "PyCUDA ERROR: The context stack was not empty upon module cleanup." Run python3 code.py 2 1 to reproduce it.
  2. The same implementation without the Thread wrapper. This time the concatenation seems to run fine, but the decryption then raises "pycuda._driver.LogicError: cuLaunchKernel failed: invalid resource handle", which prevents us from using the result. Run python3 code.py 3 1 to reproduce it.
  3. A method similar to the one used in the pytest suite: https://github.com/nucypher/nufhe/blob/master/test/test_lwe.py It runs fine when we do not output the result. Run python3 code.py 5 to check it.
  4. A method similar to the one in point 3, but outputting the result. Again the decryption raises an error. Run python3 code.py 7 1 to reproduce it. We also tried saving the result to a file and then loading it. This works only when the loading is done in a separate invocation; if it is done right after the saving, it fails. Run python3 code.py 77 to reproduce it (it saves the result and loads it for decryption right after), whereas python3 code.py 7 followed by python3 code.py 11 does not raise errors.

Another problem we observe is that some tests that seem to run fine on their own cannot be run consecutively in the same invocation. For example, python3 code.py 10, python3 code.py 14, python3 code.py 15, etc. raise errors, while python3 code.py 2, python3 code.py 3, python3 code.py 5 and python3 code.py 7 do not.

If you have any idea what I'm doing wrong, or an example of code that performs a concatenation on the GPU and outputs the result, it would be of great help.

Many thanks

Here is the source of 'code.py' I'm referring to:

from threading import Thread
from queue import Queue

import random
import nufhe
import sys

from nufhe.api_low_level import  NuFHECloudKey, NuFHESecretKey
from nufhe.lwe import LweSampleArray, concatenate

from reikna.cluda import cuda_api, ocl_api, get_api, supported_api_ids, find_devices

class MyThread:
    """
    A simple wrapper that allows one to receive the value
    returned from the thread worker transparently.
    """

    def __init__(self, target, args=()):
        self.return_queue = Queue()
        self.target = target
        self.thread = Thread(target=self._target_wrapper, args=args)

    def _target_wrapper(self, *args):
        res = self.target(*args)
        self.return_queue.put(res)

    def start(self):
        self.thread.start()
        return self

    def join(self):
        ret_val = self.return_queue.get()
        self.thread.join()
        return ret_val

def concate(device_id, cloud_key_cpu, ciphertext1_cpu, ciphertext2_cpu, get_error):
    """
    The thread concat function.
    Runs a concatenation over two provided ciphertexts and returns the serialized result.
    """

    print("Running a thread with", device_id)

    ctx = nufhe.Context(device_id=device_id)

    cloud_key = ctx.load_cloud_key(cloud_key_cpu)
    ciphertext1 = ctx.load_ciphertext(ciphertext1_cpu)
    ciphertext2 = ctx.load_ciphertext(ciphertext2_cpu)
    if get_error == 1 :
        result = nufhe.lwe.concatenate([ciphertext1, ciphertext2], axis=0, out=None)
    else :
        result = ciphertext1
    result_serialize = result.dumps()

    print("Done")
    return result_serialize

def mythread():
    api_id = "cuda"
    api = get_api(api_id)
    devices = find_devices(api)
    platform = api.get_platforms()[0]
    device = platform.get_devices()[0]
    thread = api.Thread(device)
    return thread

def lwe_concatenate(thread, key_pair_s, size=32,axis=0, out=None):
    print(thread)
    # Load the secret and cloud keys from the serialized pair
    secret_key_s, cloud_key_s = key_pair_s
    secret_key = NuFHESecretKey.loads(secret_key_s, thread)
    cloud_key = NuFHECloudKey.loads(cloud_key_s, thread)
    rng = nufhe.DeterministicRNG()
    # Encrypt random bit vectors
    bits1 = [random.choice([False, True]) for i in range(size//2)]
    bits2 = [random.choice([False, True]) for i in range(size//2)]
    ciphertexts = [nufhe.encrypt(thread, rng, secret_key, bits1), nufhe.encrypt(thread, rng, secret_key, bits2)]
    # Concatenate the vectors
    out = nufhe.lwe.concatenate(ciphertexts, axis=axis, out=out)
    # Decrypt
    r = nufhe.decrypt(thread, secret_key, out)
    assert r.tolist() == bits1 + bits2
    print("Done")

if __name__ == '__main__':

    bits = [False, False, True, True, True, True, False, True, False, False, False, False, False, True, False, False, True, False, True, True, False, True, True, True, True, False, True, False, False, False, False, False]
    size = len(bits)
    print(bits)

    ctx = nufhe.Context()
    secret_key, cloud_key = ctx.make_key_pair()
    ciphertext = ctx.encrypt(secret_key, bits)

    # Serialize the cloud key to pass it to child threads.
    ck = cloud_key.dumps()

    # Split ciphertexts into two parts each and serialize them.
    ct_part1 = ciphertext[:size//2].dumps()
    ct_part2 = ciphertext[size//2:].dumps()

    # Start threads each applying concatenation of the ciphertext.
    devices = nufhe.find_devices()

    test = 2
    get_error = 0
    if len(sys.argv) > 1 :
        test = int(sys.argv[1])
    if len(sys.argv) > 2 :
        get_error = int(sys.argv[2])

    # The low level thread is started at the beginning
    # As starting two threads on the same device raises an error.
    # Exception ignored in: <bound method Thread.__del__ of <reikna.cluda.cuda.Thread object at 0x7fdf7beddd68>>
    # Traceback (most recent call last):
    # File "/usr/local/lib/python3.6/dist-packages/reikna/cluda/cuda.py", line 246, in __del__
    # File "/usr/local/lib/python3.6/dist-packages/reikna/cluda/api.py", line 566, in release
    # File "/usr/local/lib/python3.6/dist-packages/reikna/cluda/cuda.py", line 241, in _release_specific
    # pycuda._driver.LogicError: context::pop failed: invalid device context - cannot pop non-current context

    if (test%5 ==0 or test%7==0):
        thr = mythread()

    if (test%2 == 0):
            print("test = 0[2]")
            # Try the concatenation inside a Thread
            # Similar implementation as in https://github.com/nucypher/nufhe/blob/master/examples/multi_gpu.py
            # But here the concatenation line induces an error "PyCUDA ERROR: The context stack was not empty upon module cleanup." that is raised when the Thread ends.
            t = MyThread(target=concate, args=(devices[0], ck, ct_part1, ct_part2, get_error)).start()
            print("Thread started")
            result = t.join()
            print("join done")
            result = ctx.load_ciphertext(result)
            print("result loaded")
            r = ctx.decrypt(secret_key, result)
            print("result decrypted")
            #assert r.tolist() == bits

    if (test%3==0):
            print("test = 0[3]")
            # Try the concatenation directly, the concatenation does not raise an error
            # But the decryption raises an error "pycuda._driver.LogicError: cuLaunchKernel failed: invalid resource handle"
            result = concate(devices[0], ck, ct_part1, ct_part2, 1)
            print("concatenation done")
            result = ctx.load_ciphertext(result)
            print("result loaded")
            if (get_error == 1):
                r = ctx.decrypt(secret_key, result)
                print("result decrypted")
                assert r.tolist() == bits

    if (test%5==0):
            print("test = 0[5]")
            # The concatenation is done without input and output.
            # Using a thread and a method similar to the one used in the pytest
            # https://github.com/nucypher/nufhe/blob/master/test/test_lwe.py
            lwe_concatenate(thr, (secret_key.dumps(), cloud_key.dumps()), size, 0, None)

    if (test%7==0):
            print("test = 0[7]")
            # Same method as above but with input and output
            # Similarly as in test=0[3], the decryption raises an error
            # But we can write the encrypted result into a file and then read it
            ciphertext1 = LweSampleArray.loads(ct_part1, thr)
            ciphertext2 = LweSampleArray.loads(ct_part2, thr)
            ciphertexts = [ciphertext1, ciphertext2]
            result = nufhe.lwe.concatenate(ciphertexts, axis=0, out=None)
            print("concatenation done")
            if (get_error == 1):
                r = ctx.decrypt(secret_key, result)
                print("result decrypted")
                assert r.tolist() == bits
            # Each file is closed by its own "with" block, so no explicit close() is needed.
            with open("result","wb") as f :
                result.dump(file_obj=f)
            with open("secret_key","wb") as f :
                secret_key.dump(file_obj=f)
            print("Result and Secret_Key saved into files")

    if (test%11==0):
            print("test = 0[11]")
            # Read the saved files.
            # The decryption raises an error if it is performed right after writing the file:
            # pycuda._driver.LogicError: cuLaunchKernel failed: invalid resource handle
            with open("secret_key","rb") as f :
                secret_key_loaded = ctx.load_secret_key(f)
            with open("result","rb") as f :
                result = ctx.load_ciphertext(f)
            r = ctx.decrypt(secret_key_loaded, result)
            print(r.tolist())
            assert r.tolist() == bits

    # The thread's computation cache is cleared at the end
    if (test%5 ==0 or test%7==0):
        nufhe.clear_computation_cache(thr)

    print("The end")

fjarri commented 4 years ago

Thank you for reporting this! The problem here is that concatenate() internally creates a Reikna Computation object and uses it to move the memory around. These computations are cached in the Thread object, and since they retain references to that object, a reference loop is created. As a result, the Thread is not deleted when concate() finishes, and CUDA complains about resources not being released properly.

As an immediate workaround, you can force the cache to be cleared by calling ctx.thread._computation_cache.clear() at the end of concate(); all your tests seem to run without errors after that. I'm working on a fix, which will hopefully be ready tomorrow.
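
The reference loop described above can be illustrated without a GPU. The following is only a toy sketch, assuming CPython's reference counting: ToyThread and ToyComputation are hypothetical stand-ins and are not Reikna's actual classes. Only the attribute name _computation_cache is taken from the workaround above.

```python
import gc
import weakref


class ToyComputation:
    """Hypothetical stand-in for a cached computation that refers back to its owner."""

    def __init__(self, owner):
        self.owner = owner  # back-reference that closes the loop


class ToyThread:
    """Hypothetical stand-in for a GPU thread object that caches computations."""

    def __init__(self):
        self._computation_cache = {}

    def get_cached_computation(self, key):
        if key not in self._computation_cache:
            self._computation_cache[key] = ToyComputation(self)
        return self._computation_cache[key]


def alive_after_del(clear_cache):
    """Return True if the toy thread still exists right after `del`."""
    gc.disable()  # keep the cyclic collector from running during the demo
    try:
        thread = ToyThread()
        thread.get_cached_computation("concatenate")
        probe = weakref.ref(thread)
        if clear_cache:
            thread._computation_cache.clear()  # breaks the loop
        del thread
        return probe() is not None
    finally:
        gc.enable()


leaked = alive_after_del(clear_cache=False)       # loop keeps the object alive
released = not alive_after_del(clear_cache=True)  # cleared cache lets it go at once
gc.collect()  # the cyclic collector eventually reclaims the leaked loop
print(leaked, released)
```

In this toy model, clearing the cache before the last reference is dropped lets the object be freed immediately, which is the effect the workaround relies on.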

alheliou commented 4 years ago

Thanks !! It works :)

fjarri commented 4 years ago

Ok, this should be fixed in Reikna v0.7.5 now (the version is bumped in NuFHE in commit 638e12e931fe39c0a5cd5ee5d271d40084e29b31). Could you please check that it works for you?

alheliou commented 4 years ago

Yes, it works, many thanks!

I've tried running the tests in a loop; sometimes it goes well, but sometimes it fails at some point with the following error. I don't know whether it is related to your libraries or to my device.

ERROR:root:Failed to compile:
1:
2:
3:
4:    #define CUDA
5:    // taken from pycuda._cluda
6:    #define LOCAL_BARRIER __syncthreads()
7:
8:    #define WITHIN_KERNEL __device__
9:    #define KERNEL extern "C" __global__
10:    #define GLOBAL_MEM /* empty */
11:    #define GLOBAL_MEM_ARG /* empty */
12:    #define LOCAL_MEM __shared__
13:    #define LOCAL_MEM_DYNAMIC extern __shared__
14:    #define LOCAL_MEM_ARG /* empty */
15:    #define CONSTANT_MEM __constant__
16:    #define CONSTANT_MEM_ARG /* empty */
17:    #define INLINE __forceinline__
18:    #define SIZE_T int
19:    #define VSIZE_T int
20:
21:    // used to align fields in structures
22:    #define ALIGN(bytes) __align__(bytes)
23:
24:    
25:
26:    WITHIN_KERNEL SIZE_T get_local_id(unsigned int dim)
27:    {
28:        if(dim == 0) return threadIdx.x;
29:        if(dim == 1) return threadIdx.y;
30:        if(dim == 2) return threadIdx.z;
31:        return 0;
32:    }
33:
34:    WITHIN_KERNEL SIZE_T get_group_id(unsigned int dim)
35:    {
36:        if(dim == 0) return blockIdx.x;
37:        if(dim == 1) return blockIdx.y;
38:        if(dim == 2) return blockIdx.z;
39:        return 0;
40:    }
41:
42:    WITHIN_KERNEL SIZE_T get_local_size(unsigned int dim)
43:    {
44:        if(dim == 0) return blockDim.x;
45:        if(dim == 1) return blockDim.y;
46:        if(dim == 2) return blockDim.z;
47:        return 1;
48:    }
49:
50:    WITHIN_KERNEL SIZE_T get_num_groups(unsigned int dim)
51:    {
52:        if(dim == 0) return gridDim.x;
53:        if(dim == 1) return gridDim.y;
54:        if(dim == 2) return gridDim.z;
55:        return 1;
56:    }
57:
58:    WITHIN_KERNEL SIZE_T get_global_size(unsigned int dim)
59:    {
60:        return get_num_groups(dim) * get_local_size(dim);
61:    }
62:
63:    WITHIN_KERNEL SIZE_T get_global_id(unsigned int dim)
64:    {
65:        return get_local_id(dim) + get_group_id(dim) * get_local_size(dim);
66:    }
67:
68:
69:
70:
71:    #define COMPLEX_CTR(T) make_##T
72:
73:    WITHIN_KERNEL float2 operator+(float2 a, float2 b)
74:    {
75:        return COMPLEX_CTR(float2)(a.x + b.x, a.y + b.y);
76:    }
77:    WITHIN_KERNEL float2 operator-(float2 a, float2 b)
78:    {
79:        return COMPLEX_CTR(float2)(a.x - b.x, a.y - b.y);
80:    }
81:    WITHIN_KERNEL float2 operator+(float2 a) { return a; }
82:    WITHIN_KERNEL float2 operator-(float2 a) { return COMPLEX_CTR(float2)(-a.x, -a.y); }
83:    WITHIN_KERNEL double2 operator+(double2 a, double2 b)
84:    {
85:        return COMPLEX_CTR(double2)(a.x + b.x, a.y + b.y);
86:    }
87:    WITHIN_KERNEL double2 operator-(double2 a, double2 b)
88:    {
89:        return COMPLEX_CTR(double2)(a.x - b.x, a.y - b.y);
90:    }
91:    WITHIN_KERNEL double2 operator+(double2 a) { return a; }
92:    WITHIN_KERNEL double2 operator-(double2 a) { return COMPLEX_CTR(double2)(-a.x, -a.y); }
93:
94:WITHIN_KERNEL VSIZE_T virtual_local_id(unsigned int dim)
95:{
96:    if (dim == 0)
97:    {
98:
99:        SIZE_T flat_id =
100:            get_local_id(0) * 1 +
101:            0;
102:
103:        return (flat_id / 1);
104:
105:    }
106:
107:    return 0;
108:}
109:
110:WITHIN_KERNEL VSIZE_T virtual_local_size(unsigned int dim)
111:{
112:    if (dim == 0)
113:    {
114:        return 16;
115:    }
116:
117:    return 1;
118:}
119:
120:WITHIN_KERNEL VSIZE_T virtual_group_id(unsigned int dim)
121:{
122:    if (dim == 0)
123:    {
124:
125:        return 0;
126:
127:    }
128:
129:    return 0;
130:}
131:
132:WITHIN_KERNEL VSIZE_T virtual_num_groups(unsigned int dim)
133:{
134:    if (dim == 0)
135:    {
136:        return 1;
137:    }
138:
139:    return 1;
140:}
141:
142:WITHIN_KERNEL VSIZE_T virtual_global_id(unsigned int dim)
143:{
144:    return virtual_local_id(dim) + virtual_group_id(dim) * virtual_local_size(dim);
145:}
146:
147:WITHIN_KERNEL VSIZE_T virtual_global_size(unsigned int dim)
148:{
149:    if(dim == 0)
150:    {
151:        return 16;
152:    }
153:
154:    return 1;
155:}
156:
157:WITHIN_KERNEL VSIZE_T virtual_global_flat_id()
158:{
159:    return
160:        virtual_global_id(0) * 1 +
161:        0;
162:}
163:
164:WITHIN_KERNEL VSIZE_T virtual_global_flat_size()
165:{
166:    return
167:        virtual_global_size(0) *
168:        1;
169:}
170:
171:
172:WITHIN_KERNEL bool virtual_skip_local_threads()
173:{
174:
175:    return false;
176:}
177:
178:WITHIN_KERNEL bool virtual_skip_groups()
179:{
180:
181:    return false;
182:}
183:
184:WITHIN_KERNEL bool virtual_skip_global_threads()
185:{
186:
187:    return false;
188:}
189:
190:
191:
192:#ifndef CUDA
193:#define MARK_VIRTUAL_FUNCTIONS_AS_USED (void)(virtual_num_groups(0)); (void)(virtual_global_flat_id()); (void)(virtual_global_flat_size())
194:#else
195:#define MARK_VIRTUAL_FUNCTIONS_AS_USED
196:#endif
197:
198:#define VIRTUAL_SKIP_THREADS MARK_VIRTUAL_FUNCTIONS_AS_USED; if(virtual_skip_local_threads() || virtual_skip_groups() || virtual_skip_global_threads()) return
199:
200:
201:    // leaf output macro for "output"
202:    #define _module1_(_idx0, _val) _leaf_output[(_idx0) * (1) + (0)] = (_val)
203:    
204:
205:
206:
207:
208:    // output for a transformation for "output"
209:    #define _module0_(_val) _module1_(_idx0, _val)
210:    
211:
212:
213:
214:WITHIN_KERNEL int _module4_(int x)
215:{
216:
217:    return (int)x;
218:}
219:
220:
221:
222:
223:    // leaf input macro for "src_input"
224:    #define _module6_(_idx0) (_leaf_src_input[(_idx0) * (1) + (0)])
225:    
226:
227:
228:
229:
230:    // input for a transformation for "src_input"
231:    #define _module5_ _module6_(_idx0)
232:    
233:
234:
235:
236:
237:    // input transformation node for "input"
238:    
239:    INLINE WITHIN_KERNEL int _module3_func(
240:        GLOBAL_MEM int *_leaf_src_input,
241:VSIZE_T _idx0)
242:    {
243:        int _val;
244:
245:        
246:_val =(_module4_(_module5_));
247:
248:
249:        return _val;
250:    }
251:    
252:    #define _module3_(_idx0) _module3_func(        _leaf_src_input, _idx0)
253:    
254:
255:
256:
257:
258:    // input for a transformation for "input"
259:    #define _module2_ _module3_(_idx0)
260:    
261:
262:
263:
264:
265:            
266:KERNEL void kernel_pure_parallel(GLOBAL_MEM int *_leaf_output, GLOBAL_MEM int *_leaf_src_input)
267:
268:            {
269:                VIRTUAL_SKIP_THREADS;
270:
271:                VSIZE_T _idx0 = virtual_global_id(0);
272:
273:                
274:_module0_(_module2_);
275:
276:            }
277:            
278:
Exception in thread Thread-14:
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-5-81ae7ff4fb4f>", line 13, in _target_wrapper
    res = self.target(*args)
  File "<ipython-input-5-81ae7ff4fb4f>", line 39, in concate
    result = nufhe.lwe.concatenate([ciphertext1, ciphertext2], axis=0, out=None)
  File "/usr/local/lib/python3.6/dist-packages/nufhe/lwe.py", line 440, in concatenate
    reikna.concatenate(lwes_b, axis=axis),
  File "/usr/local/lib/python3.6/dist-packages/reikna/cluda/array_helpers.py", line 122, in concatenate
    out[tuple(slices)] = array
  File "/usr/local/lib/python3.6/dist-packages/reikna/cluda/cuda.py", line 128, in __setitem__
    setitem_method(self, index, value)
  File "/usr/local/lib/python3.6/dist-packages/reikna/cluda/array_helpers.py", line 59, in setitem_method
    setitem_computation, Type.from_value(view), Type.from_value(value), is_array)
  File "/usr/local/lib/python3.6/dist-packages/reikna/cluda/api.py", line 334, in get_cached_computation
    compiled_comp = comp.compile(ThreadWeakRef(self))
  File "/usr/local/lib/python3.6/dist-packages/reikna/core/computation.py", line 207, in compile
    self._tr_tree, translator, thread, fast_math, compiler_options, keep).finalize()
  File "/usr/local/lib/python3.6/dist-packages/reikna/core/computation.py", line 192, in _get_plan
    return self._build_plan(plan_factory, thread.device_params, *args)
  File "/usr/local/lib/python3.6/dist-packages/reikna/algorithms/pureparallel.py", line 123, in _build_plan
    snippet=self._snippet))
  File "/usr/local/lib/python3.6/dist-packages/reikna/core/computation.py", line 473, in kernel_call
    keep=self._keep)
  File "/usr/local/lib/python3.6/dist-packages/reikna/cluda/api.py", line 179, in compile_static
    return self.thread_cls.compile_static(self, *args, **kwds)
  File "/usr/local/lib/python3.6/dist-packages/reikna/cluda/api.py", line 566, in compile_static
    constant_arrays=constant_arrays, keep=keep)
  File "/usr/local/lib/python3.6/dist-packages/reikna/cluda/api.py", line 786, in __init__
    constant_arrays=constant_arrays, keep=keep)
  File "/usr/local/lib/python3.6/dist-packages/reikna/cluda/api.py", line 655, in __init__
    self.source, fast_math=fast_math, compiler_options=compiler_options, keep=keep)
  File "/usr/local/lib/python3.6/dist-packages/reikna/cluda/api.py", line 504, in _create_program
    src, fast_math=fast_math, compiler_options=compiler_options, keep=keep)
  File "/usr/local/lib/python3.6/dist-packages/reikna/cluda/cuda.py", line 227, in _compile
    return SourceModule(src, no_extern_c=True, options=options, keep=keep)
  File "/usr/local/lib/python3.6/dist-packages/pycuda/compiler.py", line 294, in __init__
    self.module = module_from_buffer(cubin)
pycuda._driver.LogicError: cuModuleLoadDataEx failed: invalid device context - 

Exception ignored in: <bound method Thread.__del__ of <reikna.cluda.cuda.Thread object at 0x7efe16ea7dd8>>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/reikna/cluda/cuda.py", line 247, in __del__
    self.release()
  File "/usr/local/lib/python3.6/dist-packages/reikna/cluda/api.py", line 597, in release
    self._release_specific()
  File "/usr/local/lib/python3.6/dist-packages/reikna/cluda/cuda.py", line 242, in _release_specific
    cuda.Context.pop()
pycuda._driver.LogicError: context::pop failed: invalid device context - cannot pop non-current context

fjarri commented 4 years ago

Hm, it's hard to say what the reason is. You are running it on a single video card, right?

The problem with CUDA is that its API is stateful: it maintains a context stack, and whatever context is on top of the stack is considered active. So sometimes there are problems when some Python object is not released in time, and a CUDA object belonging to the wrong context is used. Although in your case it seems that you are using one context per thread, which shouldn't cause problems. Is it possible for you to give me the exact code that reproduces this error?
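
The stack behaviour described above can be sketched with a toy model. This is not the PyCUDA or CUDA driver API: ToyContextStack and the context names are hypothetical, and the model covers a single stack only. It just shows why popping a context that is not on top fails, which is the situation reported by the "cannot pop non-current context" message in the traceback.

```python
class ToyContextStack:
    """Toy model of a context stack; the top entry is the 'current' context."""

    def __init__(self):
        self._stack = []

    def push(self, ctx):
        self._stack.append(ctx)

    def current(self):
        return self._stack[-1] if self._stack else None

    def pop(self, ctx):
        # Only the current (top) context may be popped.
        if self.current() != ctx:
            raise RuntimeError("cannot pop non-current context")
        return self._stack.pop()


stack = ToyContextStack()
stack.push("ctx_a")
stack.push("ctx_b")  # ctx_b is now current

try:
    stack.pop("ctx_a")  # ctx_a is released too early, while ctx_b is on top
    error = None
except RuntimeError as exc:
    error = str(exc)

# Releasing in reverse order of creation works.
stack.pop("ctx_b")
stack.pop("ctx_a")
print(error)
```

In the toy model, as in the description above, the order in which objects are released matters: an object that outlives its expected scope can end up being released while a different context is current.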

alheliou commented 4 years ago

Hi, I'm running the tests on Google Colab: https://colab.research.google.com/drive/11fOVaMsssVQMESjC6DkV8rLxlia8tyIv I was not able to reproduce yesterday's error; today I get a different one from time to time. So I guess it is due to the Colab resources and not your libraries. Thanks a lot

fjarri commented 4 years ago

I cannot open the link - it tells me I don't have permissions to view the notebook.

Just as an experiment, you can try putting import gc; gc.collect() in _target_wrapper() after the call to target(). This will help if there are circular references preventing the context object from being finalized, although if some external object is holding the reference, it will not do much.
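
Applied to the MyThread wrapper from code.py, the suggestion could look like the sketch below. The class is the one from the original post with the single gc.collect() call added; fake_worker is a hypothetical stand-in so that the example runs without a GPU, and it is not part of NuFHE.

```python
import gc
from queue import Queue
from threading import Thread


class MyThread:
    """Thread wrapper from code.py, with a forced garbage collection added."""

    def __init__(self, target, args=()):
        self.return_queue = Queue()
        self.target = target
        self.thread = Thread(target=self._target_wrapper, args=args)

    def _target_wrapper(self, *args):
        res = self.target(*args)
        # Collect reference cycles left behind by the worker while still
        # inside the worker thread, before handing the result back.
        gc.collect()
        self.return_queue.put(res)

    def start(self):
        self.thread.start()
        return self

    def join(self):
        ret_val = self.return_queue.get()
        self.thread.join()
        return ret_val


def fake_worker(part1, part2):
    """Hypothetical worker standing in for concate(); it just joins two byte strings."""
    return part1 + part2


result = MyThread(target=fake_worker, args=(b"ct1", b"ct2")).start().join()
print(result)
```

The collection runs in the worker thread right after target() returns, so any cycles created by the worker are broken before the thread exits.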

alheliou commented 4 years ago

Sorry for the delay. I have opened up the permissions; the notebook should now be readable by anyone. I've added the gc.collect() line and have not received the error message since :) Many thanks