aws / aws-appsync-community

The AWS AppSync community
https://aws.amazon.com/appsync
Apache License 2.0

RFC: AppSync Enhanced Subscriptions Filtering #186

Closed KoldBrewEd closed 2 years ago

KoldBrewEd commented 2 years ago

AWS AppSync leverages GraphQL subscriptions to push data to clients that choose to listen to specific events from the backend. This means that you can easily and effortlessly make any supported data source in AppSync real-time with connection management handled automatically between the client and the service. A backend service can easily broadcast data to connected clients or clients can send data to other clients, depending on the use case. Real-time data, connections, scalability, fan-out and broadcasting are all handled by intelligent client libraries and AppSync, allowing you to focus on your application business use cases and requirements instead of dealing with the complex infrastructure to manage WebSockets connections at scale. Subscriptions are invoked in response to a mutation or change in data. In other words, when data is modified via a GraphQL mutation operation, AppSync notifies subscribers of that data change on successful completion of the mutation. In short, a mutation publishes data which is sent to clients subscribed to it.

Customers using AppSync real-time subscriptions sometimes want to filter or restrict data sent to specific subscribed clients. An example could be a chat app where 1:1 conversations are private and only users in a specific conversation can receive messages. To accomplish this today, arguments can be defined in the subscription on the client side, which means, for instance, that AppSync can send data only to clients listening for a particular ID. A client-side subscription can define at most 5 arguments, which are matched by strict equality (i.e. the order of arguments matters) and can be combined using AND logic only (i.e. events from location X AND date Y AND description Z), allowing for some flexibility when filtering data to subscribed clients (more details at https://docs.aws.amazon.com/appsync/latest/devguide/aws-appsync-real-time-data.html#using-subscription-arguments). If there’s a need to invalidate an active subscription connection and refresh the data due to a change such as group membership, the client must forcibly close and re-open the WSS connection.
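As a rough illustration of the argument-based filtering described above, the sketch below models it in plain JavaScript. The function name, the constant and the sample event are hypothetical and only mirror the description in this RFC (strict equality, AND logic, at most 5 arguments); this is not AppSync's actual implementation.

```javascript
// Hypothetical model of today's argument-based filtering; not AppSync's actual implementation.
const MAX_SUBSCRIPTION_ARGUMENTS = 5; // limit stated in the text above

// Returns true only if every argument supplied by the subscriber strictly
// equals the same-named field of the published data (AND logic).
function matchesSubscriptionArguments(subscriptionArgs, publishedData) {
  const entries = Object.entries(subscriptionArgs);
  if (entries.length > MAX_SUBSCRIPTION_ARGUMENTS) {
    throw new Error(`At most ${MAX_SUBSCRIPTION_ARGUMENTS} arguments are supported`);
  }
  return entries.every(([name, expected]) => publishedData[name] === expected);
}

const event = { location: "X", date: "Y", description: "Z" };
console.log(matchesSubscriptionArguments({ location: "X", date: "Y" }, event)); // true
console.log(matchesSubscriptionArguments({ location: "X", date: "W" }, event)); // false
```

Note that nothing in this model can express OR logic or range comparisons, which is the gap the proposal addresses.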

We’re evaluating adding service-side enhanced filtering capabilities in AppSync to enable use cases where developers would want to define fine grained filtering criteria for real-time GraphQL subscriptions on the AppSync backend itself as opposed to the client side. These service-side filters would support additional OR logic and operators such as:

Operator | Description | Allowed value types
eq | Equal | Integer, String, Boolean
ne | Not equal | Integer, String, Boolean
le | Less than or equal | Integer
lt | Less than | Integer
ge | Greater than or equal | Integer
gt | Greater than | Integer
contains | Checks for a subsequence, or value in a set | Integer, String
notContains | Checks for absence of a subsequence, or absence of a value in a set | Integer, String
beginsWith | Checks for a prefix | String
in | Checks for matching elements in a list | Integer, String
notIn | Checks that no element in a list matches | Integer, String
between | Between two values | Integer, String

We propose these new real-time enhanced filtering capabilities in AppSync would enable additional business logic for filtering or authorization to be executed at runtime whenever data is pushed to a subscribed client with filters defined in the GraphQL API backend itself as opposed to the current implementation with filtering arguments defined on the client side. As an example, if a user is removed from a group, the new capability would allow a subscription connection to be automatically invalidated without any action from the client.

Detailed Example: Filtering

The following details the configuration of an example ticket management system powered by an AppSync GraphQL API. Tickets are created with a mutation, data is stored in a DynamoDB table, and subscribed clients should receive notifications based on ticket severity, priority, classification or groups.

Enhanced filters are enabled in the GraphQL resolver’s Response Mapping Template of a given subscription called onTicketCreated defined in the GraphQL schema and are implemented using the $extensions.setSubscriptionFilter() method to define a filter expression that is evaluated against published data that the subscription might be interested in. You can find more information about AppSync request/response mapping templates in https://docs.aws.amazon.com/appsync/latest/devguide/resolver-mapping-template-reference-overview.html.

A filter group defines a list or group of filters. Filters are defined with one or more of rules, each one with fields, operators and values. In the following example, multiple rules in a filter are evaluated with an AND logic and multiple filters in a filter group with an OR logic:

# Response Mapping Template - OnTicketCreated subscription

$extensions.setSubscriptionFilter({
    "filterGroup": [
        {
           "filters" : [
                {
                    "fieldName" : "severity",
                    "operator" : "ge",
                    "value" : 7
                },
                {
                    "fieldName" : "priority",
                    "operator" : "in",
                    "value" : ["high", "medium"]
                }
           ]

        },
        {
           "filters" : [
                {
                    "fieldName" : "classification",
                    "operator" : "eq",
                    "value" : "Security"
                },
                {
                    "fieldName" : "group",
                    "operator" : "in",
                    "value" : ["admin", "operators"]
                }
           ]

        }
    ]
})

$util.toJson($context.result)
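To make the AND/OR semantics above concrete, here is a minimal sketch in plain JavaScript of how such a filter group might be evaluated against published data. The evaluator, its function names and the sample tickets are hypothetical, and the operator semantics (for instance, `between` being inclusive) are assumptions inferred from the operator table, not a description of AppSync internals.

```javascript
// Hypothetical operator implementations, inferred from the operator table above.
const operators = {
  eq: (field, value) => field === value,
  ne: (field, value) => field !== value,
  le: (field, value) => field <= value,
  lt: (field, value) => field < value,
  ge: (field, value) => field >= value,
  gt: (field, value) => field > value,
  beginsWith: (field, value) => typeof field === "string" && field.startsWith(value),
  in: (field, value) => value.includes(field),
  notIn: (field, value) => !value.includes(field),
  between: (field, [low, high]) => field >= low && field <= high, // assumed inclusive
};

// A filter matches only if every one of its rules matches (AND).
function matchesFilter(filter, payload) {
  return filter.filters.every(({ fieldName, operator, value }) =>
    operators[operator](payload[fieldName], value)
  );
}

// A filter group matches if at least one of its filters matches (OR).
function matchesFilterGroup(subscriptionFilter, payload) {
  return subscriptionFilter.filterGroup.some((filter) => matchesFilter(filter, payload));
}

// Same filter expression as in the response mapping template above.
const ticketFilter = {
  filterGroup: [
    {
      filters: [
        { fieldName: "severity", operator: "ge", value: 7 },
        { fieldName: "priority", operator: "in", value: ["high", "medium"] },
      ],
    },
    {
      filters: [
        { fieldName: "classification", operator: "eq", value: "Security" },
        { fieldName: "group", operator: "in", value: ["admin", "operators"] },
      ],
    },
  ],
};

const securityTicket = { severity: 10, priority: "high", classification: "Security", group: "admin" };
const minorTicket = { severity: 3, priority: "low", classification: "General", group: "support" };

console.log(matchesFilterGroup(ticketFilter, securityTicket)); // true
console.log(matchesFilterGroup(ticketFilter, minorTicket)); // false
```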

When data is published using a mutation such as:

mutation TicketCreate{
     createTicket(input: {description: "This is a ticket.", group: "admin", classification: "Security", priority: "high", severity: 10}) {
        id
        classification
        group
        description
        priority
        severity
     }
 }

Subscribed clients just need to have a subscription in place listening for the data to be automatically pushed via WebSockets upon ticket creation by the mutation:

subscription OnTicketCreated{
     onTicketCreated {
        id
        classification
        group
        description
        priority
        severity
     }
 }

With the service-side filters defined in the above example, important tickets will be automatically pushed to subscribed API clients if a ticket is created with either high or medium priority AND severity 7 or higher OR classified as security tickets assigned to either the admin or the operators groups. Lower priority tickets can still be manually queried however newly created tickets will be filtered in the backend and won’t be pushed in real-time to WebSocket clients. On the client side, clients just need to be subscribed with a subscription without arguments as all filters are defined service side such as:

#Amplify JS client

import Amplify, { API, graphqlOperation } from 'aws-amplify';
import * as subscriptions from './graphql/subscriptions';

...
const subscription = API.graphql(
    graphqlOperation(subscriptions.onTicketCreated)
).subscribe({
    next: ({ provider, value }) => console.log({ provider, value }),
    error: error => console.warn(error)
});

Detailed Example: Invalidation

This example is unrelated to the ticket system in the previous example and should be looked at in isolation.

There might be a requirement to forcibly unsubscribe clients from the server side as opposed to using unsubscribe logic on the client side. Unsubscribing would close the connection so clients don’t receive data from the subscription anymore. This is accomplished in the following example via mutation (pub) and subscription (sub) operations defined in the GraphQL schema:

...

type User {
    userID: ID!
    userName: String
    group: String
}

type Mutation {
    removeFromGroup(userID: ID!, group: String!): User
}

type Subscription {
    onGroupRemoval: User
        @aws_subscribe(mutations: ["removeFromGroup"])
}

With enhanced filtering, active subscriptions can also be forcibly invalidated from AppSync by defining an invalidation filter with the $extensions.setSubscriptionInvalidationFilter() method in the subscription GraphQL resolver’s Response Mapping Template.

# Response Mapping Template - onGroupRemoval subscription

$extensions.setSubscriptionInvalidationFilter({
    "filterGroup": [
        {
           "filters" : [
                {
                    "fieldName" : "userID",
                    "operator" : "eq",
                    "value" : $ctx.result.userID
                },
                {
                    "fieldName" : "group",
                    "operator" : "eq",
                    "value" : $ctx.result.group
                }
           ]
        }
    ]
})

$util.toJson($context.result)

The invalidation filter requires additional logic in the publishing side (mutation) defined with the $extensions.invalidateSubscriptions() method in the GraphQL resolver’s Response Mapping template of a mutation linked to the subscription via the @aws_subscribe directive in the GraphQL schema:

# Response Mapping Template - removeFromGroup mutation

$extensions.invalidateSubscriptions({
    "subscriptionField": "onGroupRemoval",
    "payload": {
            "userID": $ctx.args.userID,
            "group": $ctx.args.group
    }
})
$util.toJson($context.result)

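The following attributes are required to invalidate one or more subscriptions linked to the mutation:

subscriptionField: the subscription to invalidate (onGroupRemoval in the template above).
payload: the data that is matched against the invalidation filter defined in the subscription resolver.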

In the example above, where users and groups are managed in a DynamoDB table, a mutation can be invoked to remove a user from a specific group and trigger the invalidation. A subscription defined in the subscriptionField of $extensions.invalidateSubscriptions() is then invalidated if it matches the filter defined in the payload. All subscribed clients from this user will be automatically unsubscribed in case data should be pushed just to remaining members of the group:

mutation groupRemoval{
     removeFromGroup(userID: "jdoe", group: "admin") {
        userID
        group
     }
 }
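The invalidation flow described above can be sketched roughly as follows. The in-memory registry, the client IDs and the JavaScript `invalidateSubscriptions` function are hypothetical stand-ins, with invalidation filters simplified to eq-only rules; the real matching would happen inside AppSync.

```javascript
// Hypothetical registry of active subscriptions, each holding the invalidation
// filter produced by its subscription resolver (simplified to eq-only rules).
const activeSubscriptions = [
  { clientId: "c1", subscriptionField: "onGroupRemoval", invalidationFilter: { userID: "jdoe", group: "admin" }, open: true },
  { clientId: "c2", subscriptionField: "onGroupRemoval", invalidationFilter: { userID: "jdoe", group: "operators" }, open: true },
  { clientId: "c3", subscriptionField: "onGroupRemoval", invalidationFilter: { userID: "asmith", group: "admin" }, open: true },
];

// Stand-in for what the mutation resolver triggers: close every open
// subscription on the given field whose invalidation filter matches the payload.
function invalidateSubscriptions({ subscriptionField, payload }) {
  const closedClients = [];
  for (const subscription of activeSubscriptions) {
    if (!subscription.open || subscription.subscriptionField !== subscriptionField) continue;
    const matches = Object.entries(subscription.invalidationFilter).every(
      ([fieldName, value]) => payload[fieldName] === value
    );
    if (matches) {
      subscription.open = false;
      closedClients.push(subscription.clientId);
    }
  }
  return closedClients;
}

// Equivalent of removeFromGroup(userID: "jdoe", group: "admin").
const closedClients = invalidateSubscriptions({
  subscriptionField: "onGroupRemoval",
  payload: { userID: "jdoe", group: "admin" },
});
console.log(closedClients); // [ 'c1' ]
```

Only the subscription whose filter matches both the user and the group is closed; jdoe's subscription for the operators group and asmith's subscription for the admin group stay open.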

Conclusion

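In summary, with this proposal GraphQL Resolvers in AppSync would support 3 new extensions:

$extensions.setSubscriptionFilter(): defines a data filter in a subscription resolver.
$extensions.setSubscriptionInvalidationFilter(): defines an invalidation filter in a subscription resolver.
$extensions.invalidateSubscriptions(): triggers the invalidation from a mutation resolver.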

If there are no $extensions defined in any GraphQL resolver’s response mapping template for a mutation (invalidation) or subscription (filtering and/or invalidation), the previous subscriptions behavior is enforced where clients define arguments to filter real-time data and subscription resolvers are triggered only at connect/subscribe time.

Please comment on this thread if you have some thoughts or suggestions on this feature or if you think we’re missing any story points which you would love to see as a part of this feature.

bboure commented 2 years ago

Looks great! I will try to find use cases and see how it fits.

I have a question, though.

In the example for the sub invalidation, in the filter rule:

$extensions.setSubscriptionInvalidationFilter({
    "filterGroup": [
        {
           "filters" : [
                {
                    "fieldName" : "userID",
                    "operator" : "eq",
                    "value" : $ctx.result.userID
                },
                {
                    "fieldName" : "group",
                    "operator" : "eq",
                    "value" : $ctx.result.group
                }
           ]
        }
    ]
})

What are $ctx.result.userID and $ctx.result.group? (A subscription does not have result, or so I thought)

I would have expected something like $ctx.args.userID and $ctx.args.group, maybe (and the subscription to have filter params in the schema).

Expected use case/behaviour:

If a client subscribes to

onGroupRemoval(userID: "123", group: "admin"): User

(the current way), and that user gets removed from the "admin" group, the client will receive the subscription AND the subscription will be terminated.

Did I miss something?

KoldBrewEd commented 2 years ago

@bboure , the filter defined in $extensions.setSubscriptionInvalidationFilter() in the subscription resolver is linked to the data payload defined in the mutation resolver with $extensions.invalidateSubscriptions(). This way the arguments passed to the mutation (via $ctx.args) can be used to invalidate the subscription and trigger the unsubscribe() logic on specific clients.

In the example above, the user jdoe is getting removed from the admin group. After the invalidation, any data shared with admin group members won't reach jdoe anymore in any of the clients with active subscriptions. Another use case is, for instance, a social network app where users can unsubscribe from specific posts or unfollow other users.

KyleKotowick commented 2 years ago

Thank you for the detailed proposal, and yes, this would be a very welcome addition. Subscriptions are the main reason I prefer to use GraphQL where possible, but the current limitations around them in AppSync are what stop me from using AppSync for most of my projects.

Perhaps what might be of interest to you is a use case I have where enhanced filtering for subscriptions would make my life much, much easier.

Here's an example project/objective (a contrived example, but represents several limitations). Let's say we're building a service for "grocery shoppers". A customer places a PurchaseOrder, and our registered "Shoppers" are able to see the list of PurchaseOrderSummaries (an object that represents summary data of the purchase order, but not specific details like specific items to buy and the address of where to deliver them) and "claim" one of them. Once a Shopper claims a PurchaseOrderSummary, it becomes unavailable for any other Shopper to claim.

A couple things I would want from AppSync:

The current limitations of AppSync that prevent this system from working:

  1. There's no way to dynamically filter things returned from subscriptions based on a Lambda. In this example use case, I would want a Subscription to be able to call a Lambda for each subscribed Shopper, which checks their current GPS location (with an HTTP request, a lookup in a different database, etc.) and only returns the new PurchaseOrderSummary if they're currently in the same geographical region as the PurchaseOrderSummary was created.
  2. It's not possible to return a different type from a subscription than the mutation that it's subscribed to. In this case, since the claimPurchaseOrder mutation returns a PurchaseOrder type, the subscription also has to return the full PurchaseOrder type, even though I only want it to return a subset of the data (the PurchaseOrderSummary type).
  3. Our shoppers need the full PurchaseOrderSummary type from the subscription on createNewPurchaseOrder so all of the relevant data can be shown in their UI when a new one is created. However, subscriptions only return the fields that were specified in the mutation's selection set. So, if I have a malicious client that modifies the javascript in their browser, they can change the mutation selection set and screw up my system by modifying it to not request all of the returned fields; now the subscribers don't receive the data they need.

How I could work around these issues:

  1. I constantly poll each Shopper's location in a different process, and call a mutation when their location changes. Each shopper's client has a subscription on that mutation to be notified when they get moved to a different "region"; when they do, they drop their current subscription to createNewPurchaseOrder (which was filtering on the old region ID) and create a new subscription that filters on the new region ID.
  2. I use the $ctx.info.selectionSetList variable in the subscription's request mapping template to throw an error if the subscriber requests any more data on the createNewPurchaseOrder subscription than I want them to receive.
  3. I use the $ctx.info.selectionSetList again in the createNewPurchaseOrder mutation request mapping template to throw an error if the mutator doesn't request all of the response fields in their selection set; this ensures that subscribers always receive the data they need.

These are unpleasant workarounds because it means I constantly have to modify the request mapping templates as my schema changes, and the first one isn't even really a workaround because it depends on the client to act in good faith and not subscribe to any regions that they're not supposed to.
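For readers unfamiliar with workarounds 2 and 3, the checks can be sketched as below. This is plain JavaScript for illustration only: in practice the logic would live in the VTL request mapping templates and read `$ctx.info.selectionSetList`. The function names and the field names (`id`, `region`, `total`, `deliveryAddress`) are hypothetical.

```javascript
// Workaround 2 (sketch): reject a subscription whose selection set asks for
// fields outside the allowed summary fields.
function assertNoExtraFields(selectionSetList, allowedFields) {
  const extra = selectionSetList.filter((field) => !allowedFields.includes(field));
  if (extra.length > 0) {
    throw new Error(`Fields not allowed in this subscription: ${extra.join(", ")}`);
  }
}

// Workaround 3 (sketch): reject a mutation whose selection set omits fields
// that subscribers rely on.
function assertAllFieldsRequested(selectionSetList, requiredFields) {
  const missing = requiredFields.filter((field) => !selectionSetList.includes(field));
  if (missing.length > 0) {
    throw new Error(`Mutation must request these fields: ${missing.join(", ")}`);
  }
}

// Hypothetical summary fields for a PurchaseOrderSummary.
const summaryFields = ["id", "region", "total"];

assertNoExtraFields(["id", "region"], summaryFields); // passes
assertAllFieldsRequested(["id", "region", "total"], summaryFields); // passes

try {
  assertNoExtraFields(["id", "deliveryAddress"], summaryFields);
} catch (error) {
  console.log(error.message); // Fields not allowed in this subscription: deliveryAddress
}
```

Both lists of fields have to be kept in sync with the schema by hand, which is the maintenance burden described above.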

In summary, these are the wishlist items that solve the above issues in the same order:

  1. Allow subscription resolvers to use pipelines that are run for EACH mutation for EACH subscriber, so we can do things like dynamic filtering based on some external factor (in the example above, the user's current GPS coordinates)
  2. Allow subscriptions to return Interfaces and Unions, and subscribe to any mutation that returns a type that either implements that Interface or is included in that Union
  3. Allow subscriptions to return all fields returned by a Mutation, even if the user making the mutation doesn't request them in the selection set

danrivett commented 2 years ago

Thanks for writing this up @awsed, this functionality will be incredibly helpful for our project, and will overcome some big limitations with the use cases that Amplify DataStore can currently support as it heavily relies on Subscriptions as you know.

I would say though that I found the examples quite hard to follow, so I apologise if it reduces the quality of my comments.

A few things that made it unclear for me, that I wanted to mention/get clarification on:

I have some other thoughts and use cases that I'll write up separately as the above were just clarifying questions to help me understand the proposal better first as I'm not clear if I've understood things correctly yet.

KoldBrewEd commented 2 years ago

Thank you for the feedback and for sharing specifics on your use case, @KyleKotowick. While the items on your wishlist are not specifically related to this RFC, we'll add them to our backlog.

KoldBrewEd commented 2 years ago

@danrivett, you're 100% correct. The examples are completely unrelated, I'll try to make it clear in the description. They should be looked at in isolation.

The first example is just related to data filtering based on specific operators where we want to prevent some clients from receiving subscriptions data based on server-side filtering logic. No clients need to be invalidated/unsubscribed. The filtered messages will never reach clients that don't fit the defined filter profile.

The second example is a different use case, focused on an authorization scenario where a user is removed from a group and the invalidation forcibly unsubscribes the client so no more data directed to the group is sent to the removed member. The group from the first example is not the same group as in the second.

Let me try to answer your questions inline:

I'm confused with the group usage between the two examples. With the Ticket example it seems to indicate that onTicketCreated subscriptions will fire regardless of what group a User is in - it just checks what group the ticket has been assigned to against a hard coded list.

The subscription data will only reach subscribed clients that are members of the defined groups for the onTicketCreated subscription. If an authenticated user is not part of the group, they won't receive a notification about a ticket.

If we wanted to filter based on what groups a user is in, would it be as simple as changing the hard-coded list to something like $ctx.args.userGroups?

Correct, you can use anything passed in the context. You can even have groups in a DynamoDB table instead of hard coded in the filter and use something like $context.result.userGroups to validate.

In that case would the onTicketCreated subscription need to define both $extensions.setSubscriptionFilter and $extensions.setSubscriptionInvalidationFilter?

You could have both $extensions.setSubscriptionFilter and $extensions.setSubscriptionInvalidationFilter in the same subscription resolver if it makes sense for your business logic, use case and requirements. The examples above are separate, in that specific ticket system scenario I'd use a separate mutation/subscription to invalidate a user if needed (i.e. removing a user from the admin group is entirely unrelated to tickets being created). An invalidation will always require a mutation with $extensions.invalidateSubscriptions linked to it. You could think of that mutation as a "control plane" operation to invalidate something based on an invalidation filter and close a subscription from a client. A regular filter unrelated to invalidation use cases doesn't need anything to be added to the mutation.

If we wanted to separately inform clients they've been removed from a group, we could have a separate onGroupRemoval subscription, but I wouldn't think there would be any $extensions.setSubscriptionInvalidationFilter in there then? Is that correct?

In the example all users would be subscribed to the onGroupRemoval indeed listening for invalidation events in case they're removed from a group, however this is not necessary and a specific subscription is not required. You could have any or multiple existing subscriptions (i.e. onOrderCreated, onOrderUpdated) with their own invalidation filter linked to a mutation called removeFromGroup. If you need to invalidate/unsubscribe a client then $extensions.setSubscriptionInvalidationFilter is required in the subscription with a mutation to trigger the invalidation with $extensions.invalidateSubscriptions.

If it's true that the onTicketCreated subscription would need both $extensions.setSubscriptionFilter and $extensions.setSubscriptionInvalidationFilter definitions, my question then is: is there a way to simplify this for the majority of cases - e.g. have a sensible convention-over-configuration default for the subscriptionInvalidationFilter - perhaps based on user id if no explicit $extensions.setSubscriptionInvalidationFilter was used?

We have separate flows and extensions in order to separate a filter from an invalidation. It'd be a common use case to have data filters with no need to invalidate/unsubscribe clients, and vice versa.

Hope it helps, feel free to reach out here if you have any further questions.

KyleKotowick commented 2 years ago

Thank you for the feedback and for sharing specifics on your use case, @KyleKotowick. While the items on your wishlist are not specifically related to this RFC, we'll add them to our backlog.

@awsed I agree that my "wishlist" item 3 is unrelated to this RFC, but I think 1 and 2 are quite related.

  1. Allow subscription resolvers to use pipelines that are run for EACH mutation for EACH subscriber, so we can do things like dynamic filtering based on some external factor (in the example above, the user's current GPS coordinates)

The RFC is looking for comments on "enhanced filtering", and my comment is, if you're going to implement enhanced filtering, make it fully flexible by supporting Lambdas for filtering.

  2. Allow subscriptions to return Interfaces and Unions, and subscribe to any mutation that returns a type that either implements that Interface or is included in that Union.

If we're talking about enhanced filtering, let's talk about enhanced filtering of everything, including field-level filtering. Arguably, field-level filtering is more important than message-level filtering, because field-level filtering can be extended to message-level filtering (all fields in a message are filtered out), but message-level filtering cannot be fine-grained to specific fields.

And, if we're talking about field-level filtering, then returning Interfaces or Unions from a subscription becomes very important because if a Type has a required field, and the Subscription response has filtered out that field, it will error out because the required field is now null. However, if your subscription can return a Union, then it could be a Union of all of the different Types that might result after filtering.

My point with all of this being, if you're going to do a major project with subscriptions, do it in a way that adds the most flexibility for us as developers.

danrivett commented 2 years ago

Thanks @awsed I think that clears some things up, but I just wanted to clarify one part with the aid of an example.

I'm going to use the infamous group chat example. Let's say we have a WhatsApp message group with a group of members, and a subset of them being administrators who can add/remove other members from the group.

Here's the schema:

type User {
  id: ID!
  groupIds: [ID!]!
}

type Group {
  id: ID!
  name: String!
  members: [ID!]!
}

type GroupMessage {
  id: ID!
  group: ID!
  message: String!
}

We want to support only group members receiving new messages sent to that group as well as allow administrators to be able to boot people from a group and they no longer receive those messages.

The process of booting a person out seems clear to me. We would define a Mutation such as removeUserFromGroup and the Mutation Response Resolver would have something like:

$extensions.invalidateSubscriptions({
    "subscriptionField": "onGroupMessageCreated",
    "payload": {
            "userId": $ctx.args.userId,
            "group": $ctx.args.group
    }
})

Very similar to your example.

However I'm trying to understand how the onGroupMessageCreated Subscription Response Resolver would then be implemented?

We would need to have a filter so the user only receives messages for that group, but I'm not sure about the user id filter since by definition messages are sent to the group and so GroupMessage doesn't have any references to user ids

$extensions.setSubscriptionFilter({
    "filterGroup": [
        {
           "filters" : [
                {
                    "fieldName" : "groupId",
                    "operator" : "eq",
                    "value" : $ctx.args.groupId
                }
           ]
        }
    ]
})

And related to my original question, I would also think we would need to add a $extensions.setSubscriptionInvalidationFilter to that same onGroupMessageCreated Subscription Response Resolver above to handle the user being booted out the group by an administrator, but not sure again what it would be given the lack of user id in the GroupMessage being returned.

So that's the gap I currently see in this proposal (but the gap may also be in my understanding!) I believe we need to be able to invalidate not just based on matching fields on the return object but also based on matching either an explicit input parameter on the initial subscription, or in our case better yet a context field on the initial subscription (e.g. $ctx.identity.username or something). Then we can invalidate just that user's subscription to the group.

However it seems it can only match based on fields on the data object returned?

Sorry if I've completely misunderstood it or gone down a tangent. I'm just trying to make sure I fully understand the proposal so I can then evaluate it against different use cases we already have for this.

(Also we hide behind Amplify DataStore to take care of the AppSync subscriptions under the covers, so that is also a reason for my lack of understanding here, so apologies).

KoldBrewEd commented 2 years ago

Thank you for the additional context, @KyleKotowick. Let me clarify my previous statement, your wishlist items are all great points. They can definitely complement the proposal. When I mentioned "unrelated" I was referring to the fact that they can be useful even if you're not using enhanced filtering. Thank you for taking the time to provide very useful feedback, please keep it coming.

Hideman85 commented 2 years ago

I like this topic because it is the exact limitation we are facing with AWS AppSync & Amplify DataStore. At the moment you cannot filter subscriptions at runtime.

When I look at the proposed solution, I have a certain frustration because this is still a hard-coded solution. I wonder if AppSync could run a pipeline resolver or even a Lambda that would enable us to compute more complex authorization.

Imagine a Slack application: as a user I want to subscribe to created/updated/deleted messages in my workspace. Then a message triggers a mutation and so a subscription. My authorization is not on the message itself; I need to fetch the channel where this message is sent to decide whether or not to send the message to the client.

Yes, you're going to say, "You have to open one subscription per channel and authorize that subscription", but this is neither viable nor scalable. In terms of data, you want to subscribe to any messages that concern you.

In addition, transitive access right is pretty common, in my previous example your permission on the message depends on the permissions of the channel. Let's try to expand that case.

Another example would be a Google Drive app, Folder > Document > Comments; in the end, any tree data that involves user-based policies on multiple nodes. You cannot open one subscription per Document and per Folder; in the end you want to subscribe to a DataType and get filtered events.

All this was to make the point that hard-coded filtering is not enough and a pipeline resolver/Lambda resolver would be a lot better.
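The kind of transitive check meant here could look roughly like the sketch below: a per-event authorizer that resolves the channel a message belongs to before deciding whether to deliver it. All names and data are hypothetical, and nothing like this is part of the proposal as written; it only illustrates what a pipeline or Lambda-based filter would compute.

```javascript
// Hypothetical data: access is granted per channel, not per message.
const channelMembers = new Map([
  ["general", new Set(["alice", "bob"])],
  ["finance", new Set(["alice"])],
]);

// Hypothetical per-event authorizer, standing in for a pipeline or Lambda
// resolver that would run for each published message and each subscriber.
function canReceive(userId, message) {
  const members = channelMembers.get(message.channelId);
  return members !== undefined && members.has(userId);
}

// Computes which subscribed users should receive a published message.
function recipients(subscribedUsers, message) {
  return subscribedUsers.filter((userId) => canReceive(userId, message));
}

const subscribedUsers = ["alice", "bob", "carol"];
console.log(recipients(subscribedUsers, { id: "m1", channelId: "general", text: "hi" })); // [ 'alice', 'bob' ]
console.log(recipients(subscribedUsers, { id: "m2", channelId: "finance", text: "q3" })); // [ 'alice' ]
```

A static filter expression cannot express this lookup because the membership data lives outside the published message.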

Now I'm coming to the second point, the closing of a subscription. If we take the Google Drive case and imagine I opened a single subscription for each document I want to get Comments from (like we would have to do right now), I don't understand how you can close all the Document subscriptions if you lose access to the Folder.

I think AppSync has made a lot of progress on Query & DeltaSync Query, pipeline resolvers and conflict resolution, but subscriptions are not yet flexible enough to enable real "complex" use cases.

(Note: I was writing this message when I saw @danrivett's second case popping up via GitHub message subscription :D)

danrivett commented 2 years ago

My Group Messaging example could equally be applied to your Ticket Management System example too Ed:

If the requirement is for all users in the admin or operators groups to be notified of tickets with a security classification, how would the filter be invalidated if a user was removed from one of those groups?

I don't believe the current proposal supports this, since Tickets have no knowledge of who's in the various groups so there's no user data on the Ticket to match on and so invalidate that user's subscription when they are removed from the group.

As mentioned above I believe you'd need to be able to match on the subscription input/context data alongside the data being returned in the mutation, in order to identify which subscriptions to cancel.

That was why I was thinking it would be good to have those two examples linked because I think it shows a gap.

yashpatel6892 commented 2 years ago

@danrivett let me try to clarify a few of your concerns with an example.

Let’s assume we have AppSync Schema

type User {
  id: ID!
  groupIds: [ID!]!
}

type Group {
  id: ID!
  name: String!
  members: [ID!]!
}

type GroupMessage {
  id: ID!
  groupId: ID!
  message: String!
}

type Mutation {
    createGroupMessage(id: ID!, groupId : ID!, message: String!): GroupMessage
    removeUserFromGroup(userId: ID!, groupId : ID!) : Boolean
}

type Subscription {
    onGroupMessageCreated(userId: ID!, groupId : ID!): GroupMessage
        @aws_subscribe(mutations: ["createGroupMessage"])
}

schema {
    mutation: Mutation
    subscription: Subscription
}

with Resolver mapping template for mutation removeUserFromGroup(userId: ID!, groupId : ID!) : Boolean

$extensions.invalidateSubscriptions({
        "subscriptionField": "onGroupMessageCreated",
        "payload": {
                "userId": $ctx.args.userId,
                "groupId": $ctx.args.groupId
        }
    })
$util.toJson($context.result)

and Resolver mapping template for subscription onGroupMessageCreated(userId:ID!, groupId :ID!) :GroupMessage

$extensions.setSubscriptionFilter({
    "filterGroup": [
        {
           "filters" : [
                {
                    "fieldName" : "groupId",
                    "operator" : "eq",
                    "value" : $ctx.args.groupId
                }
           ]
        }
    ]
})

$extensions.setSubscriptionInvalidationFilter({
    "filterGroup": [
        {
           "filters" : [
                {
                    "fieldName" : "userId",
                    "operator" : "eq",
                    "value" : $ctx.args.userId 
                },
                {
                    "fieldName" : "groupId",
                    "operator" : "eq",
                    "value" : $ctx.args.groupId
                }
           ]
        }
    ]
})

$util.toJson($context.result)

NOTE : You can also use $ctx.identity to generate subscription and invalidation filters.

Let's assume client c1 subscribes user-1 to group-1 using the subscription request onGroupMessageCreated(userId: "user-1", groupId: "group-1"). AppSync will run the resolver of the onGroupMessageCreated subscription and generate the subscription and invalidation filters. For client c1, the subscription filter is groupId="group-1" and the invalidation filters are userId="user-1" and groupId="group-1".

Let's assume client c2 subscribes user-1 to group-2 using the subscription request onGroupMessageCreated(userId: "user-1", groupId: "group-2"). AppSync will run the resolver of the onGroupMessageCreated subscription and generate the subscription and invalidation filters. For client c2, the subscription filter is groupId="group-2" and the invalidation filters are userId="user-1" and groupId="group-2".

When a new group message is created using the mutation createGroupMessage(id: "message-1", groupId: "group-1", message: "test message"), it will generate the AppSync subscription message m1:

## Subscription Message m1 =
{
  "data": {
    "onGroupMessageCreated": {
      "id": "message-1",
      "groupId": "group-1",
      "message": "test message"
    }
  }
}

Client c1 will receive message m1 because client c1's filter criteria (groupId="group-1") match message m1, but client c2 will not receive message m1 because client c2's filter criteria (groupId="group-2") do not match message m1.
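The matching described above can be sketched in Python. This is only a simplified model, not AppSync's implementation: the helper names `matches` and `group_filter` are hypothetical, only the "eq" operator is modelled, and it assumes that filters inside one group are combined with AND while separate groups are combined with OR.

```python
# Simplified model of the subscription filters above (only "eq" is modelled).
def matches(filter_def, payload):
    """Return True if any filter group matches; a group matches when all its filters do."""
    for group in filter_def["filterGroup"]:
        if all(
            f["operator"] == "eq" and payload.get(f["fieldName"]) == f["value"]
            for f in group["filters"]
        ):
            return True
    return False


def group_filter(group_id):
    """Build the filter the subscription resolver above would produce for one groupId."""
    return {
        "filterGroup": [
            {"filters": [{"fieldName": "groupId", "operator": "eq", "value": group_id}]}
        ]
    }


# Filters generated for clients c1 and c2 from their subscription arguments.
client_filters = {"c1": group_filter("group-1"), "c2": group_filter("group-2")}

# Payload of subscription message m1.
m1 = {"id": "message-1", "groupId": "group-1", "message": "test message"}

recipients = [c for c, f in client_filters.items() if matches(f, m1)]
print(recipients)  # ['c1']
```

Only c1 is selected, because its groupId filter equals the groupId carried by m1.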

To explain subscription invalidation, let's assume user-1 is removed from group-1 using the mutation removeUserFromGroup(userId: "user-1", groupId: "group-1"). This mutation will initiate subscription invalidation because its resolver calls $extensions.invalidateSubscriptions(). After the mutation resolver runs, the invalidation payload IP1 will be:

## Invalidation Payload IP1 = 
$extensions.invalidateSubscriptions({
    "subscriptionField": "onGroupMessageCreated",
    "payload": {
            "userId": "user-1",
            "groupId": "group-1"
    }
})

AppSync will invalidate client c1's subscription because client c1's invalidation filters (userId="user-1" and groupId="group-1") match invalidation payload IP1, but client c2's subscription will not be invalidated because client c2's invalidation filters (userId="user-1" and groupId="group-2") do not match invalidation payload IP1.
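The invalidation decision can be modelled the same way. The following is a minimal Python sketch, not AppSync's implementation: the `should_invalidate` helper is hypothetical, and each client's invalidation filters are flattened into a plain dictionary whose entries must all equal the corresponding payload fields.

```python
# Invalidation filters registered per client by the subscription resolver
# (all conditions must hold, mirroring the single filter group in the example).
invalidation_filters = {
    "c1": {"userId": "user-1", "groupId": "group-1"},
    "c2": {"userId": "user-1", "groupId": "group-2"},
}

# Invalidation payload IP1 produced by removeUserFromGroup.
ip1 = {"userId": "user-1", "groupId": "group-1"}


def should_invalidate(filters, payload):
    """True when every field named in the client's filters equals the payload value."""
    return all(payload.get(field) == value for field, value in filters.items())


invalidated = [c for c, f in invalidation_filters.items() if should_invalidate(f, ip1)]
print(invalidated)  # ['c1']
```

Client c2 stays subscribed because its groupId condition ("group-2") differs from the payload.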

I hope this helps.

danrivett commented 2 years ago

@yashpatel6892 this is incredibly helpful, thank you so much for taking the time to walk through that example with me, I think I finally understand it now!

The missing piece for me was I didn't realise the onGroupMessageCreated(userId:ID!, groupId :ID!) subscription could define an input field - userId - to subscribe on, that isn't part of the corresponding Mutation's input parameters - createGroupMessage(id: ID!, groupId : ID!, message: String!).

That was the part I thought was the gap in all this. I was looking at the corresponding documentation and couldn't see an example where the Subscription took an input parameter that didn't match a Mutation's input parameter.

Could you perhaps just confirm this is indeed possible, as seems like possibly a new feature as part of this RFC? I just want to make sure the solution you outlined doesn't fall down on that single part.

Thanks again for taking the time to write this up and explain, I really do appreciate it.

yashpatel6892 commented 2 years ago

@danrivett Glad that helps you understand better.

Yes, it is indeed possible. You can define a subscription with arguments that are not part of the mutation response.

danrivett commented 2 years ago

That is brilliant Yash. In that case I think this proposal should work very well and solve a great many use cases.

We've been waiting for this RFC for a long time, so I'm extremely happy to see this RFC, and looking forward to its implementation and Amplify DataStore's subsequent support for it.

I will think about this RFC more over the next few days and if I think of any use cases worth sharing I'll add a comment, but your detailed explanation filled in a lot of gaps for me and has given me a lot more confidence. Thanks again.

vicary commented 2 years ago

Although this does not directly conflict with this RFC, I would argue that supporting custom data sources that trigger a subscription broadcast message is far more useful than the current @aws_subscribe model.

e.g. To trigger a subscription message via SQS, SNS, Redis, MQTT topics in AWS IoT... etc.

The level of control it gives to a product already outpaces a dedicated filtering solution.

sarazanguie commented 2 years ago

Hi, we are building a social media app. For our newsfeed we are using AppSync for real time and Cognito for authentication on our website. We are trying to configure AppSync so that when a user publishes a private post in their newsfeed, only people in their friend list can see it. What is the alternative to this enhanced filtering? How can we filter the response in a subscription?

markomilicevic commented 2 years ago

Hello @awsed,

Thanks a lot for this RFC. Let me introduce our use case; I think it's similar to @sarazanguie's.

Context

Our AppSync API triggers some mutations per userID, which are subscribed to per userID: this is the notify mutation

Problem

With our growth this has become a design issue: we have to trigger one mutation per userID associated with the group → we have to trigger 100 mutations (= 100 Lambdas) if the group has 100 associated users

TLDR;

This RFC doesn't solve our use case, since what we need is something like the ability to request a mutation for an array of IDs:

mutation MyMutation {
  publishMessage(userIDs: ["42", "84", ...], text: "Hello all") {
    ...
  }
}

This mutation would then trigger the subscription for users "42", "84", ... The in filter defined in this RFC doesn't fit our need

Our feedback

Use case: We have to notify all users of the group when a message is published in this group

Constraints

Schema

extend type Subscription {
  onMessagePublished(userID: ID!): ... @aws_subscribe(mutations: ["publishMessage"])
}

extend type Mutation {
  publishMessage(userID: ID!, text: String!): ...

  notify(groupID: ID!, text: String!): ...
}

Subscribe

subscription MySubscription {
  onMessagePublished(userID: "42") {
    text
  }
}

Mutate

mutation MyMutation {
  notify(groupID: "123", text: "Hello all") {
    ...
  }
}

Flow

Problem: When a group is composed of 100 users, we trigger 100 publishMessage mutations (one per user, for subscriptions). This is our current problem as we grow

--

Following the setSubscriptionFilter solution:

$extensions.setSubscriptionFilter({
  "filterGroup": [
    {
      "filters" : [
        {
          "fieldName" : "userIDs", // An array of IDs
          "operator" : "in",
          "value" : $ctx.args.userID // An ID
        }
      ]
    }
  ]
});

→ Is it possible to include our use case in this RFC? Or does another way already exist to solve that?

Please ping me if anything is unclear. Thanks a lot for your valuable feedback and have a nice day

KoldBrewEd commented 2 years ago

Maybe I'm missing something, @markomilicevic, but I believe you could have something like a groupID in your schema and send a single mutation with that groupID as an argument, as opposed to sending separate messages to each user in the group. The whole group would receive the message, as a single mutation would trigger subscriptions to be broadcast to all users in the group. It's a similar concept to a chat application where users are in a chat room and messages are sent to the ChatRoomID: all users in the room receive the message and no one else. If someone is removed from the group/chat room, the new invalidation feature in Enhanced Filtering would allow that single user to be unsubscribed from the AppSync service side.

I think the issue here is:

"The notify mutation will fetch the list of users that are associated to this group (from a backend)"

It looks like you are cycling through the user list of a group retrieved from your backend and sending a message to each user separately. This is an old sample of a chat app but it might be useful to illustrate: in this case users are subscribed to a ConversationID and can be in multiple conversations. In this example Conversations/Chat Rooms are stored in a separate DDB table: https://aws.amazon.com/blogs/mobile/building-a-serverless-real-time-chat-application-with-aws-appsync/
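To illustrate the difference in fan-out, here is a small Python sketch of group-keyed subscriptions. It is a toy in-memory model, not AppSync code: the `subscriptions` registry and the `subscribe` and `publish_to_group` helpers are hypothetical names introduced for this example.

```python
from collections import defaultdict

# Hypothetical in-memory registry: groupID -> set of subscribed user IDs.
subscriptions = defaultdict(set)


def subscribe(user_id, group_id):
    """Register a user as a subscriber of one group."""
    subscriptions[group_id].add(user_id)


def publish_to_group(group_id, text):
    """One publish call; every subscriber of the group receives the same message."""
    return {user_id: text for user_id in subscriptions[group_id]}


# 100 users subscribe to group "123"; one user subscribes to another group.
for n in range(100):
    subscribe(f"user-{n}", "123")
subscribe("outsider", "456")

delivered = publish_to_group("123", "Hello all")
print(len(delivered))  # 100
```

A single publish reaches all 100 members of group "123" and nobody else, instead of requiring one publish per user.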

markomilicevic commented 2 years ago

Thanks a lot @awsed for your feedback. For info: we switched from subscriptions per groupID to subscriptions per userID because we found that our apps lost some add/remove requests to groups over time (loss of connection, ...), but it looks like we have to return to groupID subscriptions following this RFC

I double-checked the last @yashpatel6892 comment and based on our use case:

In short:

@awsed: Is that the right goal of this RFC? Thanks again and have a nice day

KoldBrewEd commented 2 years ago

I'm excited to share that Enhanced Subscriptions Filtering is now available in AppSync. More information:

https://aws.amazon.com/blogs/mobile/appsync-enhanced-filtering/
https://docs.aws.amazon.com/appsync/latest/devguide/aws-appsync-real-time-enhanced-filtering.html

Thank you all for the feedback!

matsus2 commented 2 years ago

Hi, I noticed that AppSync enhanced filtering only supports mutations that return one result, if I'm understanding this correctly. If I want to do enhanced subscriptions filtering on a mutation that returns a list of objects, are there any workarounds or any plans to support this in the future?

rebolloso commented 2 years ago

Hi, I noticed that AppSync enhanced filtering only supports mutations that return one result, if I'm understanding this correctly. If I want to do enhanced subscriptions filtering on a mutation that returns a list of objects, are there any workarounds or any plans to support this in the future?

I agree. Instead of sending a batch of createMutations, I would like to simply send a mutation with a list / array. That is possible but I can't find a way to create a filtered subscription when a list is returned. It seems to only work when it's not a list. One possible solution is to just have the same filter behavior on lists also.

dwrynngs commented 1 year ago

I have been trying to implement this by following the links above and I have come across an issue that I cannot explain. Hopefully someone here can shed some light on it for me.

I can define the $extensions.setSubscriptionFilter call with no problem, but when I try to return the result as in the example above, using

$util.toJson($context.result)

I get an error in the AppSync console and subscriptions in my app stop. However, if I replace that line with the following line

$util.toJson(Anything)

the enhanced filtering works perfectly with no errors either in the console or the app. By the way, 'Anything' is the exact term I used there. Why does it work with a random undefined term and not with the stated $context.result? What am I missing? The logs of my subscriptions show that $context.result is always empty too, so I don't understand why it is being used in the examples.

rebolloso commented 1 year ago

This is what I have that works:

Configure the request mapping template.

{"version": "2017-02-28", "payload": $util.toJson($context)}

Configure the response mapping template.

#if($context.arguments.filter)
    $extensions.setSubscriptionFilter($util.transform.toSubscriptionFilter($context.result.arguments.filter))
#end
$util.toJson($context.arguments.result)

danielbender1989 commented 1 year ago

@rebolloso I am struggling to make your template run. It works great if I set up the subscription without variables like this:

subscription MySubscription {
  subscription {
    id
    timestamp
  }
}

However, when I set up the subscription with a filter as a variable, I get an error.

Subscription setup in the AWS Console (Queries):

subscription MySubscription($filter: String) {
  updatedFteForecast(filter: $filter) {
    id
    timestamp
  }
}

{"filter" : "{\"id\":{\"ne\":\"b\"}}"}
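One thing worth noting about these variables: because $filter is declared as String, its value is a JSON document encoded inside a string, so it has to be decoded twice before its structure is visible. The Python sketch below only demonstrates that double encoding. Whether this is related to the error that follows is an assumption, not something confirmed in this thread.

```python
import json

# Variables exactly as entered in the console query editor above.
raw_variables = r'{"filter" : "{\"id\":{\"ne\":\"b\"}}"}'

variables = json.loads(raw_variables)
print(type(variables["filter"]).__name__)  # str

# A second decoding step is needed to get at the filter's structure.
filter_object = json.loads(variables["filter"])
print(filter_object)  # {'id': {'ne': 'b'}}
```

If the resolver expects an object rather than a string, the string would presumably need an equivalent parsing step on the resolver side, or the schema argument would need to be an input type instead of String.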

Error received:

error@https://a.b.cdn.console.awsstatic.com/a/v1/OTEUDIDSM46LFH4KS6TVXQKGL6SCYZ44G32OFEBOFCUHSM4FWZSA/main.js:190:207538
b@https://a.b.cdn.console.awsstatic.com/a/v1/OTEUDIDSM46LFH4KS6TVXQKGL6SCYZ44G32OFEBOFCUHSM4FWZSA/main.js:334:593699
S@https://a.b.cdn.console.awsstatic.com/a/v1/OTEUDIDSM46LFH4KS6TVXQKGL6SCYZ44G32OFEBOFCUHSM4FWZSA/main.js:334:594154
value@https://a.b.cdn.console.awsstatic.com/a/v1/OTEUDIDSM46LFH4KS6TVXQKGL6SCYZ44G32OFEBOFCUHSM4FWZSA/main.js:334:594772
_handleIncomingSubscriptionMessage@https://a.b.cdn.console.awsstatic.com/a/v1/OTEUDIDSM46LFH4KS6TVXQKGL6SCYZ44G32OFEBOFCUHSM4FWZSA/main.js:186:129388

Would you be able to share how you subscribe and what your filter variable looks like?

rebolloso commented 1 year ago

My schema looks like this. If you need more info, I'm happy to provide it:

input FilterIDInput {
  eq: ID
  ne: ID
  le: ID
  lt: ID
  ge: ID
  gt: ID
  contains: ID
  notContains: ID
  beginsWith: ID
  in: [ID]
  notIn: [ID]
  between: [ID]
}

input FilterIntInput {
  ne: Int
  eq: Int
  le: Int
  lt: Int
  ge: Int
  gt: Int
  contains: Int
  notContains: Int
  between: [Int]
}

input FilterJtomInput {
  or: [FilterJtomInput!]
  and: [FilterJtomInput!]
  g: FilterIDInput
  s: FilterIDInput
  e: FilterIDInput
  a: FilterIDInput
  t: FilterIntInput
}

type Jtom {
  g: ID
  s: ID
  e: ID
  a: ID
  v: AWSJSON
  t: Int
  o: AWSDateTime
  av: JtomAv
}

type JtomAv {
  a: ID
  isArray: Boolean
}

input JtomAvInput {
  a: ID
  isArray: Boolean
}

input JtomInput {
  g: ID!
  s: ID!
  e: ID!
  a: ID!
  v: AWSJSON
  t: Int
  o: AWSDateTime
  av: JtomAvInput
}

type Mutation {
  notifyJtoms(jtoms: [JtomInput!]!): [Jtom]
  notifyJtom(jtom: JtomInput!): Jtom
}

type Query {
  getJtomsByGS(g: ID!, s: ID, limit: Int, nextToken: String): jtomConnection
  getJtomsByAT(a: ID!, t: Int, limit: Int, nextToken: String): jtomConnection
  getJtomsBySProcessing(s: ID!, processing: ID!, limit: Int, nextToken: String): jtomConnection
  listJtoms(filter: FilterJtomInput, limit: Int, nextToken: String): jtomConnection
}

type Subscription {
  notifyJtomsSubscription: [Jtom]
    @aws_subscribe(mutations: ["notifyJtoms"])
  notifyJtomSubscription(filter: FilterJtomInput): Jtom
    @aws_subscribe(mutations: ["notifyJtom"])
}

type jtomConnection {
  items: [Jtom]
  nextToken: String
}

schema {
  query: Query
  mutation: Mutation
  subscription: Subscription
}