public class ALS
extends Object
implements scala.Serializable, org.apache.spark.internal.Logging
ALS attempts to estimate the ratings matrix R
as the product of two lowerrank matrices,
X
and Y
, i.e. X * Yt = R
. Typically these approximations are called 'factor' matrices.
The general approach is iterative. During each iteration, one of the factor matrices is held
constant, while the other is solved for using least squares. The newlysolved factor matrix is
then held constant while solving for the other factor matrix.
This is a blocked implementation of the ALS factorization algorithm that groups the two sets of factors (referred to as "users" and "products") into blocks and reduces communication by only sending one copy of each user vector to each product block on each iteration, and only for the product blocks that need that user's feature vector. This is achieved by precomputing some information about the ratings matrix to determine the "outlinks" of each user (which blocks of products it will contribute to) and "inlink" information for each product (which of the feature vectors it receives from each user block it will depend on). This allows us to send only an array of feature vectors between each user block and product block, and have the product block find the users' ratings and update the products based on these messages.
For implicit preference data, the algorithm used is based on "Collaborative Filtering for Implicit Feedback Datasets", available at here, adapted for the blocked approach used here.
Essentially instead of finding the lowrank approximations to the rating matrix R
,
this finds the approximations for a preference matrix P
where the elements of P
are 1 if
r > 0 and 0 if r <= 0. The ratings then act as 'confidence' values related to strength of
indicated user
preferences rather than explicit ratings given to items.
Note: the input rating RDD to the ALS implementation should be deterministic.
Nondeterministic data can cause failure during fitting ALS model.
For example, an ordersensitive operation like sampling after a repartition makes RDD
output nondeterministic, like rdd.repartition(2).sample(false, 0.5, 1618)
.
Checkpointing sampled RDD or adding a sort before sampling can help make the RDD
deterministic.
Constructor and Description 

ALS()
Constructs an ALS instance with default parameters: {numBlocks: 1, rank: 10, iterations: 10,
lambda: 0.01, implicitPrefs: false, alpha: 1.0}.

Modifier and Type  Method and Description 

MatrixFactorizationModel 
run(JavaRDD<Rating> ratings)
Javafriendly version of
ALS.run . 
MatrixFactorizationModel 
run(RDD<Rating> ratings)
Run ALS with the configured parameters on an input RDD of
Rating objects. 
ALS 
setAlpha(double alpha)
Sets the constant used in computing confidence in implicit ALS.

ALS 
setBlocks(int numBlocks)
Set the number of blocks for both user blocks and product blocks to parallelize the computation
into; pass 1 for an autoconfigured number of blocks.

ALS 
setCheckpointInterval(int checkpointInterval)
Set period (in iterations) between checkpoints (default = 10).

ALS 
setFinalRDDStorageLevel(StorageLevel storageLevel)
Sets storage level for final RDDs (user/product used in MatrixFactorizationModel).

ALS 
setImplicitPrefs(boolean implicitPrefs)
Sets whether to use implicit preference.

ALS 
setIntermediateRDDStorageLevel(StorageLevel storageLevel)
Sets storage level for intermediate RDDs (user/product in/out links).

ALS 
setIterations(int iterations)
Set the number of iterations to run.

ALS 
setLambda(double lambda)
Set the regularization parameter, lambda.

ALS 
setNonnegative(boolean b)
Set whether the leastsquares problems solved at each iteration should have
nonnegativity constraints.

ALS 
setProductBlocks(int numProductBlocks)
Set the number of product blocks to parallelize the computation.

ALS 
setRank(int rank)
Set the rank of the feature matrices computed (number of features).

ALS 
setSeed(long seed)
Sets a random seed to have deterministic results.

ALS 
setUserBlocks(int numUserBlocks)
Set the number of user blocks to parallelize the computation.

static MatrixFactorizationModel 
train(RDD<Rating> ratings,
int rank,
int iterations)
Train a matrix factorization model given an RDD of ratings by users for a subset of products.

static MatrixFactorizationModel 
train(RDD<Rating> ratings,
int rank,
int iterations,
double lambda)
Train a matrix factorization model given an RDD of ratings by users for a subset of products.

static MatrixFactorizationModel 
train(RDD<Rating> ratings,
int rank,
int iterations,
double lambda,
int blocks)
Train a matrix factorization model given an RDD of ratings by users for a subset of products.

static MatrixFactorizationModel 
train(RDD<Rating> ratings,
int rank,
int iterations,
double lambda,
int blocks,
long seed)
Train a matrix factorization model given an RDD of ratings by users for a subset of products.

static MatrixFactorizationModel 
trainImplicit(RDD<Rating> ratings,
int rank,
int iterations)
Train a matrix factorization model given an RDD of 'implicit preferences' of users for a
subset of products.

static MatrixFactorizationModel 
trainImplicit(RDD<Rating> ratings,
int rank,
int iterations,
double lambda,
double alpha)
Train a matrix factorization model given an RDD of 'implicit preferences' of users for a
subset of products.

static MatrixFactorizationModel 
trainImplicit(RDD<Rating> ratings,
int rank,
int iterations,
double lambda,
int blocks,
double alpha)
Train a matrix factorization model given an RDD of 'implicit preferences' of users for a
subset of products.

static MatrixFactorizationModel 
trainImplicit(RDD<Rating> ratings,
int rank,
int iterations,
double lambda,
int blocks,
double alpha,
long seed)
Train a matrix factorization model given an RDD of 'implicit preferences' given by users
to some products, in the form of (userID, productID, preference) pairs.

equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
$init$, initializeForcefully, initializeLogIfNecessary, initializeLogIfNecessary, initializeLogIfNecessary$default$2, initLock, isTraceEnabled, log, logDebug, logDebug, logError, logError, logInfo, logInfo, logName, logTrace, logTrace, logWarning, logWarning, org$apache$spark$internal$Logging$$log__$eq, org$apache$spark$internal$Logging$$log_, uninitialize
public ALS()
public static MatrixFactorizationModel train(RDD<Rating> ratings, int rank, int iterations, double lambda, int blocks, long seed)
ratings
 RDD of Rating
objects with userID, productID, and ratingrank
 number of features to use (also referred to as the number of latent factors)iterations
 number of iterations of ALSlambda
 regularization parameterblocks
 level of parallelism to split computation intoseed
 random seed for initial matrix factorization modelpublic static MatrixFactorizationModel train(RDD<Rating> ratings, int rank, int iterations, double lambda, int blocks)
ratings
 RDD of Rating
objects with userID, productID, and ratingrank
 number of features to use (also referred to as the number of latent factors)iterations
 number of iterations of ALSlambda
 regularization parameterblocks
 level of parallelism to split computation intopublic static MatrixFactorizationModel train(RDD<Rating> ratings, int rank, int iterations, double lambda)
ratings
.
ratings
 RDD of Rating
objects with userID, productID, and ratingrank
 number of features to use (also referred to as the number of latent factors)iterations
 number of iterations of ALSlambda
 regularization parameterpublic static MatrixFactorizationModel train(RDD<Rating> ratings, int rank, int iterations)
ratings
.
ratings
 RDD of Rating
objects with userID, productID, and ratingrank
 number of features to use (also referred to as the number of latent factors)iterations
 number of iterations of ALSpublic static MatrixFactorizationModel trainImplicit(RDD<Rating> ratings, int rank, int iterations, double lambda, int blocks, double alpha, long seed)
blocks
.
ratings
 RDD of (userID, productID, rating) pairsrank
 number of features to use (also referred to as the number of latent factors)iterations
 number of iterations of ALSlambda
 regularization parameterblocks
 level of parallelism to split computation intoalpha
 confidence parameterseed
 random seed for initial matrix factorization modelpublic static MatrixFactorizationModel trainImplicit(RDD<Rating> ratings, int rank, int iterations, double lambda, int blocks, double alpha)
ratings
 RDD of Rating
objects with userID, productID, and ratingrank
 number of features to use (also referred to as the number of latent factors)iterations
 number of iterations of ALSlambda
 regularization parameterblocks
 level of parallelism to split computation intoalpha
 confidence parameterpublic static MatrixFactorizationModel trainImplicit(RDD<Rating> ratings, int rank, int iterations, double lambda, double alpha)
ratings
.
ratings
 RDD of Rating
objects with userID, productID, and ratingrank
 number of features to use (also referred to as the number of latent factors)iterations
 number of iterations of ALSlambda
 regularization parameteralpha
 confidence parameterpublic static MatrixFactorizationModel trainImplicit(RDD<Rating> ratings, int rank, int iterations)
ratings
.
ratings
 RDD of Rating
objects with userID, productID, and ratingrank
 number of features to use (also referred to as the number of latent factors)iterations
 number of iterations of ALSpublic ALS setBlocks(int numBlocks)
numBlocks
 (undocumented)public ALS setUserBlocks(int numUserBlocks)
numUserBlocks
 (undocumented)public ALS setProductBlocks(int numProductBlocks)
numProductBlocks
 (undocumented)public ALS setRank(int rank)
public ALS setIterations(int iterations)
public ALS setLambda(double lambda)
public ALS setImplicitPrefs(boolean implicitPrefs)
public ALS setAlpha(double alpha)
alpha
 (undocumented)public ALS setSeed(long seed)
public ALS setNonnegative(boolean b)
b
 (undocumented)public ALS setIntermediateRDDStorageLevel(StorageLevel storageLevel)
MEMORY_AND_DISK
. Users can change it to a serialized storage, e.g., MEMORY_AND_DISK_SER
and
set spark.rdd.compress
to true
to reduce the space requirement, at the cost of speed.storageLevel
 (undocumented)public ALS setFinalRDDStorageLevel(StorageLevel storageLevel)
MEMORY_AND_DISK
. Users can change it to a serialized storage, e.g.
MEMORY_AND_DISK_SER
and set spark.rdd.compress
to true
to reduce the space requirement,
at the cost of speed.storageLevel
 (undocumented)public ALS setCheckpointInterval(int checkpointInterval)
SparkContext
,
this setting is ignored.checkpointInterval
 (undocumented)public MatrixFactorizationModel run(RDD<Rating> ratings)
Rating
objects.
Returns a MatrixFactorizationModel with feature vectors for each user and product.ratings
 (undocumented)public MatrixFactorizationModel run(JavaRDD<Rating> ratings)
ALS.run
.ratings
 (undocumented)