keboola / storage-api-php-client

Storage API Client
MIT License
10 stars 8 forks source link

Batch Write Tables #233

Closed ondrejhlavacek closed 5 years ago

ondrejhlavacek commented 6 years ago

Popisek je DEPRECATED

Nová verze je na https://github.com/keboola/storage-api-php-client/issues/233#issuecomment-439356063


Oproti slackové diskusi bych to asi nechal zatím minimalistický. Při output mappingu se děje následující sekvence operací (provádí se v trochu jiném pořadí, než tu píšu a nejsou všechny kroky povinné)

1) založení bucketu 2) uložení metadat bucketu 3) založení tabulky 4) modifikace nebo nastavení pk tabulky 5) vymazání řádků tabulky 6) upload do S3 7) import do tabulky 8) uložení metadat tabulky

Soustředil bych se na kroky 6 a 7, tj. upload do S3 a samotný importní joby, zbytek by zůstal zachovaný, jako je teď. Ostatní kroky bych, pokud by bylo nutné, řešil s podporou api (například možnost poslat metadata pro všechny tabulky naráz, hromadné zakládání tabulek).

Do S3 by neměl být problém tlačit víc souborů naráz, už se to teď dělá pro slicované soubory.

https://github.com/keboola/storage-api-php-client/blob/a96d0493f90ed4ae366d320a0d60f7a95d8a86bb/src/Keboola/StorageApi/Client.php#L1524-L1600

Je to jen trochu ohnuté ofiko doporučení od AWS.

Importní joby by taky neměly být problém - naházely by se joby do fronty, v poli se udržovaly idčka jobů a ty se pak pollovaly. Udělal bych stop on failure - funkce by vyhodila exception okamžitě jak by se objevil první error a ostatní joby už nechala bez kontroly. Pro situaci s chybou ještě zvažuju, že by se metadata updatnuly před importem?

keboola/output-mapping používá pro zápis tabulek tyhle metody

A pro upload souborů

Asi bych rád tohle nějak zjednodušil a udělal jednotný interface - pro batch load musí být jedno, jestli je soubor slicovaný nebo ne. Zakládání tabulek bych z toho vynechal - tabulka by se založila prázdná předem a pak se udělal dávkový import. Batch Writer by tedy potřeboval mít založené, promazané a zmodifikované všechny tabulky ve Storage (ideálně už i s metadatama?) a jen by vzal soubory a natlačil je do tabulek.

Do Batch Writeru by se předal seznam souborů (pokud je to slicovaná tabulka, tak to bude folder) a tabulek, kam se mají nasypat. Batch Writer hromadně natlačí soubory na příslušný místa do S3 a pak pustí naráz importní joby.

Super by bylo, kdyby importní joby uměl pustit ihned po tom, co má vše v S3 a nemusel by čekat na dokončení všech uploadů, ale to si nejsem jistej, jestli půjde.

Jako alternativní řešení by bylo jet serializovaně, ale nečekat na dokončení jobu a ty posbírat až na konci, tj.

Dost záleží, jestli paralelní import do S3 něco zrychlí, zkusil bych to nějak obenchmarkovat. Určitě nemá význam paralelizovat upload slicovaných souborů - ty jsou už paralelizovaný per slice.

Nicméně tyhle detaily by neměly ovlivnit, jak vypadá rozhraní.

ref https://github.com/keboola/storage-api-php-client/issues/220 ref https://keboola.slack.com/archives/C31U6BGJC/p1542203865142800 ref https://github.com/keboola/internal/issues/48 ref https://github.com/keboola/output-mapping/issues/15

ondrejhlavacek commented 6 years ago

Zkusím revivnout https://github.com/keboola/sliced-upload-test, přidat tam importy do Storage a něco poměřit.

ondrejhlavacek commented 6 years ago

Pár benchmarků loadů do S3

uploadSlicedFile, 100x10 MB

chunk size throughput (MB/s) compressed* (MB/s)
100 98,83,78,97 115,139,117
50 (default) 91,83,83 124,116,109
25 82,69,72 -
10 76,84,67,74 -
1 23,27,21 -
ondrejhlavacek commented 6 years ago

uploadSlicedFile, 100x95 MB

chunk size throughput (MB/s) compressed (MB/S)
100 error (max open files) -
50 (default) 77,64,69,77 98,96,137
25 61,78,75 96,132,133
10 70,54,63 -
1 49,47,46 -

Tohle je dost zajímavý, 100mb soubory se tomu vůbec nelíbí a je to o dost pomalejší. Varianty

EDIT: komprimovaný soubory maj lepší výsledky, protože možná nevyvolaj tolik reuploadů v multipartu?

ondrejhlavacek commented 6 years ago

Benefity paralelních uploadů jsou furt značný (~40%), ale i serializovanej upload používal multipart. Možná by stálo ještě za pokus podívat se na použití nějakých jiných AWS SDK funkcí (non-multipart), imho už jsem to dělal, ale nemůžu to najít.

ondrejhlavacek commented 6 years ago

V https://github.com/keboola/docker-bundle/pull/364 se ukázalo, že Odin zvolil dost dobrej postup. Soubory do S3 se nahrajou po zpracování každýho row a pak se pustí importní job, u kterého se nečeká na výsledek a pustí se další row. To částečně emulujue paralelní config rows. Na hromadný zpracování bych se tedy teď vybodl a udělal následující změny

Uploady

Pokusit se optimalizovat uploady, aby maximálně využili bandwidth i pro upload jednoho souboru (https://github.com/keboola/storage-api-php-client/issues/233#issuecomment-439025372).

Optimální je používat slicovaný soubory, který jsou nejrychlejší na všech frontách. Pokud by SAPI klient optimalizoval upload jednoho velkýho souboru do S3, furt budou v use casu 2 další uzký hrdla - stažení souboru a zpracování ve Snowflake.

Asi bych teda nakonec řekl, že

Komprese

Nemrknem se, jestli by nešlo soubory před uploadem do S3 komprimovat? Docker Runner to posílá nekomprimovaně, maximální propustnost sítě je teď ~100 MB/s, pokud bude soubor 10x menší, proběhne 10x rychleji. Kolik času zabere komprese?

https://github.com/keboola/output-mapping/issues/20

Podpora Fire & Forget

ondrejhlavacek commented 6 years ago

Jdu zkusit obenchmarkovat tu kompresi.

odinuv commented 6 years ago

nic chytrejsiho me ted nenapada, ale porad mi to jeste moc nemysli

odinuv commented 6 years ago

otazka je jestli nam dlouhodobe bude k necemu Podpora Fire & Forget, protoze jestli budem spoustet cely rows paralellne tak to uz zase potreba nebude

odinuv commented 6 years ago

mozna by teda stalo spis za uvahu jestli by slicovany soubory nemohly produkovat treba DB extractory, ktery maji sanci to udelat a je to velkej zdroj dat

ondrejhlavacek commented 6 years ago

otazka je jestli nam dlouhodobe bude k necemu Podpora Fire & Forget, protoze jestli budem spoustet cely rows paralellne tak to uz zase potreba nebude

je možný, že ne, ale dlouhodobě je až za dlouho. teď se to může super hodit i pro komponenty bez config rows a je dost možný, že bude hromada komponent, který config rows bejt nemůžou.

mozna by teda stalo spis za uvahu jestli by slicovany soubory nemohly produkovat treba DB extractory, ktery maji sanci to udelat a je to velkej zdroj dat

některý třeba jo, jinejm může pomoct procesor.

odinuv commented 6 years ago

interface: nejjednodussi mi prijde podpora parametru $handleAsyncTask u metody writeTableAsync - to je za malo penez hodne muziky. Vic highlevel podpora handleAsyncTask i u createTableAsync - tam je to ale slozitejsi, protoze se musi vzit jeden radek, udelat create a pak ze zbytku udelat load. (to je otazka jestli stoji za to)

Kdyby to melo byt cisty, tak reknu - zrusit metodu writeTable (je deprecated), writeTableAsync (a vsechno async) prejmenovat na writeTable a udelat writeTableAsync (mozna radsi writeTableAsynchornous at se to neplete), ktera bude opravdu async - tj. vracet jobid

ondrejhlavacek commented 6 years ago

@odinuv ten createTableAsync bych ořízl čistě na založení tabulky (bez dat) a pak tam nahrál data jako u existujících tabulek. imho to za to stojí.

ondrejhlavacek commented 6 years ago

Přejmenování metod bych možná řešil ve vlastním PR? Se bojim, že bych se v tom mohl zamotat.

odinuv commented 5 years ago

no kdyz jsem procetl https://github.com/keboola/storage-api-php-client/issues/236 a https://github.com/keboola/storage-api-php-client/issues/237 tak si rikam, ze by se to mozna mohlo udelat cely jinak:

$job1 = new Job('writeTable', ['tableId' => $tableId1, 'csvFile' => $csvFile1, 'options' => new WriteTableOptions([]);
$job2 = new Job('writeTable', ['tableId' => $tableId2, 'csvFile' => $csvFile2, 'options' => new WriteTableOptions([]);
$job3 = new Job('postTableMetadata', ['tableId' => $tableId1, 'provider' => 'system', [['key'=> 'foo', 'value' => 'bar']];
$client->postJobs([$job1, $job2, $job3]);

a kdybych chtel mit metadata serializovany, tak treba

$client->postJobs([[$job1, $job3], $job2]);

nebo vic objektove

$client->postJobs(new ParallelJobs(new SerialJobs([$job1, $job3]), $job2]));
ondrejhlavacek commented 5 years ago

Nebo

$job1 = new WriteTableJob(
  new WriteTableJobOptions(['tableId' => $tableId1, 'csvFile' => $csvFile1, 'options' => new WriteTableOptions([]))
);
$job2 = new WriteTableJob(
  new WriteTableJobOptions(['tableId' => $tableId2, 'csvFile' => $csvFile2, 'options' => new WriteTableOptions([]))
);
$job3 = new PostTableMetadataJob(
  new PostTableMetadataJobOptions(['tableId' => $tableId1, 'provider' => 'system', [['key'=> 'foo', 'value' => 'bar']])
);
$client->postJobs([[$job1, $job3], [$job2]]);

// interface JobInterface
// class WriteTableJob implements JobInterface
// class PostTableMetadataJob implements JobInterface

// interface JobOptions
// class WriteTableJobOptions implements JobOptions
// class PostTableMetadataJobOptions implements JobOptions

// /* @var JobInterface[][] $jobs */ je tohle vůbec validní?
// client::postJobs($jobs) 

alternativně

$client->postJobs(new ParallelJob(new SerialJob([$job1, $job3]), new SerialJob([$job2])));

// interface JobSequencerInterface
// class ParallelJob implements JobSequencerInterface
// class SerialJob implements JobSequencerInterface
// asi to bude ještě komplikovanější?

// client::postJobs(JobSequencerInterface $jobs)

❓za 💵

odinuv commented 5 years ago

kdyz uz writeTableJob, tak:

$job1 = new WriteTableJob($tableId1, $csvFile1, new WriteTableOptions([]))
);

jak by se vytvořil bucket a předal jeho ID dál?

nijak, nekomplikoval bych to. Tohle predpoklada, ze mas vytvorenou tabulku a delas do ni load (a pripadne posles metadata. Neni to Karel, aby se v tom programovalo

kolik úrovní nestingu serial/parallel jobů bychom dovolili?

pokud bude umet ParallelJob prijmout SerialJob a obracene, tak kolik chcem, neni potreba to nejak omezovat

ondrejhlavacek commented 5 years ago

U těch WriteTableJobOptions bys mohl celkem snadno udělat něco jako WriteTableJobOptions::getUrlParams, WriteTableJobOptions::getPostPayload a ty bleskurychle unittestovat. U WriteTableJob bude nejspíš jen metoda WriteTableJob::createJob a kdyby to generování requestu bylo v téhe classe, tak bude nejspíš private/protected a hůř by se testovalo?

pokud bude umet ParallelJob prijmout SerialJob a obracene, tak kolik chcem, neni potreba to nejak omezovat

To se bude děsně blbě rozmotávat a sestavovat execution plan :-(

odinuv commented 5 years ago

U těch WriteTableJobOptions bys mohl celkem snadno udělat něco jako WriteTableJobOptions::getUrlParams, WriteTableJobOptions::getPostPayload a ty bleskurychle

takze aby se ti to lip unittestovalo, tak donutis kazdyho, kdo toho klienta bude pouzivat napsat 2x tolik kodu a pouzit 3 tridy na zavolani API callu s 2 parametrama...

To se bude děsně blbě rozmotávat a sestavovat execution plan :-(

no tak to urcite nechces delat execution plan. Pokud vsechy joby bou mit interface run(): integer[] ($jobIds)

tak imho staci mit:

ParallelJob::run()
   foreach ($jobs as $job)
         $jobIds[] = $job->run()[0]
   waitForJobs($jobIds)  

SerialJob::run()
  foreach ($jobs as $job)
         waitForJobs($job->run())
ondrejhlavacek commented 5 years ago

Pokud bude víc zanoření, tak tam bude ještě rekurze a zbytečně se to bude zdržovat, než se vynoříš z první hlubší/delší rekurze.

ParallelJob::run()
    foreach ($jobs as $job)
        if ($job instanceof ParallelJob or $job instanceof SerialJob)
            $job->run();
        else
            $jobIds[] = $job->run()[0]
    waitForJobs($jobIds)  

SerialJob::run()
    foreach ($jobs as $job)
        if ($job instanceof ParallelJob or $job instanceof SerialJob)
            $job->run();
        else
            waitForJobs($job->run())
ondrejhlavacek commented 5 years ago

Na tohle by se sakra docela hodily promisy, callbacky a podobný srandy, zkusím mrknout, jestli není aspoň nějaká pseudocesta v php (viz guzzle).

ujovlado commented 5 years ago

Mozno nieco z https://reactphp.org/?

ondrejhlavacek commented 5 years ago

@ujovlado není to moc lowlevel? splnění promisu pro nás není dokončení http requestu, ale dokončení jobu.

natocTo commented 5 years ago

Vím že "Spatie" měli taky něco co by třeba mohlo být použito. Najdu.

odinuv commented 5 years ago

myslim, ze to celky prekombinovavate. Mame usecase, ze potrebujem importovat 10 tabulek nebo exportovat 10 tabulek nebo nahrat do workspace 10 tabulek, maximalne mame jeste use case, ze potrebujem importovat 10 tabulek a pro kazdou po nahrati jeste nahrat metadata. To je vsechno.

natocTo commented 5 years ago

Tady jen ten odkaz, myslel jsem toto: https://github.com/spatie/async

odinuv commented 5 years ago

jako fakt bych nechtel v klientovi implementovat frontu

ujovlado commented 5 years ago

Nechcem to komplikovat, len ked som videl spomenute PHP+Promise, napadlo mi React PHP. Nejdem sa tu v tom inac vrtat.

odinuv commented 5 years ago

no najlos s tim zacal - ze tam bude zanoreni a rekurze a ze to bude neefektivni... (btw jediny kdy to bude neefektivni je pokud by nekdo udelal new ParalellJob([new ParallelJob[a,b,c]), e, f)

ondrejhlavacek commented 5 years ago

jako fakt bych nechtel v klientovi implementovat frontu

pole idček jobů, který polluješ v docker runneru, je v podstatě taky fronta.

@natocTo spatie/async vypadá dobře, jen to je asi zbytečně technickej overkill pro náš use case. reálně nepotřebujeme forkovat procesy.

no najlos s tim zacal - ze tam bude zanoreni a rekurze a ze to bude neefektivni... (btw jediny kdy to bude neefektivni je pokud by nekdo udelal new ParalellJob([new ParallelJob[a,b,c]), e, f)

@odin, tys příšel s https://github.com/keboola/storage-api-php-client/issues/233#issuecomment-440404561, tam jsou dvě úrovně zanoření a pokud dáš obě parallel, tak máš to úzký hrdlo, co popisuješ.

pokud je to takhle složitý, tak navrhuju se těma Jobama nezabejvat a nechat to plus mínus jako https://github.com/keboola/storage-api-php-client/issues/233#issuecomment-439919785 a nechat řízení jobů na docker runneru, output/input mappingu a transformacích.

když rozhodnem, co s metadatama (a nebude to ta sync varianta), tak jsme celkem snadno vyřešený.

odinuv commented 5 years ago

pole idček jobů, který polluješ v docker runneru, je v podstatě taky fronta.

neni, protoze do ni za behu nic nepribyva

@odin, tys příšel s #233 (comment), tam jsou dvě úrovně zanoření a pokud dáš obě parallel, tak máš to úzký hrdlo, co popisuješ.

a ja rikam, ze a) davat obe paralell nedava smysl, protoze ten seznam tech tabulek je dopredu znamej b) nemusime nic takovyho vubec delat c) i kdyby a) i b), tak to stejne porad neni to uzky hrdlo, protoze to porad pojede oproti soucasnymu stavu paralelne

podle me to slozity neni a https://github.com/keboola/storage-api-php-client/issues/233#issuecomment-440581545 to podle me uplne resi

odinuv commented 5 years ago

Toto "za behu nic nepribyva" je ta klicova vec, co to zjednodusuje - diky tomu tam neni potreba mit ani promisy, ani execution planner, ani frontu.... Je to seznam veci na ktery se bud ceka naraz nebo individualne, to je vsecko.

ondrejhlavacek commented 5 years ago

Oukej, pokud za běhu nic přibývat nebude, dost se to zjednodušuje. Jak to znovu celý pročítám, tak mi přijde skoro nejlepší v klientovi toho moc nevymýšlet.

Pro importní joby zduplikovat metody, aby vracely idčka jobů a mít nějakej handler, co bude pollovat joby a vyhodí chybu (= Podpora Fire & Forget).

Pro exportní joby bych furt nechal tohle - https://github.com/keboola/storage-api-php-client/issues/236#issue-382743574.