DataIntellectTech / kdb-chronicle-queue

Java adaptor from Chronicle queue to kdb+

Chronicle Queue to kdb+ Adapter

What is Chronicle Queue?

The memory-mapped file that backs a Chronicle Queue is also used for exceptionally fast inter-process communication (IPC) without affecting system performance. This makes it well suited to transferring large amounts of data very quickly between processes, either on the same server or across the network. No garbage collection (GC) is incurred, as everything is done off heap.

Features

How does it work?

Chronicle uses a memory-mapped file to continuously journal messages. Its file-based storage grows as more data is written to the queue, and the queue can exceed your available memory: you are constrained only by the amount of disk space on your server. Chronicle writes data directly into off-heap memory, which is shared between Java processes on the same server.

Chronicle is very fast: it can write and read a message in just two microseconds with no garbage. Typically, at the end of each day, you archive the queue and start the next day with a fresh, empty queue.

More info: https://chronicle.software/libraries/queue/

What is kdb+ Time Series Database?

Kdb+, from Kx, is / has:

More info: https://code.kx.com/q/

Project goal

Create an "adapter" to:

Example Scenario

Components

"Quote" message

Messages representing a Quote will be randomly generated by the Producer and added to the Quote queue in the following format:

Name    Value
time    2020.01.24+14:00:16.083Z
sym     VOD.L
bid     152
bsize   42035
ask     152
assize  48514
bex     XLON
aex     XLON

Producer

The Producer will be configured to connect to a specified queue (the quote queue) and write a stream of quotes to it.

The quotes will be randomly generated as per the format above and will be added to the queue using a Chronicle Queue Appender.

Quotes will then be written to the queue as a "self-describing message". E.g.:

// Write one quote as a self-describing message: each field is stored with its name
appender.writeDocument(w -> w.write("quote").marshallable(
        m -> m.write("time").dateTime(LocalDateTime.now())
                .write("sym").text("VOD.L")
                .write("bid").float64(152)
                .write("bsize").float64(42035)
                .write("ask").float64(152)
                .write("assize").float64(48514)
                .write("bex").text("XLON")
                .write("aex").text("XLON")
));

Adapter process

Adapter application

The Adapter for our scenario has been written in Java and can be packaged as a jar file.

The application implements the process above using vendor-specific libraries (Chronicle and kdb+) as well as other third-party open-source libraries.

The main classes in the Adapter component are shown here:

0. Config for starting the Adapter

The Adapter can be configured to be "started" in a number of ways using the adapter.runMode configuration value.

adapter.runMode=NORMAL (Normal data processing mode)
adapter.runMode=BENCH (Benchmarking mode enabling JLBH sampling)
adapter.runMode=KDB_BENCH (Benchmarking mode focused on kdb+ message handling and storage)

As the Adapter runs in continuous polling mode, it needs to be "stoppable". This is achieved by using a stop file: the presence of the configured stop file shuts the Adapter instance down gracefully. The Adapter checks for the file at a configurable interval.

adapter.stopFile=C:\\ChronicleQueue\\Producer\\quote\\STOP.txt
adapter.stopFile.checkInterval=5
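
The stop-file check could be sketched roughly as follows. This is a minimal illustration rather than the project's own code: the class name `StopFileWatcher` and its method are hypothetical, and the sketch assumes the check interval is expressed in seconds, which the configuration above does not state.

```java
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: reports whether the configured stop file exists,
// touching the filesystem at most once per check interval.
class StopFileWatcher {
    private final Path stopFile;
    private final long intervalNanos;
    private long lastCheckNanos;
    private boolean stopRequested;

    // checkIntervalSeconds: assumed unit for adapter.stopFile.checkInterval
    StopFileWatcher(Path stopFile, long checkIntervalSeconds) {
        this.stopFile = stopFile;
        this.intervalNanos = checkIntervalSeconds * 1_000_000_000L;
        // Back-date the last check so the first call always looks at the filesystem.
        this.lastCheckNanos = System.nanoTime() - intervalNanos;
    }

    // Call once per polling cycle; returns true once the stop file has been seen.
    boolean shouldStop() {
        long now = System.nanoTime();
        if (!stopRequested && now - lastCheckNanos >= intervalNanos) {
            lastCheckNanos = now;
            stopRequested = Files.exists(stopFile);
        }
        return stopRequested;
    }
}
```

A polling loop would call `shouldStop()` on each cycle and, when it returns true, send any pending data before exiting, so that the shutdown is graceful.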

Using Chronicle's Affinity library (https://github.com/OpenHFT/Java-Thread-Affinity), it is possible to bind the Adapter instance to a given core, which can improve performance (this library works best on Linux). If the value is configured as -1, the setting is ignored. Otherwise it should be an integer >= 0.

adapter.coreAffinity=-1

Finally, the Adapter will be configured to process messages of a particular type. The project currently defines an enum with the options QUOTE and TRADE.

adapter.messageType=QUOTE
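
As an illustration of how these start-up settings might be read and validated, the sketch below parses them from a `java.util.Properties` object. The class `AdapterConfig`, its field names, and the default values are assumptions made for this example; only the property keys and the permitted values come from the configuration shown above.

```java
import java.util.Properties;

// Hypothetical typed view over the adapter.* start-up properties.
class AdapterConfig {
    enum RunMode { NORMAL, BENCH, KDB_BENCH }
    enum MessageType { QUOTE, TRADE }

    final RunMode runMode;
    final MessageType messageType;
    final int coreAffinity;

    AdapterConfig(Properties props) {
        // The fallback values below are assumptions for the sketch, not project defaults.
        this.runMode = RunMode.valueOf(props.getProperty("adapter.runMode", "NORMAL").trim());
        this.messageType = MessageType.valueOf(props.getProperty("adapter.messageType", "QUOTE").trim());
        this.coreAffinity = Integer.parseInt(props.getProperty("adapter.coreAffinity", "-1").trim());
        if (coreAffinity < -1) {
            throw new IllegalArgumentException("adapter.coreAffinity must be -1 or an integer >= 0");
        }
    }

    // -1 means the affinity setting is ignored and no core binding is attempted.
    boolean bindToCore() {
        return coreAffinity >= 0;
    }
}
```

Parsing into enums means an unsupported run mode or message type fails at start-up with an `IllegalArgumentException` rather than partway through processing.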

1. Connect to data source i.e. Chronicle Queue

When the application is started, it uses the Chronicle Queue Java library to connect to a queue. This library can be added to a Maven project's pom.xml file as a dependency:

<dependency>
    <groupId>net.openhft</groupId>
    <artifactId>chronicle-queue</artifactId>
    <version>5.20.111</version>
</dependency>

The data source / queue should be identified via an external properties file. Chronicle Queues are typically addressed via filesystem location e.g. "C:\ChronicleQueue\Producer\quote"

chronicle.source=C:\\ChronicleQueue\\Producer\\quote

2. Check queue for new messages

Once connected to the data source, use a Chronicle Queue Tailer to read the queue and check for new messages of the configured "type" (adapter.messageType). The tailer should be named, based on configuration in an external properties file (adapter.tailerName). The tailer should read forward from the last message read when re-started.

adapter.tailerName=quoteTailer

3. Read message data ( -> chronicle obj)

Marshall each message that is read by the tailer into a ChronicleMessage specific POJO that extends the Chronicle SelfDescribingMarshallable class to represent the chronicle queue message.

4. Do mapping (chronicle obj -> kdb obj)

MapStruct allows simple or very complex mappings between source and destination POJOs to be coded quickly as an interface. The mapping is defined in the interface and, when the application is built, code is automatically generated to implement it. Use MapStruct to map the source Chronicle Quote Message POJO to the destination kdb+ Quote POJO.

Maven dependency to include MapStruct:

<dependency>
    <groupId>org.mapstruct</groupId>
    <artifactId>mapstruct</artifactId>
    <version>1.4.1.Final</version>
</dependency>

Reference guide: https://mapstruct.org/documentation/stable/reference/html/

5. Add kdb msg to current kdb envelope

Rather than send messages to kdb+ one at a time, it is more efficient to batch them together into a single call in a structured Object array (https://code.kx.com/q/wp/tick-profiling/). To facilitate this, the application will manage an "envelope" containing the combined data of one or more messages.

Messages can be filtered at this stage. Current functionality allows filtering on any String-based field, e.g. sym, aex or bex. Once the filter field is specified, config can be added to provide either a "filter in" white list or a "filter out" black list. The two options are mutually exclusive and cannot be combined. If no filter config is provided, all messages of the correct type will be processed.

# Adapter can filter messages on String field. Also allows different Adapter threads to run independently
# NOTE: use FilterIn OR FilterOut but not both
adapter.messageFilterField=sym
# Filter message IN if Filter Field value is in this comma separated list
adapter.messageFilterIn=JUVE.MI,VOD.L,HEIN.AS
# Filter message OUT if Filter Field value in this comma separated list
#adapter.messageFilterOut=JUVE.MI
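
The filtering rule described above can be sketched as follows. This is a hedged illustration only: the class `MessageFilter` and its methods are hypothetical names, and throwing an exception when both lists are supplied is an assumption about how the "one or the other" rule might be enforced.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical filter over the value of the configured filter field (e.g. sym).
class MessageFilter {
    private final Set<String> filterIn;
    private final Set<String> filterOut;

    // Each argument is a comma-separated list, or null/blank when not configured.
    MessageFilter(String filterInCsv, String filterOutCsv) {
        this.filterIn = parse(filterInCsv);
        this.filterOut = parse(filterOutCsv);
        if (!filterIn.isEmpty() && !filterOut.isEmpty()) {
            // Assumption: reject a config that sets both lists.
            throw new IllegalArgumentException("Use messageFilterIn OR messageFilterOut, not both");
        }
    }

    private static Set<String> parse(String csv) {
        if (csv == null || csv.trim().isEmpty()) {
            return Collections.emptySet();
        }
        return Arrays.stream(csv.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
    }

    // True when a message with this field value should be processed.
    boolean accept(String fieldValue) {
        if (!filterIn.isEmpty()) {
            return filterIn.contains(fieldValue);
        }
        if (!filterOut.isEmpty()) {
            return !filterOut.contains(fieldValue);
        }
        return true; // no filter configured: process everything
    }
}
```

With the sample config above, `new MessageFilter("JUVE.MI,VOD.L,HEIN.AS", null)` would accept a quote for VOD.L and skip one for any symbol not in the list.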

6. Send data to destination ( -> kdb+)

Batching methods

The envelope size will be limited by another configuration parameter:

kdb.envelope.size=100

Once the threshold is reached, the current envelope will be sent to kdb+ and then reset. When there are no messages on the queue to process, any messages in the current envelope will be sent to kdb+ and then reset.
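
The batching behaviour described above (send when the size threshold is reached, and send whatever is pending when the queue is idle) could look roughly like this. The class `BatchingEnvelope` and the `sender` callback are hypothetical stand-ins: in the real adapter the send step is the call to kdb+, which is not modelled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical envelope: collects messages and hands them to a sender in batches.
class BatchingEnvelope<T> {
    private final int maxSize;                 // corresponds to kdb.envelope.size
    private final Consumer<List<T>> sender;    // stand-in for the kdb+ send step
    private final List<T> pending = new ArrayList<>();

    BatchingEnvelope(int maxSize, Consumer<List<T>> sender) {
        if (maxSize < 1) {
            throw new IllegalArgumentException("maxSize must be >= 1");
        }
        this.maxSize = maxSize;
        this.sender = sender;
    }

    // Add one message; send and reset the envelope once the threshold is reached.
    void add(T message) {
        pending.add(message);
        if (pending.size() >= maxSize) {
            flush();
        }
    }

    // Send any pending messages, e.g. when the tailer finds no new messages.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sender.accept(new ArrayList<>(pending)); // pass a copy so the reset below is safe
        pending.clear();
    }
}
```

The polling loop would call `add` for each message read and `flush` whenever a read returns nothing, so a partly filled envelope is not held back while the queue is quiet.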

kdb+

kdb+ provides a Java API in a single source file on GitHub. Including it in a development project is therefore a straightforward matter of adding the file to the other source code under the package kx and ensuring it is properly imported and referenced by other classes.

Reference: https://code.kx.com/q/wp/java-api/

The Adapter will use this API to connect to (or check a connection is already open to) a kdb+ database defined in external config e.g.:

kdb.host=localhost
kdb.port=5000
kdb.login=username:password
kdb.connection-enabled=true

In the kdb+ database there is a quote table defined as:

.schema.addschema ([]table:`quote;col:`time`chrontime`sym`bid`bsize`ask`asize`bex`aex;coltype:`timestamp`timestamp`symbol`float`float`float`float`symbol`symbol;isnested:000000000b);

and a function ".u.upd" to insert data into the database.

The destination table and function we are using are also specified in the external config file:

kdb.destination=quote
kdb.destination.function=.u.upd

To insert entries into the quote table, the adapter uses one of the "ks" methods to send all messages in the current envelope to kdb+:

kdbConnection.ks(kdbMethod, destinationTable, kdbEnvelope.toObjectArray());

The kdb message data is formatted as an Object[] of column arrays, similar to this:

[chrontime[],sym[],bid[],bsize[],ask[],asize[],bex[],aex[]]
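
To make the column-oriented layout concrete, the sketch below turns a list of row objects into one array per column, following the quote table schema minus the time column that kdb+ generates on insert. The `QuoteColumns` and `Quote` classes are hypothetical, and the Java types chosen for each column (`java.sql.Timestamp` for chrontime, `double` for the float columns, `String` for the symbol columns) are assumptions rather than a statement of what the project uses.

```java
import java.sql.Timestamp;
import java.util.List;

// Hypothetical conversion from row objects to a column-oriented Object[].
class QuoteColumns {

    // Simplified stand-in for the destination kdb+ Quote POJO.
    static final class Quote {
        final Timestamp chrontime;
        final String sym;
        final double bid, bsize, ask, asize;
        final String bex, aex;

        Quote(Timestamp chrontime, String sym, double bid, double bsize,
              double ask, double asize, String bex, String aex) {
            this.chrontime = chrontime;
            this.sym = sym;
            this.bid = bid;
            this.bsize = bsize;
            this.ask = ask;
            this.asize = asize;
            this.bex = bex;
            this.aex = aex;
        }
    }

    // Build one array per column; element i of every array belongs to row i.
    static Object[] toObjectArray(List<Quote> quotes) {
        int n = quotes.size();
        Timestamp[] chrontime = new Timestamp[n];
        String[] sym = new String[n];
        double[] bid = new double[n];
        double[] bsize = new double[n];
        double[] ask = new double[n];
        double[] asize = new double[n];
        String[] bex = new String[n];
        String[] aex = new String[n];
        for (int i = 0; i < n; i++) {
            Quote q = quotes.get(i);
            chrontime[i] = q.chrontime;
            sym[i] = q.sym;
            bid[i] = q.bid;
            bsize[i] = q.bsize;
            ask[i] = q.ask;
            asize[i] = q.asize;
            bex[i] = q.bex;
            aex[i] = q.aex;
        }
        return new Object[] {chrontime, sym, bid, bsize, ask, asize, bex, aex};
    }
}
```

Sending columns rather than rows lets a whole envelope travel in a single call, which is the efficiency gain the batching step is aiming for.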

Running the process

Start the Producer

The Producer component in the repository has been built to facilitate the generation of Quote messages on the Chronicle Queue. Once running, the Producer component can be started and stopped via a Swagger-generated web page.

http://localhost:9090/producer/swagger-ui.html

For example, using the /quoteLoader endpoint, it is possible to "start" sending messages in batches of 1000, with no gap between each message, and repeat this every 2000 ms.

The Producer console showing messages being added to the queue:

18:22:19.999 [pool-1-thread-1] INFO  c.c.d.p.c.ProducerController - TIMING: Added 1000 messages (up to index: 80324478407791) in 0.4166383 seconds
18:22:22.099 [pool-1-thread-1] INFO  c.c.d.p.c.ProducerController - TIMING: Added 1000 messages (up to index: 80324478408791) in 0.0804952 seconds
18:22:24.171 [pool-1-thread-1] INFO  c.c.d.p.c.ProducerController - TIMING: Added 1000 messages (up to index: 80324478409791) in 0.0688605 seconds
18:22:26.277 [pool-1-thread-1] INFO  c.c.d.p.c.ProducerController - TIMING: Added 1000 messages (up to index: 80324478410791) in 0.0373406 seconds
18:22:28.381 [pool-1-thread-1] INFO  c.c.d.p.c.ProducerController - TIMING: Added 1000 messages (up to index: 80324478411791) in 0.0217323 seconds
18:22:30.406 [pool-1-thread-1] INFO  c.c.d.p.c.ProducerController - TIMING: Added 1000 messages (up to index: 80324478412791) in 0.014733 seconds

Start kdb+ instance

If running on Windows, open a cmd prompt or PowerShell window and run the following command:

q dummytp.q -p 5000 -u 1

This should return a q prompt similar to that below:

KDB+ 4.0 2020.05.04 Copyright (C) 1993-2020 Kx Systems
w64/ 4(16)core 16080MB xx yy-zz 172.20.64.1 EXPIRE 2021.11.18 e-mail@address.com KOD #1234567

q)

Check that the quote table exists and is empty:

q)quote
time chrontime sym bid bsize ask asize bex aex
----------------------------------------------
q)

Start the Adapter

Once started, the adapter will connect to the kdb+ database first and then the Chronicle queue.

....
Feb 03, 2021 5:16:21 PM com.kdb.adapter.chronicle.ChronicleKdbAdapter processMessages INFO: Starting Chronicle kdb Adapter
Feb 03, 2021 5:16:21 PM com.kdb.adapter.chronicle.ChronicleKdbAdapter processMessages INFO: Tailer starting at index: 80148384825736
Feb 03, 2021 5:16:21 PM com.kdb.adapter.chronicle.ChronicleKdbAdapter saveCurrentEnvelope INFO: TIMING: Stored 10 messages (up to index: 80148384825745) in 0.0013226 seconds
Feb 03, 2021 5:16:21 PM com.kdb.adapter.chronicle.ChronicleKdbAdapter processMessages INFO: Stopping Chronicle kdb Adapter. 10 msgs stored in this cycle (0.083505101 seconds)
....

Each message read is added to kdb+. Kdb+ debug messages have been switched off.

A simple check of the quote table shows that the data has been added:

q)quote
time                          chrontime                     sym     bid  bsiz..
-----------------------------------------------------------------------------..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:25.958331200 VOD.L   154  2661..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:25.962331600 VOD.L   152  1636..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:25.964324800 HEIN.AS 101  2706..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:25.964324800 HEIN.AS 101  3746..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:25.964324800 JUVE.MI 1232 2809..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:25.965327400 HEIN.AS 101  4875..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:25.965327400 HEIN.AS 100  1663..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:25.966337200 JUVE.MI 1238 3975..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:25.966337200 JUVE.MI 1232 4766..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:25.967731000 VOD.L   155  2874..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:38.124967100 HEIN.AS 102  2200..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:38.234965000 HEIN.AS 104  2516..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:38.344966400 VOD.L   153  2304..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:38.456509300 JUVE.MI 1230 4990..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:38.566501900 JUVE.MI 1238 3918..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:38.676502900 JUVE.MI 1237 2580..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:38.787779900 JUVE.MI 1237 2119..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:38.898779900 VOD.L   154  3414..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:39.010326700 JUVE.MI 1231 1613..
2021.02.03D17:16:01.757618000 2021.02.03D16:38:39.121323400 VOD.L   152  2283..
..

NOTE: The "time" column is the insertion time generated by kdb+, and the "chrontime" column is the timestamp value from each message generated by the Producer application.

Summary

As a reminder, the adapter scenario that was used was:

We have now enabled this by developing an adapter that can process quote messages from a configurable source (a Chronicle Queue) and write each message as a new entry in a configurable destination table in a kdb+ database.

Further considerations

Whilst some aspects of the adapter are generic and configurable, e.g. source and destination details, other parts of the solution are difficult to generalize.

The scenario used was based on our quote messages. The messages were generated in a specific, known format and added to the queue in a specific manner, i.e. as a self-describing message. This allowed the adapter to map known fields from the source message to known fields in the destination object. Finally, the kdb+ table used was structured specifically for quote data.

The completed quote adapter example should be considered a base framework from which to develop further message-specific adapters from Chronicle Queue into kdb+.

Release Notes

Version 1.1.0

Fixed an issue where messages are dropped from the Chronicle Queue if the application restarts when there are messages in KdbEnvelope that have not been sent to kdb+.