stac-utils / pgstac

Schema, functions and a python library for storing and accessing STAC collections and items in PostgreSQL
MIT License

load_ndjson with loadopt.upsert throws UniqueViolationError #95

Open sharkinsspatial opened 2 years ago

sharkinsspatial commented 2 years ago

pgstac version: 0.4.3; pypgstac version: 0.4.5

When using load_ndjson with the method loadopt.upsert (https://github.com/NASA-IMPACT/cmr-pgstac-loader/blob/main/lambdas/pgstac_loader/handler.py#L40), records with duplicate ids throw UniqueViolationError. An example of the full stack trace:

[ERROR] UniqueViolationError: duplicate key value violates unique constraint "items_p2022w11_id_pk"
DETAIL:  Key (id)=(HLS.S30.T49TCF.2022073T032539.v2.0) already exists.
Traceback (most recent call last):
  File "/var/task/aws_lambda_powertools/middleware_factory/factory.py", line 133, in wrapper
    response = middleware()
  File "/var/task/aws_lambda_powertools/utilities/data_classes/event_source.py", line 39, in event_source
    return handler(data_class(event), context)
  File "/var/task/handler.py", line 36, in handler
    asyncio.run(
  File "/var/lang/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/var/lang/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/var/task/pypgstac/load.py", line 246, in load_ndjson
    await load_iterator(f, table, conn, method)
  File "/var/task/pypgstac/load.py", line 234, in load_iterator
    await copy_upsert(iter, table, conn)
  File "/var/task/pypgstac/load.py", line 212, in copy_upsert
    await conn.copy_to_table(
  File "/var/task/asyncpg/connection.py", line 897, in copy_to_table
    return await self._copy_in(copy_stmt, source, timeout)
  File "/var/task/asyncpg/connection.py", line 1101, in _copy_in
    return await self._protocol.copy_in(
  File "asyncpg/protocol/protocol.pyx", line 529, in copy_in
    status_msg = await waiter

I can confirm that loadopt.upsert appears to work in the pypgstac tests: I modified https://github.com/stac-utils/pgstac/blob/main/test/testdata/items.ndjson to include an item with a duplicate id and ran the pypgstac LoadTest.

Inconsistently, I am also seeing the following error immediately afterwards:

[ERROR] CardinalityViolationError: ON CONFLICT DO UPDATE command cannot affect row a second time
HINT:  Ensure that no rows proposed for insertion within the same command have duplicate constrained values.
Traceback (most recent call last):
  File "/var/task/aws_lambda_powertools/middleware_factory/factory.py", line 133, in wrapper
    response = middleware()
  File "/var/task/aws_lambda_powertools/utilities/data_classes/event_source.py", line 39, in event_source
    return handler(data_class(event), context)
  File "/var/task/handler.py", line 36, in handler
    asyncio.run(
  File "/var/lang/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/var/lang/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/var/task/pypgstac/load.py", line 246, in load_ndjson
    await load_iterator(f, table, conn, method)
  File "/var/task/pypgstac/load.py", line 234, in load_iterator
    await copy_upsert(iter, table, conn)
  File "/var/task/pypgstac/load.py", line 212, in copy_upsert
    await conn.copy_to_table(
  File "/var/task/asyncpg/connection.py", line 897, in copy_to_table
    return await self._copy_in(copy_stmt, source, timeout)
  File "/var/task/asyncpg/connection.py", line 1101, in _copy_in
    return await self._protocol.copy_in(
  File "asyncpg/protocol/protocol.pyx", line 529, in copy_in
    status_msg = await waiter
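
The HINT in the CardinalityViolationError above points at rows with duplicate constrained values arriving in the same command. One possible client-side mitigation, sketched here under assumptions, is to drop duplicate ids from the ndjson before passing it to load_ndjson. `dedupe_by_id` is a hypothetical helper and not part of pypgstac; it keys on `id` alone because the constraint named in the traces is on `(id)`. It does not address conflicts with rows that are already stored in the table.

```python
import json
from typing import Dict, Iterable, Iterator


def dedupe_by_id(lines: Iterable[str]) -> Iterator[str]:
    """Yield one ndjson line per item id, keeping the last occurrence.

    Hypothetical helper for illustration; not part of pypgstac.
    """
    latest: Dict[str, str] = {}
    for line in lines:
        line = line.strip()
        if not line:
            # Skip blank lines, which are common at the end of ndjson files.
            continue
        item_id = json.loads(line)["id"]
        # Remove any earlier entry so the surviving line takes the position
        # of the most recent occurrence.
        latest.pop(item_id, None)
        latest[item_id] = line
    yield from latest.values()
```

Keeping the last occurrence mirrors what an upsert would do if the rows were applied one at a time; keeping the first instead would only require skipping ids that are already present.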

sharkinsspatial commented 2 years ago

@bitner After some discussion with @edkeeble, I've traced this issue to loading duplicate items with different datetime values. You can replicate this issue by modifying https://github.com/stac-utils/pgstac/blob/main/test/testdata/items.ndjson to include:

{"id": "pgstac-test-item-0003", "bbox": [-85.379245, 30.933949, -85.308201, 31.003555], "type": "Feature", "links": [], "assets": {"image": {"href": "https://naipeuwest.blob.core.windows.net/naip/v002/al/2011/al_100cm_2011/30085/m_3008506_nw_16_1_20110825.tif", "type": "image/tiff; application=geotiff; profile=cloud-optimized", "roles": ["data"], "title": "RGBIR COG tile", "eo:bands": [{"name": "Red", "common_name": "red"}, {"name": "Green", "common_name": "green"}, {"name": "Blue", "common_name": "blue"}, {"name": "NIR", "common_name": "nir", "description": "near-infrared"}]}, "metadata": {"href": "https://naipeuwest.blob.core.windows.net/naip/v002/al/2011/al_fgdc_2011/30085/m_3008506_nw_16_1_20110825.txt", "type": "text/plain", "roles": ["metadata"], "title": "FGDC Metdata"}, "thumbnail": {"href": "https://naipeuwest.blob.core.windows.net/naip/v002/al/2011/al_100cm_2011/30085/m_3008506_nw_16_1_20110825.200.jpg", "type": "image/jpeg", "roles": ["thumbnail"], "title": "Thumbnail"}}, "geometry": {"type": "Polygon", "coordinates": [[[-85.309412, 30.933949], [-85.308201, 31.002658], [-85.378084, 31.003555], [-85.379245, 30.934843], [-85.309412, 30.933949]]]}, "collection": "pgstac-test-collection", "properties": {"gsd": 1, "datetime": "2011-08-25T01:00:00Z", "naip:year": "2011", "proj:bbox": [654842, 3423507, 661516, 3431125], "proj:epsg": 26916, "providers": [{"url": "https://www.fsa.usda.gov/programs-and-services/aerial-photography/imagery-programs/naip-imagery/", "name": "USDA Farm Service Agency", "roles": ["producer", "licensor"]}], "naip:state": "al", "proj:shape": [7618, 6674], "eo:cloud_cover": 28, "proj:transform": [1, 0, 654842, 0, -1, 3431125, 0, 0, 1]}, "stac_version": "1.0.0-beta.2", "stac_extensions": ["eo", "projection"]}

This is identical to the first item in the file, but with its datetime value incremented by 1 second. Now when running the pypgstac tests you'll receive:

Traceback (most recent call last):
  File "/opt/src/pypgstac/tests/test_load.py", line 26, in test_load_testdata_succeeds
    asyncio.run(
  File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/opt/src/pypgstac/pypgstac/load.py", line 246, in load_ndjson
    await load_iterator(f, table, conn, method)
  File "/opt/src/pypgstac/pypgstac/load.py", line 234, in load_iterator
    await copy_upsert(iter, table, conn)
  File "/opt/src/pypgstac/pypgstac/load.py", line 212, in copy_upsert
    await conn.copy_to_table(
  File "/usr/local/lib/python3.8/site-packages/asyncpg/connection.py", line 859, in copy_to_table
    return await self._copy_in(copy_stmt, source, timeout)
  File "/usr/local/lib/python3.8/site-packages/asyncpg/connection.py", line 1041, in _copy_in
    return await self._protocol.copy_in(
  File "asyncpg/protocol/protocol.pyx", line 506, in copy_in
asyncpg.exceptions.UniqueViolationError: duplicate key value violates unique constraint "items_p2011w34_id_pk"
DETAIL:  Key (id)=(pgstac-test-item-0003) already exists.
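
The reproduction step described above (an item with the same id but a shifted datetime appended to the test data) can be scripted rather than edited by hand. The following is a minimal sketch using only the standard library; `shifted_duplicate` and `append_shifted_duplicate` are hypothetical helper names, and the code assumes item datetimes use the `YYYY-MM-DDTHH:MM:SSZ` form seen in the item above.

```python
import copy
import json
from datetime import datetime, timedelta

# Assumed timestamp layout, matching the "datetime" property shown above.
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def shifted_duplicate(item: dict, seconds: int = 1) -> dict:
    """Return a copy of a STAC item with the same id and a shifted datetime."""
    dup = copy.deepcopy(item)
    parsed = datetime.strptime(dup["properties"]["datetime"], DATETIME_FORMAT)
    shifted = parsed + timedelta(seconds=seconds)
    dup["properties"]["datetime"] = shifted.strftime(DATETIME_FORMAT)
    return dup


def append_shifted_duplicate(ndjson_text: str, seconds: int = 1) -> str:
    """Append a shifted duplicate of the first item to ndjson content."""
    lines = [line for line in ndjson_text.splitlines() if line.strip()]
    first_item = json.loads(lines[0])
    lines.append(json.dumps(shifted_duplicate(first_item, seconds)))
    return "\n".join(lines) + "\n"
```

Writing the result of `append_shifted_duplicate` to a copy of items.ndjson and loading that copy with loadopt.upsert should exercise the same path as the manual edit, without changing the checked-in test data.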