Describe data lineage


Stages of Data Lineage

1. Source Data: CRM_System

  • Source Tables:
    • customer_transactions: Contains raw customer transaction data (e.g., purchase details).
    • product_catalog: Stores product metadata related to customer transactions.
  • Key Fields Extracted:
    • transaction_id, customer_id, product_id, purchase_date, and amount.

2. Extraction: Data Pipeline

  • Tool: Apache NiFi
  • Process:
    • A scheduled pipeline extracts data from the customer_transactions and product_catalog tables via SQL queries.
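    • Data is fetched in incremental mode: only rows whose last_updated_timestamp is later than the previous run's high-water mark are read, so unchanged rows are not extracted again.

Example SQL Query (the timestamp literal stands in for the high-water mark saved by the previous run):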

sql
SELECT transaction_id, customer_id, product_id, purchase_date, amount, last_updated_timestamp
FROM customer_transactions
WHERE last_updated_timestamp > '2023-01-01 00:00:00';
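
The incremental step above can be sketched in Python as follows. This is a minimal illustration, not the NiFi flow itself: an in-memory SQLite database stands in for the CRM source, and the function name extract_incremental and the sample rows are hypothetical. It shows the timestamp being passed as a parameter and advanced after each run.

```python
import sqlite3


def extract_incremental(conn, watermark):
    """Return rows changed after `watermark` plus the watermark for the next run."""
    rows = conn.execute(
        "SELECT transaction_id, customer_id, product_id, purchase_date, amount, "
        "last_updated_timestamp "
        "FROM customer_transactions "
        "WHERE last_updated_timestamp > ? "
        "ORDER BY last_updated_timestamp",
        (watermark,),
    ).fetchall()
    # Advance the watermark only when something was fetched.
    new_watermark = rows[-1][-1] if rows else watermark
    return rows, new_watermark


# In-memory SQLite database standing in for the CRM source (assumption).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customer_transactions ("
    "transaction_id INTEGER PRIMARY KEY, customer_id INTEGER, product_id INTEGER, "
    "purchase_date TEXT, amount REAL, last_updated_timestamp TEXT)"
)
conn.executemany(
    "INSERT INTO customer_transactions VALUES (?, ?, ?, ?, ?, ?)",
    [
        (1, 10, 100, "2022-12-30", 25.0, "2022-12-30 09:00:00"),
        (2, 11, 101, "2023-01-02", 40.0, "2023-01-02 12:30:00"),
        (3, 12, 100, "2023-01-05", 15.5, "2023-01-05 08:15:00"),
    ],
)

rows, watermark = extract_incremental(conn, "2023-01-01 00:00:00")
```

Storing the returned watermark and passing it to the next run means a second call with no new changes returns no rows. The string comparison works here only because the timestamps are in ISO-style format, which sorts chronologically.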

3. Transformation: Data Cleansing and Enrichment

  • Tool: Apache Spark
  • Transformations:
    • Data Cleaning:
      • Remove duplicate transaction records.
      • Convert inconsistent date formats to YYYY-MM-DD.
    • Data Enrichment:
      • Join with product_catalog to retrieve product_name and product_category.
      • Calculate a derived column discounted_price based on promotional rules.
    • Validation:
      • Check for null values in critical fields (transaction_id, amount).
      • Flag and log erroneous rows for further review.

Example Transformation Code (PySpark):

python
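from pyspark.sql.functions import col, to_date, when

# raw_data and product_catalog are assumed to be DataFrames loaded earlier in the job.

# Clean and format data: drop rows that are identical in every column, then
# parse MM/dd/yyyy strings into a date column (displayed as YYYY-MM-DD).
cleaned_data = raw_data.dropDuplicates().withColumn(
    "purchase_date", to_date(col("purchase_date"), "MM/dd/yyyy")
)

# Enrich data with product details. A boolean promotion_active column is
# assumed to be available after the join; promoted items get a 10% discount.
enriched_data = cleaned_data.join(product_catalog, "product_id", "inner").withColumn(
    "discounted_price",
    when(col("promotion_active"), col("amount") * 0.9).otherwise(col("amount")),
)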
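
The PySpark example does not cover the validation bullets or mixed date formats. The sketch below shows that logic in plain Python so it can run without a Spark cluster. It is an illustration under assumptions, not the Spark job: the accepted date formats, the logger name, and the helper names normalize_date and validate_rows are all hypothetical.

```python
import logging
from datetime import datetime

logger = logging.getLogger("lineage.validation")

# Hypothetical list of formats seen in the source; extend as needed.
DATE_FORMATS = ("%Y-%m-%d", "%m/%d/%Y", "%Y/%m/%d")
CRITICAL_FIELDS = ("transaction_id", "amount")


def normalize_date(value):
    """Return the date as YYYY-MM-DD, or None if no known format matches."""
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(value, fmt).strftime("%Y-%m-%d")
        except (ValueError, TypeError):
            continue
    return None


def validate_rows(rows):
    """Split rows into (valid, flagged); flagged rows carry an error reason."""
    valid, flagged = [], []
    for row in rows:
        missing = [f for f in CRITICAL_FIELDS if row.get(f) is None]
        clean_date = normalize_date(row.get("purchase_date"))
        if missing:
            reason = "null in " + ", ".join(missing)
        elif clean_date is None:
            reason = "unparseable purchase_date"
        else:
            valid.append({**row, "purchase_date": clean_date})
            continue
        # Log the bad row and keep it aside for later review.
        logger.warning("Flagged row %r: %s", row, reason)
        flagged.append({**row, "error": reason})
    return valid, flagged


rows = [
    {"transaction_id": 1, "amount": 19.99, "purchase_date": "01/15/2023"},
    {"transaction_id": None, "amount": 5.00, "purchase_date": "2023-01-16"},
    {"transaction_id": 3, "amount": 7.50, "purchase_date": "sometime"},
]
valid, flagged = validate_rows(rows)
```

Keeping flagged rows in a separate list, rather than dropping them, preserves the record of what was excluded and why, which is part of the lineage.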

4. Loading: Target Table

  • Destination: sales_data table in the analytics database (PostgreSQL).
  • Loading Method:
    • Data is written in batch mode using PostgreSQL’s COPY command for efficient loading.
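    • Existing data is upserted (update if exists, insert if not). COPY itself only appends rows, so the upsert runs as a separate INSERT ... ON CONFLICT statement, typically reading from a staging table that COPY has filled.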

Example Upsert Query:

sql
INSERT INTO sales_data (
    transaction_id, customer_id, product_id, purchase_date,
    amount, discounted_price, product_name, product_category
)
VALUES (...)
ON CONFLICT (transaction_id)
DO UPDATE SET
    amount = EXCLUDED.amount,
    discounted_price = EXCLUDED.discounted_price,
    product_name = EXCLUDED.product_name,
    product_category = EXCLUDED.product_category;
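
To show the upsert behaviour end to end, here is a small runnable sketch. It makes several assumptions: SQLite (version 3.24 or later, which supports a similar ON CONFLICT clause) stands in for PostgreSQL, the table is cut down to three columns, and upsert_batch is a hypothetical helper name.

```python
import sqlite3

UPSERT_SQL = """
INSERT INTO sales_data (transaction_id, amount, discounted_price)
VALUES (?, ?, ?)
ON CONFLICT (transaction_id)
DO UPDATE SET
    amount = excluded.amount,
    discounted_price = excluded.discounted_price
"""


def upsert_batch(conn, batch):
    """Apply one batch of (transaction_id, amount, discounted_price) rows."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(UPSERT_SQL, batch)


# In-memory SQLite database standing in for the analytics database (assumption).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sales_data ("
    "transaction_id INTEGER PRIMARY KEY, amount REAL, discounted_price REAL)"
)

# First load inserts two rows; the second updates row 2 and inserts row 3.
upsert_batch(conn, [(1, 100.0, 90.0), (2, 50.0, 50.0)])
upsert_batch(conn, [(2, 60.0, 54.0), (3, 20.0, 20.0)])

result = conn.execute(
    "SELECT transaction_id, amount, discounted_price "
    "FROM sales_data ORDER BY transaction_id"
).fetchall()
```

Because the conflict target is transaction_id, reloading the same batch leaves the table unchanged, which makes the load step safe to retry.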

5. Usage: Reporting and Analytics

  • The sales_data table is used for:
    • Generating daily sales reports.
    • Feeding machine learning models for customer segmentation.
    • Supporting dashboards in tools like Tableau or Power BI.
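
The five stages above can also be recorded as a small lineage graph, so that any downstream table can be traced back to its sources. The following is a minimal sketch under assumptions: the intermediate dataset names follow the PySpark example, the LINEAGE layout and the upstream helper are hypothetical, and a real deployment would more likely use a metadata catalog than a hand-written dictionary.

```python
# Each derived dataset maps to the datasets it is built from and the step
# that produces it. Datasets with no entry here are original sources.
LINEAGE = {
    "raw_data": {
        "inputs": ["customer_transactions"],
        "step": "extraction (Apache NiFi)",
    },
    "cleaned_data": {
        "inputs": ["raw_data"],
        "step": "transformation (Apache Spark)",
    },
    "enriched_data": {
        "inputs": ["cleaned_data", "product_catalog"],
        "step": "transformation (Apache Spark)",
    },
    "sales_data": {
        "inputs": ["enriched_data"],
        "step": "loading (PostgreSQL)",
    },
}


def upstream(dataset, graph=LINEAGE):
    """Return every dataset that `dataset` depends on, nearest first."""
    seen = []
    queue = list(graph.get(dataset, {}).get("inputs", []))
    while queue:
        current = queue.pop(0)
        if current in seen:
            continue
        seen.append(current)
        queue.extend(graph.get(current, {}).get("inputs", []))
    return seen


ancestors = upstream("sales_data")
# Sources are the ancestors that nothing in the graph produces.
sources = [name for name in ancestors if name not in LINEAGE]
```

Walking the graph from sales_data reaches both CRM tables, which is the question lineage is meant to answer: where did the data in this report come from?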

How to Use Prompts

Step 1: Download the prompt after purchase.

Step 2: Paste the prompt into your text-generation tool (e.g., ChatGPT).

Step 3: Adjust parameters or use it directly to achieve your goals.


License Terms

Regular License:

  • Allowed for personal or non-commercial projects.
  • Cannot be resold or redistributed.
  • Limited to a single use.

Extended License:

  • Allowed for commercial projects and products.
  • Can be included in resold products, subject to restrictions.
  • Suitable for multiple uses.