Grokzen / redis-py-cluster

Python cluster client for the official redis cluster. Redis 3.0+.
https://redis-py-cluster.readthedocs.io/
MIT License

Attempting to Connect to Private IP #416

Closed: micah-williamson closed this issue 4 years ago

micah-williamson commented 4 years ago

I am trying to connect to a Redis cluster hosted on AWS ElastiCache. My SSH tunnel is set up correctly, and I can connect to the cluster with redis-cli:

redis-cli -p 6380 -c
127.0.0.1:6380> set test 1
OK
127.0.0.1:6380> get test
"1"

I can connect to it successfully with RedisCluster:

from rediscluster import RedisCluster

host, port = "127.0.0.1", 6380  # local end of the SSH tunnel

RedisCluster(
        host=host,
        port=port,
        skip_full_coverage_check=True
)

However, while the first connection succeeds, subsequent connections (made after executing commands) appear to use an internal IP address that my tunnel doesn't know about.

First connection:

[Screenshot 2020-11-12 at 10:47:44 AM]

Second connection:

[Screenshot 2020-11-12 at 10:47:56 AM]

This results in the application hanging, as it will never be able to connect to this address.

Additionally, it doesn't reuse my initial connection port (6380) and uses the default Redis port (6379) instead. So even if I added this internal IP to my hosts file (which I'd rather not do), it would still hang at this point.
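Both symptoms come from how a cluster client discovers nodes: it uses the startup node only to fetch the slot map (as far as I can tell, redis-py-cluster's node manager sends CLUSTER SLOTS for this) and then connects to whatever addresses the nodes advertise, which inside a VPC are private IPs on port 6379. A minimal sketch of what a client extracts from a reply in the CLUSTER SLOTS shape; the sample reply and its 10.0.x.x addresses are invented for illustration, and a real client may receive hosts as bytes rather than str.

```python
from typing import Set, Tuple

Address = Tuple[str, int]


def advertised_nodes(cluster_slots_reply: list) -> Set[Address]:
    """Collect every (host, port) a CLUSTER SLOTS reply tells the client to use.

    Each entry has the shape [start_slot, end_slot, master, replica, ...],
    where every node is itself [ip, port, node_id, ...].
    """
    nodes: Set[Address] = set()
    for entry in cluster_slots_reply:
        for node in entry[2:]:  # skip the two slot-range numbers
            nodes.add((node[0], int(node[1])))
    return nodes


# Hypothetical reply from a 2-shard cluster inside a VPC (addresses are made up).
sample_reply = [
    [0, 8191, ["10.0.1.12", 6379, "a1"], ["10.0.2.40", 6379, "b1"]],
    [8192, 16383, ["10.0.1.77", 6379, "a2"], ["10.0.2.81", 6379, "b2"]],
]

for host, port in sorted(advertised_nodes(sample_reply)):
    # None of these is 127.0.0.1:6380, the tunnel endpoint we started from.
    print(f"{host}:{port}")
```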

How do I work with a cluster that's within a private network?
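Because every node advertises the same port (6379) on a different private IP, a single local tunnel port cannot stand in for all of them: each node needs its own tunnel and its own local port. A sketch, assuming you have already opened one tunnel per node (for example with separate ssh -L forwards), that builds a remap list in the from_host/from_port/to_host/to_port shape used with redis-py-cluster's host_port_remap option; the function name and the port numbers are hypothetical.

```python
from typing import Dict, List, Tuple

Address = Tuple[str, int]


def build_host_port_remap(local_ports: Dict[Address, int],
                          local_host: str = "127.0.0.1") -> List[dict]:
    """Map each advertised (internal_ip, port) to its own local tunnel port (hypothetical helper)."""
    ports = list(local_ports.values())
    if len(ports) != len(set(ports)):
        # Two nodes sharing one local port would send traffic to the wrong node.
        raise ValueError("each cluster node needs its own local tunnel port")
    return [
        {"from_host": host, "from_port": port,
         "to_host": local_host, "to_port": local_port}
        for (host, port), local_port in sorted(local_ports.items())
    ]


# Hypothetical: one SSH tunnel per node, each bound to a different local port.
tunnels = {
    ("10.0.1.12", 6379): 6380,
    ("10.0.1.77", 6379): 6381,
}
remap = build_host_port_remap(tunnels)
print(remap[0])
# {'from_host': '10.0.1.12', 'from_port': 6379, 'to_host': '127.0.0.1', 'to_port': 6380}
```

The list would then be passed as host_port_remap=remap when constructing the client; the duplicate-port check is there because collapsing several nodes onto one endpoint is exactly what breaks redirects.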

micah-williamson commented 4 years ago

There is a host_port_remap option, but it's not automatic. It looks like I need to create an initial cluster client to discover all of the nodes, then create a second client with the correct host/port remaps?

import os

from rediscluster import RedisCluster


def get_redis_cluster(
        host=os.getenv('REDIS_HOST'),
        port=int(os.getenv('REDIS_PORT', 6379))  # env vars are strings
) -> RedisCluster:
    # First client: only used to discover the nodes the cluster advertises
    cluster = RedisCluster(
        host=host,
        port=port,
        skip_full_coverage_check=True
    )
    host_port_remap = []
    for addr, config in cluster.connection_pool.nodes.nodes.items():
        hst = config["host"]
        prt = config["port"]

        # Redirect every advertised node to the address we actually connected to
        host_port_remap.append({
            "from_host": hst,
            "from_port": prt,
            "to_host": host if host != "localhost" else "127.0.0.1",
            "to_port": port
        })

    # Second client: same startup node, with the remap applied
    cluster2 = RedisCluster(
        host=host,
        port=port,
        skip_full_coverage_check=True,
        host_port_remap=host_port_remap
    )

    return cluster2

This seems wrong. Why doesn't it always use the original host? Will the original host not coordinate commands to other nodes in the cluster?
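In Redis Cluster, nodes do not proxy requests: each key maps to one of 16384 hash slots, and a node that does not serve a key's slot replies with a MOVED error naming the node that does, which the client must then contact itself. So the original host will not coordinate commands for you, and a remap that sends every node to the same endpoint would, as far as I can tell, just keep receiving redirects for keys the startup node doesn't own. A sketch of the key-slot function from the Redis Cluster specification (CRC16/XMODEM of the key, or of its {hash tag}, modulo 16384) plus a parser for the MOVED reply; the function names are illustrative, not redis-py-cluster API.

```python
from typing import Tuple


def crc16_xmodem(data: bytes) -> int:
    """CRC-16/XMODEM (polynomial 0x1021, initial value 0), as used by Redis Cluster."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_slot(key: bytes) -> int:
    """Hash slot of a key; only the first non-empty {...} section is hashed if present."""
    start = key.find(b"{")
    if start != -1:
        end = key.find(b"}", start + 1)
        if end != -1 and end != start + 1:
            key = key[start + 1:end]
    return crc16_xmodem(key) % 16384


def parse_moved(error_message: str) -> Tuple[int, str, int]:
    """Split 'MOVED <slot> <host>:<port>' into (slot, host, port)."""
    kind, slot, address = error_message.split()
    if kind != "MOVED":
        raise ValueError(f"not a MOVED redirect: {error_message!r}")
    host, _, port = address.rpartition(":")
    return int(slot), host, int(port)


assert crc16_xmodem(b"123456789") == 0x31C3  # check value given in the spec
print(key_slot(b"{user1000}.following") == key_slot(b"{user1000}.followers"))  # True
print(parse_moved("MOVED 3999 10.0.1.77:6379"))  # (3999, '10.0.1.77', 6379)
```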

micah-williamson commented 4 years ago

Unfortunately, the actual connections are made too deep inside the library for me to reasonably provide a PR for this. I suggest that anyone who has this issue provide their own connection class that tunnels to each node.

Our TunnelManager:

import socket
from contextlib import closing
from threading import RLock
from typing import Optional, Tuple

from cachetools import Cache, cached


class TunnelManager:
    """
    Can be used as a trait (mixin) or standalone.
    Do not define an __init__ method for this class.
    """

    def open(self, remote_addr: str, remote_port: int):
        """
        Establishes an SSH tunnel to remote_addr:remote_port through the bastion host.
        If a tunnel to the same addr/port already exists, it is reused.
        A free local port is picked automatically.

        Returns the local (host, port) of the tunnel.
        """
        import sshtunnel

        if not self.is_tunneling(remote_addr, remote_port):
            # We may still be tunneled to a different addr/port
            self.close()

            local_port = self._find_free_port()
            tunnel_config, identity_path = self.get_bastion_config()

            self._tunnel = sshtunnel.open_tunnel(
                (tunnel_config["remote_address"], 22),
                remote_bind_address=(remote_addr, remote_port),
                ssh_username="ec2-user",
                ssh_pkey=identity_path,
                local_bind_address=("localhost", local_port)
            )

            self._tunnel.start()

        return self._tunnel.local_bind_host, self._tunnel.local_bind_port

    def close(self):
        if self.is_tunneling():
            # Must be self._tunnel: self.__tunnel would be name-mangled to
            # self._TunnelManager__tunnel, which is never set.
            self._tunnel.close()
            self._tunnel = None

    def is_tunneling(self, remote_addr: Optional[str] = None, remote_port: Optional[int] = None):
        """
        Returns True if a tunnel is established.
        If remote_addr and remote_port are given, returns True only if the established tunnel's addr/port match.
        """
        if getattr(self, "_tunnel", None) is not None:
            if remote_addr and remote_port:
                # tunnel_bindings maps remote (addr, port) -> local (addr, port) for active tunnels
                return (remote_addr, remote_port) in self._tunnel.tunnel_bindings
            return True
        return False

    def _find_free_port(self):
        # Bind to port 0 so the OS assigns an unused port, then release it
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind(('', 0))
            return s.getsockname()[1]

    @staticmethod
    @cached(cache=Cache(maxsize=1), lock=RLock())
    def get_bastion_config() -> Tuple[dict, str]:
        # We pull what we need from AWS Secrets Manager and fetch an identity here.
        # Your implementation may vary.
        return {}, ""
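TunnelManager keeps one tunnel per object, and since each connection object mixes it in, the cluster's connection pool may end up with several SSH tunnels to the same node. A minimal sketch of a process-wide alternative: one tunnel per remote node, shared by reference count. The class is hypothetical, and the opener/closer callables are injected so no SSH library is assumed; with sshtunnel, the opener could wrap open_tunnel(...) plus start() and return (tunnel, (tunnel.local_bind_host, tunnel.local_bind_port)), and the closer could call tunnel.stop().

```python
import threading
from typing import Callable, Dict, List, Tuple

Address = Tuple[str, int]


class SharedTunnels:
    """Hypothetical registry: one tunnel per remote node, shared via reference counts."""

    def __init__(self,
                 opener: Callable[[Address], Tuple[object, Address]],
                 closer: Callable[[object], None]):
        # opener(remote) starts a tunnel and returns (tunnel, local_address);
        # closer(tunnel) stops it.
        self._opener = opener
        self._closer = closer
        self._lock = threading.Lock()
        # remote address -> [tunnel, local_address, refcount]
        self._entries: Dict[Address, List] = {}

    def acquire(self, remote: Address) -> Address:
        """Return the local address for `remote`, opening a tunnel only if none exists."""
        with self._lock:
            entry = self._entries.get(remote)
            if entry is None:
                tunnel, local = self._opener(remote)
                entry = [tunnel, local, 0]
                self._entries[remote] = entry
            entry[2] += 1
            return entry[1]

    def release(self, remote: Address) -> None:
        """Drop one user of `remote`; close the tunnel when the last user leaves."""
        with self._lock:
            entry = self._entries.get(remote)
            if entry is None:
                return
            entry[2] -= 1
            if entry[2] == 0:
                self._closer(entry[0])
                del self._entries[remote]


# Usage with a fake opener/closer standing in for real SSH tunnels
opened, closed = [], []


def fake_open(remote: Address) -> Tuple[object, Address]:
    opened.append(remote)
    return object(), ("127.0.0.1", 40000 + len(opened))


registry = SharedTunnels(fake_open, closed.append)
node = ("10.0.1.12", 6379)
a = registry.acquire(node)
b = registry.acquire(node)
print(a == b, len(opened))  # True 1
registry.release(node)
registry.release(node)
print(len(closed))  # 1
```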

Our TunnelingClusterConnection:

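from rediscluster.connection import ClusterConnection


class TunnelingClusterConnection(ClusterConnection, TunnelManager):
    """
    Creates an SSH-tunneled connection.
    """

    def connect(self):
        # Remember the node's real address the first time: self.host/self.port are
        # overwritten below, and later reconnects must tunnel to the real node again
        # rather than to the local tunnel address.
        if getattr(self, "_remote_addr", None) is None:
            self._remote_addr = (self.host, self.port)

        # Point this connection at the locally bound end of the tunnel
        self.host, self.port = self.open(*self._remote_addr)

        try:
            super().connect()
        except Exception:
            self.close()
            raise  # don't swallow the connection error

    def disconnect(self):
        try:
            super().disconnect()
        finally:
            self.close()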

Putting these together:

import os

from rediscluster import RedisCluster


def redis_cluster_factory(
        host=os.getenv('REDIS_HOST'),
        port=os.getenv('REDIS_PORT', 6379)
) -> RedisCluster:
    connection_class = None

    if os.getenv("IS_LOCAL"):
        if host != "localhost":
            tm = TunnelManager()
            # We are attempting to connect to the actual host address,
            # so a tunnel to the startup node is required.
            # No need to close this tunnel manually; it closes when the application stops.
            host, port = tm.open(host, int(port))

        # The connection class provides tunneling for the inevitable node switching
        connection_class = TunnelingClusterConnection

    cluster = RedisCluster(
        host=host,
        port=port,
        skip_full_coverage_check=True,
        nodemanager_follow_cluster=True,
        connection_class=connection_class
    )

    return cluster