pandas & PySpark

Explore strategies for seamlessly integrating pandas and PySpark to maximize performance and efficiency. This article dives into leveraging the pandas API on Spark and harnessing the power of pandas UDFs, demonstrating how to combine the simplicity and flexibility of pandas with the scalability of PySpark. Learn how these tools complement each other to enable efficient, large-scale data processing without compromising usability or ease of implementation.

This article is written by Laura Meyer, an Engineer at a leading consultancy, specializing in AI, data science, and DevOps, with extensive experience in GenAI innovation and delivering technical training.

 


 

In my previous article, pandas vs PySpark, I explored how these two powerful libraries serve different data processing needs. pandas is the go-to in-memory Python library for manipulating small to medium-sized datasets, while PySpark is built for distributed computing, enabling the processing of massive datasets across multiple machines. That discussion focused on their individual architectures, benefits, and use cases. But what if we could harness the strengths of both libraries to achieve even better performance?

This article explores how pandas and PySpark can seamlessly integrate to unlock additional capabilities, combining pandas’ flexibility with PySpark’s scalability. We’ll first dive into the pandas API on Spark, which lets developers work with large datasets as if they were using pandas—all while leveraging Spark’s distributed computing power. Given PySpark’s distributed nature, where data is partitioned across multiple nodes, these partitions can be treated as smaller pandas DataFrames. This opens up exciting possibilities: What if we could directly manipulate these partitions using familiar pandas tools? And what if we could control how these partitions are created and processed?

In this context, we will also discuss pandas UDFs (User-Defined Functions), which enable users to perform custom transformations at scale, merging pandas’ ease of use with PySpark’s distributed processing power.

If you’re new to pandas or PySpark, I recommend checking out my previous article, pandas vs PySpark, which provides a detailed introduction to both tools and their unique strengths.

 

pandas API on Spark

As we’ve seen, PySpark is a Python wrapper for Spark, enabling data scientists and analysts to use Spark without needing to learn Java or Scala. While pandas is a popular Python library for data analysis, it is limited to single-machine processing and cannot handle big data. Because PySpark executes operations in parallel across all cores and multiple machines, it can run operations much faster than pandas, which is why pandas DataFrames are often converted to PySpark DataFrames for better performance. However, for those familiar with pandas, the syntax and operations of traditional PySpark DataFrames can feel unfamiliar. The pandas API on Spark addresses this by offering pandas-like methods and functionality for distributed DataFrames, enabling users to apply their existing pandas knowledge directly to Spark DataFrames.

The pandas API on Spark (available through pyspark.pandas) provides a familiar interface, allowing data engineers and scientists to work with distributed datasets in Spark using the same syntax as pandas. It maintains the simplicity of pandas while being optimized for Spark’s distributed architecture, making it capable of handling large-scale data that cannot fit into a single machine’s memory. Additionally, it allows seamless interoperability between pandas and PySpark DataFrames.

To give you a better overview, let’s discuss the different DataFrame options in pandas, Spark, and Pandas-on-Spark—it’s always important to know what you’re working with.

Pandas DataFrame

Class: pandas.core.frame.DataFrame

Primary in-memory data structure for analyzing small to medium datasets on a single machine.

import pandas as pd

data = [["Alice", "M", 50000], ["Bob", "", 60000], ["Carol", "F", None]]
columns = ["Name", "Gender", "Salary"]

# Create the pandas DataFrame
pandas_df = pd.DataFrame(data=data, columns=columns)

# Print the DataFrame
print(pandas_df)

 

PySpark DataFrame

Class: pyspark.sql.dataframe.DataFrame

The core data structure for distributed data processing in Spark. It is immutable, fault-tolerant, and supports lazy evaluation, meaning transformations are only executed when actions are triggered. PySpark DataFrames are distributed across a cluster for parallel processing.

from pyspark.sql import SparkSession
from pyspark.sql.functions import mean, max

# Create (or reuse) a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create a Spark DataFrame from the same data and columns as above
spark_df = spark.createDataFrame(data, columns)

# Example 1: Select mean of 'Salary'
spark_df.select(mean("Salary")).show()

# Example 2: Group by 'Gender' and calculate mean and max of 'Salary'
spark_df.groupBy("Gender") \
    .agg(mean("Salary"), max("Salary")) \
    .show()
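To make the lazy-evaluation point above concrete, here is a quick sketch reusing the spark_df just created (the high_earners name and the 55,000 threshold are purely illustrative): transformations such as filter() only build an execution plan, and nothing is computed until an action such as show() or count() is called.

# Transformation: Spark only records the operation in its execution plan
high_earners = spark_df.filter(spark_df.Salary > 55000)

# Actions: these trigger the actual distributed computation
high_earners.show()
print(high_earners.count())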

 

Pandas-on-Spark DataFrame

Class: pyspark.pandas.DataFrame

Combines pandas’ familiar syntax with Spark’s scalability, enabling users to work with large datasets while leveraging pandas-like operations. Note that data is transferred between the cluster and the client machine when converting to and from plain pandas DataFrames, which can be expensive for large datasets.

# Import the pandas API on Spark
import pyspark.pandas as ps

# Create a pandas-on-Spark DataFrame
ps_df = ps.DataFrame(range(10))

# Convert a pandas-on-Spark DataFrame into a pandas DataFrame
pd_df = ps_df.to_pandas()

# Convert a pandas DataFrame into a pandas-on-Spark DataFrame
ps_df = ps.from_pandas(pd_df)

# Convert a pandas-on-Spark DataFrame into a Spark DataFrame
spark_df = ps_df.to_spark()

# Convert a Spark DataFrame into a pandas-on-Spark DataFrame
ps_df_new = spark_df.to_pandas_on_spark()
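To round off this overview, the short sketch below (a minimal illustration, mirroring the Name, Gender, and Salary data from earlier; the psdf name is just for this example) shows that familiar pandas-style methods work directly on a pandas-on-Spark DataFrame while Spark executes the work behind the scenes:

# Build a pandas-on-Spark DataFrame using pandas-like syntax
psdf = ps.DataFrame({"Name": ["Alice", "Bob", "Carol"],
                     "Gender": ["M", "", "F"],
                     "Salary": [50000, 60000, None]})

# Familiar pandas operations, executed by Spark under the hood
print(psdf["Salary"].mean())                   # column mean
print(psdf.groupby("Gender")["Salary"].max())  # grouped aggregation
print(psdf.sort_values("Salary", ascending=False).head(2))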

 

pandas UDFs

Pandas UDFs (User-Defined Functions) are one of the most powerful features PySpark offers for data manipulation. They allow users to scale custom operations designed for pandas DataFrames to work with PySpark DataFrames. Pandas UDFs are particularly useful when built-in PySpark functions cannot support the required data transformation or computation. Therefore, before creating custom functions, it is advisable to check whether PySpark already provides the necessary functionality through its built-in pyspark.sql.functions module.
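As a quick illustration of checking the built-ins first, here is a small sketch reusing the spark_df with a Name column from the PySpark DataFrame example above: many everyday transformations, such as capitalization or case conversion, already exist as optimized native column functions.

from pyspark.sql.functions import initcap, upper

# Built-in column functions cover many common transformations out of the box
spark_df.select(initcap("Name"), upper("Name")).show()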

Unlike traditional Python UDFs, which process data row by row, pandas UDFs apply functions to batches of data (e.g., columns) in a vectorized manner using pandas, treating a PySpark DataFrame as a collection of small pandas DataFrames. PySpark splits the Spark DataFrame into batches, converts each batch into pandas objects, applies the UDF, and converts the results back into a PySpark DataFrame. The data exchange between the JVM and Python is handled by Apache Arrow (PyArrow), which keeps these conversions efficient and avoids the costly per-row serialization and deserialization that plain Python UDFs incur. As a result, pandas UDFs can improve performance by up to 100x compared to row-at-a-time Python UDFs.
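To make the row-at-a-time versus vectorized distinction concrete, here is a minimal sketch comparing a plain Python UDF with an equivalent pandas UDF. It reuses the spark_df and its Salary column from the earlier example; the bonus_python and bonus_pandas names and the 10% bonus are purely illustrative.

from pyspark.sql.functions import udf, pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd

# Plain Python UDF: invoked once per row
@udf(returnType=DoubleType())
def bonus_python(salary):
    return salary * 1.1 if salary is not None else None

# pandas UDF: invoked once per batch with a whole pandas Series
@pandas_udf(DoubleType())
def bonus_pandas(salary: pd.Series) -> pd.Series:
    return salary * 1.1  # vectorized over the entire batch

spark_df.select(bonus_python("Salary"), bonus_pandas("Salary")).show()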

You define a pandas UDF by applying the pyspark.sql.functions.pandas_udf() decorator to a regular Python function. The decorator specifies the UDF’s return type, while Python type hints on the function declare how the input and output batches are represented (for example, as pandas Series). After defining the UDF, you can apply it to a PySpark DataFrame, enabling the function to operate on the DataFrame in parallel.

Let’s walk through an example step-by-step.

 

Step 1: Creating the SparkSession

We start by importing SparkSession from the pyspark.sql module. SparkSession is the entry point to working with DataFrames in Spark. It provides an interface for Spark’s SQL, streaming, and batch processing. The appName('EmployeeNameCapitalization') assigns a name to this Spark application, and .getOrCreate() ensures that if a Spark session already exists, we reuse it.

from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.appName('EmployeeNameCapitalization') \
    .getOrCreate()

 

Step 2: Preparing the data

columns = ["EmployeeID", "FullName"]
data = [("101", "alice green"),
        ("102", "bob johnson"),
        ("103", "charlie brown"),
        ("104", "david white"),
        ("105", "eve black")]

# Create DataFrame
df = spark.createDataFrame(data=data, schema=columns)
df.show()

 

Here, we define a new dataset containing EmployeeID and FullName columns, with a list of employees and their names. We then create a Spark DataFrame df from this data and display it using df.show().

 

Step 3: Creating the pandas UDF

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
import pandas as pd

# Create a pandas UDF to capitalize each name
@pandas_udf(StringType())
def capitalize_name(s: pd.Series) -> pd.Series:
    return s.str.title()  # capitalizes the first letter of each word

 

In this step, we create a pandas UDF called capitalize_name, which takes a Pandas Series s (a column from the DataFrame) and applies the .str.title() method to capitalize the first letter of each word in the string.

There are two methods to apply the defined UDF:

  • select(): Creates a new DataFrame with specified columns. To add a new column, you must explicitly include all existing columns you wish to retain.
  • withColumn(): Adds or replaces a column in the existing DataFrame, automatically preserving all other columns.

 

Step 4: Using the pandas UDF with select()

# Using UDF with select()
df.select("EmployeeID", "FullName", capitalize_name("FullName")).show()

 

Here, we apply the capitalize_name pandas UDF to the FullName column using the select() method. We select the original columns (EmployeeID and FullName) and add a new column containing the capitalized version of FullName. The result is displayed using .show().

 

Step 5: Using the pandas UDF with withColumn()

# Using UDF with withColumn()
df.withColumn("Capitalized_Name", capitalize_name("FullName")).show()

Lastly, we use the withColumn() method to add a new column, Capitalized_Name, containing the capitalized version of the FullName column, and display the result with .show(). Because Spark DataFrames are immutable, this returns a new DataFrame rather than modifying df in place.

This approach highlights how pandas UDFs allow us to apply powerful data transformations using Pandas functions in a distributed Spark environment, offering a flexible and efficient way to handle text manipulations.

 

Conclusion

We’ve learned that pandas and PySpark can be seamlessly integrated, allowing users to easily convert between pandas and PySpark DataFrames. Leveraging pandas on top of Spark via pyspark.pandas lets pandas-style code scale to datasets that no longer fit into a single machine’s memory, reduces the learning curve when transitioning to Spark, and merges single-machine and distributed computing within the same codebase. Thanks to Spark’s engine, your code can even run faster on a single machine.

Moreover, pandas UDFs (User-Defined Functions) enable high-performance feature engineering on PySpark DataFrames while benefiting from pandas’ vectorized operations. Pandas UDFs offer significant advantages: improved performance through vectorized computation, the flexibility to be integrated into Spark pipelines and online inference, and ease of use for data scientists already familiar with pandas. pandas’ rich set of data manipulation functions makes quick and complex feature transformations possible, making pandas UDFs a powerful tool for efficient feature engineering in, for example, machine learning pipelines.

 


 
