api-platform / core

The server component of API Platform: hypermedia and GraphQL APIs in minutes
https://api-platform.com
MIT License

Mercure normalization context per topic. #5969

Open ThomasCarbon opened 10 months ago

ThomasCarbon commented 10 months ago

Duplicate: https://github.com/api-platform/core/issues/4326

The original author can't reopen the issue, even after I added a "temporary" solution, so I'm posting my own issue.
Here is a repost of my comment from the older issue:

I had the same problem and managed to get something working. Disclaimer: this is not unit tested, since it was done at the end of a project in an "oh shit, this is not natively possible" moment. It is probably a dirty implementation, but it is a good lead on how to achieve this properly. We have been using it in production for a few weeks now, and it works like a charm. We did a complete override of ApiPlatform\Doctrine\EventListener\PublishMercureUpdatesListener with the following code.

final class PublishMercureUpdatesListenerDecorator
{
    use DispatchTrait;
    use ResourceClassInfoTrait;

    private const ALLOWED_KEYS = [
        'topics' => true,
        'data' => true,
        'private' => true,
        'id' => true,
        'type' => true,
        'retry' => true,
        'normalization_context' => true,
        'hub' => true,
        'enable_async_update' => true,
    ];
    private readonly ?ExpressionLanguage $expressionLanguage;
    private \SplObjectStorage $createdObjects;
    private \SplObjectStorage $updatedObjects;
    private \SplObjectStorage $deletedObjects;

    /**
     * @param array<string, string[]|string> $formats
     */
    public function __construct(
        ResourceClassResolverInterface                                    $resourceClassResolver,
        private readonly IriConverterInterface                            $iriConverter,
        ResourceMetadataCollectionFactoryInterface                        $resourceMetadataFactory,
        private readonly SerializerInterface                              $serializer,
        private readonly array                                            $formats,
        MessageBusInterface                                               $messageBus = null,
        private readonly ?HubRegistry                                     $hubRegistry = null,
        private readonly ?GraphQlSubscriptionManagerInterface             $graphQlSubscriptionManager = null,
        private readonly ?GraphQlMercureSubscriptionIriGeneratorInterface $graphQlMercureSubscriptionIriGenerator = null,
        ExpressionLanguage                                                $expressionLanguage = null,
        private bool                                                      $includeType = false
    )
    {
        if (null === $messageBus && null === $hubRegistry) {
            throw new InvalidArgumentException('A message bus or a hub registry must be provided.');
        }

        $this->resourceClassResolver = $resourceClassResolver;

        $this->resourceMetadataFactory = $resourceMetadataFactory;
        $this->messageBus = $messageBus;
        $this->expressionLanguage = $expressionLanguage ?? (class_exists(ExpressionLanguage::class) ? new ExpressionLanguage() : null);
        $this->reset();

        if ($this->expressionLanguage) {
            $rawurlencode = ExpressionFunction::fromPhp('rawurlencode', 'escape');
            $this->expressionLanguage->addFunction($rawurlencode);

            $this->expressionLanguage->addFunction(
                new ExpressionFunction('iri', static fn(string $apiResource, int $referenceType = UrlGeneratorInterface::ABS_URL): string => sprintf('iri(%s, %d)', $apiResource, $referenceType), static fn(array $arguments, $apiResource, int $referenceType = UrlGeneratorInterface::ABS_URL): string => $iriConverter->getIriFromResource($apiResource, $referenceType))
            );
        }

        if (false === $this->includeType) {
            trigger_deprecation('api-platform/core', '3.1', 'Having mercure.include_type (always include @type in Mercure updates, even delete ones) set to false in the configuration is deprecated. It will be true by default in API Platform 4.0.');
        }
    }

    /**
     * Collects created, updated and deleted objects.
     */
    public function onFlush(EventArgs $eventArgs): void
    {
        if ($eventArgs instanceof OrmOnFlushEventArgs) {
            $uow = method_exists($eventArgs, 'getObjectManager') ? $eventArgs->getObjectManager()->getUnitOfWork() : $eventArgs->getEntityManager()->getUnitOfWork();
        } elseif ($eventArgs instanceof MongoDbOdmOnFlushEventArgs) {
            $uow = $eventArgs->getDocumentManager()->getUnitOfWork();
        } else {
            return;
        }

        $methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityInsertions' : 'getScheduledDocumentInsertions';
        foreach ($uow->{$methodName}() as $object) {
            $this->storeObjectToPublish($object, 'createdObjects');
        }

        $methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityUpdates' : 'getScheduledDocumentUpdates';
        foreach ($uow->{$methodName}() as $object) {
            $this->storeObjectToPublish($object, 'updatedObjects');
        }

        $methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityDeletions' : 'getScheduledDocumentDeletions';
        foreach ($uow->{$methodName}() as $object) {
            $this->storeObjectToPublish($object, 'deletedObjects');
        }
    }

    /**
     * Publishes updates for changes collected on flush, and resets the store.
     */
    public function postFlush(): void
    {
        try {
            foreach ($this->createdObjects as $object) {
                $this->publishUpdate($object, $this->createdObjects[$object], 'create');
            }

            foreach ($this->updatedObjects as $object) {
                $this->publishUpdate($object, $this->updatedObjects[$object], 'update');
            }

            foreach ($this->deletedObjects as $object) {
                $this->publishUpdate($object, $this->deletedObjects[$object], 'delete');
            }
        } finally {
            $this->reset();
        }
    }

    private function reset(): void
    {
        $this->createdObjects = new \SplObjectStorage();
        $this->updatedObjects = new \SplObjectStorage();
        $this->deletedObjects = new \SplObjectStorage();
    }

    private function storeObjectToPublish(object $object, string $property): void
    {
        if (null === $resourceClass = $this->getResourceClass($object)) {
            return;
        }

        $operation = $this->resourceMetadataFactory->create($resourceClass)->getOperation();
        try {
            $options = $operation->getMercure() ?? false;
        } catch (OperationNotFoundException) {
            return;
        }

        if (\is_string($options)) {
            if (null === $this->expressionLanguage) {
                throw new RuntimeException('The Expression Language component is not installed. Try running "composer require symfony/expression-language".');
            }

            $options = $this->expressionLanguage->evaluate($options, ['object' => $object]);
        }

        if (false === $options) {
            return;
        }

        if (true === $options) {
            $options = [];
        }

        if (!\is_array($options)) {
            throw new InvalidArgumentException(sprintf('The value of the "mercure" attribute of the "%s" resource class must be a boolean, an array of options or an expression returning this array, "%s" given.', $resourceClass, \gettype($options)));
        }

        foreach ($options as $key => $value) {
            if (!isset(self::ALLOWED_KEYS[$key])) {
                // ALLOWED_KEYS maps option names to true, so list its keys, not its values.
                throw new InvalidArgumentException(sprintf('The option "%s" set in the "mercure" attribute of the "%s" resource does not exist. Existing options: "%s"', $key, $resourceClass, implode('", "', array_keys(self::ALLOWED_KEYS))));
            }
        }

        $options['enable_async_update'] ??= true;

        if ($options['topics'] ?? false) {
            $topics = [];
            foreach ((array)$options['topics'] as $topic) {
                //------------------------ Home made code ---------------------------- //
                if (!\is_string($topic) && !\is_array($topic)) {
                    $topics[] = $topic;
                    continue;
                }

                //------------------------ Home made code ---------------------------- //
                if (\is_string($topic) && !str_starts_with($topic, '@=')) {
                    $topics[] = $topic;
                    continue;
                }

                //------------------------ Home made code ---------------------------- //
                if (\is_array($topic) && !str_starts_with($topic['iri'], '@=')) {
                    $topics[] = $topic;
                    continue;
                }

                if (null === $this->expressionLanguage) {
                    throw new \LogicException('The "@=" expression syntax cannot be used without the Expression Language component. Try running "composer require symfony/expression-language".');
                }

                //------------------------ Home made code ---------------------------- //
                if (is_array($topic)) {
                    $topics[] = [
                        'iri' => $this->expressionLanguage->evaluate(substr($topic['iri'], 2), ['object' => $object]),
                        'context' => $topic['context']
                    ];
                } else {
                    $topics[] = $this->expressionLanguage->evaluate(substr($topic, 2), ['object' => $object]);
                }
            }

            $options['topics'] = $topics;
        }

        if ('deletedObjects' === $property) {
            $types = $operation instanceof HttpOperation ? $operation->getTypes() : null;
            if (null === $types) {
                $types = [$operation->getShortName()];
            }

            $this->deletedObjects[(object)[
                'id' => $this->iriConverter->getIriFromResource($object),
                'iri' => $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL),
                'type' => 1 === \count($types) ? $types[0] : $types,
            ]] = $options;

            return;
        }

        $this->{$property}[$object] = $options;
    }

    private function publishUpdate(object $object, array $options, string $type): void
    {
        if ($object instanceof \stdClass) {
            // By convention, if the object has been deleted, we send only its IRI and its type.
            // This may change in the future, because it's not JSON Merge Patch compliant,
            // and I'm not fond of this approach.

            //------------------------ Home made code ---------------------------- //
            $iris = isset($options['topics']) ? $this->groupIri($options['topics']) : [$object->iri];
            /** @var string $data */
            $data = json_encode(['@id' => $object->id] + ($this->includeType ? ['@type' => $object->type] : []), \JSON_THROW_ON_ERROR);
        } else {
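            $resourceClass = $this->getObjectClass($object);
            // Default context: the "normalization_context" Mercure option, else the operation's own context.
            // ($context and $iri are not defined yet at this point, so they cannot be read here.)
            $context = $options['normalization_context'] ?? $this->resourceMetadataFactory->create($resourceClass)->getOperation()->getNormalizationContext() ?? [];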

            //------------------------ Home made code ---------------------------- //
            $iris = isset($options['topics']) ? $this->groupIriByTopicContext($options['topics'], $context) : [$this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL)];
        }

        //------------------------ Home made code ---------------------------- //
        $updateByIris = [];
        foreach ($iris as $iri) {
            if ($object instanceof \stdClass) {
                $updateByIris[] = $this->buildUpdate($iri, $data, $options);
                continue;
            }

            $context = is_array($iri) ? $iri['context'] : $context;
            $iri = is_array($iri) ? $iri['topics'] : $iri;

            if (empty($iri)) {
                continue;
            }

            $data = $options['data'] ?? $this->serializer->serialize($object, key($this->formats), $context);
            $updateByIris[] = $this->buildUpdate($iri, $data, $options);
        }

        $updates = array_merge($updateByIris, $this->getGraphQlSubscriptionUpdates($object, $options, $type));
        //------------------------ Home made code end ---------------------------- //

        foreach ($updates as $update) {
            if ($options['enable_async_update'] && $this->messageBus) {
                $this->dispatch($update);
                continue;
            }

            $this->hubRegistry->getHub($options['hub'] ?? null)->publish($update);
        }
    }

    //------------------------ Home made code ---------------------------- //
    private function groupIri(array $topics): array
    {
        $iris = [];
        foreach ($topics as $topic) {
            if (!\is_array($topic)) {
                $iris[] = $topic;
            } else {
                $iris[] = $topic['iri'];
            }
        }

        return $iris;
    }

    //------------------------ Home made code ---------------------------- //
    private function groupIriByTopicContext(array $topics, array $defaultContext): array
    {
        $irisGroupedByContext = [
            [
                'context' => $defaultContext,
                'topics' => [],
            ]
        ];

        foreach ($topics as $topic) {
            if (!\is_array($topic)) {
                foreach ($irisGroupedByContext as $key => $iriGroupedByContext) {
                    if ($iriGroupedByContext['context'] === $defaultContext) {
                        $irisGroupedByContext[$key]['topics'][] = $topic;
                    }
                }
                continue;
            }

            $topicContext = $topic['context'];

            $filled = false;
            foreach ($irisGroupedByContext as $key => $iriGroupedByContext) {
                if ($iriGroupedByContext['context'] === $topicContext) {
                    $irisGroupedByContext[$key]['topics'][] = $topic['iri'];
                    $filled = true;
                    break;
                }
            }

            if ($filled) {
                continue;
            }

            $irisGroupedByContext[] = [
                'context' => $topicContext,
                'topics' => [$topic['iri']]
            ];

        }

        return $irisGroupedByContext;
    }

    /**
     * @return Update[]
     */
    private function getGraphQlSubscriptionUpdates(object $object, array $options, string $type): array
    {
        if ('update' !== $type || !$this->graphQlSubscriptionManager || !$this->graphQlMercureSubscriptionIriGenerator) {
            return [];
        }

        $payloads = $this->graphQlSubscriptionManager->getPushPayloads($object);

        $updates = [];
        foreach ($payloads as [$subscriptionId, $data]) {
            $updates[] = $this->buildUpdate(
                $this->graphQlMercureSubscriptionIriGenerator->generateTopicIri($subscriptionId),
                (string)(new JsonResponse($data))->getContent(),
                $options
            );
        }

        return $updates;
    }

    /**
     * @param string|string[] $iri
     */
    private function buildUpdate(string|array $iri, string $data, array $options): Update
    {
        return new Update($iri, $data, $options['private'] ?? false, $options['id'] ?? null, $options['type'] ?? null, $options['retry'] ?? null);
    }
}

Basically, only storeObjectToPublish() and publishUpdate() changed. We also added two functions to avoid duplicating code: groupIri() and groupIriByTopicContext().
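To make the grouping step easier to follow in isolation, here is a minimal standalone sketch of what groupIriByTopicContext() does. The function name groupTopicsByContext() and the sample IRIs and groups are hypothetical, and the fallback to the default context when a topic has no 'context' key is an assumption that the listener above does not make.

```php
<?php

declare(strict_types=1);

/**
 * Groups topics that share a normalization context, so that each distinct
 * context only needs one serialization. Plain strings use the default context.
 *
 * @param array<int, string|array{iri: string, context?: array}> $topics
 *
 * @return list<array{context: array, topics: list<string>}>
 */
function groupTopicsByContext(array $topics, array $defaultContext): array
{
    // The first group always holds the default context, even if it stays empty.
    $groups = [['context' => $defaultContext, 'topics' => []]];

    foreach ($topics as $topic) {
        // Assumption: an array topic without a "context" key uses the default one.
        $context = \is_array($topic) ? ($topic['context'] ?? $defaultContext) : $defaultContext;
        $iri = \is_array($topic) ? $topic['iri'] : $topic;

        foreach ($groups as $key => $group) {
            if ($group['context'] === $context) {
                $groups[$key]['topics'][] = $iri;
                continue 2; // A matching group exists; move on to the next topic.
            }
        }

        $groups[] = ['context' => $context, 'topics' => [$iri]];
    }

    return $groups;
}

$groups = groupTopicsByContext(
    [
        ['iri' => '/admin/assets/1', 'context' => ['groups' => ['admin:read']]],
        '/users/42/assets/1',
        ['iri' => '/audit/assets/1', 'context' => ['groups' => ['admin:read']]],
    ],
    ['groups' => ['asset:read']],
);
```

With this input, $groups contains two entries: the default context with the single user topic, and the 'admin:read' context with the two other topics. publishUpdate() then builds one Update per group.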

Then, in services.yaml:

ApiPlatform\Doctrine\EventListener\PublishMercureUpdatesListener:
    class: App\Service\PublishMercureUpdatesListenerDecorator
    arguments:
      $formats: '%api_platform.formats%'

Then using it inside a mercure option is simple:

$topics = [
    [
        'iri' => "@='/admin/?topic=' ~ escape(iri(object))",
        'context' => ['groups' => ['admin:read']],
    ],
];
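As a rough illustration of where such a topics array could live, here is a sketch that passes it through the mercure option of the ApiResource attribute. The Asset class and its namespace are hypothetical, and the array form of a topic is only understood once the decorated listener above is registered. Stock API Platform does not know about the 'iri' and 'context' keys.

```php
<?php

declare(strict_types=1);

namespace App\Entity;

use ApiPlatform\Metadata\ApiResource;

// Hypothetical resource: only the "mercure" option matters for this example.
#[ApiResource(
    mercure: [
        'topics' => [
            [
                // Evaluated by the listener because of the "@=" prefix.
                'iri' => "@='/admin/?topic=' ~ escape(iri(object))",
                // Per-topic normalization context, read by the decorated listener.
                'context' => ['groups' => ['admin:read']],
            ],
        ],
    ],
)]
class Asset
{
}
```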

Note that you can mix both styles: the traditional way still works for topics that do not need serialization groups. For example:

$topics = [
    [
        'iri' => "@='/admin/?topic=' ~ escape(iri(object))",
        'context' => ['groups' => ['admin:read']],
    ],
    "@='/users/$uuid/assets' ~ '/?topic=' ~ escape(iri(object))",
];

Basically, the change just looks at each array entry: if it is a string, it is handled the old way; if it is an array, the listener looks for an 'iri' key and a 'context' key. We use it to always publish the complete object to the admin IRI and the object's default context to users. Feel free to ask any questions, and also feel free to improve our shit, haha. But at least it works great.
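The string-versus-array handling described above can be sketched on its own as follows. resolveTopic() is a hypothetical helper, not part of API Platform, and the $evaluate callable is a stand-in for ExpressionLanguage::evaluate() so that the example runs without Symfony installed. Defaulting a missing 'context' key to an empty array is also an assumption.

```php
<?php

declare(strict_types=1);

/**
 * Resolves one "topics" entry. Strings keep the stock behaviour; arrays carry
 * an "iri" plus a per-topic "context".
 *
 * @param string|array{iri: string, context?: array} $topic
 * @param callable(string): string                   $evaluate stand-in for the expression evaluator
 */
function resolveTopic(string|array $topic, callable $evaluate): string|array
{
    $iri = \is_array($topic) ? $topic['iri'] : $topic;

    // Only values prefixed with "@=" are expressions; anything else is a literal IRI.
    if (str_starts_with($iri, '@=')) {
        $iri = $evaluate(substr($iri, 2));
    }

    return \is_array($topic)
        ? ['iri' => $iri, 'context' => $topic['context'] ?? []]
        : $iri;
}

// Fake evaluator for the demo: it only tags the expression it receives.
$evaluate = static fn (string $expression): string => 'evaluated('.$expression.')';

$plain = resolveTopic('/users/42/assets', $evaluate);
$withContext = resolveTopic(
    ['iri' => "@='/admin/' ~ object.id", 'context' => ['groups' => ['admin:read']]],
    $evaluate,
);
```

Here $plain is returned untouched, while $withContext keeps its 'context' and gets its 'iri' replaced by whatever the evaluator returns.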

Originally posted by @ThomasCarbon in https://github.com/api-platform/core/issues/4326#issuecomment-1812516742

ThomasCarbon commented 10 months ago

@norkunas done :)

ThomasCarbon commented 10 months ago

In the publishUpdate() function, I just changed:

$context = $options['normalization_context'] ?? $this->resourceMetadataFactory->create($resourceClass)->getOperation()->getNormalizationContext() ?? [];

to:

$originalContext = $context ?? ['groups' => []];
$context["groups"] = is_array($iri) ? array_unique(array_merge($iri['context']['groups'], $originalContext['groups'])) : $originalContext['groups'];
$iri = is_array($iri) ? $iri['topics'] : $iri;

I made this change because the entity's default context was otherwise not used when a topic overrides the context.
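The group merge can also be written as a small pure function, which makes it easier to test. This is only a sketch: mergeContextGroups() is a hypothetical helper, it assumes 'groups' is always an array, and unlike the snippet above it tolerates a missing 'groups' key and lists the default groups first.

```php
<?php

declare(strict_types=1);

/**
 * Merges the serialization groups of a per-topic context into the default context.
 */
function mergeContextGroups(array $defaultContext, array $topicContext): array
{
    $groups = array_merge($defaultContext['groups'] ?? [], $topicContext['groups'] ?? []);

    // For other keys, topic values win over defaults; groups are the de-duplicated union.
    $merged = array_replace($defaultContext, $topicContext);
    $merged['groups'] = array_values(array_unique($groups));

    return $merged;
}

$context = mergeContextGroups(
    ['groups' => ['asset:read']],
    ['groups' => ['admin:read', 'asset:read']],
);
```

After this call, $context['groups'] holds 'asset:read' and 'admin:read', each exactly once.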