apollographql / federation

🌐  Build and scale a single data graph across multiple services with Apollo's federation gateway.
https://apollographql.com/docs/federation/

Enhance handling of entity fetches to reduce traffic #180

Open nicholashagen-hagen opened 4 years ago

nicholashagen-hagen commented 4 years ago

Currently, whenever the gateway has to resolve a reference through an entity (representation) query, it calls the underlying service with the list of representations. This happens within https://github.com/apollographql/federation/blob/main/gateway-js/src/executeQueryPlan.ts#L231-L277.

However, we have seen a few issues with this in practice on more complex queries.

  1. At times, when running over a list of references, every reference is null, meaning no reference exists at that index. This results in an empty set of representations, so an entity query of { representations : [ ] } is passed to the service. This is essentially a no-op and unnecessary. Instead, a check could be done after the forEach loop to see whether any representations were collected. If not, simply return. Otherwise, continue as normal.

  2. This one is far more complex. While serving a request, the same reference may appear more than once within a single operation, or several operations within the same request may refer to the same entity. Either way the entity is fetched repeatedly even though its data is already available, which creates a lot of unnecessary traffic. This resembles the problem the data loader pattern solves: the data loader keeps a cache scoped to a single request so that each entity is fetched only once. A similar principle may be useful in the code above. First, we should reduce the list of representations to its unique members, send only those, and then copy each resulting entity back into every entity that referenced it. Second, we should maintain a cache, in the context or elsewhere, that maps each representation to a promise of its data. The invocations often happen in parallel, so caching the promise rather than the resolved data lets a concurrent fetch share a request that is still in flight. While determining the representations, the cache should be checked for an existing promise or result. If one exists, a callback should be attached that copies the result over once the promise completes. If not, the representation should be passed to the underlying service and its promise cached for future calls. Once sendOperation completes, the promises can be resolved (or rejected), which lets everything waiting on the cached promises finish.
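The two points above can be sketched in isolation from the gateway. This is only an illustration under stated assumptions: `createEntityLoader` and `fetchEntities` are hypothetical names, not gateway APIs, and `fetchEntities` is assumed to return a promise of entities in the same order as the representations it was given.

```javascript
// Hypothetical helper, not part of the gateway: a per-request loader that
// de-duplicates representations and shares in-flight fetches via promises.
function createEntityLoader(fetchEntities) {
  // representation key -> promise of the fetched entity (one cache per request)
  const cache = new Map();

  return function load(representations) {
    const missing = [];   // representations not seen before in this request
    const resolvers = []; // resolve/reject pairs, parallel to `missing`

    const promises = representations.map((representation) => {
      // Assumption: equal representations serialize to the same string.
      const key = JSON.stringify(representation);
      let promise = cache.get(key);
      if (!promise) {
        // The executor runs synchronously, so `missing` is filled during map().
        promise = new Promise((resolve, reject) => {
          missing.push(representation);
          resolvers.push({ resolve, reject });
        });
        cache.set(key, promise);
      }
      return promise;
    });

    // Point 1: never call the service with an empty list of representations.
    if (missing.length > 0) {
      fetchEntities(missing).then(
        (entities) => resolvers.forEach((r, i) => r.resolve(entities[i])),
        (error) => resolvers.forEach((r) => r.reject(error)),
      );
    }

    // Point 2: every caller awaits the cached promises, first fetch or not.
    return Promise.all(promises);
  };
}
```

Because each caller awaits the promises it depends on, a fetch that found everything in the cache still finishes only after the data has arrived, and a failed request rejects every waiter instead of leaving it pending.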

Together, these two changes will greatly reduce unnecessary traffic to core systems.

I can help propose or produce more concrete code if there is interest in taking up this work. I already have some hacked-up code that I used to test and validate some of my theories, and I can improve on it if needed.

nicholashagen-hagen commented 4 years ago

I don't know TypeScript well, so I've just been messing with the generated JS code, but here is the idea I've been working from:

/**
 * Build a custom cache for managing the entity cache per context/request.
 * The key may be any object such as a representation, so must stringify to
 * a key.  The number of elements here is not great and per request, so no
 * real reason to use a more elaborate LRU solution or class.
 */
class EntityCache {
    constructor() {
        this.cache = { };
        this.prefix = "entity:";
    }

    get(key){
        return this.cache[this.prefix + JSON.stringify(key)];
    }

    set(key, val) {
        this.cache[this.prefix + JSON.stringify(key)] = val;
    }
}
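
One caveat with using JSON.stringify as the cache key is that it is sensitive to property order, so two representations with the same fields in a different order would get different keys. A minimal sketch of an order-independent key, assuming representations are plain JSON-serializable values (`stableKey` is a hypothetical helper, not something in the gateway):

```javascript
// Hypothetical helper: serialize a JSON-like value with object keys sorted,
// so that property order does not affect the resulting cache key.
function stableKey(value) {
  if (Array.isArray(value)) {
    // Array order is meaningful, so it is preserved.
    return "[" + value.map(stableKey).join(",") + "]";
  }
  if (value !== null && typeof value === "object") {
    const body = Object.keys(value)
      .sort()
      .map((k) => JSON.stringify(k) + ":" + stableKey(value[k]));
    return "{" + body.join(",") + "}";
  }
  // Primitives; values JSON cannot represent (e.g. undefined) become "null".
  const text = JSON.stringify(value);
  return text === undefined ? "null" : text;
}
```

With this, `stableKey({ id: 1, __typename: "User" })` and `stableKey({ __typename: "User", id: 1 })` produce the same string, so it could stand in for the JSON.stringify calls in get and set.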

/**
 * Custom non-promise based class responsible for being cached
 * and then responding with the resulting value once resolved.  This
 * ensures we only fetch a given entity once.
 */
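class EntityPromise {
    constructor() {
        this.resolved = false;
        this.result = undefined;
        this.listeners = [ ];
    }

    then(fn) {
        // use an explicit flag, since a resolved value may legitimately be null
        if (this.resolved) { fn(this.result); }
        else { this.listeners.push(fn); }
    }

    resolve(val) {
        // store the value so that listeners added after resolution still fire
        this.resolved = true;
        this.result = val;
        this.listeners.forEach(listener => listener(val));
        this.listeners = [];
    }
}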

And then within the executeFetch function:

async function executeFetch(context, fetch, results, _path, traceNode) {
    const logger = context.requestContext.logger || console;
    const service = context.serviceMap[fetch.serviceName];
    if (!service) {
        throw new Error(`Couldn't find service with name "${fetch.serviceName}"`);
    }
    const entities = Array.isArray(results) ? results : [results];
    if (entities.length < 1)
        return;
    let variables = Object.create(null);
    if (fetch.variableUsages) {
        for (const variableName of fetch.variableUsages) {
            const providedVariables = context.requestContext.request.variables;
            if (providedVariables &&
                typeof providedVariables[variableName] !== 'undefined') {
                variables[variableName] = providedVariables[variableName];
            }
        }
    }
    if (!fetch.requires) {
        const dataReceivedFromService = await sendOperation(context, fetch.operation, variables);
        for (const entity of entities) {
            deepMerge_1.deepMerge(entity, dataReceivedFromService);
        }
    }
    else {
        ///======= START CHANGES =======

        // This code should be put where the context is created to initialize the
        // entity cache.  For now here as a test.
        if (!context.cache) {
            context.cache = new EntityCache();
        }

        const requires = fetch.requires;
        const representations = [];
        const representationToCache = [];
        entities.forEach((entity, index) => {
            const representation = executeSelectionSet(entity, requires);
            if (representation && representation[graphql_1.TypeNameMetaFieldDef.name]) {

                // check if the request has already seen this representation to de-dup;
                // the key includes the service and operation so that fetches selecting
                // different fields for the same entity do not share a cached result
                const cacheKey = {
                    service: fetch.serviceName,
                    operation: fetch.operation,
                    representation,
                };
                let existing = context.cache.get(cacheKey);

                // if first time, create the promise, cache it, and add to representations to send
                if (!existing) {
                    existing = new EntityPromise();
                    representations.push(representation);
                    representationToCache.push(existing);
                    context.cache.set(cacheKey, existing);
                }

                // add our callback to update the entity instance with the result;
                // this happens in both cases (cached or first time), and for the
                // first time the send code below performs the resolution
                existing.then(result =>
                    deepMerge_1.deepMerge(entities[index], result));

                // Only first-time representations are sent and awaited. A concurrent fetch
                // for the same entity registers its callback and returns immediately, and
                // is merged only once the first fetch resolves. Open question: could that
                // ordering cause problems, and should every fetch await its cached promises?
            }
        });
        // if no representations were found, simply return (no further work to do)
        if (representations.length < 1) {
            return;
        }
        if ('representations' in variables) {
            throw new Error(`Variables cannot contain key "representations"`);
        }
        const dataReceivedFromService = await sendOperation(context, fetch.operation, { ...variables, representations });
        if (!dataReceivedFromService) {
            return;
        }
        if (!(dataReceivedFromService._entities &&
            Array.isArray(dataReceivedFromService._entities))) {
            throw new Error(`Expected "data._entities" in response to be an array`);
        }
        const receivedEntities = dataReceivedFromService._entities;
        if (receivedEntities.length !== representations.length) {
            throw new Error(`Expected "data._entities" to contain ${representations.length} elements`);
        }
        for (let i = 0; i < representationToCache.length; i++) {
            // resolve the cached promise with the result
            // this will update any and all code waiting on this entity
            representationToCache[i].resolve(receivedEntities[i]);
        }
    }
    ///////=========== END CHANGES ========