11/29/23

Data Engineering Process Fundamentals - An introduction to Data Analysis and Visualization

In this technical presentation, we will delve into the fundamental concepts of Data Engineering in the areas of data analysis and visualization. We focus on these areas by using both a code-centric and low-code approach.

  • Follow this GitHub repo during the presentation: (Give it a star)

https://github.com/ozkary/data-engineering-mta-turnstile

  • Read more information on my blog at:

https://www.ozkary.com/2023/03/data-engineering-process-fundamentals.html

Presentation

YouTube Video

Section 1: Data Analysis Essentials

Data Analysis: Explore the fundamentals of data analysis using Python, unraveling the capabilities of libraries such as Pandas and NumPy. Learn how Jupyter Notebooks provide an interactive environment for data exploration, analysis, and visualization.

Data Profiling: With Python at our fingertips, discover how Jupyter Notebooks aid in data profiling—understanding data structures, quality, and characteristics. Witness the seamless integration with tools like pandas-profiling for comprehensive data insights.
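
Below is a minimal profiling sketch to make this concrete. It assumes a local CSV file (the file name is illustrative) and the pandas-profiling package, which newer releases ship as ydata-profiling:

import pandas as pd
from pandas_profiling import ProfileReport  # installed via ydata-profiling in newer releases

# Load a sample dataset; the file name is an assumption for illustration
df = pd.read_csv("sample_data.csv")

# Generate a profile report with data types, missing values, and distributions
profile = ProfileReport(df, title="Sample Data Profile", minimal=True)
profile.to_notebook_iframe()      # render inline in a Jupyter Notebook
# profile.to_file("profile.html") # or export to an HTML file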

Cleaning and Preprocessing: Dive into the world of data cleaning and preprocessing with Python's Pandas library, facilitated by the user-friendly environment of Jupyter Notebooks. See how Visual Studio Code enhances the coding experience for efficient data preparation.

Section 2: Statistical Analysis vs. Business Intelligence

Statistical Analysis: Embrace Python's statistical libraries, such as SciPy and StatsModels, within the Jupyter environment. Witness the power of statistical analysis for extracting patterns and correlations from data, all seamlessly integrated into your workflow with Visual Studio Code.
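
As a small, hypothetical illustration of that workflow, the sketch below uses SciPy to measure the correlation between two columns of a DataFrame; the column names and values are assumptions:

import pandas as pd
from scipy import stats

# Hypothetical dataset with two numeric measures
df = pd.DataFrame({"entries": [10, 25, 40, 55], "exits": [12, 22, 41, 50]})

# Pearson correlation between the two measures
corr, p_value = stats.pearsonr(df["entries"], df["exits"])
print(f"correlation={corr:.3f}, p-value={p_value:.3f}")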

Business Intelligence: Contrast statistical analysis with the broader field of business intelligence, emphasizing the role of Python in data transformation. Utilize Jupyter Notebooks to showcase how Python's versatility extends to business intelligence applications.

Section 3: The Power of Data Visualization

Importance of Data Visualization: Unlock the potential of Python's visualization libraries, such as Matplotlib and Seaborn, within the interactive canvas of Jupyter Notebooks. Visual Studio Code complements this process, providing a robust coding environment for creating captivating visualizations.

Introduction to Tools: While exploring the importance of data visualization, we also introduce powerful visualization tools such as Power BI, Looker, and Tableau. Learn how integrating these tools into your workflow elevates your data storytelling capabilities.

Conclusion:

This session aims to equip attendees with a strong foundation in data engineering, focusing on the pivotal role of data analysis and visualization. By the end of this presentation, participants will understand how to apply these practices effectively, so they can begin their own journey into data analysis and visualization.

This presentation will be accompanied by live code demonstrations and interactive discussions, ensuring attendees gain practical knowledge and valuable insights into the dynamic world of data engineering.

Some of the technologies that we will be covering:

  • Data Analysis
  • Data Visualization
  • Python
  • Jupyter Notebook
  • Looker

Thanks for reading.

Send questions or comments on Twitter @ozkary

Originally published by ozkary.com

10/25/23

Data Engineering Process Fundamentals - Unveiling the Power of Data Lakes and Data Warehouses


In this technical presentation, we will delve into the fundamental concepts of Data Engineering, focusing on two pivotal components of modern data architecture - Data Lakes and Data Warehouses. We will explore their roles, differences, and how they collectively empower organizations to harness the true potential of their data.

- Follow this GitHub repo during the presentation: (Give it a star)

https://github.com/ozkary/data-engineering-mta-turnstile
- Read more information on my blog at:



Presentation


YouTube Video


1. Introduction to Data Engineering:
- Brief overview of the data engineering landscape and its critical role in modern data-driven organizations.

2. Understanding Data Lakes:
- Explanation of what a data lake is and its purpose in storing vast amounts of raw and unstructured data.

3. Exploring Data Warehouses:
- Definition of data warehouses and their role in storing structured, processed, and business-ready data.

4. Comparing Data Lakes and Data Warehouses:
- Comparative analysis of data lakes and data warehouses, highlighting their strengths and weaknesses.
- Discussing when to use each based on specific use cases and business needs.

5. Integration and Data Pipelines:
- Insight into the seamless integration of data lakes and data warehouses within a data engineering pipeline.
- Code walkthrough showcasing data movement and transformation between these two crucial components.

6. Real-world Use Cases:
- Presentation of real-world use cases where effective use of data lakes and data warehouses led to actionable insights and business success.
- Hands-on demonstration using Python, Jupyter Notebook and SQL to solidify the concepts discussed, providing attendees with practical insights and skills.


Conclusion:

This session aims to equip attendees with a strong foundation in data engineering, focusing on the pivotal role of data lakes and data warehouses. By the end of this presentation, participants will grasp how to effectively utilize these tools, enabling them to design efficient data solutions and drive informed business decisions.

This presentation will be accompanied by live code demonstrations and interactive discussions, ensuring attendees gain practical knowledge and valuable insights into the dynamic world of data engineering.

Some of the technologies that we will be covering:

- Data Lakes
- Data Warehouse
- Data Analysis and Visualization
- Python
- Jupyter Notebook
- SQL

Send questions or comments on Twitter @ozkary

Originally published by ozkary.com

10/15/23

AI with Python and Tensorflow - Convolutional Neural Networks Analysis

Convolutional neural network (CNN)

Convolutional Neural Networks (CNNs) have revolutionized the field of computer vision and image processing. These specialized deep learning models are inspired by the human visual system and excel at tasks like image classification, object detection, and facial recognition.

CNNs employ convolution operations, primarily used for processing images. The network begins the analysis by applying filters with various convolutional kernels to extract valuable image features. Like other weights in the network, these kernels are adjusted during training based on the output error. The resulting feature maps then undergo pooling to reduce their spatial dimensions and are finally flattened into a one-dimensional array that feeds a standard, fully connected neural network.

AI convolutional neural network - ozkary

Model 1

Input (IMG_WIDTH, IMG_HEIGHT, 3)
|
Conv2D (32 filters, 3x3 kernel, ReLU)
|
MaxPooling2D (2x2 pool size)
|
Flatten
|
Dense (128 nodes, ReLU)
|
Dropout (50%)
|
Dense (NUM_CATEGORIES, softmax)
|
Output (NUM_CATEGORIES)
  1. Input Layer (Conv2D):

    • Type: Convolutional Layer (2D)
    • Number of Filters: 32
    • Filter Size: (3, 3)
    • Activation Function: Rectified Linear Unit (ReLU)
    • Input Shape: (IMG_WIDTH, IMG_HEIGHT, 3) where 3 represents the color channels (RGB).
  2. Pooling Layer (MaxPooling2D):

    • Type: Max Pooling Layer (2D)
    • Pool Size: (2, 2)
    • Purpose: Reduces the spatial dimensions by taking the maximum value from each group of 2x2 pixels.
  3. Flatten Layer (Flatten):

    • Type: Flattening Layer
    • Purpose: Converts the multidimensional input into a 1D array to feed into the Dense layer.
  4. Dense Hidden Layer (Dense):

    • Number of Neurons: 128
    • Activation Function: ReLU
    • Purpose: Learns and represents complex patterns in the data.
  5. Dropout Layer (Dropout):

    • Rate: 0.5
    • Purpose: Helps prevent overfitting by randomly setting 50% of the inputs to zero during training.
  6. Output Layer (Dense):

    • Number of Neurons: NUM_CATEGORIES (Number of categories for traffic signs)
    • Activation Function: Softmax
    • Purpose: Produces probabilities for each category, summing to 1, indicating the likelihood of the input image belonging to each category.

import tensorflow as tf

# NUM_CATEGORIES (the number of output classes) is assumed to be defined earlier in the script
layers = tf.keras.layers

# Create a convolutional neural network
model =  tf.keras.models.Sequential([

    # Convolutional layer. Learn 32 filters using a 3x3 kernel
    layers.Conv2D(32, (3, 3), activation='relu', input_shape=(30, 30, 3)),

    # Max-pooling layer, reduces the spatial dimensions by taking the maximum value from each group of 2x2 pixels
    layers.MaxPooling2D((2, 2)),

    # Converts the multidimensional input into a 1D array to feed into the Dense layer
    layers.Flatten(),

    # Dense hidden layer with 128 nodes and relu activation function to learn and represent complex patterns in the data
    layers.Dense(128, activation='relu'),

    # Dropout layer to prevent overfitting by randomly setting 50% of the inputs to 0 at each update during training
    layers.Dropout(0.5),

    # Output layer with NUM_CATEGORIES outputs and softmax activation function to return probability-like results
    layers.Dense(NUM_CATEGORIES, activation='softmax')
])

Model 2

Input (IMG_WIDTH, IMG_HEIGHT, 3)
|
Conv2D (32 filters, 3x3 kernel, ReLU)
|
MaxPooling2D (2x2 pool size)
|
Conv2D (64 filters, 3x3 kernel, ReLU)
|
MaxPooling2D (2x2 pool size)
|
Flatten
|
Dense (128 nodes, ReLU)
|
Dropout (50%)
|
Dense (NUM_CATEGORIES, softmax)
|
Output (NUM_CATEGORIES)
  1. Input Layer (Conv2D):

    • Type: Convolutional Layer (2D)
    • Number of Filters: 32
    • Filter Size: (3, 3)
    • Activation Function: Rectified Linear Unit (ReLU)
    • Input Shape: (IMG_WIDTH, IMG_HEIGHT, 3) where 3 represents the color channels (RGB).
  2. Pooling Layer (MaxPooling2D):

    • Type: Max Pooling Layer (2D)
    • Pool Size: (2, 2)
    • Purpose: Reduces the spatial dimensions by taking the maximum value from each group of 2x2 pixels.
  3. Convolutional Layer (Conv2D):

    • Number of Filters: 64
    • Filter Size: (3, 3)
    • Activation Function: ReLU
    • Purpose: Extracts higher-level features from the input.
  4. Pooling Layer (MaxPooling2D):

    • Pool Size: (2, 2)
    • Purpose: Further reduces spatial dimensions.
  5. Flatten Layer (Flatten):

    • Type: Flattening Layer
    • Purpose: Converts the multidimensional input into a 1D array to feed into the Dense layer.
  6. Dense Hidden Layer (Dense):

    • Number of Neurons: 128
    • Activation Function: ReLU
    • Purpose: Learns and represents complex patterns in the data.
  7. Dropout Layer (Dropout):

    • Rate: 0.5
    • Purpose: Helps prevent overfitting by randomly setting 50% of the inputs to zero during training.
  8. Output Layer (Dense):

    • Number of Neurons: NUM_CATEGORIES (Number of categories for traffic signs)
    • Activation Function: Softmax
    • Purpose: Produces probabilities for each category, summing to 1, indicating the likelihood of the input image belonging to each category.

layers = tf.keras.layers

# Create a convolutional neural network
model = tf.keras.models.Sequential([

    # Convolutional layer. Learn 32 filters using a 3x3 kernel
    layers.Conv2D(32, (3, 3), activation='relu', input_shape=(30, 30, 3)),

    # Max-pooling layer, reduces the spatial dimensions by taking the maximum value from each group of 2x2 pixels
    layers.MaxPooling2D((2, 2)),

    # Convolutional layer. Learn 64 filters using a 3x3 kernel to extract higher-level features from the input
    layers.Conv2D(64, (3, 3), activation='relu'),

    # Max-pooling layer, using a 2x2 pool size to further reduce spatial dimensions
    layers.MaxPooling2D((2, 2)),

    # Converts the multidimensional input into a 1D array to feed into the Dense layer
    layers.Flatten(),

    # Dense hidden layer with 128 nodes and relu activation function to learn and represent complex patterns in the data
    layers.Dense(128, activation='relu'),

    # Dropout layer to prevent overfitting by randomly setting 50% of the inputs to 0 at each update during training
    layers.Dropout(0.5),

    # Output layer with NUM_CATEGORIES outputs and softmax activation function to return probability-like results
    layers.Dense(NUM_CATEGORIES, activation='softmax')
])

The architecture follows a typical CNN pattern: alternating Convolutional and MaxPooling layers to extract features and reduce spatial dimensions, followed by Flattening and Dense layers for classification.

Feel free to adjust the number of filters, filter sizes, layer types, or other hyperparameters based on your specific problem and dataset. Experimentation is key to finding the best architecture for your task.
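
For reference, this is a minimal sketch of how a model like these could be compiled and trained to produce the epoch logs shown in the results below; EPOCHS, x_train, y_train, x_test, and y_test are assumed to be prepared elsewhere with one-hot encoded labels:

EPOCHS = 10  # matches the 10-epoch runs shown below

# Compile with the Adam optimizer (default 0.001 learning rate) and categorical cross-entropy
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# Train on the prepared image arrays and evaluate on the hold-out set
model.fit(x_train, y_train, epochs=EPOCHS)
model.evaluate(x_test, y_test, verbose=2)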

Model 1 Results

Images and Labels loaded 26640, 26640
Epoch 1/10
500/500 [==============================] - 7s 12ms/step - loss: 4.9111 - accuracy: 0.0545   
Epoch 2/10
500/500 [==============================] - 6s 12ms/step - loss: 3.5918 - accuracy: 0.0555
Epoch 3/10
500/500 [==============================] - 6s 12ms/step - loss: 3.5411 - accuracy: 0.0565
Epoch 4/10
500/500 [==============================] - 6s 12ms/step - loss: 3.5190 - accuracy: 0.0577
Epoch 5/10
500/500 [==============================] - 6s 12ms/step - loss: 3.5088 - accuracy: 0.0565
Epoch 6/10
500/500 [==============================] - 6s 12ms/step - loss: 3.5041 - accuracy: 0.0577
Epoch 7/10
500/500 [==============================] - 6s 12ms/step - loss: 3.5019 - accuracy: 0.0577
Epoch 8/10
500/500 [==============================] - 6s 12ms/step - loss: 3.5008 - accuracy: 0.0577
Epoch 9/10
500/500 [==============================] - 6s 12ms/step - loss: 3.5002 - accuracy: 0.0577
Epoch 10/10
500/500 [==============================] - 6s 12ms/step - loss: 3.4999 - accuracy: 0.0577
333/333 - 1s - loss: 3.4964 - accuracy: 0.0541 - 1s/epoch - 4ms/step

Model 2 Results


Images and Labels loaded 26640, 26640
Epoch 1/10
500/500 [==============================] - 9s 15ms/step - loss: 4.0071 - accuracy: 0.1315
Epoch 2/10
500/500 [==============================] - 7s 14ms/step - loss: 2.0718 - accuracy: 0.3963
Epoch 3/10
500/500 [==============================] - 7s 15ms/step - loss: 1.4216 - accuracy: 0.5529
Epoch 4/10
500/500 [==============================] - 7s 14ms/step - loss: 1.0891 - accuracy: 0.6546
Epoch 5/10
500/500 [==============================] - 7s 14ms/step - loss: 0.8440 - accuracy: 0.7320
Epoch 6/10
500/500 [==============================] - 7s 14ms/step - loss: 0.6838 - accuracy: 0.7862
Epoch 7/10
500/500 [==============================] - 7s 14ms/step - loss: 0.5754 - accuracy: 0.8184
Epoch 8/10
500/500 [==============================] - 7s 14ms/step - loss: 0.5033 - accuracy: 0.8420
Epoch 9/10
500/500 [==============================] - 7s 14ms/step - loss: 0.4171 - accuracy: 0.8729
Epoch 10/10
500/500 [==============================] - 7s 15ms/step - loss: 0.3787 - accuracy: 0.8851
333/333 - 2s - loss: 0.1354 - accuracy: 0.9655 - 2s/epoch - 6ms/step
Model saved to cnn_model2.keras.

Summary

This is a summary of the CNN model experiments:

Model 1 had a loss of 3.4964 and an accuracy of 0.0541. This model had a simple architecture with few layers and filters, which may have limited its ability to learn complex features in the input images.

Model 2 had a loss of 0.1354 and an accuracy of 0.9655. This model had a more complex architecture with additional hidden layers, including a convolutional layer with 64 filters and an additional max pooling (2x2) layer. The addition of these layers likely helped the model learn more complex features in the input images, leading to a significant improvement in accuracy.

In particular, the addition of more convolutional layers with more filters can help the model learn more complex features in the input images, as each filter learns to detect a different pattern or feature in the input. However, it is important to balance the number of filters with the size of the input images and the complexity of the problem, as using too many filters can lead to overfitting and poor generalization to new data.

Overall, the results suggest that increasing the complexity of the model by adding more hidden layers can help improve its accuracy, but it is important to balance the complexity of the model with the size of the input images and the complexity of the problem to avoid overfitting.

Learning rate

  • A learning rate of .001 (default) provided optimal results - loss: 0.1354 - accuracy: 0.9655
  • A learning rate of .01 lowered the accuracy and increased the loss - loss: 3.4858 - accuracy: 0.0594

A learning rate of 0.01 was too high for our specific problem and dataset, causing the model to overshoot the optimal solution and fail to converge.


model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
              loss='categorical_crossentropy',
              metrics=['accuracy'])
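
For comparison, the better-performing runs relied on the optimizer's default learning rate of 0.001, which can be set explicitly as in this sketch:

# Default Adam learning rate (0.001), which produced loss: 0.1354 - accuracy: 0.9655 for Model 2
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
              loss='categorical_crossentropy',
              metrics=['accuracy'])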

Thanks for reading.

Send questions or comments on Twitter @ozkary

👍 Originally published by ozkary.com

8/5/23

Data Engineering Process Fundamentals - Data Streaming

In modern data engineering solutions, handling streaming data is very important. Businesses often need real-time insights to promptly monitor and respond to operational changes and performance trends. A data streaming pipeline facilitates the integration of real-time data into data warehouses and visualization dashboards. To achieve this integration in a data engineering solution, it is essential to understand the principles of data streaming and how technologies like Apache Kafka and Apache Spark play a key role in building efficient streaming data pipelines.

👉 Data Engineering Process Fundamentals - Data Analysis and Visualization

What is Data Streaming

Data streaming enables us to build data integration in real-time. Unlike traditional batch processing, where data is collected and processed periodically, streaming data arrives continuously and is processed on-the-fly. This kind of integration empowers organizations to:

  • React Instantly: Timely responses to events and anomalies become possible
  • Predict Trends: Identify patterns and trends as they emerge
  • Enhance User Experience: Provide real-time updates and personalization
  • Optimize Operations: Streamline processes and resource allocation

ozkary-data-engineering-design-data-streaming-messages

Data Streaming Channels

Data streaming is a continuous flow of data that usually arrives from a channel hosted on an HTTP endpoint. The channel technology depends on the provider's stack and can be any of the following:

  • Web Hooks: Web hooks are like virtual messengers that notify us when something interesting happens on the web. They are HTTP callbacks triggered by specific events, such as a change in a system. To harness data from web hooks, we set up endpoints that listen for these notifications, allowing us to react instantly to changes.

  • Events: Events are a fundamental concept in data streaming. They represent occurrences in a system or application, such as a user click, a sensor detecting a temperature change, or a train arriving at a station. Events can be collected and processed in real-time by using a middleware platform like Apache Kafka or RabbitMQ, providing insights into user behavior, system health, and more.

  • API Integration: APIs (Application Programming Interfaces) are bridges between different software systems. Through API integration, we can fetch data from external services, social media platforms, IoT devices, or any source that exposes an API. This seamless connectivity enables us to incorporate external data into our applications and processes by scheduling calls to the API at a certain frequency.

👍 Events are used for a wide range of real-time applications, including IoT data collection, application monitoring, and user behavior tracking. Web hooks are typically employed for integrating external services, automating workflows, and receiving notifications from third-party platforms.
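
As a small illustration of the web hook channel, this is a minimal sketch of an HTTP endpoint that listens for notifications; Flask is an assumed framework here, and the route and payload shape are hypothetical:

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/webhook/turnstile", methods=["POST"])
def turnstile_webhook():
    # The provider posts a JSON payload when an event of interest occurs
    event = request.get_json(silent=True) or {}
    # Hand the event off to the streaming pipeline (e.g., publish it to a Kafka topic)
    print("Received event:", event)
    return jsonify({"status": "received"}), 200

if __name__ == "__main__":
    app.run(port=8080)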

Scaling to Handle a Data Stream

Data streaming sources often produce small payloads at a high volume of messages. This introduces scalability concerns that should be addressed with essential components like the following:

  • Streaming Infrastructure: Robust streaming infrastructure is the backbone of data streaming. This includes systems like Apache Kafka, AWS Kinesis, or Azure Stream Analytics, which facilitate the ingestion, processing, and routing of data streams

  • Real-Time Processing: Traditional batch processing won't cut it for data streaming. We need real-time processing frameworks like Apache Storm, or Apache Spark Streaming to handle data as it flows

  • Data Storage: Storing and managing streaming data is crucial. We might use data lakes for long-term storage and databases optimized for real-time access. Cloud storage solutions offer scalability and reliability

  • Analytics and Visualization: To derive meaningful insights, we need analytics tools capable of processing streaming data. Visualization platforms like PowerBI, Looker, or custom dashboards can help you make sense of the information in real time

  • Monitoring and Alerts: Proactive monitoring ensures that your data streaming pipeline is healthy. Implement alerts and triggers to respond swiftly to anomalies or critical events

  • Scalable Compute Resources: As data volumes grow, compute resources should be able to scale horizontally to handle increased data loads. Cloud-based solutions are often used for this purpose

Data Streaming Components

At the heart of data streaming solutions lies technologies like Apache Kafka, a distributed event streaming platform, and Apache Spark, a versatile data processing engine. Together, they form a powerful solution that ingests, processes, and analyzes streaming data at scale.

ozkary-data-engineering-design-data-streaming

Apache Kafka

Kafka acts as the ingestion layer or message broker in the streaming pipeline. It serves as a highly durable, fault-tolerant, and scalable event streaming platform. Data producers, which can be various sources like applications, sensors, or webhooks, publish events (messages) to Kafka topics. These events are typically small pieces of data containing information such as transactions, logs, or sensor readings. Let's look at a simplified overview of how Kafka works:

  • Kafka organizes events into topics. A topic is a logical channel or category to which records (messages) are sent by producers and from which records are consumed by consumers. Topics serve as the central mechanism for organizing and categorizing data within Kafka. Each topic can be split into multiple partitions to support parallelism and scalability

    • Kafka is distributed and provides fault tolerance. If a broker (Kafka server) fails, partitions can be replicated across multiple brokers
  • Kafka follows a publish-subscribe model. Producers send records to topics, and consumers subscribe to one or more topics to receive and process those records

    • A producer is a program or process responsible for publishing records to Kafka topics. Producers generate data, which is then sent to one or more topics. Each message is identified by an offset, which represents its position within a partition of the topic.

    • A consumer is a program or process that subscribes to one or more topics and processes the records within them. Consumers can read data from topics in real-time and perform various operations on it, such as analytics, storage, or forwarding to other systems

Apache Spark Structured Streaming

Apache Spark Structured Streaming is a micro-batch processing framework built on top of Apache Spark. It enables real-time data processing by ingesting data from Kafka topics in mini-batches. Here's how the process works:

👍 Apache Spark offers a unified platform for both batch and stream processing. If your application requires seamless transitions between batch and stream processing modes, Spark can be a good fit.

  • Kafka Integration: Spark Streaming integrates with Kafka using the Kafka Direct API. It can consume data directly from Kafka topics, leveraging Kafka's parallelism and fault tolerance features

  • Mini-Batch Processing: Spark Streaming reads data from Kafka topics in mini-batches, typically ranging from milliseconds to seconds. Each mini-batch of data is treated as a Resilient Distributed Dataset (RDD) within the Spark ecosystem

  • Data Transformation: Once the data is ingested into Spark Streaming, we can apply various transformations, computations, and analytics on the mini-batches of data. Spark provides a rich set of APIs for tasks like filtering, aggregating, joining, and machine learning

  • Windowed Operations: Spark Streaming allows us to perform windowed operations, such as sliding windows or tumbling windows, to analyze data within specific time intervals. This is useful for aggregating data over fixed time periods (e.g., hourly, daily) or for tracking patterns over sliding windows

  • Output: After processing, the results can be stored in various destinations, such as a data lake (e.g., Hadoop HDFS), a data warehouse (e.g., BigQuery, Redshift), or other external systems. Spark provides connectors to these storage solutions for seamless data persistence

Benefits of a Kafka and Spark Integration

A Kafka and Spark integration enables us to build solutions with High Availability requirements due to the following features:

  • Fault Tolerance: Kafka ensures that events are not lost even in the face of hardware failures, making it a reliable source of data

  • Scalability: Kafka scales horizontally, allowing you to handle increasing data volumes by adding more Kafka brokers

  • Flexibility: Spark Streaming's flexibility in data processing and windowing operations enables a wide range of real-time analytics

  • End-to-End Pipeline: By combining Kafka's ingestion capabilities with Spark's processing power, you can create end-to-end data streaming pipelines that handle real-time data ingestion, processing, and storage

Supported Programming Languages

When it comes to programming language support, both Kafka and Spark allow developers to choose the language that aligns best with their skills and project requirements.

  • Kafka supports multiple programming languages, including Python, C#, and Java

  • Spark also supports multiple programming languages like PySpark (Python), Scala, and even R for data processing tasks. Additionally, Spark allows users to work with SQL-like expressions, making it easier to perform complex data transformations and analysis (see the sketch below)
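
As an illustration of those SQL-like expressions, the sketch below registers a small, hypothetical DataFrame as a temporary view and aggregates it with Spark SQL:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SqlExpressions").getOrCreate()

# Hypothetical sample data shaped like the turnstile records used later in this article
df = spark.createDataFrame(
    [("StationA", 10, 15), ("StationB", 25, 5)],
    ["station", "entries", "exits"]
)

# Use a SQL expression over the DataFrame
df.createOrReplaceTempView("turnstile")
spark.sql("SELECT station, SUM(entries) AS entries FROM turnstile GROUP BY station").show()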

Sample Python Code for a Kafka Producer

This is a very simple implementation of a Kafka producer using Python as the programming language. This code does not consume a data feed from a provider. It only shows how a producer sends messages to a Kafka topic.


from kafka import KafkaProducer
import time

kafka_broker = "localhost:9092"

# Create a Kafka producer instance
producer = KafkaProducer(
    bootstrap_servers=kafka_broker,  # Replace with your Kafka broker's address
    value_serializer=lambda v: str(v).encode('utf-8')
)

# Sample data message (comma-delimited)
sample_message = "timestamp,station,turnstile_id,device_id,entry,exit,entry_datetime"

try:
    # continue to run until the instance is shutdown
    while True:
        # Simulate generating a new data message. This data should come from the data provider
        data_message = sample_message + f"\n{int(time.time())},StationA,123,456,10,15,'2023-07-12 08:30:00'"

        # Send the message to the Kafka topic
        producer.send('turnstile-stream', value=data_message)

        # add logging information for tracking
        print("Message sent:", data_message)
        time.sleep(1)  # Sending messages every second
except KeyboardInterrupt:
    pass
finally:
    producer.close()

This Kafka producer code initializes a producer and sends a sample message to the specified Kafka topic. Let's review each code segment:

  • Create Kafka Producer Configuration:

    • Set up the Kafka producer configuration
    • Specify the Kafka broker(s) to connect to (bootstrap.servers)
  • Define Kafka Topic: Define the Kafka topic to send messages (turnstile-stream in this example)

  • Create a Kafka Producer:

    • Create an instance of the Kafka producer with the broker end-point
    • Use a value_serializer to encode the string message with unicode utf-8
  • Define Message Contents:

    • Prepare the message to send as a CSV string with the header and value information
  • Produce Messages:

    • Use the send method of the Kafka producer to send messages to the Kafka topic, turnstile-stream
  • Close the Kafka Producer:

    • Always remember to close the Kafka producer when the application terminates to avoid leaving open connections on the broker

Sample Python Code for a Kafka Consumer and Spark Client

After looking at the Kafka producer code, let's take a look at how a Kafka consumer on Spark would consume and process the messages.


from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.functions import window, sum

# Create a Spark session
spark = SparkSession.builder.appName("TurnstileStreamApp").getOrCreate()

# Create a StreamingContext with a batch interval of 5 seconds
ssc = StreamingContext(spark.sparkContext, 5)

kafka_broker = "localhost:9092"

# Define the Kafka broker and topic to consume from
kafkaParams = {
    "bootstrap.servers": kafka_broker,  # Replace with your Kafka broker's address
    "auto.offset.reset": "latest",
}
topics = ["turnstile-stream"]

# Create a Kafka stream
kafkaStream = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)

# Parse the Kafka stream as a DataFrame
lines = kafkaStream.map(lambda x: x[1])
df = spark.read.csv(lines)

# Define a window for aggregation (4-hour window with a 4-hour sliding interval)
windowed_df = (df
    .withWatermark("entry_datetime", "4 hours")
    .groupBy("station", window("entry_datetime", "4 hours"))
    .agg(
        sum("entries").alias("entries"),
        sum("exits").alias("exits")
    )
)

# Write the aggregated data to a blob storage as compressed CSV files
query = windowed_df.writeStream\
    .outputMode("update")\
    .foreachBatch(lambda batch_df, batch_id: batch_df.write\
        .mode("overwrite")\
        .csv("gs://your-bucket-name/")  # Replace with your blob storage path
    )\
    .start()

query.awaitTermination()

This simple example shows how to write a Kafka consumer, use Spark to process and aggregate the data, and finally write a csv file to the data lake. Let’s look at each code segment for more details:

  • Create a Spark Session:

    • Initialize a Spark session with the name "TurnstileStreamApp"
  • Create a StreamingContext:

    • Set up a StreamingContext with a batch interval of 5 seconds. This determines how often Spark will process incoming data
  • Define Kafka Broker and Topic:

    • Specify the Kafka broker's address (localhost:9092 in this example) and the topic to consume data from ("turnstile-stream")
  • Create a Kafka Stream:

    • Use KafkaUtils to create a direct stream from Kafka
    • The stream will consume data from the specified Kafka topic
  • Parse the Kafka Stream:

    • Extract the message values from the Kafka stream
    • Read these messages into a DataFrame (df) using Spark's CSV reader
  • Define a Window for Aggregation:

    • We specify the watermark for late data using withWatermark. This ensures that any data arriving later than the specified window is still considered for aggregation
    • Create a windowed DataFrame (windowed_df) by grouping data based on "station" and a 4-hour window
    • The "entry_datetime" column is used as the timestamp for windowing
    • Aggregations are performed to calculate the sum of "entries" and "exits" within each window
  • Write the Aggregated Data to a Data Lake:

    • Define a streaming query (query) to write the aggregated data to a blob storage path
    • The "update" output mode indicates that only updated results will be written
    • A function is applied to each batch of data, which specifies how to write the data
    • In this case, data is written as compressed CSV files to a data lake location
    • The awaitTermination method ensures the query continues to run and process data until manually terminated.

This Spark example processes data from Kafka, aggregates it in 4-hour windows, and it writes the results to blob storage. The code is structured to efficiently handle real-time streaming data and organize the output into folders in the data lake based on station names and time windows. In each folder, Spark will generate filenames automatically based on the default naming convention. Typically, it uses a combination of a unique identifier and partition number to create filenames. The exact format of the file name might vary depending on the Spark version and configuration. This approach is used to send the information to a data lake, so the data transformation process can pick it up and send to a data warehouse.

Alternatively, the Spark client can send the aggregated results directly to the data warehouse. Since the Spark client can connect to the data warehouse, it can insert the information without using the data lake as a staging step.
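
As an example of that alternative, the foreachBatch sink could write each micro-batch straight to a warehouse table instead of blob storage. The sketch below assumes the Spark BigQuery connector is available on the cluster and uses a hypothetical dataset and table name:

def write_to_warehouse(batch_df, batch_id):
    # Requires the spark-bigquery connector; the table and staging bucket names are assumptions
    batch_df.write \
        .format("bigquery") \
        .option("table", "mta_data.turnstile_aggregates") \
        .option("temporaryGcsBucket", "your-temp-bucket") \
        .mode("append") \
        .save()

query = windowed_df.writeStream \
    .outputMode("update") \
    .foreachBatch(write_to_warehouse) \
    .start()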

Solution Design and Architecture

For our solution strategy, we followed the design shown below. This design helps us ensure a smooth flow, along with efficient processing and storage of data, so that it is immediately available in our data warehouse and, consequently, in the visualization tools. Let's break down each component and explain its purpose.

ozkary-data-engineering-design-data-streaming

Components

  • Real-Time Data Source: This is an external data source, which continuously emits data as events or messages

  • Message Broker Layer: Our message broker layer is the central hub for data ingestion and distribution. It consists of two vital components:

    • Kafka Broker Instance: Kafka acts as a scalable message broker and entry point for data ingestion. It efficiently collects and organizes data in topics from the source
    • Kafka Producer (Python): To bridge the gap between the data source and Kafka, we write a Python-based Kafka producer. This component is responsible for capturing data from the real-time source and forwarding it to the Kafka instance and corresponding topic
  • Stream Processing Layer: The stream processing layer is where the messages from Kafka are processed, aggregated and sent to the corresponding data storage. This layer also consists of two key components:

    • Spark Instance: Apache Spark, a high-performance stream processing framework, is responsible for processing and transforming data in real-time
    • Stream Consumer (Python): In order to consume the messages from a Kafka topic, we write a Python component that acts as both a Kafka Consumer and Spark application.
      • The Kafka consumer retrieves data from the Kafka topic, ensuring that the data is processed as soon as it arrives
      • The Spark application processes the messages, aggregates the data, and saves the results in the data warehouse. This dual role ensures efficient data processing and storage.
  • Data Warehouse: As the final destination for our processed data, the data warehouse provides a reliable and structured repository for storing the results of our real-time data processing, so visualization tools like Looker and PowerBI can display the data as soon as the dashboards are refreshed

👉 We should note that dashboards query the data from the database. For near real-time data to be available, the dashboard data needs to be refreshed at certain intervals (e.g., minutes or hourly). To make real-time data available to the dashboard, there needs to be a live connection (socket) between the dashboard and the streaming platform, which is handled by another system component, like a Redis cache or a custom service, that can push those events on a socket channel.
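
For illustration, this is a minimal sketch of how such a component could push events to dashboard clients over Redis pub/sub; the channel name and message shape are assumptions:

import json
import redis

r = redis.Redis(host="localhost", port=6379)

def push_event(event: dict) -> None:
    # Publish the streaming event so socket subscribers can forward it to dashboards
    r.publish("turnstile-events", json.dumps(event))

push_event({"station": "StationA", "entries": 10, "exits": 15})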

DevOps Support

  • Containerization: In order to continue to meet our DevOps requirements, enhance scalability and manageability, and follow enterprise-level best practices, we use Docker containers for all of our components. Each component, our Kafka and Spark instances as well as our two Python-based components, runs in a separate Docker container. This ensures modularity, easy deployment, and resource isolation

    • This approach also enables us to use a Kubernetes cluster, a container orchestration platform that can help us manage and deploy Docker containers at scale, to run our components. We could use Minikube for local development or use a cloud-managed Kubernetes service like Google Kubernetes Engine (GKE), Amazon Elastic Kubernetes Service (EKS), or Azure Kubernetes Service (AKS)
  • Virtual Machine (VM): Our components need to run on a VM, so we follow the same approach and create a VM instance using a Terraform script, similar to how it was done for our batch data pipeline during our planning and design phase

Advantages

Our data streaming design offers several advantages:

  • Real-time Processing: Data is processed as it arrives, enabling timely insights and rapid response to changing conditions
  • Scalability: The use of Kafka and Spark allows us to scale our architecture effortlessly to handle growing data volumes
  • Containerization: Docker containers simplify deployment and management, making our system highly portable and maintainable
  • Integration: The seamless integration of Kafka, Spark, and the Kafka consumer as a Spark client ensures data continuity and efficient processing

This data streaming strategy, powered by Kafka and Spark, empowers us to unlock real-time insights from our data streams, providing valuable information for rapid decision-making, analytics, and storage.

Summary

In today's data-driven landscape, data streaming solutions are an absolute necessity, enabling the rapid processing and analysis of vast amounts of real-time data. Technologies like Kafka and Spark play a pivotal role in empowering organizations to harness real-time insights from their data streams.

Kafka and Spark work together seamlessly to enable real-time data processing and analytics. Kafka handles the reliable ingestion of events, while Spark Streaming provides the tools for processing, transforming, analyzing, and storing the data in a data lake or data warehouse in near real-time, allowing businesses to make decisions at a much faster pace.

Exercise - Data Streaming with Apache Kafka

Now that we have defined the data streaming strategy, we can continue our journey and build a containerized Kafka producer that can consume real-time data sources. Let's take a look at that next.

Coming soon!

👉 Data Engineering Process Fundamentals - Data Streaming With Apache Kafka Exercise

Thanks for reading.

Send questions or comments on Twitter @ozkary

👍 Originally published by ozkary.com

7/8/23

Data Engineering Process Fundamentals - Data Analysis and Visualization Exercise

Data analysis and visualization are fundamental to a data-driven decision-making process. To grasp the best strategy for our scenario, we delve into the data analysis and visualization phase of the process, creating the data models, analyses, and diagrams that allow us to tell stories from the data.

With the understanding of best practices for data analysis and visualization, we start by creating a code-based dashboard using Python, Pandas and Plotly. We then follow up by using a high-quality enterprise tool, such as Looker, to construct a low-code cloud-hosted dashboard, providing us with insights into the type of effort each method takes.

👍 This is a dashboard created with Looker. Similar dashboards can be created with PowerBI and Tableau

ozkary-data-engineering-analysis-visualization-dashboard

Once we have designed our dashboard, we can align it with our initial requirements and proceed to formulate the data analysis conclusions, thereby facilitating informed business decisions for stakeholders. However, before delving into coding, let's commence by reviewing the data analysis specifications, which provide the blueprint for our implementation effort.

Specifications

At this stage of the process, we have a clear grasp of the requirements and a deep familiarity with the data. With these insights, we can now define our specifications as outlined below:

  • Identify pertinent measures such as exits and entries
  • Conduct distribution analysis based on station
    • This analysis delineates geographical boundary patterns
  • Conduct distribution analysis based on days of the week and time slots

By calculating the total count of passengers for arrivals and departures, we gain a holistic comprehension of passenger flow dynamics. Furthermore, we can employ distribution analysis to investigate variations across stations, days of the week, and time slots. These analyses provide essential insights for business strategy and decision-making, allowing us to identify peak travel periods, station preferences, and time-specific trends that can help us make informed decisions.

Data Analysis Requirements

In our analysis process, we can adhere to these specified requirements:

  • Determine distinct time slots for morning and afternoon analysis:
    12:00 AM - 3:59 AM
    04:00 AM - 7:59 AM
    08:00 AM - 11:59 AM
    12:00 PM - 3:59 PM
    04:00 PM - 7:59 PM
    08:00 PM - 11:59 PM
    
  • Examine data regarding commuter exits (arrivals) and entries (departures)
  • Implement a master filter for date ranges, which exerts control over all charts
  • Incorporate a secondary filter component to facilitate station selection
  • Display the aggregate counts of entries and exits for the designated date range
    • Employ score card components for this purpose
  • Investigate station distributions to identify the most frequented stations
    • Utilize donut charts, with the subway station name as the primary dimension
  • Analyze distributions using the day of the week to unveil peak traffic days
    • Employ bar charts to visualize entries and exits per day
  • Explore distributions based on time slots to uncover daily peak hours
    • Integrate bar charts to illustrate entries and exits within each time slot

Dashboard Design

In the dashboard design, we can utilize a two-column layout, positioning the exits charts in the left column and the entries charts in the right column of the dashboard. Additionally, we can incorporate a header container to encompass the filters, date range, and station name. To support multiple devices, we need a responsive layout. We should note that a platform like Looker does not provide a truly responsive layout; instead, we define separate layouts for mobile and desktop.

Layout Configuration:

  • Desktop 1200px by 900px
  • Mobile 360px by 1980px

UI Components

For our dashboard components, we should incorporate the following:

  • Date range picker
  • Station name list box
  • For each selected measure (exits, entries), we should employ a set of the following components:
    • Score cards for the total numbers
    • Donut charts for station distribution
    • Bar charts for day of the week distribution
    • Bar charts for time slot distribution

Review the Code - Code Centric

The dashboard layout is built with Python using the Dash library, which generates the HTML elements for the presentation. All the charts are generated with Plotly.

# Define the layout of the app
app.layout = html.Div([
    html.H4("MTA Turnstile Data Dashboard"),

    dcc.DatePickerRange(
        id='date-range',
        start_date=data['created_dt'].min(),
        end_date=data['created_dt'].max(),
        display_format='YYYY-MM-DD'
    ),

    dbc.Row([
        dbc.Col(
            dbc.Card(
                dbc.CardBody([
                    html.P("Total Entries"),
                    html.H5(id='total-entries')
                ]),
                className='score-card'
            ),
            width=6
        ),
        dbc.Col(
            dbc.Card(
                dbc.CardBody([
                    html.P("Total Exits"),
                    html.H5(id='total-exits')
                ]),
                className='score-card'
            ),
            width=6
        )
    ], className='score-cards'),

    dbc.Row([
            dbc.Col(
                dcc.Graph(id='top-entries-stations', className='donut-chart'),
                width=6
            ),
            dbc.Col(
                dcc.Graph(id='top-exits-stations', className='donut-chart'),
                width=6
            )
    ], className='donut-charts'),

    dbc.Row([
                dbc.Col(
                    dcc.Graph(id='exits-by-day', className='bar-chart'),
                    width=6
                ),
                dbc.Col(
                    dcc.Graph(id='entries-by-day', className='bar-chart'),
                    width=6
                )
    ], className='bar-charts'),

    dbc.Row([
                dbc.Col(
                    dcc.Graph(id='exits-by-time', className='bar-chart'),
                    width=6
                ),
                dbc.Col(
                    dcc.Graph(id='entries-by-time', className='bar-chart'),
                    width=6
                )
    ], className='bar-charts')

])

The provided Python code is building a web application dashboard layout using Dash, a Python framework for creating interactive web applications. This dashboard is designed to showcase insights and visualizations derived from MTA Turnstile Data. Here's a breakdown of the main components:

  • App Layout: The app.layout defines the overall structure of the dashboard using the html.Div component. It acts as a container for all the displayed components

  • Title: html.H4("MTA Turnstile Data Dashboard") creates a header displaying the title of the dashboard

  • Date Picker Range: The dcc.DatePickerRange component allows users to select a date range for analysis. It's a part of Dash Core Components (dcc)

  • Score Cards: The dbc.Row and dbc.Col components create rows and columns for displaying score cards using dbc.Card and dbc.CardBody. These cards show metrics like "Total Entries" and "Total Exits"

  • Donut Charts: Another set of dbc.Row and dbc.Col components creates columns for displaying donut charts using the dcc.Graph component. These charts visualize the distribution of top entries and exits by station

  • Bar Charts: Similar to the previous sections, dbc.Row and dbc.Col components are used to create columns for displaying bar charts using the dcc.Graph component. These charts showcase the distribution of exits and entries by day of the week and time slot

  • CSS Classnames: The className attribute is used to apply CSS class names to the components, allowing for custom styling using CSS

In summary, the code establishes the layout of the dashboard with distinct sections for date selection, score cards, donut charts, and bar charts. The various visualizations and metrics offer valuable insights into MTA Turnstile Data, enabling users to comprehend passenger flow patterns and trends effectively.


def update_dashboard(start_date, end_date):
    filtered_data = data[(data['created_dt'] >= start_date) & (data['created_dt'] <= end_date)]   

    total_entries = filtered_data['entries'].sum() / 1e12  # Convert to trillions
    total_exits = filtered_data['exits'].sum() / 1e12  # Convert to trillions

    measures = ['exits','entries']
    filtered_data["created_dt"] = pd.to_datetime(filtered_data['created_dt'])

    exits_chart , entries_chart = create_station_donut_chart(filtered_data)
    exits_chart_by_day ,entries_chart_by_day = create_day_bar_chart(filtered_data, measures)
    exits_chart_by_time, entries_chart_by_time = create_time_bar_chart(filtered_data, measures)

    return (
        f"{total_entries:.2f}T",
        f"{total_exits:.2f}T",
        entries_chart,
        exits_chart,
        exits_chart_by_day,
        entries_chart_by_day,
        exits_chart_by_time,
        entries_chart_by_time
    )

The update_dashboard function is responsible for updating and refreshing the dashboard. It handles the date range change event. As the user changes the date range, this function takes in the start and end dates as inputs. The function then filters the dataset, retaining only the records falling within the specified date range. Subsequently, the function calculates key metrics for the dashboard's score cards. It computes the total number of entries and exits during the filtered time period, and these values are converted to trillions for better readability.

The code proceeds to generate various visual components for the dashboard. These components include donut charts illustrating station-wise entries and exits, bar charts showcasing entries and exits by day of the week, and another set of bar charts displaying entries and exits by time slot. Each of these visualizations is created using specialized functions like create_station_donut_chart, create_day_bar_chart, and create_time_bar_chart.

Finally, the function compiles all the generated components and calculated metrics into a tuple. This tuple is then returned by the update_dashboard function, containing values like total entries, total exits, and the various charts.
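
For completeness, this is a sketch of the callback wiring that would tie the date picker to update_dashboard. The component ids mirror the layout shown earlier, although the exact decorator in the repository may differ; in older Dash releases the imports come from dash.dependencies:

from dash import Input, Output

@app.callback(
    [Output('total-entries', 'children'),
     Output('total-exits', 'children'),
     Output('top-entries-stations', 'figure'),
     Output('top-exits-stations', 'figure'),
     Output('exits-by-day', 'figure'),
     Output('entries-by-day', 'figure'),
     Output('exits-by-time', 'figure'),
     Output('entries-by-time', 'figure')],
    [Input('date-range', 'start_date'),
     Input('date-range', 'end_date')]
)
def refresh_dashboard(start_date, end_date):
    # Delegate to the update function described above
    return update_dashboard(start_date, end_date)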

def create_station_donut_chart(df: pd.DataFrame ) -> Tuple[go.Figure, go.Figure]:
    """
     creates the station distribution donut chart   
    """
    top_entries_stations = df.groupby('station_name').agg({'entries': 'sum'}).nlargest(10, 'entries')
    top_exits_stations = df.groupby('station_name').agg({'exits': 'sum'}).nlargest(10, 'exits')

    entries_chart = px.pie(top_entries_stations, names=top_entries_stations.index, values='entries',
                           title='Top 10 Stations by Entries', hole=0.3)
    exits_chart = px.pie(top_exits_stations, names=top_exits_stations.index, values='exits',
                         title='Top 10 Stations by Exits', hole=0.3)

    entries_chart.update_traces(marker=dict(colors=px.colors.qualitative.Plotly))
    exits_chart.update_traces(marker=dict(colors=px.colors.qualitative.Plotly))
    return entries_chart, exits_chart

The create_station_donut_chart function is responsible for generating donut charts to visualize the distribution of entries and exits across the top stations. It starts by selecting the top stations based on the highest entries and exits from the provided DataFrame. Using Plotly Express, the function then constructs two separate donut charts: one for the top stations by entries and another for the top stations by exits.

Each donut chart provides a graphical representation of the distribution, where each station is represented by a segment based on the number of entries or exits it recorded. The charts are presented in a visually appealing manner with a central hole for a more focused view.

def create_day_bar_chart(df: pd.DataFrame, measures: List[str]) -> Tuple[go.Figure, go.Figure]:
    """
    Creates a bar chart using the week days from the given dataframe.
    """
    measures = ['exits','entries']
    day_categories = ['Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat']   
    group_by_date = df.groupby(["created_dt"], as_index=False)[measures].sum()

    df['weekday'] = pd.Categorical(df['created_dt'].dt.strftime('%a'),
                                                 categories=day_categories,
                                                 ordered=True)        

    group_by_weekday =  df.groupby('weekday', as_index=False)[measures].sum()

    exits_chart_by_day = px.bar(group_by_weekday, x='weekday', y='exits', color='weekday',
                                title='Exits by Day of the Week', labels={'weekday': 'Day of the Week', 'exits': 'Exits'},
                                color_discrete_sequence=['green'])

    entries_chart_by_day = px.bar(group_by_weekday, x='weekday', y='entries', color='weekday',
                                  title='Entries by Day of the Week', labels={'weekday': 'Day of the Week', 'entries': 'Entries'},
                                  color_discrete_sequence=['orange'])

    # Hide the legend on the side
    exits_chart_by_day.update_layout(showlegend=False)
    entries_chart_by_day.update_layout(showlegend=False)   

    # Return the chart
    return exits_chart_by_day, entries_chart_by_day

The create_day_bar_chart function is responsible for generating bar charts that illustrate the distribution of data based on the day of the week. Due to the limitations of the date-time data type not inherently containing day information, the function maps the data to the corresponding day category.

To begin, the function calculates the sum of the specified measures (entries and exits) for each date in the DataFrame using group_by_date. Next, it creates a new column named 'weekday' that holds the abbreviated day names (Sun, Mon, Tue, etc.) by applying the strftime method to the 'created_dt' column. This column is then transformed into a categorical variable using predefined day categories, ensuring that the order of days is preserved.

Using the grouped data by 'weekday', the function constructs two separate bar charts using Plotly Express. One chart visualizes the distribution of exits by day of the week, while the other visualizes the distribution of entries by day of the week.

def create_time_bar_chart(df: pd.DataFrame, measures : List[str] ) -> Tuple[go.Figure, go.Figure]:

    """
    Creates a bar chart using the time slot category
    """

    # Define time (hr) slots
    time_slots = {
        '12:00-3:59am': (0, 3, 0),
        '04:00-7:59am': (4, 7, 1),
        '08:00-11:59am': (8, 11, 2),
         '12:00-3:59pm': (12, 15, 3),
        '04:00-7:59pm': (16, 19, 4),
        '08:00-11:59pm': (20, 23, 5)
    }

    # Add a new column 'time_slot' based on time ranges
    def categorize_time(row):
        for slot, (start, end, order) in time_slots.items():
            if start <= row.hour <= end:
                return slot

    df['time_slot'] = df['created_dt'].apply(categorize_time)
    group_by_time = df.groupby('time_slot', as_index=False)[measures].sum()

    # Sort the grouped_data DataFrame based on the sorting value
    group_by_time_sorted = group_by_time.sort_values(by=['time_slot'], key=lambda x: x.map({slot: sort_order for slot, (_, _, sort_order) in time_slots.items()}))


    exits_chart_by_time = px.bar(group_by_time_sorted, x='time_slot', y='exits', color='time_slot',
                                title='Exits by Time of Day', labels={'time_slot': 'Time of Day', 'exits': 'Exits'},
                                color_discrete_sequence=['green'])

    entries_chart_by_time = px.bar(group_by_time_sorted, x='time_slot', y='entries', color='time_slot',
                                  title='Entries by Time of Day', labels={'time_slot': 'Time of Day', 'entries': 'Entries'},
                                  color_discrete_sequence=['orange'])
    # Hide the legend on the side
    exits_chart_by_time.update_layout(showlegend=False)
    entries_chart_by_time.update_layout(showlegend=False)

    return exits_chart_by_time, entries_chart_by_time

The create_time_bar_chart function is responsible for generating bar charts that depict the data distribution at specific times of the day. Just as with days of the week, the function maps and labels time ranges to create a new series, enabling the creation of these charts.

First, the function defines time slots using a dictionary, where each slot corresponds to a specific time range. For each data row, a new column named 'time_slot' is added based on the time ranges defined. This is achieved by using the categorize_time function, which checks the hour of the row's timestamp and assigns it to the appropriate time slot.

The data is then grouped by 'time_slot', and the sum of the specified measures (exits and entries) is calculated for each slot. To ensure that the time slots are displayed in the correct order, the grouped data is sorted based on a sorting value derived from the time slots' dictionary.

Using the grouped and sorted data, the function constructs two bar charts using Plotly Express. One chart visualizes the distribution of exits by time of day, while the other visualizes the distribution of entries by time of day. Each bar in the chart represents the sum of exits or entries for a specific time slot.

Once the implementation of this Python dashboard is complete, we can run it and see the following dashboard load on our browser:

ozkary-data-engineering-analysis-visualization-dashboard

Requirements

These are the requirements to be able to run the Python dashboard.

👉 Clone this repo or copy the files from this folder. We could also create a GitHub Codespace and run this online.

  • Use the analysis_data.csv file for test data
    • Use the local file for this implementation
  • Install the Python dependencies
    • Type the following from the terminal
$ pip install pandas
$ pip install plotly
$ pip install dash
$ pip install dash_bootstrap_components
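
Optionally, these dependencies can be captured in a requirements.txt file (a minimal, unpinned sketch; pin versions as needed for your environment):

# requirements.txt
pandas
plotly
dash
dash_bootstrap_components

Then install everything in one step:

$ pip install -r requirements.txt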

How to Run It

After installing the dependencies and downloading the code, we should be able to run the code from a terminal by typing:

$ python3 dashboard.py

We should note that this is a simple implementation meant to illustrate the amount of effort it takes to build a dashboard using code. The code reads a local CSV file; if we need to connect to the data warehouse instead, we have to expand the code to use an API call that is authorized to access it (a sketch of this follows below). Writing Python dashboards or creating Jupyter charts works well for small teams that work closely together and run experiments on the data. However, for a more enterprise solution, we should look at using a tool like Looker or PowerBI. Let's take a look at that next.
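
Before moving on to the low-code approach, here is a minimal sketch of that data warehouse expansion. The endpoint, environment variables, and response shape are hypothetical placeholders rather than part of this project; the idea is simply to replace the local CSV read with an authorized API request.

import os
import pandas as pd
import requests

def load_data_from_api() -> pd.DataFrame:
    """
    Loads the analysis data from an authorized API instead of the local CSV file.
    """
    # Hypothetical endpoint and token; replace with your data warehouse API and credentials
    api_url = os.environ.get('ANALYSIS_API_URL', 'https://example.com/api/turnstile/analysis')
    api_token = os.environ.get('ANALYSIS_API_TOKEN', '')

    response = requests.get(api_url, headers={'Authorization': f'Bearer {api_token}'}, timeout=30)
    response.raise_for_status()

    # Assumes the API returns a JSON array of records with created_dt, entries, and exits
    df = pd.DataFrame(response.json())
    df['created_dt'] = pd.to_datetime(df['created_dt'])
    return df

This approach would also add requests to the dependency list and require handling authentication and error cases appropriate to the data warehouse being used.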

Review the Code - Low-Code

Tools like Looker and PowerBI excel in data visualization, requiring little to no coding. These tools offer a plethora of visual elements for configuring dashboards, minimizing the need for extensive coding. For instance, these platforms effortlessly handle tasks like automatically displaying the day of the week from a date-time field.

In cases where an out-of-the-box solution is lacking, we might need to supplement it with a code snippet. For instance, consider our time range requirement. Since this is quite specific to our project, we must generate a new series with our desired labels. To achieve this, we introduce a new field that corresponds to the date-time hour value. When the field is created, we are essentially implementing a function.

The provided code reads the hour value from the date-time fields and subsequently maps it to its corresponding label.

CASE 
    WHEN HOUR(created_dt) BETWEEN 0 AND 3 THEN "12:00-3:59am" 
    WHEN HOUR(created_dt) BETWEEN 4 AND 7 THEN "04:00-7:59am" 
    WHEN HOUR(created_dt) BETWEEN 8 AND 11 THEN "08:00-11:59am" 
    WHEN HOUR(created_dt) BETWEEN 12 AND 15 THEN "12:00-3:59pm" 
    WHEN HOUR(created_dt) BETWEEN 16 AND 19 THEN "04:00-7:59pm" 
    WHEN HOUR(created_dt) BETWEEN 20 AND 23 THEN "08:00-11:59pm" 
END

Requirements

The only requirement here is to sign up for Looker Studio and to have access to a data warehouse or database that can serve data and is reachable from external sources.

👉 Sign-up for Looker Studio

Other visualization tools, such as PowerBI and Tableau, offer similar capabilities.

Looker UI

Take a look at the image below. This is the Looker UI. We should familiarize ourselves with the following areas:

ozkary-data-engineering-analysis-visualization-looker

  • Theme and Layout: Use it to configure the theme and change the layout for mobile or desktop
  • Add data: Use this to add a new data source
  • Add a chart: This allows us to add new charts
  • Add a control: Here, we can add the date range and station name list
  • Canvas: This is where we place all the components
  • Setup Pane: This allows us to configure the date range, dimensions, measures, and sorting settings
  • Style Pane: Here, we can configure the colors and font
  • Data Pane: This displays the data sources with their fields. New fields are created as functions. When we hover over a field, we can see a function (fx) icon, which indicates that we can edit the function and configure our snippet

How to Build it

Sign up for a Looker account or use another BI tool and follow these steps:

  • Create a new dashboard
  • Click on the "Add Data" button
  • Use the connector for our data source:
    • This should allow us to configure the credentials for access
    • Select the "rpt_turnstile" view, which already includes joins with the fact_table and dimension tables
  • Once the data is loaded, we can see the dimensions and measures
  • Add the dashboard filters:
    • Include a date range control for the filter, using the "created_dt" field
    • Add a list control and associate it with the station name
  • Proceed to add the remaining charts:
    • Ensure that all charts are associated with the date range dimension
    • This enables filtering to cascade across all the charts
  • Utilize the "entries" and "exits" measures for all the charts:
    • Integrate two scorecards for the sum of entries and exits
    • Incorporate a donut chart to display exits and entries distribution by stations
    • Incorporate two bar charts (entries and exits) and use the weekday value from the "created_dt" dimension
      • Sort them by the weekday. Use the day number (0-6), not the name (Sun-Sat). This is achieved by adding a new field with the following code and using it for sorting:
WEEKDAY(created_dt)
  • Create the time slot dimension field (click "Add Field" and enter this definition):
CASE 
    WHEN HOUR(created_dt) BETWEEN 0 AND 3 THEN "12:00-3:59am" 
    WHEN HOUR(created_dt) BETWEEN 4 AND 7 THEN "04:00-7:59am" 
    WHEN HOUR(created_dt) BETWEEN 8 AND 11 THEN "08:00-11:59am" 
    WHEN HOUR(created_dt) BETWEEN 12 AND 15 THEN "12:00-3:59pm" 
    WHEN HOUR(created_dt) BETWEEN 16 AND 19 THEN "04:00-7:59pm" 
    WHEN HOUR(created_dt) BETWEEN 20 AND 23 THEN "08:00-11:59pm" 
END
  • Add two bar charts (entries and exits) and use the time slot dimension:
    • Use the hour value from the "created_dt" dimension for sorting by adding a new field and using it as your sorting criteria:
HOUR(created_dt)

View the Dashboard

After following all these specifications, we should be able to preview the dashboard in the browser. We can load an example dashboard by clicking on the links below:

👉 View the dashboard online

👉 View the mobile dashboard online

This is an image of the mobile dashboard.

ozkary-data-engineering-analysis-visualization-mobile-dashboard

Data Analysis Conclusions

By examining the dashboard, we can draw the following conclusions:

  • Stations with the highest distribution represent the busiest locations
  • The busiest time slot for both exits and entries is between 4pm and 9pm
  • Every day of the week exhibits a high volume of commuters
  • Businesses can choose the stations near their locations for further analysis

With these insights, strategies can be devised to optimize marketing campaigns, targeting users during specific hours of the day and within geo-fenced areas that are in close proximity to the corresponding business locations.

Summary

We utilize our expertise in data analysis and visualization to construct charts and build them into dashboards. We adopt two distinct approaches for dashboard creation: a code-centric method and a low-code enterprise solution like Looker. After a comprehensive comparison, we deduce that the code-centric approach is optimal for small teams, whereas it might not suffice for enterprise users, especially when targeting executive stakeholders.

Lastly, as the dashboard becomes operational, we transition into the role of business analysts, deciphering insights from the data. This enables us to offer answers aligned with our original requirements.

Next

We have successfully completed our data pipeline from CSV files to our data warehouse and dashboard. Now, let's explore an advanced concept in data engineering: data streaming, which facilitates real-time data integration. This involves the continuous and timely processing of incoming data. Technologies like Apache Kafka and Apache Spark play a crucial role in enabling efficient data streaming processes. Let's take a closer look at these components next.

Coming Soon!

👉 [Data Engineering Process Fundamentals - Real-Time Data]

Thanks for reading.

Send question or comment at Twitter @ozkary

👍 Originally published by ozkary.com