Adds a PostgreSQL position manager. It creates a table named `kafka_position` inside the schema `topos`. The table has a composite primary key on `consumer_group`, `topic`, and `partition`. I decided against letting the user define the schema and table names, since that implementation turned out not to be optimal once I had worked on it for a bit. If you have a better suggestion for the schema and table names, please say so and I'll push a new commit with the updated names.
The flow is as follows:
- The manager checks whether the schema exists. If it does, the manager does nothing; otherwise it creates both the schema and the table.
- On GET, we return the `DefaultPosition` if no position is stored, as is done in the MongoDB implementation; otherwise we return the position queried from the database.
- On SET, we upsert into the table: if a row with the composite key already exists, we update its position; otherwise we insert a new row with the composite key and the newest position.
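
For reviewers, here is a rough sketch of the flow above. It is not the code in this PR: it is written in Python and uses an in-memory SQLite database as a stand-in for PostgreSQL so that it runs without a server. The `position` column name, the column types, the function names, and the `DEFAULT_POSITION` value of -1 are assumptions made for illustration. SQLite has no schemas, so the sketch only creates the table; in PostgreSQL the table would live in the `topos` schema.

```python
import sqlite3

# Hypothetical placeholder for DefaultPosition; the real value is not stated here.
DEFAULT_POSITION = -1


def ensure_table(conn):
    """Create the table if it is missing; do nothing if it already exists."""
    # Assumption: in PostgreSQL this would be preceded by creating the
    # topos schema, and the table would be topos.kafka_position.
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS kafka_position (
            consumer_group TEXT    NOT NULL,
            topic          TEXT    NOT NULL,
            "partition"    INTEGER NOT NULL,
            position       INTEGER NOT NULL,
            PRIMARY KEY (consumer_group, topic, "partition")
        )
        """
    )
    conn.commit()


def get_position(conn, consumer_group, topic, partition):
    """GET: return the stored position, or the default if no row exists."""
    row = conn.execute(
        """
        SELECT position FROM kafka_position
        WHERE consumer_group = ? AND topic = ? AND "partition" = ?
        """,
        (consumer_group, topic, partition),
    ).fetchone()
    return row[0] if row is not None else DEFAULT_POSITION


def set_position(conn, consumer_group, topic, partition, position):
    """SET: insert a new row, or update the position if the key exists."""
    conn.execute(
        """
        INSERT INTO kafka_position (consumer_group, topic, "partition", position)
        VALUES (?, ?, ?, ?)
        ON CONFLICT (consumer_group, topic, "partition")
        DO UPDATE SET position = excluded.position
        """,
        (consumer_group, topic, partition, position),
    )
    conn.commit()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    ensure_table(conn)
    print(get_position(conn, "group-a", "orders", 0))  # no row yet, so the default
    set_position(conn, "group-a", "orders", 0, 42)     # inserts a new row
    set_position(conn, "group-a", "orders", 0, 43)     # updates the same row
    print(get_position(conn, "group-a", "orders", 0))
```

The `ON CONFLICT ... DO UPDATE` form is also accepted by PostgreSQL, which is why SQLite works as a stand-in here. The composite primary key is what lets a single statement cover both the insert and the update case.
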
The implementation is modeled on `MongoDbPositionsManagerConfigurationExtensions`, and the unit tests follow the same structure.