druid-io / pydruid

A Python connector for Druid

Support for custom payload and headers #154

Open Makesh-Gmak opened 5 years ago

Makesh-Gmak commented 5 years ago

Currently, we have a REST API Layer (RAL) on top of Druid, and our SQLAlchemy-based app sends its queries through RAL.

RAL expects:

  1. An HTTP request with a JSON payload carrying additional params such as info, maxRowCount, etc. (see the sample request below)
  2. Additional HTTP headers such as a session ID, auth tokens, etc.
  3. In return, its HTTP JSON response also carries additional params such as code, elapse, etc. (see the sample response below)

Sample HTTP request (the actual query to be executed is under the sql param):

{
    "info": {
        "offering": "ABC",
        "tenant": "tenant_1"
    },
    "maxRowCount": 100,
    "outputFormat": "JSON",
    "queryId": "9b0ff3ae-ac6e-4f60-ae9c-46b8e511f454",
    "queryType": "druid",
    "request": "executeQuery",
    "sql": "SELECT * FROM table_name"
}
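
For illustration, the request envelope above could be assembled by a small helper like the sketch below. The name `build_ral_payload` and its parameters are hypothetical and not part of pydruid; the field names and values are taken from the sample request.

```python
import uuid


def build_ral_payload(sql, offering, tenant, max_row_count=100):
    """Wrap a SQL string in the RAL request envelope (hypothetical helper)."""
    return {
        "info": {"offering": offering, "tenant": tenant},
        "maxRowCount": max_row_count,
        "outputFormat": "JSON",
        # A fresh ID per request; the sample request uses a UUID string.
        "queryId": str(uuid.uuid4()),
        "queryType": "druid",
        "request": "executeQuery",
        # The actual Druid SQL travels under the "sql" key.
        "sql": sql,
    }


payload = build_ral_payload("SELECT * FROM table_name", "ABC", "tenant_1")
```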

Sample HTTP response (the actual result is under the result param):

{
    "code": 0,
    "elapse": 25,
    "msg": "success",
    "queryId": "9b0ff3ae-ac6e-4f60-ae9c-46b8e511f454",
    "result": [
        {
            "name" : "John",
            "age" : 32
        }
    ]
}
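
A minimal sketch of unwrapping such a response is shown below. The helper name `unwrap_ral_response` is hypothetical, and treating any `code` other than 0 as a failure is an assumption; the sample only shows the success case.

```python
import json
from collections import OrderedDict


def unwrap_ral_response(text):
    """Return the rows under "result" from a RAL response body (hypothetical helper)."""
    # OrderedDict keeps the column order as it appears in the JSON text.
    body = json.loads(text, object_pairs_hook=OrderedDict)
    # Assumption: code 0 means success, anything else is an error.
    if body.get("code") != 0:
        raise RuntimeError(
            "RAL query {} failed: {}".format(body.get("queryId"), body.get("msg"))
        )
    return body["result"]


sample = """
{
    "code": 0,
    "elapse": 25,
    "msg": "success",
    "queryId": "9b0ff3ae-ac6e-4f60-ae9c-46b8e511f454",
    "result": [{"name": "John", "age": 32}]
}
"""
rows = unwrap_ral_response(sample)
```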

Currently, I am modifying Cursor._stream_query() in db/api.py to fit my requirements. Is there any way (callbacks, etc.) to pass a custom payload and headers to PyDruid without customizing the library code like this?

Updated Code:

def _stream_query(self, query):
        # Note: uuid must be imported at module level for the queryId below.
        self.description = None

        headers = {'Content-Type': 'application/json',
                   'x-functions-key': 'eyJ0eXAiOiJKV1QiLCJhbGciOiABC123ABC123', 
                   'session_id': '1234567'}

        payload = {
            "info": {
                "offering": "ABC_offering",
                "tenant": "tenant_1"
            },
            "maxRowCount": 100,
            "outputFormat": "JSON",
            "queryId": str(uuid.uuid4()),
            "queryType": "druid",
            "request": "executeQuery",
            "sql": query,
            "header": "true"
        }

        # stream=False: RAL does not stream its response
        r = requests.post(self.url, stream=False, headers=headers, json=payload)

        if r.encoding is None:
            r.encoding = 'utf-8'

        # raise any error messages
        if r.status_code != 200:
            payload = r.json()
            msg = (
                '{error} ({errorClass}): {errorMessage}'.format(**payload)
            )
            raise exceptions.ProgrammingError(msg)

        data = json.loads(r.text, object_pairs_hook=OrderedDict)

        Row = None
        # Note: Reading directly from param "result"
        for row in data["result"]:
            # update description
            if self.description is None:
                self.description = get_description_from_row(row)
                # "header" is true, so the first row is the header row:
                # use it for the description, then skip it
                continue

            # return row in namedtuple
            if Row is None:
                Row = namedtuple('Row', row.keys(), rename=True)

            yield Row(*row.values())
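
One shape the callback mechanism asked about could take is sketched below. Everything here is hypothetical: `RequestHooks`, `execute`, the `post` transport argument, and the URL are illustrative names, not an existing pydruid API. The transport is injected so the sketch runs without a network; in real use it would wrap something like `requests.post(url, headers=headers, json=payload).text`.

```python
import json
import uuid
from collections import namedtuple

# Hypothetical bundle of user-supplied callbacks; pydruid does not define this.
RequestHooks = namedtuple(
    "RequestHooks", ["build_payload", "build_headers", "extract_rows"]
)


def execute(sql, url, hooks, post):
    """Run `sql` through the hooks.

    `post` is any callable post(url, headers, payload) -> response text.
    """
    payload = hooks.build_payload(sql)
    headers = hooks.build_headers()
    text = post(url, headers, payload)
    return hooks.extract_rows(json.loads(text))


# Hooks matching the RAL envelope described in this issue.
ral_hooks = RequestHooks(
    build_payload=lambda sql: {
        "queryId": str(uuid.uuid4()),
        "queryType": "druid",
        "request": "executeQuery",
        "sql": sql,
    },
    build_headers=lambda: {
        "Content-Type": "application/json",
        "session_id": "1234567",
    },
    extract_rows=lambda body: body["result"],
)

sent = []  # records what the stand-in transport received


def fake_post(url, headers, payload):
    # Stand-in transport: records the call and returns a canned RAL reply.
    sent.append((url, headers, payload))
    return json.dumps({
        "code": 0,
        "msg": "success",
        "queryId": payload["queryId"],
        "result": [{"name": "John", "age": 32}],
    })


# The URL is a placeholder, not a real endpoint.
rows = execute("SELECT * FROM table_name", "http://ral.example/query", ral_hooks, fake_post)
```

With hooks like these, the payload, headers, and response unwrapping would stay in application code, and the cursor itself would not need to be edited.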