DE32FinalTeam2 / FinalRepo

프로젝트 최종버전을 올리는 레포지토리이다.
0 stars 0 forks source link

중복 저장 없이 크롤링 하기 #5

Open GITSangWoo opened 5 days ago

GITSangWoo commented 5 days ago

[나만의 추가 작업 ]

GITSangWoo commented 5 days ago

url 링크 목록 만들면서 로그 남기는 코드

import re
import os
import time
from datetime import datetime, timedelta
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By

# 셀레니움 웹 드라이버 설정
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))

# 오늘 날짜로 로그파일 이름 설정
today = datetime.today().strftime('%Y%m%d')
log_file_name = f"{today}.log"

# 어제 날짜로 로그파일 이름 설정
yesterday = (datetime.today() - timedelta(days=1)).strftime('%Y%m%d')
yesterday_log_file_name = f"{yesterday}.log"

# 페이지 열기 (예시로 원하는 페이지)
driver.get("https://www.wanted.co.kr/wdlist/518/655?country=kr&job_sort=job.recommend_order&years=-1&selected=655&locations=all")

# 페이지 로딩 대기
time.sleep(3)  # 페이지 로딩 대기 (필요시 WebDriverWait 사용 가능)

# 링크를 저장할 리스트
all_links = []

# 정규 표현식 패턴 (wd/ 뒤에 숫자가 있는 URL을 찾는 패턴)
pattern = re.compile(r'wd/\d+$')

# 스크롤 내리기 및 링크 추출 반복
previous_height = driver.execute_script("return document.body.scrollHeight")  # 현재 페이지의 높이를 가져옴

while True:
    # 페이지에서 모든 <a> 태그를 찾음
    links = driver.find_elements(By.TAG_NAME, "a")

    # 이미 가져온 링크들을 확인하고 중복되지 않게 추가
    for link in links:
        href = link.get_attribute("href")
        # 정규 표현식으로 'wd/숫자' 형식의 링크만 필터링
        if href and pattern.search(href) and href not in all_links:
            all_links.append(href)

    # 스크롤을 페이지 끝까지 내리기
    driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

    # 잠시 대기하여 새로운 요소들이 로드될 시간을 줌
    time.sleep(2)  # 2초간 대기, 이 시간은 페이지 로딩 속도에 맞게 조절

    # 새로운 페이지 높이가 이전과 같다면 스크롤을 더 이상 내릴 필요가 없으므로 종료
    new_height = driver.execute_script("return document.body.scrollHeight")
    if new_height == previous_height:
        break  # 더 이상 새로운 요소가 로드되지 않으면 반복 종료

    previous_height = new_height  # 이전 높이를 업데이트

# 어제 로그 파일이 있으면 읽기
previous_urls = {}
if os.path.exists(yesterday_log_file_name):
    with open(yesterday_log_file_name, 'r', encoding='utf-8') as file:
        for line in file.readlines()[1:]:  # 첫 번째 줄은 header
            columns = line.strip().split(',')
            url = columns[0]
            notice_status = columns[1]
            work_status = columns[2]
            done_time = columns[3]
            previous_urls[url] = {
                'notice_status': notice_status,
                'work_status': work_status,
                'done_time': done_time
            }

# 오늘 로그 파일에 기록할 내용 생성
log_data = []

# 오늘 크롤링한 URL과 어제 로그 파일을 비교하여 상태 설정
for url in all_links:
    if os.path.exists(yesterday_log_file_name):
        if url in previous_urls:
            # 어제 로그 파일에 URL이 있고, work_status가 "done"이 아니면 그대로 가져옴
            if previous_urls[url]['work_status'] != "done":
                # 작업이 필요하거나 아직 완료되지 않은 경우
                notice_status = previous_urls[url]['notice_status']
                work_status = previous_urls[url]['work_status']
                done_time = previous_urls[url]['done_time']
            else:
                # "done" 상태면 "exist"로 설정하고, done_time을 그대로 사용
                notice_status = "exist"
                work_status = "done"
                done_time = previous_urls[url]['done_time']
        else:
            # 어제 로그 파일에 없으면 상태를 "deleted"로 설정
            notice_status = "deleted"
            work_status = "done"
            done_time = datetime.today().strftime('%Y-%m-%d %H:%M:%S')  # 삭제된 시간을 현재 시간으로
    else:
        # 어제 로그 파일이 없으면 모든 URL은 "update"로 설정
        notice_status = "update"
        work_status = "null"
        done_time = "null"

    log_data.append(f"{url},{notice_status},{work_status},{done_time}")

# 오늘 로그 파일 생성 (기존 로그 파일 덮어쓰기)
with open(log_file_name, 'w', encoding='utf-8') as file:
    # 헤더 작성
    file.write("url,notice_status,work_status,done_time\n")
    for line in log_data:
        file.write(line + "\n")

# 브라우저 종료
driver.quit()

어제 로그에있는 링크와 오늘 읽어온 링크를 바탕으로 비교해서 삭제, 업데이트, 유지에 따라 로그를 남김

위의 코드로 생긴 로그를 바탕으로 작업을 함 중간에 작업이 중단되어도, 중단한 부분부터 시작 가능

import re
import os
import time
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# 셀레니움 웹 드라이버 설정
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))

# 오늘 날짜로 로그파일 이름 설정
today = datetime.today().strftime('%Y%m%d')
log_file_name = f"{today}.log"

# 크롤링 작업 완료 후 내용을 textnotice 디렉토리에 저장
def save_crawled_content(url, content):
    # URL에서 숫자 부분만 추출하여 파일명으로 사용
    file_name = url.split('/')[-1] + ".txt"
    file_path = os.path.join('textnotice', file_name)

    # 디렉토리가 없으면 생성
    if not os.path.exists('textnotice'):
        os.makedirs('textnotice')

    # 크롤링한 내용을 파일에 저장
    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(content)
    print(f"Content saved to {file_path}")

# 페이지 크롤링 함수 (예시로 크롤링할 내용을 입력)
def crawl_url(url):
    driver.get(url)
    time.sleep(3)  # 페이지 로딩 대기 (필요시 WebDriverWait 사용 가능)

    try:
        # "상세 정보 더 보기" 버튼을 기다리고 클릭
        more_button = WebDriverWait(driver, 20).until(
            EC.element_to_be_clickable((By.XPATH, "//span[contains(text(), '상세 정보 더 보기')]"))
        )

        # 버튼이 화면에 보이지 않으면 스크롤하여 화면에 표시
        driver.execute_script("arguments[0].scrollIntoView(true);", more_button)
        time.sleep(1)  # 스크롤 후 잠시 대기

        # 버튼 클릭 시도 (다른 요소에 가려져 있을 경우 자바스크립트로 강제로 클릭)
        driver.execute_script("arguments[0].click();", more_button)

        # 버튼 클릭 후 로딩될 시간 대기
        job_content_section = WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, "section.JobContent_JobContent__qan7s"))
        )

        # 섹션 내의 모든 텍스트 내용 가져오기
        job_content_text = job_content_section.text

        # 파일로 저장
        save_crawled_content(url, job_content_text)
        return job_content_text
    except Exception as e:
        print(f"Error during crawling {url}: {e}")
        return "Error during crawling."

# 기존 로그 파일을 읽고, "update" 상태인 URL들만 크롤링
def update_log_file(url, crawl_time):
    # 로그 파일을 읽고, 해당 URL의 상태 업데이트
    with open(log_file_name, 'r', encoding='utf-8') as file:
        lines = file.readlines()

    # 새로운 데이터를 담을 리스트
    updated_lines = []

    for line in lines:
        columns = line.strip().split(',')
        if columns[0] == url:
            # URL이 일치하면 work_status를 "done"으로 업데이트하고, done_time 추가
            columns[2] = "done"
            columns[3] = crawl_time
            updated_line = ','.join(columns)
            updated_lines.append(updated_line + '\n')
        else:
            # 일치하지 않으면 기존 라인을 그대로 추가
            updated_lines.append(line)

    # 로그 파일을 덮어쓰기 (한 줄씩 업데이트)
    with open(log_file_name, 'w', encoding='utf-8') as file:
        file.writelines(updated_lines)

# 기존 로그 파일을 읽기
with open(log_file_name, 'r', encoding='utf-8') as file:
    lines = file.readlines()

    # 'update' 상태인 URL들을 크롤링 대상 리스트에 추가
    for line in lines[1:]:
        columns = line.strip().split(',')
        url = columns[0]
        notice_status = columns[1]
        work_status = columns[2]
        done_time = columns[3]

        if notice_status == "update" and work_status == "null":
            print(f"Starting crawl for {url}")
            # 크롤링 작업 수행
            crawl_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            content = crawl_url(url)  # 크롤링 함수 호출

            # 작업이 끝난 후 즉시 로그 파일을 업데이트
            update_log_file(url, crawl_time)

# 브라우저 종료
driver.quit()

결과

[20241126] 어제 날짜 로그 (20241127파일을 복사하고 252615 url 로그 일부러 지우고, 작업 완료 날짜 11월26일로 바꾼 로그 파일) image

[20241127] 오늘 날짜 로그 image

[작업 처리] update이지만 work_status가 null인 값만 작업함 image

GITSangWoo commented 4 days ago

로그 파일 생성 로직 수정

전날 로그를 복사한 뒤 오늘 크롤링한 url과 비교해서 작업 로그를 새김

수정 사항: 어제와 오늘 모두 존재하는 URL 처리:

만약 URL이 어제도 존재하고 오늘도 존재한다면, notice_status를 "exist"로 설정하고, work_status와 done_time은 어제의 값을 그대로 가져옵니다.

오늘만 존재하는 URL 처리:

만약 URL이 오늘만 존재한다면, notice_status를 "update"로 설정하고, work_status와 done_time은 null로 설정합니다.

어제만 존재하는 URL 처리:

만약 URL이 어제는 존재했으나 오늘은 존재하지 않는다면, notice_status를 "deleted"로 설정하고, work_status는 "done"으로, done_time은 현재 시간으로 설정합니다.

코드

import re
import os
import time
from datetime import datetime, timedelta
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By

# 셀레니움 웹 드라이버 설정
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))

# 오늘 날짜로 로그파일 이름 설정
today = datetime.today().strftime('%Y%m%d')
log_file_name = f"{today}.log"

# 어제 날짜로 로그파일 이름 설정
yesterday = (datetime.today() - timedelta(days=1)).strftime('%Y%m%d')
yesterday_log_file_name = f"{yesterday}.log"

# 페이지 열기 (예시로 원하는 페이지)
driver.get("https://www.wanted.co.kr/wdlist/518/655?country=kr&job_sort=job.recommend_order&years=-1&selected=655&locations=all")

# 페이지 로딩 대기
time.sleep(3)  # 페이지 로딩 대기 (필요시 WebDriverWait 사용 가능)

# 링크를 저장할 리스트
all_links = []

# 정규 표현식 패턴 (wd/ 뒤에 숫자가 있는 URL을 찾는 패턴)
pattern = re.compile(r'wd/\d+$')

# 스크롤 내리기 및 링크 추출 반복
previous_height = driver.execute_script("return document.body.scrollHeight")  # 현재 페이지의 높이를 가져옴

while True:
    # 페이지에서 모든 <a> 태그를 찾음
    links = driver.find_elements(By.TAG_NAME, "a")

    # 이미 가져온 링크들을 확인하고 중복되지 않게 추가
    for link in links:
        href = link.get_attribute("href")
        # 정규 표현식으로 'wd/숫자' 형식의 링크만 필터링
        if href and pattern.search(href) and href not in all_links:
            all_links.append(href)

    # 스크롤을 페이지 끝까지 내리기
    driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

    # 잠시 대기하여 새로운 요소들이 로드될 시간을 줌
    time.sleep(2)  # 2초간 대기, 이 시간은 페이지 로딩 속도에 맞게 조절

    # 새로운 페이지 높이가 이전과 같다면 스크롤을 더 이상 내릴 필요가 없으므로 종료
    new_height = driver.execute_script("return document.body.scrollHeight")
    if new_height == previous_height:
        break  # 더 이상 새로운 요소가 로드되지 않으면 반복 종료

    previous_height = new_height  # 이전 높이를 업데이트

# 어제 로그 파일이 있으면 읽기
previous_urls = {}
if os.path.exists(yesterday_log_file_name):
    with open(yesterday_log_file_name, 'r', encoding='utf-8') as file:
        for line in file.readlines()[1:]:  # 첫 번째 줄은 header
            columns = line.strip().split(',')
            url = columns[0]
            notice_status = columns[1]
            work_status = columns[2]
            done_time = columns[3]
            previous_urls[url] = {
                'notice_status': notice_status,
                'work_status': work_status,
                'done_time': done_time
            }

# 오늘 로그 파일에 기록할 내용 생성
log_data_deleted = []  # deleted 상태를 따로 저장
log_data_other = []    # 나머지 (exist, update) 상태를 따로 저장

# 오늘 크롤링한 URL과 어제 로그 파일을 비교하여 상태 설정
for url in all_links:
    if url in previous_urls:
        # 어제와 오늘 모두 존재하는 URL이면 "exist"로 처리
        notice_status = "exist"
        work_status = previous_urls[url]['work_status']  # 어제의 상태 그대로
        done_time = previous_urls[url]['done_time']  # 어제의 done_time 그대로
        log_data_other.append(f"{url},{notice_status},{work_status},{done_time}")
    else:
        # 오늘만 존재하는 URL은 "update"로 설정
        notice_status = "update"
        work_status = "null"
        done_time = "null"
        log_data_other.append(f"{url},{notice_status},{work_status},{done_time}")  # update 상태는 따로 추가

# 어제 로그 파일에 있지만 오늘 로그 파일에 없는 URL 처리
for url in previous_urls:
    if url not in all_links:
        # 어제는 존재했지만 오늘은 없는 URL은 "deleted"로 설정
        notice_status = "deleted"
        work_status = "done"
        done_time = datetime.today().strftime('%Y-%m-%d %H:%M:%S')  # 삭제된 시간을 현재 시간으로 설정
        log_data_deleted.append(f"{url},{notice_status},{work_status},{done_time}")  # 삭제된 URL은 따로 추가

# 오늘 로그 파일 생성 (기존 로그 파일 덮어쓰기)
with open(log_file_name, 'w', encoding='utf-8') as file:
    # 헤더 작성
    file.write("url,notice_status,work_status,done_time\n")
    # deleted 항목을 먼저 기록
    for line in log_data_deleted:
        file.write(line + "\n")
    # 나머지 (exist, update) 항목을 그 뒤에 기록
    for line in log_data_other:
        file.write(line + "\n")

# 브라우저 종료
driver.quit()

테스트

[20241127.log] 어제 날짜 로그파일

url,notice_status,work_status,done_time
https://www.wanted.co.kr/wd/252310,update,null,null
https://www.wanted.co.kr/wd/252615,update,null,null
https://www.wanted.co.kr/wd/252262,update,null,null
https://www.wanted.co.kr/wd/226146,update,null,null
https://www.wanted.co.kr/wd/252420,update,null,null
https://www.wanted.co.kr/wd/241695,update,null,null
https://www.wanted.co.kr/wd/251838,update,null,null

[20241128.log] 오늘 생성된 로그파일

url,notice_status,work_status,done_time
https://www.wanted.co.kr/wd/243538,deleted,done,2024-11-28 11:30:49
https://www.wanted.co.kr/wd/252310,exist,null,null
https://www.wanted.co.kr/wd/252615,exist,null,null
https://www.wanted.co.kr/wd/226146,exist,null,null
https://www.wanted.co.kr/wd/252262,exist,null,null
https://www.wanted.co.kr/wd/241695,exist,null,null
https://www.wanted.co.kr/wd/252420,exist,null,null
https://www.wanted.co.kr/wd/228746,exist,null,null
https://www.wanted.co.kr/wd/251100,exist,null,null
https://www.wanted.co.kr/wd/245921,exist,null,null
https://www.wanted.co.kr/wd/252316,exist,null,null
https://www.wanted.co.kr/wd/235682,exist,null,null
https://www.wanted.co.kr/wd/229465,exist,null,null
https://www.wanted.co.kr/wd/245694,exist,null,null

[https://www.wanted.co.kr/wd/243538,deleted,done,2024-11-28 11:30:49 공고 삭제되었는지 접속] image

지원 마감된 것 확인함

[코드 추가 개량 아이디어] (1) 전날 로그파일이 없을때 모두다 업데이트로 들어오기 때문에 중복저장 위험이 있음 생성된 가장 최신의 로그파일을 추적해 그 로그 파일 기반으로 작업을 처리하게 해야될듯함

개량으로 얻는 효과 (1) 작업을 마치지 못한 경우에 대해서 대비할 수 있어 중복 저장을 회피할 수 있음 (2) 시스템 적으로 오류가 발생했을 경우 크롤링 과정이 잘 진행되지 못한 경우를 대비할 수 있음

GITSangWoo commented 4 days ago

수정된 로그 기반으로 크롤링 작업과 db 적재 s3 적재를 하는 코드

import re
import os
import time
import uuid
import boto3
import pymysql
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# AWS S3 클라이언트 설정
s3_client = boto3.client('s3')

# MySQL 연결 설정
connection = pymysql.connect(
    host='13.125.201.86',  # DB 연결 IP
    user='user',
    password='1234',
    database='testdb',
    port=3306
)
cursor = connection.cursor()

# S3 버킷과 폴더 경로 설정
s3_bucket_name = 't2jt'
s3_folder_path = 'job/DE/sources/wanted/txt/'

# 셀레니움 웹 드라이버 설정
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))

# 오늘 날짜로 로그파일 이름 설정
today = datetime.today().strftime('%Y%m%d')
log_file_name = f"{today}.log"

# 크롤링 작업 완료 후 내용을 로컬에 저장하고 S3에 업로드
def save_crawled_content(url, content):
    # URL에서 숫자 부분만 추출하여 파일명으로 사용
    file_name = f"{uuid.uuid4()}.txt"  # uuid로 파일 이름 생성
    file_path = os.path.join('textnotice', file_name)

    # 로컬에 저장
    if not os.path.exists('textnotice'):
        os.makedirs('textnotice')

    # 크롤링한 내용을 로컬 파일에 저장
    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(content)
    print(f"Content saved locally to {file_path}")

    # S3에 파일 업로드
    s3_file_path = os.path.join(s3_folder_path, file_name)  # S3에 저장할 경로
    try:
        s3_client.put_object(
            Bucket=s3_bucket_name,
            Key=s3_file_path,  # 'job/DE/sources/wanted/txt/UUID.txt'
            Body=content,
            ContentType='text/plain'
        )
        # S3 URL 생성 (형식: s3://<bucket-name>/<folder-path>/<file-name>)
        s3_url = f"s3://{s3_bucket_name}/{s3_file_path}"
        print(f"Content uploaded to S3 at {s3_url}")
        return s3_url  # S3 URL 반환
    except Exception as e:
        print(f"Error uploading to S3: {e}")
        return None

# 페이지 크롤링 함수 (예시로 크롤링할 내용을 입력)
def crawl_url(url):
    driver.get(url)
    time.sleep(3)  # 페이지 로딩 대기 (필요시 WebDriverWait 사용 가능)

    try:
        # "상세 정보 더 보기" 버튼을 기다리고 클릭
        more_button = WebDriverWait(driver, 20).until(
            EC.element_to_be_clickable((By.XPATH, "//span[contains(text(), '상세 정보 더 보기')]"))
        )

        # 버튼이 화면에 보이지 않으면 스크롤하여 화면에 표시
        driver.execute_script("arguments[0].scrollIntoView(true);", more_button)
        time.sleep(1)  # 스크롤 후 잠시 대기

        # 버튼 클릭 시도 (다른 요소에 가려져 있을 경우 자바스크립트로 강제로 클릭)
        driver.execute_script("arguments[0].click();", more_button)

        # 버튼 클릭 후 로딩될 시간 대기
        job_content_section = WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, "section.JobContent_JobContent__qan7s"))
        )

        # 섹션 내의 모든 텍스트 내용 가져오기
        job_content_text = job_content_section.text

        # 회사 이름 추출
        company_element = driver.find_element(By.CSS_SELECTOR, "a.JobHeader_JobHeader__Tools__Company__Link__zAvYv")
        company_name = company_element.get_attribute("data-company-name")

        # 공고 제목 추출
        post_title = driver.find_element(By.CSS_SELECTOR, "h1.JobHeader_JobHeader__PositionName__kfauc").text

        # 마감일 추출
        try:
            deadline_element = driver.find_element(By.CSS_SELECTOR, "span.wds-lgio6k")
            deadline = deadline_element.text.strip()
        except Exception as e:
            deadline = "Unknown Deadline"

        # due_type 및 due_date 설정
        if re.match(r"\d{4}\.\d{2}\.\d{2}", deadline):  # 날짜 형식(2024.12.02) 확인
            due_type = "날짜"
            due_date = datetime.strptime(deadline, "%Y.%m.%d").date()  # 문자열을 날짜 형식으로 변환
        else:
            due_type = deadline  # 날짜 형식이 아니면 받아온 값 그대로 사용
            due_date = None  # 날짜가 아닐 경우 due_date는 null

        # S3에 저장 후 URL 받아오기
        s3_text_url = save_crawled_content(url, job_content_text)

        # 데이터베이스에 삽입할 메타 데이터 설정
        create_time = update_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        job_data = {
            'create_time': create_time,
            'update_time': update_time,
            'removed_time': None,
            'site': 'wanted',
            'job_title': 'DE',
            'due_type': due_type,
            'due_date': due_date,
            'company': company_name,
            'post_title': post_title,
            'notice_type': 'text',
            'org_url': url,
            's3_text_url': s3_text_url,
            's3_images_url': None,
            'responsibility': None,
            'qualification': None,
            'preferential': None
        }

        # 데이터베이스에 저장
        insert_query = """
        INSERT INTO wanted (create_time, update_time, removed_time, site, job_title, due_type, due_date, company, 
                            post_title, notice_type, org_url, s3_text_url, s3_images_url, responsibility, qualification, preferential)
        VALUES (%(create_time)s, %(update_time)s, %(removed_time)s, %(site)s, %(job_title)s, %(due_type)s, %(due_date)s, %(company)s, 
                %(post_title)s, %(notice_type)s, %(org_url)s, %(s3_text_url)s, %(s3_images_url)s, %(responsibility)s, %(qualification)s, %(preferential)s)
        """
        cursor.execute(insert_query, job_data)
        connection.commit()
        print(f"Data for job {post_title} inserted into database.")
        return job_content_text
    except Exception as e:
        print(f"Error during crawling {url}: {e}")
        return "Error during crawling."

# 'notice_status'가 'update' 또는 'exist'이고 'work_status'가 'null'인 URL만 크롤링하고 처리
def crawl_updates_and_existence():
    with open(log_file_name, 'r', encoding='utf-8') as file:
        lines = file.readlines()

    for line in lines[1:]:  # 첫 번째 줄은 헤더이므로 생략
        columns = line.strip().split(',')
        notice_status = columns[1]
        work_status = columns[2]
        url = columns[0]
        done_time = columns[3]  # done_time 값 읽기

        # 'update' 또는 'exist' 상태이면서 'work_status'가 'null'인 URL만 크롤링
        if notice_status in ['update', 'exist'] and work_status == 'null':
            print(f"Crawling URL: {url}")
            crawl_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            crawl_url(url)  # 해당 URL 크롤링
            update_log_file(url, crawl_time)  # 로그 파일 업데이트

        # 'deleted' 상태인 경우, DB에서 URL을 찾아 removed_time을 done_time으로 업데이트
        elif notice_status == "deleted":
            print(f"URL {url} is deleted. Checking if it exists in the database.")
            cursor.execute("SELECT removed_time FROM wanted WHERE org_url = %s", (url,))
            result = cursor.fetchone()

            if result:
                removed_time = done_time  # 로그에서 가져온 done_time 값을 removed_time으로 사용
                print(f"URL {url} exists in DB. Updating removed_time to: {removed_time}")
                # 데이터베이스에서 removed_time을 done_time으로 업데이트
                update_time_query = """
                UPDATE wanted
                SET removed_time = %s
                WHERE org_url = %s
                """
                cursor.execute(update_time_query, (removed_time, url))
                connection.commit()
                print(f"Removed time for {url} updated successfully.")
            else:
                print(f"URL {url} not found in the database.")

# 크롤링 시작
crawl_updates_and_existence()

(1) 로그에서 exist, updated인 값은 이전 로그 값이 null이면 크롤링 작업후 done_time에 작업완료시간 업데이트 (2) 로그에서 deleted 인 값은 DB에서 url같은 행 찾아서 removed_time 값을 삭제를 체크한 날짜 값으로 업데이트

테스트

[removed 값 업데이트 테스트를 위해서 첫 줄의 데이터는 더미로 넣어놨었음] image

[work_status가 null인 값부터 크롤링 작업을 함 ] - 어제 로그기반이기 때문에 다시 로그만들고 적재하면 중복저장 될 수 있음 로그를 다시 만들어야하는 불가피한 상황에서는 이미있는 로그를 어제 날짜로 바꿔놓고 다시 만들어서 처리해야함 제일 좋은건 db 적재 정보 기반일 듯 비용때문에 회피 중 대신에 어제 못한 작업이 있다면 이미 존재했던 오늘 새로 생겼던 합쳐서 크롤링을 이어서 진행가능함

image

GITSangWoo commented 3 days ago

수정된 로그 기반으로 크롤링 작업과 db 적재 s3 적재를 하는 코드 업데이트 2차

[업데이트 내용] (1) s3에 적재, db의 적재, 크롤링,할 때 에러가 나면 에러를 YYMMDD_error.log에 새김 (2) 크롤링할때 에러생기면 로그파일 work_status = error로 됌 (3) notice_status가 exist거나 update일때 work_status =error거나 null인 url 부터 작업을 실행함

import os
import time
import uuid
import boto3
import pymysql
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# AWS S3 클라이언트 설정
s3_client = boto3.client('s3')

# MySQL 연결 설정
connection = pymysql.connect(
    host='43.201.40.223',
    user='user',
    password='1234',
    database='testdb',
    port=3306
)
cursor = connection.cursor()

# S3 버킷과 폴더 경로 설정
s3_bucket_name = 't2jt'
s3_folder_path = 'job/DE/sources/wanted/txt/'

# 셀레니움 웹 드라이버 설정
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))

# 오늘 날짜로 로그파일 이름 설정
today = datetime.today().strftime('%Y%m%d')
log_file_name = f"{today}.log"
error_log_file_name = f"{today}_error.log"  # 에러 로그 파일

# 크롤링한 콘텐츠를 로컬에 저장하고, S3에 업로드 후 URL 반환
def save_crawled_content(url, content):
    file_name = f"{uuid.uuid4()}.txt"  # UUID로 파일 이름 생성
    file_path = os.path.join('textnotice', file_name)

    # 로컬에 저장
    if not os.path.exists('textnotice'):
        os.makedirs('textnotice')

    # 크롤링한 내용을 로컬 파일에 저장
    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(content)
    print(f"Content saved locally to {file_path}")

    # S3에 파일 업로드
    s3_file_path = os.path.join(s3_folder_path, file_name)
    try:
        s3_client.put_object(
            Bucket=s3_bucket_name,
            Key=s3_file_path,  # 'job/DE/sources/wanted/txt/UUID.txt'
            Body=content,
            ContentType='text/plain'
        )
        s3_url = f"s3://{s3_bucket_name}/{s3_file_path}"
        print(f"Content uploaded to S3 at {s3_url}")
        return s3_url  # S3 URL 반환
    except Exception as e:
        print(f"Error uploading to S3: {e}")

        # S3 업로드 오류를 에러 로그 파일에 기록
        error_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        with open(error_log_file_name, 'a', encoding='utf-8') as error_log:
            error_log.write(f"{error_time}, {url}, S3 upload error: {str(e)}\n")

        return None

# 페이지 크롤링 함수
def crawl_url(url):
    driver.get(url)
    time.sleep(3)  # 페이지 로딩 대기

    try:
        # "상세 정보 더 보기" 버튼을 기다리고 클릭
        more_button = WebDriverWait(driver, 20).until(
            EC.element_to_be_clickable((By.XPATH, "//span[contains(text(), '상세 정보 더 보기')]"))
        )

        # 버튼이 화면에 보이지 않으면 스크롤하여 화면에 표시
        driver.execute_script("arguments[0].scrollIntoView(true);", more_button)
        time.sleep(1)  # 스크롤 후 잠시 대기

        # 버튼 클릭 시도
        driver.execute_script("arguments[0].click();", more_button)

        # 클릭 후 로딩된 콘텐츠 가져오기
        job_content_section = WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, "section.JobContent_JobContent__qan7s"))
        )

        # 섹션 내의 텍스트 내용 가져오기
        job_content_text = job_content_section.text

        # 회사 이름, 공고 제목, 마감일 추출
        company_name = driver.find_element(By.CSS_SELECTOR, "a.JobHeader_JobHeader__Tools__Company__Link__zAvYv").get_attribute("data-company-name")
        post_title = driver.find_element(By.CSS_SELECTOR, "h1.JobHeader_JobHeader__PositionName__kfauc").text
        deadline = driver.find_element(By.CSS_SELECTOR, "span.wds-lgio6k").text.strip() if driver.find_elements(By.CSS_SELECTOR, "span.wds-lgio6k") else "Unknown Deadline"

        # 마감일 처리
        if re.match(r"\d{4}\.\d{2}\.\d{2}", deadline):  # 날짜 형식 (2024.12.02) 확인
            due_type = "날짜"
            due_date = datetime.strptime(deadline, "%Y.%m.%d").date()  # 날짜 형식 변환
        else:
            due_type = deadline
            due_date = None

        # S3에 텍스트 내용 저장 후 URL 반환
        s3_text_url = save_crawled_content(url, job_content_text)

        if not s3_text_url:
            # S3 업로드 실패 시 에러 처리
            update_log_file(url, "Error", status="error")
            return "Error during crawling."

        # 데이터베이스에 메타 데이터 삽입
        create_time = update_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        job_data = {
            'create_time': create_time,
            'update_time': update_time,
            'removed_time': None,
            'site': 'wanted',
            'job_title': 'DE',
            'due_type': due_type,
            'due_date': due_date,
            'company': company_name,
            'post_title': post_title,
            'notice_type': 'text',
            'org_url': url,
            's3_text_url': s3_text_url,
            's3_images_url': None,
            'responsibility': None,
            'qualification': None,
            'preferential': None
        }
        insert_query = """
        INSERT INTO wanted (create_time, update_time, removed_time, site, job_title, due_type, due_date, company, 
                            post_title, notice_type, org_url, s3_text_url, s3_images_url, responsibility, qualification, preferential)
        VALUES (%(create_time)s, %(update_time)s, %(removed_time)s, %(site)s, %(job_title)s, %(due_type)s, %(due_date)s, %(company)s, 
                %(post_title)s, %(notice_type)s, %(org_url)s, %(s3_text_url)s, %(s3_images_url)s, %(responsibility)s, %(qualification)s, %(preferential)s)
        """
        cursor.execute(insert_query, job_data)
        connection.commit()

        # 크롤링 성공 시 log 파일 업데이트
        crawl_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        update_log_file(url, crawl_time, status="done")

        return job_content_text
    except Exception as e:
        print(f"Error during crawling {url}: {e}")

        # 에러 로그 파일에 기록
        error_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        with open(error_log_file_name, 'a', encoding='utf-8') as error_log:
            error_log.write(f"{error_time}, {url}, {str(e)}\n")

        # 에러 발생 시 log 파일에서 work_status를 'error'로 변경하고 done_time 업데이트
        update_log_file(url, error_time, status="error")

        return "Error during crawling."

# 로그 파일을 읽고 해당 URL의 작업 상태 업데이트
def update_log_file(url, done_time, status="done"):
    try:
        with open(log_file_name, 'r', encoding='utf-8') as file:
            lines = file.readlines()

        updated_lines = []
        for line in lines:
            columns = line.strip().split(',')
            if columns[0] == url:
                columns[2] = status  # 상태를 'done' 또는 'error'로 변경
                columns[3] = done_time  # done_time을 업데이트
                updated_line = ','.join(columns) + '\n'
                updated_lines.append(updated_line)
            else:
                updated_lines.append(line)

        # 변경된 내용을 파일에 다시 작성
        with open(log_file_name, 'w', encoding='utf-8') as file:
            file.writelines(updated_lines)

        print(f"Log file updated for URL {url}")
    except Exception as e:
        print(f"Error updating log file for URL {url}: {e}")

# 로그에 따라 'deleted', 'update', 'exist' 상태인 URL만 크롤링
with open(log_file_name, 'r', encoding='utf-8') as file:
    lines = file.readlines()

for line in lines[1:]:  # 첫 번째 줄은 헤더이므로 생략
    columns = line.strip().split(',')
    url = columns[0]
    notice_status = columns[1]
    work_status = columns[2]
    done_time = columns[3] if len(columns) > 3 else None  # done_time이 존재할 때만 처리

    # 'deleted' 상태일 경우 처리 (크롤링은 하지 않음)
    if notice_status == "deleted":
        print(f"URL {url} is deleted. Checking if it exists in the database.")
        cursor.execute("SELECT removed_time FROM wanted WHERE org_url = %s", (url,))
        result = cursor.fetchone()

        if result:
            removed_time = done_time  # 로그에서 가져온 done_time 값을 removed_time으로 사용
            print(f"URL {url} exists in DB. Updating removed_time to: {removed_time}")
            # 데이터베이스에서 removed_time을 done_time으로 업데이트
            update_time_query = """
            UPDATE wanted
            SET removed_time = %s
            WHERE org_url = %s
            """
            cursor.execute(update_time_query, (removed_time, url))
            connection.commit()
            print(f"Removed time for {url} updated successfully.")
        else:
            print(f"URL {url} not found in the database.")
        continue  # 'deleted' 상태인 경우 크롤링을 건너뛰고, 제거만 처리

    # 'update' 또는 'exist' 상태이면서 'work_status'가 'null' 또는 'error'인 URL만 크롤링
    if notice_status in ['update', 'exist'] and (work_status == 'null' or work_status == 'error'):
        print(f"Crawling URL: {url}")
        crawl_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        # 크롤링 함수 실행
        crawl_result = crawl_url(url)  # 크롤링 함수 실행

        # 크롤링이 성공적으로 완료되면 로그 파일에서 work_status를 done으로 업데이트
        if crawl_result != "Error during crawling.":
            update_log_file(url, crawl_time)

결과

[그냥 로그 파일] 크롤링 과정중 문제가 생기면 work_status는 error로 바꾸고 다시 크롤링을 시작하면 error인 부분이나 null인 부분 부터 작업을 시작함

image

[시스템 에러 로그 파일] image

[다다음날 공고 삭제되었을때] 로그파일 그저께거 복사해서 붙여넣음 image

image

GITSangWoo commented 1 day ago

작업 로그 파일 만드는 코드 업데이트 2차

[업데이트 내용] 작업이 중단되거나,다하지 못해서 밀렸을 경우, 오류가 났을 경우를 모두 대비해서 가장 최근 생성된 작업 로그 파일을 기반으로 오늘의 작업 로그 파일을 생성하도록 만듦

import re
import os
import time
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By

# 셀레니움 웹 드라이버 설정
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))

# 오늘 날짜로 로그 파일 이름 설정
today = datetime.today().strftime('%Y%m%d')
today_log_file_name = f"{today}.log"

# 로그 파일을 찾을 디렉토리 설정
log_directory = '.'  # 현재 디렉토리
log_files = [f for f in os.listdir(log_directory) if f.endswith('.log')]

# 가장 최근에 생성된 로그 파일 찾기
if log_files:
    # 파일들을 생성 시간 기준으로 정렬하고 가장 최근 파일을 선택
    log_files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
    recent_log_file_name = log_files[0]  # 가장 최근의 로그 파일을 선택
    print(f"Found the most recent log file: {recent_log_file_name}")
else:
    print("No log files found in the directory. All URLs will be marked as 'update'.")

# 페이지 열기 (예시로 원하는 페이지)
driver.get("https://www.wanted.co.kr/wdlist/518/655?country=kr&job_sort=job.recommend_order&years=-1&selected=655&locations=all")

# 페이지 로딩 대기
time.sleep(3)  # 페이지 로딩 대기 (필요시 WebDriverWait 사용 가능)

# 링크를 저장할 리스트
all_links = []

# 정규 표현식 패턴 (wd/ 뒤에 숫자가 있는 URL을 찾는 패턴)
pattern = re.compile(r'wd/\d+$')

# 스크롤 내리기 및 링크 추출 반복
previous_height = driver.execute_script("return document.body.scrollHeight")  # 현재 페이지의 높이를 가져옴

while True:
    # 페이지에서 모든 <a> 태그를 찾음
    links = driver.find_elements(By.TAG_NAME, "a")

    # 이미 가져온 링크들을 확인하고 중복되지 않게 추가
    for link in links:
        href = link.get_attribute("href")
        # 정규 표현식으로 'wd/숫자' 형식의 링크만 필터링
        if href and pattern.search(href) and href not in all_links:
            all_links.append(href)

    # 스크롤을 페이지 끝까지 내리기
    driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

    # 잠시 대기하여 새로운 요소들이 로드될 시간을 줌
    time.sleep(2)  # 2초간 대기, 이 시간은 페이지 로딩 속도에 맞게 조절

    # 새로운 페이지 높이가 이전과 같다면 스크롤을 더 이상 내릴 필요가 없으므로 종료
    new_height = driver.execute_script("return document.body.scrollHeight")
    if new_height == previous_height:
        break  # 더 이상 새로운 요소가 로드되지 않으면 반복 종료

    previous_height = new_height  # 이전 높이를 업데이트

# 이전 로그 파일이 존재하는지 확인하고 읽기
previous_urls = {}
if os.path.exists(recent_log_file_name):
    with open(recent_log_file_name, 'r', encoding='utf-8') as file:
        for line in file.readlines()[1:]:  # 첫 번째 줄은 header
            columns = line.strip().split(',')
            url = columns[0]
            notice_status = columns[1]
            work_status = columns[2]
            done_time = columns[3]
            previous_urls[url] = {
                'notice_status': notice_status,
                'work_status': work_status,
                'done_time': done_time
            }

# 오늘 로그 파일에 기록할 내용 생성
log_data_deleted = []  # deleted 상태를 따로 저장
log_data_other = []    # 나머지 (exist, update) 상태를 따로 저장

# 오늘 크롤링한 URL과 최근 로그 파일을 비교하여 상태 설정
for url in all_links:
    if url in previous_urls:
        # 이전 로그 파일과 오늘 모두 존재하는 URL이면 "exist"로 처리
        notice_status = "exist"
        work_status = previous_urls[url]['work_status']  # 이전의 상태 그대로
        done_time = previous_urls[url]['done_time']  # 이전의 done_time 그대로
        log_data_other.append(f"{url},{notice_status},{work_status},{done_time}")
    else:
        # 오늘만 존재하는 URL은 "update"로 설정
        notice_status = "update"
        work_status = "null"
        done_time = "null"
        log_data_other.append(f"{url},{notice_status},{work_status},{done_time}")  # update 상태는 따로 추가

# 이전 로그 파일에 있지만 오늘 로그 파일에 없는 URL 처리
for url in previous_urls:
    if url not in all_links:
        # 이전에는 존재했지만 오늘은 없는 URL은 "deleted"로 설정
        notice_status = "deleted"
        work_status = "done"
        done_time = datetime.today().strftime('%Y-%m-%d %H:%M:%S')  # 삭제된 시간을 현재 시간으로 설정
        log_data_deleted.append(f"{url},{notice_status},{work_status},{done_time}")  # 삭제된 URL은 따로 추가

# 오늘 로그 파일 생성 (기존 로그 파일 덮어쓰기)
with open(today_log_file_name, 'w', encoding='utf-8') as file:
    # 헤더 작성
    file.write("url,notice_status,work_status,done_time\n")
    # deleted 항목을 먼저 기록
    for line in log_data_deleted:
        file.write(line + "\n")
    # 나머지 (exist, update) 항목을 그 뒤에 기록
    for line in log_data_other:
        file.write(line + "\n")

# 브라우저 종료
driver.quit()

# 로직 추가 
# 작업이 중단되거나,다하지 못해서 밀렸을 경우, 오류가 났을 경우를 모두 대비해서 가장 최근 생성된 작업로그파일을 기반으로 
# 오늘의 작업로그파일을 생성하도록 만듦

[결과]

[로그 파일이 아예 없다면] image

[로그 파일이 존재한다면] image

[개선점] makelog 파일도 오류났을때를 대비해 시스템 로그 남게 했으면 좋지 않을까

파일이름은 makelog_system.log 정도

GITSangWoo commented 18 hours ago

작업 로그 파일 만드는 코드 업데이트 3차

[업데이트 내용]

  1. 크롤링중 오류가 나면 오류를 makelog_err.log 파일에 남게함
  2. 어제 deleted 된 공고된 내용도 오늘 deleted로 초기화 시키는 오류를 발견함 어제 deleted 된 공고에 대해서는 더 이상 정보를 가져오지 않고 어제 존재했지만 오늘은 존재하지 않는 공고에 대해서만 deleted로 업데이트 되게함

코드

import re
import os
import time
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By

def log_error(error_message):
    """오류를 makelog_err.log 파일에 기록"""
    with open('makelog_err.log', 'a', encoding='utf-8') as err_file:
        timestamp = datetime.today().strftime('%Y-%m-%d %H:%M:%S')
        err_file.write(f"{timestamp},{error_message}\n")

try:
    # 셀레니움 웹 드라이버 설정
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))

    # 오늘 날짜로 로그 파일 이름 설정
    today = datetime.today().strftime('%Y%m%d')
    today_log_file_name = f"{today}.log"

    # 로그 파일을 찾을 디렉토리 설정
    log_directory = '.'  # 현재 디렉토리
    log_files = [f for f in os.listdir(log_directory) if f.endswith('.log')]

    # 가장 최근에 생성된 로그 파일 찾기
    if log_files:
        # 파일들을 생성 시간 기준으로 정렬하고 가장 최근 파일을 선택
        log_files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
        recent_log_file_name = log_files[0]  # 가장 최근의 로그 파일을 선택
        print(f"Found the most recent log file: {recent_log_file_name}")
    else:
        print("No log files found in the directory. All URLs will be marked as 'update'.")

    # 페이지 열기 (예시로 원하는 페이지)
    driver.get("https://www.wanted.co.kr/wdlist/518/655?country=kr&job_sort=job.recommend_order&years=-1&selected=655&locations=all")

    # 페이지 로딩 대기
    time.sleep(3)  # 페이지 로딩 대기 (필요시 WebDriverWait 사용 가능)

    # 링크를 저장할 리스트
    all_links = []

    # 정규 표현식 패턴 (wd/ 뒤에 숫자가 있는 URL을 찾는 패턴)
    pattern = re.compile(r'wd/\d+$')

    # 스크롤 내리기 및 링크 추출 반복
    previous_height = driver.execute_script("return document.body.scrollHeight")  # 현재 페이지의 높이를 가져옴

    while True:
        # 페이지에서 모든 <a> 태그를 찾음
        links = driver.find_elements(By.TAG_NAME, "a")

        # 이미 가져온 링크들을 확인하고 중복되지 않게 추가
        for link in links:
            href = link.get_attribute("href")
            # 정규 표현식으로 'wd/숫자' 형식의 링크만 필터링
            if href and pattern.search(href) and href not in all_links:
                all_links.append(href)

        # 스크롤을 페이지 끝까지 내리기
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

        # 잠시 대기하여 새로운 요소들이 로드될 시간을 줌
        time.sleep(2)  # 2초간 대기, 이 시간은 페이지 로딩 속도에 맞게 조절

        # 새로운 페이지 높이가 이전과 같다면 스크롤을 더 이상 내릴 필요가 없으므로 종료
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == previous_height:
            break  # 더 이상 새로운 요소가 로드되지 않으면 반복 종료

        previous_height = new_height  # 이전 높이를 업데이트

    # 이전 로그 파일이 존재하는지 확인하고 읽기
    previous_urls = {}
    if os.path.exists(recent_log_file_name):
        with open(recent_log_file_name, 'r', encoding='utf-8') as file:
            for line in file.readlines()[1:]:  # 첫 번째 줄은 header
                columns = line.strip().split(',')
                url = columns[0]
                notice_status = columns[1]
                work_status = columns[2]
                done_time = columns[3]
                previous_urls[url] = {
                    'notice_status': notice_status,
                    'work_status': work_status,
                    'done_time': done_time
                }

    # 오늘 로그 파일에 기록할 내용 생성
    log_data_deleted = []  # deleted 상태를 따로 저장
    log_data_other = []    # 나머지 (exist, update) 상태를 따로 저장

    # 오늘 크롤링한 URL과 최근 로그 파일을 비교하여 상태 설정
    for url in all_links:
        if url in previous_urls:
            # 이전 로그 파일과 오늘 모두 존재하는 URL이면 "exist"로 처리
            if previous_urls[url]['notice_status'] == "deleted":
                # 이미 'deleted' 상태로 존재하는 공고는 다시 "deleted"로 처리하지 않음
                continue
            notice_status = "exist"
            work_status = previous_urls[url]['work_status']  # 이전의 상태 그대로
            done_time = previous_urls[url]['done_time']  # 이전의 done_time 그대로
            log_data_other.append(f"{url},{notice_status},{work_status},{done_time}")
        else:
            # 오늘만 존재하는 URL은 "update"로 설정
            notice_status = "update"
            work_status = "null"
            done_time = "null"
            log_data_other.append(f"{url},{notice_status},{work_status},{done_time}")  # update 상태는 따로 추가

    # 이전 로그 파일에 있지만 오늘 로그 파일에 없는 URL 처리
    for url in previous_urls:
        if url not in all_links:
            # 이전에는 존재했지만 오늘은 없는 URL은 "deleted"로 설정
            if previous_urls[url]['notice_status'] == "deleted":
                # 이미 'deleted' 상태로 기록된 공고는 다시 'deleted'로 갱신하지 않음
                continue
            notice_status = "deleted"
            work_status = "done"
            done_time = datetime.today().strftime('%Y-%m-%d %H:%M:%S')  # 삭제된 시간을 현재 시간으로 설정
            log_data_deleted.append(f"{url},{notice_status},{work_status},{done_time}")  # 삭제된 URL은 따로 추가

    # 오늘 로그 파일 생성 (기존 로그 파일 덮어쓰기)
    with open(today_log_file_name, 'w', encoding='utf-8') as file:
        # 헤더 작성
        file.write("url,notice_status,work_status,done_time\n")
        # deleted 항목을 먼저 기록
        for line in log_data_deleted:
            file.write(line + "\n")
        # 나머지 (exist, update) 항목을 그 뒤에 기록
        for line in log_data_other:
            file.write(line + "\n")

    # 브라우저 종료
    driver.quit()

except Exception as e:
    # 오류 발생 시 오류 메시지 기록
    error_message = str(e)
    log_error(error_message)
    # 프로그램 종료 전 브라우저 종료
    if 'driver' in locals():
        driver.quit()

# 파일 실행시 에러상황에서 로그 남김 

결과

[12-01 작업 로그] image

[12-02 작업 로그] image

[DB 업데이트 상황] image

hun0219 commented 11 hours ago

url 스크래핑

# 패키지 불러오기
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from urllib.parse import urlparse, parse_qs, urlencode
import boto3
import datetime
import time
import os

# URL에서 rec_idx 값까지만 포함된 URL 반환 함수
def extract_rec_idx_url(url):
    """
    URL에서 rec_idx 값까지만 포함된 URL을 반환
    """
    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)
    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"

    # rec_idx 값만 포함한 쿼리 스트링 생성
    rec_idx = query_params.get("rec_idx", [None])[0]
    if rec_idx:
        new_query = urlencode({"rec_idx": rec_idx})
        return f"{base_url}?{new_query}"
    return base_url

# AWS s3 설정
BUCKET_NAME = "t2jt"               # S3 버킷 이름
S3_PATH_PREFIX = "job/DE/sources/saramin/links/"  # S3 경로

# S3 클라이언트 생성 (자격 증명은 ~/.aws/credentials에서 읽음)
s3_client = boto3.client("s3")

# Chrome 옵션 설정
chrome_options = Options()
chrome_options.add_argument("--disable-cache")  # 캐시 비활성화
#chrome_options.add_argument("--incognito")   # 시크릿 모드로 실행
#chrome_options.add_argument("--disk-cache-dir=/dev/null")  # 디스크 캐시를 비활성화
#chrome_options.add_argument("--disable-application-cache")  # 애플리케이션 캐시 비활성화
#chrome_options.add_argument("--disable-gpu")  # GPU 가속 비활성화 (필요한 경우)
#chrome_options.add_argument("--no-sandbox")  # 샌드박스 비활성화

# Selenium WebDriver 설정
driver = webdriver.Chrome(options=chrome_options)

# DevTools Protocol을 사용하여 캐시 비활성화
driver.execute_cdp_cmd("Network.setCacheDisabled", {"cacheDisabled": True})

# 사람인 홈페이지 접속
url = "https://www.saramin.co.kr/zf_user/"
driver.get(url)
time.sleep(7)

driver.delete_all_cookies()
driver.execute_script("window.localStorage.clear();")
driver.execute_script("window.sessionStorage.clear();")

try:
    # 수집 시점
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    today_date = datetime.datetime.now().strftime("%Y%m%d")  # 오늘 날짜 (YYYYMMDD)

    # s3 파일 경로
    s3_file_path = f"{S3_PATH_PREFIX}{today_date}.txt"

    # 검색창 선택 및 검색어 입력
    #search_box = WebDriverWait(driver, 5).until(
    #    EC.presence_of_element_located((By.CLASS_NAME, "search"))
    #)
    search_box = driver.find_element(By.CLASS_NAME, "search")
    search_box.click()
    time.sleep(5)

    search_input = driver.find_element(By.XPATH, '//input[@id="ipt_keyword_recruit"]')
    search_input.click()
    time.sleep(5)

    keyword = "데이터 엔지니어"
    search_input.send_keys(keyword)  # 검색어 입력
    time.sleep(3) #추가
    #search_input.send_keys(Keys.RETURN)  # 검색 실행
    search_button = driver.find_element(By.XPATH, '//button[@id="btn_search_recruit"]')  # 정확한 XPATH 사용
    search_button.click()  # 검색 버튼 클릭
    print("검색 완료!")

    # 검색 결과 로드 대기
    WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.ID, "recruit_info_list"))
    )

    # 모든 페이지 데이터 수집
    page = 1  # 현재 페이지 번호
    job_data_list = [] # 데이터를 저장할 리스트

    while True:
        print(f"현재 페이지: {page}")

        # 현재 페이지 데이터 수집
        job_elements = WebDriverWait(driver, 10).until(
            EC.presence_of_all_elements_located(
                (By.XPATH, '//div[@id="recruit_info_list"]//div[contains(@class, "item_recruit")]')
            )
        )

        for job_element in job_elements:
            try:
                # 제목과 URL
                title_element = job_element.find_element(By.XPATH, './/h2[@class="job_tit"]/a')
                title = title_element.get_attribute("title")
                url = title_element.get_attribute("href")

                # URL에서 rec_idx 값까지만 포함된 URL 생성
                org_url = extract_rec_idx_url(url)  # URL 가공

                # 기업명
                company_element = job_element.find_element(By.XPATH, './/div[@class="area_corp"]//a')
                company_name = company_element.text if company_element else "Unknown"

                # 데이터 리스트에 추가
                job_data_list.append({
                    "URL_CR_TIME": current_time,
                    "SITE": "saramin",
                    "JOB_TITLE": keyword,
                    "COMPANY": company_name,
                    "POST_TITLE": title,
                    "ORG_URL": org_url  # 가공된 URL 저장
                })
            except Exception as e:
                print(f"요소 처리 중 오류 발생: {e}")
        time.sleep(7)

        # 다음 페이지로 이동
        try:
            next_page = driver.find_element(By.XPATH, f'//a[@page="{page + 1}"]')
            next_page.click()  # 다음 페이지로 이동
            time.sleep(7)
            page += 1
        except Exception as e:
            print("마지막 페이지에 도달했거나 다음 페이지로 이동할 수 없습니다.")
            break

    # S3로 데이터 업로드
    s3_content = ""
    for job in job_data_list:
        formatted_line = (
            f"URL_CR_TIME: {job['URL_CR_TIME']}, "
            f"SITE: {job['SITE']}, "
            f"JOB_TITLE: {job['JOB_TITLE']}, "
            f"COMPANY: {job['COMPANY']}, "
            f"POST_TITLE: {job['POST_TITLE']}, "
            f"ORG_URL: {job['ORG_URL']}\n"
        )
        s3_content += formatted_line

    s3_client.put_object(
        Bucket=BUCKET_NAME,
        Key=s3_file_path,
        Body=s3_content.encode("utf-8-sig"),
        ContentType="text/plain"
    )
    print(f"S3에 파일 업로드 완료: s3://{BUCKET_NAME}/{s3_file_path}")

except Exception as e:
    print(f"오류 발생: {e}")

finally:
    # 자동 종료 안되게
    #input("Press Enter to close the browser...")
    driver.quit()
hun0219 commented 11 hours ago

DB 적재

    format="%(asctime)s [%(levelname)s] %(message)s"
)

# AWS S3 클라이언트 생성
s3 = boto3.client('s3')

# S3 설정
BUCKET_NAME = "t2jt"
S3_BASE_PATH = "job/DE/sources/saramin/links"
S3_TEXT_PATH = "job/DE/sources/saramin/txt"
S3_IMAGES_PATH = "job/DE/sources/saramin/images"
today_date = datetime.now().strftime("%Y%m%d")  # 오늘 날짜 (YYYYMMDD 형식)
yesterday_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")  # 어제 날짜 (YYYYMMDD 형식)
today_file_path = f"{S3_BASE_PATH}/{today_date}.txt"
yesterday_file_path = f"{S3_BASE_PATH}/{yesterday_date}.txt"

# MySQL 연결 풀 설정
db_config = {
    'host': '43.201.40.223',
    'user': 'user',
    'password': '1234',
    'database': 'testdb',
    'port': '3306'
}
# Mysql 연결 재사용 연결 풀링 설정
connection_pool = pooling.MySQLConnectionPool(pool_name="saramin_pool", pool_size=5, **db_config)

def get_connection():
    return connection_pool.get_connection()

# S3에 텍스트 업로드
def upload_to_s3(content, file_name):
    try:
        s3.put_object(Bucket=BUCKET_NAME, Key=f"{S3_TEXT_PATH}/{file_name}", Body=content)
        s3_url = f"s3://{BUCKET_NAME}/{S3_TEXT_PATH}/{file_name}"
        logging.info(f"S3에 텍스트 업로드 성공: {s3_url}")
        return s3_url
    except Exception as e:
        logging.error(f"S3 업로드 실패: {e}")
        return None

# S3 파일 읽기 함수 (재시도 포함)
def read_s3_file(bucket, path, retries=3):
    for attempt in range(retries):
        try:
            response = s3.get_object(Bucket=bucket, Key=path)
            return response['Body'].read().decode('utf-8').strip()
        except Exception as e:
            logging.error(f"S3 파일 읽기 실패 (시도 {attempt + 1}/{retries}): {e}")
            if attempt < retries - 1:
                time.sleep(2)  # 재시도 전 대기
            else:
                return None

# URL 추출 함수
def extract_urls_with_details(content):
    """
    Extract URLs along with JOB_TITLE, COMPANY, and POST_TITLE from content.
    """
    data = []
    for line in content.splitlines():
        url_match = re.search(r"ORG_URL:\s*(https?://[^\s,]+)", line)
        job_title_match = re.search(r"JOB_TITLE:\s*([^,]+)", line)
        company_match = re.search(r"COMPANY:\s*([^,]+)", line)
        post_title_match = re.search(r"POST_TITLE:\s*([^,]+)", line)

        if url_match:
            url = url_match.group(1).strip()
            job_title = job_title_match.group(1).strip() if job_title_match else "Unknown Job Title"
            company = company_match.group(1).strip() if company_match else "Unknown Company"
            post_title = post_title_match.group(1).strip() if post_title_match else "Unknown Post Title"
            data.append((url, job_title, company, post_title))
    return data

# DB 중복 URL 확인 함수
def is_url_in_db(org_url):
    try:
        conn = get_connection()
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM saramin WHERE org_url = %s", (org_url,))
        return cursor.fetchone()[0] > 0
    finally:
        if conn.is_connected():
            cursor.close()
            conn.close()

# MySQL 데이터베이스에 배치 삽입
def batch_insert_to_db(data):
    try:
        conn = get_connection()
        cursor = conn.cursor()
        insert_query = """
            INSERT INTO saramin (
                id, create_time, update_time, removed_time, site, job_title, due_type, due_date, company, post_title, notice_type, org_url, s3_text_url, s3_images_url
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """

        # 데이터 내의 notice_type 값을 설정하여 새로운 리스트 생성
        updated_data = []
        for record in data:
            record = list(record)  # 튜플을 리스트로 변환
            s3_text_url = record[12]  # s3_text_url 위치
            s3_images_url = record[13]  # s3_images_url 위치

            # notice_type 결정
            if s3_text_url and not s3_images_url:
                notice_type = "text"
            elif not s3_text_url and s3_images_url:
                notice_type = "images"
            elif s3_text_url and s3_images_url:
                notice_type = "both"
            else:
                notice_type = "none"

            # notice_type 업데이트
            record[10] = notice_type  # notice_type 위치
            updated_data.append(tuple(record))  # 리스트를 다시 튜플로 변환

        # 업데이트된 데이터를 DB에 삽입
        cursor.executemany(insert_query, updated_data)
        conn.commit()
        logging.info(f"{len(updated_data)}개의 데이터가 성공적으로 삽입되었습니다.")
    except Exception as e:
        logging.error(f"배치 삽입 실패: {e}")
    finally:
        if conn.is_connected():
            cursor.close()
            conn.close()

# DB에서 removed_time 업데이트
def update_removed_time(org_url):
    try:
        conn = get_connection()
        cursor = conn.cursor()
        removed_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        cursor.execute("UPDATE saramin SET removed_time = %s WHERE org_url = %s", (removed_time, org_url))
        conn.commit()
        logging.info(f"DB 업데이트 완료: org_url = {org_url}, removed_time = {removed_time}")
    except Exception as e:
        logging.error(f"DB 업데이트 실패: {e}")
    finally:
        if conn.is_connected():
            cursor.close()
            conn.close()

def upload_image_to_s3(image_url):
    """
    이미지 URL을 다운로드하여 S3에 저장한 후, DB에 저장할 URL을 반환
    """
    try:
        # S3 키로 사용하기 위해 URL의 슬래시를 |로 인코딩
        encoded_url = image_url.replace('/', '|')  # 슬래시를 %2F로 변환
        s3_key = f"{S3_IMAGES_PATH}/{encoded_url}"

        # 원본 URL에서 이미지 다운로드
        response = requests.get(image_url, stream=True)
        if response.status_code == 200:
            # S3에 이미지 업로드
            s3.put_object(
                Bucket=BUCKET_NAME,
                Key=s3_key,
                Body=response.content,
                ContentType=response.headers.get('Content-Type', 'application/octet-stream')
            )
            # DB에 저장할 S3 경로와 원본 URL 결합
            s3_db_entry = f"s3://{BUCKET_NAME}/{s3_key}"
            logging.info(f"S3 업로드 성공 및 DB 경로 생성: {s3_db_entry}")
            return s3_db_entry  # S3 경로 반환
        else:
            logging.error(f"이미지 다운로드 실패: {image_url}, 상태 코드: {response.status_code}")
            return None
    except Exception as e:
        logging.error(f"이미지 업로드 실패: {image_url}, 에러: {e}")
        return None

# 마감일과 텍스트 또는 이미지 가져오기
def extract_due_date_and_content(url, next_id, job_title, company, post_title, retries=3):
    s3_text_url = None
    s3_images_url = None

    for attempt in range(retries):
        try:
            logging.info(f"URL로 이동 중 (시도 {attempt + 1}/{retries}): {url}")
            driver = webdriver.Chrome()
            driver.get(url)

            time.sleep(10)

            # Iframe으로 전환
            WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.TAG_NAME, "iframe")))
            iframe = driver.find_element(By.ID, "iframe_content_0")
            driver.switch_to.frame(iframe)

            # user_content 텍스트 추출
            WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CLASS_NAME, "user_content")))
            content_element = driver.find_element(By.CLASS_NAME, "user_content")
            extracted_text = content_element.text.strip()

            # 텍스트 처리 - S3 저장
            if extracted_text:
                file_name = f"{uuid.uuid4()}.txt"
                s3_key = f"{S3_TEXT_PATH}/{file_name}"
                s3.put_object(Bucket=BUCKET_NAME, Key=s3_key, Body=extracted_text.encode("utf-8"))
                s3_text_url = f"s3://{BUCKET_NAME}/{s3_key}"
                logging.info(f"S3에 텍스트 저장 완료: {s3_text_url}")

                # 텍스트가 있으면 이미지는 건너뛴다
                s3_images_url = None
            else:
                # 이미지 URL 추출 및 S3 업로드
                user_content = driver.find_element(By.CLASS_NAME, "user_content")
                img_elements = user_content.find_elements(By.TAG_NAME, "img")
                images_urls = {img.get_attribute("src").strip() for img in img_elements if img.get_attribute("src")}

                if images_urls:
                    # 각 이미지를 S3에 업로드하고 S3 경로 수집
                    uploaded_urls = [upload_image_to_s3(image_url) for image_url in images_urls]
                    s3_images_url = ", ".join(filter(None, uploaded_urls))  # None 값 제거

            # 마감일 추출
            driver.switch_to.default_content()
            title_element = WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.TAG_NAME, "title")))
            title_text = title_element.get_attribute("textContent").strip()

            match = re.search(r"\(([^()]*)\)\s*- 사람인", title_text)
            if match:
                due_date_value = match.group(1).strip()
                if "D-" in due_date_value:
                    days_to_add = int(due_date_value.split("-")[1])
                    due_date = (datetime.now() + timedelta(days=days_to_add)).strftime("%Y-%m-%d")
                    due_type = "날짜"
                else:
                    due_date = None
                    due_type = due_date_value  # D-가 아닌 경우 원문 반환
            else:
                due_date_value, due_date, due_type = "없음", None, "없음"

            # notice_type 결정
            if s3_text_url and not s3_images_url:
                notice_type = "text"
            elif not s3_text_url and s3_images_url:
                notice_type = "images"
            elif s3_text_url and s3_images_url:
                notice_type = "both"
            else:
                notice_type = "none"

            # DB 저장
            save_to_db(
                next_id=next_id,
                job_title=job_title,
                company=company,
                post_title=post_title,
                due_type=due_type,
                due_date=due_date,
                notice_type=notice_type,
                url=url,
                s3_text_url=s3_text_url,
                s3_images_url=s3_images_url
            )
            return True

        except Exception as e:
            logging.error(f"마감일 및 콘텐츠 추출 실패 (시도 {attempt + 1}/{retries}): {e}")
            if attempt < retries - 1:
                logging.info("재시도 중...")
                time.sleep(5)  # 재시도 전에 대기
            else:
                logging.error("최대 재시도 횟수에 도달했습니다.")
                return False
        finally:
            driver.quit()

# DB 저장 함수
def save_to_db(next_id, job_title, company, post_title, due_type, due_date, notice_type, url, s3_text_url, s3_images_url):
    try:
        conn = get_connection()
        cursor = conn.cursor()
        insert_query = """
            INSERT INTO saramin (
                id, create_time, update_time, removed_time, site, job_title, due_type, due_date, company, post_title, notice_type, org_url, s3_text_url, s3_images_url
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        cursor.execute(insert_query, (
            next_id,
            datetime.now(),
            datetime.now(),
            None,
            "saramin",
            job_title,
            due_type,
            due_date,
            company,
            post_title,
            notice_type,
            url,
            s3_text_url,
            s3_images_url
        ))
        conn.commit()
        logging.info(f"DB에 저장 성공 - URL: {url}, Job Title: {job_title}, Company: {company}, Post_Title: {post_title}, S3 Text URL: {s3_text_url}, S3 Images URL: {s3_images_url}")
    except Exception as e:
        logging.error(f"DB 저장 실패 - URL: {url}, Error: {e}")
    finally:
        if conn.is_connected():
            cursor.close()
            conn.close()

# 정규화된 URL 사용
def normalize_url(url):
    try:
        parsed_url = urlparse(url)
        query_params = sorted(parse_qsl(parsed_url.query), key=lambda x: x[0])
        normalized_query = urlencode(query_params, doseq=True)
        normalized_url = parsed_url._replace(
            scheme=parsed_url.scheme.lower(),
            netloc=parsed_url.netloc.lower(),
            query=normalized_query,
            fragment=''
        ).geturl()
        logging.debug(f"URL 정규화: 원본 URL = {url}, 정규화된 URL = {normalized_url}")
        return normalized_url
    except Exception as e:
        logging.error(f"URL 정규화 실패: {url}, 에러: {e}", exc_info=True)
        return url

# 실행 로직
# 오늘날짜.txt와 DB와 먼저 중복 체크 후 오늘날짜.txt와 어제날짜.txt비교 후 추가 및 제거
def execute():
    try:
        # S3 파일 읽기
        today_content = read_s3_file(BUCKET_NAME, today_file_path)
        yesterday_content = read_s3_file(BUCKET_NAME, yesterday_file_path)

        if not today_content:
            logging.error("오늘 파일을 읽을 수 없으므로 종료합니다.")
            return

        # 오늘 날짜 파일에서 URL 및 관련 데이터 추출
        today_data = extract_urls_with_details(today_content)
        logging.info(f"오늘 날짜 파일에서 추출된 데이터: {len(today_data)}개")

        # DB와 중복 확인
        filtered_today_data = []
        conn = get_connection()
        try:
            cursor = conn.cursor()
            for url, job_title, company, post_title in today_data:
                cursor.execute("SELECT COUNT(*) FROM saramin WHERE org_url = %s", (url,))
                if cursor.fetchone()[0] == 0:  # DB에 없는 경우만 추가
                    filtered_today_data.append((url, job_title, company, post_title))
            logging.info(f"DB 중복 확인 후 남은 데이터: {len(filtered_today_data)}개")
        finally:
            if conn.is_connected():
                cursor.close()
                conn.close()

        # 어제 날짜 파일에서 URL 데이터 추출
        yesterday_urls = {item[0] for item in extract_urls_with_details(yesterday_content)} if yesterday_content else set()

        # 추가 및 제거된 URL 계산
        added_data = [data for data in filtered_today_data if data[0] not in yesterday_urls]
        removed_urls = yesterday_urls - {data[0] for data in filtered_today_data}

        logging.info(f"추가된 URL: {len(added_data)}개")
        logging.info(f"제거된 URL: {len(removed_urls)}개")

        # DB에서 ID 초기화
        conn = get_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT MAX(id) FROM saramin")
            max_id_result = cursor.fetchone()[0]
            next_id = (max_id_result + 1) if max_id_result is not None else 1  # 없는 경우 1로 시작
        finally:
            if conn.is_connected():
                cursor.close()
                conn.close()

        # 추가된 데이터 처리
        for url, job_title, company, post_title in added_data:
            if extract_due_date_and_content(url, next_id, job_title, company, post_title):
                next_id += 1

        # 제거된 데이터 처리
        for org_url in removed_urls:
            update_removed_time(org_url)
            logging.info(f"제거된 데이터 업데이트 완료: {org_url}")

        logging.info("모든 작업이 완료되었습니다.")

    except Exception as e:
        logging.error(f"실행 중 오류 발생: {e}")

# 실행
execute()