class StreamingContext extends Logging
Main entry point for Spark Streaming functionality. It provides methods used to create
org.apache.spark.streaming.dstream.DStreams from various input sources. It can be created
either by providing a Spark master URL and an appName, or from an org.apache.spark.SparkConf
configuration (see core Spark documentation), or from an existing org.apache.spark.SparkContext.
The associated SparkContext can be accessed using context.sparkContext. After
creating and transforming DStreams, the streaming computation can be started and stopped
using context.start() and context.stop(), respectively.
context.awaitTermination() allows the current thread to wait for the termination
of the context by stop() or by an exception.
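
A minimal lifecycle sketch; the host, port, master and batch interval are assumptions, not part of this API:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
val ssc  = new StreamingContext(conf, Seconds(1))   // 1-second batches

// Build the DStream graph before starting the context.
val lines = ssc.socketTextStream("localhost", 9999)
lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

ssc.start()             // start the streaming computation
ssc.awaitTermination()  // block until stop() is called or an error occurs
```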
- Annotations
 - @deprecated
 - Deprecated
 (Since version Spark 3.4.0) DStream is deprecated. Migrate to Structured Streaming.
- Source
 - StreamingContext.scala
 
Inheritance
- StreamingContext
 - Logging
 - AnyRef
 - Any
 
 
Instance Constructors
- new StreamingContext(path: String, sparkContext: SparkContext)
Recreate a StreamingContext from a checkpoint file using an existing SparkContext.
- path
 Path to the directory that was specified as the checkpoint directory
- sparkContext
 Existing SparkContext
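
A hedged sketch of recovering from a checkpoint with an already-running SparkContext; the checkpoint path is an assumption:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext

val sc: SparkContext = SparkContext.getOrCreate()  // reuse the running SparkContext

// The directory must be the one previously passed to ssc.checkpoint(...).
val recovered = new StreamingContext("hdfs:///checkpoints/my-streaming-app", sc)
recovered.start()
recovered.awaitTermination()
```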
- new StreamingContext(path: String)
Recreate a StreamingContext from a checkpoint file.
- path
 Path to the directory that was specified as the checkpoint directory
- new StreamingContext(path: String, hadoopConf: Configuration)
Recreate a StreamingContext from a checkpoint file.
- path
 Path to the directory that was specified as the checkpoint directory
- hadoopConf
 Optional, configuration object if necessary for reading from HDFS compatible filesystems
- new StreamingContext(master: String, appName: String, batchDuration: Duration, sparkHome: String = null, jars: Seq[String] = Nil, environment: Map[String, String] = Map())
Create a StreamingContext by providing the details necessary for creating a new SparkContext.
- master
 cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- appName
 a name for your job, to display on the cluster web UI
- batchDuration
 the time interval at which streaming data will be divided into batches
- new StreamingContext(conf: SparkConf, batchDuration: Duration)
Create a StreamingContext by providing the configuration necessary for a new SparkContext.
- conf
 an org.apache.spark.SparkConf object specifying Spark parameters
- batchDuration
 the time interval at which streaming data will be divided into batches
- new StreamingContext(sparkContext: SparkContext, batchDuration: Duration)
Create a StreamingContext using an existing SparkContext.
- sparkContext
 existing SparkContext
- batchDuration
 the time interval at which streaming data will be divided into batches
 
Value Members
- final def !=(arg0: Any): Boolean
 - Definition Classes: AnyRef → Any
- final def ##(): Int
 - Definition Classes: AnyRef → Any
- final def ==(arg0: Any): Boolean
 - Definition Classes: AnyRef → Any
- def addStreamingListener(streamingListener: StreamingListener): Unit
Add a org.apache.spark.streaming.scheduler.StreamingListener object for receiving system events related to streaming.
- final def asInstanceOf[T0]: T0
 - Definition Classes: Any
 
- def awaitTermination(): Unit
Wait for the execution to stop. Any exceptions that occur during the execution will be thrown in this thread.
- def awaitTerminationOrTimeout(timeout: Long): Boolean
Wait for the execution to stop. Any exceptions that occur during the execution will be thrown in this thread.
- timeout
 time to wait in milliseconds
- returns
 true if it's stopped; or throws the reported error if one occurred during the execution; or false if the waiting time elapsed before returning from the method.
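
A minimal polling sketch; shouldStop() is a hypothetical application flag, not part of this API:

```scala
ssc.start()

// Check every 10 seconds whether the application asked us to shut down.
var terminated = false
while (!terminated) {
  terminated = ssc.awaitTerminationOrTimeout(10000L)
  if (!terminated && shouldStop()) {
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}
```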
- def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as flat binary files, assuming a fixed length per record, generating one byte array per record. Files must be written to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored.
- directory
 HDFS directory to monitor for new files
- recordLength
 length of each record in bytes
- Note
 We ensure that the byte array for each record in the resulting RDDs of the DStream has the provided record length.
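
A hedged sketch, assuming each record is 16 bytes and starts with an 8-byte big-endian id; directory and record layout are assumptions:

```scala
import java.nio.ByteBuffer

val records = ssc.binaryRecordsStream("hdfs:///incoming/binary", recordLength = 16)
// Each element is a 16-byte Array[Byte]; read the first 8 bytes as a Long id.
records.map(bytes => ByteBuffer.wrap(bytes).getLong).print()
```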
- def checkpoint(directory: String): Unit
Set the context to periodically checkpoint the DStream operations for driver fault-tolerance.
- directory
 HDFS-compatible directory where the checkpoint data will be reliably stored. Note that this must be a fault-tolerant file system like HDFS.
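
For example (the path is an assumption; it only needs to be on a fault-tolerant file system):

```scala
// Enable metadata checkpointing; stateful operations also require this.
ssc.checkpoint("hdfs:///checkpoints/my-streaming-app")
```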
- def clone(): AnyRef
 - Attributes: protected[lang]; Definition Classes: AnyRef; Annotations: @throws( ... ) @native() @IntrinsicCandidate()
- final def eq(arg0: AnyRef): Boolean
 - Definition Classes: AnyRef
- def equals(arg0: Any): Boolean
 - Definition Classes: AnyRef → Any
 
- def fileStream[K, V, F <: InputFormat[K, V]](directory: String, filter: (Path) ⇒ Boolean, newFilesOnly: Boolean, conf: Configuration)(implicit arg0: ClassTag[K], arg1: ClassTag[V], arg2: ClassTag[F]): InputDStream[(K, V)]
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. Files must be written to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored.
- K
 Key type for reading HDFS file
- V
 Value type for reading HDFS file
- F
 Input format for reading HDFS file
- directory
 HDFS directory to monitor for new files
- filter
 Function to filter paths to process
- newFilesOnly
 Should process only new files and ignore existing files in the directory
- conf
 Hadoop configuration
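
A hedged sketch using the Hadoop text input format; the directory, filter, and Hadoop configuration values are assumptions:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val hadoopConf = new Configuration()

// Only pick up newly arriving *.log files and keep their contents as strings.
val logLines = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "hdfs:///incoming/logs",
  (path: Path) => path.getName.endsWith(".log"),
  newFilesOnly = true,
  hadoopConf
).map { case (_, line) => line.toString }
```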
- def fileStream[K, V, F <: InputFormat[K, V]](directory: String, filter: (Path) ⇒ Boolean, newFilesOnly: Boolean)(implicit arg0: ClassTag[K], arg1: ClassTag[V], arg2: ClassTag[F]): InputDStream[(K, V)]
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. Files must be written to the monitored directory by "moving" them from another location within the same file system.
- K
 Key type for reading HDFS file
- V
 Value type for reading HDFS file
- F
 Input format for reading HDFS file
- directory
 HDFS directory to monitor for new files
- filter
 Function to filter paths to process
- newFilesOnly
 Should process only new files and ignore existing files in the directory
- def fileStream[K, V, F <: InputFormat[K, V]](directory: String)(implicit arg0: ClassTag[K], arg1: ClassTag[V], arg2: ClassTag[F]): InputDStream[(K, V)]
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. Files must be written to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored.
- K
 Key type for reading HDFS file
- V
 Value type for reading HDFS file
- F
 Input format for reading HDFS file
- directory
 HDFS directory to monitor for new files
- final def getClass(): Class[_]
 - Definition Classes: AnyRef → Any; Annotations: @native() @IntrinsicCandidate()
 
- def getState(): StreamingContextState
:: DeveloperApi ::
Return the current state of the context. The context can be in three possible states:
- StreamingContextState.INITIALIZED - The context has been created, but not started yet. Input DStreams, transformations and output operations can be created on the context.
 - StreamingContextState.ACTIVE - The context has been started, and not stopped. Input DStreams, transformations and output operations cannot be created on the context.
 - StreamingContextState.STOPPED - The context has been stopped and cannot be used any more.
 
- Annotations
 - @DeveloperApi()
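
A small sketch, assuming ssc was created earlier and may already have been started elsewhere:

```scala
import org.apache.spark.streaming.StreamingContextState

// Only start the context if it is still in the INITIALIZED state.
if (ssc.getState() == StreamingContextState.INITIALIZED) {
  ssc.start()
}
```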
 
- def hashCode(): Int
 - Definition Classes: AnyRef → Any; Annotations: @native() @IntrinsicCandidate()
 
- def initializeLogIfNecessary(isInterpreter: Boolean, silent: Boolean): Boolean
 - Attributes: protected; Definition Classes: Logging
- def initializeLogIfNecessary(isInterpreter: Boolean): Unit
 - Attributes: protected; Definition Classes: Logging
- final def isInstanceOf[T0]: Boolean
 - Definition Classes: Any
- def isTraceEnabled(): Boolean
 - Attributes: protected; Definition Classes: Logging
- def log: Logger
 - Attributes: protected; Definition Classes: Logging
- def logDebug(msg: ⇒ String, throwable: Throwable): Unit
 - Attributes: protected; Definition Classes: Logging
- def logDebug(msg: ⇒ String): Unit
 - Attributes: protected; Definition Classes: Logging
- def logError(msg: ⇒ String, throwable: Throwable): Unit
 - Attributes: protected; Definition Classes: Logging
- def logError(msg: ⇒ String): Unit
 - Attributes: protected; Definition Classes: Logging
- def logInfo(msg: ⇒ String, throwable: Throwable): Unit
 - Attributes: protected; Definition Classes: Logging
- def logInfo(msg: ⇒ String): Unit
 - Attributes: protected; Definition Classes: Logging
- def logName: String
 - Attributes: protected; Definition Classes: Logging
- def logTrace(msg: ⇒ String, throwable: Throwable): Unit
 - Attributes: protected; Definition Classes: Logging
- def logTrace(msg: ⇒ String): Unit
 - Attributes: protected; Definition Classes: Logging
- def logWarning(msg: ⇒ String, throwable: Throwable): Unit
 - Attributes: protected; Definition Classes: Logging
- def logWarning(msg: ⇒ String): Unit
 - Attributes: protected; Definition Classes: Logging
- final def ne(arg0: AnyRef): Boolean
 - Definition Classes: AnyRef
- final def notify(): Unit
 - Definition Classes: AnyRef; Annotations: @native() @IntrinsicCandidate()
- final def notifyAll(): Unit
 - Definition Classes: AnyRef; Annotations: @native() @IntrinsicCandidate()
 
- def queueStream[T](queue: Queue[RDD[T]], oneAtATime: Boolean, defaultRDD: RDD[T])(implicit arg0: ClassTag[T]): InputDStream[T]
Create an input stream from a queue of RDDs. In each batch, it will process either one or all of the RDDs returned by the queue.
- T
 Type of objects in the RDD
- queue
 Queue of RDDs. Modifications to this data structure must be synchronized.
- oneAtATime
 Whether only one RDD should be consumed from the queue in every interval
- defaultRDD
 Default RDD is returned by the DStream when the queue is empty. Set as null if no RDD should be returned when empty
- Note
 Arbitrary RDDs can be added to queueStream; there is no way to recover the data of those RDDs, so queueStream doesn't support checkpointing.
- def queueStream[T](queue: Queue[RDD[T]], oneAtATime: Boolean = true)(implicit arg0: ClassTag[T]): InputDStream[T]
Create an input stream from a queue of RDDs. In each batch, it will process either one or all of the RDDs returned by the queue.
- T
 Type of objects in the RDD
- queue
 Queue of RDDs. Modifications to this data structure must be synchronized.
- oneAtATime
 Whether only one RDD should be consumed from the queue in every interval
- Note
 Arbitrary RDDs can be added to queueStream; there is no way to recover the data of those RDDs, so queueStream doesn't support checkpointing.
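
A small testing sketch in the spirit of Spark's QueueStream example; the queue contents are assumptions:

```scala
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

val rddQueue = Queue[RDD[Int]]()
val stream = ssc.queueStream(rddQueue)   // oneAtATime defaults to true
stream.map(x => (x % 10, 1)).reduceByKey(_ + _).print()

ssc.start()
// Feed the queue after the context has started; access must be synchronized.
rddQueue.synchronized {
  rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
}
```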
- def rawSocketStream[T](hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)(implicit arg0: ClassTag[T]): ReceiverInputDStream[T]
Create an input stream from network source hostname:port, where data is received as serialized blocks (serialized using Spark's serializer) that can be directly pushed into the block manager without deserializing them. This is the most efficient way to receive data.
- T
 Type of the objects in the received blocks
- hostname
 Hostname to connect to for receiving data
- port
 Port to connect to for receiving data
- storageLevel
 Storage level to use for storing the received objects (default: StorageLevel.MEMORY_AND_DISK_SER_2)
- def receiverStream[T](receiver: Receiver[T])(implicit arg0: ClassTag[T]): ReceiverInputDStream[T]
Create an input stream with any arbitrary user implemented receiver. Find more details at https://spark.apache.org/docs/latest/streaming-custom-receivers.html
- receiver
 Custom implementation of Receiver
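
A hedged sketch of a custom receiver, modeled on the custom-receivers guide; the socket source and host/port are assumptions:

```scala
import java.net.Socket
import scala.io.Source
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Reads UTF-8 lines from a socket on a background thread and hands them to Spark via store().
class SocketLineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart(): Unit = {
    new Thread("Socket Line Receiver") {
      override def run(): Unit = {
        val socket = new Socket(host, port)
        try {
          Source.fromInputStream(socket.getInputStream, "UTF-8")
            .getLines()
            .foreach(line => store(line))
        } finally {
          socket.close()
        }
        restart("Connection closed, restarting receiver")
      }
    }.start()
  }

  def onStop(): Unit = {}  // the reading thread ends when the socket is closed
}

val custom = ssc.receiverStream(new SocketLineReceiver("localhost", 9999))
```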
- def remember(duration: Duration): Unit
Set each DStream in this context to remember RDDs it generated in the last given duration. DStreams remember RDDs only for a limited duration of time and release them for garbage collection. This method allows the developer to specify how long to remember the RDDs (if the developer wishes to query old data outside the DStream computation).
- duration
 Minimum duration that each DStream should remember its RDDs
- def removeStreamingListener(streamingListener: StreamingListener): Unit
- def socketStream[T](hostname: String, port: Int, converter: (InputStream) ⇒ Iterator[T], storageLevel: StorageLevel)(implicit arg0: ClassTag[T]): ReceiverInputDStream[T]
Creates an input stream from TCP source hostname:port. Data is received using a TCP socket and the received bytes are interpreted as objects using the given converter.
- T
 Type of the objects received (after converting bytes to objects)
- hostname
 Hostname to connect to for receiving data
- port
 Port to connect to for receiving data
- converter
 Function to convert the byte stream to objects
- storageLevel
 Storage level to use for storing the received objects
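
A hedged sketch with a line-oriented converter; the host, port, and converter are assumptions:

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel

// Turn the raw socket InputStream into an iterator of UTF-8 lines.
def lineIterator(in: InputStream): Iterator[String] = {
  val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
  Iterator.continually(reader.readLine()).takeWhile(_ != null)
}

val lines = ssc.socketStream[String](
  "localhost", 9999, lineIterator, StorageLevel.MEMORY_AND_DISK_SER_2)
```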
- def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
Creates an input stream from TCP source hostname:port. Data is received using a TCP socket and the received bytes are interpreted as UTF8-encoded, newline-delimited lines.
- hostname
 Hostname to connect to for receiving data
- port
 Port to connect to for receiving data
- storageLevel
 Storage level to use for storing the received objects (default: StorageLevel.MEMORY_AND_DISK_SER_2)
- def sparkContext: SparkContext
Return the associated Spark context.
- def start(): Unit
Start the execution of the streams.
- Exceptions thrown
 IllegalStateException if the StreamingContext is already stopped.
- def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
Stop the execution of the streams, with the option of ensuring all received data has been processed.
- stopSparkContext
 if true, stops the associated SparkContext. The underlying SparkContext will be stopped regardless of whether this StreamingContext has been started.
- stopGracefully
 if true, stops gracefully by waiting for the processing of all received data to be completed
- def stop(stopSparkContext: Boolean = ...): Unit
Stop the execution of the streams immediately (does not wait for all received data to be processed). By default, if stopSparkContext is not specified, the underlying SparkContext will also be stopped. This implicit behavior can be configured using the SparkConf configuration spark.streaming.stopSparkContextByDefault.
- stopSparkContext
 If true, stops the associated SparkContext. The underlying SparkContext will be stopped regardless of whether this StreamingContext has been started.
- final def synchronized[T0](arg0: ⇒ T0): T0
 - Definition Classes: AnyRef
 
- def textFileStream(directory: String): DStream[String]
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files (using key as LongWritable, value as Text and input format as TextInputFormat). Files must be written to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored. The text files must be encoded as UTF-8.
- directory
 HDFS directory to monitor for new files
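
A small word-count sketch over newly arriving text files; the directory and window length are assumptions:

```scala
import org.apache.spark.streaming.Seconds

val wordCounts = ssc.textFileStream("hdfs:///incoming/text")
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60))

wordCounts.print()
```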
- def toString(): String
 - Definition Classes: AnyRef → Any
 
- def transform[T](dstreams: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) ⇒ RDD[T])(implicit arg0: ClassTag[T]): DStream[T]
Create a new DStream in which each RDD is generated by applying a function on RDDs of the DStreams.
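
A hedged sketch joining two hypothetical pair DStreams (clicks and impressions are assumptions); the casts are needed because transformFunc receives Seq[RDD[_]]:

```scala
import org.apache.spark.rdd.RDD

// clicks: DStream[(String, Int)], impressions: DStream[(String, Int)] -- assumed to exist
val joined = ssc.transform[(String, (Int, Int))](
  Seq(clicks, impressions),
  (rdds, _) => {
    val clickRdd = rdds(0).asInstanceOf[RDD[(String, Int)]]
    val imprRdd  = rdds(1).asInstanceOf[RDD[(String, Int)]]
    clickRdd.join(imprRdd)  // per-batch join across the two streams
  }
)
```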
- def union[T](streams: Seq[DStream[T]])(implicit arg0: ClassTag[T]): DStream[T]
Create a unified DStream from multiple DStreams of the same type and same slide duration.
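
A brief sketch, assuming several socket receivers produce the same kind of records:

```scala
// Merge three receivers into one DStream so downstream logic runs once.
val parts = (0 until 3).map(i => ssc.socketTextStream("localhost", 9999 + i))
val merged = ssc.union(parts)
merged.count().print()
```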
- final def wait(arg0: Long, arg1: Int): Unit
 - Definition Classes: AnyRef; Annotations: @throws( ... )
- final def wait(arg0: Long): Unit
 - Definition Classes: AnyRef; Annotations: @throws( ... ) @native()
- final def wait(): Unit
 - Definition Classes: AnyRef; Annotations: @throws( ... )
 
Deprecated Value Members
- def finalize(): Unit
 - Attributes: protected[lang]; Definition Classes: AnyRef; Annotations: @throws( classOf[java.lang.Throwable] ) @Deprecated
 - Deprecated