본문 바로가기

Microsoft Fabric/Fabric 실습 2 - Spark

06. Apache Spark로 데이터 분석 연습하기

이전 포스팅을 참고하여 sales, products, customers 데이터를 가져옵니다. 

2024.01.23 - [Microsoft Fabric/Fabric 실습 시리즈 1] - 05. Fabric 레이크하우스에 데이터 수집하기

 

05. Fabric 레이크하우스에 데이터 수집하기

기본 데이터 수집 방법 Fabric 레이크하우스에 데이터를 로드하는 여러 가지 방법이 있습니다. 업로드: 로컬 파일 또는 폴더를 레이크하우스에 업로드합니다. 그런 다음, 파일 데이터를 탐색 및

d365-powerbi.tistory.com

customers.csv
2.03MB
products.csv
0.10MB

 

데이터 로드는 코드를 직접 작성하지 않아도 간단히 클릭해서 자동작성할 수 있습니다. 

data 폴더 아래 로드하고자 하는 파일 끝의 더보기 ... 를 클릭하여 데이터 로드를 클릭하고 Spark를 선택합니다. 

 
데이터 로드 연습이 되셨으면 이제 sales 테이블에서 제품을 판매한 고객의 수를 집계해 봅니다. 
 
데이터 로드 코드 중에 꼭 필요하지 않은 display(df) 줄은 삭제하고  
 
df = spark.read.format("csv").option("header","true").load("Files/data/sales.csv")
# df now is a Spark DataFrame containing CSV data from "Files/data/sales.csv".
 
그 아래 다음 코드를 추가합니다. 

customers = df['CustomerName', 'EmailAddress']
print(customers.count())
print(customers.distinct().count())
display(customers.distinct())
 
결과는 다음과 같습니다.

 

이번에는 위의 코드를 수정하여 Road-250 Red, 52 제품을 구매한 고객을 확인해 봅니다. 

customers = df['CustomerName', 'EmailAddress'].where(df['Item']=='Road-250 Red, 52')
print(customers.count())
print(customers.distinct().count())
display(customers.distinct())

 

결과는 다음과 같습니다.

 

이번에는 sales 테이블에서 제품별 주문 수량의 합계를 보겠습니다. 

productSales = df.select("Item", "Quantity").groupBy("Item").sum()
display(productSales)

 

이번에는 연간 판매 주문 수를 확인해 보겠습니다. 

from pyspark.sql.functions import *

yearlySales = df.select(year(col("OrderDate")).alias("Year")).groupBy("Year").count().orderBy("Year")
display(yearlySales)

 

다음은 함수를 사용해서 데이터를 변환해 봅니다. 

OrderDate 컬럼에서 Year와 Month를 추출하여 새로운 컬럼으로 생성하고

CustomerName 컬럼에서 FirstName과 LastName을 분리하여 새로운 컬럼으로 생성한 후 

열 순서를 정렬합니다.

from pyspark.sql.functions import *

## Year 와 Month 컬럼 생성하기
transformed_df = df.withColumn("Year", year(col("OrderDate"))).withColumn("Month", month(col("OrderDate")))

# FirstName 과 LastName 컬럼 생성하기
transformed_df = transformed_df.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))

# 필터링과 열정렬
transformed_df = transformed_df["SalesOrderNumber", "SalesOrderLineNumber", "OrderDate", "Year", "Month", "FirstName", "LastName", "EmailAddress", "Item", "Quantity", "UnitPrice", "TaxAmount"]

# 상위 5개 데이터 표시하기
display(transformed_df.limit(5))

 

테이블 및 SQL 작업도 연습해 봅니다. 

 

먼저 테이블을 델타 형식으로 생성합니다. 

델타 레이크는 테이블에 관계형 데이터베이스 기능을 추가하는 Spark 기술입니다. 

트랜잭션, 행 버전 관리 및 기타 유용한 기능에 대한 지원을 포함합니다. 

Fabric의 데이터 레이크하우스에서는 델타 형식으로 테이블을 생성하는 것이 좋습니다. 

# 새 테이블 생성하기
df.write.format("delta").saveAsTable("salesorders")

# 테이블 상세정보 보기
spark.sql("DESCRIBE EXTENDED salesorders").show(truncate=False)

 

데이터 레이크에 salesorders 테이블이 생성되었는지 확인하고, 데이터 로드를 실행해 봅니다. 

 

이제 코드 셀에서 SQL 코드를 실행해 봅니다. 

%%sql
SELECT YEAR(OrderDate) AS OrderYear,
       SUM((UnitPrice * Quantity) + TaxAmount) AS GrossRevenue
FROM salesorders
GROUP BY YEAR(OrderDate)
ORDER BY OrderYear;