이전 포스팅을 참고하여 sales, products, customers 데이터를 가져옵니다.
2024.01.23 - [Microsoft Fabric/Fabric 실습 시리즈 1] - 05. Fabric 레이크하우스에 데이터 수집하기
데이터 로드는 코드를 직접 작성하지 않아도 간단히 클릭해서 자동작성할 수 있습니다.
data 폴더 아래 로드하고자 하는 파일 끝의 더보기 ... 를 클릭하여 데이터 로드를 클릭하고 Spark를 선택합니다.
데이터 로드 연습이 되셨으면 이제 sales 테이블에서 제품을 판매한 고객의 수를 집계해 봅니다.
데이터 로드 코드 중에 꼭 필요하지 않은 display(df) 줄은 삭제하고
df = spark.read.format("csv").option("header","true").load("Files/data/sales.csv")
# df now is a Spark DataFrame containing CSV data from "Files/data/sales.csv".
그 아래 다음 코드를 추가합니다.
customers = df['CustomerName', 'EmailAddress']
print(customers.count())
print(customers.distinct().count())
display(customers.distinct())
결과는 다음과 같습니다.
이번에는 위의 코드를 수정하여 Road-250 Red, 52 제품을 구매한 고객을 확인해 봅니다.
customers = df['CustomerName', 'EmailAddress'].where(df['Item']=='Road-250 Red, 52')
print(customers.count())
print(customers.distinct().count())
display(customers.distinct())
결과는 다음과 같습니다.
이번에는 sales 테이블에서 제품별 주문 수량의 합계를 보겠습니다.
productSales = df.select("Item", "Quantity").groupBy("Item").sum()
display(productSales)
이번에는 연간 판매 주문 수를 확인해 보겠습니다.
from pyspark.sql.functions import *
yearlySales = df.select(year(col("OrderDate")).alias("Year")).groupBy("Year").count().orderBy("Year")
display(yearlySales)
다음은 함수를 사용해서 데이터를 변환해 봅니다.
OrderDate 컬럼에서 Year와 Month를 추출하여 새로운 컬럼으로 생성하고
CustomerName 컬럼에서 FirstName과 LastName을 분리하여 새로운 컬럼으로 생성한 후
열 순서를 정렬합니다.
from pyspark.sql.functions import *
## Year 와 Month 컬럼 생성하기
transformed_df = df.withColumn("Year", year(col("OrderDate"))).withColumn("Month", month(col("OrderDate")))
# FirstName 과 LastName 컬럼 생성하기
transformed_df = transformed_df.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))
# 필터링과 열정렬
transformed_df = transformed_df["SalesOrderNumber", "SalesOrderLineNumber", "OrderDate", "Year", "Month", "FirstName", "LastName", "EmailAddress", "Item", "Quantity", "UnitPrice", "TaxAmount"]
# 상위 5개 데이터 표시하기
display(transformed_df.limit(5))
테이블 및 SQL 작업도 연습해 봅니다.
먼저 테이블을 델타 형식으로 생성합니다.
델타 레이크는 테이블에 관계형 데이터베이스 기능을 추가하는 Spark 기술입니다.
트랜잭션, 행 버전 관리 및 기타 유용한 기능에 대한 지원을 포함합니다.
Fabric의 데이터 레이크하우스에서는 델타 형식으로 테이블을 생성하는 것이 좋습니다.
# 새 테이블 생성하기
df.write.format("delta").saveAsTable("salesorders")
# 테이블 상세정보 보기
spark.sql("DESCRIBE EXTENDED salesorders").show(truncate=False)
데이터 레이크에 salesorders 테이블이 생성되었는지 확인하고, 데이터 로드를 실행해 봅니다.
이제 코드 셀에서 SQL 코드를 실행해 봅니다.
%%sql
SELECT YEAR(OrderDate) AS OrderYear,
SUM((UnitPrice * Quantity) + TaxAmount) AS GrossRevenue
FROM salesorders
GROUP BY YEAR(OrderDate)
ORDER BY OrderYear;