Trait

org.apache.spark.sql.streaming

GroupState

trait GroupState[S] extends LogicalGroupState[S]

:: Experimental ::

Wrapper class for interacting with per-group state data in mapGroupsWithState and flatMapGroupsWithState operations on KeyValueGroupedDataset.

Detailed description of the [map/flatMap]GroupsWithState operation
--------------------------------------------------------------

Both mapGroupsWithState and flatMapGroupsWithState in KeyValueGroupedDataset invoke the user-given function on each group (defined by the grouping function in Dataset.groupByKey()) while maintaining user-defined per-group state between invocations. For a static batch Dataset, the function is invoked once per group. For a streaming Dataset, the function is invoked for each group repeatedly, in every trigger. That is, in every batch of the StreamingQuery, the function is invoked once for each group that has data in the trigger. Furthermore, if a timeout is set, the function is also invoked on timed-out groups (more detail below).
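The invocation pattern for streaming Datasets can be modeled in a few lines of plain Scala. This is a minimal sketch, not Spark code.

- The hypothetical helper `runTrigger` stands in for one trigger of a streaming query.
- It keeps a per-key state map between triggers.
- It calls the user function only for keys that have data in that trigger.
- Timeouts are not modeled, and returning `None` as the new state stands in for `remove()`.

```scala
// Hypothetical, Spark-free model of one streaming trigger; NOT the Spark API.
// `state` holds each key's saved per-group state from earlier triggers.
def runTrigger[K, V, S, U](batch: Seq[(K, V)], state: Map[K, S])(
    func: (K, Iterator[V], Option[S]) => (U, Option[S])): (Seq[U], Map[K, S]) = {
  // Only keys that have records in this trigger are invoked.
  val groups: Map[K, Seq[V]] = batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
  groups.foldLeft((Vector.empty[U], state)) { case ((outs, st), (key, values)) =>
    val (out, updated) = func(key, values.iterator, st.get(key))
    // Some(s) stores the new state; None removes it. Keys without data keep their state.
    val next = updated match {
      case Some(s) => st + (key -> s)
      case None    => st - key
    }
    (outs :+ out, next)
  }
}

// Example user function: keeps a running count of values seen per key.
def countFn(key: String, values: Iterator[Int], prev: Option[Long]): (String, Option[Long]) = {
  val total = prev.getOrElse(0L) + values.size
  (s"$key=$total", Some(total))
}

val (out1, s1) = runTrigger(Seq("a" -> 1, "b" -> 2, "a" -> 3), Map.empty[String, Long])(countFn)
// Second trigger: only "a" has data, so only "a" is invoked; "b" keeps its state.
val (out2, s2) = runTrigger(Seq("a" -> 4), s1)(countFn)
```

In the second trigger, the count for `"a"` continues from the state saved in the first trigger. Meanwhile `"b"` is not invoked at all but still keeps its state.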

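The function is invoked with the following parameters:
  1. The key of the group.
  2. An iterator containing all the values for this group.
  3. A user-defined state object (GroupState) set by previous invocations of the function.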

For a batch Dataset, there is only one invocation per group, and the state object is empty because there is no prior state. Essentially, for batch Datasets, [map/flatMap]GroupsWithState is equivalent to [map/flatMap]Groups, and any updates to the state and/or timeouts have no effect.
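The batch case can be modeled as a minimal, Spark-free sketch. The hypothetical helper `runBatch` calls the user function exactly once per group and always passes `None` as the prior state. It also discards whatever state the function returns, which mirrors the statement that state updates have no effect in a batch query.

```scala
// Hypothetical, Spark-free model of [map/flatMap]GroupsWithState on a batch Dataset.
def runBatch[K, V, S, U](data: Seq[(K, V)])(
    func: (K, Iterator[V], Option[S]) => (U, Option[S])): Seq[U] =
  data.groupBy(_._1).toSeq.map { case (key, kvs) =>
    // Exactly one invocation per group, always with empty (None) state.
    val (out, _) = func(key, kvs.map(_._2).iterator, None)
    // The returned state is discarded: in a batch query, state updates have no effect.
    out
  }

// Example user function: sums the values of a group on top of any prior state.
def sumFn(key: String, values: Iterator[Int], prev: Option[Int]): (String, Option[Int]) = {
  val total = prev.getOrElse(0) + values.sum
  (s"$key:$total", Some(total))
}

val batchOut = runBatch(Seq("x" -> 1, "y" -> 5, "x" -> 2))(sumFn)
```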

The major difference between mapGroupsWithState and flatMapGroupsWithState is that the former allows the function to return one and only one record, whereas the latter allows the function to return any number of records (including none). Furthermore, flatMapGroupsWithState is associated with an operation output mode, which can be either Append or Update. Semantically, this defines whether the output records of one trigger effectively replace the previously output records (from previous triggers) or are appended to the list of previously output records. Essentially, this defines how the Result Table (refer to the semantics in the programming guide) is updated, and allows us to reason about the semantics of later operations.
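The difference in output cardinality can be illustrated with two plain Scala function shapes. These are sketches, not Spark's signatures, and both state and output modes are left out so the return type stands out.

- The hypothetical `mapStyle` returns exactly one record per invocation.
- The hypothetical `flatMapStyle` returns an `Iterator` that may be empty.

```scala
// Hypothetical per-group functions over (key, values); state is omitted to focus on output shape.

// map style: exactly one output record per invocation.
def mapStyle(key: String, values: Iterator[Int]): String =
  s"$key -> ${values.size} values"

// flatMap style: zero or more output records per invocation.
// Here, one record is emitted for every value above a threshold (possibly none).
def flatMapStyle(key: String, values: Iterator[Int]): Iterator[String] =
  values.filter(_ > 10).map(v => s"$key alert: $v")

val single = mapStyle("k", Iterator(1, 20, 30))
val many = flatMapStyle("k", Iterator(1, 20, 30)).toList
val none = flatMapStyle("k", Iterator(1, 2)).toList
```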

Important points to note about the function (both mapGroupsWithState and flatMapGroupsWithState).

Important points to note about using GroupState.

Important points to note about using GroupStateTimeout.

Scala example of using GroupState in mapGroupsWithState:

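import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
// Assumes implicit Encoders for Int and String are in scope, e.g. via `import spark.implicits._`
// for a SparkSession named `spark`.

// A mapping function that maintains an integer state for string keys and returns a string.
// Additionally, it sets a timeout to remove the state if it has not received data for an hour.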
def mappingFunction(key: String, value: Iterator[Int], state: GroupState[Int]): String = {

  if (state.hasTimedOut) {                // If called when timing out, remove the state
    state.remove()

  } else if (state.exists) {              // If state exists, use it for processing
    val existingState = state.get         // Get the existing state
    val shouldRemove = ...                // Decide whether to remove the state
    if (shouldRemove) {
      state.remove()                      // Remove the state

    } else {
      val newState = ...
      state.update(newState)              // Set the new state
      state.setTimeoutDuration("1 hour")  // Set the timeout
    }

  } else {
    val initialState = ...
    state.update(initialState)            // Set the initial state
    state.setTimeoutDuration("1 hour")    // Set the timeout
  }
  ...
  // return something
}

dataset
  .groupByKey(...)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(mappingFunction)
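The `...` placeholders above stand for application logic. One possible concrete filling is sketched below in plain Scala so that it runs without Spark. The hypothetical choices are:

- The state is a running count of values.
- A negative value in the group asks for the state to be removed.

`LocalState` is a hypothetical stand-in for the few `GroupState` members used here. In a real query, Spark supplies the `GroupState` instance.

```scala
// Hypothetical stand-in for the subset of GroupState used below; NOT a Spark class.
final class LocalState[S](initial: Option[S], val hasTimedOut: Boolean = false) {
  private var value: Option[S] = initial
  var timeoutDuration: Option[String] = None // last duration passed to setTimeoutDuration
  def exists: Boolean = value.isDefined
  def get: S = value.getOrElse(throw new NoSuchElementException("state does not exist"))
  def update(newState: S): Unit = value = Some(newState)
  def remove(): Unit = value = None
  def setTimeoutDuration(duration: String): Unit = timeoutDuration = Some(duration)
}

// Same control flow as mappingFunction above, with hypothetical concrete logic.
def countingMappingFunction(key: String, values: Iterator[Int], state: LocalState[Int]): String = {
  if (state.hasTimedOut) {                 // Called because the key timed out: drop the state
    state.remove()
    s"$key: timed out"
  } else {
    val batch = values.toList
    if (state.exists) {
      val existingState = state.get
      val shouldRemove = batch.exists(_ < 0) // hypothetical removal rule
      if (shouldRemove) {
        state.remove()
        s"$key: removed"
      } else {
        val newState = existingState + batch.size
        state.update(newState)
        state.setTimeoutDuration("1 hour")
        s"$key: $newState"
      }
    } else {
      val initialState = batch.size
      state.update(initialState)
      state.setTimeoutDuration("1 hour")
      s"$key: $initialState"
    }
  }
}

val fresh = new LocalState[Int](None)
val r1 = countingMappingFunction("a", Iterator(1, 2), fresh) // creates the state
val r2 = countingMappingFunction("a", Iterator(3), fresh)    // updates the state
val r3 = countingMappingFunction("a", Iterator(-1), fresh)   // removes the state
val timedOut = new LocalState[Int](Some(5), hasTimedOut = true)
val r4 = countingMappingFunction("b", Iterator.empty, timedOut)
```

Factoring the per-group logic this way keeps the timeout and state transitions testable in isolation.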

Java example of using GroupState:

import java.util.Iterator;
import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.streaming.GroupState;
import org.apache.spark.sql.streaming.GroupStateTimeout;

// A mapping function that maintains an integer state for string keys and returns a string.
// Additionally, it sets a timeout to remove the state if it has not received data for an hour.
MapGroupsWithStateFunction<String, Integer, Integer, String> mappingFunction =
  new MapGroupsWithStateFunction<String, Integer, Integer, String>() {

    @Override
    public String call(String key, Iterator<Integer> value, GroupState<Integer> state) {
      if (state.hasTimedOut()) {              // If called when timing out, remove the state
        state.remove();

      } else if (state.exists()) {            // If state exists, use it for processing
        int existingState = state.get();      // Get the existing state
        boolean shouldRemove = ...;           // Decide whether to remove the state
        if (shouldRemove) {
          state.remove();                     // Remove the state

        } else {
          int newState = ...;
          state.update(newState);             // Set the new state
          state.setTimeoutDuration("1 hour"); // Set the timeout
        }

      } else {
        int initialState = ...;
        state.update(initialState);           // Set the initial state
        state.setTimeoutDuration("1 hour");   // Set the timeout
      }
      ...
      // return something
    }
  };

dataset
    .groupByKey(...)
    .mapGroupsWithState(
        mappingFunction, Encoders.INT(), Encoders.STRING(), GroupStateTimeout.ProcessingTimeTimeout());

S: User-defined type of the state to be stored for each group. It must be encodable into Spark SQL types (see Encoder for more details).

Annotations
@Experimental() @Evolving()
Source
GroupState.scala
Since

2.2.0

Linear Supertypes
LogicalGroupState[S], AnyRef, Any

Abstract Value Members

  1. abstract def exists: Boolean

    Whether state exists or not.

  2. abstract def get: S

    Get the state value if it exists, or throw NoSuchElementException.

    Annotations
    @throws( "when state does not exist" )
  3. abstract def getCurrentProcessingTimeMs(): Long

    Get the current processing time as milliseconds in epoch time.

    Note

    In a streaming query, this will return a constant value throughout the duration of a trigger, even if the trigger is re-executed.

  4. abstract def getCurrentWatermarkMs(): Long

    Get the current event time watermark as milliseconds in epoch time.

    Annotations
    @throws( ... )
    Note

    In a streaming query, this can be called only when a watermark has been set before calling [map/flatMap]GroupsWithState. In a batch query, this method always returns -1.

  5. abstract def getOption: Option[S]

    Get the state value as a Scala Option.

  6. abstract def hasTimedOut: Boolean

    Whether the function has been called because the key has timed out.

    Note

    This can return true only when timeouts are enabled in [map/flatMap]GroupsWithState.

  7. abstract def remove(): Unit

    Remove this state.

  8. abstract def setTimeoutDuration(duration: String): Unit

    Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.

    Annotations
    @throws( ... ) @throws( ... )
    Note

    This method has no effect when used in a batch query.

    Processing time timeout must be enabled in [map/flatMap]GroupsWithState for calling this method.

  9. abstract def setTimeoutDuration(durationMs: Long): Unit

    Set the timeout duration in ms for this key.

    Annotations
    @throws( "if 'durationMs' is not positive" ) @throws( ... )
    Note

    This method has no effect when used in a batch query.

    Processing time timeout must be enabled in [map/flatMap]GroupsWithState for calling this method.

  10. abstract def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit

    Set the timeout timestamp for this key as a java.sql.Date and an additional duration as a string (e.g. "1 hour", "2 days", etc.). The final timestamp (including the additional duration) cannot be older than the current watermark.

    Annotations
    @throws( ... ) @throws( ... )
    Note

    This method has no effect when used in a batch query.

    Event time timeout must be enabled in [map/flatMap]GroupsWithState for calling this method.

  11. abstract def setTimeoutTimestamp(timestamp: Date): Unit

    Set the timeout timestamp for this key as a java.sql.Date. This timestamp cannot be older than the current watermark.

    Annotations
    @throws( ... )
    Note

    This method has no effect when used in a batch query.

    Event time timeout must be enabled in [map/flatMap]GroupsWithState for calling this method.

  12. abstract def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit

    Set the timeout timestamp for this key as milliseconds in epoch time and an additional duration as a string (e.g. "1 hour", "2 days", etc.). The final timestamp (including the additional duration) cannot be older than the current watermark.

    Annotations
    @throws( ... ) @throws( ... )
    Note

    This method has no effect when used in a batch query.

    Event time timeout must be enabled in [map/flatMap]GroupsWithState for calling this method.

  13. abstract def setTimeoutTimestamp(timestampMs: Long): Unit

    Set the timeout timestamp for this key as milliseconds in epoch time. This timestamp cannot be older than the current watermark.

    Annotations
    @throws( ... ) @throws( ... )
    Note

    This method has no effect when used in a batch query.

    Event time timeout must be enabled in [map/flatMap]GroupsWithState for calling this method.

  14. abstract def update(newState: S): Unit

    Update the value of the state.
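The accessor members above (exists, get, getOption, update, remove) follow a simple contract. After update, the state exists and get returns the new value. After remove, exists is false and get throws NoSuchElementException.

The sketch below models that contract with a hypothetical stand-in class `SketchState`. It is not Spark's implementation, only a runnable illustration of the documented behavior.

```scala
import scala.util.Try

// Hypothetical, minimal stand-in following the accessor contract documented above;
// it is NOT Spark's implementation of GroupState.
final class SketchState[S] {
  private var value: Option[S] = None
  def exists: Boolean = value.isDefined              // whether state exists
  def get: S = value.getOrElse(                      // value, or NoSuchElementException
    throw new NoSuchElementException("state does not exist"))
  def getOption: Option[S] = value                   // value as a Scala Option
  def update(newState: S): Unit = value = Some(newState)
  def remove(): Unit = value = None
}

val st = new SketchState[String]
val before = (st.exists, st.getOption)
st.update("v1")
val afterUpdate = (st.exists, st.get, st.getOption)
st.remove()
val afterRemove = (st.exists, st.getOption)
// get throws once the state is removed; getOption is the non-throwing alternative.
val getFailed = Try(st.get).isFailure
val withDefault = st.getOption.getOrElse("default")
```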

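The timeout setters above combine a base time with a duration. The helpers below are hypothetical and are not Spark APIs; each rests on an assumption:

- `processingTimeDeadlineMs` assumes the processing-time deadline is measured from the current trigger's processing time, as returned by getCurrentProcessingTimeMs().
- `eventTimeTimeoutMs` applies the documented rule that the final timestamp (including the additional duration) cannot be older than the current watermark. Using `require` to reject a bad timestamp is this sketch's choice and not necessarily the exception Spark throws.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical helpers illustrating the timeout arithmetic; NOT Spark APIs.

// Processing-time timeout, ASSUMING the deadline is measured from the current
// trigger's processing time (what getCurrentProcessingTimeMs() returns).
def processingTimeDeadlineMs(currentProcessingTimeMs: Long, durationMs: Long): Long = {
  require(durationMs > 0, "durationMs must be positive")
  currentProcessingTimeMs + durationMs
}

// Event-time timeout: the final timestamp (base timestamp plus additional duration)
// must not be older than the current watermark.
def eventTimeTimeoutMs(timestampMs: Long, additionalDurationMs: Long, watermarkMs: Long): Long = {
  val finalTimestampMs = timestampMs + additionalDurationMs
  require(finalTimestampMs >= watermarkMs, "timeout timestamp is older than the watermark")
  finalTimestampMs
}

val oneHourMs = TimeUnit.HOURS.toMillis(1) // "1 hour" expressed in milliseconds
val deadline = processingTimeDeadlineMs(1000000L, oneHourMs)
val eventDeadline = eventTimeTimeoutMs(5000000L, oneHourMs, watermarkMs = 6000000L)
```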