Read File

Reading with the wrong format or options does not throw an error, but the data is parsed incorrectly and may load only partially.
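
A minimal sketch of that failure mode (emp_pipe.csv is a hypothetical pipe-delimited file): parsing with the wrong separator raises no error, the rows simply collapse into a single column.

wrongdf = spark.read.option("sep", ",").csv("file:///home/ak/datasets/emp_pipe.csv")
wrongdf.printSchema() 	# only one column (_c0) instead of the expected columns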

empdf = spark.read.csv("file:///home/ak/datasets/emp.csv")
empdf = spark.read.option("header", "true").option("sep", ",").option("inferSchema", "true").csv("file:///home/ak/datasets/emp.csv")
empdf = spark.read.csv("file:///home/ak/datasets/emp.csv", inferSchema=True, sep=",", header=True)
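
inferSchema costs an extra pass over the data; a minimal sketch of supplying an explicit schema instead (the column names and types below are assumptions about emp.csv):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType
empSchema = StructType([
    StructField("emp_id", IntegerType()),
    StructField("name", StringType()),
    StructField("salary", IntegerType()),
])
empdf = spark.read.csv("file:///home/ak/datasets/emp.csv", header=True, schema=empSchema)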
 
# load() defaults to the Parquet format, so pointing it at a CSV file raises an error
loadParq = spark.read.load("file:///home/ak/datasets/emp.csv")
loadParq = spark.read.format("csv").load("file:///home/ak/datasets/emp.csv") 	# specify the format explicitly for non-Parquet files

Write Data to Disk

salesDF.write.save("file:///home/ak/data/sparksales") 	# save() defaults to the Parquet format
salesDF.write.mode("overwrite").save("file:///home/ak/data/sparksales") 	# Replace existing output
salesDF.repartition(8).write.save("file:///home/ak/data/sparksales") 	# Write as 8 output files
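
Like load(), save() writes Parquet by default; a minimal sketch of writing CSV with a header instead (the output path is an assumption):

salesDF.write.format("csv").option("header", "true").mode("overwrite").save("file:///home/ak/data/sparksalescsv")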

View Structure of Data

empdf.printSchema()
empdf.show(<no-of-records>, truncate=False) 	# truncate=False prints full column values
empdf.count()
df.describe('<column-name>').show() 		# Summary stats (count, mean, stddev, min, max) for the column

View Data

salesIND.select("Region", "Country", "Profit").show()

Number of Partitions

empdf.rdd.getNumPartitions()

Change Column Names

retailDF = retailDF.withColumnRenamed("_c0", "order_id").withColumnRenamed("_c1", "order_date") 	# returns a new DataFrame; assign it to keep the change
retaildataDF = retaildataDF.toDF(*["order_id", "order_date", "order_customer_id", "order_status"]) 	# rename all columns at once

Replace Values in a Column

from pyspark.sql import functions as F

df.withColumn('address', F.regexp_replace('address', 'lane', 'ln')) 	# replaces the substring 'lane'
df.withColumn('address', F.translate('address', 'lane', 'ln')) 	# maps characters one-to-one (l->l, a->n; n, e dropped), not substrings
df.na.fill({'age': 50, 'name': 'unknown'}).show() 	# Replace NULL values per column
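
A tiny sketch of the difference on a throwaway DataFrame: regexp_replace matches the substring, translate remaps individual characters.

demo = spark.createDataFrame([('main lane',)], ['address'])
demo.withColumn('address', F.regexp_replace('address', 'lane', 'ln')).show() 	# 'main ln'
demo.withColumn('address', F.translate('address', 'lane', 'ln')).show() 	# 'mni ln' (every a->n; n, e dropped)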

Change Datatype of Column

from pyspark.sql.types import DoubleType, IntegerType, StringType
cases = cases.withColumn('confirmed', F.col('confirmed').cast(IntegerType()))

Creating New Column (Constant Value)

df = df.withColumn("new_column_name", F.lit("constant_value"))

Filter Data

& (and), | (or) and ~ (not) can be used to chain multiple conditions; wrap each condition in parentheses (see the chained example at the end of this section).

salesIND = salesDF.filter("Country = 'India'")
salesIND.select("Region", "Country", "Profit").filter("City='Agra'").show()
salesIND = salesDF.where("Country = 'India'")  # Where is an alias for filter
 
from pyspark.sql.functions import col
salesIND.select("Region", "Country", "Profit").filter(col("Profit") != 0).show()
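
A sketch of chaining conditions with column expressions; note the parentheses around each condition:

salesDF.filter((col("Country") == "India") & ~(col("City") == "Agra")).show()
salesDF.filter((col("Country") == "India") | (col("Profit") > 0)).show()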

Remove Duplicate Values

df.distinct().count()
df.dropDuplicates().show()
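
dropDuplicates() also accepts a subset of columns to deduplicate on (the column name below is an assumption):

df.dropDuplicates(['order_id']).show() 	# keep one row per order_id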

Convert RDD data to DF

lines = sc.textFile("file:///home/ak/spark-2.4.5-bin-hadoop2.7/examples/src/main/resources/people.txt")
parts = lines.map(lambda line: line.split(","))
 
from pyspark.sql import Row
 
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
peopledf = spark.createDataFrame(people)
peopledf.show()
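
An alternative sketch that skips Row and passes column names directly; the values stay strings unless cast:

peopledf2 = spark.createDataFrame(parts, ["name", "age"])
peopledf2.printSchema()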
