SuLab / WikidataIntegrator

A Wikidata Python module integrating the MediaWiki API and the Wikidata SPARQL endpoint
MIT License

Edit queue for making many edits at once #9

Closed. harej closed this issue 5 years ago.

harej commented 7 years ago

I usually improve the performance of my bots through multithreading, and this normally works because the spacing between edits in each thread is enough to prevent errors. However, with a perfect storm of many edits needing to be made at once, you get really screwy behavior, including failures that otherwise don't happen (like 'int' object has no attribute 'startswith' or __init__() got an unexpected keyword argument 'value'). I suspect this is the result of race conditions arising from a poor multithreading implementation.

I think the best long-term solution is a queue system: various scripts/threads can push edits into the queue at whatever rate they like, and the edits are then executed sequentially, avoiding race conditions (and DoSing Wikidata, for that matter).
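
As a rough illustration (not part of WikidataIntegrator, just a minimal sketch using the standard library), such a queue could be a single consumer thread draining a thread-safe queue of edit callables; the names edit_queue, edit_worker, item, and login below are hypothetical:

    import queue
    import threading

    edit_queue = queue.Queue()

    def edit_worker():
        # a single consumer executes queued edits one at a time,
        # so writes to Wikidata never overlap
        while True:
            edit = edit_queue.get()
            if edit is None:          # sentinel: stop the worker
                break
            try:
                edit()                # e.g. lambda: item.write(login)
            except Exception as exc:
                print('edit failed:', exc)

    threading.Thread(target=edit_worker, daemon=True).start()

    # any producer thread can enqueue an edit at whatever rate it likes:
    # edit_queue.put(lambda: item.write(login))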

sebotic commented 7 years ago

Well, for me it works with up to 8 worker threads without any issues. If you use WDI with multithreading, you should pass whole blocks of data to each thread, which are then processed, e.g. 30 items at a time per process. That said, the basic requirement is certainly that all items in one bot run are independent of each other.

E.g. in the case of a literature bot, if you create publication items that should also be used in 'cites' statements on other publications in the same bot run, that will most likely fail.

I will post an example of how I implemented multithreading here later on.

harej commented 7 years ago

I was passing entire item engines at a time, but with multithreading, multiple item engines were being constructed simultaneously, causing all sorts of weirdness. I get around this by loading multiple separate instances of WikidataIntegrator using imp:

        # requires imp and importlib (imported at module level);
        # loads one independent copy of the wikidataintegrator package per write
        # thread, so module-level state is never shared between threads
        self.WRITE_THREAD_COUNT = 6
        self.integrator = []
        for n in range(0, self.WRITE_THREAD_COUNT):
            self.integrator.append({})
            self.integrator[n]['parent'] = imp.load_module('integrator' + str(n), *imp.find_module('wikidataintegrator'))
            self.integrator[n]['login']  = importlib.import_module(self.integrator[n]['parent'].__name__ + '.wdi_login').\
                                                WDLogin(user=site_username, pwd=site_password)
            self.integrator[n]['core']   = importlib.import_module(self.integrator[n]['parent'].__name__ + '.wdi_core')

Ghastly? Perhaps. But it works!
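
For context, here is a hypothetical sketch of how one of these per-thread copies might then be used from a write thread. The worker method, the work_queue, and the WDItemEngine arguments are illustrative assumptions, not code from harej's bot, and the exact WDItemEngine signature depends on the WDI version:

    # hypothetical worker method on the same class: thread n only ever touches
    # self.integrator[n], so its item engines come from a separate module copy
    def write_worker(self, n, work_queue):
        core = self.integrator[n]['core']
        login = self.integrator[n]['login']
        for qid, statements in iter(work_queue.get, 'STOP'):
            # 'statements' is assumed to be a list of wdi_core statement
            # objects, e.g. [core.WDString(value='...', prop_nr='P...')]
            item = core.WDItemEngine(wd_item_id=qid, data=statements)
            item.write(login)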

sebotic commented 7 years ago

@harej do you still have this issue, and could you give a detailed description of what you are experiencing? Thx!

I do the following:

# imports assumed by this excerpt
import json
import time

import pandas as pd
from multiprocessing import Process, Queue

# assumed to be defined elsewhere in the bot: unii_data (pandas DataFrame),
# BLOCK_SIZE, PROCESSES, progress (dict), progress_file_name, skip_failed,
# and the global counter c

def worker(num, iq, oq):
    for ikey in iter(iq.get, 'STOP'):
        start = time.time()
        # process the 32 items with WikidataIntegrator here, as in any other
        # bot run, and put a result dict onto oq so the main process can
        # update 'progress'

for start in range(0, len(unii_data), BLOCK_SIZE):
    print('fresh block started')
    cc = 0

    input_queue = Queue()
    output_queue = Queue()
    for num in range(PROCESSES):
        Process(target=worker, args=(num, input_queue, output_queue)).start()

    for full_index, row in unii_data.iloc[start : start + BLOCK_SIZE, :].iterrows():
        ikey = row['INCHIKEY']
        # skip entries already handled in a previous run (optionally also the
        # previously failed ones) or rows without an InChIKey
        if ikey in progress and ((progress[ikey][1] != 'failed' or progress[ikey][1] != 'failed, no core_ids') or progress[ikey][1] == 'failed' and skip_failed) \
                or pd.isnull(ikey):
            c += 1
            continue

        input_queue.put(ikey)
        cc += 1
        c += 1

    print('global count', c)
    print('cc count', cc)
    print('finishing block')
    for x in range(cc):
        print(x, 'collecting...')
        progress.update(output_queue.get())

    with open(progress_file_name, 'w') as outfile:
        json.dump(progress, outfile)

    for x in range(PROCESSES):
        print('stopping', x)
        input_queue.put('STOP')

    print('count:', c, '##' * 20)

    # if c > 4500:
    #     break

harej commented 7 years ago

I might be misreading this, but I don't think it addresses thread safety with respect to constructing item engines or saving to Wikidata. In any case, I've solved it for my particular use case. Supporting multiple concurrent writes as a use case within WikidataIntegrator itself might be more "elegant" than what I do now, but it is not strictly necessary. Thank you for checking in!