This section outlines common and essential patterns for working with PySpark DataFrames, providing concise code examples for efficient data manipulation and analysis.
It's good practice to import PySpark SQL functions and types with aliases for cleaner code.
# Easily reference these as F.my_function() and T.my_type() below
from pyspark.sql import functions as F, types as T
Learn how to filter rows based on various conditions, including equality, range comparisons, and multiple criteria.
# Filter on equals condition
df = df.filter(df.is_adult == 'Y')
# Filter on >, <, >=, <= condition
df = df.filter(df.age > 25)
Combine conditions using logical AND (&) and OR (|) operators. Ensure each condition is enclosed in parentheses.
# Multiple conditions require parentheses around each condition
df = df.filter((df.age > 25) & (df.is_adult == 'Y'))
# Compare against a list of allowed values
df = df.filter(F.col('first_name').isin(['Alice', 'Bob', 'Carol']))
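Any of these conditions can be negated by wrapping it with the ~ operator; the column and values below are purely illustrative.
# Filter on a negated condition (rows whose first_name is NOT in the list)
df = df.filter(~F.col('first_name').isin(['Alice', 'Bob']))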
Order your DataFrame based on one or more columns in ascending or descending order.
# Sort results ascending
df = df.orderBy(df.age.asc())
# Sort results descending
df = df.orderBy(df.age.desc())
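You can also sort by several columns at once, mixing directions; the column names here are assumed for illustration.
# Sort by multiple columns, descending on age then ascending on last_name
df = df.orderBy(df.age.desc(), df.last_name.asc())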
Understand different types of joins and how to match columns between DataFrames.
# Left join another dataset on a shared key column
df = df.join(person_lookup_table, 'person_id', 'left')
# Match on different columns in left & right datasets
df = df.join(other_table, df.id == other_table.person_id, 'left')
# Match on multiple columns
df = df.join(other_table, ['first_name', 'last_name'], 'left')
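When one side of the join is a small lookup table, a broadcast hint can avoid a shuffle; this sketch assumes person_lookup_table is small enough to fit in executor memory.
# Broadcast the small lookup table before joining
df = df.join(F.broadcast(person_lookup_table), 'person_id', 'left')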
Perform various operations on DataFrame columns, including adding, modifying, selecting, and renaming.
# Add a new static column
df = df.withColumn('status', F.lit('PASS'))
Create new columns based on conditional logic using when and otherwise.
# Construct a new dynamic column
df = df.withColumn('full_name', F.when(
    (df.fname.isNotNull() & df.lname.isNotNull()),
    F.concat(df.fname, F.lit(' '), df.lname)
).otherwise(F.lit('N/A')))
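Several when branches can be chained before the final otherwise; the age thresholds below are made-up illustrative values.
# Chain multiple conditions to build a categorical column
df = df.withColumn('age_group',
    F.when(df.age < 13, 'child')
     .when(df.age < 20, 'teen')
     .otherwise('adult'))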
Choose specific columns to keep and rename them for clarity.
# Pick which columns to keep, optionally rename some
df = df.select(
    'name',
    'age',
    F.col('dob').alias('date_of_birth'),
)
# Remove columns
df = df.drop('mod_dt', 'mod_username')
# Rename a column
df = df.withColumnRenamed('dob', 'date_of_birth')
# Keep only the columns that also occur in another dataset
df = df.select(*(c for c in df.columns if c in df2.columns))
A common task is to clean up column names by converting them to lowercase and replacing spaces or hyphens with underscores.
# Batch Rename/Clean Columns
for col_name in df.columns:
    df = df.withColumnRenamed(col_name, col_name.lower().replace(' ', '_').replace('-', '_'))
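An alternative, assuming you want to replace every column name in one pass, is toDF, which takes the full list of new names.
# Rename all columns at once with toDF
df = df.toDF(*[c.lower().replace(' ', '_').replace('-', '_') for c in df.columns])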
Manage data types, replace null values, and handle duplicate records effectively.
# Cast a column to a different type
df = df.withColumn('price', df.price.cast(T.DoubleType()))
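The same cast pattern extends to several columns at once; the column names here are assumptions.
# Cast multiple columns in a loop
for c in ['price', 'quantity']:
    df = df.withColumn(c, F.col(c).cast(T.DoubleType()))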
Replace nulls in specific columns with default values.
# Replace nulls with column-specific default values
df = df.fillna({
    'first_name': 'Tom',
    'age': 0,
})
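If a default value does not make sense, rows with nulls in key columns can instead be dropped; this is a minimal sketch using dropna with a subset.
# Drop rows that contain nulls in specific columns
df = df.dropna(subset=['first_name', 'age'])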
Select the first non-null value from a list of columns.
# Take the first value that is not null
df = df.withColumn('last_name', F.coalesce(df.last_name, df.surname, F.lit('N/A')))
Remove duplicate rows from a DataFrame.
# Drop duplicate rows in a dataset (distinct)
df = df.dropDuplicates() # or
df = df.distinct()
# Drop duplicate rows, but consider only specific columns
df = df.dropDuplicates(['name', 'height'])
# Replace empty strings with null (leave out subset keyword arg to replace in all columns)
df = df.replace({"": None}, subset=["name"])
# Convert Python/PySpark/NumPy NaN values to null
df = df.replace(float("nan"), None)