Stages of Data Lineage
1. Source Data: CRM_System
- Source Tables:
  - `customer_transactions`: Contains raw customer transaction data (e.g., purchase details).
  - `product_catalog`: Stores product metadata related to customer transactions.
- Key Fields Extracted: `transaction_id`, `customer_id`, `product_id`, `purchase_date`, and `amount`.
2. Extraction: Data Pipeline
- Tool: Apache NiFi
- Process:
  - A scheduled pipeline extracts data from the `customer_transactions` and `product_catalog` tables via SQL queries.
  - Data is fetched in incremental mode based on a `last_updated_timestamp` column to avoid redundancy.
Example SQL Query:
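A minimal incremental-extract query along these lines (table and column names are taken from this section; the `:last_run_timestamp` watermark is a placeholder bound by the pipeline at run time):

```sql
-- Incremental extract: pull only rows changed since the last pipeline run.
SELECT t.transaction_id,
       t.customer_id,
       t.product_id,
       t.purchase_date,
       t.amount
FROM customer_transactions t
WHERE t.last_updated_timestamp > :last_run_timestamp;
```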
3. Transformation: Data Cleansing and Enrichment
- Tool: Apache Spark
- Transformations:
  - Data Cleaning:
    - Remove duplicate transaction records.
    - Convert inconsistent date formats to `YYYY-MM-DD`.
  - Data Enrichment:
    - Join with `product_catalog` to retrieve `product_name` and `product_category`.
    - Calculate a derived column `discounted_price` based on promotional rules.
  - Validation:
    - Check for null values in critical fields (`transaction_id`, `amount`).
    - Flag and log erroneous rows for further review.
Example Transformation Code (PySpark):
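A sketch of the cleaning, enrichment, and validation steps above, assuming a running `SparkSession` and that both source tables are registered in the catalog. The specific promotional rule shown (10% off one product category) is a hypothetical stand-in for whatever rules apply in practice:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("sales_lineage").getOrCreate()

transactions = spark.read.table("customer_transactions")
catalog = spark.read.table("product_catalog")

# Data cleaning: drop duplicate transactions, normalize dates to YYYY-MM-DD.
cleaned = (
    transactions
    .dropDuplicates(["transaction_id"])
    .withColumn("purchase_date", F.to_date("purchase_date"))
)

# Enrichment: join product metadata, derive discounted_price.
enriched = (
    cleaned
    .join(
        catalog.select("product_id", "product_name", "product_category"),
        on="product_id",
        how="left",
    )
    # Hypothetical promotional rule for illustration only.
    .withColumn(
        "discounted_price",
        F.when(F.col("product_category") == "Electronics",
               F.col("amount") * 0.9)
         .otherwise(F.col("amount")),
    )
)

# Validation: split out rows with nulls in critical fields for review.
invalid = enriched.filter(
    F.col("transaction_id").isNull() | F.col("amount").isNull()
)
valid = enriched.subtract(invalid)
```

Keeping the flagged rows in a separate DataFrame (rather than silently dropping them) preserves lineage: the erroneous records can be logged or written to a quarantine table for later review.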
4. Loading: Target Table
- Destination: `sales_data` table in the analytics database (PostgreSQL).
- Loading Method:
  - Data is written in batch mode using PostgreSQL’s `COPY` command for efficient loading.
  - Existing data is upserted (update if exists, insert if not).
Example Upsert Query:
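One common pattern for combining `COPY` with an upsert is to `COPY` the batch into a staging table first, then merge it with `INSERT ... ON CONFLICT`. A sketch, assuming a staging table named `sales_data_staging` (hypothetical) and `transaction_id` as the unique key on `sales_data`:

```sql
-- Merge the freshly copied batch into the target table.
INSERT INTO sales_data (transaction_id, customer_id, product_id,
                        purchase_date, amount, discounted_price)
SELECT transaction_id, customer_id, product_id,
       purchase_date, amount, discounted_price
FROM sales_data_staging
ON CONFLICT (transaction_id) DO UPDATE
SET customer_id      = EXCLUDED.customer_id,
    product_id       = EXCLUDED.product_id,
    purchase_date    = EXCLUDED.purchase_date,
    amount           = EXCLUDED.amount,
    discounted_price = EXCLUDED.discounted_price;
```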
5. Usage: Reporting and Analytics
- The `sales_data` table is used for:
  - Generating daily sales reports.
  - Feeding machine learning models for customer segmentation.
  - Supporting dashboards in tools like Tableau or Power BI.
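A daily sales report over the loaded table might look like the following (the grouping columns are illustrative; real reports would reflect the business's own dimensions):

```sql
-- Hypothetical daily sales report against sales_data.
SELECT purchase_date,
       product_category,
       COUNT(*)              AS transactions,
       SUM(discounted_price) AS net_revenue
FROM sales_data
GROUP BY purchase_date, product_category
ORDER BY purchase_date DESC;
```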