PySpark Weather Analytics

Apache Spark is a popular distributed computing framework for Big Data processing and analytics. In this tutorial, we will work hands-on with PySpark, Spark’s Python-specific interface. We build on the conceptual knowledge gained in a previous tutorial: Introduction to BigData Analytics with Apache Spark, in which we learned about the essential concepts behind Apache Spark and its distributed architecture.

The PySpark library provides access to the Apache Spark APIs and their main modules, such as Spark SQL, DataFrames, Streaming, Spark Core, and MLlib for machine learning. Several of these features will help us to prepare and analyse a historical dataset gathered by a weather station in Zurich. You will gain an overview of essential PySpark functions for transforming and querying data in a local computing environment.

The rest of this tutorial proceeds as follows: In the first sections, we will ingest, join, clean, and transform the data by using a broad set of PySpark’s DataFrame functions. In addition, we will touch on the topic of user-defined functions to perform custom transformations. Finally, we will analyze and visualize the data using the Seaborn Python library. As part of the analysis, we work with PySpark SQL, which allows us to register datasets as tables and query them using PySpark SQL syntax.

Note that this tutorial is a preview and is still a work in progress.

Dot plot on historical weather data from Zurich, created using PySpark and Seaborn
We can use PySpark and Seaborn to create dot plots such as the one above.

Analyzing Historical Weather Data using PySpark

Weather and climate data are interesting resources, not only for weather forecasting but also for dozens of other purposes across different industries. A central area is certainly understanding climate change. Data scientists analyze climate data collected from weather stations around the globe to forecast how climate trends and events unfold. Because the amounts of data are often large, climate analytics is also a common application area for distributed computing.

PySpark Tutorial Relataly.com  Zurich Seasons
Zurich weather knows all seasons.

This tutorial guides us through a good use case for distributed data processing. We will process and analyze a set of historical weather data collected from a weather station in Zurich. Our goal is to explore the relations between climate variables such as temperature, wind, snowfall, and precipitation. However, the primary purpose of this tutorial is to familiarize ourselves with the essential functions of PySpark. Some topics covered in this part are:

  • reading and writing DataFrames
  • selecting, renaming, and manipulating columns
  • filtering, dropping, sorting, and aggregating rows
  • joining and transforming data
  • working with UDFs and Spark SQL functions

While we implement these functions, we will also look into what PySpark does under the hood.

PySpark can run on a simulated distributed environment on your local machine. So there is no need for expensive hardware.

The code is available on the GitHub repository.

Prerequisites

This tutorial assumes that you have set up your Python 3 environment. If you have not set it up yet, you can follow this tutorial to prepare your Anaconda environment. I recommend using Anaconda or Visual Studio Code, but any other Python environment will do.

It is also assumed that you have the following packages installed: pandas, matplotlib, and seaborn for visualization. In addition, you need to install the PySpark library (package name: pyspark). You can install the packages with the following console commands:

pip install <package name>

conda install <package name> (if you are using the Anaconda package manager)
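
Before starting, it can help to confirm that all packages can be found by Python. The following is a minimal sketch that uses only the standard library; the helper name missing_packages and the REQUIRED list are assumptions made for illustration.

```python
from importlib.util import find_spec

# Import names of the packages this tutorial relies on.
REQUIRED = ["pyspark", "pandas", "matplotlib", "seaborn"]


def missing_packages(names):
    """Return the subset of `names` that the import system cannot locate."""
    return [name for name in names if find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_packages(REQUIRED)
    if missing:
        print("Please install:", ", ".join(missing))
    else:
        print("All required packages are available.")
```

Any package reported as missing can then be installed with one of the commands above.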

Download the Weather Data

We will work with a public weather dataset that contains daily weather information from Zurich, Switzerland. The data was collected at a weather station between 1979-01-01 and 2021-01-01 and has been divided into two sets, “A” and “B”. After downloading the files, place them under the following directory: root-folder/data/weather/

File A

  • date string in format YYYY-MM-DD
  • the minimum temperature in °C
  • maximum temperature in °C
  • average temperature in °C

File B

  • date string in format YYYY-MM-DD
  • the wind direction in degrees
  • max snow depth in mm
  • precipitation in mm/m²
  • air pressure in hPa
  • max wind speed in km/h
  • average wind speed in km/h
  • daily sunshine in minutes

Step #1 Initialize SparkSession

The first step of any Spark application is to create a SparkSession. Older versions of Spark used separate contexts (such as SparkContext and SQLContext) for accessing Spark’s different modules. Since Spark 2.0, SparkSession has been the unified entry point for accessing Spark APIs and communicating with the cluster; the underlying SparkContext remains available through it.

We can create new SparkSessions by using the getOrCreate() method. This method checks if there is an existing SparkSession, and if it does not find one, the method creates a new instance. Since we are working with a local PySpark installation, this will create a new SparkSession that runs locally. If you have a Spark cluster available, you can use it by passing the cluster address to the builder’s master() method.
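
As a minimal sketch of how the session could be configured explicitly, the helpers below collect the builder settings and then create the session. The function names, the application name "weather-analytics", and the example cluster address are assumptions for illustration; only master(), appName(), and getOrCreate() are part of the SparkSession builder API.

```python
def session_settings(cluster_url=None, app_name="weather-analytics"):
    """Collect builder settings; fall back to local mode using all cores."""
    return {"master": cluster_url or "local[*]", "app_name": app_name}


def build_session(settings):
    """Create (or reuse) a SparkSession from the settings above."""
    # Imported here so that the settings helper also works without PySpark.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .master(settings["master"])
        .appName(settings["app_name"])
        .getOrCreate()
    )
```

Calling build_session(session_settings()) would start a local session, while session_settings("spark://host:7077") would point the builder at a standalone cluster (the host name here is a placeholder).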

PySpark is the Python-specific interface of Apache Spark
# A tutorial for this file is available at www.relataly.com

# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, isnan, when, count, udf, year, month, to_date, mean
import pyspark.sql.functions as F
import seaborn as sns
import matplotlib.pyplot as plt

# Create my_spark
spark = SparkSession.builder.getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x000001F8C0D10B20>

Step #2 Load the Data to a Spark DataFrame

Next, we want to load two CSV files by invoking the PySpark read function from the SparkSession instance. The function returns a Spark DataFrame, an abstraction layer for interacting with tabular data.

2.1 Reading File A

Let’s read the first file by invoking the read method on the SparkSession. In general, Spark evaluates transformations lazily: they only extend a query plan, and computation starts once we call an action. Reading is a partial exception here, because the inferSchema option makes Spark scan the file at load time to determine the column types.

In the code below, printSchema() merely prints the schema that Spark has already determined, so it does not trigger a job on its own. The call to describe().show() does: describe() defines basic statistics (count, mean, standard deviation, min, and max) on the DataFrame, and the show() action makes Spark read the data and compute them. If we only defined transformations and never called an action, we would not see any computations.
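
The difference between transformations and actions can be illustrated with a plain-Python analogy. The LazyPipeline class below is a toy written for this purpose and is not part of the Spark API: its filter and map methods only record steps, and nothing is computed until collect() is called.

```python
class LazyPipeline:
    """Toy stand-in for a DataFrame: transformations are recorded, not run."""

    def __init__(self, rows, steps=()):
        self._rows = rows
        self._steps = tuple(steps)

    def filter(self, predicate):
        # Transformation: returns a new pipeline with one more recorded step.
        return LazyPipeline(self._rows, self._steps + (("filter", predicate),))

    def map(self, func):
        # Transformation: again, only the plan grows.
        return LazyPipeline(self._rows, self._steps + (("map", func),))

    def collect(self):
        # Action: only now is the recorded plan executed.
        result = list(self._rows)
        for kind, func in self._steps:
            if kind == "filter":
                result = [row for row in result if func(row)]
            else:
                result = [func(row) for row in result]
        return result


calls = []


def is_warm(temp):
    calls.append(temp)  # record that work actually happened
    return temp > 10


pipeline = LazyPipeline([4.0, 12.4, 21.0]).filter(is_warm).map(round)
print(len(calls))          # 0: nothing has been computed yet
print(pipeline.collect())  # [12, 21]
print(len(calls))          # 3: the filter ran once per row
```

Spark goes further than this toy, since it can also reorder and optimize the recorded steps before executing them.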

Spark supports many formats for reading data, including CSV, JSON, Delta, Parquet, TXT, AVRO, etc. Our data files are in CSV format, separated by a comma. Therefore, we use the .csv function. The code below reads a CSV file into a Spark DataFrame.

DataFrame Action       | Description
show()                 | Displays the top n rows of a DataFrame
count()                | Counts the number of rows in a DataFrame
collect()              | Collects the data from the worker nodes and returns them as a list of rows
summary(), describe()  | Show statistics for string and numeric columns
first(), head()        | Return the first row, or the first several rows
take()                 | Collects the first n rows and returns them as a list
Action methods of the Spark DataFrame API
# Read File A
spark_weather_df_a = spark.read \
    .option("header", False) \
    .option("sep", ",") \
    .option("inferSchema", True) \
    .csv(path=f'data/weather/zurich_weather_a')
    
spark_weather_df_a.printSchema()
spark_weather_df_a.describe().show()

2.2 Reading File B

In the code below, you can see the options header, sep, and inferSchema. We are using Spark’s schema inference option to read the CSV data. However, we must use this option carefully: inferring the schema requires an additional pass over the data, which can affect performance, and the inferred types are not always the ones we want. When we are unsure how the data looks, it is a better solution to make the data types explicit to Spark. In our case, inferSchema has been tested before to ensure it works.

# Read File B
spark_weather_df_b = spark.read \
    .option("header", False) \
    .option("sep", ",") \
    .option("inferSchema", True) \
    .csv(path=f'data/weather/zurich_weather_b')

spark_weather_df_b.printSchema()
spark_weather_df_b.describe().show()
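
As an alternative to schema inference, the column types can be made explicit. The sketch below builds a DDL schema string and hands it to the reader’s schema() method. The column names and types are assumptions derived from the renaming step later in this tutorial ("idx" stands for the leading index column), so they may need adjusting to the actual files; the helper names are hypothetical as well.

```python
# Assumed layout of a headerless temperature file; adjust to the real data.
TEMPERATURE_COLUMNS = [
    ("idx", "INT"),
    ("date", "STRING"),
    ("max_temp", "DOUBLE"),
    ("min_temp", "DOUBLE"),
]


def to_ddl(columns):
    """Build a DDL schema string such as 'date STRING, max_temp DOUBLE'."""
    return ", ".join(f"{name} {dtype}" for name, dtype in columns)


def read_csv_with_schema(spark, path, columns):
    """Read a headerless CSV with an explicit schema instead of inferSchema."""
    return (
        spark.read
        .option("header", False)
        .option("sep", ",")
        .schema(to_ddl(columns))
        .csv(path)
    )
```

With an explicit schema, Spark does not need the extra pass over the data, and the columns receive their final names at read time.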

Step #3 Join the Data

Next, we will join the data from the two files and merge them into a single Spark DataFrame. Whenever we run a transformation on a DataFrame, regardless of whether it’s via the SQL or the DataFrame API, we add to a query plan that is handed to Spark’s query execution engine. Once an action is called, the engine optimizes and executes the plan. The result is a new DataFrame, backed by Resilient Distributed Datasets (RDDs), that contains the transformed data. Try to keep this in mind and imagine what happens underneath the hood as we proceed.
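
To look at the plan Spark has built for a DataFrame, we can call its explain() method, which prints the plan without executing it. The small wrapper below is a hypothetical convenience function; explain() itself is part of the DataFrame API.

```python
def show_plan(df, extended=False):
    """Print the plan Spark has built for `df` without running the query."""
    # explain() prints the physical plan; extended=True adds the logical plans.
    df.explain(extended)
```

For example, show_plan(spark_weather_df_a.drop("_c0")) would print the plan for the drop operation used in the next step.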

We now have two files containing specific columns of our weather data. To make sense of the data and prepare them for analytics, we will merge the two files into a single DataFrame. In addition, we perform several transformations to bring the data into shape.

  • Join the two datasets using the join function
  • Remove some columns that we won’t need via the drop function
  • Define new column names
  • Select and rename columns using select() with alias()

First, we drop column _c0 because we will not need it.

# Drop unused Columns
spark_weather_df_a = spark_weather_df_a.drop("_c0")
spark_weather_df_b = spark_weather_df_b.drop("_c0")

There are different ways to rename columns in PySpark. The first (A) is withColumnRenamed. It requires a separate function call for each column we want to rename. The second (B) uses a custom function and a dictionary. In this way, we can rename multiple columns at once.

After renaming the columns, we use the join function to merge the two datasets based on their date columns.

# method A: rename individual columns
spark_weather_df_b_renamed = spark_weather_df_b.withColumnRenamed("_c1", "date") \
    .withColumnRenamed("_c2", "max_temp") \
    .withColumnRenamed("_c3", "min_temp") 
        
# method B: rename multiple columns at once   
def rename_multiple_columns(df, columns):
    if isinstance(columns, dict):
        return df.select(*[F.col(col_name).alias(columns.get(col_name, col_name)) for col_name in df.columns])
    else:
        raise ValueError("columns need to be in dict format {'existing_name_a':'new_name_a', 'existing_name_b':'new_name_b'}")

dict_columns = {"_c1": "date2", 
                "_c2": "avg_temp", 
                "_c3": "precip",
                "_c4": "snow",
                "_c5": "wind_dir",
                "_c6": "wind_speed",
                "_c7": "wind_power",
                "_c8": "air_pressure",
                "_c9": "sunny_hours",}
spark_weather_df_a_renamed = rename_multiple_columns(spark_weather_df_a , dict_columns)

# Join the dataframes
spark_weather_df = spark_weather_df_a_renamed.join(spark_weather_df_b_renamed, spark_weather_df_a_renamed.date2 == spark_weather_df_b_renamed.date, "inner")
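
A third option for renaming is DataFrame.toDF, which replaces all column names in one call. The sketch below also shows a join on a shared column name, which leaves only one date column in the result. The helper names are assumptions for illustration, and join_on_date presumes that both DataFrames call their date column "date".

```python
def renamed_columns(columns, mapping):
    """Return the new column list, keeping names that are not in `mapping`."""
    return [mapping.get(name, name) for name in columns]


def rename_with_todf(df, mapping):
    """Rename columns in one call via DataFrame.toDF(*names)."""
    return df.toDF(*renamed_columns(df.columns, mapping))


def join_on_date(left, right):
    """Inner join on a shared 'date' column; the result has one 'date' column."""
    return left.join(right, on="date", how="inner")


print(renamed_columns(["_c1", "_c2", "other"], {"_c1": "date", "_c2": "avg_temp"}))
# ['date', 'avg_temp', 'other']
```

Joining on a column name instead of an equality expression avoids the need to drop the duplicated date2 column afterwards.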

Step #4 Gain a Quick Overview of the Data

After transforming a dataset, we typically want to check the outcome and print a few summaries to gain an overview of the data. We can make our lives easier by writing a small function for this purpose, which we can call as needed. Running the custom function below gives us a quick overview of the data. It takes a DataFrame as an argument and performs the following steps:

  • Prints the first two records
  • Counts NaN and Null values for all columns that have a double or float datatype
  • Prints duplicate values based on the date column. To display the result, we use the limit function in combination with the toPandas function
  • Prints the schema
def quick_overview(df):
   # display the spark dataframe
   print("FIRST RECORDS")
   print(df.limit(2).sort(col("date"), ascending=True).toPandas())

   # count NaN and null values in all double and float columns
   print("COUNT NULL VALUES")
   print(df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c, y in df.dtypes if y in ["double", "float"]]
      ).toPandas())

   # print("DESCRIBE STATISTICS")
   # print(df.describe().toPandas())
   # # Alternatively to get the max value, we could use max_value = df.agg({"precip": "max"}).collect()[0][0]

   # check for duplicates (group the DataFrame that was passed in, not a global one)
   print("DUPLICATE DATES")
   duplicates = df.groupby("date") \
    .count() \
    .where('count > 1') \
    .limit(5).toPandas()
   print(duplicates)

   # print schema (printSchema prints directly and returns None)
   print("PRINT SCHEMA")
   df.printSchema()

quick_overview(spark_weather_df)
PySpark data duplicates

As we can see, there are both duplicates and missing values in our data. But don’t worry; we will handle these data quality issues in the next step.

Step #5 Clean the Data using Chained Operations

Spark uses a design pattern that allows us to chain multiple operations. In the next step, we use this feature when we clean and filter the data. On execution, Spark will decide the order in which it performs the transformations as part of an optimized execution plan.

We begin cleaning the data by replacing Null values. Afterward, in a separate section, we will reduce the size of the data by selecting specific columns.

5.1 Replacing NA Values with Means

As we saw in the previous step, there are several Null values in the temperature and wind columns. It is preferable to have complete datasets without gaps when performing historical data analytics. Therefore, we will replace the missing values with the average value of the respective column. The code below first calculates the mean values and then uses them to replace the Null values. In the second step, we utilize Spark’s method-chaining functionality and carry out the replacement for several columns in a single chained statement.

# calculate mean values
avg = spark_weather_df.filter(spark_weather_df.avg_temp.isNotNull())\
    .select(mean(col('min_temp')).alias('mean_min'), 
            mean(col('max_temp')).alias('mean_max'), 
            mean(col('wind_speed')).alias('mean_wind')).collect()

mean_min = avg[0]['mean_min']
mean_max = avg[0]['mean_max']
mean_wind = avg[0]['mean_wind']

# replace na values with mean values
spark_weather_df = spark_weather_df \
    .na.fill(value=mean_min, subset=["min_temp"]) \
    .na.fill(value=mean_max, subset=["max_temp"]) \
    .na.fill(value=mean_wind, subset=["wind_speed"]) \
    .na.fill(value=0, subset=["snow"]) 
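
The same replacement can also be written more compactly, because na.fill accepts a dictionary that maps column names to replacement values. The following is a sketch under the assumption that the means of the listed columns should be used; the helper names column_means and fill_missing are hypothetical.

```python
def column_means(df, columns):
    """Compute the mean of each column in a single aggregation job."""
    row = df.agg({name: "avg" for name in columns}).first()
    # A dict aggregation names its output columns like "avg(min_temp)".
    return {name: row[f"avg({name})"] for name in columns}


def fill_missing(df, mean_columns, constants=None):
    """Fill nulls with column means, plus optional fixed values per column."""
    replacements = column_means(df, mean_columns)
    replacements.update(constants or {})
    # na.fill accepts a dict mapping column name -> replacement value.
    return df.na.fill(replacements)
```

A call such as fill_missing(spark_weather_df, ["min_temp", "max_temp", "wind_speed"], {"snow": 0}) would then cover all four columns with one aggregation and one fill.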
    

Empty DataFrame Columns: [date, count] Index: []

5.2 Removing Duplicates

Now that we have a complete dataset, we still need to remove duplicate values. In addition, we will use the following DataFrame methods:

  • orderBy: To sort the data.
  • withColumn: Adds or replaces a column. Here, we use it to convert the date column from string to Date type. We can find an overview of the data types supported by PySpark here.
  • drop: Used to eliminate individual columns from a dataset.
  • dropDuplicates: Used to eliminate duplicate records from the dataset.

Again, we will use PySpark’s method chaining feature and invoke all transformations in a single chained statement.

# remove duplicates, drop column date2, convert date from string to date type, sort by date
# (the parentheses allow comments between the chained calls; a backslash continuation does not)
spark_cleaned_df = (
    spark_weather_df.dropDuplicates()
    .drop(col("date2"))  # only for demonstration, actually not necessary because we are using select at the end
    .withColumn("date", to_date(col("date"), "yyyy-MM-dd"))
    .orderBy(col("date"))
    .select(col("date"), col("avg_temp"), col("min_temp"), col("max_temp"), col("wind_speed"), col("snow"), col("precip"))  # select columns
)

quick_overview(spark_cleaned_df)
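
Note that dropDuplicates() without arguments only removes rows that are identical in every column. If two rows share a date but differ in another value, both remain. A possible variant, sketched below with hypothetical helper names, counts such rows and keeps one row per date by passing a column subset.

```python
def count_duplicate_dates(df, key="date"):
    """Number of rows that share their `key` value with another kept row."""
    return df.count() - df.select(key).distinct().count()


def drop_duplicate_dates(df, key="date"):
    """Keep one row per `key` value (which row is kept is not guaranteed)."""
    return df.dropDuplicates([key])
```

Whether this is appropriate depends on the data: if rows with the same date carry different measurements, it may be better to aggregate them instead of dropping one.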

Step #6 Transform the Data using User Defined Functions (UDFs)

In addition to the standard methods, PySpark also offers the possibility to execute custom Python code in the Spark cluster. Such functions are called User Defined Functions (UDF). When we use UDFs, we should be aware that we won’t achieve the same level of performance as by using standard PySpark functions.

In the following, we want to assign each day to one of several temperature classes (buckets). We can use a UDF to create these buckets. Built-in functions are preferable wherever they can express the logic, but sometimes a task can only be achieved with a UDF. By running the code below, we first create a normal Python function called “binner” and then wrap this function in a UDF that we call udf_binner_temp.

# More Infos on User Defined Functions: https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/
# create bucket column for temperature
def binner(min_temp, max_temp):
        if (min_temp is None) or (max_temp is None):
            return "unknown"
        else:
            if min_temp < -10:
                return "freezing cold"
            elif min_temp < -5:
                return "very cold"
            elif min_temp < 0:
                return "cold"
            elif max_temp < 10:
                return "normal"
            elif max_temp < 20:
                return "warm"
            elif max_temp < 30:
                return "hot"
            elif max_temp >= 30:
                return "very hot"
        return "normal"


udf_binner_temp = udf(binner, StringType() )
spark_cleaned_df = spark_cleaned_df.withColumn("temp_buckets", udf_binner_temp(col("min_temp"), col("max_temp")))
spark_cleaned_df.limit(10).toPandas()
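
Because the bucketing logic lives in a plain Python function, it can be checked without a Spark session before it is wrapped in a UDF. The block below is a sketch: it contains a condensed copy of the rules from this step, and the test values are made up for illustration.

```python
def binner(min_temp, max_temp):
    """Condensed copy of the bucketing rules used in this step."""
    if min_temp is None or max_temp is None:
        return "unknown"
    # Cold buckets are decided by the daily minimum ...
    for limit, label in [(-10, "freezing cold"), (-5, "very cold"), (0, "cold")]:
        if min_temp < limit:
            return label
    # ... all other buckets by the daily maximum.
    for limit, label in [(10, "normal"), (20, "warm"), (30, "hot")]:
        if max_temp < limit:
            return label
    # NaN fails every comparison and falls back to "normal".
    return "very hot" if max_temp >= 30 else "normal"


checks = {
    (None, 5.0): "unknown",
    (-12.0, -3.0): "freezing cold",
    (-1.0, 4.0): "cold",
    (3.0, 9.0): "normal",
    (12.0, 25.0): "hot",
    (18.0, 31.0): "very hot",
}
for args, expected in checks.items():
    assert binner(*args) == expected, (args, expected)
print("all bucket checks passed")
```

Checks like these are cheap to run and catch threshold mistakes before the function is executed on the cluster.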

In the same way, we create buckets to cover different levels of precipitation.

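# create new columns for bucket precipitation and for month and year
# (rows without a precipitation value are labeled "unknown" to avoid comparing None with a number)
udf_binner_precip = udf(lambda x: "unknown" if x is None else ("very rainy" if x > 50 else ("rainy" if x > 0 else "dry")), StringType())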
spark_cleaned_df = spark_cleaned_df \
    .withColumn("precip_buckets", udf_binner_precip("precip")) \
    .withColumn("month", month(spark_cleaned_df.date)) \
    .withColumn("year", year(spark_cleaned_df.date)) 
spark_cleaned_df.limit(5).toPandas()
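
Simple threshold rules like these can also be expressed with Spark’s built-in when/otherwise functions, which avoids the overhead of a Python UDF. The sketch below keeps the rules in one list and derives both a plain-Python reference function and a native column expression from it. The names PRECIP_RULES, precip_bucket, and precip_bucket_column are assumptions for illustration, as is the "unknown" label for missing values.

```python
PRECIP_RULES = [(50, "very rainy"), (0, "rainy")]  # (lower bound, label)
DEFAULT_LABEL = "dry"


def precip_bucket(value):
    """Plain-Python reference implementation of the rules."""
    if value is None:
        return "unknown"
    for lower, label in PRECIP_RULES:
        if value > lower:
            return label
    return DEFAULT_LABEL


def precip_bucket_column(name):
    """The same rules as a native Column expression (no Python UDF involved)."""
    # Imported here so that the reference function works without PySpark.
    from pyspark.sql import functions as F

    value = F.col(name)
    expr = F.when(value.isNull(), "unknown")
    for lower, label in PRECIP_RULES:
        expr = expr.when(value > lower, label)
    return expr.otherwise(DEFAULT_LABEL)
```

The expression could then be used as spark_cleaned_df.withColumn("precip_buckets", precip_bucket_column("precip")).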

These steps complete the data preparation. Next, we will focus on analyzing the data.

Step #7 Exemplary Data Analysis using Seaborn

We can analyze the dataset now that we have a clean and consistent set of historical weather data. To aid our analysis, we will visualize the data. Since PySpark does not provide its own functions for visualization, we will use the Python library Seaborn for visualization.

PySpark provides two different ways to analyze data. The first is standard PySpark methods executed on a DataFrame (sums, counts, average, groupby). The second is PySpark SQL, which allows us to query the data using basic SQL syntax.

The PySpark transformation functions and PySpark SQL are optimized for distributed, parallelized computations on large data sets. In the following, we will explore both ways. This section shows how we can now use the data to create different types of plots:

  • Temperature scatterplot with colored dots to highlight historical developments
  • Heatmap on mean max temperature
  • Scatterplots that illustrate three-dimensional relationships

7.1 Temperature Scatterplot with Colored Dots

In this section, we create a scatterplot in which the dots are colored by temperature bucket.

# set the dimensions for the scatterplot
fig, ax = plt.subplots(figsize=(28,8))
sns.scatterplot(hue="temp_buckets", y="avg_temp", x="date", data=spark_cleaned_df.toPandas(), palette="Spectral_r")

# plot formatting 
ax.tick_params(axis="x", rotation=30, labelsize=10, length=0)

# title formatting
mindate = str(spark_cleaned_df.agg({'date': 'min'}).collect()[0]['min(date)'])
maxdate = str(spark_cleaned_df.agg({'date': 'max'}).collect()[0]['max(date)'])
ax.set_title("average temperature in Zurich: " + mindate + " - " + maxdate)
plt.xlabel("Year")
plt.show()

The outliers to the bottom and top are interesting. While the upward temperature peaks seem to remain relatively stable, the lower temperatures have decreased in the past decades.

7.2 Heatmap on Mean Max Temperature

Next, we want to create a heatmap that illustrates how the maximum temperature develops over years and months. We first use PySpark to compute the mean of the maximum temperature over years and months. Then we use seaborn to create a heatmap.

spark_cleaned_df_agg = spark_cleaned_df.select(col("year"),col("month"),col("max_temp")) \
    .filter(spark_cleaned_df.year < 2021)\
    .groupby(col("year"),col("month"))\
    .agg(mean("max_temp").alias("mean_max_temp")) \
    .orderBy(col("year")).toPandas()

plt.figure(figsize=(24,6))
# recent pandas versions require keyword arguments for pivot
avg_temp_df = spark_cleaned_df_agg.pivot(index="month", columns="year", values="mean_max_temp")
sns.heatmap(avg_temp_df, cmap="Spectral_r", linewidths=.5)
PySpark big data analytics tutorial - weather heatmap
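
For larger datasets, the pivot can also be computed in Spark, so that only the small month-by-year table is converted to pandas. The following is a minimal sketch; the helper name monthly_means_by_year is an assumption, while groupBy, pivot, mean, and orderBy are standard DataFrame and GroupedData methods.

```python
def monthly_means_by_year(df, value="max_temp"):
    """Pivot in Spark: one row per month, one column per year."""
    return (
        df.groupBy("month")
        .pivot("year")
        .mean(value)
        .orderBy("month")
    )
```

The result of monthly_means_by_year(spark_cleaned_df).toPandas().set_index("month") could then be passed to sns.heatmap in the same way as above.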

7.3 Scatterplots

Next, we dive deeper into the relationship between temperature and other weather effects such as wind or snow. The code below creates three scatterplots. Each of them shows the daily minimum temperature on the x-axis and the daily maximum temperature on the y-axis, and colors the dots by an additional variable:

  • The first plot illustrates the relationship between temperature and snowfall
  • The second plot shows how wind speed relates to the daily min and max temperature
  • The third plot visualizes how the daily min and max temperature change over the months
# convert the Spark DataFrame to pandas once and reuse it for all three plots
weather_pdf = spark_cleaned_df.toPandas()

fig, axes = plt.subplots(nrows=1, ncols=3, sharey=True, figsize=(30, 10))
fig.subplots_adjust(hspace=0.5, wspace=0.2)

palette = sns.color_palette("ch:start=.2,rot=-.3", as_cmap=True)
sns.scatterplot(ax=axes[0], hue="snow", size="snow", y="max_temp", x="min_temp", data=weather_pdf, alpha=1.0, palette=palette)
axes[0].legend(bbox_to_anchor=(1.01, 1), loc=2, borderaxespad=0.)
axes[0].set_title("min - max temperature separated by snow")

sns.scatterplot(ax=axes[1], hue="wind_speed", size="wind_speed", y="max_temp", x="min_temp", data=weather_pdf, alpha=1.0, palette='rocket_r')
axes[1].legend(bbox_to_anchor=(1.01, 1), loc=2, borderaxespad=0.)
axes[1].set_title("min - max temperature separated by wind speed")

sns.scatterplot(ax=axes[2], hue="month", y="max_temp", x="min_temp", data=weather_pdf, alpha=1.0, palette='Spectral_r', hue_norm=(1,12), legend="full")
axes[2].legend(bbox_to_anchor=(1.01, 1), loc=2, borderaxespad=0.)
axes[2].set_title("min - max temperature separated by month")
PySpark big data analytics tutorial - weather dot plots

There are various things we can learn from these plots:

  1. It is not surprising that most of the snowfall happens at low temperatures. We can also see that most of the snowfall is concentrated around zero degrees Celsius. We also observe some outliers at very low and at higher temperatures.
  2. The highest wind speeds are located in the middle of the plot, where we can also observe the most significant daily difference between the minimum and the maximum temperatures.
  3. The last chart reflects the different seasons. Through the colored areas, we can distinguish the temperature ranges of the individual months.

Step #8 PySpark SQL

PySpark SQL combines classic SQL syntax with distributed computations, so queries can be executed in parallel across a cluster. Spark SQL is very similar to conventional SQL dialects such as MySQL or Microsoft SQL Server. In addition, Spark provides a dedicated set of functions for common data operations such as string transformations, type conversions, etc. The Spark SQL reference guide offers more information on this topic.

This section aims to extend our analysis by using PySpark SQL. We will display how the daily temperature averages (for minimum, maximum, and mean) have changed.

First, we have to register our dataset as a temporary view. Once we have done that, we can run ad-hoc queries against the View using Spark SQL commands. As you can see, we can use traditional SQL syntax here. The SQL function returns the results in the form of a DataFrame. Running the code below will perform these actions and visualize the results in a line plot.

# Data Analysis using PySpark.SQL

# register the dataset as a temp view, so we can use SQL
spark_cleaned_df.createOrReplaceTempView("weather_data_temp_view")

# see how the mean temperatures have evolved over the years (only days with max_temp > 25 are considered)
events_over_years_df = spark.sql( \
        'SELECT year, month, mean(avg_temp), mean(max_temp), mean(min_temp) \
        FROM weather_data_temp_view \
        WHERE max_temp > 25 \
        GROUP BY month, year \
        ORDER BY year, month')
print(events_over_years_df.limit(5).toPandas())

# convert the result to pandas once and reuse it for all three lines
events_over_years_pdf = events_over_years_df.toPandas()

plt.figure(figsize=(16,6))
ax = sns.lineplot(y="mean(avg_temp)", x="year", data=events_over_years_pdf, color="orange")
ax = sns.lineplot(y="mean(max_temp)", x="year", data=events_over_years_pdf, color="red")
ax = sns.lineplot(y="mean(min_temp)", x="year", data=events_over_years_pdf, color="blue")
plt.grid()
plt.show()
PySpark big data analytics tutorial - Temperature curves created
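
Column names such as mean(avg_temp) are awkward to work with downstream. One possible refinement, sketched below, is to give the aggregates explicit aliases in the SQL statement. The function names and the optional temperature filter are assumptions for illustration; the view name must match the one registered with createOrReplaceTempView.

```python
def yearly_means_query(view_name, min_max_temp=None):
    """Build a Spark SQL query with readable column aliases."""
    where = ""
    if min_max_temp is not None:
        # float() ensures that only a number ends up in the SQL text.
        where = f"WHERE max_temp > {float(min_max_temp)}"
    return f"""
        SELECT year,
               AVG(avg_temp) AS mean_avg_temp,
               AVG(min_temp) AS mean_min_temp,
               AVG(max_temp) AS mean_max_temp
        FROM {view_name}
        {where}
        GROUP BY year
        ORDER BY year
    """


def yearly_means(spark, view_name, min_max_temp=None):
    """Run the query and return the result as a Spark DataFrame."""
    return spark.sql(yearly_means_query(view_name, min_max_temp))
```

The resulting DataFrame has the columns year, mean_avg_temp, mean_min_temp, and mean_max_temp, which can be used directly as axis names in Seaborn.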

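As shown below, we can express a similar aggregation with the DataFrame API. Note that this variant groups the data by year only and filters on the year instead of the maximum temperature, so the results are not identical to those of the SQL query above.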

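# Using PySpark standard functions
events_over_years_df = spark_cleaned_df\
    .filter(spark_cleaned_df.year < 2021)\
    .groupby(col("year"))\
    .agg(mean("avg_temp").alias("mean_avg_temp"), 
         mean("min_temp").alias("mean_min_temp"), 
         mean("max_temp").alias("mean_max_temp"))\
    .orderBy(col("year")).toPandas()

# plot the yearly means, analogous to the SQL example
plt.figure(figsize=(15,6))
sns.lineplot(y="mean_avg_temp", x="year", data=events_over_years_df, color="orange")
sns.lineplot(y="mean_max_temp", x="year", data=events_over_years_df, color="red")
sns.lineplot(y="mean_min_temp", x="year", data=events_over_years_df, color="blue")
plt.grid()
plt.show()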

Summary

Congratulations, you have made your first steps with distributed computing! This tutorial has demonstrated PySpark – a distributed computing framework for big data processing and analytics. After initializing the Spark session, we prepared a weather dataset and tested several PySpark functions to process and analyze it. Among the primary data operations were:

  • Ingestion of CSV files with the read method
  • Joining PySpark DataFrames
  • Cleaning the data by treating duplicates and Null values
  • Filtering the data and selecting specific columns
  • Performing custom computations with UDFs
  • Running analytics and creating plots

Although we have used a wide range of functions, we touched only upon the surface of PySpark’s capabilities. We can do many further cool things with PySpark, including real-time data streaming and efficient training of big data machine learning models. We will cover these topics in separate tutorials.

Thanks for reading, and if you have any questions, please let me know in the comments.

Author

  • Hi, I am Florian, a Zurich-based consultant for AI and Data. Since the completion of my Ph.D. in 2017, I have been working on the design and implementation of ML use cases in the Swiss financial sector. I started this blog in 2020 with the goal in mind to share my experiences and create a place where you can find key concepts of machine learning and materials that will allow you to kick-start your own Python projects.
