Building an ETL Pipeline using PySpark
An ETL (Extract, Transform, and Load) pipeline extracts data from sources, transforms it, and loads it into a storage system. It helps create clean, usable data formats for analysis. PySpark is ideal for building ETL pipelines for large-scale data processing. It offers distributed computing, high performance, and handles structured and unstructured data efficiently. This article will take you through building an ETL pipeline using PySpark.
The dataset we will be using for building an ETL Pipeline contains temperature-related data for various countries from 1961 to 2022. The columns include identifiers like ObjectId, Country, ISO2, and ISO3, along with year-wise temperature data such as F1961, F1962, etc., as floating-point values. Some columns contain missing values. You can download this dataset from here.
We’ll develop an ETL pipeline in PySpark that processes this dataset through the following tasks:
- Extract: Load the dataset from the CSV file.
- Transform: Clean the data, handle missing values, and pivot year-wise temperature data for analysis.
- Load: Save the processed data into a new storage format (e.g., Parquet or a database).
Step 1: Setting Up the Environment & Initializing a PySpark Session
Ensure that PySpark is installed and set up. Run the following command to install PySpark if it’s not already installed:
pip install pyspark
Initialize a PySpark session to enable interaction with the Spark framework:
from pyspark.sql import SparkSession
# initialize SparkSession
spark = SparkSession.builder \
    .appName("ETL Pipeline") \
    .getOrCreate()
Step 2: Extract – Load the Dataset
The next step is to load the dataset into a PySpark DataFrame:
# load the CSV file into a Spark DataFrame
file_path = "/content/temperature.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)
# display the schema and preview the data
df.printSchema()
df.show(5)
root
|-- ObjectId: integer (nullable = true)
|-- Country: string (nullable = true)
|-- ISO2: string (nullable = true)
|-- ISO3: string (nullable = true)
|-- F1961: double (nullable = true)
|-- F1962: double (nullable = true)
|-- F1963: double (nullable = true)
|-- F1964: double (nullable = true)
|-- F1965: double (nullable = true)
|-- F1966: double (nullable = true)
|-- F1967: double (nullable = true)
|-- F1968: double (nullable = true)
|-- F1969: double (nullable = true)
|-- F1970: double (nullable = true)
|-- F1971: double (nullable = true)
|-- F1972: double (nullable = true)
...
In PySpark, we load a CSV file into a distributed DataFrame, much as pandas.read_csv() loads data into a Pandas DataFrame. The difference is that Pandas loads everything into memory on a single machine, while PySpark distributes large datasets across a cluster. The methods df.printSchema() and df.show(5) reveal the schema and preview the data, comparable to df.info() and df.head() in Pandas, but designed for scalable exploration of big data workloads.
Step 3: Transform – Clean and Process the Data
Every dataset calls for its own cleaning steps. Here we will fill missing country codes such as ISO2 with a placeholder and drop rows where every temperature value is missing:
# fill missing values for country codes
df = df.fillna({"ISO2": "Unknown"})
# drop rows where all temperature values are null
temperature_columns = [col for col in df.columns if col.startswith('F')]
df = df.dropna(subset=temperature_columns, how="all")
Next, we will reshape the dataset from wide to long format, with a single “Year” column and a corresponding “Temperature” column:
from pyspark.sql.functions import expr
# reshape temperature data to have 'Year' and 'Temperature' columns
df_pivot = df.selectExpr(
    "ObjectId", "Country", "ISO3",
    "stack(62, " +
    ",".join([f"'F{1961 + i}', F{1961 + i}" for i in range(62)]) +
    ") as (Year, Temperature)"
)
# convert 'Year' column to integer
df_pivot = df_pivot.withColumn("Year", expr("int(substring(Year, 2, 4))"))
df_pivot.show(5)
+--------+--------------------+----+----+-----------+
|ObjectId| Country|ISO3|Year|Temperature|
+--------+--------------------+----+----+-----------+
| 1|Afghanistan, Isla...| AFG|1961| -0.113|
| 1|Afghanistan, Isla...| AFG|1962| -0.164|
| 1|Afghanistan, Isla...| AFG|1963| 0.847|
| 1|Afghanistan, Isla...| AFG|1964| -0.764|
| 1|Afghanistan, Isla...| AFG|1965| -0.244|
+--------+--------------------+----+----+-----------+
only showing top 5 rows
Step 4: Load – Save the Processed Data
After completing the transformations, save the processed data to a Parquet file for efficient storage and querying:
output_path = "/processed_temperature.parquet"
df_pivot.write.mode("overwrite").parquet(output_path)
This operation saves the transformed DataFrame as a Parquet file. Parquet’s columnar, compressed format makes it efficient to store and query in a distributed environment.
We can load the saved Parquet file back to verify that the data was written correctly:
# load the saved parquet file
processed_df = spark.read.parquet(output_path)
processed_df.show(5)
+--------+--------------------+----+----+-----------+
|ObjectId| Country|ISO3|Year|Temperature|
+--------+--------------------+----+----+-----------+
| 1|Afghanistan, Isla...| AFG|1961| -0.113|
| 1|Afghanistan, Isla...| AFG|1962| -0.164|
| 1|Afghanistan, Isla...| AFG|1963| 0.847|
| 1|Afghanistan, Isla...| AFG|1964| -0.764|
| 1|Afghanistan, Isla...| AFG|1965| -0.244|
+--------+--------------------+----+----+-----------+
only showing top 5 rows
Summary
PySpark is well suited to building ETL pipelines for large-scale data processing: we extracted a CSV into a distributed DataFrame, cleaned and reshaped the data, and loaded the result into Parquet. I hope you liked this article on building an ETL pipeline using PySpark. Feel free to ask questions in the comments section below. You can follow me on Instagram for many more resources.