Open hitriyvalenok opened 1 year ago
Hi @hitriyvalenok!
We ship two implementations of Parcel
which can be used across processes: SharedMemoryParcel
in amphp/sync
and RedisParcel
in amphp/redis
. Neither of these must be serialized to be used in the parent and child process
Use SharedMemoryParcel::create()
in the parent and SharedMemoryParcel::use()
in each child process. You'll need to use a mutex which can be shared across processes as well, so likely PosixSemaphore
wrapped with SempahoreMutex
, all found in amphp/sync
.
RedisParcel
of course requires a redis server, but is a bit simpler and does not require additional extensions. Create a RedisParcel
using the same key in the parent and child.
You also could implement another Parcel
using any data store, such as a file. Accessing the data will need to be mutually exclusive, so a mutex will likely be needed. amphp/file
provides a file mutex, though no file parcel. If you do implement anything which you think would be useful to others, we always appreciate PRs! 👍
Channels are essentially streams which return PHP values rather than only bytes. You can create a new fiber/coroutine using Amp\async()
to read data from a channel in a loop. Values can be sent at any time on a channel, they do not need to be in response to data received on the channel.
Below is a snippet from ContextWorker
in amphp/cluster
which is receiving messages from the parent process and acting upon them in the child process. Another method, send()
(found here) is used to send messages back to the parent, which may be invoked at any time. A similar loop in the parent is receiving messages from the child.
$cancellation = $this->deferredCancellation->getCancellation();
try {
// We get null as last message from the cluster-runner in case it's shutting down cleanly.
// In that case, join it.
/** @var ClusterMessage $message */
while ($message = $this->context->receive($cancellation)) {
$this->lastActivity = \time();
/** @psalm-suppress UnhandledMatchCondition False positive. */
match ($message->type) {
ClusterMessageType::Pong => null,
ClusterMessageType::Data => $this->handleMessage($message->data),
ClusterMessageType::Log => \array_map(
static fn (MonologHandler $handler) => $handler->handle($message->data),
$this->logger->getHandlers(),
),
ClusterMessageType::Ping => throw new \RuntimeException('Unexpected message type received'),
};
}
$this->joinFuture->await(new TimeoutCancellation(Watcher::WORKER_TIMEOUT));
} finally {
EventLoop::cancel($watcher);
$this->close();
}
Hopefully this was helpful. We'll be working more on docs in the coming months and I'll consider adding an example like the above to the readme. Please reply with any additional questions (and I'll try to respond more promptly).
Hello there! 👋 It's not an issue, rather a question 🤗 I have a worker pool with some tasks running. Workers are using consumables they may request at any time and any number of times. Ideally, I wish to use Parcel for this purpose. Parcel is a very convenient way of communicating with data, but it can't be serialized and thus you can't pass it to Task. There's also Channel, but I don't understand how to use it in more complex cases than the docs described (when you sequentially send value, receive value, return result). How to solve my challenge without crutches? Thanks!