apify / apify-client-python

Apify API client for Python
https://docs.apify.com/api/client/python/
Apache License 2.0

Trying to run multiple async tasks results in an error #187

Closed knyghtryda closed 6 months ago

knyghtryda commented 6 months ago

I am trying to parallelize a scraping task by breaking up long jobs into smaller chunks and running multiple tasks simultaneously. Doing this manually through the Apify UI works fine. However, when I try to automate it via the Python SDK, I get an error. I am currently using ApifyClientAsync to parallelize everything. It is able to kick off up to 6 jobs before it errors out.

Sample Code:
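async def start_tasks():
    # `client` is an ApifyClientAsync instance and `split_list` holds the input chunks (both defined elsewhere).
    for chunk in split_list:
        task_input = {
            "input1": chunk,
        }
        # Start one run per chunk; start() returns the run object without waiting for the run to finish.
        run = await client.task("my-task").start(task_input=task_input)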
Error:
File ~/anaconda3/lib/python3.11/site-packages/apify_client/_logging.py:61, in _injects_client_details_to_log_context.<locals>.async_wrapper(resource_client, *args, **kwargs)
     58 log_context.client_method.set(fun.__qualname__)
     59 log_context.resource_id.set(resource_client.resource_id)
---> 61 return await fun(resource_client, *args, **kwargs)

File ~/anaconda3/lib/python3.11/site-packages/apify_client/clients/resource_clients/task.py:400, in TaskClientAsync.start(self, task_input, build, max_items, memory_mbytes, timeout_secs, wait_for_finish, webhooks)
    364 """Start the task and immediately return the Run object.
    365 
    366 https://docs.apify.com/api/v2#/reference/actor-tasks/run-collection/run-task
   (...)
    389     dict: The run object
    390 """
    391 request_params = self._params(
    392     build=build,
    393     maxItems=max_items,
   (...)
    397     webhooks=encode_webhook_list_to_base64(webhooks) if webhooks is not None else None,
    398 )
--> 400 response = await self.http_client.call(
    401     url=self._url('runs'),
    402     method='POST',
    403     headers={'content-type': 'application/json; charset=utf-8'},
    404     json=task_input,
    405     params=request_params,
    406 )
    408 return parse_date_fields(pluck_data(response.json()))

File ~/anaconda3/lib/python3.11/site-packages/apify_client/_http_client.py:221, in HTTPClientAsync.call(self, method, url, headers, params, data, json, stream, parse_response)
    218 if stream and parse_response:
    219     raise ValueError('Cannot stream response and parse it at the same time!')
--> 221 headers, params, content = self._prepare_request_call(headers, params, data, json)
    223 httpx_async_client = self.httpx_async_client
    225 async def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response:

File ~/anaconda3/lib/python3.11/site-packages/apify_client/_http_client.py:114, in _BaseHTTPClient._prepare_request_call(self, headers, params, data, json)
    112 # dump JSON data to string, so they can be gzipped
    113 if json:
--> 114     data = jsonlib.dumps(json, ensure_ascii=False, allow_nan=False, default=str).encode('utf-8')
    115     headers['Content-Type'] = 'application/json'
    117 if isinstance(data, (str, bytes, bytearray)):

File ~/anaconda3/lib/python3.11/json/__init__.py:238, in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
    232 if cls is None:
    233     cls = JSONEncoder
    234 return cls(
    235     skipkeys=skipkeys, ensure_ascii=ensure_ascii,
    236     check_circular=check_circular, allow_nan=allow_nan, indent=indent,
    237     separators=separators, default=default, sort_keys=sort_keys,
--> 238     **kw).encode(obj)

File ~/anaconda3/lib/python3.11/json/encoder.py:200, in JSONEncoder.encode(self, o)
    196         return encode_basestring(o)
    197 # This doesn't pass the iterator directly to ''.join() because the
    198 # exceptions aren't as detailed.  The list call should be roughly
    199 # equivalent to the PySequence_Fast that ''.join() would do.
--> 200 chunks = self.iterencode(o, _one_shot=True)
    201 if not isinstance(chunks, (list, tuple)):
    202     chunks = list(chunks)

File ~/anaconda3/lib/python3.11/json/encoder.py:258, in JSONEncoder.iterencode(self, o, _one_shot)
    253 else:
    254     _iterencode = _make_iterencode(
    255         markers, self.default, _encoder, self.indent, floatstr,
    256         self.key_separator, self.item_separator, self.sort_keys,
    257         self.skipkeys, _one_shot)
--> 258 return _iterencode(o, 0)

ValueError: Out of range float values are not JSON compliant

vdusek commented 6 months ago

Hi @knyghtryda,

I was not able to reproduce the issue. I tried it with my own task and the following code, which starts 10 task runs:

import asyncio

from apify_client import ApifyClientAsync

TOKEN = 'my_token'
TASK_ID = 'my_task_id'

async def main() -> None:
    apify_client = ApifyClientAsync(TOKEN)
    task_client = apify_client.task(TASK_ID)

    for i in range(10):
        task_run = await task_client.start()
        print(f'Run {i}: {task_run}')

if __name__ == '__main__':
    asyncio.run(main())

and everything works fine for me.

So, could you please provide a full replicable code sample?
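
Note that the loop above awaits each start() call before issuing the next one. The runs themselves still execute side by side, since start() returns the run object immediately, but the start requests go out one at a time. A minimal sketch of issuing them concurrently with asyncio.gather follows. It uses a hypothetical stand-in coroutine, fake_start, in place of the real client call so that it runs without a token, and the split_list values are made up for illustration.

```python
import asyncio


async def fake_start(task_input):
    # Hypothetical stand-in for `client.task("my-task").start(task_input=...)`;
    # it only simulates a short network round trip and echoes the input back.
    await asyncio.sleep(0.01)
    return {"status": "READY", "input": task_input}


async def start_all(chunks):
    # Create one coroutine per chunk, then await them all together.
    # asyncio.gather returns the results in the same order as its arguments.
    coros = [fake_start({"input1": chunk}) for chunk in chunks]
    return await asyncio.gather(*coros)


# Made-up chunks standing in for the reporter's `split_list`.
split_list = [["a", "b"], ["c"], ["d", "e"]]
runs = asyncio.run(start_all(split_list))
print(len(runs))
```

With the real client, the list comprehension would presumably call the task client's start() instead of fake_start; that substitution is an assumption and is not something tested in this thread.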

vdusek commented 6 months ago

Closing. If it is still a problem, please let us know and give us more details.
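
A note on the traceback for anyone who lands here: the exception is raised inside json.dumps, which the client calls with allow_nan=False. With that flag, the standard library raises this ValueError whenever the payload contains NaN or an infinite float. So one plausible cause, which this thread does not confirm, is that one of the chunks in task_input contained such a value (for example, a missing cell coming from a pandas or NumPy source). The sketch below reproduces the error without the client and locates the offending value. The helper find_non_finite and the sample task_input are hypothetical and written only for illustration.

```python
import json
import math


def find_non_finite(value, path="input"):
    """Return the paths of all NaN/inf floats inside nested dicts and lists."""
    bad = []
    if isinstance(value, float) and not math.isfinite(value):
        bad.append(path)
    elif isinstance(value, dict):
        for key, item in value.items():
            bad.extend(find_non_finite(item, f"{path}[{key!r}]"))
    elif isinstance(value, (list, tuple)):
        for index, item in enumerate(value):
            bad.extend(find_non_finite(item, f"{path}[{index}]"))
    return bad


# Made-up input: one chunk with a NaN hiding among the regular values.
task_input = {"input1": ["https://example.com/a", float("nan")]}

# Same json.dumps arguments as in the traceback above.
try:
    json.dumps(task_input, ensure_ascii=False, allow_nan=False, default=str)
    error_message = None
except ValueError as exc:
    error_message = str(exc)

bad_paths = find_non_finite(task_input)
print(error_message)
print(bad_paths)
```

Running a check like this on each chunk before calling start() would show whether the failing chunk holds a non-finite float, which would also explain why the first few runs started fine and a later one failed.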