-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMyExample.scala
More file actions
85 lines (75 loc) · 2.94 KB
/
MyExample.scala
File metadata and controls
85 lines (75 loc) · 2.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import cats.effect.{IO, IOApp, ExitCode}
import org.apache.spark.sql.{SparkSession, functions}
/**
 * Custom example demonstrating Spark Connect Scala client.
 *
 * Opens a session against a Spark Connect server at sc://localhost:15002
 * and walks through a tour of the DataFrame API: version lookup, range
 * creation, filtering, computed columns, grouped aggregation, raw SQL,
 * and an inner join. Every effect is sequenced with cats-effect IO, and
 * the session is acquired via `use`, so it is released when the program
 * finishes (or fails).
 */
object MyExample extends IOApp:

  override def run(args: List[String]): IO[ExitCode] =
    // Bring the column helpers into local scope so the pipelines below
    // read without the repeated `functions.` prefix.
    import functions.{avg, col, count, lit, max, min, sum}

    SparkSession.builder()
      .remote("sc://localhost:15002")
      .appName("MyExample")
      .build()
      .use { spark =>
        for
          _ <- IO.println("=== My Custom Spark Connect Example ===\n")

          // Report which Spark version the server is running.
          serverVersion <- spark.version
          _ <- IO.println(s"Connected to Spark version: $serverVersion\n")

          // A DataFrame holding the ids 0 through 99; show the first 10 rows.
          _ <- IO.println("Creating a DataFrame with range 0-99:")
          numbers <- spark.range(100)
          _ <- numbers.show(10)

          // Keep only the even ids below 20.
          _ <- IO.println("\nFiltering for even numbers less than 20:")
          evens = numbers
            .filter(col("id") < 20)
            .filter(col("id") % 2 === 0)
          _ <- evens.show()

          // Derive doubled / tripled / squared columns from the first ten ids.
          _ <- IO.println("\nAdding computed columns:")
          derived = numbers
            .filter(col("id") < 10)
            .withColumn("doubled", col("id") * 2)
            .withColumn("tripled", col("id") * 3)
            .withColumn("squared", col("id") * col("id"))
          _ <- derived.show()

          // Bucket ids 0-49 by id % 5 and compute summary statistics per bucket,
          // ordered by the bucket key.
          _ <- IO.println("\nGrouping by id % 5 and computing aggregations:")
          stats = numbers
            .filter(col("id") < 50)
            .groupBy((col("id") % 5).as("remainder"))
            .agg(
              count(lit(1)).as("count"),
              sum(col("id")).as("sum"),
              avg(col("id")).as("avg"),
              max(col("id")).as("max"),
              min(col("id")).as("min")
            )
            .sort(col("remainder"))
          _ <- stats.show()

          // The same kind of derivation expressed as a raw SQL query.
          _ <- IO.println("\nRunning a SQL query:")
          squaresAndCubes <- spark.sql("""
            SELECT
              id,
              id * id as square,
              id * id * id as cube
            FROM range(10)
            WHERE id >= 5
          """)
          _ <- squaresAndCubes.show()

          // Inner join two overlapping ranges on their ids:
          // 0-7 against 5-11, so rows 5, 6 and 7 match.
          _ <- IO.println("\nJoin example - matching IDs:")
          leftIds <- spark.range(8)
          lhs = leftIds.select(col("id").as("left_id"))
          rightIds <- spark.range(5, 12)
          rhs = rightIds.select(col("id").as("right_id"))
          matched = lhs.join(
            rhs,
            col("left_id") === col("right_id"),
            "inner"
          )
          _ <- matched.show()

          _ <- IO.println("\n=== Example completed successfully! ===")
        yield ExitCode.Success
      }