robertmrk / aiosfstream

Salesforce Streaming API client for asyncio
MIT License
47 stars 31 forks

[Question] How do I actually connect? (example code not working) #2

Closed balmasi closed 6 years ago

balmasi commented 6 years ago

Hey Robert, I'm a noob to Python, but it seems I can't get the basic password auth to work at all. I tried with and without my Salesforce security token appended to the password.

import asyncio

from aiosfstream import SalesforceStreamingClient


async def run():
    async with SalesforceStreamingClient(
        consumer_key="AAAAA",
        consumer_secret="XXXX",
        username="YYYY",
        password="PPPPPP"
    ) as client:
        # subscribe to the platform event channel
        await client.subscribe("/event/Test__e")

        # listen for incoming messages
        async for message in client:
            print(message)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

I'm using the correct credentials (cause I use the same ones to connect to another client) but I get the error:

Traceback (most recent call last):
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiosfstream/exceptions.py", line 108, in translate_errors_context
    yield
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiosfstream/exceptions.py", line 131, in async_wrapper
    return await func(*args, **kwargs)
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiosfstream/client.py", line 110, in open
    await self.auth.authenticate()
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiosfstream/auth.py", line 75, in authenticate
    raise AuthenticationError("Authentication failed", response_data)
aiosfstream.exceptions.AuthenticationError: ('Authentication failed', {'error': 'invalid_grant', 'error_description': 'authentication failure'})

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "src/sfdc_platform_events/sfdc_subscribe.py", line 71, in <module>
    loop.run_until_complete(run())
  File "/anaconda3/lib/python3.6/asyncio/base_events.py", line 467, in run_until_complete
    return future.result()
  File "src/sfdc_platform_events/sfdc_subscribe.py", line 55, in run
    password="LEQ8FhF)ZktQX*N{gpRW/Nc=pRpf"
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiosfstream/exceptions.py", line 131, in async_wrapper
    return await func(*args, **kwargs)
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiosfstream/client.py", line 190, in __aenter__
    return await super().__aenter__()
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiocometd/client.py", line 412, in __aenter__
    await self.open()
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiosfstream/exceptions.py", line 131, in async_wrapper
    return await func(*args, **kwargs)
  File "/anaconda3/lib/python3.6/contextlib.py", line 99, in __exit__
    self.gen.throw(type, value, traceback)
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiosfstream/exceptions.py", line 110, in translate_errors_context
    error_cls = EXCEPTION_PAIRS[type(cometd_error)]
KeyError: <class 'aiosfstream.exceptions.AuthenticationError'>

Using: Python 3.6.4 (Anaconda dist), aiosfstream 0.2.3, macOS 10.13.3 (17D102)

Got any tips for me?

robertmrk commented 6 years ago

Hi @balmasi

In order to use aiosfstream all you need is your username and password, without the security token appended to it, and the consumer key and consumer secret values of a connected app which has OAuth enabled. Unfortunately, even if you have all this configured, Salesforce can still reject your login attempt depending on how your connected app's OAuth policies are configured. Please try setting the "Permitted Users" policy to "All Users may self-authorize" and the "IP Relaxation" policy to "Relax IP restrictions".

balmasi commented 6 years ago

Thank you for the swift response @robertmrk

So, it turns out I was trying to connect to a sandbox, which apparently isn't supported. I tried adding an authenticator that supports sandboxes, but I can't extend AuthenticatorBase because it's not exposed. I'd have to override all the things to get it to work :(

Anyway, even in production, it seems to trip out when I read all events:

Traceback (most recent call last):
  File "src/sfdc_platform_events/aio.py", line 26, in <module>
    loop.run_until_complete(run())
  File "/anaconda3/lib/python3.6/asyncio/base_events.py", line 467, in run_until_complete
    return future.result()
  File "src/sfdc_platform_events/aio.py", line 21, in run
    async for message in client:
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiosfstream/client.py", line 184, in __aiter__
    async for message in super().__aiter__():
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiocometd/client.py", line 396, in __aiter__
    yield await self.receive()
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiosfstream/exceptions.py", line 131, in async_wrapper
    return await func(*args, **kwargs)
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiosfstream/client.py", line 177, in receive
    await self.replay_storage.extract_replay_id(response)
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiosfstream/replay.py", line 81, in extract_replay_id
    marker = ReplayMarker(date=event["createdDate"],
KeyError: 'createdDate'

It seems you're looking for the wrong key. I'm subscribing to platform events, and the key there is "CreatedDate", e.g.:

{
  "data":{
    "schema":"VzC0752aIYrG4l6cFY9t8Q",
    "payload": {
       "CreatedById":"005C000000B3xGIIAZ",
       "CreatedDate":"2018-11-05T15:26:54.787Z"
     }
  }
}

robertmrk commented 6 years ago

Hi @balmasi

Unfortunately I can't test the library with a sandbox, because at the moment I only have access to a developer edition org, which doesn't allow the creation of sandboxes. However, I doubt that you would have to create a custom authenticator for this purpose. According to the docs:

Users can log in to the sandbox at https://test.salesforce.com by appending .sandbox_name to their Salesforce usernames. For example, if a username for a production org is user1@acme.com, and the sandbox is named “test,” the modified username to log in to the sandbox is user1@acme.com.test.

I strongly suspect that it would be enough to append the sandbox's name to your username.
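
The username rule quoted from the docs amounts to plain string concatenation. A minimal sketch, where `sandbox_username` is a hypothetical helper and not part of aiosfstream:

```python
def sandbox_username(username: str, sandbox_name: str) -> str:
    """Build a sandbox login name by appending ".<sandbox_name>" to the username.

    Hypothetical helper for illustration only; it is not part of aiosfstream.
    """
    return f"{username}.{sandbox_name}"


# Example values taken from the quoted Salesforce documentation.
print(sandbox_username("user1@acme.com", "test"))  # user1@acme.com.test
```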

The main focus for me during the development of this library was always the support for push topics, so I did far more testing with push topics than with platform events. It turns out that there is a bug in version 0.2.3 if you try to use it with platform events and with some kind of replay storage. The library tried to access the wrong date property in platform event messages. This issue is now fixed in the latest version. Please upgrade and retest your script.
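
To illustrate the kind of key mismatch described above, here is a minimal sketch of reading the creation date from both message shapes. It assumes that push topic messages carry `createdDate` under `data.event`, while platform event messages carry `CreatedDate` under `data.payload`, as in the sample posted earlier in this thread. `extract_created_date` is a hypothetical helper, not the library's actual fix.

```python
from typing import Any, Dict, Optional


def extract_created_date(message: Dict[str, Any]) -> Optional[str]:
    """Return the creation date of a streaming message, or None if absent."""
    data = message.get("data", {})
    # Assumed push topic shape: {"data": {"event": {"createdDate": ...}}}
    event = data.get("event", {})
    if "createdDate" in event:
        return event["createdDate"]
    # Platform event shape from this thread:
    # {"data": {"payload": {"CreatedDate": ...}}}
    payload = data.get("payload", {})
    return payload.get("CreatedDate")


platform_event = {
    "data": {
        "schema": "VzC0752aIYrG4l6cFY9t8Q",
        "payload": {
            "CreatedById": "005C000000B3xGIIAZ",
            "CreatedDate": "2018-11-05T15:26:54.787Z",
        },
    }
}
print(extract_created_date(platform_event))  # 2018-11-05T15:26:54.787Z
```

Checking both locations keeps a single code path for either message type, at the cost of hard-coding two assumed layouts.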

sureshbabumandava commented 6 years ago

Can you please let me know how you got this working with a sandbox? I am facing some problems now.

robertmrk commented 6 years ago

Hi @sureshbabumandava

I was finally able to get my hands on a non developer edition org, and implemented support for sandbox orgs in the latest version. You can find the related documentation here.

balmasi commented 6 years ago

Awesome! Can't wait to try it out now. thanks

robertmrk commented 6 years ago

Can you confirm that it works as expected? If so then I would like to close this issue.

balmasi commented 6 years ago

@robertmrk Thank you! great work on the docs and new features! Great work on this package. Wish I could buy you a bottle of wine :)

I can connect and read events from the sandbox now.

I'm trying to implement a ReplayMarkerStorage using DynamoDB and am noticing some unexpected behaviour in the replay mechanism.

The docs seem to indicate that replay_fallback is an option on the SalesforceStreamingClient constructor. However, I'm being forced to set it in the __init__ method of my class DynamoReplayMarkerStorage(ReplayMarkerStorage). Is this intended behaviour? My intuition told me to set the option on the SalesforceStreamingClient constructor.

Code Used:

import asyncio

from aiosfstream import SalesforceStreamingClient, ReplayOption


async def stream_events():
    # connect to Streaming API
    # (DynamoReplayMarkerStorage is my own ReplayMarkerStorage subclass)
    async with SalesforceStreamingClient(
            ...,
            sandbox=True,
            replay=DynamoReplayMarkerStorage(),
            replay_fallback=ReplayOption.ALL_EVENTS
        ) as client:

        # subscribe to topics
        await client.subscribe("/event/Test__e")

        async for message in client:
            # single quotes inside the f-string avoid a syntax error on Python 3.6
            print(f"Processed => {message['channel']}: {message['data']}")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(stream_events())

Result:

Traceback (most recent call last):
  File "src/test_aiosf.py", line 100, in <module>
    loop.run_until_complete(stream_events())
  File "/anaconda3/lib/python3.6/asyncio/base_events.py", line 467, in run_until_complete
    return future.result()
  File "src/test_aiosf.py", line 87, in stream_events
    await client.subscribe("/event/Test__e")
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiosfstream/exceptions.py", line 133, in async_wrapper
    return await func(*args, **kwargs)
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiosfstream/client.py", line 125, in subscribe
    await super().subscribe(channel)
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiocometd/client.py", line 295, in subscribe
    response = await self._transport.subscribe(channel)
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiocometd/transports/base.py", line 600, in subscribe
    subscription=channel)
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiocometd/transports/base.py", line 282, in _send_message
    return await self._send_payload_with_auth([message])
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiocometd/transports/base.py", line 297, in _send_payload_with_auth
    response = await self._send_payload(payload)
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiocometd/transports/base.py", line 323, in _send_payload
    await self._process_outgoing_payload(payload, headers)
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiocometd/transports/base.py", line 334, in _process_outgoing_payload
    await extension.outgoing(payload, headers)
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiosfstream/replay.py", line 43, in outgoing
    await self.insert_replay_id(message)
  File "/Users/balmasi/.local/share/virtualenvs/sfdc-spring-sls-txAZIo5h/lib/python3.6/site-packages/aiosfstream/replay.py", line 56, in insert_replay_id
    if self.replay_fallback:
AttributeError: 'DynamoReplayMarkerStorage' object has no attribute 'replay_fallback'

Thoughts?

robertmrk commented 6 years ago

@balmasi Everything looks fine in your code except for one thing, which causes both of your problems. In Python, a derived class that defines its own __init__ method must call the base class's __init__ method explicitly; otherwise the attributes set by the base class are never created. You should add that call to the __init__ method of DynamoReplayMarkerStorage and remove the line which sets the replay_fallback attribute. So the __init__ method of DynamoReplayMarkerStorage should look like this:

def __init__(self):
    # initialise the base class first so that replay_fallback exists
    super().__init__()
    dynamo = boto3.resource(...)
    self.replay_markers_db = dynamo.Table('sfdc-platform-events')
    self.replay_markers_local_storage = {}

    try:
        response = self.replay_markers_db.scan()
        for mark in response['Items']:
            self.replay_markers_local_storage[mark['event']] = ReplayMarker(mark['date'], int(mark['replay_id']))
        print(f'DynamoReplayMarkerStorage initialized with version markers {self.replay_markers_local_storage}')
    except Exception:
        log.exception('Could not retrieve platform event version markers from the database')
        raise
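
The effect of a missing super().__init__() call can be reproduced without Salesforce or DynamoDB. In this minimal sketch, `BaseStorage` is a hypothetical stand-in that is assumed to behave like the library's base class by setting `replay_fallback` in its own `__init__`; the real ReplayMarkerStorage may differ in detail.

```python
class BaseStorage:
    """Hypothetical stand-in for a base class that initialises its own state."""

    def __init__(self):
        self.replay_fallback = None


class BrokenStorage(BaseStorage):
    def __init__(self):
        # No super().__init__() call, so replay_fallback is never created.
        self.markers = {}


class FixedStorage(BaseStorage):
    def __init__(self):
        super().__init__()  # runs BaseStorage.__init__ first
        self.markers = {}


print(hasattr(BrokenStorage(), "replay_fallback"))  # False
print(hasattr(FixedStorage(), "replay_fallback"))   # True
```

Reading `replay_fallback` on a `BrokenStorage` instance raises the same kind of AttributeError as in the traceback above.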

balmasi commented 6 years ago

How embarrassing 😳 You're a champ! Thank you!

robertmrk commented 6 years ago

Everybody goes through this stage with a new language, so don't be embarrassed about it! 😉

sureshbabumandava commented 6 years ago

@robertmrk, thanks again for the change. It's working as expected. Looking at @balmasi's comment at the start of this thread, "it seems to trip out when I read all events": @balmasi, can you please confirm whether this problem still exists? We are planning to use the library in production and have not done a stress test on it yet, so it would be great to hear about your experience. Also @robertmrk, have you done any similar stress tests? Thanks again for this library, highly appreciated.

robertmrk commented 6 years ago

Hi @sureshbabumandava

I'm using the library in production (in rabbit_force) and I haven't encountered any stability issues yet. The problem in @balmasi's case was a logical error in the handling of platform event messages, not a stability issue. However, there is no such thing as bug-free software, so if you happen to find something, I'm here to fix it.