confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Schema registry API responses don't match docs for schemas with references #1303

Open RickTalken opened 2 years ago

RickTalken commented 2 years ago

Description

This issue occurs when a schema is registered with schema references. Several of the schema registry APIs return schemas that do not correctly construct their list of references.

The [SchemaRegistryClient get_schema() method](https://github.com/confluentinc/confluent-kafka-python/blob/master/src/confluent_kafka/schema_registry/schema_registry_client.py#L372) returns a Schema object with a correctly built list of references. Each item is a SchemaReference object per the documentation.

The lookup_schema(), get_latest_version(), and get_version() methods return a RegisteredSchema. Per the documentation, the schema property is a Schema object. However, the Schema object's list of references is a list of dictionaries rather than a list of SchemaReference objects.

The RegisteredSchema.schema.references property should return a list of SchemaReference objects to be consistent with the Schema returned by the get_schema() method and match the documentation.
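
Until the client does this itself, a caller could normalize the list on their side. The following is only a minimal sketch, not the library's implementation: `normalize_references` is a hypothetical helper name, and `StandInReference` is a stand-in used so the example runs without a registry. In real code you would presumably pass `SchemaReference` as `make_reference`, since it accepts `name`, `subject` and `version` keyword arguments as in the script below.

```python
from collections import namedtuple
from typing import Any, Callable, Iterable, List, Optional


def normalize_references(
    references: Optional[Iterable[Any]],
    make_reference: Callable[..., Any],
) -> List[Any]:
    """Turn dict-style references into objects; leave other items untouched.

    Hypothetical helper. Assumes each dict carries the keys "name",
    "subject" and "version".
    """
    normalized = []
    for ref in references or []:
        if isinstance(ref, dict):
            normalized.append(
                make_reference(
                    name=ref["name"],
                    subject=ref["subject"],
                    version=ref["version"],
                )
            )
        else:
            # Already an object (for example from get_schema()); keep as-is.
            normalized.append(ref)
    return normalized


# Stand-in for SchemaReference so this sketch runs on its own.
StandInReference = namedtuple("StandInReference", ["name", "subject", "version"])

raw_references = [{"name": "Role", "subject": "role", "version": 1}]
fixed_references = normalize_references(raw_references, StandInReference)
```

Because objects pass through unchanged, the same call could be applied to the result of any of the four APIs.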

How to reproduce

The following script creates two schemas, the second of which references the first. It then calls all four APIs that return either a Schema object or a RegisteredSchema object with a schema property. The output shows that the references in the Schema from three of those APIs are dictionaries instead of SchemaReference objects.

```python
from confluent_kafka.schema_registry import (
    SchemaRegistryClient,
    Schema,
    SchemaReference,
)

schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})

def register_schemas():
    role_schema_str = """
    {
        "name": "Role",
        "type": "record",
        "fields": [
            {"name": "title", "type": "string"},
            {"name": "department", "type": "string"}
        ]
    }
    """
    role_schema = Schema(schema_str=role_schema_str, schema_type="AVRO")
    schema_registry_client.register_schema(subject_name="role", schema=role_schema)
    employee_schema_str = """
    {
        "name": "Employee",
        "type": "record",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "role", "type": "Role"}
        ]
    }
    """
    references = [
        SchemaReference(name="Role", subject="role", version=1),
    ]
    employee_schema = Schema(
        schema_str=employee_schema_str,
        schema_type="AVRO",
        references=references,
    )
    schema_registry_client.register_schema(
        subject_name="employee",
        schema=employee_schema,
    )

def apis():
    registered_employee_schema = schema_registry_client.get_latest_version("employee")
    print(
        f"get_latest_version() returned: {registered_employee_schema.schema.references}"
    )

    schema_id = registered_employee_schema.schema_id
    schema = schema_registry_client.get_schema(schema_id)
    print(f"get_schema() returned: {schema.references}")

    registered_schema = schema_registry_client.get_version("employee", 1)
    print(f"get_version() returned: {registered_schema.schema.references}")

    employee_schema_str = """
        {
            "name": "Employee",
            "type": "record",
            "fields": [
                {"name": "name", "type": "string"},
                {"name": "role", "type": "Role"}
            ]
        }
        """
    references = [
        SchemaReference(name="Role", subject="role", version=1),
    ]
    employee_schema = Schema(
        schema_str=employee_schema_str,
        schema_type="AVRO",
        references=references,
    )
    registered_schema = schema_registry_client.lookup_schema(
        "employee", employee_schema
    )
    print(f"lookup_schema() returned: {registered_schema.schema.references}")

if __name__ == "__main__":
    register_schemas()
    apis()
```
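
The printed lists can be hard to compare by eye, so it may help to print the element types instead. This is only a sketch: `reference_type_summary` is a hypothetical helper, not part of confluent-kafka-python, and the sample input is a hand-written dictionary rather than real registry output.

```python
from collections import Counter


def reference_type_summary(references):
    """Count the references in a list by the name of their Python type."""
    return dict(Counter(type(ref).__name__ for ref in references or []))


# Hand-written sample shaped like a dict-style reference.
sample_references = [{"name": "Role", "subject": "role", "version": 1}]
summary = reference_type_summary(sample_references)
print(summary)  # prints {'dict': 1}
```

Calling it on `registered_schema.schema.references` and on `schema.references` in the script would make the difference between the APIs visible at a glance.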

jliunyu commented 2 years ago

Thanks for reporting this, and we appreciate the PR; we will review it. Please note, however, that schema references are not supported yet.

mhowlett commented 1 year ago

combining with #1302