planetlabs / planet-client-python

Python client for Planet APIs
https://planet-sdk-for-python-v2.readthedocs.io/en/latest/
Apache License 2.0

Download failure under heavy load #974

Open sgillies opened 1 year ago

sgillies commented 1 year ago

Under high download concurrency, httpcore and httpx errors propagate up from the StreamingBody instance at https://github.com/planetlabs/planet-client-python/blob/main/planet/clients/orders.py#L259. These errors do not manifest at lower concurrency. Streaming responses is a strategy used to keep the memory footprint of programs manageable while downloading multiple large (up to ~100 MB) TIFFs concurrently.
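The errors are reported only at high concurrency, so one mitigation that may be worth trying is to cap how many downloads run at once. Nobody in this thread has tested that. Below is a minimal standard-library sketch using asyncio.Semaphore. The names gather_bounded and download_one are hypothetical, and download_one stands in for a call such as client.download_order(...).

```python
import asyncio


async def gather_bounded(coro_factories, limit):
    """Await every coroutine factory with at most `limit` running at once.

    Each factory is a zero-argument callable that returns a coroutine, so a
    coroutine is only created once a slot is free. Like
    gather(..., return_exceptions=True), errors are returned as values in
    input order instead of being raised.
    """
    semaphore = asyncio.Semaphore(limit)

    async def run(factory):
        async with semaphore:
            return await factory()

    return await asyncio.gather(*(run(f) for f in coro_factories),
                                return_exceptions=True)


if __name__ == "__main__":
    # Hypothetical stand-in for client.download_order(order_id, ...)
    async def download_one(order_id):
        await asyncio.sleep(0.01)
        return f"downloaded {order_id}"

    order_ids = ["a", "b", "c", "d"]
    results = asyncio.run(gather_bounded(
        [lambda oid=oid: download_one(oid) for oid in order_ids], limit=2))
    print(results)
```

The limit is a value to tune empirically. It is not a known-safe number for the Orders API.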

Possible lead: the same kind of asyncio.exceptions.CancelledError is mentioned at https://github.com/agronholm/anyio/issues/534. That issue was closed with the conclusion that callers have to expect read timeouts and work around them.
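If callers have to expect read timeouts, a common pattern is to retry the whole operation with exponential backoff. Below is a minimal sketch that assumes nothing about the planet SDK's internals. retry_async is a hypothetical helper, and the caller passes in the exception types to retry.

```python
import asyncio
import random


async def retry_async(func, *, retry_on, attempts=5, base_delay=1.0,
                      max_delay=30.0):
    """Await func() until it succeeds, retrying only on `retry_on` exceptions.

    Between attempts it sleeps base_delay * 2**attempt seconds (capped at
    max_delay) plus up to 50% random jitter. The last exception is re-raised
    once all attempts are used. Other exception types propagate immediately.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            # func must return a *fresh* coroutine on every call
            return await func()
        except retry_on:
            if attempt == attempts - 1:
                raise
            delay = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(delay + random.uniform(0, delay / 2))
```

With httpx installed, something like retry_on=(httpx.ReadTimeout, httpx.RemoteProtocolError) would target the two errors in the tracebacks below. func could then be lambda: client.download_order(order_id, directory, overwrite=True), which passes overwrite=True as the scripts in this thread do. Pass a callable rather than a coroutine, because a coroutine can only be awaited once and each attempt needs a new one.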

Possible workaround: separate order creation from order download. Order creation is more reliable and when it does fail, fails differently. It is probably less complicated to retry order downloads if they are de-interleaved from order creation. This project has tended to document order creation and download as tasks that are done together, but that may not be a best practice for large batches of orders.
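Below is a minimal sketch of that de-interleaving. It assumes create_order and download_order are async callables that wrap the Orders client. The names create_phase and download_phase, and both callables, are hypothetical and are not SDK API. The order IDs are written to a text file between the two phases. That lets the download phase be re-run on its own for the orders that failed, without creating new ones.

```python
import asyncio
from pathlib import Path


async def create_phase(requests, create_order, ids_path):
    """Phase 1: create every order and append each new order ID to ids_path.

    `create_order` is any async callable returning a dict with an 'id' key.
    Failures are returned rather than raised, so one bad request does not
    abort the batch.
    """
    results = await asyncio.gather(*(create_order(r) for r in requests),
                                   return_exceptions=True)
    created = [r["id"] for r in results if not isinstance(r, BaseException)]
    failures = [r for r in results if isinstance(r, BaseException)]
    with Path(ids_path).open("a") as f:
        f.writelines(f"{oid}\n" for oid in created)
    return created, failures


async def download_phase(ids_path, download_order):
    """Phase 2: download every recorded order and return the IDs that failed.

    Because the IDs live in a file, this phase can be repeated for just the
    failures without touching order creation.
    """
    oids = [line.strip() for line in Path(ids_path).read_text().splitlines()
            if line.strip()]
    results = await asyncio.gather(*(download_order(oid) for oid in oids),
                                   return_exceptions=True)
    return [oid for oid, r in zip(oids, results) if isinstance(r, BaseException)]
```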

Traceback 1:

Traceback (most recent call last):
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/anyio/streams/tls.py", line 130, in _call_sslobject_method
    result = func(*args)
             ^^^^^^^^^^^
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/ssl.py", line 921, in read
    v = self._sslobj.read(len)
        ^^^^^^^^^^^^^^^^^^^^^^
ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2576)
orders/e3c6969b-1df7-497b-8fd6-6776de33c557/SkySatCollect/20230626_072301_ssc1_u0001_pansharpened.tif:  53%|███████████████████████████████████████████████████████▋                                                  | 745k/0.00M [11:57<08:25, 1.39MB/s]
During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/backends/asyncio.py", line 33, in read
    return await self._stream.receive(max_bytes=max_bytes)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/anyio/streams/tls.py", line 195, in receive
    data = await self._call_sslobject_method(self._ssl_object.read, max_bytes)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/anyio/streams/tls.py", line 137, in _call_sslobject_method
    data = await self.transport_stream.receive()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
    await self._protocol.read_event.wait()
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/asyncio/locks.py", line 213, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/_exceptions.py", line 8, in map_exceptions
    yield
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/backends/asyncio.py", line 31, in read
    with anyio.fail_after(timeout):
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
    raise TimeoutError
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpx/_transports/default.py", line 239, in __aiter__
    async for part in self._httpcore_stream:
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 346, in __aiter__
    async for part in self._stream:
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 300, in __aiter__
    raise exc
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 293, in __aiter__
    async for chunk in self._connection._receive_response_body(**kwargs):
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 165, in _receive_response_body
    event = await self._receive_event(timeout=timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 177, in _receive_event
    data = await self._network_stream.read(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/backends/asyncio.py", line 30, in read
    with map_exceptions(exc_map):
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions
    raise to_exc(exc)
httpcore.ReadTimeout

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/aayush.malik/Desktop/apollo-taskpro/taskpro/cli.py", line 139, in run_task
    await asyncio.gather(*[
  File "/Users/aayush.malik/Desktop/apollo-taskpro/taskpro/cli.py", line 120, in create_and_download
    await client.download_order(order['id'], directory, progress_bar=True, overwrite=True)
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/site-packages/planet/clients/orders.py", line 298, in download_order
    filenames = [
                ^
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/site-packages/planet/clients/orders.py", line 299, in <listcomp>
    await self.download_asset(i['location'],
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/site-packages/planet/clients/orders.py", line 259, in download_asset
    await body.write(dl_path,
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/site-packages/planet/models.py", line 156, in write
    async for chunk in self._response.aiter_bytes():
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/site-packages/planet/models.py", line 72, in aiter_bytes
    async for c in self._http_response.aiter_bytes():
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpx/_models.py", line 914, in aiter_bytes
    async for raw_bytes in self.aiter_raw():
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpx/_models.py", line 972, in aiter_raw
    async for raw_stream_bytes in self.stream:
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpx/_client.py", line 146, in __aiter__
    async for chunk in self._stream:
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpx/_transports/default.py", line 238, in __aiter__
    with map_httpcore_exceptions():
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ReadTimeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/anaconda3/envs/taskpro/bin/taskpro", line 33, in <module>
    sys.exit(load_entry_point('taskpro', 'console_scripts', 'taskpro')())
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
         ^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/click/core.py", line 1657, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/Desktop/apollo-taskpro/taskpro/cli.py", line 144, in activate_and_download_orders
    asyncio.run(run_task())
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/Desktop/apollo-taskpro/taskpro/cli.py", line 125, in run_task
    async with planet.Session(auth=auth) as s:
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/site-packages/planet/http.py", line 290, in __aexit__
    await self.aclose()
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/site-packages/planet/http.py", line 293, in aclose
    await self._client.aclose()
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpx/_client.py", line 1968, in aclose
    await self._transport.aclose()
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpx/_transports/default.py", line 365, in aclose
    await self._pool.aclose()
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 312, in aclose
    raise RuntimeError(
RuntimeError: The connection pool was closed while 27 HTTP requests/responses were still in-flight.

Traceback 2:

Traceback (most recent call last):
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/_exceptions.py", line 8, in map_exceptions
    yield
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 174, in _receive_event
    event = self._h11_state.next_event()
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/h11/_connection.py", line 425, in next_event
    event = self._extract_next_receive_event()
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/h11/_connection.py", line 375, in _extract_next_receive_event
    event = self._reader.read_eof()
            ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/h11/_readers.py", line 117, in read_eof
    raise RemoteProtocolError(
h11._util.RemoteProtocolError: peer closed connection without sending complete message body (received 775503406 bytes, expected 1459578601)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpx/_transports/default.py", line 239, in __aiter__
    async for part in self._httpcore_stream:
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 346, in __aiter__
    async for part in self._stream:
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 300, in __aiter__
    raise exc
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 293, in __aiter__
    async for chunk in self._connection._receive_response_body(**kwargs):
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 165, in _receive_response_body
    event = await self._receive_event(timeout=timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 173, in _receive_event
    with map_exceptions({h11.RemoteProtocolError: RemoteProtocolError}):
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions
    raise to_exc(exc)
httpcore.RemoteProtocolError: peer closed connection without sending complete message body (received 775503406 bytes, expected 1459578601)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/aayush.malik/Desktop/apollo-taskpro/taskpro/cli.py", line 139, in run_task
    await asyncio.gather(*[
  File "/Users/aayush.malik/Desktop/apollo-taskpro/taskpro/cli.py", line 120, in create_and_download
    await client.download_order(order['id'], directory, progress_bar=True, overwrite=True)
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/site-packages/planet/clients/orders.py", line 298, in download_order
    filenames = [
                ^
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/site-packages/planet/clients/orders.py", line 299, in <listcomp>
    await self.download_asset(i['location'],
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/site-packages/planet/clients/orders.py", line 259, in download_asset
    await body.write(dl_path,
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/site-packages/planet/models.py", line 156, in write
    async for chunk in self._response.aiter_bytes():
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/site-packages/planet/models.py", line 72, in aiter_bytes
    async for c in self._http_response.aiter_bytes():
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpx/_models.py", line 914, in aiter_bytes
    async for raw_bytes in self.aiter_raw():
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpx/_models.py", line 972, in aiter_raw
    async for raw_stream_bytes in self.stream:
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpx/_client.py", line 146, in __aiter__
    async for chunk in self._stream:
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpx/_transports/default.py", line 238, in __aiter__
    with map_httpcore_exceptions():
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.RemoteProtocolError: peer closed connection without sending complete message body (received 775503406 bytes, expected 1459578601)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/anaconda3/envs/taskpro/bin/taskpro", line 33, in <module>
    sys.exit(load_entry_point('taskpro', 'console_scripts', 'taskpro')())
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
         ^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/click/core.py", line 1657, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/Desktop/apollo-taskpro/taskpro/cli.py", line 144, in activate_and_download_orders
    asyncio.run(run_task())
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/aayush.malik/Desktop/apollo-taskpro/taskpro/cli.py", line 125, in run_task
    async with planet.Session(auth=auth) as s:
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/site-packages/planet/http.py", line 290, in __aexit__
    await self.aclose()
  File "/opt/anaconda3/envs/taskpro/lib/python3.11/site-packages/planet/http.py", line 293, in aclose
    await self._client.aclose()
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpx/_client.py", line 1968, in aclose
    await self._transport.aclose()
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpx/_transports/default.py", line 365, in aclose
    await self._pool.aclose()
  File "/Users/aayush.malik/.local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 312, in aclose
    raise RuntimeError(
RuntimeError: The connection pool was closed while 24 HTTP requests/responses were still in-flight.

cc @aayushmalik

sgillies commented 1 year ago

On a close read of the code, it looks like only the initial connection is retried: https://github.com/planetlabs/planet-client-python/blob/79d9a3cb952fcd4f75e5e935ee067455580c779d/planet/http.py#L411C38-L411C38. If read timeout errors happen later, while the response body is being streamed (for example at https://github.com/planetlabs/planet-client-python/blob/79d9a3cb952fcd4f75e5e935ee067455580c779d/planet/clients/orders.py#L259), they may not be retried.
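If that reading is right, a caller-side workaround is to make "request plus full body download" the unit of retry. Below is a minimal standard-library sketch under that assumption. open_stream is a hypothetical zero-argument callable that issues a new request and returns an async iterator of bytes. The body is written to a temporary .part file, which replaces the destination only after the whole body has arrived.

```python
import asyncio
import os
import tempfile


async def write_stream_with_retry(open_stream, dest_path, *, retry_on,
                                  attempts=3, delay=1.0):
    """Stream chunks into dest_path, restarting from scratch on failure.

    open_stream() must issue a *new* request on each call, because a stream
    that raised mid-body cannot be resumed. A partial download never
    overwrites dest_path, and the temporary file is removed on any failure.
    """
    directory = os.path.dirname(os.path.abspath(dest_path))
    for attempt in range(1, attempts + 1):
        fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".part")
        try:
            with os.fdopen(fd, "wb") as f:
                async for chunk in open_stream():
                    f.write(chunk)
            os.replace(tmp_path, dest_path)  # atomic swap once complete
            return dest_path
        except retry_on:
            if attempt == attempts:
                raise
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        await asyncio.sleep(delay * attempt)  # linear backoff between tries
```

With httpx, open_stream could be an async generator function that enters async with client.stream("GET", url) as r: and yields chunks from r.aiter_bytes(). You would then pass retry_on=(httpx.ReadTimeout, httpx.RemoteProtocolError). This approach restarts each file from the beginning. That is wasteful for very large files, but it keeps the logic simple.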

jreiberkyle commented 1 year ago

Calling out this very insightful quote as input for the Python API docs effort #994:

This project has tended to document order creation and download as tasks that are done together, but that may not be a best practice for large batches of orders.

jreiberkyle commented 1 year ago

Script to create many orders for testing. Run it with: python create_orders.py >> oids.txt

create_orders.py

import asyncio
import planet

async def create(count=1):
    item_ids = ['20230719_071823_96_2479']
    requests = [planet.order_request.build_request(
                    name=str(i),
                    products=[
                        planet.order_request.product(item_ids=item_ids,
                                            product_bundle='analytic_udm2',
                                            item_type='PSScene')],
                )
                for i in range(count)]

    async with planet.Session() as s:
        client = s.client('orders')

        orders = await asyncio.gather(*[
            _create_order(client, request)
            for request in requests
        ])

        for o in orders:
            print(o['id'])

async def _create_order(client, order_detail):
    with planet.reporting.StateBar(state='creating') as reporter:
        order = await client.create_order(order_detail)
        reporter.update(state='created', order_id=order['id'])
    return order

asyncio.run(create(count=100))

Interestingly, one out of three runs of this script got the following error:

Traceback (most recent call last):
  File "/Users/jennifer.kyle/.pyenv/versions/planet-client-python-3.8.6/lib/python3.8/site-packages/planet/http.py", line 279, in _raise_for_status
    response.raise_for_status()
  File "/Users/jennifer.kyle/.pyenv/versions/planet-client-python-3.8.6/lib/python3.8/site-packages/httpx/_models.py", line 749, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Server error '500 Internal Server Error' for url 'https://api.planet.com/compute/ops/orders/v2'
For more information check: https://httpstatuses.com/500

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "create_orders.py", line 35, in <module>
    asyncio.run(create(count=100))
  File "/Users/jennifer.kyle/.pyenv/versions/3.8.6/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Users/jennifer.kyle/.pyenv/versions/3.8.6/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "create_orders.py", line 19, in create
    orders = await asyncio.gather(*[
  File "create_orders.py", line 30, in _create_order
    order = await client.create_order(order_detail)
  File "/Users/jennifer.kyle/.pyenv/versions/planet-client-python-3.8.6/lib/python3.8/site-packages/planet/clients/orders.py", line 148, in create_order
    response = await self._session.request(method='POST',
  File "/Users/jennifer.kyle/.pyenv/versions/planet-client-python-3.8.6/lib/python3.8/site-packages/planet/http.py", line 387, in request
    http_response = await self._retry(self._send, request, stream=False)
  File "/Users/jennifer.kyle/.pyenv/versions/planet-client-python-3.8.6/lib/python3.8/site-packages/planet/http.py", line 330, in _retry
    raise e
  File "/Users/jennifer.kyle/.pyenv/versions/planet-client-python-3.8.6/lib/python3.8/site-packages/planet/http.py", line 315, in _retry
    resp = await func(*a, **kw)
  File "/Users/jennifer.kyle/.pyenv/versions/planet-client-python-3.8.6/lib/python3.8/site-packages/planet/http.py", line 393, in _send
    http_resp = await self._client.send(request, stream=stream)
  File "/Users/jennifer.kyle/.pyenv/versions/planet-client-python-3.8.6/lib/python3.8/site-packages/httpx/_client.py", line 1617, in send
    response = await self._send_handling_auth(
  File "/Users/jennifer.kyle/.pyenv/versions/planet-client-python-3.8.6/lib/python3.8/site-packages/httpx/_client.py", line 1645, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/Users/jennifer.kyle/.pyenv/versions/planet-client-python-3.8.6/lib/python3.8/site-packages/httpx/_client.py", line 1703, in _send_handling_redirects
    raise exc
  File "/Users/jennifer.kyle/.pyenv/versions/planet-client-python-3.8.6/lib/python3.8/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
    await hook(response)
  File "/Users/jennifer.kyle/.pyenv/versions/planet-client-python-3.8.6/lib/python3.8/site-packages/planet/http.py", line 282, in _raise_for_status
    cls._convert_and_raise(e)
  File "/Users/jennifer.kyle/.pyenv/versions/planet-client-python-3.8.6/lib/python3.8/site-packages/planet/http.py", line 95, in _convert_and_raise
    raise error_type(response.text)
planet.exceptions.ServerError: Internal Server Error
jreiberkyle commented 1 year ago

Regarding the above error, what is needed is:

  1. running asyncio.gather with return_exceptions=True, so that the whole process doesn't error out when one error is encountered, and possibly
  2. adding exceptions.ServerError to http.RETRY_EXCEPTIONS.
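Point 1 can be illustrated with the standard library alone. With the default gather, the first exception propagates to the caller, and the asyncio documentation states that the other awaitables are not cancelled. Leaving an async with planet.Session(...) block at that point would close the client while requests are still in flight. That is consistent with the RuntimeError at the end of both tracebacks above. fake_request is a hypothetical stand-in for client.create_order().

```python
import asyncio


async def fake_request(i, fail_at=2, slow=0.2):
    """Hypothetical stand-in for client.create_order(); call `fail_at` fails fast."""
    if i == fail_at:
        await asyncio.sleep(0)
        raise RuntimeError("500 Internal Server Error")
    await asyncio.sleep(slow)
    return f"order-{i}"


async def without_return_exceptions(n=5):
    """Default gather: the first error propagates, but sibling tasks keep running."""
    tasks = [asyncio.ensure_future(fake_request(i)) for i in range(n)]
    try:
        await asyncio.gather(*tasks)
    except RuntimeError:
        still_running = sum(not t.done() for t in tasks)
        for t in tasks:  # tidy up so the demo leaves nothing pending
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        return still_running
    return 0


async def with_return_exceptions(n=5):
    """return_exceptions=True: every call finishes and errors come back as values."""
    results = await asyncio.gather(*(fake_request(i) for i in range(n)),
                                   return_exceptions=True)
    ok = [r for r in results if not isinstance(r, BaseException)]
    failed = [i for i, r in enumerate(results) if isinstance(r, BaseException)]
    return ok, failed
```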

create_orders.py

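import asyncio
import sys

import planet

async def create(count=1):
    item_ids = ['20230719_071823_96_2479']
    requests = [planet.order_request.build_request(
                    name=str(i),
                    products=[
                        planet.order_request.product(item_ids=item_ids,
                                            product_bundle='analytic_udm2',
                                            item_type='PSScene')],
                )
                for i in range(count)]

    async with planet.Session() as s:
        client = s.client('orders')

        results = await asyncio.gather(*[
            _create_order(client, request)
            for request in requests
        ], return_exceptions=True)

    # With return_exceptions=True, failures come back as values. Report them
    # on stderr so that stdout (redirected to oids.txt) holds only order IDs.
    for result in results:
        if isinstance(result, BaseException):
            print(f'Failed to create order: {result!r}', file=sys.stderr)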

async def _create_order(client, order_detail):
    with planet.reporting.StateBar(state='creating') as reporter:
        order = await client.create_order(order_detail)
        reporter.update(state='created', order_id=order['id'])

    print(order['id'])

asyncio.run(create(count=100))
jreiberkyle commented 1 year ago

Script to download many orders that were already created and recorded in oids.txt. For logging, run: python download_orders.py > log.txt

download_orders.py

import asyncio

import planet

async def download(count=1, directory_path='downloads'):
    async with planet.Session() as s:
        client = s.client('orders')

        with open('oids.txt', 'r') as f:
            order_ids = f.readlines()

        # Skip blank lines so an empty string is never sent as an order ID
        oids = [order_id.strip() for order_id in order_ids[:count]
                if order_id.strip()]

        results = await asyncio.gather(*[
            _download_order(client, oid, directory_path)
            for oid in oids], return_exceptions=True)

        # gather preserves input order, so each result pairs with its order ID
        for oid, result in zip(oids, results):
            if isinstance(result, BaseException):
                print(f'Failed download: {oid}')
                print(result)
            else:
                print(f'Successful download: {oid}')

async def _download_order(client, order_id, directory):
    with planet.reporting.StateBar(state='waiting') as reporter:
        await client.wait(order_id, callback=reporter.update_state, max_attempts=0, delay=7)
        await client.download_order(order_id, directory, progress_bar=True, overwrite=True)

asyncio.run(download(count=100))
aayushmalik commented 1 year ago

This works now. I refactored the code a bit for my purposes, and it now looks like this:

import ast
import asyncio
import glob
import os
from datetime import datetime

import pandas as pd
import planet
from planet import Auth


def activate_and_download_orders(api_key, input_file):
    auth = Auth.from_key(api_key)

    def activate_order_wrapper(input_file):
        list_of_order_ids = []

        async def create(df):
            list_of_requests = []

            for _, row in df.iterrows():
                temp_date = datetime.strptime(row['fulfilled_date'], "%Y-%m-%d").strftime("%Y%m%d")
                name = f"{temp_date}_SKYSAT_{row['order_name']}"
                # literal_eval parses the stored list literal without executing code (unlike eval)
                item_ids = ast.literal_eval(row['item_id'])
                item_ids = [item for sublist in item_ids for item in sublist]
                list_of_requests.append(planet.order_request.build_request(
                    name=name,
                    products=[  # TODO: check whether order_request.delivery can download the zip file directly
                        planet.order_request.product(item_ids=item_ids,
                                                     product_bundle='pansharpened_udm2',
                                                     item_type='SkySatCollect')],
                    delivery=planet.order_request.delivery(
                        archive_type='zip',
                        single_archive=True,
                        archive_filename=f'{name}.zip')))

            async with planet.Session(auth=auth) as s:
                client = s.client('orders')

                # return_exceptions=True: one failed creation doesn't abort the rest
                results = await asyncio.gather(*[
                    _create_order(client, request)
                    for request in list_of_requests
                ], return_exceptions=True)

            for result in results:
                if isinstance(result, BaseException):
                    print(f'Failed order creation: {result}')

        async def _create_order(client, order_detail):
            with planet.reporting.StateBar(state='creating') as reporter:
                order = await client.create_order(order_detail)
                reporter.update(state='created', order_id=order['id'])

            list_of_order_ids.append(order['id'])

        df = pd.read_csv(input_file)
        asyncio.run(create(df))
        return list_of_order_ids

    list_of_orders_to_be_downloaded = activate_order_wrapper(input_file)

    def download_order_wrapper(list_of_orders_to_be_downloaded):
        directory = "./orders"

        # Create the directory if it doesn't already exist
        os.makedirs(directory, exist_ok=True)

        async def download(list_of_orders_to_be_downloaded, directory_path):
            async with planet.Session(auth=auth) as s:
                client = s.client('orders')

                oids = list(list_of_orders_to_be_downloaded)

                results = await asyncio.gather(*[
                    _download_order(client, oid, directory_path)
                    for oid in oids], return_exceptions=True)

                for oid, result in zip(oids, results):
                    if isinstance(result, BaseException):
                        print(f'Failed download: {oid}')
                        print(result)
                    else:
                        print(f'Successful download: {oid}')

        async def _download_order(client, order_id, directory):
            with planet.reporting.StateBar(state='waiting') as reporter:
                await client.wait(order_id, callback=reporter.update_state, max_attempts=0, delay=7)
                await client.download_order(order_id, directory, progress_bar=True, overwrite=True)

        asyncio.run(download(list_of_orders_to_be_downloaded, directory))

    download_order_wrapper(list_of_orders_to_be_downloaded)

    # Move each zip from ./orders/<subdirectory>/ up into ./orders/.
    # glob + os.replace work on both UNIX and Windows, replacing the shell mv command.
    for path in glob.glob('./orders/*/*.zip'):
        os.replace(path, os.path.join('./orders', os.path.basename(path)))

This could be even better if the user ran two commands: one for activation and one for downloading. I'm going to ask them whether they would agree to that.

aayushmalik commented 1 year ago

But it still failed for two of the orders, and I'm not sure why. All I got was a failed download error.

jreiberkyle commented 1 year ago

Yeah, we still need to add retry to the download, and I'm working on that. These scripts are mostly designed to home in on the error and trigger it, which they are doing spectacularly =) The idea is that they won't trigger the error once retry is added. Stay tuned!