Spark SQL and Storlets

This blog post introduces Spark-Storlets, an open source project for boosting Spark analytics workloads by offloading filtering tasks to the Openstack Swift object store using Storlets.

Storlets Essentials

The Openstack Storlets project makes it possible to execute dockerized computations inside the Openstack Swift object store in a serverless fashion. With Storlets, an end user can upload code as if it were just another data object. With the code stored as an object, the user can then execute it over other data objects. The user does not need to take care of any server-side settings in order to execute code. We refer to the code being invoked on data objects as a storlet.

Storlets can be invoked during the upload, download or copy of an object. Let’s consider a storlet invocation during download. When downloading an object through a storlet, the user receives the storlet’s output computed over the object’s content rather than the original content. The invoked storlet gets the object content as an input stream and starts streaming out the computation result. Thus, storlets are well suited to “streaming” computations where, typically, the output should start flowing out before all the input is read. Filtering is a good example of such a computation.
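
To make the flow concrete, here is a minimal sketch of a download-time invocation over Swift’s HTTP API. The endpoint, token, container, object and storlet names are placeholders; the point is that adding the X-Run-Storlet header to an otherwise ordinary GET is all it takes to have the storlet’s output streamed back instead of the original object content.

```scala
import java.net.{HttpURLConnection, URL}
import scala.io.Source

// Placeholders: a real deployment would use its own Swift endpoint, token,
// container, object and storlet names.
val storageUrl = "http://swift.example.com:8080/v1/AUTH_test"
val conn = new URL(s"$storageUrl/mycontainer/mydata.csv")
  .openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("GET")
conn.setRequestProperty("X-Auth-Token", "<token>")
// This header is what turns a plain GET into a storlet invocation: Swift runs
// the named storlet over the object and streams its output back to the client.
conn.setRequestProperty("X-Run-Storlet", "csvstorlet-1.0.jar")
val filteredContent = Source.fromInputStream(conn.getInputStream).mkString
println(filteredContent)
```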

Spark Essentials

Spark is a highly popular distributed in-memory compute engine. While Spark is a general-purpose engine, it has several modules for more specific use cases. One of those modules is Spark SQL, which deals with structured data and is the one we focus on in this post. Data can be ingested into Spark from a variety of storage systems, including Openstack Swift. Object stores are cheap, massively scalable secondary storage systems for data that is no longer affordable to keep in primary storage. However, moving data to secondary storage does not have to imply copying it back to the primary storage tier in order to query it. Spark-Storlets allows querying the data right where it is.

The Data Sources API

The Data Sources API is the mechanism Spark exposes to developers for writing code that imports structured data stored in various formats. One example is Databricks’ Spark-CSV package, which implements the Data Sources API to import CSV-formatted data. The basic API (known as TableScan) takes no parameters and is expected to return an RDD (the fundamental data structure Spark works with) whose entries are table rows that can be further manipulated using Spark SQL. More sophisticated versions of the API take selection and/or projection filters (these are called PrunedScan and PrunedFilteredScan). This means that Spark SQL can leverage a Data Sources API implementation not only to parse a certain format (CSV in our example) but also to do some filtering while parsing the data. Spark-Storlets implements the PrunedFilteredScan API, which takes both the selection and projection filters, to essentially push down the filtering work to a storlet running in the object store. The returned RDD then consists only of the projected columns and the selected rows.
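
For illustration, here is a minimal, self-contained sketch of a PrunedFilteredScan relation (Spark 1.x/2.x style). The toy schema and in-memory rows are invented; the point is the buildScan signature through which Spark SQL hands the required columns and the pushed-down filters to the data source. A real implementation such as Spark-Storlets would serialize these and ship them to the storlet instead of scanning locally.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// A toy relation over two hard-coded rows; only the projection is honored here.
// Spark SQL re-evaluates the filters it pushed down, so ignoring them is still
// correct, just less efficient -- Spark-Storlets forwards them to the storlet.
class ToyRelation(@transient override val sqlContext: SQLContext)
    extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType =
    StructType(Seq(StructField("name", StringType), StructField("age", IntegerType)))

  override def buildScan(requiredColumns: Array[String],
                         filters: Array[Filter]): RDD[Row] = {
    val data = Seq(Row("alice", 30), Row("bob", 25))
    val indices = requiredColumns.map(schema.fieldNames.indexOf(_))
    sqlContext.sparkContext.parallelize(
      data.map(row => Row.fromSeq(indices.map(i => row.get(i)))))
  }
}
```

A companion RelationProvider (not shown) is what lets such a relation be referenced by name from sqlContext.read.format(...).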

As mentioned above, filtering is well suited to storlets, as it can be done efficiently on a stream, and the Data Sources API is essentially an offloading hook. Indeed, the Spark-Storlets repository contains an implementation of the Data Sources API for CSV-formatted data. The implementation leverages the CSVStorlet, which can perform selection and projection filtering within the object store (the storlet is part of the Openstack Storlets repository).
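
Usage then looks like any other data source. The format string, option key and path scheme below are placeholders (the actual ones are defined by the Spark-Storlets package); what matters is that the select and where clauses become the projection and selection that get pushed down to the CSVStorlet.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("storlet-csv-demo").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

// Hypothetical usage sketch -- the format string and option names are placeholders.
val df = sqlContext.read
  .format("org.apache.storlets.spark.csv")       // placeholder package name
  .option("storlet.name", "csvstorlet-1.0.jar")  // placeholder option key
  .load("swift://mycontainer/mydata.csv")        // placeholder path scheme

// The projection (select) and selection (where) below are what the data source
// receives through buildScan and can hand off to the storlet.
df.select("vendor", "trip_distance")
  .where("trip_distance > 10")
  .show()
```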

Some Design Considerations

Independent of Spark SQL and the Data Sources API, Spark has the ability to read data from Swift using the Hadoop I/O layer. Moreover, there is a choice of drivers to do so, e.g., IBM’s Stocator and Hadoop’s native Openstack Swift file system driver. While it is tempting to leverage these existing connectors for Spark-Storlets as well, attempting to do so reveals quite a few issues.

Partition Discovery

The first issue has to do with partition discovery. In the context of loading data from external storage, partition discovery allocates different partitions of the data to different Spark worker nodes. This essentially parallelizes the reading and processing of the data amongst the Spark nodes. When using Hadoop I/O, data partitioning is done according to the chunk size reported by the underlying driver. In HDFS the chunk size is a fundamental system-wide parameter: every file is divided into chunks of that size, and these chunks are spread over the HDFS cluster. Thus, in HDFS it only makes sense that data would be consumed in chunks of that size. Object stores, however, are a different story: when we offload work to the object store, it makes sense to partition the data taking into account the “amount of parallelism” that exists in the object store, e.g. the replication factor and the number of workers in each replica node.
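
As a sketch of the alternative, here is how byte-range partitions could be computed from the object size and the store’s parallelism rather than from a filesystem chunk size. The replication factor and workers-per-replica values are illustrative knobs, not values queried from Swift.

```scala
// Split an object of `objectSize` bytes into byte ranges, one per unit of
// parallelism available in the object store (replicas x workers per replica).
case class ObjectPartition(firstByte: Long, lastByte: Long)

def partitionObject(objectSize: Long, replicas: Int, workersPerReplica: Int): Seq[ObjectPartition] = {
  val parallelism = math.max(replicas * workersPerReplica, 1)
  val chunk = math.ceil(objectSize.toDouble / parallelism).toLong
  (0 until parallelism).map { i =>
    val first = i.toLong * chunk
    ObjectPartition(first, math.min(first + chunk, objectSize) - 1)
  }.filter(p => p.firstByte <= p.lastByte)
}

// Example: a 1 GB object on a 3-replica cluster with 4 workers per replica node
// yields 12 byte ranges, each of which can be handed to a separate storlet invocation.
partitionObject(1L << 30, replicas = 3, workersPerReplica = 4).foreach(println)
```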

The Broken Records Problem

Spark, being a distributed compute engine, partitions the data amongst different workers. This surfaces a classic distributed computing problem: imagine a read-only shared memory containing text records (e.g. logs), where different workers are assigned different partitions of the data and are not allowed to communicate. How can we make sure that records crossing a partition boundary are not lost? It turns out that knowing the maximum length of a record is enough to solve the problem: if a worker reaches the end of its partition while still in the middle of a record, it reads past the end of the partition, up to the maximum record length, until it reaches a record separator. In addition, all workers except the one working on the first partition skip the beginning of their data chunk up to the first record separator.

This solution is implemented inside the Hadoop I/O layer. When offloading the filtering to the object store, different storlet instances will be processing different ranges of the data object, so we clearly need to solve this within the storlet as well. Thus, if we are to use Hadoop I/O, it has to be made aware of storlets.
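
Here is a sketch of that rule over an in-memory byte array, assuming newline-separated records and the convention that a partition owns exactly the records that start inside it. In the storlet setting the same logic would run over a ranged read of the object, extended past the range end by the maximum record length.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable.ArrayBuffer

// Return the records "owned" by the byte range [start, end]: a worker whose
// range begins mid-record skips up to the first separator, and a worker whose
// last record crosses `end` keeps reading past it (bounded, in the streaming
// case, by the maximum record length) until the separator is reached.
def ownedRecords(data: Array[Byte], start: Int, end: Int, sep: Byte = '\n'.toByte): Seq[String] = {
  var pos = start
  // Skip the tail of a record that started in the previous partition.
  if (start > 0 && data(start - 1) != sep) {
    while (pos <= end && data(pos) != sep) pos += 1
    pos += 1
  }
  val records = ArrayBuffer.empty[String]
  while (pos <= end && pos < data.length) {
    var recEnd = pos
    while (recEnd < data.length && data(recEnd) != sep) recEnd += 1 // may run past `end`
    records += new String(data, pos, recEnd - pos, UTF_8)
    pos = recEnd + 1
  }
  records.toSeq
}

// Example: splitting "aa\nbbbb\ncc\n" into ranges [0,4] and [5,10] loses nothing:
// the first range completes "bbbb" past its end, the second skips it and keeps "cc".
val bytes = "aa\nbbbb\ncc\n".getBytes(UTF_8)
ownedRecords(bytes, 0, 4) ++ ownedRecords(bytes, 5, bytes.length - 1)
```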

Data Format

Relying on Hadoop I/O also limits the usable data formats to text-based data. Consider the use case of EXIF metadata extraction from JPEGs. One can imagine a Data Sources API implementation for JPEGs that would import a large collection of JPEGs as a table of their EXIF metadata. As JPEGs are binary data, they cannot be consumed using Hadoop I/O. Also, partitioning the dataset in this case calls for a totally different approach than using a chunk size.

Spark-Storlets Demonstration

Spark-Storlets was demonstrated in this Openstack conference talk. While the talk covers some more Storlets use cases, the link points to the beginning of the Spark-Storlets part, which is presented by Yosef Moatti, my colleague at the time.

Beyond the Data Sources API

Storlets can do more than filtering, and there is no apparent reason to limit Spark offload to the Data Sources API. Why not provide the Spark programmer with the means to offload certain tasks explicitly? Practically, this means that for any type of “storlet compatible” functionality we need an RDD-Storlet pair, where the RDD serves as an iterator over the storlet output. It would be interesting to find machine learning algorithms that can benefit from the type of computations that can be done using storlets.

Just Another Idea for Spark and Object Store Integration

While not directly related to Spark and Storlets, having an object store container listing as a data source seems like a compelling idea, especially if/when the object store supports metadata search. Having an RDD of a container listing that can be queried seems most useful. It can help reduce the number of objects on which we actually want to run storlets.

There is, however, more to the idea, which has to do with another partition discovery issue: when loading data from external storage it is possible to load many objects. Unlike filesystems, object stores are typically made of flat namespaces that can contain millions of objects. Current partition discovery for multiple objects in Spark relies on the ability to hold the container listing in memory while calculating the partitions. This does not scale. Having the container listing as an RDD might be of help. This implies, however, lazy or dynamic partitioning, which seems to be supported only with Spark Streaming. It would be fun to work on that one.
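
As a rough sketch of how the listing itself could be consumed lazily, Swift’s container listing API is paginated via the marker query parameter, so a client can walk the listing page by page without ever materializing it in memory. The endpoint and token below are placeholders, and a real client would URL-encode the marker value.

```scala
import java.net.{HttpURLConnection, URL}
import scala.io.Source

// Lazily iterate a (possibly huge) container listing, one page at a time,
// using Swift's `marker` pagination. Placeholders: endpoint and token.
def listContainer(storageUrl: String, token: String, container: String,
                  pageSize: Int = 10000): Iterator[String] = {
  def page(marker: Option[String]): Stream[Seq[String]] = {
    val markerParam = marker.map(m => s"&marker=$m").getOrElse("")
    val conn = new URL(s"$storageUrl/$container?limit=$pageSize$markerParam")
      .openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestProperty("X-Auth-Token", token)
    val names = Source.fromInputStream(conn.getInputStream).getLines().toList
    if (names.isEmpty) Stream.empty else names #:: page(Some(names.last))
  }
  page(None).iterator.flatMap(identity)
}
```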

