# Ingesting and Parsing Streaming JSON from Azure Event Hubs into Microsoft Fabric Lakehouse

In this notebook, you will learn how to ingest streaming JSON messages from Azure Event Hubs, parse them, and append them to a Delta table in a Microsoft Fabric Lakehouse with Spark Structured Streaming.

**Prerequisites**:

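- Set up an Azure Event Hubs namespace with an event hub.
- Set up a stream from Event Hubs Data Explorer or from your local machine that sends sample JSON messages. Each message is expected to be a JSON object with a `columns` array, which the processing step below explodes. Instructions can be found [here](#).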
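If you send events from your local machine, a minimal sketch in Python might look like the following. It assumes the `azure-eventhub` package (`pip install azure-eventhub`) and a payload shape inferred from the processing code further down: a top-level `columns` array whose elements each carry one field. The helper names `make_sample_event` and `send_sample_events` and the example field names are hypothetical.

```python
import json


def make_sample_event(fields):
    """Wrap each key/value pair in its own object inside a "columns" array.

    Hypothetical payload shape, inferred from process_json: it explodes
    "columns" and merges the fields of all elements back into one row.
    """
    return json.dumps({"columns": [{key: value} for key, value in fields.items()]})


def send_sample_events(connection_string, event_hub_name, payloads):
    """Send JSON strings to Event Hubs with the azure-eventhub SDK."""
    # Imported lazily so make_sample_event works without the SDK installed
    from azure.eventhub import EventData, EventHubProducerClient

    producer = EventHubProducerClient.from_connection_string(
        connection_string, eventhub_name=event_hub_name
    )
    with producer:
        batch = producer.create_batch()
        for payload in payloads:
            batch.add(EventData(payload))  # raises ValueError if the batch is full
        producer.send_batch(batch)


if __name__ == "__main__":
    print(make_sample_event({"deviceId": "sensor-1", "temperature": "21.5"}))
```

For a handful of test messages a single batch is enough; larger volumes would need to start a new batch whenever `batch.add` reports that the current one is full.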

- **Set up the connection string**

In [None]:
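# Replace the placeholders with your namespace, shared access policy name, key, and event hub name
connectionString = "Endpoint=sb://<EVENT_HUB_NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<SHARED_ACCESS_KEY_NAME>;SharedAccessKey=<SHARED_ACCESS_KEY>;EntityPath=<EVENT_HUB_NAME>"

# The Event Hubs Spark connector expects the connection string in encrypted form
ehConf = {
    "eventhubs.connectionString": spark._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
}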
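The connector accepts further reader options in the same `ehConf` dictionary. The sketch below, modeled on the azure-event-hubs-spark PySpark examples, adds a starting position (offset `"-1"` reads from the start of the stream), a consumer group, and an optional cap on events per micro-batch. The helper `with_reader_options` and its defaults are hypothetical; check the option names against the connector version you use.

```python
import json


def with_reader_options(conf, offset="-1", consumer_group="$Default",
                        max_events_per_trigger=None):
    """Return a copy of an Event Hubs connector config with common reader options added.

    Hypothetical helper. Offset "-1" means the start of the stream;
    the connector examples use "@latest" for the end of the stream.
    """
    options = dict(conf)
    options["eventhubs.startingPosition"] = json.dumps({
        "offset": offset,
        "seqNo": -1,           # not used when an offset is given
        "enqueuedTime": None,  # not used when an offset is given
        "isInclusive": True,
    })
    options["eventhubs.consumerGroup"] = consumer_group
    if max_events_per_trigger is not None:
        # Store as a string, since Spark passes option values to the connector as strings
        options["eventhubs.maxEventsPerTrigger"] = str(max_events_per_trigger)
    return options


# In the notebook (assumed usage):
# ehConf = with_reader_options(ehConf, max_events_per_trigger=1000)
```

Returning a copy keeps the original `ehConf` intact, so rerunning the cell does not stack options on top of each other.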

> Note: As a security best practice, store your shared access key in Azure Key Vault instead of hard-coding it in the notebook. Use the [Credentials utilities](https://learn.microsoft.com/en-ca/fabric/data-engineering/notebook-utilities#credentials-utilities) to read Azure Key Vault secrets in a Fabric notebook.
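To keep the key out of the notebook, you can assemble the connection string from its parts at runtime. In this sketch, `build_connection_string` is a hypothetical helper, and the commented lines assume the Fabric `notebookutils.credentials.getSecret` utility with placeholder vault and secret names.

```python
def build_connection_string(namespace, key_name, key, event_hub):
    """Assemble an Event Hubs connection string in the same format as the placeholder above."""
    return (
        f"Endpoint=sb://{namespace}.servicebus.windows.net/;"
        f"SharedAccessKeyName={key_name};"
        f"SharedAccessKey={key};"
        f"EntityPath={event_hub}"
    )


# In a Fabric notebook (assumed usage; placeholders in angle brackets):
# key = notebookutils.credentials.getSecret("https://<KEY_VAULT_NAME>.vault.azure.net/", "<SECRET_NAME>")
# connectionString = build_connection_string(
#     "<EVENT_HUB_NAMESPACE>", "<SHARED_ACCESS_KEY_NAME>", key, "<EVENT_HUB_NAME>"
# )
```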

- **Import the necessary libraries**

In [None]:
# Only the functions and types used below are imported
from pyspark.sql.functions import col, explode, expr, first
from pyspark.sql.types import StringType

- **Read the stream from Event Hubs into a DataFrame**
- **Select the body column and cast it to a string**
- **Process the JSON in each micro-batch**
- **Start the streaming query**

In [None]:
# Read the stream from Event Hubs into a DataFrame
df = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

# Select the binary body column and cast it to a string
raw_data = df.selectExpr("CAST(body AS STRING) AS message")

# Process the JSON in each micro-batch
def process_json(batch_df, epoch_id):
    # Keep non-null messages and skip the batch if nothing arrived
    messages = batch_df.where(col("message").isNotNull())
    if messages.rdd.isEmpty():
        return

    # Parse all messages of the batch in one distributed read instead of collecting them to the driver
    json_df = spark.read.json(messages.rdd.map(lambda row: row["message"]))

    # Add a unique ID per message so its fields can be regrouped after exploding
    json_df = json_df.withColumn("id", expr("uuid()"))

    # Explode the "columns" array into one row per element
    exploded_df = json_df.withColumn("json_col", explode(col("columns")))

    # Select the struct fields of each element dynamically
    columns = exploded_df.select("json_col.*").columns
    selected_df = exploded_df.select("id", "json_col.*")

    # Aggregate per ID to combine the fields of each message into a single row
    aggregated_df = selected_df.groupBy("id").agg(
        *[first(c, ignorenulls=True).alias(c) for c in columns]
    )

    # Cast all data columns to string; the helper "id" column is dropped
    final_df = aggregated_df.select(
        [col(c).cast(StringType()).alias(c) for c in columns]
    )

    # Append to the Delta table; mergeSchema lets new fields extend the table schema
    (
        final_df.write.format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .saveAsTable("events")
    )

# Start the streaming query
query = (
    raw_data.writeStream
    .foreachBatch(process_json)
    .option("checkpointLocation", "Files/checkpoint")
    .trigger(processingTime="5 seconds")
    .start()
)

# Block the cell until the query stops or fails
query.awaitTermination()
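To make the per-message transformation easier to follow, here is a plain-Python illustration of what `process_json` does to a single message, with no Spark involved. The function name `flatten_message` and the sample message are hypothetical. Keep two differences in mind: this version takes the first non-null value in array order, while Spark's `first` after a `groupBy` does not guarantee any order, and `json.dumps` only approximates Spark's string cast for scalar values.

```python
import json


def flatten_message(raw_message):
    """Plain-Python mirror of process_json for one message (illustration only).

    Explodes the "columns" array, keeps the first non-null value seen for each
    field, and converts non-string values to strings.
    """
    record = json.loads(raw_message)
    row = {}
    for element in record.get("columns", []):
        for field, value in element.items():
            # Like first(c, ignorenulls=True): keep the first non-null value
            if row.get(field) is None:
                row[field] = value
    # Approximate the cast to string: keep strings and nulls, JSON-encode the rest
    return {
        field: value if value is None or isinstance(value, str) else json.dumps(value)
        for field, value in row.items()
    }


sample = '{"columns": [{"deviceId": "sensor-1"}, {"temperature": 21.5}, {"temperature": null, "ok": true}]}'
print(flatten_message(sample))
# prints {'deviceId': 'sensor-1', 'temperature': '21.5', 'ok': 'true'}
```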

<mark>When you are done, stop the streaming query and terminate the session.</mark>
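Because `query.awaitTermination()` blocks its cell, interrupt that cell first. A cell like the sketch below can then stop any streaming queries still running on the session. `stop_active_streams` is a hypothetical helper built on `spark.streams.active`; the commented `notebookutils.session.stop()` call assumes the Fabric NotebookUtils session API.

```python
def stop_active_streams(spark):
    """Stop every active streaming query on the given Spark session and return how many were stopped."""
    stopped = 0
    for query in spark.streams.active:
        query.stop()
        stopped += 1
    return stopped


# In the notebook (assumed usage):
# stop_active_streams(spark)
# notebookutils.session.stop()  # ends the Fabric notebook session
```

Stopping every active query, rather than only `query`, also cleans up streams left over from earlier runs of the notebook in the same session.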