Class PushBasedFetchHelper

Object
org.apache.spark.storage.PushBasedFetchHelper
All Implemented Interfaces:
org.apache.spark.internal.Logging

public class PushBasedFetchHelper extends Object implements org.apache.spark.internal.Logging
Helper class for ShuffleBlockFetcherIterator that encapsulates all the push-based functionality needed to fetch push-merged block metadata and shuffle chunks. A push-merged block contains multiple shuffle chunks, and each shuffle chunk contains multiple shuffle blocks that belong to the same reduce partition and were merged into that chunk by the external shuffle service.
  • Nested Class Summary

    Nested classes/interfaces inherited from interface org.apache.spark.internal.Logging

    org.apache.spark.internal.Logging.SparkShellLoggingFilter
  • Constructor Summary

    Constructors
    Constructor
    Description
    PushBasedFetchHelper(org.apache.spark.storage.ShuffleBlockFetcherIterator iterator, org.apache.spark.network.shuffle.BlockStoreClient shuffleClient, org.apache.spark.storage.BlockManager blockManager, org.apache.spark.MapOutputTracker mapOutputTracker, org.apache.spark.shuffle.ShuffleReadMetricsReporter shuffleMetrics)
     
  • Method Summary

    Modifier and Type
    Method
    Description
    void
    addChunk(ShuffleBlockChunkId blockId, org.roaringbitmap.RoaringBitmap chunkMeta)
    This is executed by the task thread when the iterator.next() is invoked and the iterator processes a response of type ShuffleBlockFetcherIterator.PushMergedLocalMetaFetchResult.
    scala.collection.mutable.ArrayBuffer<scala.Tuple3<BlockId,Object,Object>>
    createChunkBlockInfosFromMetaResponse(int shuffleId, int shuffleMergeId, int reduceId, long blockSize, org.roaringbitmap.RoaringBitmap[] bitmaps)
    This is executed by the task thread when the iterator.next() is invoked and the iterator processes a response of type ShuffleBlockFetcherIterator.PushMergedRemoteMetaFetchResult.
    void
    fetchAllPushMergedLocalBlocks(scala.collection.mutable.LinkedHashSet<BlockId> pushMergedLocalBlocks)
    This is executed by the task thread when the iterator is initialized.
    scala.Option<org.roaringbitmap.RoaringBitmap>
    getRoaringBitMap(ShuffleBlockChunkId blockId)
    Gets the RoaringBitmap for a specific ShuffleBlockChunkId.
    int
    getShuffleChunkCardinality(ShuffleBlockChunkId blockId)
    Gets the number of map blocks in a ShuffleBlockChunk.
    void
    initiateFallbackFetchForPushMergedBlock(BlockId blockId, BlockManagerId address)
    This is executed by the task thread when the iterator.next() is invoked and the iterator processes a response of type: 1) ShuffleBlockFetcherIterator.SuccessFetchResult 2) ShuffleBlockFetcherIterator.FallbackOnPushMergedFailureResult 3) ShuffleBlockFetcherIterator.PushMergedRemoteMetaFailedFetchResult
    boolean
    isLocalPushMergedBlockAddress(BlockManagerId address)
    Returns true if the address is of a push-merged-local block, false otherwise.
    boolean
    isPushMergedShuffleBlockAddress(BlockManagerId address)
    Returns true if the address is for a push-merged block.
    boolean
    isRemotePushMergedBlockAddress(BlockManagerId address)
    Returns true if the address is of a remote push-merged block, false otherwise.
    void
    removeChunk(ShuffleBlockChunkId blockId)
    This is executed by the task thread when the iterator.next() is invoked and the iterator processes a response of type ShuffleBlockFetcherIterator.SuccessFetchResult.
    void
    sendFetchMergedStatusRequest(org.apache.spark.storage.ShuffleBlockFetcherIterator.FetchRequest req)
    This is executed by the task thread when the iterator is initialized and only if it has push-merged blocks for which it needs to fetch the metadata.

    Methods inherited from class java.lang.Object

    equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

    Methods inherited from interface org.apache.spark.internal.Logging

    initializeForcefully, initializeLogIfNecessary, initializeLogIfNecessary, initializeLogIfNecessary$default$2, isTraceEnabled, log, logDebug, logDebug, logError, logError, logInfo, logInfo, logName, logTrace, logTrace, logWarning, logWarning, org$apache$spark$internal$Logging$$log_, org$apache$spark$internal$Logging$$log__$eq
  • Constructor Details

    • PushBasedFetchHelper

      public PushBasedFetchHelper(org.apache.spark.storage.ShuffleBlockFetcherIterator iterator, org.apache.spark.network.shuffle.BlockStoreClient shuffleClient, org.apache.spark.storage.BlockManager blockManager, org.apache.spark.MapOutputTracker mapOutputTracker, org.apache.spark.shuffle.ShuffleReadMetricsReporter shuffleMetrics)
  • Method Details

    • addChunk

      public void addChunk(ShuffleBlockChunkId blockId, org.roaringbitmap.RoaringBitmap chunkMeta)
      This is executed by the task thread when the iterator.next() is invoked and the iterator processes a response of type ShuffleBlockFetcherIterator.PushMergedLocalMetaFetchResult.

      Parameters:
      blockId - shuffle chunk id.
      chunkMeta - (undocumented)
    • createChunkBlockInfosFromMetaResponse

      public scala.collection.mutable.ArrayBuffer<scala.Tuple3<BlockId,Object,Object>> createChunkBlockInfosFromMetaResponse(int shuffleId, int shuffleMergeId, int reduceId, long blockSize, org.roaringbitmap.RoaringBitmap[] bitmaps)
      This is executed by the task thread when the iterator.next() is invoked and the iterator processes a response of type ShuffleBlockFetcherIterator.PushMergedRemoteMetaFetchResult.

      Parameters:
      shuffleId - shuffle id.
      reduceId - reduce id.
      blockSize - size of the push-merged block.
      bitmaps - chunk bitmaps, where each bitmap contains all the mapIds that were merged to that chunk.
      shuffleMergeId - (undocumented)
      Returns:
      shuffle chunks to fetch.
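
      The mapping from a meta response to per-chunk fetch entries can be sketched as follows. This is a minimal illustration, not Spark's implementation: ChunkInfoSketch, ChunkInfo, and chunkInfos are hypothetical names, java.util.BitSet stands in for org.roaringbitmap.RoaringBitmap, and estimating each chunk's size as an even share of blockSize is an assumption that this page does not confirm.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Illustrative sketch only: ChunkInfoSketch and its nested type are hypothetical,
// and java.util.BitSet stands in for org.roaringbitmap.RoaringBitmap.
final class ChunkInfoSketch {

    /** One chunk to fetch: its position in the push-merged block and an estimated size. */
    static final class ChunkInfo {
        final int shuffleId;
        final int shuffleMergeId;
        final int reduceId;
        final int chunkId;
        final long approxSize;

        ChunkInfo(int shuffleId, int shuffleMergeId, int reduceId, int chunkId, long approxSize) {
            this.shuffleId = shuffleId;
            this.shuffleMergeId = shuffleMergeId;
            this.reduceId = reduceId;
            this.chunkId = chunkId;
            this.approxSize = approxSize;
        }
    }

    /** Builds one fetch entry per chunk bitmap; the chunk id is the bitmap's array index. */
    static List<ChunkInfo> chunkInfos(int shuffleId, int shuffleMergeId, int reduceId,
                                      long blockSize, BitSet[] bitmaps) {
        List<ChunkInfo> infos = new ArrayList<>();
        if (bitmaps.length == 0) {
            return infos; // no chunks reported, nothing to fetch
        }
        // Assumption: the push-merged block's size is spread evenly across its chunks.
        long approxChunkSize = blockSize / bitmaps.length;
        for (int i = 0; i < bitmaps.length; i++) {
            // bitmaps[i] holds the map ids merged into chunk i; a real helper would
            // also record it as that chunk's metadata.
            infos.add(new ChunkInfo(shuffleId, shuffleMergeId, reduceId, i, approxChunkSize));
        }
        return infos;
    }
}
```

      For example, a 100-byte push-merged block reported with two bitmaps would yield two entries, chunk 0 and chunk 1, each with an estimated size of 50 bytes under the even-split assumption.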
    • fetchAllPushMergedLocalBlocks

      public void fetchAllPushMergedLocalBlocks(scala.collection.mutable.LinkedHashSet<BlockId> pushMergedLocalBlocks)
      This is executed by the task thread when the iterator is initialized. It fetches all the outstanding push-merged local blocks.
      Parameters:
      pushMergedLocalBlocks - set of identified merged local blocks and their sizes.
    • getRoaringBitMap

      public scala.Option<org.roaringbitmap.RoaringBitmap> getRoaringBitMap(ShuffleBlockChunkId blockId)
      Gets the RoaringBitmap for a specific ShuffleBlockChunkId.

      Parameters:
      blockId - shuffle chunk id.
      Returns:
      (undocumented)
    • getShuffleChunkCardinality

      public int getShuffleChunkCardinality(ShuffleBlockChunkId blockId)
      Gets the number of map blocks in a ShuffleBlockChunk.
      Parameters:
      blockId - shuffle chunk id.
      Returns:
      the number of map blocks in the chunk.
    • initiateFallbackFetchForPushMergedBlock

      public void initiateFallbackFetchForPushMergedBlock(BlockId blockId, BlockManagerId address)
      This is executed by the task thread when the iterator.next() is invoked and the iterator processes a response of type: 1) ShuffleBlockFetcherIterator.SuccessFetchResult 2) ShuffleBlockFetcherIterator.FallbackOnPushMergedFailureResult 3) ShuffleBlockFetcherIterator.PushMergedRemoteMetaFailedFetchResult

      This initiates fetching fallback blocks for a push-merged block or a shuffle chunk that failed to fetch. It calls the map output tracker to get the list of original blocks for the given push-merged block or shuffle chunk, splits them into remote and local blocks, and processes them accordingly. It also updates numberOfBlocksToFetch in the iterator as it processes the failed response and finds more push-merged requests to remote, and updates it again with the additional requests for original blocks. The fallback happens when:
      1. There is an exception while creating shuffle chunks from a push-merged-local shuffle block. See fetchLocalBlock.
      2. There is a failure when fetching remote shuffle chunks.
      3. There is a failure when processing a SuccessFetchResult for a shuffle chunk (local or remote).
      4. There is a zero-size buffer when processing a SuccessFetchResult for a shuffle chunk (local or remote).

      Parameters:
      blockId - (undocumented)
      address - (undocumented)
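
      The split step of the fallback can be sketched as below. This is a simplified illustration under stated assumptions, not Spark's code: FallbackSplitSketch, OriginalBlock, and Split are hypothetical names, a block counts as local only when its executor id equals the local one, and the map output tracker lookup and the numberOfBlocksToFetch bookkeeping are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: all types here are hypothetical stand-ins.
final class FallbackSplitSketch {

    /** An original (unmerged) shuffle block and the executor that holds it. */
    static final class OriginalBlock {
        final String blockId;
        final String executorId;

        OriginalBlock(String blockId, String executorId) {
            this.blockId = blockId;
            this.executorId = executorId;
        }
    }

    /** Result of partitioning the original blocks by location. */
    static final class Split {
        final List<OriginalBlock> local = new ArrayList<>();
        final List<OriginalBlock> remote = new ArrayList<>();
    }

    /**
     * Partitions the original blocks of a failed push-merged block or chunk.
     * Assumption: locality is decided by executor id alone.
     */
    static Split split(List<OriginalBlock> originals, String localExecutorId) {
        Split result = new Split();
        for (OriginalBlock block : originals) {
            if (block.executorId.equals(localExecutorId)) {
                result.local.add(block);  // can be read without a network fetch
            } else {
                result.remote.add(block); // needs a fetch request to another executor
            }
        }
        return result;
    }
}
```

      In this sketch the local list would be read directly and the remote list would be turned into new fetch requests. How each group is actually processed is not detailed on this page.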
    • isLocalPushMergedBlockAddress

      public boolean isLocalPushMergedBlockAddress(BlockManagerId address)
      Returns true if the address is of a push-merged-local block, false otherwise.
      Parameters:
      address - (undocumented)
      Returns:
      (undocumented)
    • isPushMergedShuffleBlockAddress

      public boolean isPushMergedShuffleBlockAddress(BlockManagerId address)
      Returns true if the address is for a push-merged block.
      Parameters:
      address - (undocumented)
      Returns:
      (undocumented)
    • isRemotePushMergedBlockAddress

      public boolean isRemotePushMergedBlockAddress(BlockManagerId address)
      Returns true if the address is of a remote push-merged block, false otherwise.
      Parameters:
      address - (undocumented)
      Returns:
      (undocumented)
    • removeChunk

      public void removeChunk(ShuffleBlockChunkId blockId)
      This is executed by the task thread when the iterator.next() is invoked and the iterator processes a response of type ShuffleBlockFetcherIterator.SuccessFetchResult.

      Parameters:
      blockId - shuffle chunk id.
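
      Taken together, addChunk, getRoaringBitMap, getShuffleChunkCardinality, and removeChunk read like operations on a per-chunk metadata map. The sketch below shows one way such a map could behave. It is an assumption-laden illustration rather than the class's actual internals: ChunkMetaRegistrySketch is a hypothetical name, chunk ids are plain strings, java.util.BitSet stands in for org.roaringbitmap.RoaringBitmap, and returning 0 for an untracked chunk is an assumption.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative sketch only: a hypothetical registry of chunk metadata.
final class ChunkMetaRegistrySketch {

    // Key: chunk id (a plain string here); value: the map indices merged into that chunk.
    private final Map<String, BitSet> chunksMeta = new HashMap<>();

    /** Records the bitmap for a chunk once its metadata is known. */
    void addChunk(String chunkId, BitSet chunkMeta) {
        chunksMeta.put(chunkId, chunkMeta);
    }

    /** Returns the bitmap for a chunk, or empty if the chunk is not tracked. */
    Optional<BitSet> getBitmap(String chunkId) {
        return Optional.ofNullable(chunksMeta.get(chunkId));
    }

    /** Number of map blocks in the chunk; 0 for an untracked chunk (assumption). */
    int getCardinality(String chunkId) {
        return getBitmap(chunkId).map(BitSet::cardinality).orElse(0);
    }

    /** Drops the metadata once the chunk has been fetched successfully. */
    void removeChunk(String chunkId) {
        chunksMeta.remove(chunkId);
    }
}
```

      A chunk whose bitmap has three bits set reports a cardinality of 3 while it is tracked. After removeChunk, this sketch reports an empty lookup and a cardinality of 0.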
    • sendFetchMergedStatusRequest

      public void sendFetchMergedStatusRequest(org.apache.spark.storage.ShuffleBlockFetcherIterator.FetchRequest req)
      This is executed by the task thread when the iterator is initialized and only if it has push-merged blocks for which it needs to fetch the metadata.

      Parameters:
      req - ShuffleBlockFetcherIterator.FetchRequest that only contains requests to fetch metadata of push-merged blocks.