tenable / pyTenable

Python Library for interfacing into Tenable's platform APIs
https://pytenable.readthedocs.io
MIT License

ExportsIterator Class Incorrect Results #661

Closed smcglincy closed 2 months ago

smcglincy commented 1 year ago

Describe the bug: pyTenable provides the 'ExportsIterator' class, which can process an export with multiple threads. The bug is that the iterator returns too early, and when compared with the single-threaded iterator, the results do not match when both are run at the same time. The code sample below covers 1 day, but the results differ more the larger the data set is.

I believe the bug is that there are no locks or semaphores on shared resources such as 'chunks' and 'uuid' within the 'ExportsIterator' class.

import threading

lock = threading.Lock()
with lock:
    # Read or modify the shared state (e.g. 'chunks', 'uuid') here
    pass
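
To make the locking idea concrete, here is a minimal sketch of several workers draining a shared list of chunk ids under a lock. The names 'chunks', 'processed', and 'worker' are stand-ins invented for this illustration; this is not pyTenable's implementation, only an example of the pattern being suggested.

```python
import threading

# Hypothetical stand-in for an iterator's shared state; not pyTenable code.
chunks = list(range(100))   # pending chunk ids
processed = []              # chunk ids claimed by the workers
lock = threading.Lock()

def worker():
    while True:
        # The emptiness check and the pop must happen as one atomic step,
        # so both sit inside the same locked section.
        with lock:
            if not chunks:
                return
            chunk_id = chunks.pop()
            processed.append(chunk_id)

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for every worker before reading the results

print(len(processed), len(set(processed)))
```

Because every chunk id is claimed inside the locked section, each one is processed exactly once no matter how the threads interleave.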

To Reproduce: This is a cleaned-up sample of the code that I'm running. In production, we use 'last_updated:int=30' as the default value. The multithreaded code uses a thread-safe queue for the threads to write to.

import pandas as pd
from tenable.io import TenableIO
from queue import Queue
from datetime import  timedelta, datetime, timezone
import time

def get_queue_results(q:Queue)->list:
    results_list = []
    while not q.empty():
        results_list.append(q.get())
    return results_list

def tenableAssets(accessKey:str, secretKey:str, q:Queue=None, chunk_size:int=1000,last_updated:int=1,licensed:bool=True)->pd.DataFrame:
    results_list = []
    updated_at=int((datetime.now(timezone.utc) - timedelta(days=last_updated)).timestamp())

    tio = TenableIO(accessKey, secretKey)
    for asset in  tio.exports.assets(chunk_size=chunk_size, updated_at=updated_at):
        results_list.append(asset)

    dfTenable = pd.DataFrame(results_list)
    print(dfTenable.shape)
    return dfTenable

def getTenableAssetsMultiThread(accessKey:str, secretKey:str,chunk_size:int=1000,last_updated:int=1,licensed:bool=True)->pd.DataFrame:
    def threads_get_assets(data, **kwargs):
        results = []
        for item in data:
            results.append(item)
        q.put(pd.DataFrame(results))

    q = Queue()
    updated_at=int((datetime.now(timezone.utc) - timedelta(days=last_updated)).timestamp())

    tio = TenableIO(accessKey, secretKey)
    asset =  tio.exports.assets(chunk_size=chunk_size, updated_at=updated_at, is_licensed=licensed)
    asset.run_threaded(threads_get_assets, {"q":q}, num_threads=4)

    while len(asset.chunks) > 0 and asset.status != "Finished":
        print(f"Waiting to finish: Chunks {asset.chunks} Status {asset.status}")
        time.sleep(1)

    dfTenable = pd.concat(get_queue_results(q))
    print(f"Threadsafe Queue {dfTenable.shape}")
    return dfTenable

Expected behavior: The single-threaded and multithreaded functions should return the same number of items when run at the same time or back to back.

'last_updated=1': multi thread: 72457 items, single thread: 76250 items

'last_updated=30': multi thread: 93467 items, single thread: 108466 items
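
When two runs disagree on counts like this, comparing the records by a unique key can show which items are missing from the smaller set. The sketch below is only an illustration: the helper 'missing_ids' is hypothetical, and using 'id' as the unique key of an exported asset is an assumption that should be checked against the actual export records.

```python
def missing_ids(reference, candidate, key="id"):
    """Return the key values present in 'reference' but absent from 'candidate'."""
    # 'key' defaults to "id", which is an assumed field name for this sketch.
    candidate_ids = {item[key] for item in candidate}
    return sorted({item[key] for item in reference} - candidate_ids)

# Hypothetical records standing in for exported assets.
single = [{"id": "a"}, {"id": "b"}, {"id": "c"}, {"id": "d"}]
multi = [{"id": "a"}, {"id": "c"}]

print(missing_ids(single, multi))
```

Running the same comparison in the other direction would show whether the multithreaded run also contains items the single-threaded run lacks.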


smcglincy commented 1 year ago

I must have been tired when I wrote this, because I didn't realize pyTenable returned futures objects. I retried the code, polling until all futures were done, and then aggregated the results. The results still did not match.
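
For reference, waiting on a set of futures and aggregating their results can be sketched with the standard library alone. The example uses a plain 'ThreadPoolExecutor' and made-up chunks in place of a real export, so 'count_items' and 'fake_chunks' are assumptions for illustration and nothing here calls pyTenable.

```python
from concurrent.futures import ThreadPoolExecutor, wait

def count_items(chunk):
    # Stand-in for a per-chunk callback; returns how many items it saw.
    return sum(1 for _ in chunk)

# Hypothetical data: 5 chunks of 1000 items each, not real export chunks.
fake_chunks = [range(1000) for _ in range(5)]

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(count_items, chunk) for chunk in fake_chunks]
    # Block until every future has finished instead of polling in a sleep loop.
    done, not_done = wait(futures)

# Aggregate only after all futures are complete.
total = sum(f.result() for f in done)
print(total)
```

Calling 'result()' on each future also re-raises any exception from the worker, so a failed chunk is not silently dropped from the total.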

In addition, I'm not sure it's something you want to do, but it's pretty cool, so I'll leave the note. If you want to use 'map' instead of 'submit', which returns the results themselves rather than futures, you can separate the arguments from the iterator using 'functools.partial': https://stackoverflow.com/questions/5442910/how-to-use-multiprocessing-pool-map-with-multiple-arguments#5443941

Then you can pass your args and kwargs independently of the iterator.
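
A minimal sketch of that idea, using 'functools.partial' with 'ThreadPoolExecutor.map'. The function 'process_chunk', its 'prefix' and 'scale' parameters, and the sample chunks are all hypothetical; this shows the general pattern, not how pyTenable dispatches work.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def process_chunk(chunk, prefix, scale=1):
    # 'chunk' comes from the iterable; 'prefix' and 'scale' are fixed up front.
    return f"{prefix}:{len(chunk) * scale}"

# Hypothetical chunks standing in for export data.
fake_chunks = [[1, 2, 3], [4, 5], [6]]

# Bind the extra arguments first, leaving only 'chunk' to be supplied by map.
bound = partial(process_chunk, prefix="chunk", scale=10)

with ThreadPoolExecutor(max_workers=2) as executor:
    # map yields results in the same order as the input iterable.
    results = list(executor.map(bound, fake_chunks))

print(results)
```

The trade-off is that 'map' hides the individual futures, so per-task status checks or cancellation are no longer available.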

SteveMcGrath commented 1 year ago

I had already noticed some other issues with how the threading was working and corrected them in #658. I'll have to look into this issue after the new year.

SteveMcGrath commented 1 year ago

The following code:

from tenable.io import TenableIO
import arrow

age = int(arrow.now().shift(days=-30).timestamp())
tio = TenableIO(access_key='xxx',
                secret_key='xxx'
                )
asset_filters = {
    'chunk_size': 1000,
    'updated_at': age,
    'is_licensed': True
}

sac = 0
ssec = None
tac = 0
tsec = None

# Single-Threaded Blocking Loop
single_start = arrow.now()
for asset in tio.exports.assets(**asset_filters):
    sac += 1
ssec = (arrow.now() - single_start).seconds

# Multi-threaded Executor using a Callback
def count_assets(data, **kwargs):
    items = 0
    for item in data:
        items += 1
    return items

threaded_start = arrow.now()
assets = tio.exports.assets(**asset_filters)
for job in assets.run_threaded(count_assets, num_threads=None):
    tac += job.result()
tsec = (arrow.now() - threaded_start).seconds

# Results
print(f'Blocking Loop: {sac} assets in {ssec}\n'
      f'Threaded Loop: {tac} assets in {tsec}'
      )

Yielded these results:

❯ python export_test.py
Blocking Loop: 120129 assets in 161
Threaded Loop: 120129 assets in 111