Object
org.apache.spark.mllib.recommendation.ALS
All Implemented Interfaces:
Serializable, org.apache.spark.internal.Logging, scala.Serializable

public class ALS extends Object implements scala.Serializable, org.apache.spark.internal.Logging
Alternating Least Squares matrix factorization.

ALS attempts to estimate the ratings matrix R as the product of two lower-rank matrices, X and Y, such that X * Yt ≈ R, where Yt is the transpose of Y. X and Y are typically called 'factor' matrices. The general approach is iterative: during each iteration, one of the factor matrices is held constant while the other is solved for using least squares. The newly solved factor matrix is then held constant while solving for the other factor matrix.
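
The alternation described above can be sketched in plain Java for the simplest possible case. This is an illustrative sketch only, not the Spark implementation: it assumes a small, dense, fully observed matrix, rank 1, and no regularization. The class and method names (AlsRankOneSketch, solveRows, transpose, factorize) are hypothetical.

```java
public class AlsRankOneSketch {

    /** Least-squares solve for x with y held constant: x[u] = (R[u] . y) / (y . y). */
    static double[] solveRows(double[][] r, double[] y) {
        double denom = 0.0;
        for (double v : y) {
            denom += v * v;
        }
        double[] x = new double[r.length];
        for (int u = 0; u < r.length; u++) {
            double num = 0.0;
            for (int i = 0; i < y.length; i++) {
                num += r[u][i] * y[i];
            }
            x[u] = num / denom;
        }
        return x;
    }

    static double[][] transpose(double[][] m) {
        double[][] t = new double[m[0].length][m.length];
        for (int i = 0; i < m.length; i++) {
            for (int j = 0; j < m[0].length; j++) {
                t[j][i] = m[i][j];
            }
        }
        return t;
    }

    /** Returns {x, y} such that x[u] * y[i] approximates r[u][i]. */
    static double[][] factorize(double[][] r, int iterations) {
        double[][] rt = transpose(r);
        double[] y = new double[r[0].length];
        java.util.Arrays.fill(y, 1.0);   // arbitrary nonzero starting point
        double[] x = new double[r.length];
        for (int it = 0; it < iterations; it++) {
            x = solveRows(r, y);    // hold y constant, solve for x
            y = solveRows(rt, x);   // hold x constant, solve for y
        }
        return new double[][] {x, y};
    }

    public static void main(String[] args) {
        double[][] r = {{2, 4}, {3, 6}};   // an exactly rank-1 matrix
        double[][] f = factorize(r, 5);
        for (int u = 0; u < r.length; u++) {
            for (int i = 0; i < r[0].length; i++) {
                System.out.printf("%.3f ", f[0][u] * f[1][i]);
            }
            System.out.println();
        }
    }
}
```

With higher ranks each solve becomes a small linear system per row instead of a single division, but the hold-one-solve-the-other structure is the same.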

This is a blocked implementation of the ALS factorization algorithm that groups the two sets of factors (referred to as "users" and "products") into blocks and reduces communication by only sending one copy of each user vector to each product block on each iteration, and only for the product blocks that need that user's feature vector. This is achieved by precomputing some information about the ratings matrix to determine the "out-links" of each user (which blocks of products it will contribute to) and "in-link" information for each product (which of the feature vectors it receives from each user block it will depend on). This allows us to send only an array of feature vectors between each user block and product block, and have the product block find the users' ratings and update the products based on these messages.
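
The "out-link" idea can be illustrated with a small standalone sketch: for each user, compute the set of product blocks that need that user's feature vector, so that the vector is sent at most once per block. This is a simplified illustration, not the data structure used by the class. It assumes products are assigned to blocks by productID modulo the number of blocks, and the names (OutLinkSketch, outLinks) are hypothetical.

```java
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

public class OutLinkSketch {

    /**
     * Maps each user ID to the product blocks it contributes to.
     * Each rating is a {userID, productID} pair; the rating value is not needed here.
     */
    static Map<Integer, SortedSet<Integer>> outLinks(int[][] ratings, int numProductBlocks) {
        Map<Integer, SortedSet<Integer>> links = new TreeMap<>();
        for (int[] rating : ratings) {
            int user = rating[0];
            int product = rating[1];
            // Assumed partitioning scheme: block = productID mod numProductBlocks.
            int block = Math.floorMod(product, numProductBlocks);
            links.computeIfAbsent(user, k -> new TreeSet<>()).add(block);
        }
        return links;
    }

    public static void main(String[] args) {
        int[][] ratings = {{1, 10}, {1, 12}, {1, 11}, {2, 11}};
        // User 1 rated two products in block 0, but block 0 is listed only once.
        System.out.println(outLinks(ratings, 2));   // prints {1=[0, 1], 2=[1]}
    }
}
```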

For implicit preference data, the algorithm used is based on "Collaborative Filtering for Implicit Feedback Datasets" (Hu, Koren, and Volinsky, ICDM 2008), adapted for the blocked approach used here.

Essentially, instead of finding low-rank approximations to the rating matrix R, this finds approximations for a preference matrix P, where an element of P is 1 if the corresponding rating r > 0 and 0 if r <= 0. The ratings then act as 'confidence' values related to the strength of the indicated user preferences rather than as explicit ratings given to items.
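
The mapping from a raw rating to a preference and a confidence can be sketched as follows. The preference rule comes from the description above. The confidence formula, c = 1 + alpha * |r|, follows the linear form proposed in the cited paper, with the absolute value added here as an assumption so that negative ratings still yield a confidence of at least 1; the exact weighting used inside this class is not stated on this page. The names (ImplicitPreferenceSketch, preference, confidence) are hypothetical.

```java
public class ImplicitPreferenceSketch {

    /** Preference p is 1 if r > 0 and 0 if r <= 0. */
    static double preference(double r) {
        return r > 0 ? 1.0 : 0.0;
    }

    /** Assumed confidence weighting: c = 1 + alpha * |r|. */
    static double confidence(double r, double alpha) {
        return 1.0 + alpha * Math.abs(r);
    }

    public static void main(String[] args) {
        double alpha = 1.0;   // the default alpha listed for ALS()
        for (double r : new double[] {3.0, 0.0, -2.0}) {
            System.out.println("r=" + r
                + " preference=" + preference(r)
                + " confidence=" + confidence(r, alpha));
        }
    }
}
```

A larger alpha makes observed interactions count more heavily relative to unobserved ones.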

Note: the input rating RDD to the ALS implementation should be deterministic. Nondeterministic data can cause failures while fitting the ALS model. For example, an order-sensitive operation such as sampling after a repartition makes the RDD output nondeterministic, as in rdd.repartition(2).sample(false, 0.5, 1618). Checkpointing the sampled RDD or adding a sort before sampling can help make the RDD deterministic.
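
Why ordering matters for seeded sampling can be shown with a plain-Java analogy that does not use Spark. A seeded sampler draws one random number per element in arrival order, so the same seed applied to the same elements in a different order may keep a different subset. Sorting first fixes the order and therefore the result. The names (DeterministicSampleSketch, sample, sortedSample) are hypothetical, and the lists stand in for RDD partitions whose row order may change on recomputation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class DeterministicSampleSketch {

    /** Keeps each element with the given probability; the outcome depends on element order. */
    static List<Integer> sample(List<Integer> data, double fraction, long seed) {
        Random rng = new Random(seed);
        List<Integer> kept = new ArrayList<>();
        for (Integer value : data) {
            if (rng.nextDouble() < fraction) {
                kept.add(value);
            }
        }
        return kept;
    }

    /** Sorts a copy of the data before sampling, so arrival order no longer matters. */
    static List<Integer> sortedSample(List<Integer> data, double fraction, long seed) {
        List<Integer> sorted = new ArrayList<>(data);
        Collections.sort(sorted);
        return sample(sorted, fraction, seed);
    }

    public static void main(String[] args) {
        List<Integer> firstRun = Arrays.asList(5, 3, 1, 4, 2);
        List<Integer> secondRun = Arrays.asList(1, 2, 3, 4, 5);   // same data, different order
        System.out.println("unsorted: " + sample(firstRun, 0.5, 1618L)
            + " vs " + sample(secondRun, 0.5, 1618L));
        System.out.println("sorted:   " + sortedSample(firstRun, 0.5, 1618L)
            + " vs " + sortedSample(secondRun, 0.5, 1618L));
    }
}
```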

  • Nested Class Summary

    Nested classes/interfaces inherited from interface org.apache.spark.internal.Logging

    org.apache.spark.internal.Logging.SparkShellLoggingFilter
  • Constructor Summary

    Constructors
    Constructor
    Description
    ALS()
    Constructs an ALS instance with default parameters: {numBlocks: -1, rank: 10, iterations: 10, lambda: 0.01, implicitPrefs: false, alpha: 1.0}.
  • Method Summary

    Modifier and Type
    Method
    Description
    run(JavaRDD<Rating> ratings)
    Java-friendly version of ALS.run.
    run(RDD<Rating> ratings)
    Run ALS with the configured parameters on an input RDD of Rating objects.
    setAlpha(double alpha)
    Sets the constant used in computing confidence in implicit ALS.
    setBlocks(int numBlocks)
    Set the number of blocks for both user blocks and product blocks to parallelize the computation into; pass -1 for an auto-configured number of blocks.
    setCheckpointInterval(int checkpointInterval)
    Set period (in iterations) between checkpoints (default = 10).
    setFinalRDDStorageLevel(StorageLevel storageLevel)
    Sets storage level for final RDDs (user/product used in MatrixFactorizationModel).
    setImplicitPrefs(boolean implicitPrefs)
    Sets whether to use implicit preference.
    setIntermediateRDDStorageLevel(StorageLevel storageLevel)
    Sets storage level for intermediate RDDs (user/product in/out links).
    setIterations(int iterations)
    Set the number of iterations to run.
    setLambda(double lambda)
    Set the regularization parameter, lambda.
    setNonnegative(boolean b)
    Set whether the least-squares problems solved at each iteration should have nonnegativity constraints.
    setProductBlocks(int numProductBlocks)
    Set the number of product blocks to parallelize the computation.
    setRank(int rank)
    Set the rank of the feature matrices computed (number of features).
    setSeed(long seed)
    Sets a random seed to have deterministic results.
    setUserBlocks(int numUserBlocks)
    Set the number of user blocks to parallelize the computation.
    train(RDD<Rating> ratings, int rank, int iterations)
    Train a matrix factorization model given an RDD of ratings by users for a subset of products.
    train(RDD<Rating> ratings, int rank, int iterations, double lambda)
    Train a matrix factorization model given an RDD of ratings by users for a subset of products.
    train(RDD<Rating> ratings, int rank, int iterations, double lambda, int blocks)
    Train a matrix factorization model given an RDD of ratings by users for a subset of products.
    train(RDD<Rating> ratings, int rank, int iterations, double lambda, int blocks, long seed)
    Train a matrix factorization model given an RDD of ratings by users for a subset of products.
    trainImplicit(RDD<Rating> ratings, int rank, int iterations)
    Train a matrix factorization model given an RDD of 'implicit preferences' of users for a subset of products.
    trainImplicit(RDD<Rating> ratings, int rank, int iterations, double lambda, double alpha)
    Train a matrix factorization model given an RDD of 'implicit preferences' of users for a subset of products.
    trainImplicit(RDD<Rating> ratings, int rank, int iterations, double lambda, int blocks, double alpha)
    Train a matrix factorization model given an RDD of 'implicit preferences' of users for a subset of products.
    trainImplicit(RDD<Rating> ratings, int rank, int iterations, double lambda, int blocks, double alpha, long seed)
    Train a matrix factorization model given an RDD of 'implicit preferences' given by users to some products, in the form of (userID, productID, preference) triples.

    Methods inherited from class java.lang.Object

    equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

    Methods inherited from interface org.apache.spark.internal.Logging

    initializeForcefully, initializeLogIfNecessary, initializeLogIfNecessary, initializeLogIfNecessary$default$2, isTraceEnabled, log, logDebug, logDebug, logError, logError, logInfo, logInfo, logName, logTrace, logTrace, logWarning, logWarning, org$apache$spark$internal$Logging$$log_, org$apache$spark$internal$Logging$$log__$eq
  • Constructor Details

    • ALS

      public ALS()
      Constructs an ALS instance with default parameters: {numBlocks: -1, rank: 10, iterations: 10, lambda: 0.01, implicitPrefs: false, alpha: 1.0}.
  • Method Details

    • train

      public static MatrixFactorizationModel train(RDD<Rating> ratings, int rank, int iterations, double lambda, int blocks, long seed)
      Train a matrix factorization model given an RDD of ratings by users for a subset of products. The ratings matrix is approximated as the product of two lower-rank matrices of a given rank (number of features). To solve for these features, ALS is run iteratively with a configurable level of parallelism.

      Parameters:
      ratings - RDD of Rating objects with userID, productID, and rating
      rank - number of features to use (also referred to as the number of latent factors)
      iterations - number of iterations of ALS
      lambda - regularization parameter
      blocks - level of parallelism to split computation into
      seed - random seed for initial matrix factorization model
      Returns:
      a MatrixFactorizationModel with feature vectors for each user and product
    • train

      public static MatrixFactorizationModel train(RDD<Rating> ratings, int rank, int iterations, double lambda, int blocks)
      Train a matrix factorization model given an RDD of ratings by users for a subset of products. The ratings matrix is approximated as the product of two lower-rank matrices of a given rank (number of features). To solve for these features, ALS is run iteratively with a configurable level of parallelism.

      Parameters:
      ratings - RDD of Rating objects with userID, productID, and rating
      rank - number of features to use (also referred to as the number of latent factors)
      iterations - number of iterations of ALS
      lambda - regularization parameter
      blocks - level of parallelism to split computation into
      Returns:
      a MatrixFactorizationModel with feature vectors for each user and product
    • train

      public static MatrixFactorizationModel train(RDD<Rating> ratings, int rank, int iterations, double lambda)
      Train a matrix factorization model given an RDD of ratings by users for a subset of products. The ratings matrix is approximated as the product of two lower-rank matrices of a given rank (number of features). To solve for these features, ALS is run iteratively with a level of parallelism determined automatically based on the number of partitions in ratings.

      Parameters:
      ratings - RDD of Rating objects with userID, productID, and rating
      rank - number of features to use (also referred to as the number of latent factors)
      iterations - number of iterations of ALS
      lambda - regularization parameter
      Returns:
      a MatrixFactorizationModel with feature vectors for each user and product
    • train

      public static MatrixFactorizationModel train(RDD<Rating> ratings, int rank, int iterations)
      Train a matrix factorization model given an RDD of ratings by users for a subset of products. The ratings matrix is approximated as the product of two lower-rank matrices of a given rank (number of features). To solve for these features, ALS is run iteratively with a level of parallelism determined automatically based on the number of partitions in ratings.

      Parameters:
      ratings - RDD of Rating objects with userID, productID, and rating
      rank - number of features to use (also referred to as the number of latent factors)
      iterations - number of iterations of ALS
      Returns:
      a MatrixFactorizationModel with feature vectors for each user and product
    • trainImplicit

      public static MatrixFactorizationModel trainImplicit(RDD<Rating> ratings, int rank, int iterations, double lambda, int blocks, double alpha, long seed)
      Train a matrix factorization model given an RDD of 'implicit preferences' given by users to some products, in the form of (userID, productID, preference) triples. The ratings matrix is approximated as the product of two lower-rank matrices of a given rank (number of features). To solve for these features, ALS is run for a given number of iterations, using a level of parallelism given by blocks.

      Parameters:
      ratings - RDD of Rating objects with userID, productID, and rating
      rank - number of features to use (also referred to as the number of latent factors)
      iterations - number of iterations of ALS
      lambda - regularization parameter
      blocks - level of parallelism to split computation into
      alpha - confidence parameter
      seed - random seed for initial matrix factorization model
      Returns:
      a MatrixFactorizationModel with feature vectors for each user and product
    • trainImplicit

      public static MatrixFactorizationModel trainImplicit(RDD<Rating> ratings, int rank, int iterations, double lambda, int blocks, double alpha)
      Train a matrix factorization model given an RDD of 'implicit preferences' of users for a subset of products. The ratings matrix is approximated as the product of two lower-rank matrices of a given rank (number of features). To solve for these features, ALS is run iteratively with a configurable level of parallelism.

      Parameters:
      ratings - RDD of Rating objects with userID, productID, and rating
      rank - number of features to use (also referred to as the number of latent factors)
      iterations - number of iterations of ALS
      lambda - regularization parameter
      blocks - level of parallelism to split computation into
      alpha - confidence parameter
      Returns:
      a MatrixFactorizationModel with feature vectors for each user and product
    • trainImplicit

      public static MatrixFactorizationModel trainImplicit(RDD<Rating> ratings, int rank, int iterations, double lambda, double alpha)
      Train a matrix factorization model given an RDD of 'implicit preferences' of users for a subset of products. The ratings matrix is approximated as the product of two lower-rank matrices of a given rank (number of features). To solve for these features, ALS is run iteratively with a level of parallelism determined automatically based on the number of partitions in ratings.

      Parameters:
      ratings - RDD of Rating objects with userID, productID, and rating
      rank - number of features to use (also referred to as the number of latent factors)
      iterations - number of iterations of ALS
      lambda - regularization parameter
      alpha - confidence parameter
      Returns:
      a MatrixFactorizationModel with feature vectors for each user and product
    • trainImplicit

      public static MatrixFactorizationModel trainImplicit(RDD<Rating> ratings, int rank, int iterations)
      Train a matrix factorization model given an RDD of 'implicit preferences' of users for a subset of products. The ratings matrix is approximated as the product of two lower-rank matrices of a given rank (number of features). To solve for these features, ALS is run iteratively with a level of parallelism determined automatically based on the number of partitions in ratings.

      Parameters:
      ratings - RDD of Rating objects with userID, productID, and rating
      rank - number of features to use (also referred to as the number of latent factors)
      iterations - number of iterations of ALS
      Returns:
      a MatrixFactorizationModel with feature vectors for each user and product
    • setBlocks

      public ALS setBlocks(int numBlocks)
      Set the number of blocks for both user blocks and product blocks to parallelize the computation into; pass -1 for an auto-configured number of blocks. Default: -1.
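      Parameters:
      numBlocks - number of blocks to use for both users and products, or -1 to auto-configure
      Returns:
      this ALS instance, so that setter calls can be chained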
    • setUserBlocks

      public ALS setUserBlocks(int numUserBlocks)
      Set the number of user blocks to parallelize the computation.
      Parameters:
      numUserBlocks - number of user blocks
      Returns:
      this ALS instance, so that setter calls can be chained
    • setProductBlocks

      public ALS setProductBlocks(int numProductBlocks)
      Set the number of product blocks to parallelize the computation.
      Parameters:
      numProductBlocks - number of product blocks
      Returns:
      this ALS instance, so that setter calls can be chained
    • setRank

      public ALS setRank(int rank)
      Set the rank of the feature matrices computed (number of features). Default: 10.
    • setIterations

      public ALS setIterations(int iterations)
      Set the number of iterations to run. Default: 10.
    • setLambda

      public ALS setLambda(double lambda)
      Set the regularization parameter, lambda. Default: 0.01.
    • setImplicitPrefs

      public ALS setImplicitPrefs(boolean implicitPrefs)
      Sets whether to use implicit preference. Default: false.
    • setAlpha

      public ALS setAlpha(double alpha)
      Sets the constant used in computing confidence in implicit ALS. Default: 1.0.
      Parameters:
      alpha - constant used in computing confidence in implicit ALS
      Returns:
      this ALS instance, so that setter calls can be chained
    • setSeed

      public ALS setSeed(long seed)
      Sets a random seed to have deterministic results.
    • setNonnegative

      public ALS setNonnegative(boolean b)
      Set whether the least-squares problems solved at each iteration should have nonnegativity constraints.
      Parameters:
      b - whether to apply nonnegativity constraints to the least-squares problems
      Returns:
      this ALS instance, so that setter calls can be chained
    • setIntermediateRDDStorageLevel

      public ALS setIntermediateRDDStorageLevel(StorageLevel storageLevel)
      Sets storage level for intermediate RDDs (user/product in/out links). The default value is MEMORY_AND_DISK. Users can change it to a serialized storage, e.g., MEMORY_AND_DISK_SER and set spark.rdd.compress to true to reduce the space requirement, at the cost of speed.
      Parameters:
      storageLevel - storage level for intermediate RDDs
      Returns:
      this ALS instance, so that setter calls can be chained
    • setFinalRDDStorageLevel

      public ALS setFinalRDDStorageLevel(StorageLevel storageLevel)
      Sets storage level for final RDDs (user/product used in MatrixFactorizationModel). The default value is MEMORY_AND_DISK. Users can change it to a serialized storage, e.g. MEMORY_AND_DISK_SER and set spark.rdd.compress to true to reduce the space requirement, at the cost of speed.
      Parameters:
      storageLevel - storage level for final RDDs
      Returns:
      this ALS instance, so that setter calls can be chained
    • setCheckpointInterval

      public ALS setCheckpointInterval(int checkpointInterval)
      Set period (in iterations) between checkpoints (default = 10). Checkpointing helps with recovery (when nodes fail) and StackOverflow exceptions caused by long lineage. It also helps with eliminating temporary shuffle files on disk, which can be important when there are many ALS iterations. If the checkpoint directory is not set in SparkContext, this setting is ignored.
      Parameters:
      checkpointInterval - period, in iterations, between checkpoints
      Returns:
      this ALS instance, so that setter calls can be chained
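
      As a rough illustration of what a checkpoint period means, the sketch below lists the iterations at which a checkpoint would be taken. It is an assumption-laden sketch, not the class's internal logic: it assumes iterations are numbered from 1, that a checkpoint is taken whenever the iteration number is a multiple of the interval, and that nothing is checkpointed when no checkpoint directory is set (as stated above). The names (CheckpointScheduleSketch, checkpointIterations) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointScheduleSketch {

    /** Returns the 1-based iteration numbers at which a checkpoint would occur. */
    static List<Integer> checkpointIterations(int iterations, int interval, boolean checkpointDirSet) {
        List<Integer> result = new ArrayList<>();
        // Without a checkpoint directory the setting is ignored.
        if (!checkpointDirSet || interval <= 0) {
            return result;
        }
        for (int iter = 1; iter <= iterations; iter++) {
            if (iter % interval == 0) {   // assumed rule: every interval-th iteration
                result.add(iter);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(checkpointIterations(25, 10, true));    // prints [10, 20]
        System.out.println(checkpointIterations(25, 10, false));   // prints []
    }
}
```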
    • run

      public MatrixFactorizationModel run(RDD<Rating> ratings)
      Run ALS with the configured parameters on an input RDD of Rating objects. Returns a MatrixFactorizationModel with feature vectors for each user and product.
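      Parameters:
      ratings - RDD of Rating objects with userID, productID, and rating
      Returns:
      a MatrixFactorizationModel with feature vectors for each user and product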
    • run

      public MatrixFactorizationModel run(JavaRDD<Rating> ratings)
      Java-friendly version of ALS.run.
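      Parameters:
      ratings - JavaRDD of Rating objects with userID, productID, and rating
      Returns:
      a MatrixFactorizationModel with feature vectors for each user and product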