<aside> 💡 프로젝트 소개 및 개요

</aside>

1. Movation


2. Introduction

DALL·E 2024-09-11 12.29.01 - A simple flowchart representing an Airflow architecture with two tasks. Task 1 involves API crawling to fetch data and store it into a database, and T.webp


3. Method

3.1 crawl_fine_dust_data(**kwargs)


def crawl_fine_dust_data(**kwargs):
    """
    미세먼지 데이터를 외부 API로부터 크롤링하여 Pandas DataFrame으로 변환한 후,
    XCom을 사용하여 다음 태스크로 DataFrame을 전달한다.
    """
    # API 호출을 위한 URL 구성
    url = f"{API_URL}?year=2020&pageNo=1&numOfRows=100&returnType=xml&serviceKey={SERVICE_KEY}"
    
    # API 요청 전송
    response = requests.get(url)
    
    # XML 응답을 파싱하여 BeautifulSoup 객체로 변환
    data_bs = BeautifulSoup(response.text, "lxml-xml")
    
    # 'item' 태그 내의 데이터를 모두 추출
    items = data_bs.find_all('item')

    # 데이터를 저장할 리스트 생성
    data_list = []
    
    # 각 'item' 태그 내의 데이터를 파싱하여 리스트에 저장
    for item in items:
        row = {
            "districtName": item.find("districtName").text,  # 지역명
            "dataDate": item.find("dataDate").text,  # 데이터 날짜
            "issueVal": item.find("issueVal").text,  # 발령 값
            "issueTime": item.find("issueTime").text,  # 발령 시간
            "clearVal": item.find("clearVal").text,  # 해제 값
            "clearTime": item.find("clearTime").text,  # 해제 시간
            "issueGbn": item.find("issueGbn").text,  # 발령 구분
            "itemCode": item.find("itemCode").text,  # 항목 코드
        }
        data_list.append(row)

    # 리스트를 Pandas DataFrame으로 변환
    df = pd.DataFrame(data_list)
    
    # 수집된 데이터를 XCom에 저장하여 다음 태스크로 전달
    kwargs['ti'].xcom_push(key='fine_dust_df', value=df)
    
    # 수집된 데이터의 개수를 출력
    print(f"총 {len(data_list)}개의 데이터를 수집했습니다.")

3.2 load_data_to_mysql(**kwargs)