Unlocking the Power of Apache Spark: Essential Techniques for Data Manipulation and Analysis
2 min read · Mar 30, 2024
RDD
1. Word count
// Word count: read a text file, tokenise each line, and count the occurrences of every word.
val a = sc.textFile("/word.txt")
// Split on runs of whitespace (spaces, tabs) rather than a single space, and drop the
// empty token that split() yields for lines starting with whitespace; otherwise the
// empty string would be counted as a word.
val splitdata = a.flatMap(line => line.split("\\s+")).filter(_.nonEmpty)
// Pair every word with an initial count of 1 ...
val mapdata = splitdata.map(word => (word, 1))
// ... and sum the counts per distinct word.
val reducedata = mapdata.reduceByKey(_ + _)
// collect() materialises the (word, count) pairs on the driver.
reducedata.collect()
Spark SQL
1. Spark SQL with Hive
// Run a SQL query against the table `default.patient` and print the result.
val a = spark.sql("select * from default.patient")
a.show()
2. registerTempTable
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Space-separated column names; every column is declared as a nullable string.
val schemaString = "id name dName gender amount"
val schema = StructType(
  schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)))
// Captured as a plain Int so the closures below do not need to reference the schema object.
val fieldCount = schema.fields.length

val patient1 = sc.textFile("file:///home/cse-ad/datagen_10.txt")
// split(",", -1) keeps trailing empty fields (a plain split(",") drops them, which made
// indexing p(4) fail for lines ending in a comma). Lines with fewer fields than the
// schema (e.g. blank lines) are skipped instead of failing the job; extra fields are
// ignored, as before.
val rowRDD = patient1
  .map(_.split(",", -1))
  .filter(_.length >= fieldCount)
  .map(fields => Row.fromSeq(fields.take(fieldCount)))

// Use the SparkSession entry point and createOrReplaceTempView, consistent with the
// rest of this file; registerTempTable is the deprecated equivalent.
val patientDf = spark.createDataFrame(rowRDD, schema)
patientDf.createOrReplaceTempView("patient_spark")
3. Spark UDF
import spark.implicits._
// Sample (sno, name) rows used to demonstrate the UDF below.
val columns = Seq("sno", "name")
val data = Seq(
  ("1", "divithraju"),
  ("2", "deepak"),
  ("3", "rahul")
)
// Name the tuple fields after `columns`.
val df = data.toDF(columns: _*)
// `false` disables truncation of long cell values in the printed table.
df.show(false)
// Upper-cases the first character of every space-separated word in `strQuote`.
// A null input yields null, so the function is safe on null column values.
// Empty tokens (empty input, or consecutive spaces) are passed through unchanged
// instead of failing in substring(0, 1).
val convertCase = (strQuote: String) => {
  Option(strQuote).map { quote =>
    quote
      .split(" ")
      .map(word => if (word.isEmpty) word else word.substring(0, 1).toUpperCase + word.substring(1))
      .mkString(" ")
  }.orNull
}
// DataFrame API: wrap the Scala function as a UDF and apply it to the `name` column,
// keeping the original column name for the result.
val convertUDF = udf(convertCase)
val capitalisedName = convertUDF(col("name")).as("name")
df.select(col("sno"), capitalisedName).show(false)

// SQL: register the same function under the name `convertUDF` and expose the
// DataFrame as a temporary view so it can be queried with spark.sql.
spark.udf.register("convertUDF", convertCase)
df.createOrReplaceTempView("NAME_TABLE")
val sqlResult = spark.sql("select sno, convertUDF(name) from NAME_TABLE")
sqlResult.show(false)
4. Spark Column WithColumn
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructType}
// Sample employee rows: first, middle and last name, joining date, gender code, salary.
val data = Seq(
  ("jaggu", "", "Bhai", "2011-04-01", "M", 30000),
  ("Michael", "madhan", "", "2015-05-19", "M", 40000),
  ("Robert", "", "Rome", "2016-09-05", "M", 40000),
  ("Maria", "sher", "pova", "2013-12-01", "F", 40000),
  ("matteo", "Mary", "marcin", "2012-02-17", "F", 35000),
  ("santhi", "", "sagari", "2012-02-17", "F", 52000),
  ("satya", "sai", "kumari", "2012-02-17", "F", 50000)
)
val columns = Seq(
  "first_name", "middle_name", "last_name", "date of joining", "gender", "salary")
// `df` is a var because the steps that follow reassign it after every transformation.
var df = data.toDF(columns: _*)
df.show()
// Change column data types: salary becomes a double and the joining date becomes a
// date. The schema is printed before and after the cast for comparison.
println("changing dataType of a column")
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
println("schema of DatFrame before cast")
df.printSchema()
val salaryAsDouble = col("salary").cast(DoubleType)
val joiningAsDate = col("date of joining").cast(DateType)
df = df
  .withColumn("salary", salaryAsDouble)
  .withColumn("date of joining", joiningAsDate)
println("schema of DatFrame After cast")
df.printSchema()
// Add a new column holding a 5 percent increment of the current salary.
println("adding a new column using withcolumn()")
val fivePercentOfSalary = col("salary") * 5 / 100
df = df.withColumn("increment_in_salary", fivePercentOfSalary)
df.show()
// Overwrite an existing column: salary = salary + increment_in_salary.
println("update the value of an existing col")
val raisedSalary = col("salary") + col("increment_in_salary")
df = df.withColumn("salary", raisedSalary)
df.show()
// Remove a column: increment_in_salary has already been folded into salary above,
// so it is no longer needed.
println("dropping an unwanted col")
df = df.drop("increment_in_salary")
df.show()
// Create a new column from existing columns: full_name is built from the three name parts.
println("creating a new column from existing col's")
import org.apache.spark.sql.functions._
// concat_ws skips NULL inputs but not empty strings, so an empty middle or last name
// would leave a doubled or trailing space in full_name. when(...) without otherwise(...)
// yields NULL for the non-matching (empty) values, which concat_ws then leaves out.
val nameParts = Seq("first_name", "middle_name", "last_name")
  .map(part => when(col(part) =!= "", col(part)))
df = df.withColumn("full_name", concat_ws(" ", nameParts: _*))
df.show()
// SQL-style CASE expression built with when()/otherwise(): the codes M and F are
// mapped to Male and Female; any other value becomes an empty string.
println("sparkSql case clause using when() in withcolumn()")
import org.apache.spark.sql.functions._
val genderLabel =
  when(col("gender") === "M", lit("Male"))
    .when(col("gender") === "F", lit("Female"))
    .otherwise(lit(""))
df = df.withColumn("gender", genderLabel)
df.show()
// Rename a column with withColumnRenamed(): gender becomes gender/sex.
println("Renaming a column using withColumnRename()")
import org.apache.spark.sql.functions._
val (currentName, renamedTo) = ("gender", "gender/sex")
df = df.withColumnRenamed(currentName, renamedTo)
df.show()
4. Spark Dedup
// Read a header-less CSV file; the columns get the default names _c0, _c1, ...
val df = spark.read
  .option("header", "false")
  .csv("file:///home/username/data.txt")
// Keep a single row per distinct value of the first column.
df.dropDuplicates(Seq("_c0")).show()
Or, to drop rows that are duplicated across all columns:
// distinct() is an alias for dropDuplicates() with no arguments: rows are compared on every column.
df.distinct().show()
5. Spark Moving Average
import org.apache.spark.sql.expressions.Window
import spark.implicits._
// Sample (name, role, salary) rows for the moving-average example; salaries are kept as strings.
val columns = Seq("Name", "Role", "Salary")
val data = Seq(
  ("divithraju", "Developer", "125000"),
  ("deppa", "Developer", "108000"),
  ("yamini", "Developer", "185000"),
  ("like", "Developer", "98000"),
  ("peter", "Developer", "144000"),
  ("kumar", "Developer", "110000"),
  ("rose", "Tester", "70000"),
  ("jon", "Tester", "65000"),
  ("bharath", "Tester", "82000"),
  ("saran", "Tester", "75000")
)
// Distribute the local collection as an RDD ...
val rdd = spark.sparkContext.parallelize(data)
// ... and convert it to a DataFrame by naming the tuple fields directly.
val dfFromRDD1 = rdd.toDF("name", "role", "salary")
Or, equivalently, build the DataFrame with createDataFrame:
import org.apache.spark.sql.functions.{avg, col, monotonically_increasing_id}

val dfFromRDD = spark.createDataFrame(rdd).toDF(columns: _*)
dfFromRDD.show()

// A row-based frame (previous row, current row, next row) is only meaningful when the
// rows inside each partition have a defined order; without orderBy the neighbours of a
// row are arbitrary and the result can change between runs. The data has no natural
// ordering column, so the input order is captured in a helper column first.
// monotonically_increasing_id is documented to produce increasing ids; here that is
// relied on to follow the order of the source rows.
val rowOrderCol = "_row_order"
val movingAvgWindow = Window
  .partitionBy("Role")
  .orderBy(rowOrderCol)
  .rowsBetween(-1, 1)

val movAvg = dfFromRDD
  .withColumn(rowOrderCol, monotonically_increasing_id())
  // Salary is stored as a string; cast it explicitly rather than relying on implicit coercion.
  .withColumn("movingAverage", avg(col("Salary").cast("double")).over(movingAvgWindow))
  .drop(rowOrderCol)
movAvg.show()
“Thank you for reading! If you enjoyed this article and want to stay updated on my latest insights and projects, feel free to connect with me on LinkedIn.”