trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

Add support to Kafka connector for connecting to secured Schema Registry #12195

Open Brideau opened 2 years ago

Brideau commented 2 years ago

I'm trying to connect to our Confluent Schema Registry using the Kafka Connector with the following kafka.properties file:

connector.name=kafka
kafka.nodes=[my_kafka_url]
kafka.hide-internal-columns=false
kafka.default-schema=topics
kafka.config.resources=/etc/kafka-config/kafka-configuration.properties

kafka.table-description-supplier=CONFLUENT
kafka.confluent-schema-registry-url=https://schema_reg_url:443
kafka.confluent-schema-registry-client-cache-size=1000
kafka.empty-field-strategy=IGNORE
kafka.confluent-subjects-cache-refresh-interval=1s

And the following kafka-configuration.properties file:

sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="my_username" password="my_password";
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=registry_username:registry_password

However, when I run a SHOW SCHEMAS ... command, I get the error:

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401

I've tried different variations on the above, including using the prefixed schema.registry.basic.auth.credentials.source and schema.registry.basic.auth.user.info parameters without any luck. It looks like others have had similar problems, which have been fixed by updating the kafka-schema-registry-client package to version 6 or higher.

I was wondering if anyone knows of a fix using the current version (5.5.2) or if you'd be open to updating the client?

Praveen2112 commented 2 years ago

Have you configured any authentication mechanism for the schema registry? Trino doesn't support any authentication when connecting to the schema registry.

whatsupbros commented 2 years ago

Same issue here: the Schema Registry client doesn't propagate Basic Auth credentials, despite their being specified in the configuration file referenced by the kafka.config.resources property.

The same goes for the SSL configuration options for the Schema Registry:

schema.registry.ssl.key.password=secret
schema.registry.ssl.keystore.location=/path/to/keystore.jks
schema.registry.ssl.keystore.password=secret
schema.registry.ssl.keystore.type=JKS
schema.registry.ssl.truststore.location=/path/to/truststore.jks
schema.registry.ssl.truststore.password=secret
schema.registry.ssl.truststore.type=JKS

DevarajKR commented 2 years ago

I am using the jars below and am still getting the auth error, even though I'm providing everything: kafka-schema-registry-7.1.1.jar, kafka-schema-registry-client-7.1.1.jar

basic.auth.credentials.source=USER_INFO
basic.auth.user.info=abc:xyz

errors.SerializationException: Error retrieving Avro schema for id 100005
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401

Could someone please help me with a solution?

piee9818 commented 2 years ago

To support schema registry authentication, it looks like you need to implement the SchemaRegistryClientPropertiesProvider interface. https://github.com/trinodb/trino/pull/6940

https://github.com/trinodb/trino/blob/835695abd2c1aaf8378cbbbc94daef4c3f168292/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java#L93-L104
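
For reference, here is a minimal sketch of what such a provider might look like. It assumes the interface exposes a single getSchemaRegistryClientProperties() method returning a map of client properties (that is how ConfluentModule consumes it); the class name, exact signature, and credential values are placeholders, and the implementation would also need to be registered with Guice through the set binder for SchemaRegistryClientPropertiesProvider used in ConfluentModule:

package io.trino.plugin.kafka.schema.confluent;

import com.google.common.collect.ImmutableMap;

import java.util.Map;

// Hypothetical provider that supplies Basic Auth settings for the Confluent Schema Registry client.
public class BasicAuthSchemaRegistryClientPropertiesProvider
        implements SchemaRegistryClientPropertiesProvider
{
    @Override
    public Map<String, Object> getSchemaRegistryClientProperties()
    {
        // basic.auth.* are standard Confluent Schema Registry client settings;
        // the credentials here are placeholders.
        return ImmutableMap.of(
                "basic.auth.credentials.source", "USER_INFO",
                "basic.auth.user.info", "registry_username:registry_password");
    }
}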

whatsupbros commented 2 years ago

Any news on this issue maybe?

Currently, it is not possible to connect to a secured Schema Registry instance with the Kafka connector, because neither Basic Auth credentials nor SSL properties are propagated to the Schema Registry connection, even when they are provided in a separate configuration file via the kafka.config.resources property (implemented in #8743; I was sure this issue would be covered there as well, but apparently it wasn't).

whatsupbros commented 2 years ago

With the following configuration I am getting the same exception.

kafka.properties:

connector.name=kafka
kafka.nodes=my-broker:9093
kafka.table-description-supplier=CONFLUENT
kafka.confluent-schema-registry-url=http://${ENV:KAFKA_USER}:${ENV:KAFKA_KEY}@my-schema-registry:8081
kafka.config.resources=/trino/server/etc/my.properties
kafka.hide-internal-columns=false
kafka.empty-field-strategy=IGNORE
kafka.buffer-size=8MB

my.properties:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN

ssl.key.password=secret
ssl.keystore.location=/trino/server/cert/keystore.jks
ssl.keystore.password=secret
ssl.truststore.location=/trino/server/cert/truststore.jks
ssl.truststore.password=secret
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="my_username" password="my_password";

basic.auth.credentials.source=URL

When trying to retrieve the topics list:

ERROR   remote-task-callback-2  io.trino.execution.scheduler.PipelinedStageExecution    Pipelined stage execution for stage 20220822_150206_00015_b276i.2 failed
java.lang.RuntimeException: Failed to retrieve subjects from schema registry
...
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401

However, when I open this URL in the browser, I get a proper response rather than an "Unauthorized" error code:

http://my_username:my_password@my-schema-registry:8081

alaturqua commented 2 years ago

I am having the same issue. I cannot log in to the Confluent Kafka Schema Registry with an API key and secret using basic auth.

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getAllSubjects(RestService.java:806)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getAllSubjects(RestService.java:789)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getAllSubjects(CachedSchemaRegistryClient.java:471)
    at io.trino.plugin.kafka.schema.confluent.ClassLoaderSafeSchemaRegistryClient.getAllSubjects(ClassLoaderSafeSchemaRegistryClient.java:291)
    at io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryTableDescriptionSupplier.getAllSubjects(ConfluentSchemaRegistryTableDescriptionSupplier.java:140)
    ... 42 more

hashhar commented 2 years ago

This won't work as said before in https://github.com/trinodb/trino/issues/12195#issuecomment-1114734913.

The kafka.config.resources file only passes those properties to the consumer and producer clients. This is one of the problems with such a generic solution: unless you add a test for every possible configuration, you cannot know whether it will work or not.

hashhar commented 2 years ago

Marking as an enhancement but AFAIK no one is actively working on this at the moment.

whatsupbros commented 2 years ago

Thanks for categorizing the issue at least. It's a pity it's not being looked into, as most production setups use mTLS and Basic Auth for the Schema Registry.

alaturqua commented 1 year ago

I implemented a quick and dirty solution for our setup with Confluent Schema Registry, which uses basic auth. This works for us.

What we noticed was that the Trino Kafka connector does not parse decimals correctly; they are shown as varbinary. Therefore, we had to implement a custom function to convert varbinary to decimal.

I replaced the code in trino/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java with the following:

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.plugin.kafka.schema.confluent;

import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.multibindings.MapBinder;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.trino.decoder.DispatchingRowDecoderFactory;
import io.trino.decoder.RowDecoderFactory;
import io.trino.decoder.avro.AvroBytesDeserializer;
import io.trino.decoder.avro.AvroDeserializer;
import io.trino.decoder.avro.AvroReaderSupplier;
import io.trino.decoder.avro.AvroRowDecoderFactory;
import io.trino.decoder.dummy.DummyRowDecoder;
import io.trino.decoder.dummy.DummyRowDecoderFactory;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.plugin.kafka.KafkaConfig;
import io.trino.plugin.kafka.encoder.DispatchingRowEncoderFactory;
import io.trino.plugin.kafka.encoder.RowEncoderFactory;
import io.trino.plugin.kafka.encoder.avro.AvroRowEncoder;
import io.trino.plugin.kafka.schema.ContentSchemaReader;
import io.trino.plugin.kafka.schema.TableDescriptionSupplier;
import io.trino.spi.HostAddress;
import io.trino.spi.TrinoException;

import javax.inject.Singleton;

import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.inject.Scopes.SINGLETON;
import static com.google.inject.multibindings.MapBinder.newMapBinder;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.plugin.kafka.encoder.EncoderModule.encoderFactory;
import static io.trino.plugin.kafka.utils.PropertiesUtils.readProperties;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Objects.requireNonNull;

public class ConfluentModule
        extends AbstractConfigurationAwareModule
{
    @Override
    protected void setup(Binder binder)
    {
        configBinder(binder).bindConfig(ConfluentSchemaRegistryConfig.class);
        install(new ConfluentDecoderModule());
        install(new ConfluentEncoderModule());
        binder.bind(ContentSchemaReader.class).to(AvroConfluentContentSchemaReader.class).in(Scopes.SINGLETON);
        newSetBinder(binder, SchemaRegistryClientPropertiesProvider.class);
        newSetBinder(binder, SchemaProvider.class).addBinding().to(AvroSchemaProvider.class).in(Scopes.SINGLETON);
        newSetBinder(binder, SessionPropertiesProvider.class).addBinding().to(ConfluentSessionProperties.class).in(Scopes.SINGLETON);
        binder.bind(TableDescriptionSupplier.class).toProvider(ConfluentSchemaRegistryTableDescriptionSupplier.Factory.class).in(Scopes.SINGLETON);
        newMapBinder(binder, String.class, SchemaParser.class).addBinding("AVRO").to(AvroSchemaParser.class).in(Scopes.SINGLETON);
    }

    @Provides
    @Singleton
    public static SchemaRegistryClient createSchemaRegistryClient(
            ConfluentSchemaRegistryConfig confluentConfig,
            Set<SchemaProvider> schemaProviders,
            Set<SchemaRegistryClientPropertiesProvider> propertiesProviders,
            ClassLoader classLoader,
            KafkaConfig kafkaConfig) throws Exception
    {
        requireNonNull(schemaProviders, "schemaProviders is null");
        requireNonNull(propertiesProviders, "propertiesProviders is null");

        List<String> baseUrl = confluentConfig.getConfluentSchemaRegistryUrls().stream()
                .map(HostAddress::getHostText)
                .collect(toImmutableList());

        // Quick fix: properties read from kafka.config.resources are handed straight to the
        // schema registry client below, so entries such as basic.auth.* are picked up.
        Map<String, String> configurationProperties = readProperties(kafkaConfig.getResourceConfigFiles());

        // Note: the properties collected from the providers are not used in this quick fix;
        // the kafka.config.resources properties above are passed to the client instead.
        Map<String, ?> schemaRegistryClientProperties = propertiesProviders.stream()
                .map(SchemaRegistryClientPropertiesProvider::getSchemaRegistryClientProperties)
                .flatMap(properties -> properties.entrySet().stream())
                .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));

        return new ClassLoaderSafeSchemaRegistryClient(
                new CachedSchemaRegistryClient(
                        baseUrl,
                        confluentConfig.getConfluentSchemaRegistryClientCacheSize(),
                        ImmutableList.copyOf(schemaProviders),
                        configurationProperties),
                classLoader);
    }

    private static class ConfluentDecoderModule
            implements Module
    {
        @Override
        public void configure(Binder binder)
        {
            binder.bind(AvroReaderSupplier.Factory.class).to(ConfluentAvroReaderSupplier.Factory.class).in(Scopes.SINGLETON);
            binder.bind(AvroDeserializer.Factory.class).to(AvroBytesDeserializer.Factory.class).in(Scopes.SINGLETON);
            newMapBinder(binder, String.class, RowDecoderFactory.class).addBinding(AvroRowDecoderFactory.NAME).to(AvroRowDecoderFactory.class).in(Scopes.SINGLETON);
            newMapBinder(binder, String.class, RowDecoderFactory.class).addBinding(DummyRowDecoder.NAME).to(DummyRowDecoderFactory.class).in(SINGLETON);
            binder.bind(DispatchingRowDecoderFactory.class).in(SINGLETON);
        }
    }

    private static class ConfluentEncoderModule
            implements Module
    {
        @Override
        public void configure(Binder binder)
        {
            MapBinder<String, RowEncoderFactory> encoderFactoriesByName = encoderFactory(binder);
            encoderFactoriesByName.addBinding(AvroRowEncoder.NAME).toInstance((session, dataSchema, columnHandles) -> {
                throw new TrinoException(NOT_SUPPORTED, "Insert not supported");
            });
            binder.bind(DispatchingRowEncoderFactory.class).in(SINGLETON);
        }
    }
}

and used the following config in kafka-config.properties:

basic.auth.credentials.source=USER_INFO
basic.auth.user.info=<username>:<password>

whatsupbros commented 1 year ago

The kafka.config.resources file only passes those properties to the consumer and producer clients. This is one of the problems with such a generic solution: unless you add a test for every possible configuration, you cannot know whether it will work or not.

Any chance this could be changed so that the config is also propagated to the Schema Registry client?

hashhar commented 1 year ago

No one is actively working on it at the moment - it should be a simple enough change and I think a PR implementing it (with tests) will be a welcome addition.

dshma commented 11 months ago

Hi @hashhar,

Any updates on this one year later?

One could argue about the categorization; it feels more like a defect, given this wasn't tested when introduced/released and there is still no mention of it in the documentation.

Thanks!

derbeneviv commented 3 months ago

I also want to point out (as I stated in https://github.com/trinodb/trino/issues/22679) that the current implementation leads to leakage of registry credentials in the bootstrap logs if INFO logging is enabled.

This, in my opinion, increases the priority.

hashhar commented 3 months ago

Things won't get implemented automatically, unfortunately. If this is important to you, we'd welcome a contribution and can help guide where the change should be made, as well as with reviews. This change is non-trivial, and it isn't the focus for the maintainers at the moment.

derbeneviv commented 2 months ago

Done ^^ I would argue that this is non-trivial, however.