Unlocking the Power of Apache Spark: Essential Techniques for Data Manipulation and Analysis

Divith Raju
2 min read · Mar 30, 2024
RDD
1. Word count
// Read the input file, split each line into words, pair each word with 1, and sum the counts per word
val a = sc.textFile("/word.txt")
val splitdata = a.flatMap(line => line.split(" "))
val mapdata = splitdata.map(word => (word, 1))
val reducedata = mapdata.reduceByKey(_ + _)
reducedata.collect()
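
For comparison, the same count can be written with the DataFrame API instead of RDDs. A minimal sketch, assuming the same /word.txt input path:

// DataFrame version of the word count; spark.read.textFile returns a Dataset[String] with a single "value" column
import spark.implicits._
val wordCounts = spark.read.textFile("/word.txt")
  .flatMap(line => line.split(" "))
  .groupBy("value")
  .count()
wordCounts.show()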


Spark SQL

1. Spark SQL with Hive

val a = spark.sql("select * from default.patient")
a.show()
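
A DataFrame can also be written back into Hive with saveAsTable. A minimal sketch; the target table name default.patient_copy is only illustrative:

// Persist the query result as a new Hive-managed table (table name is illustrative)
a.write.mode("overwrite").saveAsTable("default.patient_copy")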

2. registerTempTable

val schemaString = "id name dName gender amount"
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// Build the schema from the space-separated field names (all fields as nullable strings)
val patient1 = sc.textFile("file:///home/cse-ad/datagen_10.txt")
val schema = StructType(
  schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert each comma-separated line into a Row and apply the schema
val rowRDD = patient1.map(_.split(",")).map(p => Row(p(0), p(1), p(2), p(3), p(4)))
val patientDf = sqlContext.createDataFrame(rowRDD, schema)

// registerTempTable is deprecated in newer Spark versions; createOrReplaceTempView is its replacement
patientDf.registerTempTable("patient_spark")
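
Once registered, the temporary table can be queried with Spark SQL just like a Hive table. A small sketch using two of the columns declared in schemaString:

// Query the registered temporary table
sqlContext.sql("select name, amount from patient_spark").show()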

3. Spark UDF

import spark.implicits._
import org.apache.spark.sql.functions.{col, udf}

val columns = Seq("sno", "name")
val data = Seq(("1", "divithraju"),
  ("2", "deepak"),
  ("3", "rahul"))
val df = data.toDF(columns:_*)
df.show(false)

// Capitalize the first letter of every word in the column value
val convertCase = (strQuote: String) => {
  val arr = strQuote.split(" ")
  arr.map(f => f.substring(0, 1).toUpperCase + f.substring(1, f.length)).mkString(" ")
}
val convertUDF = udf(convertCase)

df.select(col("sno"), convertUDF(col("name")).as("name")).show(false)


// Using the UDF in Spark SQL
spark.udf.register("convertUDF", convertCase)
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select sno, convertUDF(name) from NAME_TABLE").show(false)


4. Spark withColumn()

import spark.implicits._

val data = Seq(("jaggu","","Bhai","2011-04-01","M",30000),
  ("Michael","madhan","","2015-05-19","M",40000),
  ("Robert","","Rome","2016-09-05","M",40000),
  ("Maria","sher","pova","2013-12-01","F",40000),
  ("matteo","Mary","marcin","2012-02-17","F",35000),
  ("santhi","","sagari","2012-02-17","F",52000),
  ("satya","sai","kumari","2012-02-17","F",50000))

val columns = Seq("first_name","middle_name","last_name","date of joining","gender","salary")
var df = data.toDF(columns:_*)
df.show()

// changing data type
println("changing dataType of a column")
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
println("schema of DatFrame before cast")
df.printSchema()
df = df.withColumn("salary",col("salary").cast(DoubleType))
.withColumn("date of joining",(col("date of joining").cast(DateType)))
println("schema of DatFrame After cast")
df.printSchema()

//Adding a new column

println("adding a new column using withcolumn()")
//Let's have the increment of salary by 5 percent
df = df.withColumn("increment_in_salary",col("salary").multiply(5).divide(100))
df.show()

//Updating the value of an existing column

println("update the value of an existing col")
//updating values of salary column
// salary = salary + increment_in_salary
df = df.withColumn("salary",col("salary").plus(col("increment_in_salary")))
df.show()

//Dropping an unwanted column

println("dropping an unwanted col")
//The increment_in_salary col is no longer needed once the salary col has been updated with the new values.
df = df.drop(col("increment_in_salary"))
df.show()


//Creating a new column from existing columns

println("creating a new column from existing col's")
//Here, we are creating full_name col
import org.apache.spark.sql.functions._
df = df.withColumn("full_name",concat_ws(" ",col("first_name"),col("middle_name"),col("last_name")))
df.show()


//Spark SQL case clause using when() in withColumn()

println("sparkSql case clause using when() in withcolumn()")
//Assigning M to Male and F to Female using When()
import org.apache.spark.sql.functions._
df = df.withColumn("gender",when(col("gender").equalTo("M"),lit("Male"))
.when(col("gender").equalTo("F"),lit("Female"))
.otherwise(lit("")))

df.show()
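
The same mapping can also be written as a SQL CASE expression through expr(); applied to the original M/F values it gives the same result as the when()/otherwise() chain above. A minimal sketch:

// Equivalent case logic expressed as a SQL CASE expression (for the original M/F values)
df.withColumn("gender",
  expr("CASE WHEN gender = 'M' THEN 'Male' WHEN gender = 'F' THEN 'Female' ELSE '' END"))
  .show()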

//Renaming a column using withColumnRenamed()

println("Renaming a column using withColumnRename()")
//Assigning M to Male and F to Female using When()
import org.apache.spark.sql.functions._
df = df.withColumnRenamed("gender","gender/sex")
df.show()

5. Spark Dedup

val df = spark.read.format("csv").option("header", "false").load("file:///home/username/data.txt")
df.dropDuplicates("_c0").show
(or)
df.dropDuplicates().show
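
dropDuplicates also accepts several column names, so a row is dropped only when the whole combination repeats. A minimal sketch, assuming the file has at least the columns _c0 and _c1:

// Keep one row per distinct (_c0, _c1) combination
df.dropDuplicates("_c0", "_c1").show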

6. Spark Moving Average

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg
import spark.implicits._


val columns = Seq("Name","Role","Salary")

val data = Seq(("divithraju", "Developer","125000"),
("deppa","Developer","108000"),
("yamini","Developer","185000"),
("like","Developer","98000"),
("peter","Developer","144000"),
("kumar","Developer","110000"),
("rose","Tester","70000"),
("jon","Tester","65000"),
("bharath","Tester","82000"),
("saran","Tester","75000"))

val rdd = spark.sparkContext.parallelize(data)

val dfFromRDD1 = rdd.toDF("name","role","salary")
(or)
val dfFromRDD = spark.createDataFrame(rdd).toDF(columns:_*)

dfFromRDD.show()

// 3-row moving average over the previous, current, and next row within each role
val movAvg = dfFromRDD.withColumn("movingAverage",
  avg(dfFromRDD("Salary")).over(Window.partitionBy("role").rowsBetween(-1, 1)))
movAvg.show()
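
The same window mechanism gives a running (cumulative) average per role when the frame is widened to everything from the start of the partition up to the current row. A minimal sketch, ordered by Name only to make the demo deterministic:

// Running average per role, from the first row of the partition up to the current row
val runAvg = dfFromRDD.withColumn("runningAverage",
  avg(dfFromRDD("Salary")).over(Window.partitionBy("Role").orderBy("Name")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)))
runAvg.show()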

“Thank you for reading! If you enjoyed this article and want to stay updated on my latest insights and projects, feel free to connect with me on LinkedIn.”
