<aside> 💡 프로젝트 소개 및 개요
</aside>
Task 1: API 크롤링
Open API를 통해 데이터를 수집한 후, 이를 데이터베이스(DB)에 저장하는 과정입니다.
흐름은 "API 크롤링 → 데이터 처리 → DB 저장"으로 이어집니다.
Task 2: CSV 읽기
CSV 파일에서 데이터를 읽고 이를 처리한 뒤, 동일한 데이터베이스(DB)에 저장하는 과정입니다.
흐름은 "CSV 읽기 → 데이터 처리 → DB 저장"으로 이어집니다.
def crawl_fine_dust_data(**kwargs):
"""
미세먼지 데이터를 외부 API로부터 크롤링하여 Pandas DataFrame으로 변환한 후,
XCom을 사용하여 다음 태스크로 DataFrame을 전달한다.
"""
# API 호출을 위한 URL 구성
url = f"{API_URL}?year=2020&pageNo=1&numOfRows=100&returnType=xml&serviceKey={SERVICE_KEY}"
# API 요청 전송
response = requests.get(url)
# XML 응답을 파싱하여 BeautifulSoup 객체로 변환
data_bs = BeautifulSoup(response.text, "lxml-xml")
# 'item' 태그 내의 데이터를 모두 추출
items = data_bs.find_all('item')
# 데이터를 저장할 리스트 생성
data_list = []
# 각 'item' 태그 내의 데이터를 파싱하여 리스트에 저장
for item in items:
row = {
"districtName": item.find("districtName").text, # 지역명
"dataDate": item.find("dataDate").text, # 데이터 날짜
"issueVal": item.find("issueVal").text, # 발령 값
"issueTime": item.find("issueTime").text, # 발령 시간
"clearVal": item.find("clearVal").text, # 해제 값
"clearTime": item.find("clearTime").text, # 해제 시간
"issueGbn": item.find("issueGbn").text, # 발령 구분
"itemCode": item.find("itemCode").text, # 항목 코드
}
data_list.append(row)
# 리스트를 Pandas DataFrame으로 변환
df = pd.DataFrame(data_list)
# 수집된 데이터를 XCom에 저장하여 다음 태스크로 전달
kwargs['ti'].xcom_push(key='fine_dust_df', value=df)
# 수집된 데이터의 개수를 출력
print(f"총 {len(data_list)}개의 데이터를 수집했습니다.")
Pandas DataFrame
형식으로 변환한 후 Airflow의 XCom에 저장하여 다른 태스크에서 사용할 수 있도록 합니다.