Tips and Best Practices to Take Advantage of Spark 2.x

With Apache Spark 2.0 and later versions, big improvements were implemented to enable Spark to execute faster, making a lot of earlier tips and best practices obsolete. This post will first give a quick overview of what changes were made and then some tips to take advantage of these changes.

This post is an excerpt from the eBook Getting Started with Spark 2.x: From Inception to Production, which you can download to learn more about Spark 2.x.

Project Tungsten

Tungsten is the code name for the Spark project that makes changes to Apache Spark's execution engine, focusing on improvements to the efficiency of memory and CPU usage. Tungsten builds upon ideas from modern compilers and massively parallel processing (MPP) technologies, such as Apache Drill, Presto, and Apache Arrow.

Spark 2.x improvements include whole-stage code generation, which collapses a query into a single optimized function; a vectorized Parquet reader that decompresses and decodes data in column batches; Tungsten's compact, off-heap binary memory format, which reduces garbage collection; and Catalyst query optimization, including cost-based optimization driven by table statistics.

Tips for Taking Advantage of Spark 2.x Improvements

Use Dataset, DataFrames, Spark SQL

In order to take advantage of Spark 2.x, you should be using Datasets, DataFrames, and Spark SQL instead of RDDs. Datasets, DataFrames, and Spark SQL provide the following advantages: query plan optimization by the Catalyst optimizer, whole-stage code generation, and reduced garbage collection thanks to Tungsten's compact, off-heap binary encoding of data.

When possible, use Spark SQL functions — for example, to_date(), hour() — instead of custom UDFs in order to benefit from the advantages above.
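
For instance, here is a minimal sketch of preferring built-in functions over a custom UDF (it assumes a DataFrame df with a string timestamp column named ts, which is not part of the flight dataset used later):

import org.apache.spark.sql.functions.{col, to_timestamp, to_date, hour}

// Built-in functions stay visible to the Catalyst optimizer and benefit from
// whole-stage code generation; an equivalent UDF would be an opaque black box.
val withTimeParts = df
  .withColumn("event_ts", to_timestamp(col("ts"), "yyyy-MM-dd HH:mm:ss"))
  .withColumn("event_date", to_date(col("event_ts")))
  .withColumn("event_hour", hour(col("event_ts")))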

Datasets provide the advantage of compile-time type safety over DataFrames. However, Dataset functional transformations (like map) will not take advantage of query optimization, whole-stage code generation, and reduced GC. To learn more about Datasets, DataFrames, and Spark SQL, refer to Chapters 2 and 3 in the eBook.
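
As a sketch of that trade-off (assuming a hypothetical case class covering a subset of the flight fields shown later), the typed map below goes through a lambda that Catalyst cannot see into, while the equivalent column expression remains fully optimizable:

import spark.implicits._

// Hypothetical case class for a subset of the flight fields.
case class Flight(id: String, src: String, dst: String, depdelay: Double)

val ds = df.as[Flight]

// Typed transformation: compile-time type safety, but the lambda is opaque to
// Catalyst, so no query optimization or whole-stage code generation.
val delayDoubledTyped = ds.map(f => f.depdelay * 2)

// Untyped column expression: Catalyst can optimize it end to end.
val delayDoubledCol = ds.select($"depdelay" * 2)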

Use the Best Data Store for Your Use Case

Spark supports several data formats, including CSV, JSON, ORC, and Parquet, as well as several data sources and connectors: distributed file stores such as MapR XD, Hadoop's HDFS, and Amazon S3; popular NoSQL databases such as MapR Database and Apache HBase; and distributed messaging stores such as Apache Kafka and MapR Event Store for Kafka.

But just because Spark supports a given data storage or format doesn't mean you'll get the same performance with all of them. Typically, data pipelines will involve multiple data sources and sinks and multiple formats to support different use cases and different read/write latency requirements. Here are some guidelines:

CSV and JSON Tips and Best Practices

When persisting and compressing CSV and JSON files, choose a compression format that is splittable, fast, and yields reasonable compression. ZIP compression is not splittable, whereas Snappy is splittable and gives reasonable compression with high speed. When reading CSV and JSON files, you will get better performance by specifying the schema instead of using inference; specifying the schema also reduces data type errors and is recommended for production code. See chapter 2 in the eBook for examples of specifying the schema on read.
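
Here is a minimal sketch of specifying the schema on read (field names follow the flight JSON row shown later in this post; the file path is hypothetical):

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("dofW", IntegerType),
  StructField("carrier", StringType),
  StructField("src", StringType),
  StructField("dst", StringType),
  StructField("crsdephour", IntegerType),
  StructField("crsdeptime", IntegerType),
  StructField("depdelay", DoubleType),
  StructField("crsarrtime", IntegerType),
  StructField("arrdelay", DoubleType),
  StructField("crselapsedtime", DoubleType),
  StructField("dist", DoubleType)
))

// Passing the schema avoids an extra pass over the data for inference and
// surfaces data type mismatches early.
val flightsDF = spark.read
  .schema(schema)
  .json("/user/mapr/data/flights.json")   // hypothetical path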

Parquet Tips and Best Practices

Apache Parquet gives the fastest read performance with Spark. Parquet arranges data in columns, putting related values in close proximity to each other to optimize query performance, minimize I/O, and facilitate compression. Parquet detects and encodes the same or similar data, using a technique that conserves resources. Parquet also stores column metadata and statistics, which can be pushed down to filter columns (discussed below). Spark 2.x has a vectorized Parquet reader that does decompression and decoding in column batches, providing ~ 10x faster read performance.
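
The vectorized reader is enabled by default in Spark 2.x; the sketch below simply shows the configuration property that controls it, in case you need to verify or toggle it (it applies to flat schemas with atomic column types):

// Check whether vectorized Parquet decoding is enabled (true by default in Spark 2.x).
spark.conf.get("spark.sql.parquet.enableVectorizedReader")

// It can be disabled if needed:
// spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")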

Parquet files are immutable; modifications require a rewrite of the dataset. For streaming data, you can stream to a fast read/write data store, such as MapR Database, then extract data to Parquet files for specific analytic use cases or stream new datasets to a new partition (see partitioning below).

Parquet Partitioning

Spark table partitioning optimizes reads by storing files in a hierarchy of directories based on the partitioning columns. For example, a directory structure could be organized by location, such as state/city, or by date, such as year/month, as shown below:
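
For example, a table of flights partitioned by year and month would be laid out on disk like this (an illustrative layout; the paths and file names are hypothetical):

/data/flights/year=2017/month=01/part-00000.snappy.parquet
/data/flights/year=2017/month=02/part-00000.snappy.parquet
/data/flights/year=2017/month=03/part-00000.snappy.parquet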

DataFrames can be saved as persistent tables in a Hive metastore using the saveAsTable command. If you do not have Hive set up, Spark will create a default local Hive metastore (using Derby). Persistent tables have several optimization benefits: Spark stores partition and statistics metadata for them, and they can be bucketed (discussed later).

As an example with the flight dataset (used in chapters 2, 3, 5, and 9 of the eBook), many queries about departure delays filter on the originating airport (the src column), so src could make a good partitioning column. Here is a JSON row from this dataset:

{
  "id": "ATL_LGA_2017-01-01_AA_1678",
  "dofW": 7,
  "carrier": "AA",
  "src": "ATL",
  "dst": "LGA",
  "crsdephour": 17,
  "crsdeptime": 1700,
  "depdelay": 0.0,
  "crsarrtime": 1912,
  "arrdelay": 0.0,
  "crselapsedtime": 132.0,
  "dist": 762.0
}

Here is the code to persist a flights DataFrame as a table consisting of Parquet files partitioned by the src column:

df.write.format("parquet")
.partitionBy("src")
.option("path", "/user/mapr/data/flights")
.saveAsTable("flights")

Below is the resulting directory structure as shown by a Hadoop list files command:

hadoop fs -ls /user/mapr/data/flights
  /user/mapr/data/flights/src=ATL
  /user/mapr/data/flights/src=BOS
  /user/mapr/data/flights/src=CLT
  /user/mapr/data/flights/src=DEN
  /user/mapr/data/flights/src=DFW
  /user/mapr/data/flights/src=EWR
  /user/mapr/data/flights/src=IAH
  /user/mapr/data/flights/src=LAX
  /user/mapr/data/flights/src=LGA
  /user/mapr/data/flights/src=MIA
  /user/mapr/data/flights/src=ORD
  /user/mapr/data/flights/src=SEA
  /user/mapr/data/flights/src=SFO

Below, we see that the src=DEN subdirectory contains two Parquet files:

hadoop fs -ls /user/mapr/data/flights/src=DEN

/user/mapr/data/flights/src=DEN/part-00000-deb4a3d4-d8c3-4983-8756-ad7e0b29e780.c000.snappy.parquet
/user/mapr/data/flights/src=DEN/part-00001-deb4a3d4-d8c3-4983-8756-ad7e0b29e780.c000.snappy.parquet

Partition Pruning and Predicate Pushdown

Partition pruning is a performance optimization that limits the number of files and partitions that Spark reads when querying. After the data is partitioned, queries that filter on the partition column improve performance because Spark only has to read a subset of the directories and files. When partition filters are present, the Catalyst optimizer pushes them down, and the scan reads only the directories that match the partition filters, reducing disk I/O. For example, the following query reads only the files in the src=DEN partition directory in order to compute the average departure delay for flights originating from Denver.

df.filter("src = 'DEN' and depdelay > 1")
.groupBy("src", "dst").avg("depdelay")
.sort(desc("avg(depdelay)")).show()

result:
+---+---+------------------+
|src|dst|     avg(depdelay)|
+---+---+------------------+
|DEN|EWR|54.352020860495436|
|DEN|MIA| 48.95263157894737|
|DEN|SFO|47.189473684210526|
|DEN|ORD| 46.47721518987342|
|DEN|DFW|44.473118279569896|
|DEN|CLT|37.097744360902254|
|DEN|LAX|36.398936170212764|
|DEN|LGA| 34.59444444444444|
|DEN|BOS|33.633187772925766|
|DEN|IAH| 32.10775862068966|
|DEN|SEA|30.532345013477087|
|DEN|ATL| 29.29113924050633|
+---+---+------------------+

Or in SQL:

%sql
SELECT src, dst, avg(depdelay)
FROM flights
WHERE src = 'DEN' AND depdelay > 1
GROUP BY src, dst
ORDER BY avg(depdelay) DESC

You can see the physical plan for a DataFrame query in the Spark web UI SQL tab (discussed in chapter 3) or by calling the explain method shown below. In the PartitionFilters entry of the file scan, we see partition filter pushdown, which means that the src=DEN filter is pushed down into the Parquet file scan. This minimizes the number of files and the amount of data scanned and reduces the amount of data passed back to the Spark engine for the aggregation of the average departure delay.

df.filter("src = 'DEN' and depdelay > 1")
.groupBy("src", "dst").avg("depdelay")
.sort(desc("avg(depdelay)")).explain

== Physical Plan ==
TakeOrderedAndProject(limit=1001, orderBy=[avg(depdelay)#304 DESC NULLS LAST], output=[src#157, dst#149, avg(depdelay)#314])
+- *(2) HashAggregate(keys=[src#157, dst#149], functions=[avg(depdelay#152)], output=[src#157, dst#149, avg(depdelay)#304])
   +- Exchange hashpartitioning(src#157, dst#149, 200)
      +- *(1) HashAggregate(keys=[src#157, dst#149], functions=[partial_avg(depdelay#152)], output=[src#157, dst#149, sum#321, count#322L])
         +- *(1) Project [dst#149, depdelay#152, src#157]
            +- *(1) Filter (isnotnull(depdelay#152) && (depdelay#152 > 1.0))
               +- *(1) FileScan parquet default.flights[dst#149,depdelay#152,src#157] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[maprfs:/user/mapr/data/flights/src=DEN], PartitionCount: 1, PartitionFilters: [isnotnull(src#157), (src#157 = DEN)], PushedFilters: [IsNotNull(depdelay), GreaterThan(depdelay,1.0)], ReadSchema: struct<dst:string,depdelay:double>

The physical plan is read from the bottom up, whereas the DAG is read from the top down. Note: the Exchange means a shuffle occurred between stages.

Partitioning Tips

The partition columns should be used frequently in queries for filtering and should have a small to moderate number of distinct values, with enough corresponding data to spread the files across the directories. You want to avoid too many small files, which make scans less efficient due to excessive parallelism, and too few large files, which hurt parallelism.
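
One way to sanity-check a candidate partitioning column is to look at its number of distinct values and how evenly the rows are spread across them; a quick sketch using the flights DataFrame:

import org.apache.spark.sql.functions.countDistinct

// Number of distinct values of the candidate column: should stay modest.
df.select(countDistinct("src")).show()

// Row counts per value: look for values with too little data (tiny files)
// or a heavy skew toward one value.
df.groupBy("src").count().orderBy("count").show()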

Coalesce and Repartition

Before or when writing a DataFrame, you can use df.coalesce(N) to reduce the number of partitions without shuffling, or df.repartition(N) to increase or decrease the number of partitions with a full shuffle of data across the network, which balances the load more evenly.

df.write.format("parquet")
.repartition(13)
.partitionBy("src")
.option("path", "/user/mapr/data/flights")
.saveAsTable("flights")
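For comparison, here is a sketch that uses coalesce to reduce the number of output files without a full shuffle (the table name and path are hypothetical variations on the example above):

df.coalesce(4)                      // fewer write tasks, hence fewer output files; no shuffle
  .write.format("parquet")
  .partitionBy("src")
  .option("path", "/user/mapr/data/flights_coalesced")   // hypothetical path
  .saveAsTable("flights_coalesced")                      // hypothetical table name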

Bucketing

Bucketing is another data organization technique that groups data with the same bucket value across a fixed number of "buckets." This can improve performance in wide transformations and joins by avoiding "shuffles." As covered in Chapter 3 of the eBook, with wide transformation shuffles, data is sent across the network to other nodes and written to disk, causing network and disk I/O and making the shuffle a costly operation. For example, df.groupBy("carrier").count causes a shuffle, as the sketch below shows; if this dataset were bucketed by "carrier," the shuffle could be avoided.
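
A quick way to confirm this on the unbucketed flights DataFrame is to look at the plan:

// On an unbucketed table, this aggregation requires an Exchange (shuffle)
// to bring all rows for each carrier to the same partition.
df.groupBy("carrier").count().explain()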

Bucketing is similar to partitioning, but partitioning creates a directory for each partition, whereas bucketing distributes data across a fixed number of buckets by a hash on the bucket value. Tables can be bucketed on more than one value and bucketing can be used with or without partitioning.

As an example with the flight dataset, here is the code to persist a flights DataFrame as a table, consisting of Parquet files partitioned by the src column and bucketed by the dst and carrier columns (sorting by the id will sort by the src, dst, flightdate, and carrier, since that is what the id is made up of):

df.write.format("parquet")
.sortBy("id")
.partitionBy("src")
.bucketBy(4, "dst", "carrier")
.option("path", "/user/mapr/data/flightsbkdc")
.saveAsTable("flightsbkdc")

The resulting directory structure is the same as before, with the files in the src directories bucketed by dst and carrier. The code below computes statistics on the table, which can then be used by the Catalyst optimizer. Next, the partitioned and bucketed table is read into a new DataFrame df2.

spark.sql("ANALYZE TABLE flightsbkdc COMPUTE STATISTICS")
val df2  = spark.table("flightsbkdc")

Next, let's look at the optimizations for the following query:

df2.filter("src = 'DEN' and depdelay > 1")
.groupBy("src", "dst","carrier")
.avg("depdelay")
.sort(desc("avg(depdelay)")).show()

result:
+---+---+-------+------------------+
|src|dst|carrier|     avg(depdelay)|
+---+---+-------+------------------+
|DEN|EWR|     UA| 60.95841209829867|
|DEN|LAX|     DL|59.849624060150376|
|DEN|SFO|     UA|59.058282208588956|
. . .

Here again, we see partition filter pushdown and predicate pushdown, but this time there is no "Exchange" like there was before bucketing, which means no shuffle was needed to aggregate by src, dst, and carrier.

== Physical Plan ==
TakeOrderedAndProject(limit=1001, orderBy=[avg(depdelay)#491 DESC NULLS LAST], output=[src#460, dst#452, carrier#451, avg(depdelay)#504])
+- *(1) HashAggregate(keys=[src#460, dst#452, carrier#451], functions=[avg(depdelay#455)], output=[src#460, dst#452, carrier#451, avg(depdelay)#491])
   +- *(1) HashAggregate(keys=[src#460, dst#452, carrier#451], functions=[partial_avg(depdelay#455)], output=[src#460, dst#452, carrier#451, sum#512, count#513L])
      +- *(1) Project [carrier#451, dst#452, depdelay#455, src#460]
         +- *(1) Filter (isnotnull(depdelay#455) && (depdelay#455 > 1.0))
            +- *(1) FileScan parquet default.flightsbkdc[carrier#451,dst#452,depdelay#455,src#460] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[maprfs:/user/mapr/data/flightsbkdc/src=DEN], PartitionCount: 1, PartitionFilters: [isnotnull(src#460), (src#460 = DEN)], PushedFilters: [IsNotNull(depdelay), GreaterThan(depdelay,1.0)], ReadSchema: struct<carrier:string,dst:string,depdelay:double>

In the corresponding DAG in the Spark web UI, there is no exchange shuffle, and the stages show "Whole-Stage Java Code Generation," which optimizes CPU usage by generating a single optimized function in bytecode.

Bucketing Tips

Partitioning should only be used with columns that have a limited number of values; bucketing works well when the number of unique values is large. Columns that are used often in queries and provide high selectivity are good choices for bucketing. Spark tables that are bucketed store metadata about how they are bucketed and sorted, which optimizes joins, aggregations, and queries on the bucketed columns by avoiding shuffles and, when combined with sortBy, sorts.

MapR Database, Data Modeling, Partitioning, and Filter Pushdown

MapR Database Data Modeling: Partitioning and Row Key Design

With MapR Database, a table is automatically partitioned into tablets across a cluster by key range, providing for scalable and fast reads and writes by row key.

In this use case, the row key (the id) is composed of the origin and destination airport codes, followed by the flight date and carrier, so the table is automatically partitioned and sorted by src, dst, date, and carrier.
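
A sketch of building such a composite row key with Spark SQL functions (the flightdate and flightnum columns are hypothetical here; in the flight dataset those values are embedded in the id string):

import org.apache.spark.sql.functions.{col, concat_ws}

// Compose a row key like "ATL_LGA_2017-01-01_AA_1678" so MapR Database
// partitions and sorts rows by origin, destination, date, and carrier.
val withRowKey = df.withColumn(
  "_id",
  concat_ws("_", col("src"), col("dst"), col("flightdate"), col("carrier"), col("flightnum")))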

MapR Database Data Modeling: Avoiding JOINS with Nested Entities

If your tables exist in a one-to-many relationship, it's possible to model it as a single document; this can avoid expensive JOINS. In the one-to-many relationship example below, we have an order table, which has a one-to-many relationship with an order items table.

Here is a nested entity example of this one-to-many relationship in a document database. In this example, the order and related line items are stored together and can be read together with a find on the row key (_id). This makes the reads a lot faster than joining tables together.

{
  "_id": "123",
  "date": "10/10/2017",
  "ship_status": "backordered",
  "orderitems": [
    {
      "itemid": "4348",
      "price": 10.00
    },
    {
      "itemid": "5648",
      "price": 15.00
    }
  ]
}

See Data Modeling Guidelines for NoSQL JSON Document Databases and Guidelines for HBase Schema Design for more information on designing your MapR Database schema. (Nested Entities are also possible with JSON and Parquet files.)

Projection and Filter Pushdown Into MapR Database

Below, we see the physical plan for a DataFrame query with projection and filter pushdown into MapR Database: the scan of the src, dst, and depdelay columns and the filter on the depdelay column are pushed down into MapR Database, meaning that the scanning and filtering take place in MapR Database before data is returned to Spark. Projection pushdown minimizes data transfer between MapR Database and the Spark engine by omitting unnecessary fields from table scans; it is especially beneficial when a table contains many columns. Filter pushdown improves performance by reducing the amount of data passed between MapR Database and the Spark engine when filtering data.

df.filter("src = 'ATL' and depdelay > 1")
.groupBy("src", "dst")
.avg("depdelay").sort(desc("avg(depdelay)")).explain

== Physical Plan ==
*(3) Sort [avg(depdelay)#273 DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(avg(depdelay)#273 DESC NULLS LAST, 200)
   +- *(2) HashAggregate(keys=[src#5, dst#6], functions=[avg(depdelay#9)])
      +- Exchange hashpartitioning(src#5, dst#6, 200)
         +- *(1) HashAggregate(keys=[src#5, dst#6], functions=[partial_avg(depdelay#9)])
            +- *(1) Filter (((isnotnull(src#5) && isnotnull(depdelay#9)) && (src#5 = ATL)) && (depdelay#9 > 1.0))
               +- *(1) Scan MapRDBRelation(/user/mapr/flighttable) [src#5,dst#6,depdelay#9] PushedFilters: [IsNotNull(src), IsNotNull(depdelay), EqualTo(src,ATL), GreaterThan(depdelay,1.0)]


Spark Web UI and SQL Tips

Read or review chapter 3 of the eBook to understand how to use the Spark web UI to explore your jobs, stages, storage, and SQL query plans. Here is a summary of tips and what to look for:

SQL Tab

You can see details about the query plan produced by Catalyst on the web UI SQL tab. In the query plan details, you can check for partition filters and pushed filters on the file scans (PartitionFilters and PushedFilters), whether whole-stage code generation is being applied (WholeStageCodegen), and where Exchanges (shuffles) occur.

Use the Spark SQL "ANALYZE TABLE tablename COMPUTE STATISTICS" command to take advantage of cost-based optimization in the Catalyst planner.
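
For example, on the flights table created earlier (column-level statistics are available in Spark 2.2 and later):

// Table-level statistics (row count, size) for the cost-based optimizer.
spark.sql("ANALYZE TABLE flights COMPUTE STATISTICS")

// Optional column-level statistics for better join and filter estimates.
spark.sql("ANALYZE TABLE flights COMPUTE STATISTICS FOR COLUMNS src, dst, depdelay")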

Stages Tab

You can use the stage detail metrics to identify problems with an executor or task distribution. Things to look for include tasks whose duration or input size is much larger than the others (data skew), high shuffle read and write volumes, spills to disk, and long garbage collection times.

Storage Tab

Caching Datasets can make execution faster if the data will be reused. You can use the storage tab to see whether important Datasets fit in memory.
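
A minimal sketch: cache a Dataset that is reused across several queries, force materialization, and then check the storage tab to see how much of it fits in memory.

// Cache the Denver flights, which several later queries reuse.
val denFlights = df.filter("src = 'DEN'").cache()
denFlights.count()   // an action materializes the cache; it then appears on the storage tab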

Executors Tab

You can use the executors tab to confirm that your application has the resources it needs.
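
Executor resources must be set before the application starts; in a standalone application you might configure them through the SparkSession builder, or equivalently via spark-submit flags (the sizes shown are placeholders, not recommendations):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("flights-analysis")               // hypothetical application name
  .config("spark.executor.memory", "4g")     // placeholder sizing
  .config("spark.executor.cores", "2")       // placeholder sizing
  .config("spark.executor.instances", "4")   // placeholder; cluster-manager dependent
  .getOrCreate()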
