airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Low-code SDK: CursorPagination removes base path from url_base #18136

Open delenamalan opened 2 years ago

delenamalan commented 2 years ago

Environment

Current Behavior

When a requester's url_base has an additional path, e.g. https://www.example.com/some/path instead of just https://www.example.com, the CursorPagination strategy breaks: it drops the additional path from the base URL when requesting subsequent pages.

For example, when I update the Workable connector's base URL to "https://{{ config['account_subdomain'] }}.workable.com/spi/v3" and the jobs stream's path to "/jobs", then:

  1. Airbyte correctly requests the first page from https://test-432879.workable.com/spi/v3/jobs?created_after=20221001T115616Z&limit=1.
  2. Workable returns "paging": {"next": "https://test-432879.workable.com/spi/v3/jobs?created_after=20221001T115616Z&limit=1&since_id=2a1018"} in the response.
  3. Airbyte incorrectly requests the second page from https://test-432879.workable.com/jobs?created_after=20221001T115616Z&limit=1&since_id=2a1018&created_after=20221001T115616Z&limit=1 (the spi/v3 base path is missing).
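The broken second request can be reproduced with Python's `urllib.parse.urljoin`. This is a minimal sketch, not the CDK's code: `strip_url_base` and `next_request_url` are hypothetical helpers that assume the paginator removes the `url_base` prefix from a path-injected cursor, and that the requester then joins the remainder onto `url_base` with a trailing slash added. Under those assumptions the leftover `/jobs?...` starts with `/`, so `urljoin` treats it as an absolute path and discards `/spi/v3`. The duplicated `created_after` and `limit` parameters in step 3 appear to come from the request parameters being added again and are not modeled here.

```python
from urllib.parse import urljoin

# Values taken from the Workable example above.
URL_BASE = "https://test-432879.workable.com/spi/v3"
NEXT_URL = (
    "https://test-432879.workable.com/spi/v3/jobs"
    "?created_after=20221001T115616Z&limit=1&since_id=2a1018"
)


def strip_url_base(token: str, url_base: str) -> str:
    """Hypothetical model: remove the url_base prefix from a path-injected
    cursor, leaving an absolute path such as /jobs?..."""
    return token.replace(url_base, "")


def next_request_url(url_base: str, token: str) -> str:
    # Assumption: the requester adds a trailing "/" to url_base and then
    # resolves the path with urljoin.
    path = strip_url_base(token, url_base)
    return urljoin(url_base.rstrip("/") + "/", path)


print(next_request_url(URL_BASE, NEXT_URL))
# https://test-432879.workable.com/jobs?created_after=20221001T115616Z&limit=1&since_id=2a1018
```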

Expected Behavior

Airbyte should use the full URL returned by the API to request the next page.
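One way to get the expected behavior, sketched in Python: pass the cursor to `urljoin` without rewriting it first. `resolve_next_page` is a hypothetical helper, not the CDK's implementation or its eventual fix. It relies on standard-library behavior: `urljoin` returns a full URL (one with its own scheme and host) as is, while a relative cursor is still resolved against `url_base`.

```python
from urllib.parse import urljoin

URL_BASE = "https://test-432879.workable.com/spi/v3/"
NEXT_URL = (
    "https://test-432879.workable.com/spi/v3/jobs"
    "?created_after=20221001T115616Z&limit=1&since_id=2a1018"
)


def resolve_next_page(url_base: str, cursor: str) -> str:
    """Hypothetical resolver: hand the cursor to urljoin unmodified.

    A full "next" link from the API is requested exactly as returned;
    a relative cursor is resolved against url_base."""
    return urljoin(url_base, cursor)


print(resolve_next_page(URL_BASE, NEXT_URL) == NEXT_URL)  # True
print(resolve_next_page(URL_BASE, "jobs?since_id=2a1018"))
# https://test-432879.workable.com/spi/v3/jobs?since_id=2a1018
```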

Logs

{"type": "DEBUG", "message": "Making outbound API request", "data": {"headers": "{'User-Agent': 'python-requests/2.28.1', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive', 'Authorization': 'Bearer ****'}", "url": "https://test-432879.workable.com/spi/v3/jobs?created_after=20221001T115616Z&limit=1", "request_body": "None"}}
{"type": "DEBUG", "message": "Receiving response", "data": {"status": "200", "headers": "{'Date': 'Tue, 18 Oct 2022 21:27:38 GMT', 'Content-Type': 'application/json; charset=utf-8', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'x-frame-options': 'SAMEORIGIN', 'x-xss-protection': '1; mode=block', 'x-content-type-options': 'nosniff', 'x-download-options': 'noopen', 'x-permitted-cross-domain-policies': 'none', 'referrer-policy': 'strict-origin-when-cross-origin', 'vary': 'User-Agent', 'x-rate-limit-limit': '10', 'x-rate-limit-remaining': '9', 'x-rate-limit-reset': '1666128468', 'etag': 'W/\"1033f18a72f56fa2849f0700fdf4572a\"', 'cache-control': 'max-age=0, private, must-revalidate', 'x-request-id': '4a563e73-b30c-9fc6-9728-f12d23408005', 'CF-Cache-Status': 'DYNAMIC', 'Strict-Transport-Security': 'max-age=15552000; includeSubDomains', 'X-KB': 'false', 'X-TS': '0', 'Server': 'cloudflare', 'CF-RAY': '75c45beeef7bb3a9-MUC', 'Content-Encoding': 'gzip'}", "body": "{\"jobs\":[{\"id\":\"2a0e68\",\"title\":\"Test job 1\",\"full_title\":\"Test job 1 - Cape Town\",\"shortcode\":\"9D3E25A694\",\"code\":null,\"state\":\"draft\",\"sample\":false,\"department\":null,\"department_hierarchy\":[],\"url\":\"https://test-432879.workable.com/jobs/2754966\",\"application_url\":\"https://test-432879.workable.com/jobs/2754966/candidates/new\",\"shortlink\":\"https://apply.workable.com/j/9D3E25A694\",\"location\":{\"location_str\":\"Cape Town, Western Cape, South Africa\",\"country\":\"South Africa\",\"country_code\":\"ZA\",\"region\":\"Western Cape\",\"region_code\":\"WC\",\"city\":\"Cape Town\",\"zip_code\":null,\"telecommuting\":false},\"created_at\":\"2022-10-11T18:25:19Z\"}],\"paging\":{\"next\":\"https://test-432879.workable.com/spi/v3/jobs?created_after=20221001T115616Z&limit=1&since_id=2a1018\"}}"}}
{"type": "RECORD", "record": {"stream": "jobs", "data": {"jobs": [{"id": "2a0e68", "title": "Test job 1", "full_title": "Test job 1 - Cape Town", "shortcode": "9D3E25A694", "code": null, "state": "draft", "sample": false, "department": null, "department_hierarchy": [], "url": "https://test-432879.workable.com/jobs/2754966", "application_url": "https://test-432879.workable.com/jobs/2754966/candidates/new", "shortlink": "https://apply.workable.com/j/9D3E25A694", "location": {"location_str": "Cape Town, Western Cape, South Africa", "country": "South Africa", "country_code": "ZA", "region": "Western Cape", "region_code": "WC", "city": "Cape Town", "zip_code": null, "telecommuting": false}, "created_at": "2022-10-11T18:25:19Z"}], "paging": {"next": "https://test-432879.workable.com/spi/v3/jobs?created_after=20221001T115616Z&limit=1&since_id=2a1018"}}, "emitted_at": 1666128458252}}
{"type": "DEBUG", "message": "Making outbound API request", "data": {"headers": "{'User-Agent': 'python-requests/2.28.1', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive', 'Authorization': 'Bearer ****'}", "url": "https://test-432879.workable.com/jobs?created_after=20221001T115616Z&limit=1&since_id=2a1018&created_after=20221001T115616Z&limit=1", "request_body": "None"}}
{"type": "DEBUG", "message": "Receiving response", "data": {"status": "404", "headers": "{'Date': 'Tue, 18 Oct 2022 21:27:38 GMT', 'Content-Type': 'text/html; charset=utf-8', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'x-request-id': '963a832d-7715-9a38-9ccc-1e96e4ec5b28', 'CF-Cache-Status': 'DYNAMIC', 'Strict-Transport-Security': 'max-age=15552000; includeSubDomains', 'X-KB': 'false', 'X-TS': '0', 'Server': 'cloudflare', 'CF-RAY': '75c45bf09a33b3a9-MUC', 'Content-Encoding': 'gzip'}", "body": "<!DOCTYPE html>\n<html>\n<head>\n  <meta charSet=\"utf-8\" />\n  <meta httpEquiv=\"Content-Type\" content=\"text/html; charSet=utf-8\" />\n  <meta httpEquiv=\"X-UA-Compatible\" content=\"IE=edge\" />\n  <meta name=\"viewport\" content=\"initial-scale=1.0, width=device-width\" />\n  <title>Error 404 - This page does not exist</title>\n  <link rel=\"stylesheet\" href=\"https://use.typekit.net/isu1vjb.css\" />\n  <style type=\"text/css\">\n    /* Layout */\n    body {\n      background: #fff;\n      color: #636d77;\n      font: 20px proxima-nova, \"Helvetica Neue\", Helvetica, Arial, sans-serif;\n      line-height: 32px;\n      margin: 0;\n      padding: 0;\n    }\n    html,\n    body {\n      height: 100%;\n    }\n\n    .container {\n      display: flex;\n      align-items: center;\n      justify-content: center;\n      height: calc(100vh - 150px);\n      flex-direction: column;\n      text-align: center;\n      padding: 0 20px;\n      max-width: 600px;\n      margin: 0 auto 30px;\n    }\n\n    img {\n      max-width: 100%;\n    }\n\n    .header {\n      padding: 24px 0;\n      text-align: center;\n      margin: 0 0 30px;\n    }\n\n    /* Typography */\n    h1 {\n      font-size: 40px;\n      line-height: 42px;\n      color: #00756a;\n      margin: 30px 0;\n      padding: 0;\n    }\n\n    a {\n      color: #00756a;\n      text-decoration: none;\n    }\n\n    a:hover {\n      text-decoration: underline;\n    }\n\n    p {\n      margin: 0;\n      padding: 0;\n    
}\n\n    .btn {\n      background-color: #00756a;\n      border: none;\n      color: #ffffff;\n      text-align: center;\n      font-size: 19px;\n      line-height: 23px;\n      font-weight: 700;\n      border-radius: 8px;\n      min-width: 187px;\n      padding: 20px;\n      transition: all 0.3s ease-out;\n      cursor: pointer;\n      display: inline-block;\n      margin: 30px 0;\n    }\n\n    .btn:hover {\n      text-decoration: none;\n      box-shadow: 0 9px 30.5px 1.5px rgba(0, 0, 0, 0.35);\n    }\n\n    p.disclaimer {\n      font-size: 16px;\n      line-height: 24px;\n      margin-bottom: 10px;\n    }\n\n    /* Responsiveness */\n    @media (max-height: 930px) {\n      .container {\n        display: block;\n        height: auto;\n      }\n    }\n  </style>\n</head>\n\n<body>\n  <div class=\"header\">\n    <a href=\"https://www.workable.com\" title=\"Workable homepage\">\n      <img\n        src=\"https://workablehr-pages.s3.amazonaws.com/images/workable-logo.png\"\n        width=\"177\"\n        height=\"30\"\n        alt=\"Workable logo\"\n      />\n    </a>\n  </div>\n  <div class=\"container\">\n    <img\n      src=\"https://workablehr-pages.s3.amazonaws.com/images/404.jpg\"\n      width=\"631\"\n      height=\"auto\"\n      alt=\"404 error graphic\"\n    />\n    <h1>Hmm, this page doesn\u2019t exist</h1>\n    <p>Sorry, the page you\u2019re looking for isn\u2019t here. 
This can happen because of typos in the web address or outdated links.</p>\n    <a href=\"https://www.workable.com\" class=\"btn\">Go to homepage</a>\n    <p class=\"disclaimer\">You can also <a href=\"https://twitter.com/Workable\" target=\"_blank\">follow us on Twitter</a> or send us an e-mail at <a href=\"/cdn-cgi/l/email-protection#added8ddddc2dfd9eddac2dfc6cccfc1c883cec2c0\"><span class=\"__cf_email__\" data-cfemail=\"8cfff9fcfce3fef8ccfbe3fee7edeee0e9a2efe3e1\">[email&#160;protected]</span></a> if you think something is terribly wrong.</p>\n  </div>\n<script data-cfasync=\"false\" src=\"/cdn-cgi/scripts/5c5dd728/cloudflare-static/email-decode.min.js\"></script></body>\n</html>\n"}}
{"type": "LOG", "log": {"level": "ERROR", "message": "Encountered an exception while reading stream jobs\nTraceback (most recent call last):\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/requests/models.py\", line 971, in json\n    return complexjson.loads(self.text, **kwargs)\n  File \"/Users/delena.malan/.pyenv/versions/3.9.11/lib/python3.9/json/__init__.py\", line 346, in loads\n    return _default_decoder.decode(s)\n  File \"/Users/delena.malan/.pyenv/versions/3.9.11/lib/python3.9/json/decoder.py\", line 337, in decode\n    obj, end = self.raw_decode(s, idx=_w(s, 0).end())\n  File \"/Users/delena.malan/.pyenv/versions/3.9.11/lib/python3.9/json/decoder.py\", line 355, in raw_decode\n    raise JSONDecodeError(\"Expecting value\", s, err.value) from None\njson.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 113, in read\n    yield from self._read_stream(\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 182, in _read_stream\n    for record in record_iterator:\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 285, in _read_full_refresh\n    for record in records:\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py\", line 115, in read_records\n    for record in self.retriever.read_records(sync_mode, 
cursor_field, stream_slice, stream_state):\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py\", line 348, in read_records\n    for r in records_generator:\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/streams/http/http.py\", line 427, in read_records\n    response = self._send_request(request, request_kwargs)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/streams/http/http.py\", line 345, in _send_request\n    return backoff_handler(user_backoff_handler)(request, request_kwargs)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/backoff/_sync.py\", line 105, in retry\n    ret = target(*args, **kwargs)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/backoff/_sync.py\", line 105, in retry\n    ret = target(*args, **kwargs)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/streams/http/http.py\", line 300, in _send\n    if self.should_retry(response):\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py\", line 98, in should_retry\n    return self.requester.should_retry(response).action == ResponseAction.RETRY\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/requesters/http_requester.py\", line 93, in should_retry\n    
return self.error_handler.should_retry(response)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py\", line 133, in should_retry\n    filter_action = response_filter.matches(response)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py\", line 51, in matches\n    or (self._response_matches_predicate(response))\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py\", line 59, in _response_matches_predicate\n    return self.predicate and self.predicate.eval(None, response=response.json(), headers=response.headers)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/requests/models.py\", line 975, in json\n    raise RequestsJSONDecodeError(e.msg, e.doc, e.pos)\nrequests.exceptions.JSONDecodeError: Expecting value: line 1 column 1 (char 0)"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing jobs"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceWorkable runtimes:\nSyncing stream jobs 0:00:00.565659"}}
{"type": "LOG", "log": {"level": "FATAL", "message": "Expecting value: line 1 column 1 (char 0)\nTraceback (most recent call last):\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/requests/models.py\", line 971, in json\n    return complexjson.loads(self.text, **kwargs)\n  File \"/Users/delena.malan/.pyenv/versions/3.9.11/lib/python3.9/json/__init__.py\", line 346, in loads\n    return _default_decoder.decode(s)\n  File \"/Users/delena.malan/.pyenv/versions/3.9.11/lib/python3.9/json/decoder.py\", line 337, in decode\n    obj, end = self.raw_decode(s, idx=_w(s, 0).end())\n  File \"/Users/delena.malan/.pyenv/versions/3.9.11/lib/python3.9/json/decoder.py\", line 355, in raw_decode\n    raise JSONDecodeError(\"Expecting value\", s, err.value) from None\njson.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/main.py\", line 13, in <module>\n    launch(source, sys.argv[1:])\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 123, in launch\n    for message in source_entrypoint.run(parsed_args):\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 114, in run\n    for message in generator:\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 127, in read\n    raise e\n  File 
\"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 113, in read\n    yield from self._read_stream(\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 182, in _read_stream\n    for record in record_iterator:\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 285, in _read_full_refresh\n    for record in records:\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py\", line 115, in read_records\n    for record in self.retriever.read_records(sync_mode, cursor_field, stream_slice, stream_state):\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py\", line 348, in read_records\n    for r in records_generator:\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/streams/http/http.py\", line 427, in read_records\n    response = self._send_request(request, request_kwargs)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/streams/http/http.py\", line 345, in _send_request\n    return backoff_handler(user_backoff_handler)(request, request_kwargs)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/backoff/_sync.py\", line 105, in retry\n    ret = target(*args, **kwargs)\n  File 
\"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/backoff/_sync.py\", line 105, in retry\n    ret = target(*args, **kwargs)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/streams/http/http.py\", line 300, in _send\n    if self.should_retry(response):\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py\", line 98, in should_retry\n    return self.requester.should_retry(response).action == ResponseAction.RETRY\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/requesters/http_requester.py\", line 93, in should_retry\n    return self.error_handler.should_retry(response)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py\", line 133, in should_retry\n    filter_action = response_filter.matches(response)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py\", line 51, in matches\n    or (self._response_matches_predicate(response))\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py\", line 59, in _response_matches_predicate\n    return self.predicate and self.predicate.eval(None, response=response.json(), headers=response.headers)\n  File 
\"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/requests/models.py\", line 975, in json\n    raise RequestsJSONDecodeError(e.msg, e.doc, e.pos)\nrequests.exceptions.JSONDecodeError: Expecting value: line 1 column 1 (char 0)"}}
{"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": 1666128458457.4949, "error": {"message": "Something went wrong in the connector. See the logs for more details.", "internal_message": "Expecting value: line 1 column 1 (char 0)", "stack_trace": "Traceback (most recent call last):\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/requests/models.py\", line 971, in json\n    return complexjson.loads(self.text, **kwargs)\n  File \"/Users/delena.malan/.pyenv/versions/3.9.11/lib/python3.9/json/__init__.py\", line 346, in loads\n    return _default_decoder.decode(s)\n  File \"/Users/delena.malan/.pyenv/versions/3.9.11/lib/python3.9/json/decoder.py\", line 337, in decode\n    obj, end = self.raw_decode(s, idx=_w(s, 0).end())\n  File \"/Users/delena.malan/.pyenv/versions/3.9.11/lib/python3.9/json/decoder.py\", line 355, in raw_decode\n    raise JSONDecodeError(\"Expecting value\", s, err.value) from None\njson.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/main.py\", line 13, in <module>\n    launch(source, sys.argv[1:])\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 123, in launch\n    for message in source_entrypoint.run(parsed_args):\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 114, in run\n    for message in generator:\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 127, in read\n    raise e\n  File 
\"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 113, in read\n    yield from self._read_stream(\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 182, in _read_stream\n    for record in record_iterator:\n  File \"/Users/delena.malan/Workspace/airbyte/airbyt
e-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 285, in _read_full_refresh\n    for record in records:\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py\", line 115, in read_records\n    for record in self.retriever.read_records(sync_mode, cursor_field, stream_slice, stream_state):\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py\", line 348, in read_records\n    for r in records_generator:\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/streams/http/http.py\", line 427, in read_records\n    response = self._send_request(request, request_kwargs)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/streams/http/http.py\", line 345, in _send_request\n    return backoff_handler(user_backoff_handler)(request, request_kwargs)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/backoff/_sync.py\", line 105, in retry\n    ret = target(*args, **kwargs)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/backoff/_sync.py\", line 105, in retry\n    ret = target(*args, **kwargs)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/streams/http/http.py\", line 300, in _send\n    if self.should_retry(response):\n  File 
\"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py\", line 98, in should_retry\n    return self.requester.should_retry(response).action == ResponseAction.RETRY\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/requesters/http_requester.py\", line 93, in should_retry\n    return self.error_handler.should_retry(response)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py\", line 133, in should_retry\n    filter_action = response_filter.matches(response)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py\", line 51, in matches\n    or (self._response_matches_predicate(response))\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py\", line 59, in _response_matches_predicate\n    return self.predicate and self.predicate.eval(None, response=response.json(), headers=response.headers)\n  File \"/Users/delena.malan/Workspace/airbyte/airbyte-integrations/connectors/source-workable/.venv/lib/python3.9/site-packages/requests/models.py\", line 975, in json\n    raise RequestsJSONDecodeError(e.msg, e.doc, e.pos)\nrequests.exceptions.JSONDecodeError: Expecting value: line 1 column 1 (char 0)\n", "failure_type": "system_error"}}}

Steps to Reproduce

  1. Check out the Workable branch.
  2. Replace workable.yaml with the following:
version: "0.1.0"

definitions:
  page_size: 1
  schema_loader:
    type: JsonSchema
    file_path: "./source_workable/schemas/{{ options.name }}.json"
  selector:
    extractor:
      field_pointer: []
  requester:
    url_base: "https://{{ config['account_subdomain'] }}.workable.com/spi/v3"
    http_method: "GET"
    authenticator:
      type: BearerAuthenticator
      api_token: "{{ config['api_key'] }}"
    request_options_provider:
      request_parameters:
        created_after: "{{ config['start_date'] }}"
  retriever:
    record_selector:
      $ref: "*ref(definitions.selector)"
    requester:
      $ref: "*ref(definitions.requester)"
    paginator:
      type: DefaultPaginator
      url_base: "*ref(definitions.requester.url_base)"
      limit_option:
        inject_into: "request_parameter"
        field_name: ""
      page_token_option:
        inject_into: "path"
      page_size_option:
        inject_into: "request_parameter"
        field_name: "limit"
      pagination_strategy:
        type: "CursorPagination"
        cursor_value: "{{ response.paging.next }}"
        stop_condition: "{{ 'next' not in response['paging'] }}"
        page_size: "*ref(definitions.page_size)"
  base_stream:
    retriever:
      $ref: "*ref(definitions.retriever)"
  jobs_stream:
    $ref: "*ref(definitions.base_stream)"
    $options:
      name: "jobs"
      primary_key: "id"
      path: "/jobs"
      field_pointer: ["jobs"]
  candidates_stream:
    $ref: "*ref(definitions.base_stream)"
    $options:
      name: "candidates"
      primary_key: "id"
      path: "/spi/v3/candidates"
      field_pointer: ["candidates"]
  stages_stream:
    $ref: "*ref(definitions.base_stream)"
    $options:
      name: "stages"
      primary_key: "slug"
      path: "/spi/v3/stages"
      field_pointer: ["stages"]
  recruiters_stream:
    $ref: "*ref(definitions.base_stream)"
    $options:
      name: "recruiters"
      primary_key: "id"
      path: "/spi/v3/recruiters"
      field_pointer: ["recruiters"]

streams:
  - "*ref(definitions.jobs_stream)"

check:
  stream_names:
    - "jobs"
    - "candidates"
    - "stages"
    - "recruiters"
girarda commented 2 years ago

@delenamalan, you should be able to work around this issue by moving the spi/v3/ part of url_base into each stream's path.
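The workaround can be checked with a short Python sketch. It assumes, as in the report, that the paginator strips `url_base` from the full `next` URL and that the remainder is resolved with `urllib.parse.urljoin` against `url_base` plus a trailing slash; both steps are assumptions about the CDK, not quotes of its code. With `/spi/v3` moved into the stream path, the stripped cursor becomes `/spi/v3/jobs?...`, so the absolute path resolves to the URL the API returned.

```python
from urllib.parse import urljoin

URL_BASE = "https://test-432879.workable.com"  # no /spi/v3 here any more
STREAM_PATH = "/spi/v3/jobs"  # the prefix now lives in the stream's path
NEXT_URL = (
    "https://test-432879.workable.com/spi/v3/jobs"
    "?created_after=20221001T115616Z&limit=1&since_id=2a1018"
)

base = URL_BASE + "/"  # assumption: the requester adds a trailing slash

first_page = urljoin(base, STREAM_PATH)
# Assumption: the paginator strips url_base from the cursor, leaving /spi/v3/jobs?...
next_page = urljoin(base, NEXT_URL.replace(URL_BASE, ""))

print(first_page)  # https://test-432879.workable.com/spi/v3/jobs
print(next_page == NEXT_URL)  # True
```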

edfincham commented 1 year ago

@girarda I've just run into a very similar issue, and unfortunately your proposed workaround doesn't help. I'm using the low-code SDK (airbyte-cdk==0.53.0).

Here's the relevant part of my manifest.yaml:

  cursor_paginator:
    type: DefaultPaginator
    pagination_strategy:
      type: CursorPagination
      cursor_value: "{{ response._links.next }}"
    page_token_option:
      type: "RequestPath"

  base_retriever:
    type: SimpleRetriever
    paginator:
      $ref: "#/definitions/cursor_paginator"
    record_selector:
      $ref: "#/definitions/selector"

  requester:
    type: HttpRequester
    url_base: https://{{ config['domain_name'] }}
    path: "{{ parameters.path }}"
    http_method: GET
    authenticator:
      type: BasicHttpAuthenticator
      username: "{{ config['email'] }}"
      password: "{{ config['api_token'] }}"
    request_body_json: {}
    request_headers: {}

  base_stream:
    type: DeclarativeStream
    schema_loader:
      $ref: "#/definitions/schema_loader"

  search_stream:
    $ref: "#/definitions/base_stream"
    retriever:
      $ref: "#/definitions/base_retriever"
      requester:
        $ref: "#/definitions/requester"
        request_parameters:
          cql: space={{ config['space_id'] }} and type=page
          expand: '["history.lastUpdated"]'
    primary_key: "id"
    $parameters:
      name: "search"
      path: /wiki/rest/api/content/search

In concrete terms, the initial request is sent to:

https://domain.service.net/wiki/rest/api/content/search?cql=space%3DFOO+and+type%3Dpage&expand=history.lastUpdated&after=1699493398

Calling self._next_page_token in the CDK's simple_retriever.py returns the following next page token:

{'next_page_token': '/rest/api/content/search?next=true&cursor=BAR&expand=history.lastUpdated&limit=25&start=25&after=1699493398&cql=space%3DFOO+and+type%3Dpage'}

However, the next request then goes to:

https://domain.service.net/rest/api/content/search?next=true&cursor=BAR&expand=history.lastUpdated&limit=25&start=25&after=1699493398&cql=space%3DFOO+and+type%3Dpage&expand=history.lastUpdated

This is missing the leading /wiki in the path, which results in a 404. I've tried moving /wiki/rest/api/content between url_base and parameters.path, but I can't get anything to work. Do you have any advice here?

Thanks in advance :slightly_smiling_face:

edfincham commented 1 year ago

Having dug a bit deeper, I think the issue is the following call in the CDK source:

url = urljoin(self.get_url_base(), path)
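For reference, here is how Python's `urllib.parse.urljoin` (assumed to be the function called on the line above) resolves a path against a base URL, using shortened URLs from this thread. Whether the base ends in `/` and whether the path starts with `/` decide whether `/wiki` survives.

```python
from urllib.parse import urljoin

NEXT = "rest/api/content/search?next=true&cursor=BAR"

# Base ends in "/": a relative reference is appended to the base path.
with_slash = urljoin("https://domain.service.net/wiki/", NEXT)

# Reference starts with "/": it is an absolute path and replaces the base path.
absolute_ref = urljoin("https://domain.service.net/wiki/", "/" + NEXT)

# Base lacks a trailing "/": its last segment ("wiki") is replaced.
no_slash = urljoin("https://domain.service.net/wiki", NEXT)

for url in (with_slash, absolute_ref, no_slash):
    print(url)
# https://domain.service.net/wiki/rest/api/content/search?next=true&cursor=BAR
# https://domain.service.net/rest/api/content/search?next=true&cursor=BAR
# https://domain.service.net/rest/api/content/search?next=true&cursor=BAR
```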

I added a breakpoint to poke around and found that the first path has no leading /, whereas the path returned by the API does. So this looks like an implementation detail on my end after all :slightly_smiling_face:

It feels a bit janky, but the following works:

cursor_value: "{{ response._links.next[1:] }}"
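A Python sketch of why the slice works, assuming url_base had been moved to https://domain.service.net/wiki (the thread does not show the final manifest) and that the requester appends a trailing `/` before calling `urljoin`. `next_page_url` is a hypothetical helper; it uses `lstrip("/")` instead of `[1:]` so that a link without a leading slash keeps its first character.

```python
from urllib.parse import urljoin

# Assumption: url_base is https://domain.service.net/wiki and the requester
# appends a trailing "/" before calling urljoin.
BASE = "https://domain.service.net/wiki/"
# Shape of response._links.next, shortened from the token shown above.
NEXT_LINK = "/rest/api/content/search?next=true&cursor=BAR&limit=25&start=25"


def next_page_url(base: str, next_link: str) -> str:
    # Same idea as next[1:], but lstrip only removes "/" characters, so a
    # link without a leading slash is left intact instead of losing a letter.
    return urljoin(base, next_link.lstrip("/"))


print(next_page_url(BASE, NEXT_LINK))
# https://domain.service.net/wiki/rest/api/content/search?next=true&cursor=BAR&limit=25&start=25
```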
girarda commented 1 year ago

@edfincham thanks for providing details on your issue. We'll prioritize fixing this over the next couple of weeks.