1/22/25

Smart Charts: Powered by AI to enhance Data Understanding

Overview

This presentation explores how Generative AI, particularly Large Language Models (LLMs), can empower engineers with deeper data understanding. We'll delve into creating complex charts using Python and demonstrate how LLMs can analyze these visualizations, identify trends, and suggest actionable insights. Learn how to effectively utilize LLMs through prompt engineering with natural language and discover how this technology can save you valuable time and effort.

#BuildwithAI Series

Smart Charts: Powered by AI to enhance Data Understanding

  • Follow this GitHub repo during the presentation: (Give it a star and follow the project)

👉 https://github.com/ozkary/ai-engineering

YouTube Video

Video Agenda

Agenda:

  1. Introduction to LLMs and their Role in Data Analysis and Training

    • What are LLMs, and how do they work?
    • LLMs in the context of data analysis and visualization.
  2. Prompt Engineering - Guiding the LLM

    • Crafting effective prompts for chart analysis.
    • Providing context within the prompt (chart type, data).
  3. Tokens - The Building Blocks

    • Understanding the concept of tokens in LLMs.
    • How token limits impact prompt design and model performance.
  4. Let AI Help with Data Insights - Real Use Case

    • Creating complex charts using Python libraries.
    • Writing prompts for chart analysis
    • Utilizing an LLM to analyze the generated charts.
    • Demonstrating how LLMs can identify trends, anomalies, and potential areas for improvement.
  5. Live Demo - Create complex charts using Python and ask AI to help you with the analysis

    • Live coding demonstration of creating a complex chart and using an LLM to analyze it.

Why Attend?

  • Discover how to leverage LLMs to gain deeper insights from your data visualizations.
  • Learn practical techniques for crafting effective prompts to guide LLM analysis.
  • Enhance your data analysis skills with the power of AI.

Presentation

What are LLM Models - Not Skynet

Large Language Model (LLM) refers to a class of Generative AI models that are designed to understand prompts and questions and generate human-like text based on large amounts of training data. LLMs are built upon Foundation Models which have a focus on language understanding.

Common Tasks

  • Text and Code Generation: LLMs can generate code and data analysis text based on specific prompts

  • Natural Language Processing (NLP): Understand and generate human language, sentiment analysis, translation

  • Text Summarization: LLMs can condense lengthy pieces of text into concise summaries

  • Question Answering: LLMs can access and process information from various sources to answer questions, making a great fit for chatbots

Smart Charts with AI: What are LLMs?

Training LLM Models - Secret Sauce

Models are trained using a combination of machine learning and deep learning. Massive datasets of text are collected, cleaned, and fed into complex neural networks with multiple layers. These networks iteratively learn by analyzing patterns in the data, allowing them to map inputs like chart data to desired outputs, such as chart analysis.

Training Process:

  • Data Collection: Sources from books, articles, code repositories, and online conversations

  • Preprocessing: Data cleaning and formatting for the ML algorithms to understand it effectively

  • Model Training: The neural network architecture is trained on the data. The network adjusts its internal parameters to learn how to map input data (e.g., prompts or chart data) to desired outputs (e.g., analysis text or code snippets)

  • Fine-tuning: Fine-tune models for specific tasks like code generation, by training the model on relevant data (e.g., specific programming languages, coding conventions).

Smart Charts with AI: Neural-Network

Transformer Architecture - Not Autobots

Transformer is a neural network architecture that excels at processing long sequences of text by analyzing relationships between words, no matter how far apart they are. This allows LLMs to understand complex language patterns and generate human-like text.

Components

  • Encoder: Processes the input (e.g., a user story or prompt) using multiple encoder layers with a self-attention mechanism to analyze the relationships between words

  • Decoder: Uses the encoded information and its own attention mechanism to generate the output text (like code), ensuring it aligns with the input context.

  • Attention Mechanism: Enables the model to effectively focus on the most important information for the task at hand, leading to improved NLP and generation capabilities.

Smart Charts with AI: Transformers encoder decoder attention mechanism

👉 Read: "Attention Is All You Need" by Google, 2017

Fine-Tuning for Specific Domain

Fine-tuning LLMs is a process that specializes a pre-trained model for a specific domain, such as data analysis with your own process information.

Process:

  • Start from a source model: reuse the knowledge and parameters gained from pretraining on a large, general-purpose dataset
  • Enhance performance by retraining the source model with a smaller, domain-specific dataset
  • Use the resulting target model for the final integration

Smart Charts with AI: Fine-tuning a model
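To make the process above more concrete, here is a minimal fine-tuning sketch using the Hugging Face transformers and datasets libraries. The base model name, the JSONL file of domain examples, and the training settings are illustrative assumptions, not a production recipe.

# Minimal fine-tuning sketch (illustrative only): adapt a small pre-trained source model
# to a chart-analysis domain using a hypothetical JSONL file of example texts.
from datasets import load_dataset
from transformers import (AutoModelForCausalLM, AutoTokenizer,
                          DataCollatorForLanguageModeling, Trainer, TrainingArguments)

base_model = "distilgpt2"  # assumption: any small causal LM works for the sketch
tokenizer = AutoTokenizer.from_pretrained(base_model)
tokenizer.pad_token = tokenizer.eos_token  # GPT-2 style tokenizers ship without a pad token
model = AutoModelForCausalLM.from_pretrained(base_model)

# domain-specific dataset: one {"text": "<chart description + analysis>"} per line (hypothetical file)
dataset = load_dataset("json", data_files="chart_analysis_examples.jsonl")["train"]

def tokenize(batch):
    return tokenizer(batch["text"], truncation=True, padding="max_length", max_length=256)

tokenized = dataset.map(tokenize, batched=True)

trainer = Trainer(
    model=model,
    args=TrainingArguments(output_dir="chart-analysis-model", num_train_epochs=1,
                           per_device_train_batch_size=4),
    train_dataset=tokenized,
    data_collator=DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False),
)
trainer.train()  # the resulting target model is then used for the final integration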

Tokens - The Building Blocks of Language Models

Large language models work by dissecting text into a sequence of tokens. These tokens act as the building blocks, allowing the model to grasp the essence, structure, and connections within the text.

Details

  • Tokens can be individual words, punctuation marks, or even smaller sub-word units, depending on the specific LLM architecture.
  • The length of a word can influence the number of tokens it generates.
  • Similar to how Lego bricks come in various shapes and sizes, tokens can vary depending on the model's design.
  • Tokens are the unit used to measure usage and cost (see the sketch below).

👉 Think of tokens as Lego blocks
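As a quick illustration of how text breaks down into these building blocks, the sketch below counts the tokens in a prompt with the tiktoken library; the encoding name is an assumption, since each model family ships its own tokenizer.

# Count the tokens a prompt will consume (illustrative; the encoding varies by model)
import tiktoken

encoding = tiktoken.get_encoding("cl100k_base")  # assumption: a common OpenAI encoding
prompt = "Interpret this control chart, its data series, limits and action items to take."

tokens = encoding.encode(prompt)
print(f"Token count: {len(tokens)}")     # used for prompt-size limits and cost estimates
print(encoding.decode(tokens[:5]))       # decode the first few tokens back to text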

Prompt Engineering - What is it?

Prompt engineering is the process of designing and optimizing prompts to better utilize LLMs. Well-crafted prompts help AI models understand the context and generate more accurate responses.

Features

  • Clarity and Specificity: Effective prompts are clear, concise, and specific about the task or desired response

  • Task Framing: Provide background information, specifying the desired output format (e.g., code, email, poem), or outlining specific requirements

  • Examples and Counter-Examples: Including relevant examples and counterexamples within the prompt can further guide the LLM

  • Instructional Language: Use clear and concise instructions to improve the LLM's understanding of what information to generate

Smart Charts with AI: Data Analysis Prompt
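Putting these features together, a chart-analysis prompt might look like the hypothetical example below; the chart type, limits, and questions are placeholders you would replace with your own chart context.

# A hypothetical prompt that applies the features above (all values are illustrative)
chart_context = (
    "You are a data analyst. Analyze the following control chart.\n"      # instructional language
    "Chart type: control chart (line). Metric: Curvature (mm).\n"          # task framing
    "Data series: daily averages for the last 30 days.\n"
    "Upper control limit: 2.5. Lower control limit: 1.5.\n"
)

guiding_questions = (
    "Answer concisely:\n"                                                  # clarity and specificity
    "1. What is the overall trend of the data?\n"
    "2. Are any points outside the control limits?\n"
    "3. What actions, if any, should be considered?\n"
    "Example of the expected format: 'Trend: stable. Outliers: none. Action: none.'\n"  # example
)

prompt = chart_context + guiding_questions
print(prompt)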

Chart Analysis with AI

By combining existing charts with AI-driven analysis, we can unlock deeper insights, automate interpretation, and empower users to make more informed decisions.

Data Analysis Flow:

  • Chart Data: Identify the key data points from the chart (e.g., x-axis values, y-axis values, data labels, limits).

  • Chart Prompt: Format this information in a concise and human-readable format, such as:

"We are looking at a control chart measuring Curvature data points…"

  • Analysis Prompt: Provide details about what you would like to learn from the data:

"Interpret this chart, data series, limits and action items to take?"

👉 LLM-generated analysis is not perfect; if the prompt is not detailed enough, hallucinations may occur

Smart Charts with AI: Data Analysis Chart
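The sketch below walks through that flow end to end: it summarizes the chart data with pandas, builds the chart and analysis prompts, and sends them to an LLM. The OpenAI client and model name are assumptions for illustration; any chat-capable LLM can play the same role, and the data values are made up.

# End-to-end sketch: chart data -> chart prompt -> analysis prompt -> LLM (illustrative)
import pandas as pd
from openai import OpenAI  # assumption: openai>=1.0 client and OPENAI_API_KEY in the environment

# chart data: curvature measurements and control limits (made-up values)
curvature = pd.Series([1.8, 1.9, 2.1, 2.4, 2.6, 2.0, 1.7, 2.2])
ucl, lcl = 2.5, 1.5
outliers = curvature[(curvature > ucl) | (curvature < lcl)].tolist()

# chart prompt: a concise, human-readable description of the chart
chart_prompt = (
    "We are looking at a control chart measuring Curvature data points. "
    f"Values: {curvature.tolist()}. Mean: {curvature.mean():.2f}. "
    f"Upper limit: {ucl}. Lower limit: {lcl}. Points outside the limits: {outliers}."
)

# analysis prompt: what we want to learn from the data
analysis_prompt = "Interpret this chart, its data series and limits. What action items should we take?"

client = OpenAI()
response = client.chat.completions.create(
    model="gpt-4o-mini",  # assumption: substitute the chat model available to you
    messages=[
        {"role": "system", "content": "You are a data analyst reviewing manufacturing control charts."},
        {"role": "user", "content": f"{chart_prompt}\n{analysis_prompt}"},
    ],
)
print(response.choices[0].message.content)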

Smart Charts Powered by AI - Summary

LLMs empower developers, data engineers/analysts, and scientists by enhancing data understanding through AI-driven chart analysis. To ensure accurate and insightful analysis, crafting detailed prompts is crucial.

  • Provide Chart Context:

    • Chart Type: (e.g., line chart, bar chart, pie chart, scatter plot)
    • Chart Title
    • Data Series
    • Data Ranges/Limits: (e.g., Time period, Upper/Lower Limits)
  • Provide Guiding Questions:

    • What is the overall trend of the data?
    • Are there any significant peaks or dips?
    • Are there any outliers or anomalies?
    • What are the key takeaways from this chart?
    • What actions, if any, should be considered?

👉 By framing the prompt with contextual and guiding questions, you effectively "train" the model to analyze the chart in a more human-like and insightful manner.

Thanks for reading! 😊 If you enjoyed this post and would like to stay updated with our latest content, don’t forget to follow us. Join our community and be the first to know about new articles, exclusive insights, and more!

Leave comments on this post or contact me at:

👍 Originally published by ozkary.com

12/26/24

Cosmos DB for MongoDB: Tapping into Change Streams for Real-Time Integration

Overview

Azure Functions triggers for Cosmos DB enable developers to write event-driven applications that respond to changes in a collection/container. While this integration works seamlessly with the core SQL API, it doesn't directly support the MongoDB API. To achieve similar functionality with the MongoDB API, you can leverage change streams, a powerful feature that provides real-time monitoring of data modifications. This article will guide you through setting up and utilizing change streams in Cosmos DB's MongoDB API within an Azure Function.

Cosmos DB is a database service that supports multiple database APIs: NoSQL, MongoDB, PostgreSQL, Apache Cassandra, Apache Gremlin, and Table.

Cosmos DB for MongoDB: Tapping into Change Streams for Real-Time Integration

Understanding Change Streams

Change streams offer a continuous, ordered stream of changes occurring in a MongoDB collection (or a Cosmos DB container using the MongoDB API). They track inserts, updates, replaces, and deletes, providing your applications with real-time visibility into data modifications. This is invaluable for scenarios like:

  • Real-time Analytics and Reporting: Update dashboards and analytics systems as data changes.
  • Data Synchronization: Keep different data stores in sync by reacting to changes in real time.
  • Event-Driven Architectures: Trigger downstream processes and workflows based on data modifications.
  • Auditing and Logging: Capture a detailed history of data changes for audit trails and compliance.

Implementing Change Streams in Azure Functions

Here's how to set up a change stream within an Azure Function:

  • Prerequisites:

    • An active Azure subscription.
    • A Cosmos DB account configured with the MongoDB API.
    • An Azure Function App.
    • Install the MongoDB Driver: Use npm to install the necessary driver:
npm install mongodb

Implement the Azure Function

For this integration, we can use a Timer Trigger function. Since the MongoDB API doesn't offer a direct change feed trigger like the SQL API, the Timer Trigger provides a workaround. The function will execute at specified intervals (e.g., every 5 minutes or less) and establish a connection to MongoDB. Upon connection, it can then retrieve change stream events. This approach maintains the serverless nature of Azure Functions, as the function isn't continuously running but activates periodically to process changes.

An alternative to an Azure Function is to build a Node.js or .NET Core application and run it as a service on a VM. This provides a constantly running process for change stream monitoring, but requires managing the VM and application lifecycle.

  • Configure the Timer Trigger:

In your Azure Function's function.json, configure the timerTrigger to define the execution schedule. The schedule expression follows the NCRONTAB format, which includes a seconds field. For example, to trigger every 5 minutes, use 0 */5 * * * *.

{
  "bindings": [
    {
      "name": "myTimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "*/5 * * * *"
    }
  ]
}

The name property specifies the name of the timer object that will be passed to your function. The schedule expression determines the frequency of execution. Adjust the schedule value as needed to control the polling interval for change stream events. More frequent polling captures changes more rapidly, but consumes more resources. Less frequent polling conserves resources, but may introduce latency in processing changes.

  • Configure your app settings

    Use the local.settings.json file for local development and your function settings on Azure to store the following configuration values:

{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "node",
    "AzureWebJobsStorage": "<your-storage-connection-string>",
    "CosmosDBConnectionString": "mongodb://<your-cosmosdb-connection-string-from azure>",
    "CosmosDBDatabaseName": "<db-name>",
    "CosmosDBCollectionName": "<collection-name>"
  }
}

Ensure you define the AzureWebJobsStorage setting with a valid Azure Storage connection string. This is essential for the Azure Functions runtime. Furthermore, we'll use this storage account to persist the last processed change stream resume token. Each change stream record includes a token, enabling the resumption of processing from a specific point. By saving the token after each function execution, we can restart the function and continue processing new changes without duplicates. Upon restarting, the function will retrieve the stored token and resume the change stream from that point.

  • Implement the Function Code:

    Use the MongoDB Node.js driver to connect to Cosmos DB and process the change stream:

import { AzureFunction, Context } from "@azure/functions";
import { MongoClient, ChangeStreamDocument, Document, Binary } from "mongodb";
// resume token helpers shown later in this post (the module name is illustrative)
import { getLastProcessedToken, updateLastProcessedToken } from "./resumeTokenStore";

// change stream document shape that exposes the resume token in _id._data
type ChangeStreamDocWithToken<T extends Document> = {
    _id: { _data: Binary };
    fullDocument: T;
    operationType: string;
};

// maximum time (ms) to keep the stream open before closing and letting the function exit
const maxDuration = 4 * 60 * 1000;

const getResumeToken = async function (): Promise<Binary | null> {

    // document = {token, updatedDt, _id}
    const lastProcessedDoc = await getLastProcessedToken() || null; // retrieve the last processed token
    const lastProcessedToken = lastProcessedDoc?.token ? Binary.createFromBase64(lastProcessedDoc.token) : null;

    return lastProcessedToken;
}

const factoryTrigger: AzureFunction = async function (context: Context, myTimer: any): Promise<void> {

    // read the database env settings from local.settings.json or function configuration (Azure)
    const connectionString = process.env["CosmosDBConnectionString"]; 
    const databaseName = process.env["CosmosDBDatabaseName"]; 
    const collectionName = process.env["CosmosDBCollectionName"]; 
    const client = new MongoClient(connectionString);

    let currentToken = null;    

    try {
        await client.connect();

        const database = client.db(databaseName);
        const collection = database.collection(collectionName);                          
        const lastProcessedToken = await getResumeToken();

        // define the pipeline with the events and properties on the document
        const pipeline = [ 
        { $match: { "operationType": { $in: ["insert", "update", "replace"] } } }, 
        { $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1} } ];

        // use resumeAfter when the token is found
        const changeStream = lastProcessedToken ? 
            collection.watch(pipeline, { resumeAfter: { _data: lastProcessedToken}, fullDocument: 'updateLookup'}) : collection.watch(pipeline,  { fullDocument: 'updateLookup' });                             

        // Set up event handlers for the change stream the doc is the full document with the current changes
        changeStream.on('change', async (doc: ChangeStreamDocWithToken<Document>) => {
            console.log('Data change detected', doc);

            // get the resume token from the document
            const binToken = doc._id._data;
            const token = binToken.toString('base64');

            // Save the last processed token the next run
            currentToken = { token, updated: doc.fullDocument.updatedDt, id: doc.fullDocument._id };            

            // Add your auditing logic here; sendToLog and logCollection are placeholders for your own implementation
            console.log(`Send to storage id: ${doc.fullDocument._id}`);

            // await sendToLog(logCollection, doc.fullDocument);

            const dateString = new Date().toISOString().slice(0, 10); // "YYYY-MM-DD"
            const blobName = `log-${dateString}.json`;  // Example filename: log-2024-07-25.json
            const changeData = JSON.stringify(doc.fullDocument) + '\n'; // Newline-delimited JSON
            console.log(changeData);

            // Append the data to blob or another mongodb collection depending on your requirements            

        });

        changeStream.on('error', async (error: any) => { 
          // or more specific error type if known
            context.log("Change stream error:", error);                          
        });

        changeStream.on('close', () => {  
            // Handle the 'close' event, usually optional
            context.log('Change Stream Closed');           
        });

        changeStream.on('connect', () => {  
            console.log('Change Stream Connected');           
        });

        // max timer to allow function to stop
        const timeout = setTimeout(async () => { 
                console.log(`Closing the stream after timeout ${maxDuration}`);
                clearTimeout(timeout);            
                await changeStream.close();   
                await client.close();  
                if (currentToken)
                    await updateLastProcessedToken(currentToken);
            }, maxDuration);

        context.log('Watching for changes...');
    } catch (err) {
        context.log('Error setting up change stream:', err);

        if (client)
            await client.close();
    } 
};

export default factoryTrigger;

The factoryTrigger function uses a timer to periodically poll a Cosmos DB change stream (using the MongoDB API) for data modifications. It retrieves the last processed change stream token from blob storage to resume processing from where it left off. The function then watches the specified Cosmos DB collection for inserts, updates, and replaces, processing each change by sending the full document to a log collection and appending it to a newline-delimited JSON file in blob storage.

A timeout is implemented to limit the function's execution time and maintain its serverless nature. The timer ensures the function doesn't run continuously, conserving resources, while still periodically checking for and processing changes. The timeout further enforces this resource constraint by closing the change stream and exiting the function after a specified duration. This prevents runaway execution and associated costs while allowing the function to pick up where it left off in the next timer-triggered execution.

  • Implement the Blob Storage API

import { fileRead, fileWrite } from "./blobStorageUtils";

export interface DocumentToken {
    token: string;
    updated: string;
    id: string;
}

const tokenKey = 'resume-token';

const MISSING_CONFIG = 'Missing configuration partition/row key for last processed token.'
const NOT_FOUND = 'No existing entity found'
const FAILED_UPDATE = 'Failed to update record';

/**
 * Read the last processed token from blob storage
 * @returns the last processed token, or null when it is not found
 */
export async function getLastProcessedToken(): Promise<DocumentToken | null> {

    if (!tokenKey) {
        throw new Error(MISSING_CONFIG);
    }

    try {
        const blobName = `${tokenKey}.json`;        
        const value = await fileRead(blobName);
        return JSON.parse(value) as DocumentToken;
    } catch (error) {
        console.log(NOT_FOUND);
        return null;
    }
}

export async function updateLastProcessedToken(value: DocumentToken): Promise<void> {

    if (!tokenKey) {
        throw new Error(MISSING_CONFIG);
    }

    try {        
        const blobName = `${tokenKey}.json`;        
        await fileWrite(blobName, JSON.stringify(value));

    } catch (error) {
        console.log(FAILED_UPDATE, error);        
    }
}

This wrapper simplifies interaction with Azure Blob Storage, abstracting away the complexities of the @azure/storage-blob package (which requires importing BlobServiceClient and ContainerClient from "@azure/storage-blob"). It offers convenient functions for reading and writing JSON data to blobs, streamlining the process of managing the change stream resume token. Consult the Azure documentation on the @azure/storage-blob package for more detailed information and advanced usage scenarios.

The getLastProcessedToken function reads the resume token from a file named resume-token.json. The updateLastProcessedToken function then overwrites this file with the latest resume token. This mechanism allows the change stream to be restarted from a specific point, ensuring that changes are processed sequentially without gaps or duplicates.

Conclusion:

Change streams provide a powerful mechanism for reacting to data modifications in Cosmos DB's MongoDB API. While the MongoDB API doesn't directly support a change feed trigger within Azure Functions, the timer-based approach outlined here offers a near real-time solution. By periodically polling the change stream, applications can effectively capture and process data changes with minimal latency. This approach balances the need for real-time responsiveness with the efficiency and cost-effectiveness of serverless functions. Leveraging change streams in this way opens up opportunities to build dynamic, data-driven applications that react swiftly to evolving information, combining the scalability and flexibility of Cosmos DB with the familiar MongoDB development experience.

Thanks for reading.

Send question or comment at Twitter @ozkary 👍 Originally published by ozkary.com

11/26/24

Introduction to Data Lakes and Data Warehouses - Data Engineering Process Fundamentals

Overview

In this technical presentation, we will delve into the fundamental concepts of Data Engineering, focusing on two pivotal components of modern data architecture - Data Lakes and Data Warehouses. We will explore their roles, differences, and how they collectively empower organizations to harness the true potential of their data.

Introduction to Data Lake and Data Warehouse - Data Engineering Process Fundamentals

  • Follow this GitHub repo during the presentation: (Star the project to follow and get updates)

👉 GitHub Repo

  • Data engineering Series:

👉 Blog Series

YouTube Video

Video Agenda

Agenda:

  1. Introduction to Data Engineering:

    • Brief overview of the data engineering landscape and its critical role in modern data-driven organizations.
  2. Operational Data
  3. Understanding Data Lakes:

    • Explanation of what a data lake is and its purpose in storing vast amounts of raw and unstructured data.
  4. Exploring Data Warehouses:

    • Definition of data warehouses and their role in storing structured, processed, and business-ready data.
  5. Comparing Data Lakes and Data Warehouses:

    • Comparative analysis of data lakes and data warehouses, highlighting their strengths and weaknesses.
    • Discussing when to use each based on specific use cases and business needs.
  6. Integration and Data Pipelines:

    • Insight into the seamless integration of data lakes and data warehouses within a data engineering pipeline.
    • Code walkthrough showcasing data movement and transformation between these two crucial components.
  7. Real-world Use Cases:

    • Presentation of real-world use cases where effective use of data lakes and data warehouses led to actionable insights and business success.
    • Hands-on demonstration using Python, Jupyter Notebook and SQL to solidify the concepts discussed, providing attendees with practical insights and skills.
  8. Q&A and Hands-on Session:

    • An interactive Q&A session to address any queries.

Conclusion:

This session aims to equip attendees with a strong foundation in data engineering, focusing on the pivotal role of data lakes and data warehouses. By the end of this presentation, participants will grasp how to effectively utilize these tools, enabling them to design efficient data solutions and drive informed business decisions.

Presentation

Data Engineering Overview

A Data Engineering Process involves executing steps to understand the problem, scope, design, and architecture for creating a solution. This enables ongoing big data analysis using analytical and visualization tools.

Topics

  • Data Lake and Data Warehouse
  • Discovery and Data Analysis
  • Design and Infrastructure Planning
  • Data Lake - Pipeline and Orchestration
  • Data Warehouse - Design and Implementation
  • Analysis and Visualization

Follow this project: Give a star

👉 Data Engineering Process Fundamentals

Operational Data

Operational data is often generated by applications, and it is stored in transactional relational databases like SQL Server and Oracle, or NoSQL (JSON) databases like MongoDB and Firebase. This is the data that is created after an application saves a user transaction like contact information, a purchase or other activities that are available from the application.

Features:

  • Application support and transactions
  • Relational data structure and SQL or document structure NoSQL
  • Small queries for case analysis

Not Best For:

  • Reporting system
  • Large queries
  • Centralized Big Data system

Data Engineering Process Fundamentals - Operational Data

Data Lake - Analytical Data Staging

A Data Lake is an optimized storage system for Big Data scenarios. The primary function is to store the data in its raw format without any transformation. Analytical data is the transaction data that has been extracted from a source system via a data pipeline as part of the staging data process.

Features:

  • Store the data in its raw format without any transformation
  • This can include structured data like CSV files, semi-structured data like JSON and XML documents, or column-based data like Parquet files
  • Low cost for massive storage capacity
  • Not designed for querying or data analysis
  • Its files are typically exposed as external tables by other systems

Data Engineering Process Fundamentals - Analytical Data staging
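As a small illustration of the staging step, the sketch below reads a raw CSV extract with pandas and lands it in the data lake folder as a Parquet file without changing its content; the file names and paths are assumptions.

# Minimal staging sketch (illustrative paths): land a raw CSV extract in the data lake
import pandas as pd

# extract: read the file exactly as it arrives from the source system
df = pd.read_csv("turnstile_20241026.csv")  # hypothetical raw extract

# stage: persist the unmodified records in a columnar format in the lake's raw zone
df.to_parquet("datalake/raw/turnstile_20241026.parquet", index=False)

# a data warehouse can later reference this file as an external table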

Data Warehouse - Analytical Data

A Data Warehouse is a centralized storage system that stores integrated data from multiple sources. The system is designed to host and serve Big Data scenarios with lower operational cost than transactional databases, but higher cost than a Data Lake. This system hosts the analytical data that has been processed and is ready for analytical purposes.

Data Warehouse Features:

  • Stores historical data in relational tables with an optimized schema, which enables the data analysis process
  • Provides SQL support to query the data
  • It can integrate external resources like CSV and Parquet files that are stored in Data Lakes as external tables
  • The system is designed to host and serve Big Data scenarios. It is not meant to be used as a transactional system
  • Storage is more expensive
  • Offloads archived data to Data Lakes

Data Engineering Process Fundamentals - Analytical Data Store

Discovery - Data Analysis

During the discovery phase of a Data Engineering Process, we look to identify and clearly document a problem statement, which helps us understand what we are trying to solve. We also look at our analytical approach to make observations about the data, its structure and source. This leads us into defining the requirements for the project, so we can define the scope, design and architecture of the solution.

  • Download sample data files
  • Run experiments to make observations
  • Write Python scripts using VS Code or Jupyter Notebooks
  • Transform the data with Pandas
  • Make charts with Plotly
  • Document the requirements

Data Engineering Process Fundamentals - Data Analysis and discovery
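A minimal discovery sketch of those activities is shown below, assuming a downloaded sample file and illustrative column names (date, entries); in practice you would substitute the columns from your own dataset.

# Quick discovery pass over a sample file (file and column names are assumptions)
import pandas as pd
import plotly.express as px

df = pd.read_csv("sample_data.csv")   # hypothetical sample extract

print(df.info())        # structure, data types and null counts
print(df.describe())    # basic distribution of the numeric measures

# prototype a chart to support the requirements discussion
fig = px.line(df, x="date", y="entries", title="Entries over time")
fig.show()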

Design and Planning

The design and planning phase of a data engineering project is crucial for laying out the foundation of a successful system. It involves defining the system architecture, designing data pipelines, implementing source control practices, ensuring continuous integration and deployment (CI/CD), and leveraging tools like Docker and Terraform for infrastructure automation.

  • Use GitHub for code repo and for CI/CD actions
  • Use Terraform, an Infrastructure as Code (IaC) tool, to manage cloud resources across multiple cloud providers
  • Use Docker containers to run the code and manage its dependencies

Data Engineering Process Fundamentals - Design and Planning

Data Lake - Pipeline and Orchestration

A data pipeline is basically a workflow of tasks that can be executed in Docker containers. The execution, scheduling, managing and monitoring of the pipeline is referred to as orchestration. In order to support the operations of the pipeline and its orchestration, we need to provision a VM and data lake, and monitor cloud resources.

  • This can be code-centric, leveraging languages like Python
  • Or a low-code approach, utilizing tools such as Azure Data Factory, which provides a turn-key solution
  • Monitor services enable us to track telemetry data
  • Docker Hub, GitHub can be used for the CI/CD process

Data Engineering Process Fundamentals - Data Lake - Data Pipeline and Orchestration

Data Warehouse - Design and Implementation

In the design phase, we lay the groundwork by defining the database system, schema model, and technology stack required to support the data warehouse’s implementation and operations. In the implementation phase, we focus on converting conceptual data models into a functional system. By creating concrete structures like dimension and fact tables and performing data transformation tasks, including data cleansing, integration, and scheduled batch loading, we ensure that raw data is processed and unified for analysis, creating a repeatable and extendable process.

Data Engineering Process Fundamentals - Data Warehouse Design and Implementation

Data Warehouse - Data Analysis

Data analysis is the practice of exploring data and understanding its meaning. It involves activities that can help us achieve a specific goal, such as identifying data dimensions and measures, as well as data analysis to identify outliers, trends, and distributions.

  • We can accomplish these activities by writing code with Python, Pandas, and SQL in Visual Studio Code or Jupyter Notebooks.
  • What's more, we can use libraries, such as Plotly, to generate some visuals to further analyze data and create prototypes.

Data Engineering Process Fundamentals - Data Analysis

Data Analysis and Visualization

Data visualization is a powerful tool that takes the insights derived from data analysis and presents them in a visual format. While tables with numbers on a report provide raw information, visualizations allow us to grasp complex relationships and trends at a glance.

  • Dashboards, in particular, bring together various visual components like charts, graphs, and scorecards into a unified interface that can help us tell a story
  • Use tools like Power BI, Looker, or Tableau to model the data and create enterprise-level visualizations

Data Engineering Process Fundamentals - Data Visualization

Conclusion

Both data lakes and data warehouses are essential components of a data engineering project. The primary function of a data lake is to store large amounts of operational data in its raw format, serving as a staging area for analytical processes. In contrast, a data warehouse acts as a centralized repository for information, enabling engineers to transform, process, and store extensive data. This allows the analytical team to utilize coding languages like Python and tools such as Jupyter Notebooks, as well as low-code platforms like Looker Studio and Power BI, to create enterprise-quality dashboards for the organization.

We've covered a lot today, but this is just the beginning!

If you're interested in learning more about building cloud data pipelines, I encourage you to check out my book, 'Data Engineering Process Fundamentals,' part of the Data Engineering Process Fundamentals series. It provides in-depth explanations, code samples, and practical exercises to help in your learning.

Data Engineering Process Fundamentals Book Amazon

Thanks for reading.

Send question or comment at Twitter @ozkary 👍 Originally published by ozkary.com

10/31/24

A Hands-On Exploration into the discovery phase - Data Engineering Process Fundamentals

Overview

The discovery process involves identifying the problem, analyzing data sources, defining project requirements, establishing the project scope, and designing an effective architecture to address the identified challenges.

In this session, we will delve into the essential building blocks of data engineering, placing a spotlight on the discovery process. From framing the problem statement to navigating the intricacies of exploratory data analysis (EDA) using Python, VSCode, Jupyter Notebooks, and GitHub, you'll gain a solid understanding of the fundamental aspects that drive effective data engineering projects.

A Hands-On Exploration into the discovery phase - Data Engineering Process Fundamentals

  • Follow this GitHub repo during the presentation: (Give it a star)

👉 https://github.com/ozkary/data-engineering-mta-turnstile

Jupyter Notebook

👉 https://github.com/ozkary/data-engineering-mta-turnstile/blob/main/Step1-Discovery/mta_discovery.ipynb

  • Data engineering Series:

👉 https://www.ozkary.com/2023/03/data-engineering-process-fundamentals.html

Jupyter Notebook Preview

# Standard library imports
from time import time
from pathlib import Path
import requests
from io import StringIO
# Load pandas support for data analysis tasks, dataframe (two-dimensional data structure with rows and columns) management
import pandas as pd    
import numpy as np 

# URL of the file you want to download. Note: It should be a Saturday date
url = 'http://web.mta.info/developers/data/nyct/turnstile/turnstile_241026.txt'

# Download the file in memory
response = requests.get(url)
response.raise_for_status()  # Check if the request was successful

# Create a DataFrame from the downloaded content
data = StringIO(response.text)
df = pd.read_csv(data)

# Display the DataFrame first 10 rows
df.head(10)

# use info to get the column names, data type and null values
df.info()

# strip spaces from the column names and type cast the numeric columns
df.columns = [column.strip() for column in df.columns]
print(df.columns)
df["ENTRIES"] = df["ENTRIES"].astype(int)
df["EXITS"] = df["EXITS"].astype(int)

# Define the set of special characters you want to check for
special_characters_set = set('@#$%/')


def has_special_characters(col, special_characters):
    # Check if any character in the column name is not alphanumeric or in the specified set
    return any(char in special_characters for char in col)

def rename_columns(df, special_characters_set):
    # Create a mapping of old column names to new column names
    mapping = {col: ''.join(char for char in col if char.isalnum() or char not in special_characters_set) for col in df.columns}

    print(mapping)
    # Rename columns using the mapping
    df_renamed = df.rename(columns=mapping)

    return df_renamed


# Identify columns with special characters using list comprehension syntax
columns_with_special_characters = [col for col in df.columns if has_special_characters(col, special_characters_set)]

# Print the result
print("Columns with special characters:", columns_with_special_characters)

# Identify columns with special characters and rename them
df = rename_columns(df, special_characters_set)

# Display the data frame again. there should be no column name with special characters
print(df.info())

YouTube Video

Video Agenda

  1. Introduction:

    • Unveiling the importance of the discovery process in data engineering.

    • Setting the stage with a real-world problem statement that will guide our exploration.

  2. Setting the Stage:

    • Downloading and comprehending sample data to kickstart our discovery journey.

    • Configuring the development environment with VSCode and Jupyter Notebooks.

  3. Exploratory Data Analysis (EDA):

    • Delving deep into EDA techniques with a focus on the discovery phase.

    • Demonstrating practical approaches using Python to uncover insights within the data.

  4. Code-Centric Approach:

    • Advocating the significance of a code-centric approach during the discovery process.

    • Showcasing how a code-centric mindset enhances collaboration, repeatability, and efficiency.

  5. Version Control with GitHub:

    • Integrating GitHub seamlessly into our workflow for version control and collaboration.

    • Managing changes effectively to ensure a streamlined data engineering discovery process.

  6. Real-World Application:

    • Applying insights gained from EDA to address the initial problem statement.

    • Discussing practical solutions and strategies derived from the discovery process.

Key Takeaways:

  • Mastery of the foundational aspects of data engineering.

  • Hands-on experience with EDA techniques, emphasizing the discovery phase.

  • Appreciation for the value of a code-centric approach in the data engineering discovery process.

Some of the technologies that we will be covering:

  • Python
  • Data Analysis and Visualization
  • Jupyter Notebook
  • Visual Studio Code

Presentation

Data Engineering Overview

A Data Engineering Process involves executing steps to understand the problem, scope, design, and architecture for creating a solution. This enables ongoing big data analysis using analytical and visualization tools.

Topics

  • Importance of the Discovery Process
  • Setting the Stage - Technologies
  • Exploratory Data Analysis (EDA)
  • Code-Centric Approach
  • Version Control
  • Real-World Use Case

Follow this project: Give a star

👉 Data Engineering Process Fundamentals

Importance of the Discovery Process

The discovery process involves identifying the problem, analyzing data sources, defining project requirements, establishing the project scope, and designing an effective architecture to address the identified challenges.

  • Clearly document the problem statement to understand the challenges the project aims to address.
  • Make observations about the data, its structure, and sources during the discovery process.
  • Define project requirements based on the observations, enabling the team to understand the scope and goals.
  • Clearly outline the scope of the project, ensuring a focused and well-defined set of objectives.
  • Use insights from the discovery phase to inform the design of the solution, including data architecture.
  • Develop a robust project architecture that aligns with the defined requirements and scope.

Data Engineering Process Fundamentals - Discovery Process

Setting the Stage - Technologies

To set the stage, we need to identify and select the tools that can facilitate the analysis and documentation of the data. Here are key technologies that play a crucial role in this stage:

  • Python: A versatile programming language with rich libraries for data manipulation, analysis, and scripting.

Use Cases: Data download, cleaning, exploration, and scripting for automation.

  • Jupyter Notebooks: An interactive tool for creating and sharing documents containing live code, visualizations, and narrative text.

Use Cases: Exploratory data analysis, documentation, and code collaboration.

  • Visual Studio Code: A lightweight, extensible code editor with powerful features for source code editing and debugging.

Use Cases: Writing and debugging code, integrating with version control systems like GitHub.

  • SQL (Structured Query Language): A domain-specific language for managing and manipulating relational databases.

Use Cases: Querying databases, data extraction, and transformation.

Data Engineering Process Fundamentals - Discovery Tools

Exploratory Data Analysis (EDA)

EDA is our go-to method for downloading, analyzing, understanding and documenting the intricacies of the datasets. It's like peeling back the layers of information to reveal the stories hidden within the data. Here's what EDA is all about:

  • EDA is the process of analyzing data to identify patterns, relationships, and anomalies, guiding the project's direction.

  • Python and Jupyter Notebook collaboratively empower us to download, describe, and transform data through live queries.

  • Insights gained from EDA set the foundation for informed decision-making in subsequent data engineering steps.

  • Code written on Jupyter Notebook can be exported and used as the starting point for components for the data pipeline and transformation services.

Data Engineering Process Fundamentals - Discovery Pie Chart

Code-Centric Approach

A code-centric approach, using programming languages and tools in EDA, helps us understand the coding methodology for building data structures, defining schemas, and establishing relationships. This robust understanding seamlessly guides project implementation.

  • Code delves deep into data intricacies, revealing integration and transformation challenges often unclear with visual tools.

  • Using code taps into Pandas and Numpy libraries, empowering robust manipulation of data frames, establishment of loading schemas, and addressing transformation needs.

  • Code-centricity enables sophisticated analyses, covering aggregation, distribution, and in-depth examinations of the data.

  • While visual tools have their merits, a code-centric approach excels in hands-on, detailed data exploration, uncovering subtle nuances and potential challenges.

Data Engineering Process Fundamentals - Discovery Pie Chart
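Continuing from the notebook's df above, the short sketch below shows the kind of aggregation and distribution analysis a code-centric approach makes easy; note that ENTRIES in the MTA dataset is a cumulative counter, so the per-station maximum is only a rough illustration.

# Aggregation and distribution example, continuing from the notebook's df (illustrative)
import plotly.express as px

# aggregation: rough per-station figure (ENTRIES is a cumulative counter, so max is a proxy)
station_totals = (
    df.groupby("STATION")["ENTRIES"]
      .max()
      .sort_values(ascending=False)
      .head(10)
      .reset_index()
)

fig = px.bar(station_totals, x="STATION", y="ENTRIES", title="Top 10 stations by entries (proxy)")
fig.show()

# distribution: how the entry counter values are spread across the dataset
px.histogram(df, x="ENTRIES", nbins=50, title="Distribution of ENTRIES").show()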

Version Control

Using a tool like GitHub is essential for effective version control and collaboration in our discovery process. GitHub enables us to track our exploratory code and Jupyter Notebooks, fostering collaboration, documentation, and comprehensive project management. Here's how GitHub enhances our process:

  • Centralized Tracking: GitHub centralizes tracking and managing our exploratory code and Jupyter Notebooks, ensuring a transparent and organized record of our data exploration.

  • Sharing: Easily share code and Notebooks with team members on GitHub, fostering seamless collaboration and knowledge sharing.

  • Documentation: GitHub supports Markdown, enabling comprehensive documentation of processes, findings, and insights within the same repository.

  • Project Management: GitHub acts as a project management hub, facilitating CI/CD pipeline integration for smooth and automated delivery of data engineering projects.

Data Engineering Process Fundamentals - Discovery Problem Statement

Summary: The Power of Discovery

By mastering the discovery phase, you lay a strong foundation for successful data engineering projects. A thorough understanding of your data is essential for extracting meaningful insights.

  • Understanding Your Data: The discovery phase is crucial for understanding your data's characteristics, quality, and potential.
  • Exploratory Data Analysis (EDA): Use techniques to uncover patterns, trends, and anomalies.
  • Data Profiling: Assess data quality, identify missing values, and understand data distributions.
  • Data Cleaning: Address data inconsistencies and errors to ensure data accuracy.
  • Domain Knowledge: Leverage domain expertise to guide data exploration and interpretation.
  • Setting the Stage: Choose the right language and tools for efficient data exploration and analysis.

The data engineering discovery process involves defining the problem statement, gathering requirements, and determining the scope of work. It also includes a data analysis exercise utilizing Python and Jupyter Notebooks or other tools to extract valuable insights from the data. These steps collectively lay the foundation for successful data engineering endeavors.

Thanks for reading.

Send question or comment at Twitter @ozkary 👍 Originally published by ozkary.com

9/25/24

Live Dashboards: Boosting App Performance with Real-Time Integration

Overview

Dive into the future of web applications. We're moving beyond traditional API polling and embracing real-time integration. Imagine your client app maintaining a persistent connection with the server, enabling bidirectional communication and live data streaming. We'll also tackle scalability challenges and integrate Redis as our in-memory data solution.

Live Dashboards: Boosting App Performance with Real-Time Integration

  • Follow this GitHub repo during the presentation: (Give it a star)

👉 https://github.com/ozkary/Realtime-Apps-with-Nodejs-Angular-Socketio-Redis

YouTube Video

Video Agenda

This presentation explores strategies for building highly responsive and interactive live dashboards. We'll delve into the challenges of traditional API polling and demonstrate how to leverage Node.js, Angular, Socket.IO, and Redis to achieve real-time updates and a seamless user experience.

  • Introduction:

    • Understanding telemetry data and the importance of monitoring it
    • Challenges of traditional API polling for real-time data.
    • Design patterns to enhance an app with minimal changes
  • Traditional Solution Architecture

    • SQL Database Integration.
    • Restful API
    • Angular and Node.js Integration
  • Real-Time Integration with Web Sockets

    • Database Optimization Challenges
    • Introduction to Web Sockets for bidirectional communication.
    • Implementing Web Sockets in a Web application.
    • Handling data synchronization and consistency.
  • Distributed Caching with Redis:

    • Benefits of in-memory caching for improving performance and scalability.
    • Integrating Redis into your Node.js application.
    • Caching strategies for distributed systems.
  • Case Study: Building a Live Telemetry Dashboard

    • Step-by-step demonstration of the implementation.
    • Performance comparison with and without optimization techniques.
    • User experience benefits of real-time updates.
  • Benefits and Considerations

    • Improved dashboard performance and responsiveness.
    • Reduced server load and costs.
    • Scalability considerations.
    • Best practices for implementing real-time updates.

Why Attend:

Gain a deep understanding of real-time data integration for your Web application.

Presentation

Telemetry Data Story

Devices send telemetry data via API integration with SQL Server. There are inherent performance problems with a disk-based database. We progressively enhance the system with minimal changes by adding real-time integration and an in-memory cache system.

Live Dashboards: Real-time dashboard

Database Integration

Solution Architecture

  • Disk-based Storage
  • Web apps and APIs query database to get the data
  • Applications can do both high reads and writes
  • Web components, charts polling back-end database for reads

Let’s Start our Journey

  • Review our API integration and talk about concerns
  • Do not refactor everything
  • Enhance to real-time integration with sockets
  • Add Redis as the distributed cache
  • Add the service broker strategy to sync the data sources
  • Centralize the real-time integration with Redis

Live Dashboards: Direct API Integration

RESTful API Integration

Applied Technologies

  • REST API Written with Node.js
  • TypeORM Library Repository
  • Angular Client Application with Plotly.js Charts
  • Disk-based storage – SQL Server
  • API Telemetry (GET, POST) route

Use Case

  • IoT devices report telemetry information via API
  • Dashboard reads only the most recent data via API calls, which query the storage service
  • Polling the database to get new records

Project Repo (Star the project and follow) https://github.com/ozkary/Realtime-Apps-with-Nodejs-Angular-Socketio-Redis

Live Dashboards: Repository Integration

Database Optimization and Challenges

Slow Queries on disk-based storage

  • Effort on index optimization
  • Database Partition strategies
  • Double-digit millisecond average speed (physics on data disks)

Simplify data access strategies

  • Relational data is not optimal for high data read systems (joins?)
  • Structure needs to be de-normalized
  • Often views are created to shape the data and limit date ranges

Database Contention

  • Read isolation levels (nolock)
  • Reads competing with inserts

Cost to Scale

  • Vertical and horizontal scaling up on resources
  • Database read-replicas to separate reads and writes
  • Replication workloads/tasks
  • Data lakes and data warehouse

Live Dashboards: SQL Query

Real-Time Integration

What is Socket.io, Web Sockets?

  • Enables real-time bidirectional communication.
  • Push data to clients as events take place on the server
  • Data streaming
  • Connection starts as HTTP and is then promoted to Web Sockets

Additional Technologies: Socket.IO (SignalR for .NET) for both client and server components

Use Case

  • IoT devices report telemetry information via sockets. All subscribed clients get the information as an event which updates the dashboard

Demo

  • Update both server and client to support Web sockets
  • Use device demo tool to connect and automate the telemetry data to the server

Live Dashboards: Web Socket Integration

Distributed Cache Strategy

Why Use a Cache?

  • Data is stored in-memory
  • Sub-millisecond average speed
  • Cache-Aside Pattern
    • Read from the cache first (cache hit); fall back to the database (cache miss)
    • Update the cache on a cache miss
  • Write-Through
    • Write to both the cache and the database
    • Keep both systems updated
  • Improves app performance
  • Reduces load on Database

Application Changes

  • Changes are only done on the server
  • No changes on client-side

Live Dashboards: Cache Architecture
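To keep the pattern concrete, here is a minimal cache-aside and write-through sketch; it is written in Python with redis-py purely for illustration, while the post's demo applies the same server-side pattern in Node.js. The key names, TTL, and database helpers are assumptions.

# Cache-aside + write-through sketch (illustrative; the demo repo implements this in Node.js)
import json
import redis

cache = redis.Redis(host="localhost", port=6379, decode_responses=True)
CACHE_TTL_SECONDS = 60  # assumption: dashboard reads tolerate ~60s of staleness

def read_telemetry(device_id: str) -> dict:
    key = f"telemetry:{device_id}"
    cached = cache.get(key)                 # cache hit: serve from memory
    if cached:
        return json.loads(cached)
    record = query_database(device_id)      # cache miss: fall back to the database
    cache.set(key, json.dumps(record), ex=CACHE_TTL_SECONDS)   # update the cache
    return record

def write_telemetry(device_id: str, record: dict) -> None:
    save_to_database(device_id, record)     # write-through: keep both systems updated
    cache.set(f"telemetry:{device_id}", json.dumps(record), ex=CACHE_TTL_SECONDS)

def query_database(device_id: str) -> dict:
    return {"deviceId": device_id, "temperature": 72.5}  # placeholder for the SQL read

def save_to_database(device_id: str, record: dict) -> None:
    pass  # placeholder for the SQL insert/update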

Redis and Socket.io Integration

What is Redis?

  • Key-value store, keys can contain strings (JSON), hashes, lists, sets, & sorted sets
  • Redis supports a set of atomic operations on these data types (available until committed)
  • Other features include transactions, publish/subscribe, and a limited time to live (TTL)
  • You can use Redis from most of today's programming languages using libraries

Use Case

  • As application load and data frequency increase, we need to use a cache for performance. We also need to centralize the events, so all the socket servers behind a load balancer can notify the clients. We update both the storage and the cache

Demo

  • Start Redis-cli on Ubuntu and show some inserts, reads and sync events.
    • sudo service redis-server restart
    • redis-cli -c -p 6379 -h localhost
    • zadd table:data 100 "{data:'100'}"
    • zrangebyscore table:data 100 200
    • subscribe telemetry:data

Live Dashboards: Load Balanced Architecture

Summary: Boosting Your App Performance

When your application starts to slow down due to heavy read and writes on your database, consider moving the read operations to a cache solution and broadcasting the data to your application via a real-time integration using Web Sockets. This approach can significantly enhance performance and user experience.

Key Benefits

  • Improved Performance: Offloading reads to a cache system like Redis reduces load on the database.
  • Real-Time Updates: Using Web Sockets ensures that your application receives updates in real-time, with no need for manual refreshes.
  • Scalability: By reducing the database load, your application can handle more concurrent users.
  • Efficient Resource Utilization: Leveraging caching and real-time technologies optimizes the use of server resources, leading to savings and better performance.

Live Dashboards: Load Balanced Architecture

We've covered a lot today, but this is just the beginning!

If you're interested in learning more about building cloud data pipelines, I encourage you to check out my book, 'Data Engineering Process Fundamentals,' part of the Data Engineering Process Fundamentals series. It provides in-depth explanations, code samples, and practical exercises to help in your learning.

Data Engineering Process Fundamentals - Book by Oscar Garcia

Thanks for reading.

Send question or comment at Twitter @ozkary 👍 Originally published by ozkary.com