Make your Spark jobs reliable and fast — use shuffle correctly.
Moving data between cluster nodes is expensive, so try to reduce the number of shuffles to a minimum. E.g. in some cases a sort-merge join can be replaced by a broadcast join. If the table you want to join is too big for a single broadcast join (a single broadcasted table is limited to 8 GB), you can sometimes split it up and use several consecutive broadcast joins.
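As a minimal sketch (the DataFrame names are made up), you can force a broadcast join with the `broadcast` hint from `org.apache.spark.sql.functions`:

```scala
import org.apache.spark.sql.functions.broadcast

// Ship the small dimension table to every executor instead of
// shuffling the large fact table for a sort-merge join.
val joined = largeEvents.join(broadcast(smallUsers), Seq("userId"))
```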
Sometimes you can avoid a shuffle when using multiple groupBys or multiple windows with window functions. E.g. if you call groupBy($"userId", $"hour") and later groupBy($"userId"), Spark will do 2 shuffles, because after the first groupBy the records for a given user can sit on multiple executors (so the second groupBy needs to redistribute the records to ensure that all records for a single user land on a single executor). On the other hand, if you first do groupBy($"userId") and later groupBy($"userId", $"hour"), then 1 shuffle may be enough: partitioning by userId already guarantees that all records sharing a (userId, hour) pair are in the same partition.
If you do need a shuffle, then shuffle only the data you need. Maybe some columns can be computed after shuffling. E.g. if you already have a column timestamp and want to add a column day derived from it, add it after the shuffle. Even if you have already computed such columns and then need to shuffle, consider dropping them, shuffling, and computing them again afterwards.
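A minimal sketch of both variants, assuming an `events` DataFrame with `timestamp` and `userId` columns:

```scala
import org.apache.spark.sql.functions.to_date
import spark.implicits._

// Variant 1: derive `day` only after the shuffle, so it is never
// serialized and sent over the network.
val withDay = events
  .repartition($"userId")
  .withColumn("day", to_date($"timestamp"))

// Variant 2: `day` already exists; drop it, shuffle, recompute.
val recomputed = eventsWithDay
  .drop("day")
  .repartition($"userId")
  .withColumn("day", to_date($"timestamp"))
```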
Every column adds at least 8 bytes to your row (even if it is just a Boolean, Spark will store it in an UnsafeRow, where everything is aligned to 8 bytes), so e.g. a row of seven Boolean columns still occupies 8 bytes of null bitmap plus 7 × 8 = 64 bytes. To shuffle a column, Spark will compress its values (remember, at least 8 bytes each), send them over the network, and decompress them. Keep this in mind when evaluating the cost of recomputation: e.g. recomputing day from timestamp is certainly faster than compression + transfer over the network + decompression.
Additionally, if some columns of the row won't always be needed after the shuffle, replace their values with null before the shuffle. E.g. suppose you have a column url which is used after the shuffle, but only under certain conditions. Then replace it with null before the shuffle whenever you know those conditions won't be met. This is especially important for nested structures, strings, arrays, and maps, because it will reduce the size of your rows considerably.
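A sketch of this trick with made-up column names, where url is only consumed downstream when eventType is "click":

```scala
import org.apache.spark.sql.functions.when
import spark.implicits._

// Blank out `url` for rows where it will never be read after the
// shuffle; `when` without `otherwise` yields null for other rows.
val trimmed = events.withColumn(
  "url",
  when($"eventType" === "click", $"url")
)
```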
Generally, before a shuffle you need to:
- filter out as many rows as possible,
- drop as many columns as possible,
- replace big columns with small columns,
- replace as many values as possible with null.
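Putting the whole checklist together in one sketch (all column names here are assumptions):

```scala
import org.apache.spark.sql.functions.{hash, lit, when}
import spark.implicits._

val prepared = events
  .filter($"eventDate" >= lit("2024-01-01"))        // filter rows early
  .drop("debugInfo")                                // drop unused columns
  .withColumn("urlHash", hash($"url")).drop("url")  // big column -> small column
  .withColumn("referrer",
    when($"isPaid", $"referrer"))                   // null out values you won't need
val joined = prepared.join(users, Seq("userId"))    // only now shuffle
```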
When you call repartition(partitionExprs: Column*) (or anything which translates to RepartitionByExpression in your logical plan), use partitionExprs which result in partitions that need similar time and similar amounts of memory for processing. This prevents your jobs from failing because a small bunch of tasks are constantly running out of memory.
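If one key is hot, a common mitigation is salting: spread the hot key across several buckets so the partitions stay comparable in size. A minimal sketch, where the bucket count of 16 is an arbitrary assumption you would tune to your data:

```scala
import org.apache.spark.sql.functions.{floor, rand}
import spark.implicits._

// Records for a skewed userId are split across 16 salt buckets,
// so no single task has to process the whole hot key.
val balanced = events.repartition($"userId", floor(rand() * 16))
```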
So these were 3 simple tips for optimizing your shuffling when using Spark SQL. A completely different approach is to forget about distributed computation and run your job on a single machine without Spark. If your machine has enough memory, this can actually be faster than Spark, because you avoid a lot of IO.