ARPA-SIMC / dballe

Fast on-disk database for meteorological observed and forecast data.

python api error "could not serialize access due to concurrent update" in tr.remove_data(rec) inside a transaction #255

Closed: pat1 closed this issue 10 months ago

pat1 commented 3 years ago
    with db.transaction() as tr:
        exporter = dballe.Exporter("BUFR")
        with open(tmpdatafile, 'wb') as tmpfile:
            for row in tr.query_messages(rec):
                tmpfile.write(exporter.to_binary(row.message))

        if opts.explorer is not None:
            with dballe.Explorer(opts.explorer) as explorer:
                with explorer.update() as updater:
                    importer = dballe.Importer("BUFR")
                    with importer.from_file(tmpdatafile) as message:
                        updater.add_messages(message)

        returncode = subprocess.call(("arki-scan", "--dispatch=" + opts.arkiconf, "bufr:" + tmpdatafile, "--summary", "--dump", "--status"), stderr=stderr, stdout=stdout)

        logging.info("End to migrate data to arkimet")
        logging.info("Return status: %s" % (returncode))

        if returncode == 0 or returncode == 2:
            logging.info("Start to delete data from dballe")
            tr.remove_data(rec)
            logging.info("End to delete data from dballe")
        else:
            tr.rollback()
            logging.error("Error migrating data from dballe to arkimet")
            logging.warning("Do not delete data from dballe")

--dsn=postgresql:

INFO:__main__:end
Traceback (most recent call last):
  File "/usr/bin/dballe2arkimet", line 219, in <module>
    sys.exit(main())
  File "/usr/bin/dballe2arkimet", line 203, in main
    dballe2arkimet(date)
  File "/usr/bin/dballe2arkimet", line 161, in dballe2arkimet
    tr.remove_data(rec)
OSError: executing DELETE FROM data WHERE id IN (SELECT d.id FROM station s JOIN data d ON s.id = d.id_station JOIN levtr ltr ON ltr.id = d.id_levtr WHERE d.datetime<='2021-04-27 23:59:59'): ERROR:  could not serialize access due to concurrent update
spanezz commented 3 years ago

Can you help me reproduce this?

I tried with this script, which uses data from dballe's test suite, and I don't get the error:

#!/usr/bin/python3
import dballe

# db = dballe.DB.connect("sqlite:test.sqlite")
db = dballe.DB.connect("postgresql:///…")
db.reset()

# Import test data from dballe's test suite
importer = dballe.Importer("BUFR")
with db.transaction() as tr:
    with dballe.File("extra/bufr/synop-sunshine.bufr", "BUFR") as f:
        tr.import_messages(importer.from_file(f))

# Export everything to BUFR, add it to an explorer, then delete it,
# all in one transaction, mirroring the script in the report above
rec = {}
with db.transaction() as tr:
    exporter = dballe.Exporter("BUFR")
    with open("/tmp/issue255.bufr", 'wb') as tmpfile:
        for row in tr.query_messages(rec):
            tmpfile.write(exporter.to_binary(row.message))

    with dballe.Explorer("/tmp/test-explorer.json") as explorer:
        with explorer.update() as updater:
            importer = dballe.Importer("BUFR")
            with importer.from_file("/tmp/issue255.bufr") as message:
                updater.add_messages(message)

    print("Start to delete data from dballe")
    tr.remove_data(rec)
    print("End to delete data from dballe")
pat1 commented 1 year ago

I have multiple processes that access the DB.

In the process that deletes:

INFO:__main__:end
Traceback (most recent call last):
  File "/usr/bin/dballe2arkimet", line 219, in <module>
    sys.exit(main())
  File "/usr/bin/dballe2arkimet", line 203, in main
    dballe2arkimet(date)
  File "/usr/bin/dballe2arkimet", line 161, in dballe2arkimet
    tr.remove_data(rec)
OSError: executing DELETE FROM data WHERE id IN (SELECT d.id FROM station s JOIN data d ON s.id = d.id_station JOIN levtr ltr ON ltr.id = d.id_levtr WHERE d.datetime<='2023-10-11 23:59:59'): ERROR:  could not serialize access due to concurrent update

Have you tried with these two processes?

spanezz commented 1 year ago

I assume you mean that the example has to be run twice, as two concurrent processes?

In that case, this looks like PostgreSQL detecting a conflict between two writes that it cannot resolve.

The scripts do all their work inside one big transaction, for efficiency. This means PostgreSQL sees one big blob of changes, and two different big blobs of changes will probably conflict (that is, postgres cannot safely reorder them one after the other), and you get this error.

One option is, instead of using one big transaction, to create lots of smaller transactions, for example one per BUFR: smaller changes may be easier to serialize, but you will still have to retry the occasional conflict.
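A minimal sketch of that retry pattern, assuming the serialization failure surfaces as the OSError shown in the tracebacks above; the remove_with_retry helper, the retry count, and the backoff values are hypothetical choices, not part of the dballe API:

import time
import dballe

def remove_with_retry(db, rec, retries=5):
    # Each attempt runs in its own short transaction, giving PostgreSQL a
    # smaller change set to serialize against concurrent writers.
    for attempt in range(retries):
        try:
            with db.transaction() as tr:
                tr.remove_data(rec)
            return
        except OSError as e:
            # PostgreSQL's "could not serialize access" failure is
            # transient: back off and try again.
            if "could not serialize" not in str(e) or attempt == retries - 1:
                raise
            time.sleep(0.5 * (attempt + 1))

db = dballe.DB.connect("postgresql:///…")
rec = {}  # query filter, as in the examples above
remove_with_retry(db, rec)

The same pattern would apply on the import side, with one transaction per BUFR file instead of one for the whole batch.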

Given the database layout dballe uses, doing big concurrent updates is nontrivial: if this is an important requirement we might need to redesign either the database or the transaction isolation model.

I would suggest serializing one layer above dballe, updating your tooling so that it either loads data or cleans data, with a lock in between: it's much easier to do above the database layer, at a layer that understands messages.

pat1 commented 1 year ago

The use case is reported in the first post: extract from dballe, import into arkimet, remove the data from dballe. At the same time I have another process that loads data into dballe. It works, but sometimes (when I load data with a reference time inside the deleted period?) I get the OSError. Yes, the first transaction in my code takes some time (extract, archive into arkimet and delete). In my case it is better if the import process fails (I have a queue with ack and retry, and its transactions are very short), not the export-and-delete, which runs once a day and whose execution time quickly escalates when the period grows beyond one day.

pat1 commented 10 months ago

DBall.e does not handle concurrent writes. Locking has to be implemented externally, for example with: https://pypi.org/project/fasteners/
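
A minimal sketch of that external locking, assuming every process that writes to the database (the loader and the export-and-delete job alike) takes the same fasteners inter-process lock before opening its transaction; the lock file path and the surrounding code are hypothetical:

import fasteners
import dballe

# Shared lock file: any path writable by all the writer processes works.
lock = fasteners.InterProcessLock("/var/lock/dballe-writers.lock")

db = dballe.DB.connect("postgresql:///…")
with lock:
    # Only one writer holds the lock at a time, so the big transactions
    # of the two processes can no longer overlap and conflict.
    with db.transaction() as tr:
        tr.remove_data({})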