Integration with Cloud Infrastructures

Introduction

All major cloud providers offer persistent data storage in object stores. These are not classic “POSIX” file systems. In order to store hundreds of petabytes of data without any single points of failure, object stores replace the classic file system directory tree with a simpler model of object-name => data. To enable remote access, operations on objects are usually offered as (slow) HTTP REST operations.

Spark can read and write data in object stores through filesystem connectors implemented in Hadoop or provided by the infrastructure suppliers themselves. These connectors make the object stores look almost like file systems, with directories and files and the classic operations on them such as list, delete and rename.

Important: Cloud Object Stores are Not Real Filesystems

While the stores appear to be filesystems, underneath they are still object stores, and the difference is significant.

They cannot be used as a direct replacement for a cluster filesystem such as HDFS except where this is explicitly stated.

Key differences are:

How does this affect Spark?

  1. Reading and writing data can be significantly slower than working with a normal filesystem.
  2. Some directory structures may be very inefficient to scan during query split calculation.
  3. The rename-based algorithm by which Spark normally commits work when saving an RDD, DataFrame or Dataset is potentially both slow and unreliable.

For these reasons, it is not always safe to use an object store as a direct destination of queries, or as an intermediate store in a chain of queries. Consult the documentation of the object store and its connector to determine which uses are considered safe.
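To make the cost of rename concrete, here is a minimal sketch of how a connector might mimic a directory rename on a store that only maps object names to data. The `rename_prefix` function and the dictionary-based store are hypothetical stand-ins for illustration, not the code of any real connector.

```python
# Toy model of an object store: a flat mapping of object name -> bytes.
# There is no directory tree, so "renaming a directory" means copying and
# deleting every object whose name starts with the source prefix.

def rename_prefix(store: dict, src: str, dest: str) -> dict:
    """Emulate a directory rename; return counts of the work performed."""
    stats = {"objects_copied": 0, "bytes_copied": 0}
    # Listing: find every object "under" the source directory.
    for name in [n for n in store if n.startswith(src)]:
        data = store[name]
        store[dest + name[len(src):]] = data   # COPY to the new name
        del store[name]                        # DELETE the old name
        stats["objects_copied"] += 1
        stats["bytes_copied"] += len(data)
    return stats


store = {
    "out/_temporary/0/part-0000": b"a" * 1000,
    "out/_temporary/0/part-0001": b"b" * 2000,
    "out/_SUCCESS": b"",
}
stats = rename_prefix(store, "out/_temporary/0/", "out/")
print(stats)          # the work grows with the number and size of the objects
print(sorted(store))  # the part files now sit directly under out/
```

In this model the rename is neither fast nor atomic: if the loop stopped partway through, some objects would carry the new name and others the old one, which is the kind of unknown state that makes rename-based commits unreliable.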

Consistency

As of 2021, the object stores of Amazon (S3), Google Cloud (GCS) and Microsoft (Azure Storage, ADLS Gen1, ADLS Gen2) are all consistent.

This means that as soon as a file is written or updated it can be listed, viewed and opened by other processes, and the latest version will be retrieved. Inconsistency was previously a known issue with AWS S3, especially with 404 caching of HEAD requests made before an object was created.

Even so: none of the store connectors provide any guarantees as to how their clients cope with objects which are overwritten while a stream is reading them. Do not assume that the old file can be safely read, nor that there is any bounded time period for changes to become visible, nor indeed that the clients will not simply fail if a file being read is overwritten.

For this reason: avoid overwriting files where it is known/likely that other clients will be actively reading them.

Other object stores are inconsistent

This includes OpenStack Swift.

Such stores are not always safe to use as a destination of work; consult each store’s specific documentation.

Installation

With the relevant libraries on the classpath and Spark configured with valid credentials, objects can be read or written by using their URLs as the path to data. For example, sparkContext.textFile("s3a://landsat-pds/scene_list.gz") will create an RDD of the file scene_list.gz stored in S3, using the s3a connector.

To add the relevant libraries to an application’s classpath, include the hadoop-cloud module and its dependencies.

In Maven, add the following to the pom.xml file, assuming spark.version is set to the chosen version of Spark:

<dependencyManagement>
  ...
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hadoop-cloud_2.12</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  ...
</dependencyManagement>

Commercial products based on Apache Spark generally set up the classpath for talking to cloud infrastructures themselves, in which case this module may not be needed.

Authenticating

Spark jobs must authenticate with the object stores to access data within them.

  1. When Spark is running in a cloud infrastructure, the credentials are usually automatically set up.
  2. spark-submit reads the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN environment variables and sets the associated authentication options for the s3n and s3a connectors to Amazon S3.
  3. In a Hadoop cluster, settings may be set in the core-site.xml file.
  4. Authentication details may be manually added to the Spark configuration in spark-defaults.conf.
  5. Alternatively, they can be programmatically set in the SparkConf instance used to configure the application’s SparkContext.

Important: never check authentication secrets into source code repositories, especially public ones.

Consult the Hadoop documentation for the relevant configuration and security options.
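As an illustration of keeping secrets out of source code, the sketch below builds s3a credential options from environment variables, loosely mirroring what the text says spark-submit does. The helper names `s3a_options_from_env` and `redacted` are hypothetical; the option names are the standard s3a credential properties with the `spark.hadoop.` prefix, and should be checked against the Hadoop documentation for the release in use.

```python
import os

# Environment variable -> Spark option. The "spark.hadoop." prefix passes the
# setting through to the Hadoop configuration used by the s3a connector.
ENV_TO_OPTION = {
    "AWS_ACCESS_KEY_ID": "spark.hadoop.fs.s3a.access.key",
    "AWS_SECRET_ACCESS_KEY": "spark.hadoop.fs.s3a.secret.key",
    "AWS_SESSION_TOKEN": "spark.hadoop.fs.s3a.session.token",
}


def s3a_options_from_env(env=os.environ) -> dict:
    """Return s3a credential options for the variables that are set."""
    return {opt: env[var] for var, opt in ENV_TO_OPTION.items() if env.get(var)}


def redacted(options: dict) -> dict:
    """Copy of the options that is safe to log: every value is masked."""
    return {key: "****" for key in options}


# Example with a stand-in environment; these are not real credentials.
fake_env = {
    "AWS_ACCESS_KEY_ID": "EXAMPLEKEY",
    "AWS_SECRET_ACCESS_KEY": "examplesecret",
}
options = s3a_options_from_env(fake_env)
print(redacted(options))
```

Each key and value could then be applied with SparkConf.set(key, value) before the SparkContext is created. Only the redacted copy should ever be printed or logged.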

Configuring

Each cloud connector has its own set of configuration parameters; again, consult the relevant documentation.

For object stores whose consistency model means that rename-based commits are safe, use the FileOutputCommitter v2 algorithm for performance, or v1 for safety.

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2

This does less renaming at the end of a job than the “version 1” algorithm. As it still uses rename() to commit files, it is unsafe to use when the object store does not have consistent metadata/listings.

The committer can also be set to ignore failures when cleaning up temporary files; this reduces the risk that a transient network problem is escalated into a job failure:

spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true

The original v1 commit algorithm renames the output of successful tasks to a job attempt directory, and then renames all the files in that directory into the final destination during the job commit phase:

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1

The slow performance of mimicked renames on Amazon S3 makes this algorithm very, very slow. The recommended solution is to switch to an S3 “Zero Rename” committer (see below).
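The difference between the two algorithms can be sketched as a count of file-level renames. This is a simplified model, not the FileOutputCommitter implementation: it assumes a store where a directory rename is performed file by file, and that the v2 algorithm moves task output straight into the destination at task commit. The function name `commit_renames` is hypothetical.

```python
def commit_renames(files_per_task: list, algorithm: int) -> dict:
    """Count file-level renames for a job, per commit phase.

    Assumes directory rename is file-by-file, as on many object stores.
    """
    total = sum(files_per_task)
    if algorithm == 1:
        # v1: task output is renamed into the job attempt directory, then
        # every file is renamed again into the destination at job commit.
        return {"task_commit": total, "job_commit": total}
    if algorithm == 2:
        # v2 (assumed behaviour): task commit renames files directly into
        # the destination, leaving little to do at job commit.
        return {"task_commit": total, "job_commit": 0}
    raise ValueError(f"unknown algorithm version: {algorithm}")


files = [10, 10, 20]  # three tasks writing 40 files in total
v1 = commit_renames(files, algorithm=1)
v2 = commit_renames(files, algorithm=2)
print("v1:", v1, "total", sum(v1.values()))
print("v2:", v2, "total", sum(v2.values()))
```

In this model v2 halves the rename work, but the output of each task reaches the destination before the job as a whole has committed, which is why v1 remains the choice for safety.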

For reference, here are the performance and safety characteristics of different stores and connectors when renaming directories:

| Store | Connector | Directory Rename Safety | Rename Performance |
|---|---|---|---|
| Amazon S3 | s3a | Unsafe | O(data) |
| Azure Storage | wasb | Safe | O(files) |
| Azure Datalake Gen 2 | abfs | Safe | O(1) |
| Google Cloud Storage | gs | Mixed | O(files) |

  1. Storing temporary files can run up charges, so delete directories called "_temporary" on a regular basis.
  2. For AWS S3, set a limit on how long multipart uploads can remain outstanding. This avoids incurring bills from incomplete uploads.
  3. For Google cloud, directory rename is file-by-file. Consider using the v2 committer, and only write code which generates idempotent output, including filenames; it is no more unsafe than the v1 committer, and faster.
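One way to limit how long multipart uploads remain outstanding on AWS S3 is a bucket lifecycle rule. The sketch below only builds the rule as JSON; the function name, the rule ID and the 7-day limit are assumptions chosen for illustration, and the limit must be longer than the longest-running job that writes to the bucket.

```python
import json


def abort_incomplete_uploads_rule(days: int) -> dict:
    """Build an S3 lifecycle configuration that aborts stale multipart uploads."""
    if days < 1:
        raise ValueError("days must be at least 1")
    return {
        "Rules": [
            {
                "ID": "abort-incomplete-multipart-uploads",  # illustrative name
                "Status": "Enabled",
                "Filter": {"Prefix": ""},  # empty prefix: the whole bucket
                "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": days},
            }
        ]
    }


lifecycle = abort_incomplete_uploads_rule(7)  # 7 days is an assumed value
print(json.dumps(lifecycle, indent=2))
```

The resulting document could be saved to a file and applied with the store's own tooling, for example the AWS CLI's put-bucket-lifecycle-configuration command; check the AWS documentation before applying it to a production bucket.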

Parquet I/O Settings

For optimal performance when working with Parquet data use the following settings:

spark.hadoop.parquet.enable.summary-metadata false
spark.sql.parquet.mergeSchema false
spark.sql.parquet.filterPushdown true
spark.sql.hive.metastorePartitionPruning true

These minimise the amount of data read during queries.
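These options can be placed in spark-defaults.conf as shown, or passed on the command line. As a small sketch, the helper below turns the same settings into `--conf` arguments for spark-submit; the function name `as_submit_args` and the script name `my_job.py` are hypothetical.

```python
import shlex

# The Parquet settings listed above, as option name -> value.
PARQUET_OPTIONS = {
    "spark.hadoop.parquet.enable.summary-metadata": "false",
    "spark.sql.parquet.mergeSchema": "false",
    "spark.sql.parquet.filterPushdown": "true",
    "spark.sql.hive.metastorePartitionPruning": "true",
}


def as_submit_args(options: dict) -> list:
    """Render options as a flat list of spark-submit arguments."""
    args = []
    for key, value in options.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Print a command line that could be copied into a shell.
command = ["spark-submit", *as_submit_args(PARQUET_OPTIONS), "my_job.py"]
print(shlex.join(command))
```

Keeping the settings in one mapping makes it easy to apply the same values consistently across jobs, whichever mechanism is used to pass them to Spark.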

ORC I/O Settings

For best performance when working with ORC data, use these settings:

spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 10000
spark.sql.hive.metastorePartitionPruning true

Again, these minimise the amount of data read during queries.

Spark Streaming and Object Storage

Spark Streaming can monitor files added to object stores, by creating a FileInputDStream to monitor a path in the store through a call to StreamingContext.textFileStream().

  1. The time to scan for new files is proportional to the number of files under the path, not the number of new files, so it can become a slow operation. The size of the window needs to be set to handle this.

  2. Files only appear in an object store once they are completely written; there is no need for a workflow of write-then-rename to ensure that files aren’t picked up while they are still being written. Applications can write straight to the monitored directory.

  3. With the default checkpoint file manager, FileContextBasedCheckpointFileManager, streams should only be checkpointed to a store implementing a fast and atomic rename() operation; otherwise checkpointing may be slow and potentially unreliable. On AWS S3 with Hadoop 3.3.1 or later using the S3A connector, the abortable stream based checkpoint file manager can be used instead, by setting the spark.sql.streaming.checkpointFileManagerClass configuration to org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager. This eliminates the slow rename. In this case users must be extra careful not to reuse the checkpoint location among multiple queries running in parallel, as that could corrupt the checkpointing data.

Committing work into cloud storage safely and fast.

As covered earlier, commit-by-rename is dangerous on any object store which exhibits eventual consistency (as S3 did before it became consistent), and is often slower than classic filesystem renames.

Some object store connectors provide custom committers to commit tasks and jobs without using rename.

Hadoop S3A committers

In versions of Spark built with Hadoop 3.1 or later, the hadoop-aws JAR contains committers safe to use for S3 storage accessed via the s3a connector.

Instead of writing data to a temporary directory on the store for renaming, these committers write the files to the final destination, but do not issue the final POST command to make a large “multi-part” upload visible. Those operations are postponed until the job commit itself. As a result, task and job commit are much faster, and task failures do not affect the result.
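The idea of postponing the completion of uploads can be sketched with a toy store in which an upload stays invisible until it is explicitly completed. The `ToyStore` class and its methods are hypothetical and much simpler than the real multipart upload protocol or the S3A committers.

```python
class ToyStore:
    """Object store where uploads stay invisible until they are completed."""

    def __init__(self):
        self.visible = {}   # object name -> data that readers can see
        self.pending = {}   # upload id -> (object name, data), not yet visible
        self._next_id = 0

    def start_upload(self, name: str, data: bytes) -> int:
        """Upload data to its final name without making it visible."""
        self._next_id += 1
        self.pending[self._next_id] = (name, data)
        return self._next_id

    def complete_upload(self, upload_id: int) -> None:
        """The final step that makes the object appear at its destination."""
        name, data = self.pending.pop(upload_id)
        self.visible[name] = data

    def abort_upload(self, upload_id: int) -> None:
        """Discard an upload; nothing ever becomes visible."""
        self.pending.pop(upload_id, None)


store = ToyStore()

# A failed attempt and a successful attempt of task 0, plus task 1.
failed = [store.start_upload("dest/part-0000", b"partial output")]
task0 = [store.start_upload("dest/part-0000", b"task 0 output")]
task1 = [store.start_upload("dest/part-0001", b"task 1 output")]
print("before job commit:", sorted(store.visible))

# Job commit: abort the failed attempt, complete the committed tasks.
for upload_id in failed:
    store.abort_upload(upload_id)
for upload_id in task0 + task1:
    store.complete_upload(upload_id)
print("after job commit:", sorted(store.visible))
```

Nothing is copied or renamed at commit time in this model, and the failed attempt leaves no trace in the destination.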

To switch to the S3A committers, use a version of Spark which was built with Hadoop 3.1 or later, and switch the committers through the following options.

spark.hadoop.fs.s3a.committer.name directory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

These committers have been tested with the most common formats supported by Spark.

mydataframe.write.format("parquet").save("s3a://bucket/destination")
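When the options are set programmatically rather than in spark-defaults.conf, it can help to build them in one place. The sketch below is a hypothetical helper, `s3a_committer_options`; only the "directory" committer name appears in the options above, while "partitioned" and "magic" are names taken from the Hadoop S3A committer documentation and may need further settings, so verify them for the Hadoop release in use.

```python
# Committer names accepted by this helper. Only "directory" is shown in the
# text above; the other two are assumptions to check against the Hadoop docs.
S3A_COMMITTERS = ("directory", "partitioned", "magic")


def s3a_committer_options(name: str = "directory") -> dict:
    """Return the Spark options that select an S3A committer."""
    if name not in S3A_COMMITTERS:
        raise ValueError(f"unknown S3A committer: {name!r}")
    return {
        "spark.hadoop.fs.s3a.committer.name": name,
        "spark.sql.sources.commitProtocolClass":
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
        "spark.sql.parquet.output.committer.class":
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
    }


options = s3a_committer_options("directory")
for key, value in options.items():
    print(key, value)

# With PySpark installed, the options could be applied like this:
#   builder = SparkSession.builder
#   for key, value in options.items():
#       builder = builder.config(key, value)
```

Rejecting unknown names early avoids a job that silently falls back to a different committer because of a typo.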

More details on these committers can be found in the latest Hadoop documentation with S3A committer detail covered in Committing work to S3 with the S3A Committers.

Note: depending upon the committer used, in-progress statistics may be under-reported with Hadoop versions before 3.3.1.

Amazon EMR: the EMRFS S3-optimized committer

Amazon EMR has its own S3-aware committers for Parquet data. For instructions on use, see the EMRFS S3-optimized committer.

For implementation and performance details, see [“Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer”](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/).

Azure and Google cloud storage: MapReduce Intermediate Manifest Committer.

Versions of the hadoop-mapreduce-core JAR shipped after September 2022 (3.3.5 and later) contain a committer optimized for performance and resilience on Azure ADLS Generation 2 and Google Cloud Storage.

This committer, the “manifest committer”, uses a manifest file to propagate directory listing information from the task committers to the job committer. These manifests can be written atomically, without relying on atomic directory rename, something GCS lacks.

The job committer reads these manifests and will rename files from the task output directories directly into the destination directory, in parallel, with optional rate limiting to avoid throttling IO. This delivers performance and scalability on the object stores.
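The flow of manifests from task commit to job commit can be sketched as follows. This is a toy model with hypothetical function names and a dictionary standing in for the store; it omits the rate limiting, directory creation and failure handling of the real committer.

```python
from concurrent.futures import ThreadPoolExecutor


def commit_task(manifests: dict, task_id: str, filenames: list) -> None:
    """Task commit: record where each output file is and where it must go.

    A single assignment stands in for writing the manifest file atomically.
    """
    manifests[task_id] = [
        (f"dest/_temporary/{task_id}/{name}", f"dest/{name}")
        for name in filenames
    ]


def commit_job(store: dict, manifests: dict, threads: int = 4) -> int:
    """Job commit: read every manifest and rename the files in parallel."""
    renames = [pair for entries in manifests.values() for pair in entries]

    def rename(pair):
        source, destination = pair
        store[destination] = store.pop(source)  # file-by-file rename

    with ThreadPoolExecutor(max_workers=threads) as pool:
        list(pool.map(rename, renames))  # list() surfaces any exception
    return len(renames)


store = {
    "dest/_temporary/task0/part-0000": b"a",
    "dest/_temporary/task1/part-0001": b"b",
}
manifests = {}
commit_task(manifests, "task0", ["part-0000"])
commit_task(manifests, "task1", ["part-0001"])
renamed = commit_job(store, manifests)
print(renamed, sorted(store))
```

Because the job committer works from the manifests, it never needs to list or rename whole directories, which is what removes the dependency on atomic directory rename.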

It is not critical for job correctness to use this with Azure storage; the classic FileOutputCommitter is safe there. However, this new committer scales better for large jobs with deep and wide directory trees.

Because Google GCS does not support atomic directory renaming, the manifest committer should be used where available.

This committer does support “dynamic partition overwrite” (see below).

For details on availability and use of this committer, consult the Hadoop documentation for the Hadoop release used.

It is not available on Hadoop 3.3.4 or earlier.

IBM Cloud Object Storage: Stocator

IBM provide the Stocator output committer for IBM Cloud Object Storage and OpenStack Swift.

Source, documentation and releases can be found at https://github.com/CODAIT/stocator.

Cloud Committers and INSERT OVERWRITE TABLE

Spark has a feature called “dynamic partition overwrite”; a table can be updated and only those partitions into which new data is added will have their contents replaced.

This is used in SQL statements of the form INSERT OVERWRITE TABLE, and when Datasets are written in mode “overwrite”:

eventDataset.write
  .mode("overwrite")
  .partitionBy("year", "month")
  .format("parquet")
  .save(tablePath)

This feature uses file renaming and has specific requirements of both the committer and the filesystem:

  1. The committer’s working directory must be in the destination filesystem.
  2. The target filesystem must support file rename efficiently.

These conditions are not met by the S3A committers and AWS S3 storage.

Committers for other cloud stores may support this feature, and declare to Spark that they are compatible. If dynamic partition overwrite is required when writing data through a Hadoop committer, Spark will always permit this when the original FileOutputCommitter is used. For other committers, after their instantiation, Spark will probe for their declaration of compatibility, and permit the operation if they state that they are compatible.

If the committer is not compatible, the operation will fail with the error message PathOutputCommitter does not support dynamicPartitionOverwrite

Unless there is a compatible committer for the target filesystem, the sole solution is to use a cloud-friendly format for data storage.
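The partition-level behaviour of dynamic partition overwrite described in this section can be illustrated with a toy model, in which a table is a mapping from partition key to rows. The function `overwrite_partitions` and its `dynamic` flag are hypothetical; in Spark itself the behaviour is selected through configuration (spark.sql.sources.partitionOverwriteMode) rather than a function argument, and the plain overwrite shown for contrast is a simplification.

```python
def overwrite_partitions(table: dict, new_data: dict, dynamic: bool) -> dict:
    """Return the table contents after an overwrite.

    table and new_data map a partition key such as (2023, 1) to a list of rows.
    """
    if dynamic:
        result = dict(table)      # partitions without new data survive
        result.update(new_data)   # partitions receiving data are replaced whole
        return result
    return dict(new_data)         # plain overwrite: the whole table is replaced


table = {(2023, 1): ["old-jan"], (2023, 2): ["old-feb"]}
new_data = {(2023, 2): ["new-feb"], (2023, 3): ["new-mar"]}

dynamic_result = overwrite_partitions(table, new_data, dynamic=True)
plain_result = overwrite_partitions(table, new_data, dynamic=False)
print("dynamic:", dynamic_result)
print("plain:  ", plain_result)
```

In the dynamic case the January partition is left untouched while February is replaced and March is added. Replacing a partition in place is the step that relies on file rename in the real implementation.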

Further Reading

Here is the documentation on the standard connectors both from Apache and the cloud providers.

The Cloud Committer problem and hive-compatible solutions