kingtut closed this issue 2 years ago.
It is not currently supported. Adding the extensions field is not enough: the protocol is a little bit different. For example, we should accept the start_ack message, which is not part of the current protocol. Do you know if a public server exists somewhere so that we could test a new implementation?
There seems to be a tutorial for testing this implementation using a boilerplate React app: https://aws.amazon.com/blogs/mobile/appsync-realtime/ and https://docs.amplify.aws/lib/graphqlapi/getting-started/q/platform/js#mocking-and-local-testing
But I couldn't find the protocol documented in their repository: https://github.com/aws/aws-appsync-community
Apart from the page shared by the issue author, only this blog post somewhat explains it: https://aws.amazon.com/blogs/mobile/appsync-realtime/
I have this implemented in very alpha form. I have tested it with OIDC credentials and it works. I need to test it with API Keys and IAM (sigv4) credentials and then I'll submit a pull request.
While I was doing it (I needed it for a project of mine), I noticed that extending the WebsocketsTransport is a bit clunky: it requires some copy-paste to insert new protocol steps into the execution path. NBD, but it would be cleaner if WebsocketsTransport were refactored to give subclasses easier access to the execution path...
To elucidate further: WebSockets in AppSync don't actually communicate directly with AppSync. They communicate with API Gateway, which then does an HTTPS redirect to AppSync in the backend. This makes setting up the socket a little confusing.
Also, the start_ack protocol requires waiting for its reception before starting the listening loop, which changes the base class flow a bit.
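A minimal sketch of that ordering, using only asyncio. It assumes incoming frames are delivered as raw JSON strings on an asyncio.Queue standing in for the websocket; wait_start_ack and ProtocolError are hypothetical names, not part of gql. It waits for start_ack, skips "ka" keep-alives, and treats any other frame (for example an error for an invalid subscription) as a failure.

```python
import asyncio
import json


class ProtocolError(Exception):
    """Raised when the server answers with something other than start_ack."""


async def wait_start_ack(incoming: "asyncio.Queue[str]", timeout: float = 10.0) -> None:
    """Block until a start_ack frame arrives, ignoring 'ka' keep-alive frames."""

    async def _loop() -> None:
        while True:
            message = json.loads(await incoming.get())
            message_type = message.get("type")
            if message_type == "start_ack":
                return
            if message_type == "ka":
                continue  # keep-alives can arrive at any time
            # Anything else (e.g. an "error" frame) aborts the subscription
            raise ProtocolError(f"expected start_ack, got {message!r}")

    # Raises asyncio.TimeoutError if no start_ack arrives in time
    await asyncio.wait_for(_loop(), timeout)


async def demo() -> str:
    queue: "asyncio.Queue[str]" = asyncio.Queue()
    for frame in ({"type": "ka"}, {"type": "start_ack", "id": "1"}):
        queue.put_nowait(json.dumps(frame))
    await wait_start_ack(queue, timeout=1.0)
    # Only after this point would the receive loop be started
    return "listening"


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Only once the ack has been seen does the listening loop start, which is the change to the base class flow described above.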
I'll try to document the PR well enough to explain when I send it. I guess you guys will be the judge on my success...
TODO: dive into the Appsync protocol and create another transport AWSAppsyncTransport which inherits WebsocketsTransport.
Going to start looking into this. Would love to see your preliminary pull request @joseph-wortmann
Ideally it would be possible to use the AWS AppSync SDK directly to facilitate this, but that might be too heavy.
There's lots of tooling built directly into amplify for starting mock servers and we can explore that: https://docs.amplify.aws/cli/usage/mock/#function-mock-environment-variables
An example of the websocket messages that are exchanged (for later reference), from here: https://aws.amazon.com/blogs/mobile/appsync-realtime/
To experiment with this:
npx create-react-app myRealtimeApp
cd myRealtimeApp
npm install @aws-amplify/core@next @aws-amplify/api@next @aws-amplify/pubsub@next bootstrap
# run this in the folder you just made with CRA:
amplify init
amplify add api
amplify push
Note: you'll be asked to modify App.js before running amplify serve.
Again: don't run amplify serve; instead, run amplify mock.
The above is a note for myself or anyone else who picks this up before I get back to it. Right now I'm setting up my local environment and re-establishing my AWS account so I can poke around here a little bit. Ideally I'll get my mock server setup during my next pomodoro and be able to report back on how the mocking stuff goes.
Note that I've tried the above. It looks like amplify mock doesn't support realtime connections; unfortunately, this only seems to work when I run amplify serve.
From Window 1's perspective:
From Window 2's perspective:
Note this is just the websocket channel. Auth against AWS Amplify is not covered here. For the above to work, it looks like I'm already authenticated against AWS:
Request URL: wss://wewzcj762zgwrfrfumqdgx4xru.appsync-realtime-api.us-east-1.amazonaws.com/graphql?header=
The header parameter, once base64-decoded, looks like:
{
"host": "wewzcj762zgwrfrfumqdgx4xru.appsync-api.us-east-1.amazonaws.com",
"x-amz-date": "20210828T235700Z",
"x-api-key": "da2-q3rtf...snip...6bxmoym"
}
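The header parameter is just this JSON object, serialized and base64-encoded, and the payload=e30= parameter seen in these URLs is base64 of {}. A small stdlib sketch of both directions, assuming compact JSON separators (as in the code later in this thread); the host and API key below are placeholders, and encode_header/decode_header are hypothetical helper names:

```python
import base64
import json
from typing import Dict


def encode_header(auth: Dict[str, str]) -> str:
    """Base64-encode the authorization object for the ?header= query parameter."""
    compact = json.dumps(auth, separators=(",", ":"))
    return base64.b64encode(compact.encode("utf-8")).decode("ascii")


def decode_header(encoded: str) -> Dict[str, str]:
    """Inverse of encode_header: recover the JSON object from ?header=."""
    return json.loads(base64.b64decode(encoded))


if __name__ == "__main__":
    auth = {
        "host": "example123.appsync-api.us-east-1.amazonaws.com",  # placeholder host
        "x-api-key": "da2-EXAMPLEKEY",  # placeholder key
    }
    header = encode_header(auth)
    print(header)
    print(decode_header(header) == auth)
    # The empty payload parameter is base64("{}")
    print(base64.b64encode(b"{}").decode())
```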
So, to make this work, we'd need to generate the base64-encoded header for the request URL's GET parameter, handle the keep-alive (ka) messages, and properly send connection_init as well as the subscription message with the authorization body:
{"id":"692f0573-fbbe-47df-b65e-c2add16ea062","payload":{"data":"{\"query\":\"subscription onCreateMessage {\\n onCreateMessage {\\n __typename\\n message\\n }\\n}\\n\",\"variables\":{}}","extensions":{"authorization":{"host":"wewzcj762zgwrfrfumqdgx4xru.appsync-api.us-east-1.amazonaws.com","x-amz-date":"20210829T000410Z","x-api-key":"da2-q3rtfqs64<snip>6bxmoym","x-amz-user-agent":"aws-amplify/2.1.0 js"}}},"type":"start"}
Sent messages:
Received Messages:
The importance of connection_ack and start_ack is likely debatable, though from an integrity perspective I'm sure they matter: they tell us that the server did in fact establish the subscription channel and is prepared to send us the messages we've requested. There are likely also error states if we try to subscribe to an invalid schema, which should also be explored.
Action Items:
extension
Note that I will be able to commit about an hour or two a week to this. Have to balance this against my other existing commitments. I'll be back next weekend, I expect.
@chadfurman I'm sorry that I haven't had time to come back to this. It's been a while since I touched the code I was working on, but you are welcome to use it as a starting point. I'd love to help, but I'm swamped with other stuff at the moment. If you're still working on it when I get back to the thing that forced me into this interesting corner, I'll be glad to lend a hand.
One thing to note is that AppSync actually goes through API Gateway for all of its access, which leads to some pretty strange enveloping of the AppSync access credentials as they are proxied through API Gateway.
My code (no quality guarantees) is below. As I recall, I tested the API Key and OIDC auths (but it has been a minute, and my memory isn't what it once was). Also, as I recall, there was some stuff in the underlying WebsocketsTransport that made subclassing require more copy/paste than I would normally like. If I were doing the PR for this, I would probably have changed that class to reduce the amount of copy/paste.
import hmac
import json
from abc import ABC, abstractmethod
from asyncio import CancelledError, wait_for, TimeoutError, ensure_future
from base64 import b64encode
from datetime import datetime
from hashlib import sha256
from logging import getLogger
from ssl import SSLContext
from typing import Any, AsyncGenerator, Dict, Optional, Union, cast

from boto3.session import Session
from gql.transport.websockets import ListenerQueue, WebsocketsTransport
from graphql import DocumentNode, ExecutionResult, print_ast
from websockets import connect as wsconnect
from websockets.typing import Subprotocol
from websockets.exceptions import ConnectionClosed
from gql.transport.exceptions import (
    TransportAlreadyConnected,
    TransportProtocolError,
)

_LOG = getLogger(__name__)


class AppSyncAuthorization(ABC):
    def on_connect(self) -> str:
        # Base64-encoded authorization object for the ?header= URL parameter
        return self._encodeHeader(self.on_subscribe())

    @abstractmethod
    def on_subscribe(self, data: Optional[str] = None) -> Dict:
        raise NotImplementedError()

    def _encodeHeader(self, header: Dict) -> str:
        return b64encode(json.dumps(header, separators=(",", ":")).encode()).decode()


class AppSyncApiKeyAuthorization(AppSyncAuthorization):
    def __init__(self, host: str, api_key: str) -> None:
        self.host = host
        self.api_key = api_key

    def on_subscribe(self, data: Optional[str] = None) -> Dict:
        return {"host": self.host, "x-api-key": self.api_key}


class AppSyncOIDCAuthorization(AppSyncAuthorization):
    def __init__(self, host: str, token: str) -> None:
        self.host = host
        self.token = token

    def on_subscribe(self, data: Optional[str] = None) -> Dict:
        return {"host": self.host, "Authorization": self.token}


class AppSyncIAMAuthorization(AppSyncAuthorization):
    def __init__(self, host: str) -> None:
        self.host = host
        self.session = Session()
        # host looks like <id>.appsync-api.<region>.amazonaws.com
        self.region = self.host.split(".")[2]
        self.signed_headers = (
            "accept;content-encoding;content-type;host;x-amz-date;x-amz-security-token"
        )

    def on_subscribe(self, data: Optional[str] = None) -> Dict:
        utc_now = datetime.utcnow()
        amz_date = utc_now.strftime("%Y%m%dT%H%M%SZ")
        date_stamp = utc_now.strftime("%Y%m%d")
        credentials = self.session.get_credentials()
        return {
            "accept": "application/json, text/javascript",
            "content-encoding": "amz-1.0",
            "content-type": "application/json; charset=UTF-8",
            "host": self.host,
            "x-amz-date": amz_date,
            "X-Amz-Security-Token": credentials.token,
            "Authorization": self._sigv4(amz_date, date_stamp, credentials, data),
        }

    def _sigv4(
        self, amz_date, date_stamp, credentials, data: Optional[str] = None
    ) -> str:
        def getSignatureKey(key, date_stamp, regionName, serviceName):
            def sign(key, msg):
                return hmac.new(key, msg.encode("utf-8"), sha256).digest()

            # SigV4 prefixes the secret key with "AWS4" (no space)
            kDate = sign(f"AWS4{key}".encode("utf-8"), date_stamp)
            kRegion = sign(kDate, regionName)
            kService = sign(kRegion, serviceName)
            kSigning = sign(kService, "aws4_request")
            return kSigning

        credentials_scope = f"{date_stamp}/{self.region}/appsync/aws4_request"
        # The connection handshake (no data, body "{}") is signed against
        # /graphql/connect; subscription registrations against /graphql
        canonical_uri = "/graphql/connect" if data is None else "/graphql"
        canonical_request = "\n".join(
            [
                "POST",
                canonical_uri,
                "",  # empty canonical query string
                "accept:application/json, text/javascript",
                "content-encoding:amz-1.0",
                "content-type:application/json; charset=UTF-8",
                f"host:{self.host}",
                f"x-amz-date:{amz_date}",
                f"x-amz-security-token:{credentials.token}",
                "",  # the canonical headers are followed by a blank line
                self.signed_headers,
                sha256((data or "{}").encode("utf-8")).hexdigest(),
            ]
        )
        string_to_sign = "\n".join(
            [
                "AWS4-HMAC-SHA256",
                amz_date,
                credentials_scope,
                sha256(canonical_request.encode("utf-8")).hexdigest(),
            ]
        )
        signature = hmac.new(
            getSignatureKey(
                credentials.secret_key,
                date_stamp,
                self.region,
                "appsync",
            ),
            string_to_sign.encode("utf-8"),
            sha256,
        ).hexdigest()
        return (
            f"AWS4-HMAC-SHA256 Credential={credentials.access_key}/{credentials_scope},"
            f"SignedHeaders={self.signed_headers},Signature={signature}"
        )


class AppSyncWebsocketsTransport(WebsocketsTransport):
    def __init__(
        self,
        url: str,
        authorization: AppSyncAuthorization,
        ssl: Union[SSLContext, bool] = False,
        connect_timeout: int = 10,
        close_timeout: int = 10,
        ack_timeout: int = 10,
        connect_args: Dict[str, Any] = {},
    ) -> None:
        self.authorization = authorization
        super().__init__(
            url,
            ssl=ssl,
            connect_timeout=connect_timeout,
            close_timeout=close_timeout,
            ack_timeout=ack_timeout,
            connect_args=connect_args,
        )

    async def _wait_start_ack(self) -> None:
        """Wait for the start_ack message. Keep alive messages are ignored"""
        while True:
            answer_type = str(json.loads(await self._receive()).get("type"))
            if answer_type == "start_ack":
                return
            if answer_type != "ka":
                raise TransportProtocolError(
                    "AppSync server did not return a start ack"
                )

    async def _send_start_and_wait_ack(
        self,
        document: DocumentNode,
        variable_values: Optional[Dict[str, str]] = None,
    ) -> int:
        query_id = self.next_query_id
        self.next_query_id += 1
        query: Dict[str, Any] = {"query": print_ast(document)}
        if variable_values:
            query["variables"] = variable_values
        # AppSync expects payload.data to be a JSON string
        data = json.dumps(query, separators=(",", ":"))
        await self._send(
            json.dumps(
                {
                    "id": str(query_id),
                    "type": "start",
                    "payload": {
                        "data": data,
                        "extensions": {
                            "authorization": self.authorization.on_subscribe(data)
                        },
                    },
                },
                separators=(",", ":"),
            )
        )
        # Wait for the start_ack message or raise a TimeoutError
        await wait_for(self._wait_start_ack(), self.ack_timeout)
        # Create a task to listen to the incoming websocket messages
        # (as written, this starts a new loop per subscription, so it
        # appears to handle only one subscription per connection safely)
        self.receive_data_task = ensure_future(self._receive_data_loop())
        return query_id

    async def connect(self) -> None:
        """Coroutine which will:

        - connect to the websocket address
        - send the init message
        - wait for the connection acknowledge from the server
        - create an asyncio task which will be used to receive
          and parse the websocket answers

        Should be cleaned with a call to the close coroutine
        """
        GRAPHQLWS_SUBPROTOCOL: Subprotocol = cast(Subprotocol, "graphql-ws")

        _LOG.debug("connect: starting")

        if self.websocket is None and not self._connecting:
            # Set connecting to True to avoid a race condition if user is trying
            # to connect twice using the same client at the same time
            self._connecting = True

            # If the ssl parameter is not provided,
            # generate the ssl value depending on the url
            # (an https url is rewritten to wss below, so it also needs TLS)
            ssl: Optional[Union[SSLContext, bool]]
            if self.ssl:
                ssl = self.ssl
            else:
                ssl = True if self.url.startswith(("wss", "https")) else None

            # Set default arguments used in the websockets.connect call
            connect_args: Dict[str, Any] = {
                "ssl": ssl,
                "extra_headers": self.headers,
                "subprotocols": [GRAPHQLWS_SUBPROTOCOL],
            }

            # Adding custom parameters passed from init
            connect_args.update(self.connect_args)

            # Connection to the specified url
            # Generate a TimeoutError if taking more than connect_timeout seconds
            # Set the _connecting flag to False after in all cases
            try:
                # https://<id>.appsync-api... -> wss://<id>.appsync-realtime-api...
                # payload=e30= is base64 for "{}"
                realtime_url = (
                    self.url.replace("https", "wss").replace(
                        "appsync-api", "appsync-realtime-api"
                    )
                    + f"?header={self.authorization.on_connect()}&payload=e30="
                )
                self.websocket = await wait_for(
                    wsconnect(realtime_url, **connect_args),
                    self.connect_timeout,
                )
            finally:
                self._connecting = False

            self.next_query_id = 1
            self.close_exception = None
            self._wait_closed.clear()

            # Send the init message and wait for the ack from the server
            # Note: This will generate a TimeoutError
            # if no ACKs are received within the ack_timeout
            try:
                await self._send_init_message_and_wait_ack()
            except ConnectionClosed as e:
                raise e
            except (TransportProtocolError, TimeoutError) as e:
                await self._fail(e, clean_close=False)
                raise e
        else:
            raise TransportAlreadyConnected("Transport is already connected")

        _LOG.debug("connect: done")

    async def subscribe(
        self,
        document: DocumentNode,
        variable_values: Optional[Dict[str, str]] = None,
        send_stop: Optional[bool] = None,
    ) -> AsyncGenerator[ExecutionResult, None]:
        # Send the query and receive the id
        query_id: int = await self._send_start_and_wait_ack(document, variable_values)

        # Create a queue to receive the answers for this query_id
        listener = ListenerQueue(query_id, send_stop=(send_stop is True))
        self.listeners[query_id] = listener

        # We will need to wait at close for this query to clean properly
        self._no_more_listeners.clear()

        try:
            # Loop over the received answers
            while True:
                # Wait for the answer from the queue of this query_id
                # This can raise a TransportError or ConnectionClosed exception.
                answer_type, execution_result = await listener.get()

                # If the received answer contains data,
                # Then we will yield the results back as an ExecutionResult object
                if execution_result is not None:
                    yield execution_result

                # If we receive a 'complete' answer from the server,
                # Then we will end this async generator output without errors
                elif answer_type == "complete":
                    _LOG.debug(
                        f"Complete received for query {query_id} --> exit without error"
                    )
                    break

        except (CancelledError, GeneratorExit) as e:
            _LOG.debug("Exception in subscribe: " + repr(e))
            if listener.send_stop:
                await self._send_stop_message(query_id)
                listener.send_stop = False

        finally:
            del self.listeners[query_id]
            if len(self.listeners) == 0:
                self._no_more_listeners.set()
Noted with thanks. I will review and clean up a bit next weekend.
Are you sure there's no way to just piggy-back the auth layer or import it from AWS SDKs?
@chadfurman Unfortunately, I don't think so. As I said, AppSync is proxied through ApiGateway. That is also true for the WebSockets portion. As you can see in the code, you have to wrap the AppSync credentials into a SigV4 envelope for IAM to send to the "hidden" ApiGateway. It's kind of a PITA, and not very well documented in the AWS docs (or at least it wasn't when I wrote this).
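For reference, the core of that SigV4 envelope can be sketched with the standard library alone. This follows the generic AWS SigV4 recipe (signing key derived from "AWS4" + secret, canonical request with an empty query-string line and a blank line after the headers); it has not been checked against AppSync, the function names are hypothetical, and botocore's own signer is probably the safer choice in practice.

```python
import hashlib
import hmac
from typing import Dict


def _hmac(key: bytes, msg: str) -> bytes:
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def signing_key(secret_key: str, date_stamp: str, region: str, service: str) -> bytes:
    """Derive the SigV4 signing key (note: no space after 'AWS4')."""
    k_date = _hmac(("AWS4" + secret_key).encode("utf-8"), date_stamp)
    k_region = _hmac(k_date, region)
    k_service = _hmac(k_region, service)
    return _hmac(k_service, "aws4_request")


def canonical_request(path: str, headers: Dict[str, str], payload: str) -> str:
    """Build a SigV4 canonical request for a POST with no query string.

    Header values are only stripped here (inner-space collapsing is omitted).
    """
    lowered = {name.lower(): value.strip() for name, value in headers.items()}
    names = sorted(lowered)
    canonical_headers = "".join(f"{n}:{lowered[n]}\n" for n in names)
    return "\n".join(
        [
            "POST",
            path,
            "",  # empty canonical query string
            canonical_headers,  # ends with "\n", so a blank line follows
            ";".join(names),  # signed headers
            hashlib.sha256(payload.encode("utf-8")).hexdigest(),
        ]
    )


def sign(secret_key: str, amz_date: str, region: str, service: str, creq: str) -> str:
    """Return the hex signature for a canonical request."""
    date_stamp = amz_date[:8]
    scope = f"{date_stamp}/{region}/{service}/aws4_request"
    string_to_sign = "\n".join(
        [
            "AWS4-HMAC-SHA256",
            amz_date,
            scope,
            hashlib.sha256(creq.encode("utf-8")).hexdigest(),
        ]
    )
    key = signing_key(secret_key, date_stamp, region, service)
    return hmac.new(key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
```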
Goals for this weekend: handle type: "connection_ack", sending type: "connection_init", sending type: "ka", receiving type: "start_ack", and sending/receiving type: "data".
Re: Above code:
1. What are the elements of the code which talk to AppSync?
2. How is authentication handled in the above code?
3. How may the provided code be improved/extended now or in the future?
The connect() and subscribe() methods of AppSyncWebsocketsTransport.
Re: AppSync code:
1. Where is the JWT being assembled?
boto3 uses botocore, which has both a session and an auth component. The session component uses JSON schemas to access individual services (there is a separate JSON schema for appsync, for example), which implies that all AWS services are accessed in a similar fashion. The auth module definitely has support for SigV4, and we should 100% try to use the one in botocore rather than implementing our own. For reference, the SigV4 method in botocore is here: https://github.dev/boto/botocore/blob/c49333ca76972718d658924a01b61564aa7a31fe/botocore/auth.py#L176-L176
2. Where are the calls necessary to get the token/signature keys?
The session relies on the JSON structure of the various services in the data folder of botocore, and auth maps them here: https://github.dev/boto/botocore/blob/c49333ca76972718d658924a01b61564aa7a31fe/botocore/auth.py#L864-L864
The appsync service uses v4, which signs requests in its add_auth method, as noted here: https://github.com/boto/botocore/blob/develop/botocore/auth.py#L182
3. What are the pieces of the authentication system that we'd like to replicate?
From botocore, I think we should try to support the full range of SigV4Auth as well as the Credentials class, to enable users to talk to AppSync with their existing AWS configuration.
Re: Existing code:
1. How are real-time vs stateless requests segmented?
2. Where/How would we add ~JWT Headers~ Auth Tokens?
3. Where / how might we handle the various message types and error states?
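Action Items for next weekend:
1. Open a PR to add an Auth property in WebsocketTransport designed similarly to the RequestsHTTPTransport and AppSyncTransport example modules
2. Open a PR to add AppSyncTransport which uses botocore for Sigv4 authentication
From there, I will respond to comments and address any outstanding issues and requests in an effort to get this merged.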
@joseph-wortmann Thanks for your example code, it is really helpful. @chadfurman Thanks for looking into this, I really appreciate it.
1. Open a PR to add an Auth property in WebsocketTransport designed similarly to the RequestsHTTPTransport and AppSyncTransport example modules
I think you mean adding an auth property to the new AWSAppsyncTransport, no? There is no point in adding it to the current WebsocketsTransport class if it is not used there.
We should try to reuse the connect and subscribe methods as much as possible.
The subscribe method could stay the same if we overload the _send_query method.
For the connect method, we could add a hook in the connect method in websockets.py (a new argument on_before_connect_hook, None by default, which will allow us to modify the url each time we want to connect). An alternative, if possible, is simply to change the url once in the __init__ of the new class.
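A sketch of the "change the url once in __init__" option, using urllib.parse; realtime_url is a hypothetical helper. It assumes the hostnames follow the appsync-api / appsync-realtime-api pattern seen earlier in this thread, and that the endpoint decodes percent-escaped query values as usual (urlencode escapes the =, + and / characters base64 can produce). Since an IAM header embeds a timestamp and signature, computing it once would probably only suit API key or OIDC auth; a per-connection hook would cover IAM.

```python
import base64
import json
from typing import Dict
from urllib.parse import urlencode, urlsplit, urlunsplit


def realtime_url(graphql_url: str, auth: Dict[str, str]) -> str:
    """Map an AppSync GraphQL HTTPS endpoint to its realtime WebSocket URL.

    https://<id>.appsync-api.<region>.amazonaws.com/graphql becomes
    wss://<id>.appsync-realtime-api.<region>.amazonaws.com/graphql?header=...&payload=...
    """
    parts = urlsplit(graphql_url)
    host = parts.netloc.replace("appsync-api", "appsync-realtime-api", 1)
    header = base64.b64encode(json.dumps(auth, separators=(",", ":")).encode()).decode()
    payload = base64.b64encode(b"{}").decode()  # base64 of an empty JSON object
    query = urlencode({"header": header, "payload": payload})
    return urlunsplit(("wss", host, parts.path, query, ""))
```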
2. Open a PR to add AppSyncTransport which uses `botocore` for Sigv4 authentication
Note: if you need to add dependencies, you should keep them separate in a new "aws" extra dependency (put it in an install_aws_requires in setup.py).
There's an outstanding question of whether or not we need to segment for AppSync. Initial thinking here is that we will: I'm not sure if AppSync WebSocket connections can do queries and mutations; that may require separate HTTP bindings.
For HTTP requests, we should indicate to users that they could use the default AIOHTTPTransport, with custom headers if necessary. I think we should rename the on_subscribe method of the AppSyncAuthorization class to get_headers.
I think you mean adding an auth property to the new AWSAppsyncTransport, no? There is no point in adding it to the current WebsocketsTransport class if it is not used there.
Possibly, yes. My main concern is not having to copy/paste lots of code to add an auth hook. If we added an optional auth hook to WebsocketsTransport that made it easier to build custom protocols around it, that could be helpful. Not 100% sure on this, though, and definitely open to suggestions and guidance.
We should try to reuse the connect and subscribe methods as much as possible. The subscribe method could stay the same if we overload the _send_query method. For the connect method, we could add a hook in the connect method in websockets.py (a new argument on_before_connect_hook, None by default, which will allow us to modify the url each time we want to connect).
My concern here is that auth for AppSync is more than just a URL. They're adding the "extensions" field to the messages, with signed data including timestamps etc., which I think has to be updated/maintained as messages fly back and forth. I need to look into this more, but something in WebsocketsTransport that would allow us to hook not only the URL but also the messages themselves could be helpful. If not an "auth" property (because it's somewhat specific), then a more generic hook or set of hooks that would allow for less copy/paste of the connect/subscribe code.
Note: if you need to add dependencies, you should keep them separate in a new "aws" extra dependency (put it in an install_aws_requires in setup.py).
Noted. Makes sense! Thank you :)
For http requests, we should indicate to users that they could use the default AIOHTTPTransport, with custom headers if necessary.
Aye, when/if doing segmentation then I should really try to re-use the existing transports. I'm thinking that I'll let people pass in the other transports they want to use, as I expect that I will have to wrap the AIOHTTPTransport for ApiGateway auth shenanigans as well.
I think we should rename the on_subscribe method of the AppSyncAuthorization class to get_headers.
Could you elaborate on this? I don't quite follow. I'm open to this change, for sure. I think this might go back to the conversation above re: hooks in WebsocketTransport w.r.t. adding auth? We may also want to make sure any hooks we add are consistent in a way that can also be implemented in the AIOHTTPTransport w.r.t. adding auth... TBD
We should try to reuse the connect and subscribe methods as much as possible. The subscribe method could stay the same if we overload the _send_query method. For the connect method, we could add a hook in the connect method in websockets.py (a new argument on_before_connect_hook, None by default, which will allow us to modify the url each time we want to connect).
My concern here is that auth for AppSync is more than just a URL. They're adding the "extensions" field to the messages, with signed data including timestamps etc., which I think has to be updated/maintained as messages fly back and forth.
I think that can be done by simply overloading the _send_query method of WebsocketsTransport (with basically the _send_start_and_wait_ack above).
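To illustrate the overloading approach without depending on gql internals, here is a toy sketch: MiniWebsocketsTransport and MiniAppSyncTransport are hypothetical stand-ins, not gql classes. The shared subscribe() logic stays in the base class and only the message-building step is overridden, which is where the signed extensions (and, in a real transport, the start_ack wait) would go.

```python
import asyncio
import json
from typing import Any, Dict, List, Optional


class MiniWebsocketsTransport:
    """Hypothetical stand-in for a websockets transport (not gql's class)."""

    def __init__(self) -> None:
        self.sent: List[str] = []
        self.next_query_id = 1

    async def _send(self, message: str) -> None:
        self.sent.append(message)  # a real transport writes to the socket

    async def _send_query(
        self, query: str, variables: Optional[Dict[str, Any]] = None
    ) -> int:
        query_id = self.next_query_id
        self.next_query_id += 1
        payload = {"query": query, "variables": variables or {}}
        await self._send(json.dumps({"id": str(query_id), "type": "start", "payload": payload}))
        return query_id

    async def subscribe(self, query: str) -> int:
        # Shared logic lives here; only the message format differs per protocol
        return await self._send_query(query)


class MiniAppSyncTransport(MiniWebsocketsTransport):
    def __init__(self, authorization: Dict[str, str]) -> None:
        super().__init__()
        self.authorization = authorization

    async def _send_query(
        self, query: str, variables: Optional[Dict[str, Any]] = None
    ) -> int:
        query_id = self.next_query_id
        self.next_query_id += 1
        data = json.dumps({"query": query, "variables": variables or {}})
        payload = {"data": data, "extensions": {"authorization": self.authorization}}
        await self._send(json.dumps({"id": str(query_id), "type": "start", "payload": payload}))
        # A real implementation would wait for start_ack here
        return query_id
```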
I think we should rename the on_subscribe method of the AppSyncAuthorization class to get_headers.
Could you elaborate on this? I don't quite follow. I'm open to this change, for sure. I think this might go back to the conversation above re: hooks in WebsocketTransport w.r.t. adding auth? We may also want to make sure any hooks we add are consistent in a way that can also be implemented in the AIOHTTPTransport w.r.t. adding auth... TBD
I noted that the authorization methods of @joseph-wortmann only return a dictionary of elements that look like HTTP headers, which are then used:
encoded in b64 in the url for the connect;
in the authorization key of extensions in the payload for each new subscription.
I don't really know how different the authorization code is between websockets and HTTP with AWS AppSync; the doc is not clear. But if we could reuse the AppSyncAuthorization subclasses, that would be great. Something like this:
auth = AppSyncIAMAuthorization(....)
transport = AIOHTTPTransport(url=url, headers=auth.get_headers())
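A minimal sketch of what that rename could look like, assuming a single get_headers() method is enough for both transports (which, as noted, the AppSync docs don't make clear); AppSyncAuth and ApiKeyAuth are hypothetical names. The websocket transport would base64-encode the result for the connect URL, while an HTTP transport could pass the same dictionary as headers.

```python
import base64
import json
from abc import ABC, abstractmethod
from typing import Dict, Optional


class AppSyncAuth(ABC):
    """Hypothetical interface: returns header-like key/value pairs."""

    @abstractmethod
    def get_headers(self, data: Optional[str] = None) -> Dict[str, str]:
        """data is the request body to sign (only relevant for IAM)."""

    def connect_header(self) -> str:
        """Base64 form used in the websocket ?header= query parameter."""
        raw = json.dumps(self.get_headers(), separators=(",", ":"))
        return base64.b64encode(raw.encode("utf-8")).decode("ascii")


class ApiKeyAuth(AppSyncAuth):
    def __init__(self, host: str, api_key: str) -> None:
        self.host = host
        self.api_key = api_key

    def get_headers(self, data: Optional[str] = None) -> Dict[str, str]:
        return {"host": self.host, "x-api-key": self.api_key}


if __name__ == "__main__":
    auth = ApiKeyAuth("example123.appsync-api.us-east-1.amazonaws.com", "da2-EXAMPLEKEY")
    # HTTP: e.g. AIOHTTPTransport(url=url, headers=auth.get_headers())
    print(auth.get_headers())
    # WebSocket: ...?header=<connect_header()>&payload=e30=
    print(auth.connect_header())
```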
Thank you for clarifying. I'll follow up on this next weekend and try to get a draft PR open with your suggestions.
Following up on stated goals from last weekend:
Re: #1:
I think you mean adding an auth property to the new AWSAppsyncTransport, no? There is no point in adding it to the current WebsocketsTransport class if it is not used there.
We should try to reuse the connect and subscribe methods as much as possible. The subscribe method could stay the same if we overload the _send_query method. For the connect method, we could add a hook in the connect method in websockets.py (a new argument on_before_connect_hook, None by default, which will allow us to modify the url each time we want to connect). An alternative, if possible, is simply to change the url once in the __init__ of the new class. I think that can be done by simply overloading the _send_query method of WebsocketsTransport (with basically the _send_start_and_wait_ack above).
Re: #2:
Note: if you need to add dependencies, you should keep them separate in a new "aws" extra dependency (put it in an install_aws_requires in setup.py).
There's an outstanding question of whether or not we need to segment for AppSync. Initial thinking here is that we will: I'm not sure if AppSync WebSocket connections can do queries and mutations; that may require separate HTTP bindings.
For HTTP requests, we should indicate to users that they could use the default AIOHTTPTransport, with custom headers if necessary. I think we should rename the on_subscribe method of the AppSyncAuthorization class to get_headers. I noted that the authorization methods of @joseph-wortmann only return a dictionary of elements that look like HTTP headers, which are then used:
encoded in b64 in the url for the connect; in the authorization key of extensions in the payload for each new subscription. I don't really know how different the authorization code is between websockets and HTTP with AWS AppSync; the doc is not clear. But if we could reuse the AppSyncAuthorization subclasses, that would be great. Something like this:
auth = AppSyncIAMAuthorization(....)
transport = AIOHTTPTransport(url=url, headers=auth.get_headers())
Goals for today:
botocore for signature verification
Leaving this here for reference: https://aws.amazon.com/blogs/mobile/appsync-websockets-python/
Did not get to #3 above due to complexities in botocore, but I'm fairly sure I figured out how to reuse their existing signing logic. Committing and pushing what I have. Will open a draft PR.
Next Steps:
Added the PR: https://github.com/graphql-python/gql/pull/239
I'll check back in throughout the week for any discussion and will pick away at it again next weekend. Note that we should probably keep discussion on the PR thread, now, for code changes.
Note that the PR is just about ready. I've added what I think are all the necessary code changes to get it to work. Currently, I have not tested it (manually or automatically), and there are no instructions for it. I will be following up next weekend to both test it and improve the documentation. Anything I find in that process will flow over into the following week and from there hopefully we'll be able to wrap this up.
If I get really ambitious I'll try and carve out some time throughout the week to pick away. I'm tracking my efforts in the PR thread, if you're curious, including when I start and stop each session (for no other reason than my own edification, measurements, and improvements).
This is now available in version 3.0.0rc0
AWS AppSync seems to require an extensions field on the payload. Does the WebsocketsTransport support this? https://docs.aws.amazon.com/appsync/latest/devguide/real-time-websocket-client.html