prof-rossetti / intro-to-python

An Introduction to Programming in Python
Other
97 stars 244 forks source link

Multi threading / Parallel Processing #80

Open s2t2 opened 3 years ago

s2t2 commented 3 years ago

Some students need threading for their final projects, so the repo should provide some notes about parallel processing using the `concurrent.futures` module.

References

Threads and Thread Pool Executors:

Locks and Semaphores:

Threading on Heroku

Example:

# super h/t: https://www.youtube.com/watch?v=IEEhzQoKtQU

import os
import time
import random
from dotenv import load_dotenv

from concurrent.futures import ThreadPoolExecutor, as_completed # see: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
from threading import Thread, Lock, BoundedSemaphore, current_thread

# Load runtime configuration from a local .env file (python-dotenv).
load_dotenv()

# Number of fake "users" to process per run.
LIMIT = int(os.getenv("USERS_LIMIT", default=500))
MAX_THREADS = int(os.getenv("MAX_THREADS", default=200)) # heroku supports max 256, see: https://devcenter.heroku.com/articles/dynos#process-thread-limits
# How many completed results to accumulate before "clearing" (flushing) a batch.
BATCH_SIZE = int(os.getenv("BATCH_SIZE", default=20))

def fetch_friends(user_id, sleep_seconds=1):
    """Simulate a slow network request for the given user.

    Sleeps for ``sleep_seconds`` to mimic I/O latency, then reports which
    worker thread handled the request along with the simulated duration.

    NOTE(review): assumes the current thread was created by a
    ThreadPoolExecutor with thread_name_prefix="THREAD" (names like
    "THREAD_0"); the parse raises ValueError on e.g. "MainThread".
    """
    worker_name = current_thread().name
    worker_number = int(worker_name.replace("THREAD_", ""))
    time.sleep(sleep_seconds)
    return {
        "thread_id": worker_number + 1,  # 1-based for friendlier display
        "user_id": user_id,
        "duration": sleep_seconds,
    }

if __name__ == "__main__":

    user_ids = range(1, LIMIT + 1)
    start_at = time.perf_counter()
    print(f"USERS: {len(user_ids)}")
    print(f"THREADS: {MAX_THREADS}")

    # Fan out one task per user. as_completed() yields futures in the order
    # they finish, so quicker tasks (shorter sleeps) are handled first.
    with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:
        batch = []
        results = []
        futures = [executor.submit(fetch_friends, user_id, random.choice([1, 5, 10])) for user_id in user_ids]
        for future in as_completed(futures):
            result = future.result()
            print(result)
            batch.append(result)
            results.append(result)

            # Periodically flush the accumulated batch (a bulk save would go
            # here). NOTE: any results completed after the last full batch are
            # never flushed — handle the remainder if this pattern is reused.
            if len(batch) == BATCH_SIZE:
                print(f"CLEARING BATCH OF {len(batch)}...")
                batch = []

    end_at = time.perf_counter()
    clock_seconds = round(end_at - start_at, 2)
    # Sum of simulated per-task durations = how long a serial run would take.
    total_seconds = sum(result["duration"] for result in results)
    print(f"PROCESSED {len(user_ids)} USERS IN {clock_seconds} SECONDS (OTHERWISE {total_seconds} SECONDS)")

Use a simpler example than this though.

s2t2 commented 3 years ago
# Fire-and-forget variant: submit one task per batch and rely on the
# executor's context manager to block until all tasks finish on exit.
# NOTE(review): `batches`, `process_and_save_batch`, and `service` are
# defined elsewhere — this is a fragment pasted from another project.
with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:            
    for batch in batches:                
        executor.submit(process_and_save_batch, batch, service)
# Progress-tracking variant: collect the futures, then consume them in
# completion order, folding each result into a running job counter.
# NOTE(review): `batches`, `perform`, `bq_service`, `bas_service`, and `job`
# are defined elsewhere; the 8-space body indent is unusual but valid Python.
with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:
        futures = [executor.submit(perform, batch, bq_service, bas_service) for batch in batches]        
        for future in as_completed(futures):            
            job.counter += future.result()            
            job.progress_report()
# Result-collecting variant: one task per (group_name, filtered_df) pair,
# presumably from a pandas groupby — TODO confirm against the caller.
# NOTE(review): `MAX_WORKERS`, `perform`, `groupby`, `storage`, and
# `tokenize` are defined elsewhere — fragment pasted from another project.
with ThreadPoolExecutor(max_workers=MAX_WORKERS, thread_name_prefix="THREAD") as executor:            
    futures = [executor.submit(perform, group_name, filtered_df, storage.local_dirpath, tokenize) for group_name, filtered_df in groupby]            
    for future in as_completed(futures):                
        result = future.result()