The ability to read from and write to many different kinds of data sources, and the ease with which the community can contribute its own connectors, is arguably one of Spark's greatest strengths.
As a general computing engine, Spark can process data from a variety of data management and storage systems, including HDFS, Hive, Cassandra, and Kafka. For flexibility and high throughput, Spark defines the Data Source API, which is an abstraction of the storage layer.
This Data Source API has two requirements: generality, so that it can support reading from and writing to most storage systems, and flexibility, so that connectors can customize and optimize their own read and write paths.
This article will give you a better understanding of the core data sources available in Spark. You will be introduced to the data sources you can use out of the box, as well as the countless other sources built by the greater community: Spark has six "core" data sources and hundreds of external data sources written by the community.
Here are the core data sources in Apache Spark you should know about:
1. CSV
2. JSON
3. Parquet
4. ORC
5. JDBC/ODBC connections
6. Plain-text files
Community-created data sources include:
1. Cassandra
2. HBase
3. MongoDB
4. AWS Redshift
5. XML
And many, many others
The general structure for reading data is:
DataFrameReader.format(...).option("key", "value").schema(...).load()
.format specifies the format of the data source to read; it is optional because Spark uses the Parquet format by default.
.option lets you set key-value configurations that parameterize how the data should be read.
.schema is optional if the data source provides a schema or if you intend to use schema inference.
The foundation for reading data in Spark is the DataFrameReader. It is accessed through the SparkSession via the read attribute, as shown below:
spark.read
Once we have a DataFrameReader, we can specify several values. Different data sources have different sets of options that determine how the data should be read. All of the options can be omitted except one: at a minimum, the DataFrameReader must be given the path of the files to read.
spark.read.format("csv") .option("mode", "FAILFAST") .option("inferSchema", "true") .option("path", "path/to/file(s)") .schema(someSchema) .load()
When working with semi-structured data sources, we often come across malformed data. The read mode specifies what Spark should do when it encounters such records.
Read mode | Description
permissive | Sets all fields to null when it encounters a corrupted record and places all corrupted records in a string column called _corrupt_record
dropMalformed | Drops the row that contains malformed records
failFast | Fails immediately upon encountering malformed records
The default is permissive.
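To make this concrete, here is a minimal sketch, assuming a hypothetical CSV file at /data/raw/people.csv with a few malformed rows, that reads in permissive mode and keeps the bad records in a _corrupt_record column:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hypothetical schema; the extra string column collects malformed rows in permissive mode.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True)
])

df = spark.read.format("csv") \
    .schema(schema) \
    .option("header", "true") \
    .option("mode", "PERMISSIVE") \
    .load("/data/raw/people.csv")  # hypothetical path

df.show(truncate=False)  # malformed rows show nulls plus the raw line in _corrupt_record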
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
.format specifies the format in which the data is written; it is optional because Spark uses Parquet by default.
.option lets you set key-value configurations that parameterize how the data is written.
.partitionBy, .bucketBy, and .sortBy work only with file-based data sources and control the file structure or layout at the destination.
Writing data follows the same pattern as reading it; we simply replace DataFrameReader with DataFrameWriter, which is accessed through the write attribute of a DataFrame:
dataFrame.write
With the DataFrameWriter we need to supply a format, a series of options, and a save path. We can specify many options, but at a minimum we must provide the destination path.
dataframe.write.format("csv") \
    .mode("overwrite") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("path", "path/to/file(s)") \
    .save()
Save mode | Description
append | Appends the output files to the list of files that already exist at that location
overwrite | Will completely overwrite any data that already exists there
errorIfExists | Throws an error and fails the write if data or files already exist at the specified location
ignore | If data or files exist at the location, do nothing with the current DataFrame
The default is errorIfExists, which fails the write if Spark finds data already present at the destination path.
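The save mode is set on the DataFrameWriter via the mode method. As a minimal sketch (the output path is hypothetical):
# Append to whatever already exists at the destination instead of failing.
dataframe.write.format("parquet") \
    .mode("append") \
    .save("/tmp/output/summary")  # hypothetical path

# Replacing "append" with "ignore" would silently skip the write if data is already present.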
CSV stands for comma-separated values. This is a common text file format in which each line represents a single record, and each field within a record is separated by a comma. The CSV format is well structured, but it may be one of the trickiest file formats to work with in production scenarios because few assumptions can be made about what the files contain or how they are structured.
For this reason, the CSV reader has a large number of options. These options give you the ability to work around issues such as certain characters needing to be escaped (for example, commas inside of columns when the file itself is comma-delimited) or null values labeled in an unconventional way.
spark.read.format("csv") .option("header", "true") .option("mode", "FAILFAST") .option("inferSchema", "true") .load("some/path/to/file.csv")
If the file has a header row with column names, you need to explicitly set the header option to true; otherwise, the API treats the header row as a data record.
You can also specify data sources by their fully qualified name (i.e., org.apache.spark.sql.csv), but for built-in sources you can also use their short names (csv, json, parquet, jdbc, text, etc.).
When reading CSV files with a specified schema, it is possible that the data in the files does not match the schema. For example, a field containing the name of a city will not parse as an integer. The consequences depend on the mode the parser runs in:
PERMISSIVE (default): nulls are inserted for fields that could not be parsed correctly
DROPMALFORMED: drops lines that contain fields that could not be parsed
FAILFAST: aborts the reading if any malformed data is found
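For example, a minimal sketch that supplies an explicit schema (the column names follow the flight-summary data used elsewhere in this article) and drops any row whose count field fails to parse as a long:
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Rows where "count" cannot be parsed as a long are treated as malformed and dropped.
manualSchema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", LongType(), True)
])

df = spark.read.format("csv") \
    .option("header", "true") \
    .option("mode", "DROPMALFORMED") \
    .schema(manualSchema) \
    .load("some/path/to/file.csv")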
The table below presents the options available on the CSV reader:
Read/write | Key | Potential values | Default | Description
Both | sep | Any single string character | , | The single character that is used as a separator for each field and value.
Both | header | true, false | false | A Boolean flag that declares whether the first line in the file(s) contains the column names.
Read | escape | Any string character | \ | The character Spark should use to escape other characters in the file.
Read | inferSchema | true, false | false | Specifies whether Spark should infer column types when reading the file.
Read | ignoreLeadingWhiteSpace | true, false | false | Declares whether leading spaces from values being read should be skipped.
Read | ignoreTrailingWhiteSpace | true, false | false | Declares whether trailing spaces from values being read should be skipped.
Both | nullValue | Any string character | "" | Declares what character represents a null value in the file.
Both | nanValue | Any string character | NaN | Declares what character represents a NaN or missing value in the CSV file.
Both | positiveInf | Any string or character | Inf | Declares what character(s) represent a positive infinite value.
Both | negativeInf | Any string or character | -Inf | Declares what character(s) represent a negative infinite value.
Both | compression or codec | None, uncompressed, bzip2, deflate, gzip, lz4, or snappy | none | Declares what compression codec Spark should use to read or write the file.
Both | dateFormat | Any string or character that conforms to Java's SimpleDateFormat | yyyy-MM-dd | Declares the date format for any columns that are of date type.
Both | timestampFormat | Any string or character that conforms to Java's SimpleDateFormat | yyyy-MM-dd'T'HH:mm:ss.SSSZZ | Declares the timestamp format for any columns that are of timestamp type.
Read | maxColumns | Any integer | 20480 | Declares the maximum number of columns in the file.
Read | maxCharsPerColumn | Any integer | 1000000 | Declares the maximum number of characters in a column.
Write | escapeQuotes | true, false | true | Declares whether Spark should escape quotes that are found in lines.
Read | maxMalformedLogPerPartition | Any integer | 10 | Sets the maximum number of malformed rows Spark will log for each partition. Malformed records beyond this number will be ignored.
Write | quoteAll | true, false | false | Specifies whether all values should be enclosed in quotes, as opposed to just escaping values that have a quote character.
Read | multiLine | true, false | false | Allows reading multiline CSV files, where each logical row in the CSV file might span multiple rows in the file itself.
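Several of these options are commonly combined. The sketch below, with a hypothetical tab-separated input file, uses sep, nullValue, and dateFormat together:
# Hypothetical TSV file: tab-separated, '\N' marks nulls, dates are formatted as dd/MM/yyyy.
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("sep", "\t") \
    .option("nullValue", "\\N") \
    .option("dateFormat", "dd/MM/yyyy") \
    .option("inferSchema", "true") \
    .load("/data/raw/orders.tsv")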
csvFile.write.format("csv") .mode("overwrite") .option("sep", "\t")\ .save("/tmp/my-tsv-file.tsv")
People coming from other programming languages, especially Java and JavaScript, will be familiar with JavaScript Object Notation, popularly known as JSON. In Spark, when we refer to JSON files, we mean line-delimited JSON files. This contrasts with files that contain one large JSON object or array per file.
The line-delimited versus multiline trade-off is controlled by a single option: multiLine. When you set this option to true, you can read an entire file as one JSON object, and Spark will do the work of parsing it into a DataFrame.
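For instance, here is a minimal sketch, with a hypothetical path, that reads a file containing a single large JSON document rather than one object per line:
# multiLine tells Spark to parse the whole file as one JSON document (object or array).
multiline_df = spark.read.format("json") \
    .option("multiLine", "true") \
    .load("/data/raw/catalog.json")  # hypothetical non-line-delimited JSON file

multiline_df.printSchema()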
Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. This conversion can be done using SparkSession.read.json on a JSON file. Note that the file that is offered as a JSON file is not a typical JSON file: each line must contain a separate, self-contained, valid JSON object.
For more information, see the JSON Lines text format, also called newline-delimited JSON.
JSON data source options:
Read/write | Key | Potential values | Default | Description
Both | compression or codec | None, uncompressed, bzip2, deflate, gzip, lz4, or snappy | none | Declares what compression codec Spark should use to read or write the file.
Both | dateFormat | Any string or character that conforms to Java's SimpleDateFormat | yyyy-MM-dd | Declares the date format for any columns that are of date type.
Both | primitivesAsString | true, false | false | Infers all primitive values as string type.
Both | timestampFormat | Any string or character that conforms to Java's SimpleDateFormat | yyyy-MM-dd'T'HH:mm:ss.SSSZZ | Declares the timestamp format for any columns that are of timestamp type.
Read | allowComments | true, false | false | Ignores Java/C++ style comments in JSON records.
Read | allowUnquotedFieldNames | true, false | false | Allows unquoted JSON field names.
Read | allowSingleQuotes | true, false | true | Allows single quotes in addition to double quotes.
Read | multiLine | true, false | false | Allows reading non-line-delimited JSON files.
Read | allowNumericLeadingZeros | true, false | false | Allows leading zeros in numbers (e.g., 00012).
Read | allowBackslashEscapingAnyCharacter | true, false | false | Allows accepting quoting of all characters using the backslash quoting mechanism.
Read | columnNameOfCorruptRecord | Any string | Value of spark.sql.columnNameOfCorruptRecord | Allows renaming the field that holds the malformed string created by permissive mode; this overrides the configuration value.
spark.read.format("json") .option("mode", "FAILFAST")\ .option("inferSchema", "true")\ .load("/data/movie-data/json/2010-summary.json")
csvFile.write.format("json") .mode("overwrite") .save("/tmp/my-json-file.json")
Parquet is an open-source file format available to any project in the Hadoop ecosystem. Apache Parquet is designed for efficiency: it provides a performant, flat columnar storage format for data, in contrast to row-based formats such as CSV or TSV.
Parquet uses the record shredding and assembly algorithm, which is superior to the simple flattening of nested namespaces. Parquet is optimized to work with complex data in bulk and offers several options for efficient data compression and encoding. This approach is especially good for queries that need to read only certain columns from a large table, because Parquet can read just the needed columns, greatly minimizing I/O.
spark.read.format("parquet")\ .load("/data/movie-data/parquet/2020-summary.parquet").show(5)
csvFile.write.format("parquet") .mode("overwrite")\ .save("/tmp/my-parquet-file.parquet"
The Optimized Row Columnar (ORC) file format provides a highly efficient way to store Hive data. It was designed to overcome the limitations of the other Hive file formats. Using ORC files improves performance when Hive is reading, writing, and processing data.
Compared with the RCFile format, for example, the ORC file format has many advantages, such as better compression, light-weight indexes stored within the file, and support for Hive's complex types.
spark.read.format("orc") .load("/data/movie-data/orc/2020-summary.orc").show(5)
csvFile.write.format("orc") .mode("overwrite") .save("/tmp/my-json-file.orc"
Spark also allows you to read in plain-text files. Each line in the file becomes a record in the DataFrame. It is then up to you to transform it accordingly. As an example of how you would do this, suppose that you need to parse some Apache log files to some more structured format, or perhaps you want to parse some plain text for natural-language processing.
Text files make a great argument for the Dataset API due to its ability to take advantage of the flexibility of native types.
spark.read.text("/data/movie-data/csv/2020-summary.csv") \
    .selectExpr("split(value, ',') as rows") \
    .show()
csvFile.select("DEST_COUNTRY_NAME") .write.text("/tmp/simple-text-file.txt"
SQL data sources are one of the more powerful connectors because there are a variety of systems to which you can connect (as long as that system speaks SQL). For instance, you can connect to a MySQL database, a PostgreSQL database, or an Oracle database. You also can connect to SQLite, which is what we’ll do in this example.
Of course, databases aren’t just a set of raw files, so there are more options to consider regarding how you connect to the database. Namely, you’re going to need to begin considering things like authentication and connectivity (you’ll need to determine whether the network of your Spark cluster is connected to the network of your database system).
To get started, you will need to include the JDBC driver for your particular database on the Spark classpath. For example, to connect to Postgres from the Spark shell you would run the following command:
./bin/spark-shell \
--driver-class-path postgresql-9.4.1207.jar \
--jars postgresql-9.4.1207.jar
Tables from external or remote databases can be loaded as a DataFrame or temporary view using the Data Source API.
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()
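Reads can also be parallelized by describing how a numeric column should be split across partitions. The sketch below reuses the same hypothetical connection details and assumes the table has a numeric id column:
# Split the read into 8 partitions using the numeric "id" column as the partitioning key.
partitionedDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .option("partitionColumn", "id") \
    .option("lowerBound", "1") \
    .option("upperBound", "1000000") \
    .option("numPartitions", "8") \
    .load()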
We have discussed a variety of sources available to you for reading and writing data in Spark. This covers nearly everything you’ll need to know as an everyday user of Spark with respect to data sources. For the curious, there are ways of implementing your own data source.
I hope you liked the article. Do not forget to drop in your comments in the comments section below.