Spark SQL – DataFrames & Data Sources

A DataFrame is a distributed collection of data organized into named columns. Conceptually, it is equivalent to a table in a relational database, but with richer optimizations under the hood.

A DataFrame can be constructed from a wide array of sources such as Hive tables, structured data files, external databases, or existing RDDs. The API was designed for modern Big Data and data science applications, taking inspiration from data frames in R and pandas in Python.

Features of DataFrame

Here are some characteristic features of DataFrame −

  • Ability to process data ranging from kilobytes to petabytes, on anything from a single-node cluster to a large cluster.
  • Supports different data formats (Avro, CSV, Elasticsearch, Cassandra) and storage systems (HDFS, Hive tables, MySQL, etc.).
  • State-of-the-art optimization and code generation through the Spark SQL Catalyst optimizer (a tree transformation framework).
  • Can be easily integrated with other Big Data tools and frameworks via Spark Core.
  • Provides APIs for Python, Java, Scala, and R.

SQLContext

SQLContext is a class used to initialize the functionality of Spark SQL. A SparkContext object (sc) is required to create an SQLContext object.

The following command starts the spark-shell, which initializes the SparkContext.

$ spark-shell

By default, the SparkContext object is initialized with the name sc when the spark-shell starts.

Use the following command to create SQLContext.

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
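After the SQLContext is created, it is also common to import its implicits, which enable conversions such as calling toDF() on an RDD. A minimal sketch −

scala> import sqlContext.implicits._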

Example

Let us consider an example of employee records in a JSON file named employee.json with the following content. The commands in the next section read this document and create a DataFrame (dfs) from it.

employee.json − Place this file in the directory from which the spark-shell was started. Note that each line of the file must be a complete, self-contained JSON object.

{"id" : "1201", "name" : "subrat", "age" : "25"}
{"id" : "1202", "name" : "zafurl", "age" : "28"}
{"id" : "1203", "name" : "ajit", "age" : "39"}
{"id" : "1204", "name" : "jay", "age" : "23"}
{"id" : "1205", "name" : "devi", "age" : "23"}

DataFrame Operations

DataFrame provides a domain-specific language for structured data manipulation. Here, we include some basic examples of structured data processing using DataFrames.

Follow the steps given below to perform DataFrame operations −

Read the JSON Document

First, we have to read the JSON document and, from it, generate a DataFrame named dfs.

Use the following command to read the JSON document named employee.json. The data is shown as a table with the fields − id, name, and age.

scala> val dfs = sqlContext.read.json("employee.json")

Output − The field names are taken automatically from employee.json.

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]
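Note that all three fields are inferred as strings, because every value in employee.json is quoted. If a numeric age is needed, the column can be cast explicitly; the following is a minimal sketch (dfsTyped is just an illustrative name) −

scala> val dfsTyped = dfs.withColumn("age", dfs("age").cast("int"))
scala> dfsTyped.printSchema()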

Show the Data

If you want to see the data in the DataFrame, then use the following command.

scala> dfs.show()

Output − You can see the employee data in a tabular format.

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | subrat |
| 28 | 1202 | zafurl |
| 39 | 1203 | ajit   |
| 23 | 1204 | jay    |
| 23 | 1205 | devi   |
+----+------+--------+

Use printSchema Method

If you want to see the Structure (Schema) of the DataFrame, then use the following command.

scala> dfs.printSchema()

Output

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

Use Select Method

Use the following command to fetch the name column from among the three columns of the DataFrame.

scala> dfs.select("name").show()

Output − You can see the values of the name column.

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| subrat |
| zafurl |
| ajit   |
| jay    |
| devi   |
+--------+
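select also accepts several column names, so more than one column can be projected at a time. A minimal sketch, reusing the dfs DataFrame −

scala> dfs.select("name", "age").show()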

Use Age Filter

Use the following command to find the employees whose age is greater than 23 (age > 23).

scala> dfs.filter(dfs("age") > 23).show()

Output

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | subrat |
| 28 | 1202 | zafurl |
| 39 | 1203 | ajit   |
+----+------+--------+
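The same condition can also be written as a SQL-style expression string, and several conditions can be combined with &&. A minimal sketch, reusing dfs −

scala> dfs.filter("age > 23").show()
scala> dfs.filter(dfs("age") > 23 && dfs("age") < 30).show()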

Use groupBy Method

Use the following command to count the number of employees of each age.

scala> dfs.groupBy("age").count().show()

Output − Two employees are aged 23.

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+
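groupBy can also be combined with other aggregate functions through agg. The following minimal sketch counts the employees in each age group and gives the result column an explicit name (employees is just an illustrative alias) −

scala> import org.apache.spark.sql.functions.count
scala> dfs.groupBy("age").agg(count(dfs("name")).as("employees")).show()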

Running SQL Queries Programmatically

An SQLContext enables applications to run SQL queries programmatically and returns the result as a DataFrame.
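For example, once a DataFrame has been registered as a temporary table, SQL can be run directly against it. A minimal sketch, reusing the dfs DataFrame created earlier (the table name employee is just illustrative) −

scala> dfs.registerTempTable("employee")
scala> val adults = sqlContext.sql("SELECT name, age FROM employee WHERE age > 23")
scala> adults.show()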

In addition, Spark SQL supports two different methods for converting existing RDDs into DataFrames −

1. Inferring the Schema using Reflection − This method uses reflection to generate the schema of an RDD that contains specific types of objects.

2. Programmatically Specifying the Schema − The second method for creating a DataFrame is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD.
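The following minimal sketch illustrates both approaches. It assumes a hypothetical text file employee.txt containing comma-separated lines of the form id,name,age −

// 1. Inferring the schema using reflection: a case class describes each record.
case class Employee(id: String, name: String, age: String)

import sqlContext.implicits._
val employeeRDD = sc.textFile("employee.txt").map(_.split(","))
val reflectedDF = employeeRDD.map(p => Employee(p(0), p(1), p(2))).toDF()

// 2. Programmatically specifying the schema: build a StructType and apply it to an RDD of Rows.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

val schema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("age", StringType, nullable = true)))

val rowRDD = employeeRDD.map(p => Row(p(0), p(1), p(2)))
val specifiedDF = sqlContext.createDataFrame(rowRDD, schema)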

Spark SQL – Data Sources

The DataFrame interface allows different data sources to work with Spark SQL. A DataFrame can be registered as a temporary table and operated on much like a normal RDD; registering a DataFrame as a table allows you to run SQL queries over its data.

In this chapter, we will describe the general methods for loading and saving data using different Spark DataSources. Thereafter, we will discuss in detail the specific options that are available for the built-in data sources.

There are different types of data sources available in Spark SQL, some of which are listed below −

1. JSON Datasets − Spark SQL can automatically capture the schema of a JSON dataset and load it as a DataFrame.

2. Hive Tables − Hive comes bundled with the Spark library as HiveContext, which inherits from SQLContext.

3. Parquet Files − Parquet is a columnar format supported by many data processing systems.
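For example, in Spark 1.4 and later the DataFrame read/write API can save and reload a DataFrame as Parquet. A minimal sketch, reusing the dfs DataFrame from the earlier example (employee.parquet is just an illustrative path) −

scala> dfs.write.parquet("employee.parquet")
scala> val parquetDF = sqlContext.read.parquet("employee.parquet")
scala> parquetDF.show()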
