trait TestGroupState[S] extends GroupState[S]
:: Experimental ::
An extended version of the GroupState interface with extra getters for the state machine's fields, which improves the testability of GroupState implementations that inherit from this interface.
Scala example of using TestGroupState:
// Please refer to ScalaDoc of `GroupState` for the Scala definition of `mappingFunction()`

import org.apache.spark.api.java.Optional
import org.apache.spark.sql.streaming.GroupStateTimeout
import org.apache.spark.sql.streaming.TestGroupState
// other imports

// test class setups

test("MapGroupsWithState state transition function") {
  // Creates the prevState input for the state transition function
  // with desired configs. The `create()` API would guarantee that
  // the generated instance has the same behavior as the one built by
  // the engine with the same configs.
  val prevState = TestGroupState.create[Int](
    optionalState = Optional.empty[Int],
    timeoutConf = GroupStateTimeout.NoTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val key: String = ...
  val values: Iterator[Int] = ...

  // Asserts the prevState is in init state without updates.
  assert(!prevState.isUpdated)

  // Calls the state transition function with the test previous state
  // with desired configs.
  mappingFunction(key, values, prevState)

  // Asserts the test GroupState object has been updated but not removed
  // after calling the state transition function
  assert(prevState.isUpdated)
  assert(!prevState.isRemoved)
}
Java example of using TestGroupState:
// Please refer to ScalaDoc of `GroupState` for the Java definition of `mappingFunction()`

import java.util.Arrays;
import org.junit.Assert;
import org.apache.spark.api.java.Optional;
import org.apache.spark.sql.streaming.GroupStateTimeout;
import org.apache.spark.sql.streaming.TestGroupState;
// other imports

// test class setups

// test `MapGroupsWithState` state transition function `mappingFunction()`
public void testMappingFunctionWithTestGroupState() {
  // Creates the prevState input for the state transition function
  // with desired configs. The `create()` API would guarantee that
  // the generated instance has the same behavior as the one built by
  // the engine with the same configs.
  TestGroupState<Integer> prevState = TestGroupState.create(
    Optional.empty(), GroupStateTimeout.NoTimeout(), 1L, Optional.of(1L), false);

  String key = ...;
  Integer[] values = ...;

  // Asserts the prevState is in init state without updates.
  Assert.assertFalse(prevState.isUpdated());

  // Calls the state transition function with the test previous state
  // with desired configs.
  mappingFunction.call(key, Arrays.asList(values).iterator(), prevState);

  // Asserts the test GroupState object has been updated but not removed
  // after calling the state transition function
  Assert.assertTrue(prevState.isUpdated());
  Assert.assertFalse(prevState.isRemoved());
}
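The examples above leave the key, the values, and `mappingFunction()` elided. As a filled-in sketch, the following object defines a hypothetical running-count transition function and drives it with a `TestGroupState` created with no timeout and no watermark. It assumes Spark 3.2.0 or later with spark-sql on the classpath; `RunningCountSketch`, `countUpdate`, and `run` are illustrative names, not Spark APIs.

```scala
import org.apache.spark.api.java.Optional
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, TestGroupState}

object RunningCountSketch {
  // Hypothetical transition function: adds the number of new values to the
  // stored count for the key and emits (key, newCount).
  def countUpdate(key: String, values: Iterator[Int], state: GroupState[Long]): (String, Long) = {
    val newCount = state.getOption.getOrElse(0L) + values.size
    state.update(newCount)
    (key, newCount)
  }

  // Creates a TestGroupState seeded with `initial` (no timeout, no watermark)
  // and runs countUpdate against it for key "a".
  def run(initial: Option[Long], values: Seq[Int]): (TestGroupState[Long], (String, Long)) = {
    val prevState = TestGroupState.create[Long](
      optionalState = initial.fold(Optional.empty[Long]())(v => Optional.of(v)),
      timeoutConf = GroupStateTimeout.NoTimeout(),
      batchProcessingTimeMs = 1L,
      eventTimeWatermarkMs = Optional.empty[Long](),
      hasTimedOut = false)
    assert(!prevState.isUpdated) // a freshly created state has no pending update
    val output = countUpdate("a", values.iterator, prevState)
    (prevState, output)
  }

  def main(args: Array[String]): Unit = {
    val (state, output) = run(Some(2L), Seq(10, 20, 30))
    println(s"output=$output updated=${state.isUpdated} removed=${state.isRemoved}")
  }
}
```

Because the transition function only sees the `GroupState` interface, the same function can be passed to `mapGroupsWithState` in a real query and to a `TestGroupState` in a unit test.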
- S
 User-defined type of the state to be stored for each group. Must be encodable into Spark SQL types (see Encoder for more details).
- Annotations
 - @Experimental() @Evolving()
 - Source
 - TestGroupState.scala
 - Since
 3.2.0
Abstract Value Members
- abstract def exists: Boolean
Whether state exists or not.
- Definition Classes
 - GroupState
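A minimal sketch of `exists` on states built with `TestGroupState.create()`, assuming Spark 3.2.0 or later on the classpath. `ExistsSketch` and its `stateOf` helper are hypothetical; `stateOf` converts a Scala `Option` into the Spark `Optional` that `create()` takes.

```scala
import org.apache.spark.api.java.Optional
import org.apache.spark.sql.streaming.{GroupStateTimeout, TestGroupState}

object ExistsSketch {
  // Hypothetical helper: a streaming-style state with no timeout and no watermark.
  def stateOf(initial: Option[Int]): TestGroupState[Int] =
    TestGroupState.create[Int](
      initial.fold(Optional.empty[Int]())(v => Optional.of(v)),
      GroupStateTimeout.NoTimeout(),
      0L,
      Optional.empty[Long](),
      false)

  def main(args: Array[String]): Unit = {
    println(s"None -> exists=${stateOf(None).exists}")
    println(s"Some(42) -> exists=${stateOf(Some(42)).exists}")
  }
}
```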
 
- abstract def get: S
Get the state value if it exists, or throw NoSuchElementException.
- Definition Classes
 - GroupState
 - Annotations
 - @throws( "when state does not exist" )
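A sketch of guarding `get` against the NoSuchElementException described above, assuming Spark 3.2.0 or later. `GetSketch`, `stateOf`, and `safeGet` are hypothetical names used only for illustration.

```scala
import org.apache.spark.api.java.Optional
import org.apache.spark.sql.streaming.{GroupStateTimeout, TestGroupState}

object GetSketch {
  // Hypothetical helper: a TestGroupState[String] with no timeout and no watermark.
  def stateOf(initial: Option[String]): TestGroupState[String] =
    TestGroupState.create[String](
      initial.fold(Optional.empty[String]())(v => Optional.of(v)),
      GroupStateTimeout.NoTimeout(), 0L, Optional.empty[Long](), false)

  // Returns Right(value) from `get`, or Left(message) when `get` throws
  // NoSuchElementException because no state exists.
  def safeGet(initial: Option[String]): Either[String, String] =
    try Right(stateOf(initial).get)
    catch { case e: NoSuchElementException => Left(e.getMessage) }

  def main(args: Array[String]): Unit = {
    println(safeGet(Some("v1")))
    println(safeGet(None))
  }
}
```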
 
- abstract def getCurrentProcessingTimeMs(): Long
Get the current processing time as milliseconds in epoch time.
- Definition Classes
 - GroupState
 - Note
 In a streaming query, this will return a constant value throughout the duration of a trigger, even if the trigger is re-executed.
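Because `TestGroupState.create()` takes the batch processing time as a parameter, a test can pin the value this method returns. A minimal sketch, assuming Spark 3.2.0 or later; `ProcessingTimeSketch` and `stateAt` are hypothetical names.

```scala
import org.apache.spark.api.java.Optional
import org.apache.spark.sql.streaming.{GroupStateTimeout, TestGroupState}

object ProcessingTimeSketch {
  // Hypothetical helper: pins the batch processing time that the state reports.
  def stateAt(batchProcessingTimeMs: Long): TestGroupState[Int] =
    TestGroupState.create[Int](
      Optional.empty[Int](), GroupStateTimeout.ProcessingTimeTimeout(),
      batchProcessingTimeMs, Optional.empty[Long](), false)

  def main(args: Array[String]): Unit = {
    val state = stateAt(1000L)
    // Repeated calls on the same state return the same value.
    println(state.getCurrentProcessingTimeMs())
    println(state.getCurrentProcessingTimeMs())
  }
}
```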
- abstract def getCurrentWatermarkMs(): Long
Get the current event time watermark as milliseconds in epoch time.
- Definition Classes
 - GroupState
 - Annotations
 - @throws( ... )
 - Note
 In a streaming query, this can be called only when a watermark is set before calling [map/flatMap]GroupsWithState. In a batch query, this method always returns -1. The watermark gets propagated at the end of each query. As a result, this method will return 0 (1970-01-01T00:00:00) for the first micro-batch. If you use this value as part of the timestamp set in setTimeoutTimestamp, it may lead to the state expiring immediately in the next micro-batch, once the watermark gets the real value from your data.
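A sketch of testing both sides of the note above: with a watermark passed to `create()` the method returns it, and without one it throws. The exception type is elided on this page; the sketch assumes it is `UnsupportedOperationException`, as in Spark 3.2's `GroupStateImpl`. `WatermarkSketch` and its methods are hypothetical.

```scala
import org.apache.spark.api.java.Optional
import org.apache.spark.sql.streaming.{GroupStateTimeout, TestGroupState}

object WatermarkSketch {
  // Hypothetical helper: event-time timeout, watermark present only if given.
  def stateWithWatermark(watermarkMs: Option[Long]): TestGroupState[Int] =
    TestGroupState.create[Int](
      Optional.empty[Int](), GroupStateTimeout.EventTimeTimeout(), 0L,
      watermarkMs.fold(Optional.empty[Long]())(w => Optional.of(w)), false)

  // Assumption: a missing watermark surfaces as UnsupportedOperationException.
  def watermarkOrError(watermarkMs: Option[Long]): Either[String, Long] =
    try Right(stateWithWatermark(watermarkMs).getCurrentWatermarkMs())
    catch { case e: UnsupportedOperationException => Left(e.getMessage) }

  def main(args: Array[String]): Unit = {
    println(watermarkOrError(Some(5000L)))
    println(watermarkOrError(None))
  }
}
```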
- abstract def getOption: Option[S]
Get the state value as a Scala Option.
- Definition Classes
 - GroupState
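A sketch of reading state through `getOption`, which avoids the exception that `get` throws on a missing state. It assumes Spark 3.2.0 or later; `GetOptionSketch`, `stateOf`, and `describe` are hypothetical names.

```scala
import org.apache.spark.api.java.Optional
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, TestGroupState}

object GetOptionSketch {
  // Hypothetical helper: a TestGroupState[Int] with no timeout and no watermark.
  def stateOf(initial: Option[Int]): TestGroupState[Int] =
    TestGroupState.create[Int](
      initial.fold(Optional.empty[Int]())(v => Optional.of(v)),
      GroupStateTimeout.NoTimeout(), 0L, Optional.empty[Long](), false)

  // Pattern-matching on getOption handles the missing-state case explicitly.
  def describe(state: GroupState[Int]): String = state.getOption match {
    case Some(count) => s"count=$count"
    case None        => "no state"
  }

  def main(args: Array[String]): Unit = {
    println(describe(stateOf(Some(7))))
    println(describe(stateOf(None)))
  }
}
```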
 
- abstract def getTimeoutTimestampMs: Optional[Long]
Returns the timestamp if setTimeoutTimestamp() is called, or the batch processing time plus the duration if setTimeoutDuration() is called. Otherwise, returns Optional.empty.
- abstract def hasTimedOut: Boolean
Whether the function has been called because the key has timed out.
- Definition Classes
 - GroupState
 - Note
 This can return true only when timeouts are enabled in [map/flatMap]GroupsWithState.
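A sketch of simulating a timeout invocation by passing `hasTimedOut = true` to `create()` together with a processing-time timeout. It assumes Spark 3.2.0 or later, and additionally assumes (per Spark 3.2's `GroupStateImpl`) that `create()` rejects `hasTimedOut = true` with `UnsupportedOperationException` when no timeout is configured. `HasTimedOutSketch` and `onBatch` are hypothetical.

```scala
import org.apache.spark.api.java.Optional
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, TestGroupState}

object HasTimedOutSketch {
  // Hypothetical transition function: drops the state when the key times out,
  // otherwise adds the number of new values to the stored count.
  def onBatch(values: Iterator[Int], state: GroupState[Int]): Option[Int] =
    if (state.hasTimedOut) {
      state.remove()
      None
    } else {
      val count = state.getOption.getOrElse(0) + values.size
      state.update(count)
      Some(count)
    }

  // Simulates a timeout call: processing-time timeout enabled, hasTimedOut = true.
  def timedOutState(): TestGroupState[Int] =
    TestGroupState.create[Int](
      Optional.of(3), GroupStateTimeout.ProcessingTimeTimeout(), 1000L,
      Optional.empty[Long](), true)

  // Assumption: hasTimedOut = true without a timeout config is rejected.
  def rejectsTimeoutWithoutConf(): Boolean =
    try {
      TestGroupState.create[Int](
        Optional.empty[Int](), GroupStateTimeout.NoTimeout(), 1000L,
        Optional.empty[Long](), true)
      false
    } catch { case _: UnsupportedOperationException => true }
}
```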
- abstract def isRemoved: Boolean
Whether the state has been marked for removal.
- abstract def isUpdated: Boolean
Whether the state has been updated but not removed.
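A sketch of using `isUpdated` to check that a transition function writes state only when it should. It assumes Spark 3.2.0 or later; `IsUpdatedSketch`, `stateOf`, and `addIfAny` are hypothetical.

```scala
import org.apache.spark.api.java.Optional
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, TestGroupState}

object IsUpdatedSketch {
  // Hypothetical helper: a TestGroupState[Int] with no timeout and no watermark.
  def stateOf(initial: Option[Int]): TestGroupState[Int] =
    TestGroupState.create[Int](
      initial.fold(Optional.empty[Int]())(v => Optional.of(v)),
      GroupStateTimeout.NoTimeout(), 0L, Optional.empty[Long](), false)

  // Hypothetical transition function that only writes when new values arrive.
  def addIfAny(values: Iterator[Int], state: GroupState[Int]): Unit =
    if (values.hasNext) state.update(state.getOption.getOrElse(0) + values.sum)

  def main(args: Array[String]): Unit = {
    val state = stateOf(Some(1))
    addIfAny(Iterator.empty, state)
    println(s"after empty batch: updated=${state.isUpdated}")
    addIfAny(Iterator(2, 3), state)
    println(s"after non-empty batch: updated=${state.isUpdated}, value=${state.get}")
  }
}
```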
- abstract def remove(): Unit
Remove this state.
- Definition Classes
 - GroupState
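A sketch of the state-machine fields after `remove()`, assuming Spark 3.2.0 or later. `RemoveSketch` and `removed` are hypothetical names.

```scala
import org.apache.spark.api.java.Optional
import org.apache.spark.sql.streaming.{GroupStateTimeout, TestGroupState}

object RemoveSketch {
  // Hypothetical helper: creates a state holding `initial`, removes it,
  // and returns it so a test can inspect the flags.
  def removed(initial: Int): TestGroupState[Int] = {
    val state = TestGroupState.create[Int](
      Optional.of(initial), GroupStateTimeout.NoTimeout(), 0L,
      Optional.empty[Long](), false)
    state.remove()
    state
  }

  def main(args: Array[String]): Unit = {
    val state = removed(5)
    println(s"exists=${state.exists} removed=${state.isRemoved} updated=${state.isUpdated}")
  }
}
```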
 
- abstract def setTimeoutDuration(duration: String): Unit
Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
- Definition Classes
 - GroupState
 - Annotations
 - @throws( ... ) @throws( ... )
 - Note
 Processing time timeout must be enabled in [map/flatMap]GroupsWithState for calling this method. This method has no effect when used in a batch query.
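A sketch showing that a string duration is added to the batch processing time, which `getTimeoutTimestampMs` then reports. It assumes Spark 3.2.0 or later, and assumes the elided exception for a missing processing-time timeout is `UnsupportedOperationException`, as in Spark 3.2's `GroupStateImpl`. `DurationStringSketch` and its methods are hypothetical.

```scala
import org.apache.spark.api.java.Optional
import org.apache.spark.sql.streaming.{GroupStateTimeout, TestGroupState}

object DurationStringSketch {
  // Hypothetical helper: empty state with the given timeout config.
  def stateWith(timeoutConf: GroupStateTimeout, batchMs: Long): TestGroupState[Int] =
    TestGroupState.create[Int](
      Optional.empty[Int](), timeoutConf, batchMs, Optional.empty[Long](), false)

  // Sets a string duration and reads back the resulting timeout timestamp.
  def timeoutAfter(duration: String, batchMs: Long): Long = {
    val state = stateWith(GroupStateTimeout.ProcessingTimeTimeout(), batchMs)
    state.setTimeoutDuration(duration)
    state.getTimeoutTimestampMs.get()
  }

  // Assumption: without processing-time timeout enabled, the call is rejected.
  def rejectedWithoutProcessingTimeout(): Boolean =
    try {
      stateWith(GroupStateTimeout.NoTimeout(), 1000L).setTimeoutDuration("1 hour")
      false
    } catch { case _: UnsupportedOperationException => true }
}
```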
- abstract def setTimeoutDuration(durationMs: Long): Unit
Set the timeout duration in ms for this key.
- Definition Classes
 - GroupState
 - Annotations
 - @throws( "if 'durationMs' is not positive" ) @throws( ... )
 - Note
 Processing time timeout must be enabled in [map/flatMap]GroupsWithState for calling this method. This method has no effect when used in a batch query.
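A sketch of the millisecond overload, covering both the computed timeout timestamp (batch processing time plus the duration) and the rejection of a non-positive duration noted in the annotations. It assumes Spark 3.2.0 or later; `DurationMsSketch` and `timeoutAfterMs` are hypothetical.

```scala
import org.apache.spark.api.java.Optional
import org.apache.spark.sql.streaming.{GroupStateTimeout, TestGroupState}

object DurationMsSketch {
  // Returns Right(timeout timestamp), or Left(message) if the duration is rejected.
  def timeoutAfterMs(durationMs: Long, batchMs: Long): Either[String, Long] = {
    val state = TestGroupState.create[Int](
      Optional.empty[Int](), GroupStateTimeout.ProcessingTimeTimeout(), batchMs,
      Optional.empty[Long](), false)
    try {
      state.setTimeoutDuration(durationMs)
      Right(state.getTimeoutTimestampMs.get())
    } catch { case e: IllegalArgumentException => Left(e.getMessage) }
  }

  def main(args: Array[String]): Unit = {
    println(timeoutAfterMs(500L, 1000L))
    println(timeoutAfterMs(0L, 1000L))
  }
}
```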
- abstract def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit
Set the timeout timestamp for this key as a java.sql.Date and an additional duration as a string (e.g. "1 hour", "2 days", etc.). The final timestamp (including the additional duration) cannot be older than the current watermark.
- Definition Classes
 - GroupState
 - Annotations
 - @throws( ... ) @throws( ... )
 - Note
 Event time timeout must be enabled in [map/flatMap]GroupsWithState for calling this method. This method has no effect when used in a batch query.
- abstract def setTimeoutTimestamp(timestamp: Date): Unit
Set the timeout timestamp for this key as a java.sql.Date. This timestamp cannot be older than the current watermark.
- Definition Classes
 - GroupState
 - Annotations
 - @throws( ... )
 - Note
 Event time timeout must be enabled in [map/flatMap]GroupsWithState for calling this method. This method has no effect when used in a batch query.
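A sketch of the java.sql.Date overload with event-time timeout enabled and a watermark earlier than the requested timestamp. It assumes Spark 3.2.0 or later and that the Date is interpreted by its epoch milliseconds (`getTime`). `DateTimeoutSketch` and `timeoutFromDate` are hypothetical.

```scala
import java.sql.Date
import org.apache.spark.api.java.Optional
import org.apache.spark.sql.streaming.{GroupStateTimeout, TestGroupState}

object DateTimeoutSketch {
  // Sets the timeout from a Date built from epoch milliseconds and reads it back.
  def timeoutFromDate(epochMs: Long, watermarkMs: Long): Long = {
    val state = TestGroupState.create[Int](
      Optional.empty[Int](), GroupStateTimeout.EventTimeTimeout(), 0L,
      Optional.of(watermarkMs), false)
    state.setTimeoutTimestamp(new Date(epochMs))
    state.getTimeoutTimestampMs.get()
  }

  def main(args: Array[String]): Unit =
    println(timeoutFromDate(5000L, 1000L))
}
```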
- abstract def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit
Set the timeout timestamp for this key as milliseconds in epoch time and an additional duration as a string (e.g. "1 hour", "2 days", etc.). The final timestamp (including the additional duration) cannot be older than the current watermark.
- Definition Classes
 - GroupState
 - Annotations
 - @throws( ... ) @throws( ... )
 - Note
 Event time timeout must be enabled in [map/flatMap]GroupsWithState for calling this method. This method has no effect when used in a batch query.
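A sketch of how the additional duration is added to the base timestamp: 10,000 ms plus "1 minute" (60,000 ms) should give a timeout of 70,000 ms. It assumes Spark 3.2.0 or later; `TimestampPlusDurationSketch` and `timeoutAt` are hypothetical.

```scala
import org.apache.spark.api.java.Optional
import org.apache.spark.sql.streaming.{GroupStateTimeout, TestGroupState}

object TimestampPlusDurationSketch {
  // Sets timestampMs + additionalDuration as the timeout and reads it back.
  def timeoutAt(timestampMs: Long, additionalDuration: String, watermarkMs: Long): Long = {
    val state = TestGroupState.create[Int](
      Optional.empty[Int](), GroupStateTimeout.EventTimeTimeout(), 0L,
      Optional.of(watermarkMs), false)
    state.setTimeoutTimestamp(timestampMs, additionalDuration)
    state.getTimeoutTimestampMs.get()
  }

  def main(args: Array[String]): Unit =
    println(timeoutAt(10000L, "1 minute", 5000L))
}
```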
- abstract def setTimeoutTimestamp(timestampMs: Long): Unit
Set the timeout timestamp for this key as milliseconds in epoch time. This timestamp cannot be older than the current watermark.
- Definition Classes
 - GroupState
 - Annotations
 - @throws( ... ) @throws( ... )
 - Note
 Event time timeout must be enabled in [map/flatMap]GroupsWithState for calling this method. This method has no effect when used in a batch query.
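A sketch of the watermark constraint described above: a timestamp at or after the watermark is accepted, and one before it is rejected. The exception types are elided on this page; the sketch assumes `IllegalArgumentException`, as in Spark 3.2's `GroupStateImpl`. `TimestampSketch` and `trySetTimestamp` are hypothetical.

```scala
import org.apache.spark.api.java.Optional
import org.apache.spark.sql.streaming.{GroupStateTimeout, TestGroupState}

object TimestampSketch {
  // Returns Right(timeout) if accepted, or Left(message) if rejected.
  def trySetTimestamp(timestampMs: Long, watermarkMs: Long): Either[String, Long] = {
    val state = TestGroupState.create[Int](
      Optional.empty[Int](), GroupStateTimeout.EventTimeTimeout(), 0L,
      Optional.of(watermarkMs), false)
    try {
      state.setTimeoutTimestamp(timestampMs)
      Right(state.getTimeoutTimestampMs.get())
    } catch { case e: IllegalArgumentException => Left(e.getMessage) }
  }

  def main(args: Array[String]): Unit = {
    println(trySetTimestamp(2000L, 1000L))
    println(trySetTimestamp(500L, 1000L))
  }
}
```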
- abstract def update(newState: S): Unit
Update the value of the state.
- Definition Classes
 - GroupState
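A sketch of `update` interacting with the other flags: updating after `remove()` defines the state again and clears `isRemoved`. It also assumes, based on Spark 3.2's `GroupStateImpl` rather than this page, that `null` is rejected with `IllegalArgumentException`. `UpdateSketch` and its methods are hypothetical.

```scala
import org.apache.spark.api.java.Optional
import org.apache.spark.sql.streaming.{GroupStateTimeout, TestGroupState}

object UpdateSketch {
  // Hypothetical helper: a TestGroupState[String] with no timeout and no watermark.
  def stateOf(initial: Option[String]): TestGroupState[String] =
    TestGroupState.create[String](
      initial.fold(Optional.empty[String]())(v => Optional.of(v)),
      GroupStateTimeout.NoTimeout(), 0L, Optional.empty[Long](), false)

  // update() after remove() re-defines the state.
  def updateAfterRemove(): TestGroupState[String] = {
    val state = stateOf(Some("old"))
    state.remove()
    state.update("new")
    state
  }

  // Assumption: null is not accepted as a state value.
  def rejectsNull(): Boolean =
    try { stateOf(None).update(null); false }
    catch { case _: IllegalArgumentException => true }
}
```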
 
 
Concrete Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
 - AnyRef → Any
 
- final def ##(): Int
- Definition Classes
 - AnyRef → Any
 
- final def ==(arg0: Any): Boolean
- Definition Classes
 - AnyRef → Any
 
- final def asInstanceOf[T0]: T0
- Definition Classes
 - Any
 
- def clone(): AnyRef
- Attributes
 - protected[lang]
 - Definition Classes
 - AnyRef
 - Annotations
 - @throws( ... ) @native()
 
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
 - AnyRef
 
- def equals(arg0: Any): Boolean
- Definition Classes
 - AnyRef → Any
 
- def finalize(): Unit
- Attributes
 - protected[lang]
 - Definition Classes
 - AnyRef
 - Annotations
 - @throws( classOf[java.lang.Throwable] )
 
- final def getClass(): Class[_]
- Definition Classes
 - AnyRef → Any
 - Annotations
 - @native()
 
- def hashCode(): Int
- Definition Classes
 - AnyRef → Any
 - Annotations
 - @native()
 
- final def isInstanceOf[T0]: Boolean
- Definition Classes
 - Any
 
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
 - AnyRef
 
- final def notify(): Unit
- Definition Classes
 - AnyRef
 - Annotations
 - @native()
 
- final def notifyAll(): Unit
- Definition Classes
 - AnyRef
 - Annotations
 - @native()
 
- final def synchronized[T0](arg0: ⇒ T0): T0
- Definition Classes
 - AnyRef
 
- def toString(): String
- Definition Classes
 - AnyRef → Any
 
- final def wait(): Unit
- Definition Classes
 - AnyRef
 - Annotations
 - @throws( ... )
 
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
 - AnyRef
 - Annotations
 - @throws( ... )
 
- final def wait(arg0: Long): Unit
- Definition Classes
 - AnyRef
 - Annotations
 - @throws( ... ) @native()