robertmrk / aiosfstream

Salesforce Streaming API client for asyncio
MIT License
47 stars 30 forks source link

Auth based on session token #17

Open reach4bawer opened 3 years ago

reach4bawer commented 3 years ago

Is it possible to use the session token that I have obtained using a different mechanism (cannot obtain it via the username and password because of organization having SSO) to be used with aiosfstream? I went through the docs but couldn't find it.

caseyzak24 commented 2 years ago

I solved this problem by subclassing the AutheticatorBase class and creating a PassThroughAuthenticator. If this is something that @robertmrk thinks would be useful, I can try to submit a PR:

from aiosfstream.auth import AuthenticatorBase

class PassThruAuthenticator(AuthenticatorBase):

    def __init__(self, access_token, token_type, instance_url, id_, signature, issued_at, json_dumps=json.dumps,
                 json_loads=json.loads) -> None:
        super().__init__(json_dumps=json_dumps, json_loads=json_loads)
        self.access_token = access_token
        self.token_type = token_type
        self.instance_url = instance_url
        self.id = id_
        self.signature = signature
        self.issued_at = issued_at

    async def authenticate(self) -> None:
        pass

    async def _authenticate(self):
        pass

and then use as follows:

from aiosfstream import Client

stream_client = Client(authenticator=PassThruAuthenticator(...), ...)