github / codeql

CodeQL: the libraries and queries that power security researchers around the world, as well as code scanning in GitHub Advanced Security
https://codeql.github.com
MIT License
7.49k stars 1.49k forks source link

Python: Why global dataflow not tracking `endpoints` in function `Service.start()`? #17019

Open hksdpc255 opened 1 month ago

hksdpc255 commented 1 month ago

Python code from openstack

from neutron_lib._i18n import _
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_messaging import serializer as om_serializer
from oslo_service import service

LOG = logging.getLogger(__name__)
TRANSPORT = None

class RequestContextSerializer(om_serializer.Serializer):
    def __init__(self, base=None):
        pass

    pass

def get_server(target, endpoints, serializer=None):
    """Get a new RPC server reference.

    :param target: The target for the new RPC server.
    :param endpoints: The endpoints for the RPC server.
    :param serializer: The optional serialize to use for the RPC server.
    :returns: A new RPC server reference.
    """
    if TRANSPORT is None:
        raise AssertionError(_("'TRANSPORT' must not be None"))
    serializer = RequestContextSerializer(serializer)
    return oslo_messaging.get_rpc_server(TRANSPORT, target, endpoints,
                                         'eventlet', serializer)

class Service(service.Service):
    """Service object for binaries running on hosts.

    A service enables rpc by listening to queues based on topic and host.
    """
    def __init__(self, host, topic, manager=None, serializer=None):
        super().__init__()
        self.host = host
        self.topic = topic
        self.serializer = serializer
        if manager is None:
            self.manager = self
        else:
            self.manager = manager

    def start(self):
        super().start()

        self.conn = Connection()
        LOG.debug("Creating Consumer connection for Service %s",
                  self.topic)

        endpoints = [self.manager]

        self.conn.create_consumer(self.topic, endpoints)

        # Hook to allow the manager to do other initializations after
        # the rpc connection is created.
        if callable(getattr(self.manager, 'initialize_service_hook', None)):
            self.manager.initialize_service_hook(self)

        # Consume from all consumers in threads
        self.conn.consume_in_threads()

    def stop(self):
        # Try to shut the connection down, but if we get any sort of
        # errors, go ahead and ignore them.. as we're shutting down anyway
        try:
            self.conn.close()
        except Exception:  # nosec
            pass
        super().stop()

class Connection(object):
    """A utility class that manages a collection of RPC servers."""

    def __init__(self):
        super().__init__()
        self.servers = []

    def create_consumer(self, topic, endpoints, fanout=False):
        target = oslo_messaging.Target(
            topic=topic, server=cfg.CONF.host, fanout=fanout)
        server = get_server(target, endpoints)
        self.servers.append(server)

    def consume_in_threads(self):
        for server in self.servers:
            server.start()
        return self.servers

    def close(self):
        for server in self.servers:
            server.stop()
        for server in self.servers:
            server.wait()

CodeQL

import python
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking

module MyConf implements DataFlow::ConfigSig {
    predicate isSource(DataFlow::Node source) {
        source.asExpr().isConstant() or
        source.asExpr() instanceof Name or
        source.asExpr() instanceof Attribute
    }
    predicate isSink(DataFlow::Node sink) {
        sink = API::moduleImport("oslo_messaging").getMember("get_rpc_server").getACall().getArg(2)
    }
    predicate isAdditionalFlowStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo){
        nodeTo.asCfgNode().(IterableNode).getAnElement() = nodeFrom.asCfgNode()
    }
}

module MyFlow = DataFlow::Global<MyConf>;

from DataFlow::Node source, DataFlow::Node sink
where MyFlow::flow(source, sink)
select source, sink

Output

source sink
endpoints in line 19 endpoints in line 30
endpoints in line 30 endpoints in line 30
endpoints in line 84 endpoints in line 30
endpoints in line 87 endpoints in line 30
hksdpc255 commented 1 month ago

It may be caused by Issue#17021.

aibaars commented 1 month ago

Have you tried evaluation the isSource predicate using the "quick eval" feature of the CodeQL for VScode extension? If that doesn't include endpoints = [self.manager] then there is something wrong with the isSource predicate. If it is included then most likely some flow step is missing, possibly due to a failure to resolve a method. A trick you can use to debug is to implement isSource to only select an expression in endpoints = [self.manager] and define predicate isSink() { any() }. That should allow you to explore all the dataflow going out of that single source and determine where flow gets "stuck".

hksdpc255 commented 1 month ago

A trick you can use to debug is to implement isSource to only select an expression in endpoints = [self.manager] and define predicate isSink() { any() }. The flow stops at endpoints in self.conn.create_consumer(self.topic, endpoints).

source sink
[self.manager] [self.manager]
[self.manager] endpoints in endpoints = [self.manager]
[self.manager] endpoints in self.conn.create_consumer(self.topic, endpoints)
hksdpc255 commented 1 month ago

So, is there any plan to fix the bug?