Posts 1-新建一个 Spark SQL 的工程
Post
Cancel

1-新建一个 Spark SQL 的工程

本文介绍如何新建一个 Spark SQL 的例子,Spark 采用 standalone 的 local 模式, 需要安装 scala

本文所演示的例子位于 https://github.com/adream307/SparkSQLWithCodegen/tree/master/code/new_spark_sql_project 目录内

新建目录结构

Spark SQL 工程的目录结构如下所示:

1
2
3
4
5
6
new_spark_sql_project/
|-- build.sbt
`-- src
    `-- main
        `-- scala
            `-- SparkSQLTest.scala

添加 scala 依赖库

编辑 new_spark_sql_project/build.sbt 内容如下

1
2
3
4
5
6
7
8
name := "SparkSQLTest"
  
version := "0.1"

scalaVersion := "2.12.11"

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"

build.sbt 文件指定了 scala 的版本为 2.12.11 , spark 版本为 3.0.0

编写 Spark 程序

编辑 new_spark_sql_project/src/main/scala/SparkSQLTest.scala 内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
import org.apache.log4j.Logger
import org.apache.log4j.Level

object SparkSQLTest {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("spark sql test")
      .getOrCreate()

    val raw_data = Seq(
      Row(1.0, 2.0),
      Row(3.0, 4.0),
      Row(5.0, null),
      Row(null, 6.0),
      Row(null, null)
    )

    val sch = StructType(Array(StructField("x", DoubleType, true), StructField("y", DoubleType, true)))

    val df = spark.createDataFrame(spark.sparkContext.parallelize(raw_data), sch)
    df.createOrReplaceTempView("data_test")

    val test_sql = spark.sql("select x, y, power(x,y) from data_test")
    test_sql.show()
    spark.stop()

  }

}

运行 Spark 程序

进入 new_spark_sql_project/ 根目录,执行一下命令

1
sbt run

输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[info] running SparkSQLTest 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/06/04 06:36:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+----+----+-----------+
|   x|   y|POWER(x, y)|
+----+----+-----------+
| 1.0| 2.0|        1.0|
| 3.0| 4.0|       81.0|
| 5.0|null|       null|
|null| 6.0|       null|
|null|null|       null|
+----+----+-----------+

[success] Total time: 11 s, completed Jun 4, 2020 6:36:38 AM

Spark 程序讲解

1
Logger.getLogger("org").setLevel(Level.WARN)

设置当前日志级别为WARN,默认的日志级别为INFO


1
2
3
4
val spark = SparkSession.builder()
      .master("local[*]")
      .appName("spark sql test")
      .getOrCreate()

新建 SparkSession, 并设置其运行方式为 standalone 的 local 模式


1
2
3
4
5
6
7
8
9
10
11
12
val raw_data = Seq(
      Row(1.0, 2.0),
      Row(3.0, 4.0),
      Row(5.0, null),
      Row(null, 6.0),
      Row(null, null)
    )

val sch = StructType(Array(StructField("x", DoubleType, true), StructField("y", DoubleType, true)))

val df = spark.createDataFrame(spark.sparkContext.parallelize(raw_data), sch)
df.createOrReplaceTempView("data_test")

在 spark 内新建一个临时表 data_test,内容如下 x | y —|— 1.0 | 2.0 3.0 | 4.0 5.0 | null null | 6.0 null | null 其中 x,y的数据类型均为 double


1
2
val test_sql = spark.sql("select x, y, pow(x,y) from data_test")
test_sql.show()

在临时表 data_test 上查询数据,结果如下 x | y | POWER(x, y) —|—|— 1.0| 2.0| 1.0| 3.0| 4.0| 81.0| 5.0|null| null| null| 6.0| null| null|null| null|


1
spark.stop()

结果当前 Spark 任务并

This post is licensed under CC BY 4.0 by the author.
Contents