ruflin / Elastica

Elastica is a PHP client for elasticsearch
http://elastica.io/
MIT License

Add helper to populate index with documents #840

Open im-denisenko opened 9 years ago

im-denisenko commented 9 years ago

The library doesn't provide a standardized way to populate an index with a set of documents, even though this is a very common task. As a result, every new user has to implement this chunk of code again and again.

Examples are: one, two, three, four, five, six, seven, eight, nine, etc.

I think #829, #830, and #836 are related to this, because "populate an index from another" is just a special case of the more general task "populate an index from somewhere".

If this issue is resolved, we won't need the Util::copy function at all. The implementation draft below explains why.

im-denisenko commented 9 years ago

The implementation could be similar to the one in FOSElasticaBundle, but without Symfony.

How it works:

interface ProviderInterface
{
    /**
     * @return int
     */
    public function getDocumentCount();

    /**
     * @return Document
     */
    public function getNextDocument();
}

class Populator
{
    public function __construct(Client $client);

    public function setBulkSize($size = 1000);

    public function processProvider(ProviderInterface $provider);

    public function setIterationCompletedCallback(callable $callback);

    public function setDocumentRetrievedCallback(callable $callback);
}

Predefined provider: uses an Elastica index as the data source.

class IndexProvider implements ProviderInterface
{
    protected $scanAndScroll;

    protected $documentCount;

    public function __construct(Index $index)
    {
        $search = $index->createSearch();
        $this->documentCount = $search->count();
        $this->scanAndScroll = new ScanAndScroll($search);
    }

    public function getDocumentCount()
    {
        return $this->documentCount;
    }

    public function getNextDocument()
    {
        // Iterate $this->scanAndScroll and return documents one by one
    }
}

Usage example (copy from old index to new one):

$client = new Client();

$oldIndex = $client->getIndex('foo');
$newIndex = $client->getIndex('bar');

$provider = new IndexProvider($oldIndex);

// The callback setters are declared on Populator, not on the provider
$populator = new Populator($client);
$populator->setDocumentRetrievedCallback(function (Document $document) use ($newIndex) {
    return $document->setIndex($newIndex);
});
$populator->processProvider($provider);

im-denisenko commented 9 years ago

With that, users will have just one interface to implement for whatever data source they want to use. They won't have to dig into the bulk API or work out how to add or update multiple documents at once, because the library takes care of it.
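As a rough illustration of how small a custom provider could be, here is a sketch of an array-backed one. `ProviderInterface` is redeclared from the draft so the snippet stands alone. `ArrayProvider` is a hypothetical name, and plain arrays stand in for `Document` objects so that nothing here depends on Elastica itself. Returning `null` once the source is exhausted is an assumption; the draft does not say how the end of the data is signalled.

```php
<?php

// Redeclared from the draft so this snippet is self-contained.
interface ProviderInterface
{
    /** @return int */
    public function getDocumentCount();

    /** @return mixed next document, or null when exhausted (assumed convention) */
    public function getNextDocument();
}

// Hypothetical provider that serves documents from an in-memory array.
class ArrayProvider implements ProviderInterface
{
    private $rows;
    private $position = 0;

    public function __construct(array $rows)
    {
        // Reindex so $position always maps to a valid offset.
        $this->rows = array_values($rows);
    }

    public function getDocumentCount()
    {
        return count($this->rows);
    }

    public function getNextDocument()
    {
        if ($this->position >= count($this->rows)) {
            return null;
        }

        return $this->rows[$this->position++];
    }
}

$provider = new ArrayProvider([
    ['id' => 1, 'title' => 'first'],
    ['id' => 2, 'title' => 'second'],
]);

// Drain the provider the same way a populator would.
while (null !== ($row = $provider->getNextDocument())) {
    echo $row['id'], ': ', $row['title'], PHP_EOL;
}
```

A database- or file-backed provider would follow the same shape, with only the body of getNextDocument() changing.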

Welcome @ruflin @webdevsHub @xwei3752 @virtuman

webdevsHub commented 9 years ago

Populator::processProvider() would call $provider->getNextDocument() in a loop and pass the documents one by one to DocumentRetrievedCallback.

I can't see how Populator::setBulkSize() would be used.

im-denisenko commented 9 years ago

Like this. Note $this->bulkSize:

class Populator
{
    public function processProvider(ProviderInterface $provider)
    {
        // documents collected for the current bulk request
        $documents = [];

        // fetch loop
        while ($document = $provider->getNextDocument()) {

            if (is_callable($this->onDocumentRetrievedCallback)) {
                $document = call_user_func_array($this->onDocumentRetrievedCallback, [$document]);
            }

            // add new document to chunk
            $documents[] = $document;

            // send chunk to ES once it holds bulkSize documents
            if (count($documents) >= $this->bulkSize) {
                $this->client->addDocuments($documents);
                $documents = [];

                if (is_callable($this->onIterationCompletedCallback)) {
                    call_user_func_array($this->onIterationCompletedCallback, [/* some arguments */]);
                }
            }
        }

        // if the last chunk was not sent, send it now
        if (!empty($documents)) {
            $this->client->addDocuments($documents);
        }
    }
}

webdevsHub commented 9 years ago

Ahh okay I see: IterationCompletedCallback is fired when a bulk iteration is complete.
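To make the chunking behaviour concrete, here is a self-contained sketch that can be run without Elastica. `sendInChunks` is a hypothetical helper, a plain closure stands in for `Client::addDocuments()`, and integers stand in for documents. Passing the running total to the completion callback, and also firing it after the final partial chunk, are assumptions; the draft leaves the callback arguments open.

```php
<?php

/**
 * Hypothetical helper: groups documents into chunks of $bulkSize, hands each
 * chunk to $send, then notifies $onIterationCompleted with the running total.
 */
function sendInChunks(array $documents, $bulkSize, callable $send, ?callable $onIterationCompleted = null)
{
    $chunk = [];
    $processed = 0;

    foreach ($documents as $document) {
        $chunk[] = $document;

        // Flush as soon as the chunk holds exactly $bulkSize documents.
        if (count($chunk) >= $bulkSize) {
            $send($chunk);
            $processed += count($chunk);
            $chunk = [];

            if ($onIterationCompleted !== null) {
                $onIterationCompleted($processed);
            }
        }
    }

    // Send whatever is left over as a final, smaller chunk.
    if (!empty($chunk)) {
        $send($chunk);
        $processed += count($chunk);

        if ($onIterationCompleted !== null) {
            $onIterationCompleted($processed);
        }
    }

    return $processed;
}

$sentChunks = [];
$progress = [];

$total = sendInChunks(
    range(1, 7),
    3,
    function (array $chunk) use (&$sentChunks) { $sentChunks[] = $chunk; },
    function ($processed) use (&$progress) { $progress[] = $processed; }
);

// $sentChunks is now [[1, 2, 3], [4, 5, 6], [7]] and $progress is [3, 6, 7].
```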

It is better than Util::copy() because one can implement different data sources. The optional listeners are a very good idea too. :+1:

Util::copy() would copy the mapping too (see https://github.com/ruflin/Elastica/issues/830#issuecomment-101061610). It looks like the Populator cannot handle this, because it works with documents only.

ruflin commented 9 years ago

Is getDocumentCount() required for the implementation, or is it just nice to have in the interface?

I could actually see us implementing this one, with the Util function copy just using the populator and offering a simple "mapper" for the user (eat your own dog food).

im-denisenko commented 9 years ago

It's not necessary. My thought was that with getDocumentCount the populator can track the current progress and pass it to onIterationCompletedCallback as an argument. That's useful when you have a lot of documents and want to see how many have already been processed; you could report that from the callback.

However, it's just an extra feature and could be removed to minimize the interface.
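For illustration, a progress callback built on that count might look like the sketch below. `formatProgress` is a hypothetical function, and the `($processed, $total)` argument list is an assumption, since the draft does not fix what the populator would pass to the callback.

```php
<?php

// Hypothetical callback body: turns the two counters into a progress line.
function formatProgress($processed, $total)
{
    // Treat an empty source as already complete to avoid dividing by zero.
    $percent = $total > 0 ? (int) floor($processed * 100 / $total) : 100;

    return sprintf('%d/%d documents (%d%%)', $processed, $total, $percent);
}

echo formatProgress(1000, 4000), PHP_EOL; // 1000/4000 documents (25%)
```

Without getDocumentCount() the callback could still report the absolute number processed, just not a percentage.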

ruflin commented 9 years ago

I actually quite like the callback option. Suggestion: