TheoCoombes / crawlingathome

A client library for LAION's effort to filter CommonCrawl with CLIP, building a large scale image-text dataset.
http://crawling.at
MIT License

Workload transfer #3

Open rvencu opened 3 years ago

rvencu commented 3 years ago

Hi, can you explain in summary how I would start a job on one computer and then upload the results from another? I mean transferring the downloaded dataset to a central GPU unit for filtering, and I would like to upload the final result to your server directly from the GPU unit instead of going back to the original computer.

While a more generic two-step approach would require your server to manage half-finished jobs and distribute the workload based on node type, I first want to try deploying my own droplets for scraping and moving inference to my home GPU PC. Later the code can be adapted for mass usage.

So I am looking at how to pass a job from one crawlingathome client to another.

TheoCoombes commented 3 years ago

Hi, I'm actually currently working on integrating this into the server update v2.2.0. From that update, there will be 3 types of workers: Stage 1 (CPU) workers that scrape and download the images, Stage 2 (GPU) workers that run the CLIP filtering, and regular workers that perform a full job on their own.

However, the Stage 1 -> Stage 2 transition is something that still needs work, and may prevent this update from working. The Stage 1 workers somehow need to host the images for the Stage 2 workers to use.

> So I am looking at how to pass a job from one crawlingathome client to another.

This is actually currently achievable, although rather hacky. I will probably add improved support for this on my to-do list.

Take a look here. You'd want to make a duplicate of this class, but change the __init__ function to the following:

```python
def __init__(self, url, token, shard, start_id, end_id, shard_piece):
    self.s = session()  # requests.Session instance
    self.url = url
    self.token = token
    self.shard = shard
    self.start_id = start_id  # by default usually np.int64
    self.end_id = end_id      # ^^^
    self.shard_piece = shard_piece
```

You can find all these variables using your old client, e.g. client.url will return the url required for this __init__.

You can then interact using the normal functions, acting as the original worker. Please note, however, that workers have a 90-minute timeout. If this is an issue for you, perhaps you could set things up so that client.log("Waiting for Stage Two") runs on all your workers every 30 minutes until you run the job on your GPU?
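
For illustration, the hand-off could look roughly like this (a minimal sketch; TransferredClient stands in for the duplicated class described above, and how the state dict travels between machines is left out):

```python
# Hypothetical sketch: capture the relevant attributes of an existing client on
# the CPU node, ship them to the GPU node, and rebuild a client there with the
# duplicated class described above (called TransferredClient here for illustration).
state = {
    "url": client.url,
    "token": client.token,
    "shard": client.shard,
    "start_id": int(client.start_id),  # np.int64 -> int so the dict serializes cleanly
    "end_id": int(client.end_id),
    "shard_piece": client.shard_piece,
}

# ... send `state` to the GPU machine (e.g. as JSON), then on the GPU side:
gpu_client = TransferredClient(**state)
# gpu_client now acts as the original worker and can call the normal functions
gpu_client.log("Waiting for Stage Two")
```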

Hopefully this should work for the time being. Please let me know if you encounter any issues!

rvencu commented 3 years ago

I am doing a variation of this here https://github.com/rvencu/crawlingathome-worker/tree/dev (this is completely untested)

I start from the GPU and want to integrate the worker setup into the main script; this way I can control a swarm of workers that start jobs and transfer the workload to the GPU. I transfer the results back, and the workers complete the communication with your server.

I am using a file-semaphore mechanism to signal when the images have been downloaded, and the GPU polls the workers for completion.
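
Roughly, the idea looks like this (a minimal sketch; the marker path and the way the GPU node reaches the worker's files are assumptions):

```python
# Hypothetical sketch of the file-semaphore idea: the scraping worker drops a
# marker file next to its output when a shard finishes, and the GPU node polls
# for that marker (over a mounted/synced directory) before pulling the data.
from pathlib import Path
import time

SEMAPHORE = Path("shard_output/.done")  # assumed marker location

def mark_complete():
    # called on the scraping worker once the CSV + images are written
    SEMAPHORE.touch()

def wait_for_worker(poll_seconds=30):
    # called on the GPU node for each worker it is watching
    while not SEMAPHORE.exists():
        time.sleep(poll_seconds)
    # marker present: safe to transfer the shard and run inference
```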

This is for someone who wants to maximize the use of their GPU during a given timeframe and is also willing to pay for the workers.

I estimate that a 500 euro contribution for workers could take me as far as 5% of the entire project in some 40 days. Of course, 40 days of 90% GPU workload at home comes on top of that.

When I am done with that, I will use your info to create a version where the workers and the GPU cooperate, but unfortunately, for them to communicate directly we would need to disclose their IP addresses. If not, then a centralized hub still needs to be used.

TheoCoombes commented 3 years ago

Sounds great! Let me know if you'd like me to help build a server that can handle this CPU -> GPU worker communication :)

rvencu commented 3 years ago

Yes please.

I envision 3 types of clients (maybe make 3 separate clients):

  1. Scraping worker. This runs the scraping part and uploads 1 CSV + the images as a zip file (somewhere). You mark stage completion and provide some way to serve the zip file on request.
  2. GPU worker. This gets a new GPU job from the pending zip files sent by a scraper, downloads the zip, uploads the final result, and marks the whole job done.
  3. Normal client. Keep the existing client for individuals wanting to contribute on their own CPU/GPU computer, one job at a time.

There are some considerations to make regarding renaming the zip files with random names (saved in the job properties) so they cannot be altered by malicious users, etc.
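
For the random-name point, something like this could work (a minimal sketch; the naming scheme and the job-property field are assumptions):

```python
# Hypothetical sketch: give each stage-1 archive an unguessable name so other
# users cannot guess or tamper with it, and record that name in the job properties.
import secrets
import shutil

def randomize_shard_name(zip_path, job):
    token = secrets.token_hex(16)        # 32-character random hex string
    new_path = f"shard_{token}.zip"
    shutil.move(zip_path, new_path)
    job["stage1_archive"] = new_path     # assumed job-properties field
    return new_path
```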

TheoCoombes commented 3 years ago

Definitely what I envisioned too. I believe some sort of overlay server would be best for this, as I'd like the main server to be kept as simple as possible.

Here's my idea:

This is partly why I added features that can help with this:

```python
dump_json = cah.dump(client)
client = cah.load(**dump_json)
```

The server can also keep the clients alive using the client.log function, whilst a client waits for its job to be filled by the GPU worker.
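
On the CPU side, putting these together could look roughly like this (a minimal sketch; the import alias and job_claimed_by_gpu() are assumptions, while cah.dump / cah.load are the helpers shown above):

```python
# Hypothetical sketch: serialize the client state for hand-off and ping
# client.log periodically so the worker is not expired by the 90-minute
# timeout while waiting for a GPU worker to claim the shard.
import time
import crawlingathome as cah  # assumed import name

dump_json = cah.dump(client)          # state to pass on to the GPU worker

while not job_claimed_by_gpu():       # hypothetical check against the coordinating server
    client.log("Waiting for Stage Two")
    time.sleep(30 * 60)               # stays well inside the 90-minute timeout

# on the GPU side, a client is rebuilt from the same dump:
gpu_client = cah.load(**dump_json)
```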

I'll try to have a demo server of this, along with a new git repo, up by the weekend. I can't say for certain, however, as we are currently in the process of moving the main crawling@home server, but it seems to be going smoothly so far 🙂

TheoCoombes commented 3 years ago

Sorry - misclick! 😅

rvencu commented 3 years ago

What I have against this overlay server is that it should not be GPU-bound. We need an independent overlay server, because people will pop in and out with Colab GPUs and Kaggle GPUs at any time; they are not reliable enough to keep scrapers alive when they can suffer their own sudden death.

If you do not want the CAH server to integrate this, then we should write a separate server for it. But it seems more logical for this to be a CAH server function.

Contributors will keep their own nodes alive. We will provide them with a specific cloud-init file that automates everything; they just keep the nodes running for as long as they want to pay the fee. So the job of managing scrapers goes away. Hence my idea that the CAH server already does this: it registers workers and expires them, and the extension would just go further to register and expire GPU workers as well.

TheoCoombes commented 3 years ago

Fair enough. The more I think about it, the more it appears the central server can do all that's needed and more, compared to an overlay server.

Here's what I'm thinking now:

What do you think about this idea? :)

rvencu commented 3 years ago

Yes, basically this is it. The challenge would be that if we have too many scrapers and too few GPUs, the payload on the temporary sharing site can expire. In such cases the GPU node trying to access it should report back to the CAH server to unmark the shard for stage 1.

A more resilient storage solution should minimize the potential waste of scraper work.

We need a mechanism to communicate what we are missing most. If we are missing scrapers because we have too many GPUs, then register the GPU as a scraper as well; that is easy. The other way around is complicated...

TheoCoombes commented 3 years ago

Yeah. As long as it's a URL that can download a ZIP/TAR, it should be fine - so there's room for customisation there.
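
On the GPU side, pulling from such a URL could look roughly like this (a minimal sketch; the file names and extraction directory are assumptions):

```python
# Hypothetical sketch: the GPU worker downloads a stage-1 archive from whatever
# direct-download URL the job record provides and unpacks it for inference.
import requests
import zipfile

def fetch_shard(url, dest="shard_input"):
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open("shard.zip", "wb") as f:
            for chunk in r.iter_content(chunk_size=1 << 20):  # 1 MiB chunks
                f.write(chunk)
    with zipfile.ZipFile("shard.zip") as z:
        z.extractall(dest)
```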

No promises, but I'll attempt to build a system that can do this :)

rvencu commented 3 years ago

I was looking at file-transfer sites with good speed and retention. It is hard to reconcile cost, speed, disk space and reliability. WeTransfer has retired its public API. Dropbox has a good API and good reliability, and a Plus plan gives 2TB of storage (or about 4000 shards completed after stage 1).

But looking at the scaling aspects, a lot of potential problems arise, with Dropbox or really anyone else, if we want to use thousands of scrapers at once.

A good read here https://developers.dropbox.com/dbx-performance-guide

rvencu commented 3 years ago

https://pixeldrain.com/

It has an API, does not require an account to upload and download (only to delete), allows 10,000 zip files in principle (though with no account, every worker can upload up to 10,000 files), and keeps files for 30 days.

Not sure about speed capping; that needs testing.

Cons: some captcha to fill in at download, or 49 USD per year for an account.

arkseal commented 3 years ago

Just a thought: could we set up an SFTP server for the CPU & GPU clients to communicate? And if security isn't needed, we could just use FTP.
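
For illustration, the SFTP route might look like this (a minimal sketch using paramiko; the host, credentials and paths are all assumptions):

```python
# Hypothetical sketch of the SFTP idea: the CPU worker pushes its stage-1
# archive to a shared SFTP server and the GPU worker pulls it down.
import paramiko

# assumed host, credentials and paths for a shared transfer server
transport = paramiko.Transport(("sftp.example.org", 22))
transport.connect(username="cah", password="secret")
sftp = paramiko.SFTPClient.from_transport(transport)

sftp.put("shard_12345.zip", "/incoming/shard_12345.zip")   # on the CPU worker
sftp.get("/incoming/shard_12345.zip", "shard_12345.zip")   # on the GPU worker

sftp.close()
transport.close()
```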

TheoCoombes commented 3 years ago

@rvencu https://tmp.ninja/ seems okay? We might want to consider emailing the devs first just to ensure they have the resources ready, but theoretically any file hosting service should work as long as we have a direct download URL the workers can use to pull the data (even GDrive has this).

TheoCoombes commented 3 years ago

@ARKseal It might be better to just integrate it directly into the main server though - that also helps with monitoring workers etc.

rvencu commented 3 years ago

I agree, but beware of the file size: 500MB per shard as the result of stage 1.

rvencu commented 3 years ago

Untested yet, but very simple; the server just needs to store/handle these IDs (I just made up this class):

```python
import re
import requests


class Pixeldrain():
    def __init__(self, api_key=""):
        self.session = requests.Session()
        self.key = api_key  # optional API key, unused for anonymous uploads
        self.url = "https://pixeldrain.com/api/file/"

    def post_file(self, file):
        # anonymous multipart upload; pixeldrain returns the file id as JSON
        files = [
            ('file', (file, open(file, 'rb'), 'application/octet-stream'))
        ]
        response = self.session.post(self.url.rstrip('/'), files=files)
        info = response.json()
        return info['id']  # response is a dict, so index it rather than info.id

    def get_file(self, id):
        # download by id; the filename is taken from the Content-Disposition header
        response = self.session.get(self.url + id)
        d = response.headers['content-disposition']
        fname = re.findall("filename=(.+)", d)[0]
        with open(fname, "wb") as file:
            file.write(response.content)
        return fname
```
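
Hypothetical usage of the class above (the shard name and the job-property handling are assumptions):

```python
# The scraper uploads the stage-1 zip and reports the returned id to the
# server; the GPU worker later fetches the same file by id.
pd = Pixeldrain()
file_id = pd.post_file("shard_12345.zip")  # assumed archive name
# ... the id is stored in the job properties on the server ...
pd.get_file(file_id)                        # on the GPU worker
```
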
rvencu commented 3 years ago

I tried to upload a 400MB file and it crashed. I read that requests does not handle HTTP 100-continue, which perhaps happens for such a big file.
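
For reference, plain requests builds `files=` uploads in memory; one way to keep memory out of the picture is to stream the multipart body, e.g. with requests-toolbelt (a minimal sketch; the URL and field name follow the class above and are otherwise assumptions):

```python
# Hypothetical sketch: stream a large shard archive instead of buffering the
# whole multipart body in memory.
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder

def stream_upload(path, url="https://pixeldrain.com/api/file"):
    encoder = MultipartEncoder(
        fields={'file': (path, open(path, 'rb'), 'application/octet-stream')}
    )
    response = requests.post(
        url, data=encoder, headers={'Content-Type': encoder.content_type}
    )
    return response.json()
```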

TheoCoombes commented 3 years ago

Did you stream your upload?

rvencu commented 3 years ago

Yes, it was streamed. I put a monitor on it and watched the upload; it crashes as soon as the uploaded bytes exceed 250MB.

At this point I think this website is in trouble; perhaps it cannot sustain the costs of storage and it is not properly maintained...

I guess we still need to look at something like Dropbox for reliability.

Except, of course, if the new server also comes with proper storage space...

TheoCoombes commented 3 years ago

Yeah, I suppose. As long as we can have a direct download URL, it should be fine.

TheoCoombes commented 3 years ago

We could theoretically use the server; however, I think it'd be much better to avoid storing files on the server, so we don't slow down the workers.

arkseal commented 3 years ago

As of now we have Dropbox as our only possible storage host, and for the Plus plan we need to pay $10 USD (not sure if it's cheaper for you). But then again, we could use GDrive.

TheoCoombes commented 3 years ago

The thing is, the server only has 100GB of storage, and I don't really want to congest it with files if it can't handle the number of requests. I'm going to look into setting up a server built specifically for crawling@home image hosting, hopefully using a Google Cloud trial again.

rvencu commented 3 years ago

The protocol should first attempt peer connectivity between the scraper and the GPU worker; if that fails, then save the files at the transfer server...
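
In rough terms, the fallback could be as simple as this (a minimal sketch; send_direct, upload_to_transfer_server and the job field are all hypothetical helpers):

```python
# Hypothetical sketch of the two-step hand-off: try a direct scraper -> GPU
# transfer first, and only fall back to the shared transfer server if it fails.
def hand_off_shard(zip_path, gpu_peer, transfer_server, job):
    try:
        send_direct(zip_path, gpu_peer)                  # e.g. wormhole / socket transfer
    except (ConnectionError, TimeoutError):
        url = upload_to_transfer_server(zip_path, transfer_server)
        job["download_url"] = url                        # GPU worker pulls from here instead
```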

TheoCoombes commented 3 years ago

A great idea. I wonder how we could pull this off without having to forward ports?

rvencu commented 3 years ago

Look here: https://github.com/magic-wormhole/magic-wormhole

Some of the workers will be behind NAT, but the scrapers won't be, so maybe it will work easily.
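
In its simplest form, the hand-off could be driven from the scripts like this (a minimal sketch invoking the magic-wormhole CLI; the file name and the way the one-time code is exchanged are assumptions):

```python
# Hypothetical sketch: hand a stage-1 archive from a scraper to the GPU worker
# via the magic-wormhole CLI (`pip install magic-wormhole` on both ends).
import subprocess

# on the scraper: prints a one-time code such as "7-crossover-clockwork"
subprocess.run(["wormhole", "send", "shard_12345.zip"], check=True)

# on the GPU worker, after receiving that code out of band (e.g. via the server);
# --accept-file skips the interactive confirmation prompt
subprocess.run(["wormhole", "receive", "--accept-file", "7-crossover-clockwork"], check=True)
```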

arkseal commented 3 years ago

I tested this out: we just need the inference worker's IP.

Receiver script:

```python
import socket
import tqdm
import os

SERVER_HOST = "0.0.0.0"
SERVER_PORT = 5001

BUFFER_SIZE = 4096
SEPARATOR = "<SEPARATOR>"  # delimiter between filename and filesize (the original value was lost in formatting; any marker works as long as both scripts match)

s = socket.socket()
s.bind((SERVER_HOST, SERVER_PORT))

s.listen(5)
print(f'[*] Listening as {SERVER_HOST}:{SERVER_PORT}')

client_socket, address = s.accept()
print(f'[+] {address} is connected')

received = client_socket.recv(BUFFER_SIZE).decode()
filename, filesize = received.split(SEPARATOR)

filename = os.path.basename(filename)
filesize = int(filesize)

progress = tqdm.tqdm(range(filesize), f'Receiving {filename}', unit='B', unit_scale=True, unit_divisor=1024)
with open(filename, 'wb') as f:
    while True:
        bytes_read = client_socket.recv(BUFFER_SIZE)
        if not bytes_read:
            break
        f.write(bytes_read)
        progress.update(len(bytes_read))

client_socket.close()
s.close()
```

Sender script:

```python
import socket
import tqdm
import os

SEPARATOR = "<SEPARATOR>"  # must match the receiver's SEPARATOR
BUFFER_SIZE = 4096

# receiver ip
host = ""
port = 5001

filename = "test.zip"
filesize = os.path.getsize(filename)

s = socket.socket()

print(f'[+] Connecting to {host}:{port}')
s.connect((host, port))
print('[+] Connected')

s.send(f'{filename}{SEPARATOR}{filesize}'.encode())

progress = tqdm.tqdm(range(filesize), f'Sending {filename}', unit='B', unit_scale=True, unit_divisor=1024)

with open(filename, 'rb') as f:
    while True:
        bytes_read = f.read(BUFFER_SIZE)
        if not bytes_read:
            break
        s.sendall(bytes_read)
        progress.update(len(bytes_read))

s.close()
```

TheoCoombes commented 3 years ago

That looks great! Just a couple questions:

rvencu commented 3 years ago

I do not want to expose my GPU box to the internet.

arkseal commented 3 years ago

@rvencu I believe that magic-wormhole also stores your IP, allowing for the transfer. Maybe we can use Tor to provide security?

rvencu commented 3 years ago

Seems that it supports Tor. Or not?

https://magic-wormhole.readthedocs.io/en/latest/tor.html#onion-servers