Class ShuffleStatus

Object
org.apache.spark.ShuffleStatus
All Implemented Interfaces:
org.apache.spark.internal.Logging

public class ShuffleStatus extends Object implements org.apache.spark.internal.Logging
Helper class used by the MapOutputTrackerMaster to perform bookkeeping for a single ShuffleMapStage.

This class maintains a mapping from map index to MapStatus. It also maintains a cache of serialized map statuses in order to speed up tasks' requests for map output statuses.

All public methods of this class are thread-safe.
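The bookkeeping described above can be pictured with a minimal sketch. It uses simplified, hypothetical types (SketchMapStatus, SketchShuffleStatus) instead of Spark's own classes, and a String stands in for BlockManagerId. A java.util.concurrent ReentrantReadWriteLock is one assumed way to make every public method thread-safe; it is not presented as Spark's implementation. The sketch shows one array slot per map index, replacement on re-registration, and a running count of available outputs.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for MapStatus: just an id and a serving location.
final class SketchMapStatus {
    final long mapId;
    final String location; // stands in for BlockManagerId

    SketchMapStatus(long mapId, String location) {
        this.mapId = mapId;
        this.location = location;
    }
}

// Hypothetical per-shuffle bookkeeping: one slot per map index,
// guarded by a read/write lock so every public method is thread-safe.
final class SketchShuffleStatus {
    private final SketchMapStatus[] mapStatuses;
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private int numAvailable = 0; // number of non-null slots

    SketchShuffleStatus(int numPartitions) {
        mapStatuses = new SketchMapStatus[numPartitions];
    }

    // Registers an output; any existing entry for mapIndex is replaced.
    void addMapOutput(int mapIndex, SketchMapStatus status) {
        lock.writeLock().lock();
        try {
            if (mapStatuses[mapIndex] == null) {
                numAvailable++; // only filling an empty slot changes the count
            }
            mapStatuses[mapIndex] = status;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Returns the serving location for mapIndex, or null if unavailable.
    String locationOf(int mapIndex) {
        lock.readLock().lock();
        try {
            SketchMapStatus s = mapStatuses[mapIndex];
            return s == null ? null : s.location;
        } finally {
            lock.readLock().unlock();
        }
    }

    int numAvailableMapOutputs() {
        lock.readLock().lock();
        try {
            return numAvailable;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Keeping a running count avoids scanning the array on every numAvailableMapOutputs call. The sketch deliberately omits the serialized-status cache.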

  • Constructor Details

    • ShuffleStatus

      public ShuffleStatus(int numPartitions, int numReducers)
  • Method Details

    • mapStatuses

      public MapStatus[] mapStatuses()
      MapStatus for each partition. The index of the array is the map partition id. Each value in the array is the MapStatus for a partition, or null if the partition is not available. Even though in theory a task may run multiple times (due to speculation, stage retries, etc.), in practice the likelihood of a map output being available at multiple locations is so small that we choose to ignore that case and store only a single location for each output.
      Returns:
      the MapStatus array, indexed by map partition id
    • mapStatusesDeleted

      public MapStatus[] mapStatusesDeleted()
      The previously deleted MapStatus entries, kept so that they can be recovered.
      Returns:
      the array of previously deleted MapStatus entries
    • mergeStatuses

      public org.apache.spark.scheduler.MergeStatus[] mergeStatuses()
      MergeStatus for each shuffle partition when push-based shuffle is enabled. The index of the array is the shuffle partition id (reduce id). Each value in the array is the MergeStatus for a shuffle partition, or null if not available. When push-based shuffle is enabled, this array provides a reducer-oriented view of the shuffle status, specifically of the results of merging shuffle partition blocks into per-partition merged shuffle files.
      Returns:
      the MergeStatus array, indexed by reduce id
    • addMapOutput

      public void addMapOutput(int mapIndex, MapStatus status)
      Register a map output. If there is already a registered location for the map output then it will be replaced by the new location.
      Parameters:
      mapIndex - the map partition id whose output is being registered
      status - the MapStatus describing the output and its location
    • getMapStatus

      public scala.Option<MapStatus> getMapStatus(long mapId)
      Get the map output corresponding to the given mapId.
      Parameters:
      mapId - the id of the map output to look up
      Returns:
      the MapStatus with the given mapId, or None if no such output is registered
    • updateMapOutput

      public void updateMapOutput(long mapId, BlockManagerId bmAddress)
      Update the map output location (e.g. during migration).
      Parameters:
      mapId - the id of the map output to update
      bmAddress - the block manager that now serves the map output
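      Since mapId (a long) is distinct from the int map index used by addMapOutput and removeMapOutput, a lookup by mapId cannot index directly into mapStatuses. The sketch below assumes a linear scan over the registered entries. The names MigratableStatus and MapIdLookupSketch are hypothetical, a String stands in for BlockManagerId, and java.util.Optional stands in for scala.Option.

```java
import java.util.Optional;

// Hypothetical stand-in for MapStatus whose location can be updated.
final class MigratableStatus {
    final long mapId;
    volatile String location; // stands in for BlockManagerId

    MigratableStatus(long mapId, String location) {
        this.mapId = mapId;
        this.location = location;
    }
}

// Hypothetical sketch of lookup and update by mapId (not by map index).
final class MapIdLookupSketch {
    private final MigratableStatus[] mapStatuses;

    MapIdLookupSketch(int numPartitions) {
        mapStatuses = new MigratableStatus[numPartitions];
    }

    synchronized void addMapOutput(int mapIndex, MigratableStatus status) {
        mapStatuses[mapIndex] = status;
    }

    // mapId is not an array position, so scan the registered (non-null) entries.
    synchronized Optional<MigratableStatus> getMapStatus(long mapId) {
        for (MigratableStatus s : mapStatuses) {
            if (s != null && s.mapId == mapId) {
                return Optional.of(s);
            }
        }
        return Optional.empty();
    }

    // Points the matching output at a new location; returns false if mapId is unknown.
    synchronized boolean updateMapOutput(long mapId, String newLocation) {
        Optional<MigratableStatus> found = getMapStatus(mapId); // intrinsic locks are reentrant
        found.ifPresent(s -> s.location = newLocation);
        return found.isPresent();
    }
}
```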
    • removeMapOutput

      public void removeMapOutput(int mapIndex, BlockManagerId bmAddress)
      Remove the map output which was served by the specified block manager. This is a no-op if there is no registered map output or if the registered output is from a different block manager.
      Parameters:
      mapIndex - the map partition id
      bmAddress - the block manager that is expected to serve the output
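      The no-op rule can be illustrated with a small sketch that tracks only the serving location of each map output (a String standing in for BlockManagerId; the class name RemovalSketch is hypothetical). A removal that names a different block manager than the one currently registered leaves the entry in place, so a request that refers to an outdated location cannot discard an output that has since been re-registered elsewhere.

```java
// Hypothetical sketch: tracks only the location serving each map output.
final class RemovalSketch {
    private final String[] locations; // null = output unavailable

    RemovalSketch(int numPartitions) {
        locations = new String[numPartitions];
    }

    synchronized void addMapOutput(int mapIndex, String bmAddress) {
        locations[mapIndex] = bmAddress;
    }

    // Clears the slot only if an output is registered AND served by bmAddress.
    synchronized void removeMapOutput(int mapIndex, String bmAddress) {
        if (locations[mapIndex] != null && locations[mapIndex].equals(bmAddress)) {
            locations[mapIndex] = null;
        }
    }

    synchronized boolean isAvailable(int mapIndex) {
        return locations[mapIndex] != null;
    }
}
```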
    • addMergeResult

      public void addMergeResult(int reduceId, org.apache.spark.scheduler.MergeStatus status)
      Register a merge result.
      Parameters:
      reduceId - the shuffle partition (reduce) id
      status - the MergeStatus to register for that partition
    • registerShuffleMergerLocations

      public void registerShuffleMergerLocations(scala.collection.Seq<BlockManagerId> shuffleMergers)
    • removeShuffleMergerLocations

      public void removeShuffleMergerLocations()
    • removeMergeResult

      public void removeMergeResult(int reduceId, BlockManagerId bmAddress)
      Remove the merge result which was served by the specified block manager.
      Parameters:
      reduceId - the shuffle partition (reduce) id
      bmAddress - the block manager that served the merge result
    • removeOutputsOnHost

      public void removeOutputsOnHost(String host)
      Removes all shuffle outputs associated with this host. Note that this will also remove outputs which are served by an external shuffle server (if one exists).
      Parameters:
      host - the host whose shuffle outputs are removed
    • removeOutputsOnExecutor

      public void removeOutputsOnExecutor(String execId)
      Removes all map outputs associated with the specified executor. Note that this will also remove outputs which are served by an external shuffle server (if one exists), as they are still registered with that execId.
      Parameters:
      execId - the ID of the executor whose map outputs are removed
    • removeOutputsByFilter

      public void removeOutputsByFilter(scala.Function1<BlockManagerId,Object> f)
      Removes all shuffle outputs that satisfy the filter. Note that this will also remove outputs which are served by an external shuffle server (if one exists).
      Parameters:
      f - predicate on the BlockManagerId serving each output; outputs for which it returns true are removed
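      The three removal methods above can be viewed as one operation with different match conditions. The sketch below assumes a hypothetical SketchBlockManagerId with executorId and host fields and expresses host and executor removal as predicates passed to a single filter method. In the Java signature, the Function1<BlockManagerId,Object> type most likely reflects a Scala Boolean result that is boxed in the generic signature; the sketch uses java.util.function.Predicate instead.

```java
import java.util.function.Predicate;

// Hypothetical stand-in for BlockManagerId.
final class SketchBlockManagerId {
    final String executorId;
    final String host;

    SketchBlockManagerId(String executorId, String host) {
        this.executorId = executorId;
        this.host = host;
    }
}

final class FilterRemovalSketch {
    private final SketchBlockManagerId[] locations; // null = output unavailable

    FilterRemovalSketch(int numPartitions) {
        locations = new SketchBlockManagerId[numPartitions];
    }

    synchronized void addMapOutput(int mapIndex, SketchBlockManagerId location) {
        locations[mapIndex] = location;
    }

    // Removes every registered output whose serving location matches f.
    synchronized void removeOutputsByFilter(Predicate<SketchBlockManagerId> f) {
        for (int i = 0; i < locations.length; i++) {
            if (locations[i] != null && f.test(locations[i])) {
                locations[i] = null;
            }
        }
    }

    void removeOutputsOnHost(String host) {
        removeOutputsByFilter(bm -> bm.host.equals(host));
    }

    void removeOutputsOnExecutor(String execId) {
        removeOutputsByFilter(bm -> bm.executorId.equals(execId));
    }

    synchronized int numAvailable() {
        int n = 0;
        for (SketchBlockManagerId loc : locations) {
            if (loc != null) {
                n++;
            }
        }
        return n;
    }
}
```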
    • removeMergeResultsByFilter

      public void removeMergeResultsByFilter(scala.Function1<BlockManagerId,Object> f)
      Removes all shuffle merge results that satisfy the filter.
      Parameters:
      f - predicate on the BlockManagerId serving each merge result; results for which it returns true are removed
    • numAvailableMapOutputs

      public int numAvailableMapOutputs()
      Number of partitions that have shuffle map outputs.
      Returns:
      the number of map partitions with an available output
    • numAvailableMergeResults

      public int numAvailableMergeResults()
      Number of shuffle partitions that have already been merge-finalized when push-based shuffle is enabled.
      Returns:
      the number of shuffle partitions with an available merge result
    • findMissingPartitions

      public scala.collection.Seq<Object> findMissingPartitions()
      Returns the sequence of partition ids that are missing (i.e., that need to be computed).
      Returns:
      the ids of the missing partitions
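      Given the mapStatuses layout described earlier (a null entry means the partition's output is not available), the missing partitions are the indexes of the null slots. A minimal sketch, using a hypothetical helper MissingPartitionsSketch over a plain array; the Seq<Object> in the Java signature likely reflects boxed Scala Int values.

```java
import java.util.ArrayList;
import java.util.List;

final class MissingPartitionsSketch {
    // Returns the indexes whose status is null, in ascending order.
    static List<Integer> findMissingPartitions(Object[] mapStatuses) {
        List<Integer> missing = new ArrayList<>();
        for (int i = 0; i < mapStatuses.length; i++) {
            if (mapStatuses[i] == null) {
                missing.add(i);
            }
        }
        return missing;
    }
}
```

      For example, findMissingPartitions(new Object[] {"a", null, "c", null}) returns [1, 3].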
    • serializedMapStatus

      public byte[] serializedMapStatus(org.apache.spark.broadcast.BroadcastManager broadcastManager, boolean isLocal, int minBroadcastSize, SparkConf conf)
      Serializes the mapStatuses array into an efficient compressed format. See the comments on MapOutputTracker.serializeOutputStatuses() for more details on the serialization format.

      This method is designed to be called multiple times and implements caching in order to speed up subsequent requests. If the cache is empty and multiple threads concurrently attempt to serialize the map statuses then serialization will only be performed in a single thread and all other threads will block until the cache is populated.

      Parameters:
      broadcastManager - (undocumented)
      isLocal - (undocumented)
      minBroadcastSize - (undocumented)
      conf - (undocumented)
      Returns:
      (undocumented)
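      The caching behavior described above (serialize once, make concurrent callers wait, reuse the bytes afterwards) can be sketched with a single monitor. The class SerializedCacheSketch and its Supplier-based serializer are hypothetical stand-ins; the sketch does not model broadcasting, the minBroadcastSize threshold, or whatever locking scheme Spark actually uses. Its invalidate method corresponds loosely to invalidateSerializedMapOutputStatusCache.

```java
import java.util.function.Supplier;

// Hypothetical sketch of a serialize-once cache shared by concurrent callers.
final class SerializedCacheSketch {
    private final Object lock = new Object();
    private final Supplier<byte[]> serializer; // stand-in for the real encoding step
    private byte[] cached;                      // guarded by lock; null = cache empty

    SerializedCacheSketch(Supplier<byte[]> serializer) {
        this.serializer = serializer;
    }

    // The first caller to find the cache empty serializes while holding the lock;
    // callers arriving meanwhile block on the lock, then reuse the cached bytes.
    byte[] serializedMapStatus() {
        synchronized (lock) {
            if (cached == null) {
                cached = serializer.get();
            }
            return cached;
        }
    }

    // Drops the cached bytes; call whenever the underlying statuses change.
    void invalidate() {
        synchronized (lock) {
            cached = null;
        }
    }
}
```

      Because the emptiness check and the serialization happen under the same lock, a thread that arrives while another is serializing waits and then returns the cached bytes instead of serializing again. A read/write lock variant would let concurrent readers proceed without contention once the cache is warm, at the cost of a second check after upgrading to the write lock.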
    • serializedMapAndMergeStatus

      public scala.Tuple2<byte[],byte[]> serializedMapAndMergeStatus(org.apache.spark.broadcast.BroadcastManager broadcastManager, boolean isLocal, int minBroadcastSize, SparkConf conf)
      Serializes the mapStatuses and mergeStatuses arrays into an efficient compressed format. See the comments on MapOutputTracker.serializeOutputStatuses() for more details on the serialization format.

      This method is designed to be called multiple times and implements caching in order to speed up subsequent requests. If the cache is empty and multiple threads concurrently attempt to serialize the status arrays then serialization will only be performed in a single thread and all other threads will block until the cache is populated.

      Parameters:
      broadcastManager - (undocumented)
      isLocal - (undocumented)
      minBroadcastSize - (undocumented)
      conf - (undocumented)
      Returns:
      (undocumented)
    • hasCachedSerializedBroadcast

      public boolean hasCachedSerializedBroadcast()
    • withMapStatuses

      public <T> T withMapStatuses(scala.Function1<MapStatus[],T> f)
      Helper function which provides thread-safe access to the mapStatuses array. The function should NOT mutate the array.
      Parameters:
      f - a read-only function applied to the mapStatuses array
      Returns:
      the value returned by f
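      One way to provide the access pattern described above is to run the caller's function while holding a read lock. In this hypothetical sketch (WithStatusesSketch), the array is passed to f directly rather than copied, which is why f must not mutate it: other readers may hold the same read lock at the same time. Writers take the write lock, so they never run while any f is executing.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

// Hypothetical sketch of read-locked, callback-style access to a status array.
final class WithStatusesSketch<S> {
    private final S[] statuses;
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    WithStatusesSketch(S[] statuses) {
        this.statuses = statuses;
    }

    // Applies f to the live array (no copy) while holding the read lock.
    // f must only read: other readers may hold the lock concurrently.
    <T> T withMapStatuses(Function<S[], T> f) {
        lock.readLock().lock();
        try {
            return f.apply(statuses);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Mutations take the write lock, so they never overlap with a reader's f.
    void set(int index, S status) {
        lock.writeLock().lock();
        try {
            statuses[index] = status;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```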
    • withMergeStatuses

      public <T> T withMergeStatuses(scala.Function1<org.apache.spark.scheduler.MergeStatus[],T> f)
    • getShufflePushMergerLocations

      public scala.collection.Seq<BlockManagerId> getShufflePushMergerLocations()
    • invalidateSerializedMapOutputStatusCache

      public void invalidateSerializedMapOutputStatusCache()
      Clears the cached serialized map output statuses.
    • invalidateSerializedMergeOutputStatusCache

      public void invalidateSerializedMergeOutputStatusCache()
      Clears the cached serialized merge result statuses.