narcisoguillen / kafka-node-avro


Registry.fetchVersions endpoint is not customizable #17

Closed · mossaab-melloul closed this issue 4 years ago

mossaab-melloul commented 4 years ago

Version: 4.1.3

Hello,

Currently the endpoint for fetching schemas is always subjects/${topicName}-value/versions. Is it possible to make the endpoint customizable? In my case I need to fetch schemas at subjects/${topicName}/versions to respect the convention decided by our architects.
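
For illustration, the difference boils down to how the subject name in the URL is built (a minimal sketch; the actual internal code may differ, and 'payments' is just an example topic):

// Current behavior: the "-value" suffix is hardcoded into the subject name
const current = topicName => `subjects/${topicName}-value/versions`;

// Desired: the subject name is exactly the topic name
const desired = topicName => `subjects/${topicName}/versions`;

console.log(current('payments')); // subjects/payments-value/versions
console.log(desired('payments')); // subjects/payments/versions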

boissierflorian commented 4 years ago

+1

narcisoguillen commented 4 years ago

Hey, thanks for helping make kafka-node-avro better. Sure, let's make that happen!

I suggest we add the ability to customize the URLs in the config and use a template system, so we can customize them any way we need, like:

Options.schema.endpoints = {
  alive : 'subjects',
  fetchVersions : 'subjects/{{topicName}}-value/versions',
  fetchByVersion : 'subjects/{{topicName}}-value/versions/{{version}}',
  fetchById : 'schemas/ids/{{id}}'
};

We need to document in the README how to customize it and how the variables are named:

id : ID of the schema
topicName : The name of the topic
version : Version of the schema
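
For illustration, the substitution could be as simple as this (the render helper below is just a sketch, not part of the library):

// Minimal renderer for {{var}} placeholders (illustrative only)
const render = (template, vars) =>
  template.replace(/{{(\w+)}}/g, (match, key) => vars[key]);

render('subjects/{{topicName}}-value/versions/{{version}}', {
  topicName : 'payments', // example topic name
  version   : 3           // example schema version
});
// => 'subjects/payments-value/versions/3'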

What are your thoughts?

mossaab-melloul commented 4 years ago

What about this:

Options.schema.endpoints = {
  alive : 'subjects',
  fetchVersions : topicName => `subjects/${topicName}-value/versions`,
  fetchByVersion : (topicName, version) => `subjects/${topicName}-value/versions/${version}`,
  fetchById : id => `schemas/ids/${id}`
}

I think using callbacks is more powerful because the user can do much more. For example, if the schema naming strategy is 'topic name - namespace', the callback can be something like:

import {getNamespaceByTopic} from 'someModule'
Options.schema.endpoint.fetchVersions = topicName => {
    const namespace = getNamespaceByTopic(topicName)
    return `subjects/${topicName}-${namespace}/versions`
}

What do you think?

narcisoguillen commented 4 years ago

I like the way you think.

1.- We should follow a standard across all the URL builders, basically providing the same data to every URL function, like:

Options.schema.endpoint.byVersion = function(id, name, version){
 // ...
};

Options.schema.endpoint.byId = function(id, name, version){
 // ...
};

Options.schema.endpoint.byName = function(id, name, version){
 // ...
};
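
For instance, byVersion could keep the shared signature and use only the arguments it needs (a sketch consistent with the default endpoints above):

// Sketch: same (id, name, version) signature everywhere;
// id is unused here but kept so all builders look alike
Options.schema.endpoint.byVersion = function(id, name, version){
  return `subjects/${name}-value/versions/${version}`;
};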

2.- Thinking a bit more about this, we should not mix the responsibilities of the settings that wire Kafka and the schema registry with the config that customizes mechanisms and our own processes.

Also, settings should be key-value string pairs; taking Kubernetes as an example, whoever is implementing this will not be able to simply pass functions through ENV vars.
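
For illustration, string-only settings map cleanly onto the environment (the variable names below are assumptions, not part of the library):

// Sketch: every value stays a plain string, so it can come from ENV vars
const Settings = {
  kafka : {
    kafkaHost : process.env.KAFKA_HOST || 'localhost:9092'
  },
  schema : {
    registry : process.env.SCHEMA_REGISTRY || 'http://localhost:8081'
  }
};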

I think we should implement a way to customize the endpoints in a more dynamic way, like:

const Settings  = {
  "kafka" : {
    "kafkaHost" : "localhost:9092"
  },
  "schema": {
    "registry" : "http://schemaregistry.example.com:8081"
  }
};

KafkaAvro.config({
  schema_endpoints : {
    alive     : function(id, name, version){ return "my.string"; },
    byId      : function(id, name, version){ return "my.string"; },
    byName    : function(id, name, version){ return "my.string"; },
    byVersion : function(id, name, version){ return "my.string"; }
  }
});

KafkaAvro.init(Settings).then( kafka => {
  // ...
} , error => {
  console.error(error);
});

If we follow this approach we could even continue to extend the config section with a way to customize the AVRO encoder and decoder.
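
For example, the same config call could one day accept codec overrides (the avro_codec key and the function signatures below are hypothetical, just to illustrate the direction):

KafkaAvro.config({
  avro_codec : {
    // hypothetical hooks, not an existing API
    encode : function(schema, message){ /* custom encode */ },
    decode : function(schema, buffer){ /* custom decode */ }
  }
});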

What do you think?

narcisoguillen commented 4 years ago

I made a PR with this logic implemented (https://github.com/narcisoguillen/kafka-node-avro/pull/18). Please feel free to comment or contribute.

mossaab-melloul commented 4 years ago

Thank you for your fast answers :) I was playing with your code a little bit and I've come up with something like this:

In lib/registry:

const defaultSchemaEndpointResolver = {
  resolveByTopicName: topicName => {
    const endpoint = Settings.schema.endpoints && Settings.schema.endpoints.byTopicName
    if (endpoint) {
      return endpoint.replace(/{{topicName}}/g, topicName)
    }
    return `subjects/${topicName}-value/versions`
  },

  resolveById: id => {
    const endpoint = Settings.schema.endpoints && Settings.schema.endpoints.byId
    if (endpoint) {
      return endpoint.replace(/{{id}}/g, id)
    }
    return `schemas/ids/${id}`
  },

  resolveByVersion: (topicName, version) => {
    const endpoint = Settings.schema.endpoints && Settings.schema.endpoints.byVersion
    if (endpoint) {
      return endpoint.replace(/{{version}}/g, version)
                     .replace(/{{topicName}}/g, topicName)
    }
    return `subjects/${topicName}-value/versions/${version}`
  }
}

Registry.schemaEndpointResolver = { ...defaultSchemaEndpointResolver }
Registry.setSchemaEndpointResolver = function (resolver = {}) {
    // merge the custom resolver over the defaults and store the result
    Registry.schemaEndpointResolver = {
      ...defaultSchemaEndpointResolver,
      ...resolver
    }
}

Registry.fetchVersions = function (topicName) {
    const endpoint = Registry.schemaEndpointResolver.resolveByTopicName(topicName)
    return new Promise( (resolve, reject) => {
        Registry.GET(endpoint, (error, response, body) => { /* ... */ })
        // ...
    })
}
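
With that in place, overriding a single resolver could look like this (a sketch; the resolveByTopicName override matches the URL convention from the original issue):

const myCustomResolver = {
  // only override what differs from the defaults
  resolveByTopicName: topicName => `subjects/${topicName}/versions`
}

Registry.setSchemaEndpointResolver(myCustomResolver)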

In /index.js:

kafkaAvro.use = plugin => {
    plugin(kafkaAvro)
    return kafkaAvro
}

So:

kafkaAvro.use(({core}) => core.Registry.setSchemaEndpointResolver(myCustomResolver))
         .init()

What are your thoughts on that?

narcisoguillen commented 4 years ago

I like the idea of making plugins to customize any internal (core) implementation. It keeps things simple and allows straight overwrites.

I made a few more commits on the branch. Basically, the settings now hold the default endpoint strings, which can be set up from there as ENV vars, or you can overwrite the whole mechanism through a plugin, like so:

From Settings

const Settings  = {
  "kafka" : {
    "kafkaHost" : "localhost:9092"
  },
  "schema": {
    "registry" : "http://schemaregistry.example.com:8081",
    "endpoints" : {
      "byId" : "schemas/bla/{{id}}"
    }
  }
};

KafkaAvro.init(Settings).then( kafka => {
  // ..
} , error => {
  //..
});

With Plugin

const Settings  = {
  "kafka" : {
    "kafkaHost" : "localhost:9092"
  },
  "schema": {
    "registry" : "http://schemaregistry.example.com:8081"
  }
};

const myCustomPlugin = function(core){
  // Overwrite
  core.Registry.endpoints.allVersions = function(id, name, version){
    return `subjects/${name}-value/versions`;
  };
};

KafkaAvro
  .use(myCustomPlugin)
  .init(Settings).then( kafka => {
  // ..
} , error => {
  //..
});

With this implementation anyone can now overwrite any mechanism, for example to customize the default consumer parser:

const myCustomPlugin = function(core){
  // Overwrite
  core.Consumer.prototype.parse = function(){
    // new parser
  };
};

KafkaAvro
  .use(myCustomPlugin)
  .init(Settings).then( kafka => {
  // ..
} , error => {
  //..
});

We just need to add good documentation and tests around this.

narcisoguillen commented 4 years ago

Can you please use the latest kafka-node-avro version (4.2.0)? It has these new features.

mossaab-melloul commented 4 years ago

Nice work, thank you :)