Data processing experiment - Part 2
The one where I spike Spark SQL
The code for this project is available on GitHub - I'm using a branch for each part and merging each part into the latest branch. See the ReadMe.md in each branch for the write-up.
Establishing Spark capability
Spark SQL lets us query the CSV files in a directory as if they were a database table.
First I need some data - in this oversimplified example I'm just going to pretend we have some CSV files containing bank transactions:
date,account,description,amount,comment
2020-01-01, 1, burger, 15.45,
2020-01-02, 1, movie, 20.00,
2020-01-03, 1, tennis, 35.00,
2020-01-04, 2, petrol, 150.45,
This gives me an easy-to-understand domain and some basic structure. See the ./data/sample1/statements/ folder.
The first thing we need is a Spark session:
// spark setup
val config = SparkConf().setAppName("spike").setMaster("local")
val sparkSession = SparkSession.builder().config(config).orCreate
Now we can load the statements folder into a data frame:
// load raw data frame
val statementsDataFrame = sparkSession.read()
    .format("csv")
    .option("header", true)
    .load("../data/sample1/statements")
    .alias("statements")
println("Statements data frame")
statementsDataFrame.show(20)
statementsDataFrame.printSchema()
This gives us our RAW data frame, exactly as it is in the CSV files. Now I need to:
- select only the columns I want, discarding any data that isn't needed
- type the columns appropriately
This could be done in one step (a combined version is sketched further below), but I'm doing it as two discrete steps here to call out each part of the logical process.
// only select the columns needed so we can exclude data we don't need here
val selectedDataFrame = statementsDataFrame.select(
    functions.col("date"),
    functions.col("account"),
    functions.col("description"),
    functions.col("amount"),
)
println("Selected data frame")
selectedDataFrame.show(20)
selectedDataFrame.printSchema()
// convert to typed columns
val typedDataFrame = selectedDataFrame.select(
    functions.to_date(functions.col("date"), "yyyy-MM-dd").alias("date"),
    functions.col("account"),
    functions.col("description"),
    functions.col("amount").cast("double").alias("amount")
)
println("Typed data frame")
typedDataFrame.show(20)
typedDataFrame.printSchema()
And there we have it - a basic runnable Spark SQL capability.
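As mentioned above, the select and the type conversion could also be collapsed into a single step. Purely as a sketch, using the same column names (the repository code keeps the two discrete steps):
// a sketch only - combined select and type conversion in one step
val combinedDataFrame = statementsDataFrame.select(
    functions.to_date(functions.col("date"), "yyyy-MM-dd").alias("date"),
    functions.col("account"),
    functions.col("description"),
    functions.col("amount").cast("double").alias("amount")
)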
Running the application we can see the output from each stage:
7:04:51 pm: Executing 'run --stacktrace'...
> Task :app:checkKotlinGradlePluginConfigurationErrors
> Task :spark:checkKotlinGradlePluginConfigurationErrors
> Task :spark:compileKotlin UP-TO-DATE
> Task :spark:compileJava NO-SOURCE
> Task :spark:processResources NO-SOURCE
> Task :spark:classes UP-TO-DATE
> Task :spark:jar UP-TO-DATE
> Task :app:compileKotlin UP-TO-DATE
> Task :app:compileJava NO-SOURCE
> Task :app:processResources UP-TO-DATE
> Task :app:classes UP-TO-DATE
> Task :app:run
Starting...
19:04:52.071 [main] WARN o.a.hadoop.util.NativeCodeLoader MDC= - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
SLF4J: Class path contains multiple SLF4J providers.
SLF4J: Found provider [ch.qos.logback.classic.spi.LogbackServiceProvider@776b83cc]
SLF4J: Found provider [org.apache.logging.slf4j.SLF4JServiceProvider@37858383]
SLF4J: See https://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual provider is of type [ch.qos.logback.classic.spi.LogbackServiceProvider@776b83cc]
Statements data frame
+------------+-------+------------+-------+--------------------+
| date|account| description| amount| comment|
+------------+-------+------------+-------+--------------------+
| 2020-13-01| 1| burger| 0.01| invalid date|
|invalid date| 1| petrol| 0.02| invalid date|
| NULL| 1| tennis| 0.03| no date|
| 2020-12-01| | tennis| 0.04| no account|
| 2020-12-01| 1| petrol| x| invalid number f...|
| 2020-02-01| 1| burger| 15.46| NULL|
| 2020-02-02| 1| movie| 20.01| NULL|
| 2020-02-03| 1| tennis| 35.01| NULL|
| 2020-02-04| 2| petrol| 150.46| NULL|
| 2020-02-04| 2| electricity| 300.47| NULL|
| 2020-01-01| 1| burger| 15.45| NULL|
| 2020-01-02| 1| movie| 20.00| NULL|
| 2020-01-03| 1| tennis| 35.00| NULL|
| 2020-01-04| 2| petrol| 150.45| NULL|
| 2020-03-01| 1| burger| 15.47| NULL|
| 2020-03-03| 1| tennis| 35.03| NULL|
| 2020-03-04| 2| petrol| 150.47| NULL|
+------------+-------+------------+-------+--------------------+
root
|-- date: string (nullable = true)
|-- account: string (nullable = true)
|-- description: string (nullable = true)
|-- amount: string (nullable = true)
|-- comment: string (nullable = true)
Selected data frame
+------------+-------+------------+-------+
| date|account| description| amount|
+------------+-------+------------+-------+
| 2020-13-01| 1| burger| 0.01|
|invalid date| 1| petrol| 0.02|
| NULL| 1| tennis| 0.03|
| 2020-12-01| | tennis| 0.04|
| 2020-12-01| 1| petrol| x|
| 2020-02-01| 1| burger| 15.46|
| 2020-02-02| 1| movie| 20.01|
| 2020-02-03| 1| tennis| 35.01|
| 2020-02-04| 2| petrol| 150.46|
| 2020-02-04| 2| electricity| 300.47|
| 2020-01-01| 1| burger| 15.45|
| 2020-01-02| 1| movie| 20.00|
| 2020-01-03| 1| tennis| 35.00|
| 2020-01-04| 2| petrol| 150.45|
| 2020-03-01| 1| burger| 15.47|
| 2020-03-03| 1| tennis| 35.03|
| 2020-03-04| 2| petrol| 150.47|
+------------+-------+------------+-------+
root
|-- date: string (nullable = true)
|-- account: string (nullable = true)
|-- description: string (nullable = true)
|-- amount: string (nullable = true)
Typed data frame
+----------+-------+------------+------+
| date|account| description|amount|
+----------+-------+------------+------+
| NULL| 1| burger| 0.01|
| NULL| 1| petrol| 0.02|
| NULL| 1| tennis| 0.03|
|2020-12-01| | tennis| 0.04|
|2020-12-01| 1| petrol| NULL|
|2020-02-01| 1| burger| 15.46|
|2020-02-02| 1| movie| 20.01|
|2020-02-03| 1| tennis| 35.01|
|2020-02-04| 2| petrol|150.46|
|2020-02-04| 2| electricity|300.47|
|2020-01-01| 1| burger| 15.45|
|2020-01-02| 1| movie| 20.0|
|2020-01-03| 1| tennis| 35.0|
|2020-01-04| 2| petrol|150.45|
|2020-03-01| 1| burger| 15.47|
|2020-03-03| 1| tennis| 35.03|
|2020-03-04| 2| petrol|150.47|
+----------+-------+------------+------+
root
|-- date: date (nullable = true)
|-- account: string (nullable = true)
|-- description: string (nullable = true)
|-- amount: double (nullable = true)
Finished...
BUILD SUCCESSFUL in 3s
7 actionable tasks: 3 executed, 4 up-to-date
7:04:54 pm: Execution finished 'run --stacktrace'.
You should see in the output data frames that invalid values (those that couldn't be converted to types) come through as NULL.
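Not part of this spike, but to show the "query it like a database" idea end to end, the typed data frame could be registered as a temporary view and queried with SQL. A minimal sketch, assuming the typed data frame above (the view name "transactions" is just illustrative):
// a sketch only: register the typed frame as a temporary view so it can be queried with SQL
typedDataFrame.createOrReplaceTempView("transactions")

// rows that failed date parsing come through with a NULL date
sparkSession.sql("SELECT count(*) AS invalid_dates FROM transactions WHERE date IS NULL").show()

// total spend per account, ignoring rows that failed conversion
sparkSession.sql(
    "SELECT account, round(sum(amount), 2) AS total " +
    "FROM transactions " +
    "WHERE date IS NOT NULL AND amount IS NOT NULL " +
    "GROUP BY account"
).show()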
Note that in order to run on Java 17 it was necessary to add some exports as JVM arguments:
application {
    // Define the main class for the application.
    mainClass = "com.example.dataprocessingexperiment.app.AppKt"

    // spark Java17 Compatible JvmArgs
    applicationDefaultJvmArgs = listOf(
        "--add-exports=java.base/java.lang=ALL-UNNAMED",
        "--add-exports=java.base/java.lang.invoke=ALL-UNNAMED",
        "--add-exports=java.base/java.lang.reflect=ALL-UNNAMED",
        "--add-exports=java.base/java.io=ALL-UNNAMED",
        "--add-exports=java.base/java.net=ALL-UNNAMED",
        "--add-exports=java.base/java.nio=ALL-UNNAMED",
        "--add-exports=java.base/java.util=ALL-UNNAMED",
        "--add-exports=java.base/java.util.concurrent=ALL-UNNAMED",
        "--add-exports=java.base/java.util.concurrent.atomic=ALL-UNNAMED",
        "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED",
        "--add-exports=java.base/sun.nio.cs=ALL-UNNAMED",
        "--add-exports=java.base/sun.security.action=ALL-UNNAMED",
        "--add-exports=java.base/sun.util.calendar=ALL-UNNAMED",
        "--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED",
    )
}
This is done for the application in app/build.gradle.kts and for tests in spark/build.gradle.kts.
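For the tests the same idea applies via the test task's JVM arguments. A minimal sketch of what that could look like in spark/build.gradle.kts (the actual configuration in the repository may differ):
tasks.test {
    // a sketch only - reuse the same list of --add-exports arguments as above
    jvmArgs(
        "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED",
        "--add-exports=java.base/java.lang=ALL-UNNAMED",
        // ... the remaining exports from the list above
    )
}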
View the code here: https://github.com/prule/data-processing-experiment/tree/part-2

