Junghyun99 / Test

0 stars 0 forks source link

쓰레드풀 사용 #6

Closed Junghyun99 closed 1 day ago

Junghyun99 commented 1 month ago

종목별 트레이드를 쓰레드로 병렬처리 개별로 쓰레드관리하지말고 풀을 사용하자

Junghyun99 commented 1 month ago

풀 사이즈 조절 가능 일반적으로 코어수만큼 io가 많으면 코어수 2배

Junghyun99 commented 1 month ago

Fetching data for ID: 1 Fetching data for ID: 2 Fetching data for ID: 3 Data for ID: 2 Fetching data for ID: 4 Data for ID: 1 Fetching data for ID: 5 Data for ID: 3 Data for ID: 4 Data for ID: 5 All tasks completed.

from concurrent.futures import ThreadPoolExecutor, as_completed import time

작업으로 사용할 함수 정의

def fetch_data(data_id): print(f"Fetching data for ID: {data_id}") time.sleep(2) # 네트워크 요청 등을 가정한 대기 시간 return f"Data for ID: {data_id}"

ThreadPoolExecutor 사용 예제

def main(): data_ids = [1, 2, 3, 4, 5] # 작업할 ID 목록 results = []

# 스레드 풀을 생성하고 스레드를 관리
with ThreadPoolExecutor(max_workers=3) as executor:
    # 각 ID에 대해 fetch_data 작업을 비동기로 제출
    futures = [executor.submit(fetch_data, data_id) for data_id in data_ids]

    # 작업 완료를 기다리며 결과 수집
    for future in as_completed(futures):
        result = future.result()  # 작업 결과 가져오기
        results.append(result)
        print(result)

print("All tasks completed.")

if name == "main": main()

Junghyun99 commented 1 month ago

쓰레드가 3개 작업할 내용이 그 이상이면 대기하다가 완료되면 들어감 작업내용을 큐로 관리하며 무한하면 계속해서 큐에 넣으면서 쓰레드 3개로 실행할수있음

Junghyun99 commented 1 month ago

thread safe 큐 방식은?

쓰레드간 데이터 공유할때 동시 접근을 막음

Junghyun99 commented 1 month ago

sqlite3 는 쓰기는 하나의 쓰레드만 접근가능 다른 쓰레드는 대기걸림

Value 0 inserted successfully.
Value 1 inserted successfully.
Value 2 inserted successfully.
Attempt 1 for Value 3 failed: database is locked
Value 3 inserted successfully.
Value 4 inserted successfully.
Attempt 1 for Value 5 failed: database is locked
Attempt 2 for Value 5 failed: database is locked
Value 5 inserted successfully.
Value 6 inserted successfully.
Value 7 inserted successfully.
Attempt 1 for Value 8 failed: database is locked
Value 8 inserted successfully.
Value 9 inserted successfully.
Data insertion completed.

import sqlite3
from concurrent.futures import ThreadPoolExecutor
import time

# 데이터 삽입 함수 (백오프 로직 포함)
def insert_data_with_retry(value, retries=5):
    for attempt in range(retries):
        try:
            # 각 스레드마다 새로운 연결 생성
            conn = sqlite3.connect('example.db')
            cursor = conn.cursor()

            # 테이블이 없는 경우 생성
            cursor.execute('''CREATE TABLE IF NOT EXISTS data (id INTEGER PRIMARY KEY, value TEXT)''')

            # 데이터 삽입
            cursor.execute("INSERT INTO data (value) VALUES (?)", (value,))
            conn.commit()
            print(f"{value} inserted successfully.")
            conn.close()
            break
        except sqlite3.OperationalError as e:
            print(f"Attempt {attempt + 1} for {value} failed: {e}")
            time.sleep(0.1)  # 백오프 시간

# ThreadPoolExecutor를 사용하여 멀티스레드로 데이터 삽입
with ThreadPoolExecutor(max_workers=3) as executor:
    values = [f"Value {i}" for i in range(10)]
    executor.map(insert_data_with_retry, values)

print("Data insertion completed.")
Junghyun99 commented 2 weeks ago

쓰레드 수를 결정할 때는 여러 요소를 고려해야 합니다. 올바른 수의 쓰레드를 선택하는 것은 성능 최적화와 시스템 안정성을 유지하는 데 중요합니다. 일반적으로 쓰레드 수는 작업의 성격과 시스템의 하드웨어에 따라 달라집니다.

  1. 작업의 성격에 따라

I/O 바운드 작업 (예: 네트워크 요청, 파일 읽기/쓰기, 데이터베이스 접근):

I/O 바운드 작업은 작업 시간이 주로 I/O 작업에 의해 지연되므로 더 많은 쓰레드를 사용하는 것이 효율적입니다.

쓰레드 수를 CPU 코어 수보다 훨씬 더 크게 설정할 수 있습니다. 예를 들어, 네트워크 요청 작업을 실행하는 경우 max_workers를 10, 20 또는 더 크게 설정해도 무방합니다.

CPU 바운드 작업 (예: 복잡한 계산, 데이터 처리):

CPU 바운드 작업은 CPU 성능을 많이 사용하기 때문에, 쓰레드 수를 시스템의 CPU 코어 수에 가깝게 설정하는 것이 효율적입니다.

예를 들어, 4코어 CPU 시스템이라면 max_workers를 4로 설정하거나, 약간 더 높게 설정할 수 있습니다.

  1. 시스템의 하드웨어 자원에 따라

CPU 코어 수:

일반적으로 os.cpu_count()를 사용하여 시스템의 CPU 코어 수를 확인한 후, 이 수를 기준으로 쓰레드 수를 설정합니다.

예를 들어, CPU가 8코어라면, CPU 바운드 작업의 경우 max_workers를 8로 설정하는 것이 일반적입니다.

메모리 사용량:

작업이 메모리를 많이 사용하는 경우, 너무 많은 쓰레드를 생성하면 메모리 부족 문제가 발생할 수 있습니다.

메모리 사용량을 고려하여 적절한 쓰레드 수를 설정해야 합니다.

  1. 실험과 최적화

성능이 중요한 경우, 실험적으로 최적의 쓰레드 수를 결정하는 것도 좋은 방법입니다.

성능 테스트를 통해 다양한 쓰레드 수를 시도하고, 가장 효율적인 수치를 선택합니다.

프로파일링 도구를 사용해 병목 현상을 분석하고, 시스템 리소스를 얼마나 효율적으로 사용하는지 확인합니다.

일반적인 권장 사항

I/O 바운드 작업: 보통 CPU 코어 수 2 또는 CPU 코어 수 4 정도의 쓰레드를 사용할 수 있습니다.

CPU 바운드 작업: 보통 CPU 코어 수와 비슷한 수로 설정합니다.

예를 들어, 4코어 CPU 시스템에서는:

I/O 바운드 작업: max_workers=8 또는 16 설정

CPU 바운드 작업: max_workers=4 설정

참고

작업이 다양한 종류일 경우, 쓰레드 수를 조정하여 CPU와 I/O의 균형을 맞추는 것이 좋습니다.

비동기 처리가 더 효율적인 경우에는, 비동기 코드(예: asyncio)를 고려할 수도 있습니다.

작업 부하에 맞게 적절한 쓰레드 수를 설정하면 성능을 크게 향상시킬 수 있습니다.

Junghyun99 commented 2 weeks ago

executor.submit() 메서드는 다중 인수를 사용하는 작업도 지원합니다. 이 메서드는 주어진 함수를 쓰레드 풀 내에서 실행하도록 제출하며, 함수를 호출할 때 여러 인수를 전달할 수 있습니다.

여기 몇 가지 예제를 통해 executor.submit()의 사용법을 보여드리겠습니다:

  1. 기본 예제: 하나의 인수만 사용하는 경우

from concurrent.futures import ThreadPoolExecutor

def greet(name): return f"Hello, {name}!"

names = ["Alice", "Bob", "Charlie"]

with ThreadPoolExecutor(max_workers=3) as executor: futures = [executor.submit(greet, name) for name in names]

for future in futures:
    print(future.result())

설명: greet 함수는 하나의 인수 name을 받고, 이를 이용해 인사말을 반환합니다.

executor.submit()에 greet 함수와 name 인수를 전달합니다.

  1. 다중 인수를 사용하는 예제

from concurrent.futures import ThreadPoolExecutor

def add_numbers(a, b): return a + b

number_pairs = [(1, 2), (3, 4), (5, 6)]

with ThreadPoolExecutor(max_workers=3) as executor: futures = [executor.submit(add_numbers, a, b) for a, b in number_pairs]

for future in futures:
    print(future.result())

설명: add_numbers 함수는 두 개의 인수 a와 b를 받고, 두 수의 합을 반환합니다.

executor.submit()에 함수 add_numbers와 두 개의 인수를 순서대로 전달합니다.

  1. 키워드 인수 사용

from concurrent.futures import ThreadPoolExecutor

def introduce(name, age): return f"My name is {name} and I am {age} years old."

people = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]

with ThreadPoolExecutor(max_workers=3) as executor: futures = [executor.submit(introduce, name=name, age=age) for name, age in people]

for future in futures:
    print(future.result())

설명: introduce 함수는 name과 age라는 키워드 인수를 사용합니다.

executor.submit()에 introduce 함수와 키워드 인수로 전달할 값을 명시합니다.

  1. 혼합된 위치 및 키워드 인수

from concurrent.futures import ThreadPoolExecutor

def calculate_area(length, width, unit="square meters"): area = length * width return f"Area: {area} {unit}"

rectangles = [(5, 10), (3, 4), (7, 2)]

with ThreadPoolExecutor(max_workers=3) as executor: futures = [ executor.submit(calculate_area, length, width, unit="square feet") for length, width in rectangles ]

for future in futures:
    print(future.result())

설명: calculate_area 함수는 두 개의 위치 인수 length와 width와 하나의 키워드 인수 unit을 받습니다.

executor.submit()에 함수와 혼합된 인수들을 전달합니다.

요약

executor.submit()는 하나의 인수뿐 아니라 여러 인수와 키워드 인수도 사용할 수 있습니다.

인수를 여러 개 전달할 때는 함수를 호출하는 것처럼 executor.submit()에 나열하면 됩니다.

이러한 방법을 사용하면 다양한 함수 호출 패턴을 지원할 수 있어 더욱 유연한 멀티쓰레딩 작업을 구현할 수 있습니다.

Junghyun99 commented 2 weeks ago

에러 처리를 추가하면 작업이 실패했을 때에도 안전하게 처리할 수 있습니다. try-except 블록을 활용해 각 작업에서 발생하는 예외를 잡아내고, 필요에 따라 적절한 로깅이나 대체 동작을 수행할 수 있습니다.

아래는 에러 처리를 추가한 코드입니다:

from concurrent.futures import ThreadPoolExecutor, as_completed

results = []

with ThreadPoolExecutor(max_workers=3) as executor:

각 ID에 대해 fetch_data 작업을 비동기로 제출

futures = [executor.submit(fetch_data, data_id) for data_id in data_ids]

# 작업 완료를 기다리며 결과 수집
for future in as_completed(futures):
    try:
        result = future.result()  # 작업 결과 가져오기
        results.append(result)
        print(result)
    except Exception as e:
        # 예외가 발생한 경우 처리
        print(f"Error occurred: {e}")
        # 필요에 따라 추가적인 에러 처리 로직 추가 (예: 로깅)

설명

  1. try-except 블록:

future.result() 호출 시 예외가 발생할 수 있습니다. 예를 들어, fetch_data 함수에서 오류가 발생하면 해당 작업의 Future 객체에서 result()를 호출할 때 예외가 발생합니다.

try 블록 안에서 future.result()을 호출하고, except 블록에서 발생한 예외를 잡아냅니다.

  1. Exception 핸들링:

Exception as e로 예외를 잡아내고, 에러 메시지를 출력합니다.

필요에 따라, 에러 정보를 로깅하거나 다른 방식으로 처리할 수 있습니다.

확장: 로깅을 추가하거나 특정 예외 처리

에러를 단순히 출력하는 대신, Python의 logging 모듈을 사용하여 로그 파일에 기록할 수 있습니다.

특정 예외를 다르게 처리하려면 except ValueError as e와 같은 방식으로 추가적인 except 블록을 작성할 수 있습니다.

이렇게 하면 작업 중 예외가 발생해도 프로그램이 중단되지 않고, 에러 정보를 확인하며 남은 작업들을 계속 수행할 수 있습니다.