본문 바로가기

Microsoft Fabric/Fabric 실습 1 - Lakehouse

08. 스트리밍 데이터에 델타 테이블 사용

Delta Lake는 스트리밍 데이터를 지원합니다.

델타 테이블을 사용해서 가상의 IoT 디바이스의 스트리밍 데이터를 싱크해 봅니다. 

from notebookutils import mssparkutils
from pyspark.sql.types import *
from pyspark.sql.functions import *

# 폴더 생성하기
inputPath = 'Files/streamdata/'
mssparkutils.fs.mkdirs(inputPath)

# 폴더에서 데이터 조회해서 스트림 생성하기
jsonSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write some event data to the folder
device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''
mssparkutils.fs.put(inputPath + "data.txt", device_data, True)
print("Source stream created...")

 

위 소스코드를 실행시키면 스트리밍 데이터가 생성됩니다. 

 

이번에는 스트리밍 디바이스 데이터를 델타 형식으로 iotdevicedata라는 폴더에 씁니다.

tables 폴더의 폴더 위치에 대한 경로이기 때문에 해당 폴더에 대한 테이블이 자동으로 만들어집니다.

# 델타 테이블에 스트리밍 데이터 쓰기
delta_stream_table_path = 'Tables/iotdevicedata'
checkpointpath = 'Files/delta/checkpoint'
deltastream = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path)
print("Streaming to delta sink...")

 

%%sql 매직으로 데이터를 조회해 봅니다. 

%%sql

SELECT * FROM IotDeviceData;

 

스트리링 원본에 가상의 데이터를 추가하고 조회해 봅니다. 

# 스트리밍 원본에 가상 데이터 추가하기
more_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

mssparkutils.fs.put(inputPath + "more-data.txt", more_data, True)

%%sql

SELECT * FROM IotDeviceData;

 

스트림을 중단하는 코드는 아래와 같습니다. 

deltastream.stop()

 

 

Delta Lake는 Apache Spark에 관계형 데이터베이스 의미 체계를 추가하는 기술입니다. Microsoft Fabric 레이크하우스의 테이블은 Delta Lake를 기반으로 하므로 Delta Lake API를 통해 많은 고급 기능과 기술을 활용할 수 있습니다.

https://docs.delta.io/latest/index.html