celery / kombu

Messaging library for Python.
http://kombu.readthedocs.org/
BSD 3-Clause "New" or "Revised" License
2.81k stars 920 forks source link

Add support for mongodb+srv scheme #1976

Closed H4ad closed 2 weeks ago

H4ad commented 3 months ago

If we try run kombu with scheme mongodb+srv, it will break during URL parsing.

Fixes: #1786 Fixes: #858

H4ad commented 2 months ago

@auvipy Unless you have a valid connection using mongodb+srv, I don't think is possible to add tests due to how this scheme is resolved using DNS.

Nusnus commented 1 month ago

@auvipy Unless you have a valid connection using mongodb+srv, I don't think is possible to add tests due to how this scheme is resolved using DNS.

I understand - would it be ok to ask for you to do some manual testing and at least share here the logs so we can have at least a minimal confirmation of it working?

P.S From a quick review it appears to be ok, but I’d want at least something to hold on to if we can’t add automatic testing 🙏

H4ad commented 1 month ago

I did a monkey-patch on my code to use this change, I've been using since I opened this PR, this is the logs from yesterday:

Screenshot from 2024-06-04 09-47-47

My monkey patch code is basically this:

from kombu.transport.mongodb import Channel
from pymongo import uri_parser

def monkey_patch_channel_parse_uri(self, scheme="mongodb://"):
    # See mongodb uri documentation:
    # https://docs.mongodb.org/manual/reference/connection-string/
    client = self.connection.client
    hostname = client.hostname

    if hostname.startswith("srv://"):
        scheme = "mongodb+srv://"
        hostname = "mongodb+" + hostname
        client.hostname = hostname

    if not hostname.startswith(scheme):
        hostname = scheme + hostname

    if not hostname[len(scheme) :]:
        hostname += self.default_hostname

    if client.userid and "@" not in hostname:
        head, tail = hostname.split("://")

        credentials = client.userid
        if client.password:
            credentials += ":" + client.password

        hostname = head + "://" + credentials + "@" + tail

    port = client.port if client.port else self.default_port

    parsed = uri_parser.parse_uri(hostname, port)

    dbname = parsed["database"] or client.virtual_host

    if dbname in ("/", None):
        dbname = self.default_database

    options = {
        "auto_start_request": True,
        "ssl": self.ssl,
        "connectTimeoutMS": (
            int(self.connect_timeout * 1000) if self.connect_timeout else None
        ),
    }
    options.update(parsed["options"])
    options = self._prepare_client_options(options)

    if "tls" in options:
        options.pop("ssl")

    return hostname, dbname, options

def apply_kombu_monkey_patch():
    Channel._parse_uri = monkey_patch_channel_parse_uri
Nusnus commented 1 month ago

@H4ad Thank you! Much appreciated!

I did a monkey-patch on my code to use this change, I've been using since I opened this PR, this is the logs from yesterday:

Screenshot from 2024-06-04 09-47-47

My monkey patch code is basically this:

from kombu.transport.mongodb import Channel
from pymongo import uri_parser

def monkey_patch_channel_parse_uri(self, scheme="mongodb://"):
    # See mongodb uri documentation:
    # https://docs.mongodb.org/manual/reference/connection-string/
    client = self.connection.client
    hostname = client.hostname

    if hostname.startswith("srv://"):
        scheme = "mongodb+srv://"
        hostname = "mongodb+" + hostname
        client.hostname = hostname

    if not hostname.startswith(scheme):
        hostname = scheme + hostname

    if not hostname[len(scheme) :]:
        hostname += self.default_hostname

    if client.userid and "@" not in hostname:
        head, tail = hostname.split("://")

        credentials = client.userid
        if client.password:
            credentials += ":" + client.password

        hostname = head + "://" + credentials + "@" + tail

    port = client.port if client.port else self.default_port

    parsed = uri_parser.parse_uri(hostname, port)

    dbname = parsed["database"] or client.virtual_host

    if dbname in ("/", None):
        dbname = self.default_database

    options = {
        "auto_start_request": True,
        "ssl": self.ssl,
        "connectTimeoutMS": (
            int(self.connect_timeout * 1000) if self.connect_timeout else None
        ),
    }
    options.update(parsed["options"])
    options = self._prepare_client_options(options)

    if "tls" in options:
        options.pop("ssl")

    return hostname, dbname, options

def apply_kombu_monkey_patch():
    Channel._parse_uri = monkey_patch_channel_parse_uri

@auvipy I know you asked for tests, but I think this proves that it at least does what it is supposed to, in addition to also reviewing the code itself (which makes sense to me as-is).

I think we can go through and come back if someone reports an issue - what do you say?

Nusnus commented 2 weeks ago

LGTM then. We’ll wait for @auvipy to review it as well.

@auvipy hey bro - just giving a ping.

LGTM, see also: https://github.com/celery/kombu/pull/1976#issuecomment-2147486711

auvipy commented 2 weeks ago

we are fine with small patches with manual verification as well in case adding a test is too hard