halfgaar / FlashMQ

FlashMQ is a fast light-weight MQTT broker/server, designed to take good advantage of multi-CPU environments
https://www.flashmq.org/
Open Software License 3.0
174 stars 24 forks source link

Support HAProxy protocol per listener #24

Closed halfgaar closed 1 year ago

halfgaar commented 1 year ago

Research / implement the HA proxy protocol per listener. Remember to extend the functionality in #25 if implemented.

At this point, I'm not familiar with it. If it's merely a few extra bytes before MQTT connect, it should be possible.

Originally posted by @slavslavov in https://github.com/halfgaar/FlashMQ/discussions/23#discussioncomment-4297890

slavslavov commented 1 year ago

This simple HAproxy configuration will forward the TCP traffic from port 1888 (listening on both IPv4 and IPV6) to 1883 on localhost for IPv4 and prepend the proxy protocol v2 data. Note: this is only for test and development purposes, configuring HAProxy for production is not the goal here :

frontend server_frontend
         bind 0.0.0.0:1888
         bind :::1888
         mode tcp
         default_backend backend_server

backend backend_server
        mode tcp
        server server1 127.0.0.1:1883  send-proxy-v2
        source 127.5.5.5

Although the protocol has a specific header signature which can be detected and distinguished compared to the MQTT header, maybe it will be better to have a flag in the configuration file in the listener section to specify if this listener is expecting the proxy protocol data. Depending on how the IP addresses and ports will be exposed in the plugin, the flag may also be needed there - if there are 2 structures, let's say proxy_data and socket_data, if the flag is set, the client address and port will be in proxy_data and if not set - in socket_data.

Edited: Added a binding for IPv6 so HAProxy will now listen on the same port for both versions. Depending on which one the client is using the proxy data will be different. Also added the source address to be used by the backend when making a connection to the broker - this will appear as the client address in the socket of broker.

halfgaar commented 1 year ago

I have basic code working for #24, but I haven't decided yet whether I'll keep it that way. It may tie into what I'm going do here, for #25.

I want to have something uniform, that these proxy addresses, from x-forwarded-for as well, are also used for the log entries of connecting clients. In web servers, the practice is to define trusted IPs for that, and just set whatever x-forwarded-for says as primary.

For the listeners, it may also be required to set a list of allowed proxy sources, otherwise anybody can spoof it. I don't know what is customary for HAProxy.

BTW, can you describe your use case? Because FlashMQ lacks cluster support at this point, a load balancer isn't terribly useful.

slavslavov commented 1 year ago

In my case HAProxy is not used as a load balancer but to route the traffic to a specific broker (usually running on the same server) based on a URL. For example if there are 3 brokers, HAProxy will forward the traffic of example1.com to broker1, example2.com to broker2 and example3.com to broker3. HAProxy does a few more things in my case, like terminating the TLS certificates.

There is no need to add list of allowed proxy sources as usually the proxy routes inside a trusted network so it is up to the admin how will configure the network

halfgaar commented 1 year ago

There is no need to add list of allowed proxy sources as usually the proxy routes inside a trusted network so it is up to the admin how will configure the network

Then the listeners should definitely have a flag to set HA proxy mode explicitly. Otherwise any random client can connect with a proxy header and claim to be someone else.

Are you ever interested in both addresses (ultimate client source and proxy node)? I guess in most cases people expect the MQTT server's log entries (and the to-be-released flashmq_get_client_address()) to just show the real client IP, like with X-Forwarded-For and set_real_ip_from in Nginx. It would simply things.

slavslavov commented 1 year ago

Agree for the flag so the configuration can be explicit.

Getting the client address is enough. I can't think of a use case to get the client port. The proxy source address is not needed, it should be already known in the network configuration. I just added it in the config to be different from 127.0.0.1 as you are probably testing on the same machine.

halfgaar commented 1 year ago

I have a draft branch: https://github.com/halfgaar/FlashMQ/tree/draft-haproxy. Only version 2.

I don't have any artifacts in "Actions", so you can't download the build, but it's easy to manually build.

I'm still fiddling with the health checks. The protocol spec says stuff like:

About command:

\x0 : LOCAL : the connection was established on purpose by the proxy without being relayed. The connection endpoints are the sender and the receiver. Such connections exist when the proxy sends health-checks to the server. The receiver must accept this connection as valid and must use the real connection endpoints and discard the protocol block including the family which is ignored.

\x1 : PROXY : the connection was established on behalf of another node, and reflects the original connection endpoints. The receiver must then use the information provided in the protocol block to get original the address.

But I see no LOCAL protocol frames. When I specify check in the back-end server, it just sends a normal frame, 1 as command, with the local address as source address. It seems to make HAProxy happy, but the FlashMQ logs are spammed with 'client disconnected'.

I tested with HAProxy 1.8.8-1ubuntu0.11 2020/06/22, on my Ubuntu 18.04 dev machine. I can try others later.

halfgaar commented 1 year ago

I force-pushed an update to the branch.

Apparently, HAProxy only started using a 'local' connection starting with version 2.4.

So far, mixing plain/TLS/IPv4 and IPv6 all works.

slavslavov commented 1 year ago

Sorry for my late reply, I am away this week. I should be able to have a look over the weekend.

halfgaar commented 1 year ago

Do you want a test build? If it's Debian or Ubuntu I can quickly make one for you? What version?

slavslavov commented 1 year ago

No, I can build it from source.

slavslavov commented 1 year ago

I did have a play with the draft branch. All looks good, great work! Thanks. Even the logs now contain the real client address - [2022-12-25 10:28:13] [NOTICE] Client '[ClientID='mosqpub|29751-server.lo', username='', fd=15, keepalive=60s, transport='TCP/MQTT/Non-SSL', address='192.168.0.220', prot=3.1.1, clean=1]' logged in successfully

While reviewing the code I noticed you are copying task queues in several places instead of moving them (you anyway clear the source queue after the copy so I think moving will be better). Maybe you have a valid reason for copying but have you considered moving them? Try this snippet to see the difference:

    std::forward_list<std::function<void()>> taskQueue;
    std::forward_list<std::function<void()>> tasks;

    for (auto i = 0u; i < 1000000; ++i)
    {
        taskQueue.push_front([] {
        });
    }

    auto start = std::chrono::steady_clock::now();
    for (auto & f : taskQueue)
    {
        tasks.push_front(f);
    }
    auto stop = std::chrono::steady_clock::now();
    auto us = std::chrono::duration_cast<std::chrono::microseconds>(stop - start).count();
    std::cout << "Took: " << us << " microseconds to copy tasks" << std::endl;

    tasks.clear();

    start = std::chrono::steady_clock::now();
    tasks = std::move(taskQueue);
    stop = std::chrono::steady_clock::now();
    us = std::chrono::duration_cast<std::chrono::microseconds>(stop - start).count();
    std::cout << "Took: " << us << " microseconds to move tasks" << std::endl;

Also when you push at the front and then you iterate through the queue it acts like a stack you kind of process it backwards - the last task added in the queue will be processed first. If that is the intended behaviour just ignore this.

halfgaar commented 1 year ago

I'll likely merge it in soon then. There are a few things to work out. For one, health-checks from older HAProxy versions appearing as client disconnect with error is a bit ugly. On the other hand, catering to it becomes kind of hacky.

As for the task queues: I see that in mainapp.cpp I copy them, but in threadloop.cpp I move them, but individually. I suppose the reason was that I tend to stay away from moved-from objects. But they're not broken actually, so I can just clear() and reuse them.

The stack-order was not intended that way. Thanks.

I pushed the changes to master.

Task queues are used minimally BTW and should never pile up. As an experiment, I once tried queuing tasks as part of the packet flow (like ping response), but with millions of clients, this exploded right away.

halfgaar commented 1 year ago

Feature released.