googleapis / python-pubsub

Apache License 2.0

google-cloud-pubsub SDK message pull is not working with SOCKS5 proxy #943

Open rkumarkhaniya-crest opened 1 year ago

rkumarkhaniya-crest commented 1 year ago

We are trying to build a Pub/Sub integration that pulls messages through a proxy using the Pub/Sub SDK. It works fine when HTTP_PROXY points to an HTTP proxy, but it does not work with a SOCKS5 proxy when the machine has no direct internet connectivity.

Environment details

Steps to reproduce

Use the code example below to reproduce the issue. With the SOCKS5 proxy configured, the script does not work when the VM's internet access is turned off. The same script works with an HTTP proxy.

  1. Create a virtualenv
  2. pip install google-cloud-pubsub
  3. Turn off the VM's internet access
  4. Execute the script

Code example

import os

from google.oauth2 import service_account
from google.api_core import retry
from google.cloud import pubsub_v1

NO_PROXY = "localhost,127.0.0.1,0.0.0.0,localaddress"

# TODO: create a creds.json file next to this script containing the
# Google service account credentials JSON.
proxy_uri = "socks5h://<host/ip>:<port>"
# proxy_uri = "http://<host_ip>:<port>"
project_id = "<project_name>"
subscription_id = "<pubsub_subscription_name>"

os.environ["no_proxy"] = NO_PROXY
os.environ["NO_PROXY"] = NO_PROXY
os.environ["http_proxy"] = proxy_uri
os.environ["HTTP_PROXY"] = proxy_uri 
os.environ["https_proxy"] = proxy_uri 
os.environ["HTTPS_PROXY"] = proxy_uri 

def get_credentials():
    dir_name = os.path.dirname(os.path.abspath(__file__))
    creds_path = f"{dir_name}/creds.json"        
    credentials = service_account.Credentials.from_service_account_file(creds_path)
    return credentials

def synchronous_pull(project_id, subscription_id):
    subscriber = pubsub_v1.SubscriberClient(credentials=get_credentials())
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    NUM_MESSAGES = 10

    with subscriber:
        while True:
            print("Started receiving message...")
            response = subscriber.pull(
                request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},
                retry=retry.Retry(deadline=300),
            )

            if len(response.received_messages) == 0:
                print("No messages to process.")
                break

            ack_ids = []
            for received_message in response.received_messages:                
                print(f"Message: {received_message.message}")                
                ack_ids.append(received_message.ack_id)                        

            subscriber.acknowledge(
                request={"subscription": subscription_path, "ack_ids": ack_ids}
            )

            print(
                f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
            )        

if __name__ == "__main__":    
    synchronous_pull(project_id, subscription_id)

Stack trace

E0630 08:03:56.145507323 2560525 http_proxy.cc:119]                    'socks5h' scheme not supported in proxy URI

We tried to find the documents about the proxies supported by this SDK but had no luck. Thanks!

acocuzzo commented 1 year ago

Can you try passing an httplib2.Http object with the proxy_info parameter set, as described in this similar issue? https://github.com/googleapis/google-api-python-client/issues/569#issuecomment-511488575

rkumarkhaniya-crest commented 1 year ago

@acocuzzo I tried the following script, but it is not working.

import os
import json
import httplib2

from google.oauth2 import service_account
from google.api_core import retry
from google.cloud import pubsub_v1

proxy_host = 'proxy-ip-host'
proxy_port = 'proxy-port'
proxy_user = None
proxy_pass = None

project_id = "<project_name>"
subscription_id = "<pubsub_subscription_name>"

def get_credentials():
    dir_name = os.path.dirname(os.path.abspath(__file__))
    creds_path = f"{dir_name}/creds.json"        
    credentials = service_account.Credentials.from_service_account_file(creds_path)
    return credentials

def synchronous_pull(project_id, subscription_id):
    proxy_info = httplib2.ProxyInfo(
        proxy_type=httplib2.socks.PROXY_TYPE_SOCKS5,
        proxy_host=proxy_host,
        proxy_port=proxy_port,
        proxy_user=proxy_user,
        proxy_pass=proxy_pass
    )    
    http_client = httplib2.Http(proxy_info=proxy_info)

    credentials = get_credentials()
    http_client = credentials.authorize(http_client)

    subscriber = pubsub_v1.SubscriberClient(credentials=credentials, http=http_client)
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    NUM_MESSAGES = 10

    with subscriber:
        while True:
            print("Started receiving message...")
            response = subscriber.pull(
                request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},
                retry=retry.Retry(deadline=300),
            )

            if len(response.received_messages) == 0:
                print("No messages to process.")
                break

            ack_ids = []
            for received_message in response.received_messages:                
                print(f"Message: {received_message.message}")                
                ack_ids.append(received_message.ack_id)                        

            subscriber.acknowledge(
                request={"subscription": subscription_path, "ack_ids": ack_ids}
            )

            print(
                f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
            )        

if __name__ == "__main__":    
    synchronous_pull(project_id, subscription_id)

acocuzzo commented 1 year ago

If you are trying the method used in https://github.com/googleapis/google-api-python-client/issues/569#issuecomment-766682245, I believe the "authorize" method requires credentials from the oauth2client.service_account package: https://oauth2client.readthedocs.io/en/latest/source/oauth2client.service_account.html

However, I don't think that credentials object can be passed to the normal SubscriberClient (imported via "from google.cloud import pubsub_v1"). I'm going to convert this issue to a feature request (FR) to allow passing an http_proxy argument to this client.

As a mitigation, one option is to use the underlying auto-generated client, which can be imported via "from google import pubsub_v1" and used like so:

from google import pubsub_v1

def sample_pull():
    # Create a client
    client = pubsub_v1.SubscriberClient()

    # Initialize request argument(s)
    request = pubsub_v1.PullRequest(
        subscription="subscription_value",
        max_messages=1277,
    )

    # Make the request
    response = client.pull(request=request)

    # Handle the response
    print(response)

Specifically, you can pass this SubscriberClient a "transport" argument of type Optional[Union[str, SubscriberTransport]].

The transport parameter can be set to a SubscriberGrpcTransport, which takes a "channel" parameter of type grpc.Channel:

    transports.SubscriberGrpcTransport(
            channel: Optional[grpc.Channel])

Please note that the "credentials" parameter is ignored when the "channel" argument is present.

This grpc.Channel object can be constructed with the SubscriberGrpcTransport.create_channel method, which accepts a "credentials" argument as well as the keyword argument "options".

"options" is an optional list of key-value pairs (channel_arguments in the gRPC Core runtime) used to configure the channel.

The "grpc.http_proxy" key can be included in this list of options. All available options are described here: https://github.com/grpc/grpc/blob/v1.56.x/include/grpc/impl/grpc_types.h#L444C46-L444C46
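Putting those pieces together, a minimal, untested sketch of the mitigation could look like the code below. The helper names build_proxy_channel_options and make_proxied_subscriber are hypothetical, the explicit "pubsub.googleapis.com:443" target is an assumption, and the scheme check assumes that gRPC only accepts http:// proxy URIs for this option, which is what the "'socks5h' scheme not supported" log line in the stack trace suggests. If that assumption holds, this would not make a SOCKS5 proxy work; it would only let an HTTP proxy be set per channel instead of through environment variables.

```python
from urllib.parse import urlsplit


def build_proxy_channel_options(proxy_uri):
    """Return gRPC channel options that set grpc.http_proxy to proxy_uri.

    Assumption: gRPC accepts only http:// proxy URIs for this option, as the
    "'socks5h' scheme not supported in proxy URI" log line suggests.
    """
    scheme = urlsplit(proxy_uri).scheme
    if scheme != "http":
        raise ValueError(f"{scheme!r} proxy scheme is not expected to work with grpc.http_proxy")
    return [("grpc.http_proxy", proxy_uri)]


def make_proxied_subscriber(credentials, proxy_uri):
    """Build the auto-generated SubscriberClient on a channel that carries the proxy option."""
    # Imported here so the helper above can be used without the SDK installed.
    from google import pubsub_v1
    from google.pubsub_v1.services.subscriber.transports import SubscriberGrpcTransport

    # Credentials are attached to the channel, because the transport ignores
    # its own "credentials" parameter once a channel is supplied.
    channel = SubscriberGrpcTransport.create_channel(
        "pubsub.googleapis.com:443",  # assumed default Pub/Sub endpoint
        credentials=credentials,
        options=build_proxy_channel_options(proxy_uri),
    )
    transport = SubscriberGrpcTransport(channel=channel)
    return pubsub_v1.SubscriberClient(transport=transport)
```

The returned client could then be used as in sample_pull above, for example client = make_proxied_subscriber(get_credentials(), "http://<host_ip>:<port>"). Any socks5h:// value left in the http_proxy or https_proxy environment variables may still be picked up by gRPC, so those would probably need to be unset first.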