Class DefaultPartitionCoalescer
- All Implemented Interfaces:
PartitionCoalescer
Coalesce the partitions of a parent RDD (prev) into fewer partitions, so that each partition of this RDD computes one or more of the parent ones. It will produce exactly maxPartitions partitions if the parent had more than maxPartitions, or fewer if the parent had fewer.
This transformation is useful when an RDD with many partitions gets filtered into a smaller one, or to avoid having a large number of small tasks when processing a directory with many files.
If there is no locality information (no preferredLocations) in the parent, then the coalescing is very simple: parent partitions that are adjacent in the array are grouped into chunks. If there is locality information, it proceeds to pack them with the following four goals:
(1) Balance the groups so they have roughly the same number of parent partitions.
(2) Achieve locality per partition, i.e. find one machine which most parent partitions prefer.
(3) Be efficient, i.e. an O(n) algorithm for n parent partitions (the problem is likely NP-hard).
(4) Balance preferred machines, i.e. avoid as much as possible picking the same preferred machine.
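The no-locality case described above can be illustrated with a small sketch. It is not Spark's code: the class name ChunkSketch and the method chunk are hypothetical, and the even-split index arithmetic is an assumption about how adjacent parent partitions might be divided into contiguous ranges.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not part of the Spark API.
final class ChunkSketch {
    // Splits parent indices 0..numParents-1 into at most maxPartitions
    // contiguous half-open ranges [start, end) of nearly equal size.
    // Assumes maxPartitions > 0.
    static List<int[]> chunk(int numParents, int maxPartitions) {
        int groups = Math.min(numParents, maxPartitions);
        List<int[]> ranges = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            int start = (int) ((long) i * numParents / groups);
            int end = (int) ((long) (i + 1) * numParents / groups);
            ranges.add(new int[] {start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Prints [0, 3), [3, 6), [6, 10) on separate lines.
        for (int[] r : chunk(10, 3)) {
            System.out.println("[" + r[0] + ", " + r[1] + ")");
        }
    }
}
```

With fewer parents than maxPartitions, the sketch returns one range per parent, matching the "or fewer if the parent had fewer" behavior described for the class.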
Furthermore, it is assumed that the parent RDD may have many partitions, e.g. 100 000. We assume the final number of desired partitions is small, e.g. less than 1000.
The algorithm tries to assign unique preferred machines to each partition. If the number of desired partitions is greater than the number of preferred machines (can happen), it needs to start picking duplicate preferred machines. This is determined using coupon collector estimation (2n log(n)). The load balancing is done using power-of-two randomized bins-balls with one twist: it tries to also achieve locality. This is done by allowing a slack (balanceSlack, where 1.0 is all locality, 0 is all balance) between two bins. If two bins are within the slack in terms of balance, the algorithm will assign partitions according to locality. (contact alig for questions)
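The power-of-two load balancing mentioned above can be sketched in isolation, without the locality twist. This is a generic illustration of the two-choices technique, not the class's implementation; TwoChoicesSketch and simulate are hypothetical names.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical illustration; not part of the Spark API.
final class TwoChoicesSketch {
    // Throws `balls` balls into `bins` bins. Each ball samples two bins
    // uniformly at random and is placed in the less loaded of the two.
    static int[] simulate(int balls, int bins, Random rnd) {
        int[] load = new int[bins];
        for (int b = 0; b < balls; b++) {
            int r1 = rnd.nextInt(bins);
            int r2 = rnd.nextInt(bins);
            int chosen = load[r1] < load[r2] ? r1 : r2;
            load[chosen]++;
        }
        return load;
    }

    public static void main(String[] args) {
        int[] load = simulate(100_000, 1_000, new Random(42));
        // With two choices the heaviest bin typically stays close to the mean of 100.
        System.out.println("max load = " + Arrays.stream(load).max().getAsInt());
        System.out.println("min load = " + Arrays.stream(load).min().getAsInt());
    }
}
```

Sampling two candidates instead of one keeps the per-ball cost constant while sharply reducing the gap between the heaviest and lightest bin, which fits the O(n) efficiency goal stated earlier.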
-
Nested Class Summary
-
Constructor Summary
-
Method Summary
Modifier and Type, Method, Description
- boolean addPartToPGroup(Partition part, PartitionGroup pgroup)
- double balanceSlack()
- coalesce
  Runs the packing algorithm and returns an array of PartitionGroups that, if possible, are load balanced and grouped by locality.
- scala.collection.Seq<String> currPrefLocs(Partition part, RDD<?> prev)
- scala.Option<PartitionGroup> getLeastGroupHash(String key)
  Gets the least element of the list associated with key in groupHash. The returned PartitionGroup is the least loaded of all groups that represent the machine "key".
- scala.collection.mutable.ArrayBuffer<PartitionGroup> groupArr()
- scala.collection.mutable.Map<String,scala.collection.mutable.ArrayBuffer<PartitionGroup>> groupHash()
- scala.collection.mutable.Set<Partition> initialHash()
- boolean noLocality()
- partitionGroupOrdering
  Accessor for nested Scala object.
- PartitionGroup pickBin(Partition p, RDD<?> prev, double balanceSlack, org.apache.spark.rdd.DefaultPartitionCoalescer.PartitionLocations partitionLocs)
  Takes a parent RDD partition and decides which of the partition groups to put it in. Takes locality into account, but also uses power of 2 choices to load balance. It strikes a balance between the two using the balanceSlack variable.
- scala.util.Random rnd()
- void setupGroups(int targetLen, org.apache.spark.rdd.DefaultPartitionCoalescer.PartitionLocations partitionLocs)
  Initializes targetLen partition groups.
- void throwBalls(int maxPartitions, RDD<?> prev, double balanceSlack, org.apache.spark.rdd.DefaultPartitionCoalescer.PartitionLocations partitionLocs)
-
Constructor Details
-
DefaultPartitionCoalescer
public DefaultPartitionCoalescer(double balanceSlack)
-
-
Method Details
-
partitionGroupOrdering
Accessor for nested Scala object.
- Returns:
- (undocumented)
-
balanceSlack
public double balanceSlack()
-
rnd
public scala.util.Random rnd()
-
groupArr
-
groupHash
public scala.collection.mutable.Map<String,scala.collection.mutable.ArrayBuffer<PartitionGroup>> groupHash()
-
initialHash
-
noLocality
public boolean noLocality()
-
currPrefLocs
-
getLeastGroupHash
Gets the least element of the list associated with key in groupHash. The returned PartitionGroup is the least loaded of all groups that represent the machine "key".
- Parameters:
key
- string representing a partitioned group on preferred machine key
- Returns:
- Option of PartitionGroup that has least elements for key
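A rough sketch of the lookup described above, using plain Java collections in place of the Scala ones. The Group class, its fields, and the method leastLoaded are hypothetical stand-ins for PartitionGroup and getLeastGroupHash; the real types are not reproduced here.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration; not part of the Spark API.
final class LeastGroupSketch {
    // Stand-in for a partition group pinned to one preferred machine.
    static final class Group {
        final String prefLoc;
        final int numPartitions;

        Group(String prefLoc, int numPartitions) {
            this.prefLoc = prefLoc;
            this.numPartitions = numPartitions;
        }
    }

    // Returns the group with the fewest partitions among those registered
    // for `key`, or an empty Optional when the key has no groups.
    static Optional<Group> leastLoaded(Map<String, List<Group>> groupHash, String key) {
        List<Group> groups = groupHash.getOrDefault(key, Collections.emptyList());
        return groups.stream().min(Comparator.comparingInt((Group g) -> g.numPartitions));
    }

    public static void main(String[] args) {
        Map<String, List<Group>> groupHash =
            Map.of("host-a", List.of(new Group("host-a", 7), new Group("host-a", 3)));
        System.out.println(leastLoaded(groupHash, "host-a").map(g -> g.numPartitions)); // Optional[3]
        System.out.println(leastLoaded(groupHash, "host-b").isPresent());               // false
    }
}
```

Returning an Optional mirrors the documented Option return type: a machine with no groups yields an empty result rather than an error.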
-
addPartToPGroup
-
setupGroups
public void setupGroups(int targetLen, org.apache.spark.rdd.DefaultPartitionCoalescer.PartitionLocations partitionLocs)
Initializes targetLen partition groups. If there are preferred locations, each group is assigned a preferredLocation. This uses coupon collector to estimate how many preferredLocations it must rotate through until it has seen most of the preferred locations (2 * n log(n)).
- Parameters:
targetLen
- The number of desired partition groups
partitionLocs
- (undocumented)
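To give a feel for the 2 * n log(n) estimate, the sketch below compares it with the classical coupon collector expectation n * H_n (H_n being the n-th harmonic number). The use of the natural logarithm and the names CouponSketch, expectedDraws, and bound are assumptions made for illustration; the page does not state the logarithm base.

```java
// Hypothetical illustration; not part of the Spark API.
final class CouponSketch {
    // Expected number of uniform random draws needed to see all n distinct
    // items at least once: n * (1 + 1/2 + ... + 1/n).
    static double expectedDraws(int n) {
        double harmonic = 0.0;
        for (int k = 1; k <= n; k++) {
            harmonic += 1.0 / k;
        }
        return n * harmonic;
    }

    // The 2 * n * log(n) figure quoted in the description, with log taken
    // as the natural logarithm (an assumption).
    static double bound(int n) {
        return 2.0 * n * Math.log(n);
    }

    public static void main(String[] args) {
        for (int n : new int[] {10, 100, 1000}) {
            System.out.printf("n=%d expected=%.1f bound=%.1f%n", n, expectedDraws(n), bound(n));
        }
    }
}
```

For moderately large n the bound sits comfortably above the expectation, which is consistent with the stated aim of having seen most preferred locations after that many draws.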
-
pickBin
public PartitionGroup pickBin(Partition p, RDD<?> prev, double balanceSlack, org.apache.spark.rdd.DefaultPartitionCoalescer.PartitionLocations partitionLocs)
Takes a parent RDD partition and decides which of the partition groups to put it in. Takes locality into account, but also uses power of 2 choices to load balance. It strikes a balance between the two using the balanceSlack variable.
- Parameters:
p
- partition (ball to be thrown)
balanceSlack
- determines the trade-off between load-balancing the partition sizes and their locality. E.g., balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
prev
- (undocumented)
partitionLocs
- (undocumented)
- Returns:
- partition group (bin to be put in)
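One way to read the balanceSlack trade-off is as a tolerance on how much heavier the locality-preferred bin may be than the bin found by the two random choices. The sketch below is a hedged interpretation, not the documented implementation: SlackSketch and preferLocality are hypothetical, and scaling the slack by the number of parent partitions is an assumption.

```java
// Hypothetical illustration; not part of the Spark API.
final class SlackSketch {
    // Decides between the locality-preferred bin and the lighter of two
    // randomly sampled bins.
    //   balancedLoad : load of the lighter randomly sampled bin
    //   localLoad    : load of the least loaded bin on a preferred machine
    //   totalParents : number of parent partitions (assumed scale for the slack)
    // Returns true when the locality-preferred bin should be used.
    static boolean preferLocality(int balancedLoad, int localLoad,
                                  double balanceSlack, int totalParents) {
        int slack = (int) (balanceSlack * totalParents);
        // Locality wins while the local bin is within `slack` of the balanced bin.
        return balancedLoad + slack > localLoad;
    }

    public static void main(String[] args) {
        // 100 parents with balanceSlack = 0.10 gives a slack of 10 partitions.
        System.out.println(preferLocality(8, 15, 0.10, 100)); // true: 15 is within 10 of 8
        System.out.println(preferLocality(8, 20, 0.10, 100)); // false: 20 is too far above 8
    }
}
```

Under this reading, balanceSlack = 0 picks the local bin only when it is strictly lighter (all balance), while balanceSlack = 1.0 makes the tolerance as large as the whole parent RDD (all locality).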
-
throwBalls
public void throwBalls(int maxPartitions, RDD<?> prev, double balanceSlack, org.apache.spark.rdd.DefaultPartitionCoalescer.PartitionLocations partitionLocs)
-
getPartitions
-
coalesce
Runs the packing algorithm and returns an array of PartitionGroups that, if possible, are load balanced and grouped by locality.
- Specified by:
coalesce in interface PartitionCoalescer
- Parameters:
maxPartitions
- (undocumented)
prev
- (undocumented)
- Returns:
- array of partition groups
-