deephaven / deephaven-core

Deephaven Community Core

Python: Add Optional `ObjectMapper` Argument to Kafka Consumer `json_spec` #4178

Open nbauernfeind opened 1 year ago

nbauernfeind commented 1 year ago

In #4164 I added the ability to pass a custom ObjectMapper to the Kafka consumer for deserializing JSON. We need some form of plumbing in Python. I suspect this means a change to json_spec that allows passing in the Java object. We can probably use JPY to construct the object mapper.

This is the code snippet I've given a community user:

from typing import Dict, Tuple, List

import jpy

from deephaven import dtypes
from deephaven.jcompat import j_hashmap
from deephaven.dherror import DHError
from deephaven.dtypes import DType
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import KeyValueSpec

_JObjectMapper = jpy.get_type("com.fasterxml.jackson.databind.ObjectMapper")
_JJsonNodeFactory = jpy.get_type("com.fasterxml.jackson.databind.node.JsonNodeFactory")
_JDeserializationFeature = jpy.get_type("com.fasterxml.jackson.databind.DeserializationFeature")
_JJsonReadFeature = jpy.get_type("com.fasterxml.jackson.core.json.JsonReadFeature")
_JKafkaTools_Consume = jpy.get_type("io.deephaven.kafka.KafkaTools$Consume")

# Create an ObjectMapper configured similarly to KafkaTools' default
mapper = _JObjectMapper()
mapper.setNodeFactory(_JJsonNodeFactory.withExactBigDecimals(True))
mapper.configure(_JDeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, True)

# Also allow non-numeric number tokens such as NaN and Infinity, which Jackson rejects by default
mapper.enable(_JJsonReadFeature.ALLOW_NON_NUMERIC_NUMBERS.mappedFeature())

# Next release we'll likely add the objectMapper parameter to the existing `kc.json_spec`; for now, here is a modified version
def custom_json_spec(col_defs: List[Tuple[str, DType]], mapping: Dict = None) -> KeyValueSpec:
    """Creates a spec for how to use JSON data when consuming a Kafka stream to a Deephaven table.

    Args:
        col_defs (List[Tuple[str, DType]]):  a list of tuples specifying names and types for columns to be
            created on the resulting Deephaven table.  Tuples contain two elements, a string for column name
            and a Deephaven type for column data type.
        mapping (Dict):  a dict mapping JSON fields to column names defined in the col_defs
            argument.  Fields starting with a '/' character are interpreted as a JSON Pointer (see RFC 6901,
            ISSN: 2070-1721 for details, essentially nested fields are represented like "/parent/nested").
            Fields not starting with a '/' character are interpreted as toplevel field names.
            If the mapping argument is not present or None, a 1:1 mapping between JSON fields and Deephaven
            table column names is assumed.

    Returns:
        a KeyValueSpec

    Raises:
        DHError
    """
    try:
        if mapping is None:
            # jsonSpec(col_defs) alone would silently drop the custom mapper, so build the
            # documented 1:1 field-to-column mapping explicitly and always pass the mapper
            mapping = {name: name for name, _ in col_defs}
        col_defs = [c.j_column_definition for c in kc._build_column_definitions(col_defs)]
        mapping = j_hashmap(mapping)
        ## note this is where we differ from the existing DH implementation by adding the mapper
        return KeyValueSpec(j_spec=_JKafkaTools_Consume.jsonSpec(col_defs, mapping, mapper))
    except Exception as e:
        raise DHError(e, "failed to create a Kafka key/value spec") from e

# Here is the JSON Kafka consumer example from our documentation, modified to use custom_json_spec
from deephaven import kafka_consumer as kc
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht

quotes = kc.consume({ 'bootstrap.servers' : '<IP_address>:9092' },
                    'quotes',
                    key_spec=KeyValueSpec.IGNORE,
                    ## note this is where we differ from the example:
                    value_spec=custom_json_spec([('Sym', dht.string),
                                ('AskSize',  dht.int_),
                                ('AskPrice',  dht.double),
                                ('BidSize',  dht.int_),
                                ('BidPrice',  dht.double),
                                ('AskExchange', dht.string),
                                ('BidExchange', dht.string),
                                ('AskTime', dht.long),
                                ('BidTime', dht.long)]),
                    table_type=TableType.append())
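The `mapping` argument documented in `custom_json_spec` selects each column's value either by top-level field name or by JSON Pointer. The pure-Python sketch below illustrates those documented semantics on an already-decoded record. It is not Deephaven's implementation (the real lookup runs on the Java side), `resolve_field` and `apply_mapping` are hypothetical helpers, and it skips RFC 6901 details such as the `-` array token and leading-zero index checks.

```python
from typing import Any, Dict


def resolve_field(record: Any, field: str) -> Any:
    """Looks up one JSON field as the `mapping` docstring describes.

    Fields starting with '/' are JSON Pointers (RFC 6901); others are top-level keys.
    """
    if not field.startswith("/"):
        return record[field]  # plain top-level field name
    node = record
    for token in field[1:].split("/"):
        # RFC 6901 unescaping: '~1' -> '/' first, then '~0' -> '~' (order matters)
        token = token.replace("~1", "/").replace("~0", "~")
        # Arrays are indexed by number, objects by key
        node = node[int(token)] if isinstance(node, list) else node[token]
    return node


def apply_mapping(record: Dict[str, Any], mapping: Dict[str, str]) -> Dict[str, Any]:
    """Builds {column name: value} from a {JSON field: column name} mapping."""
    return {column: resolve_field(record, field) for field, column in mapping.items()}


record = {"Sym": "AAPL", "quote": {"ask": {"price": 190.5}}}
print(apply_mapping(record, {"Sym": "Sym", "/quote/ask/price": "AskPrice"}))
# → {'Sym': 'AAPL', 'AskPrice': 190.5}
```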
jmao-denver commented 1 year ago

Jianfeng Regarding https://github.com/deephaven/deephaven-core/issues/4178, is it expected that users who need this feature know Jackson and JPY?

nate We will add this specific example of constructing an object mapper, but I don't believe it is our job to wrap Jackson core details to make them easier for our users. Those APIs might change out of our control, and that will be frustrating. Also, do we really want to keep an eye out for new ser/deser features? I'm thinking probably not, and I think Ryan is on board with that thinking too.

Also, the JPY for just the object mapper is pretty easy. Feel free to open a quick chat with Ryan, Chip, you, and me. I'm worried about DH signing up for API scope creep that we have no control over.

Jianfeng Thanks! Yeah, we have tried very hard to hide JPY from Python users, but we certainly don’t want to be too dogmatic about it. Still, looking at the code I am wondering if we can offer some predefined mappers (e.g. for ALLOW_NON_NUMERIC_NUMBERS, ...) to make the lives of 99% of the users a lot easier?

nate I'm OK with that, but then we should add all of the features listed at https://javadoc.io/static/com.fasterxml.jackson.core/jackson-core/2.15.2/com/fasterxml/jackson/core/json/JsonReadFeature.html and write a test that fails if we upgrade Jackson and it has a new feature or an old feature is gone.
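A minimal sketch of the upgrade test described here, assuming the pinned list of names is maintained by hand from the JsonReadFeature Javadoc for the Jackson version in use. `feature_drift` and `assert_features_unchanged` are hypothetical names. Under jpy, the current names could likely be read with `[f.name() for f in _JJsonReadFeature.values()]` (the standard Java enum `values()` and `name()` methods), but that call is an assumption and is not exercised here.

```python
from typing import Iterable, List, Tuple


def feature_drift(pinned: Iterable[str], current: Iterable[str]) -> Tuple[List[str], List[str]]:
    """Returns (added, removed) feature names relative to the pinned snapshot."""
    pinned_set, current_set = set(pinned), set(current)
    return sorted(current_set - pinned_set), sorted(pinned_set - current_set)


def assert_features_unchanged(pinned: Iterable[str], current: Iterable[str]) -> None:
    """Fails when a Jackson upgrade adds or removes a feature we would need to wrap."""
    added, removed = feature_drift(pinned, current)
    assert not added and not removed, f"Jackson features changed: added={added}, removed={removed}"
```

In a real test, `pinned` would be the hand-copied constant names and `current` the names read from the running JVM, so the failure message lists exactly which wrappers need attention.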

Jianfeng Is there a way to find the change history of this Enum just to get a sense of how often the features are deprecated and added?

nate

This is the source: https://github.com/FasterXML/jackson-core/blob/2.16/src/main/java/com/fasterxml/jackson/core/json/JsonReadFeature.java
This is the history: https://github.com/FasterXML/jackson-core/commits/2.16/src/main/java/com/fasterxml/jackson/core/json/JsonReadFeature.java
It looks like there have been 3 new features in the past 1.5 years, but otherwise the enum is pretty stable.

Jianfeng There seems to be a wholesale change at 2.10, according to the deprecated list at https://javadoc.io/static/com.fasterxml.jackson.core/jackson-core/2.15.2/deprecated-list.html#enum.constant
Ah, we also need to worry about DeserializationFeature, or maybe even SerializationFeature?

nate We do not currently use Jackson to serialize JSON back onto Kafka. That is perhaps a good example of where wrapping pieces of Jackson feels awkward.

jmao-denver commented 1 year ago

@rcaudy @chipkent I have captured the conversation @nbauernfeind and I had about how we should approach the change. There doesn't seem to be a good way to make it Pythonic or user friendly. What are your thoughts?

chipkent commented 1 year ago

I am not familiar with all of the Kafka code, so take my comments with a grain of salt.

We wouldn't need to support every enum value because the relevant classes have valueOf methods. This would allow Python to work in terms of strings. https://javadoc.io/static/com.fasterxml.jackson.core/jackson-core/2.15.2/com/fasterxml/jackson/core/json/JsonReadFeature.html#valueOf-java.lang.String-

Using the strings, there could be a configuration like mapper={"ALLOW_NON_NUMERIC_NUMBERS":True, "USE_BIG_DECIMAL_FOR_FLOATS":False}, which calls ObjectMapper.configure(a,b) for each entry. This would give a pure Python interface and would hide the jpy details.
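A minimal sketch of the string-keyed configuration described above. It assumes each feature name is resolved by trying a list of `valueOf`-style lookups in order (for example `JsonReadFeature` and then `DeserializationFeature`) and the result is passed to `configure(feature, enabled)`. `configure_mapper` and `resolve_feature` are hypothetical; the mapper and lookups are passed in as arguments so the code runs without a JVM, and the jpy wiring shown in the comments is an untested assumption.

```python
from typing import Any, Callable, Dict, Iterable, List

# A lookup turns a feature name into a feature object and raises on unknown names,
# like Java's Enum.valueOf. Under jpy these might be (assumption, untested here):
#   lambda n: _JJsonReadFeature.valueOf(n).mappedFeature()
#   _JDeserializationFeature.valueOf
Lookup = Callable[[str], Any]


def resolve_feature(name: str, lookups: Iterable[Lookup]) -> Any:
    """Returns the feature object from the first lookup that recognizes `name`."""
    for lookup in lookups:
        try:
            return lookup(name)
        except Exception:
            continue  # not defined in this enum; try the next one
    raise ValueError(f"unknown Jackson feature name: {name!r}")


def configure_mapper(mapper: Any, features: Dict[str, bool], lookups: List[Lookup]) -> Any:
    """Calls mapper.configure(feature, enabled) once per entry in `features`."""
    for name, enabled in features.items():
        if not isinstance(enabled, bool):
            raise TypeError(f"feature {name!r} must map to a bool, got {enabled!r}")
        mapper.configure(resolve_feature(name, lookups), enabled)
    return mapper
```

Resolving names through `valueOf` keeps the Python side from hard-coding a feature list, although, as noted elsewhere in this thread, it still exposes Jackson's feature names as part of the API.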

This gets to the part that is less obvious to me. I'm not sure how the Jackson JSON parser plays into the bigger picture. Will it always be with us? Is it optional? Does it have to work with com.fasterxml.jackson.databind.ObjectMapper? Do other JSON backends use their own object mapper?

Given my uncertainty, would something like this work:

quotes = kc.consume({ 'bootstrap.servers' : '<IP_address>:9092' },
                    'quotes',
                    key_spec=KeyValueSpec.IGNORE,
                    ## note this is where we differ from the example:
                    value_spec=custom_json_spec([('Sym', dht.string),
                                ('AskSize',  dht.int_),
                                ('AskPrice',  dht.double),
                                ('BidSize',  dht.int_),
                                ('BidPrice',  dht.double),
                                ('AskExchange', dht.string),
                                ('BidExchange', dht.string),
                                ('AskTime', dht.long),
                                ('BidTime', dht.long)], 
                               mapper="com.fasterxml.jackson.databind.ObjectMapper", 
                               mapping={"ALLOW_NON_NUMERIC_NUMBERS":True, "USE_BIG_DECIMAL_FOR_FLOATS":False}),
                    table_type=TableType.append())
rcaudy commented 1 year ago

So, one could make a good case that we're leaking internal API details by exposing the ObjectMapper to the API. On the other hand, at this time the Jackson JSON parser is by far the most adopted; if we were to move away from that, we would need to deprecate API methods that exposed ObjectMapper.

Configure is interesting; it would certainly let us stringify the interface, although without giving us any long-term API safety, since we're still exposing Jackson API details.

The alternative approach is to wrap all the options we want to permit; this is vastly more onerous for the implementor (us) with limited payoff. The other major downside here is that we have to do work for any feature the user may want that we didn't code for.

We could declare that the existing JSON spec is forever going to use Jackson until/unless we deprecate and remove it, and that we will add new names/factories if we switch to a different backend.

Or we could build some kind of JSON parsing interface, with an extensible Jackson-based implementation, and give users more control if they want it.
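That last option could look roughly like the following Python sketch: a backend-neutral parsing contract plus a default implementation with an overridable hook that users extend. The class names are hypothetical, Python's standard `json` module stands in for Jackson, and `parse_float=Decimal` is only a loose analogy to `USE_BIG_DECIMAL_FOR_FLOATS`; a real version would live on the Java side.

```python
import json
from abc import ABC, abstractmethod
from decimal import Decimal
from typing import Any, Dict


class JsonRecordParser(ABC):
    """Backend-neutral contract: decode one Kafka message payload into a dict."""

    @abstractmethod
    def parse(self, payload: bytes) -> Dict[str, Any]:
        """Decodes a UTF-8 JSON payload."""


class DefaultJsonRecordParser(JsonRecordParser):
    """Default backend; Python's json module stands in for Jackson here."""

    def __init__(self) -> None:
        # Build the decoder once, through the overridable hook below
        self._decoder = self.make_decoder()

    def make_decoder(self) -> json.JSONDecoder:
        """Extension point: subclasses return a differently configured decoder."""
        return json.JSONDecoder()

    def parse(self, payload: bytes) -> Dict[str, Any]:
        return self._decoder.decode(payload.decode("utf-8"))


class DecimalJsonRecordParser(DefaultJsonRecordParser):
    """User extension that keeps exact decimals instead of binary floats."""

    def make_decoder(self) -> json.JSONDecoder:
        return json.JSONDecoder(parse_float=Decimal)


print(DecimalJsonRecordParser().parse(b'{"AskPrice": 1.10}'))
# → {'AskPrice': Decimal('1.10')}
```

The design keeps the public contract free of backend types, while the hook still gives power users full control over the backend's configuration.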

devinrsmith commented 11 months ago

I think it makes sense to add some common optional params to json_spec such as non_numeric_support=False by default. This doesn't directly expose ObjectMapper to python, but does allow some degree of configurability. (See #4821 where I ran into the same issue.)
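To illustrate what a `non_numeric_support=False` default would mean for each record, here is a pure-Python analogy using the standard library's `json.loads`, whose `parse_constant` hook is called only for `NaN`, `Infinity`, and `-Infinity`. Python's `json` accepts these tokens by default, the opposite of Jackson, so the sketch rejects them explicitly unless the flag is set. `parse_record` is a hypothetical helper, not part of `json_spec`.

```python
import json
from typing import Any


def _reject_non_numeric(token: str) -> Any:
    # json.loads calls parse_constant only for 'NaN', 'Infinity' and '-Infinity'
    raise ValueError(f"non-numeric number {token} is not allowed")


def parse_record(text: str, non_numeric_support: bool = False) -> Any:
    """Decodes one JSON record, accepting NaN/Infinity only when enabled."""
    if non_numeric_support:
        return json.loads(text)  # stdlib default turns NaN/Infinity into floats
    return json.loads(text, parse_constant=_reject_non_numeric)
```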

devinrsmith commented 10 months ago

com.fasterxml.jackson.core.json.JsonReadFeature is a bit lower level and more widely applicable, and I think it's the layer we'd probably want to target. com.fasterxml.jackson.databind.DeserializationFeature only comes into play when you are actually mapping into a POJO (which our current implementation does, but which isn't a hard requirement). I'm currently building a new ObjectProcessor that is fully streaming (no databind) and would benefit from JsonReadFeature.