Class ALS
- All Implemented Interfaces:
Serializable, org.apache.spark.internal.Logging, Params, HasBlockSize, HasCheckpointInterval, HasMaxIter, HasPredictionCol, HasRegParam, HasSeed, ALSModelParams, ALSParams, DefaultParamsWritable, Identifiable, MLWritable
ALS attempts to estimate the ratings matrix R as the product of two lower-rank matrices,
X and Y, i.e. X * Yt ≈ R, where Yt is the transpose of Y. These approximations are typically called 'factor' matrices.
The general approach is iterative. During each iteration, one of the factor matrices is held
constant, while the other is solved for using least squares. The newly-solved factor matrix is
then held constant while solving for the other factor matrix.
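The alternation described above can be illustrated with a toy rank-1 case in plain Java. This is a minimal sketch, not Spark's implementation: the class Rank1AlsSketch and its methods are hypothetical names, the matrix is small and dense, and each least-squares solve reduces to a closed-form ratio because the rank is 1.

```java
import java.util.Arrays;

/** Toy rank-1 alternating least squares on a dense matrix (illustration only). */
final class Rank1AlsSketch {

    /** Holds y constant and solves each x[u] by regularized least squares. */
    static void solveUsers(double[][] r, double[] x, double[] y, double lambda) {
        for (int u = 0; u < x.length; u++) {
            double num = 0.0;
            double den = lambda;
            for (int i = 0; i < y.length; i++) {
                num += r[u][i] * y[i];
                den += y[i] * y[i];
            }
            x[u] = num / den;
        }
    }

    /** Holds x constant and solves each y[i] by regularized least squares. */
    static void solveItems(double[][] r, double[] x, double[] y, double lambda) {
        for (int i = 0; i < y.length; i++) {
            double num = 0.0;
            double den = lambda;
            for (int u = 0; u < x.length; u++) {
                num += r[u][i] * x[u];
                den += x[u] * x[u];
            }
            y[i] = num / den;
        }
    }

    /** Returns {x, y} such that x[u] * y[i] approximates r[u][i]. */
    static double[][] factorize(double[][] r, int maxIter, double lambda) {
        double[] x = new double[r.length];
        double[] y = new double[r[0].length];
        Arrays.fill(y, 1.0); // arbitrary nonzero starting point for the item factors
        for (int iter = 0; iter < maxIter; iter++) {
            solveUsers(r, x, y, lambda); // y held constant
            solveItems(r, x, y, lambda); // x held constant
        }
        return new double[][] {x, y};
    }

    public static void main(String[] args) {
        double[][] r = {{2, 1}, {4, 2}, {6, 3}};
        double[][] factors = factorize(r, 5, 0.0);
        System.out.println("x = " + Arrays.toString(factors[0]));
        System.out.println("y = " + Arrays.toString(factors[1]));
    }
}
```

For a higher rank, each per-row solve becomes a small linear system (the normal equation) instead of a single division.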
This is a blocked implementation of the ALS factorization algorithm that groups the two sets of factors (referred to as "users" and "products") into blocks and reduces communication by only sending one copy of each user vector to each product block on each iteration, and only for the product blocks that need that user's feature vector. This is achieved by pre-computing some information about the ratings matrix to determine the "out-links" of each user (which blocks of products it will contribute to) and "in-link" information for each product (which of the feature vectors it receives from each user block it will depend on). This allows us to send only an array of feature vectors between each user block and product block, and have the product block find the users' ratings and update the products based on these messages.
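The "out-link" idea can be sketched in plain Java. The code below is an assumption-laden illustration, not Spark's data structures: the class OutLinkSketch, the {user, product} array layout, and the id-modulo block assignment are all hypothetical. It shows why a user vector is sent at most once per product block, however many of that block's products the user rated.

```java
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

/** Computes, for each user, the set of product blocks that need its vector. */
final class OutLinkSketch {

    /** Hypothetical block assignment: non-negative id modulo the block count. */
    static int blockOf(int id, int numBlocks) {
        return Math.floorMod(id, numBlocks);
    }

    /** Each rating is {user, product}. Returns user -> product blocks it contributes to. */
    static Map<Integer, SortedSet<Integer>> outLinks(int[][] ratings, int numProductBlocks) {
        Map<Integer, SortedSet<Integer>> links = new TreeMap<>();
        for (int[] rating : ratings) {
            int user = rating[0];
            int productBlock = blockOf(rating[1], numProductBlocks);
            // A set, so a block is recorded once even if the user rated many of its products.
            links.computeIfAbsent(user, k -> new TreeSet<>()).add(productBlock);
        }
        return links;
    }

    public static void main(String[] args) {
        int[][] ratings = {{1, 10}, {1, 12}, {1, 11}, {2, 11}};
        System.out.println(outLinks(ratings, 2));
    }
}
```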
For implicit preference data, the algorithm used is based on "Collaborative Filtering for Implicit Feedback Datasets", available at https://doi.org/10.1109/ICDM.2008.22, adapted for the blocked approach used here.
Essentially, instead of finding a low-rank approximation to the rating matrix R,
this finds an approximation for a preference matrix P, where an element of P is 1 if
the corresponding rating r is greater than 0 and 0 if r is less than or equal to 0. The ratings then act as 'confidence'
values related to the strength of indicated user
preferences rather than explicit ratings given to items.
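As a rough illustration, the mapping from a raw rating to a preference and a confidence can be written in a few lines of Java. The preference rule follows the text above. The confidence formula 1 + alpha * |r| is the linear form from the cited paper with an absolute value added here as an assumption, and the class and method names are hypothetical.

```java
/** Maps a raw implicit-feedback rating to a preference and a confidence weight. */
final class ImplicitFeedbackSketch {

    /** Preference is 1 when the rating is positive, otherwise 0. */
    static double preference(double rating) {
        return rating > 0 ? 1.0 : 0.0;
    }

    /** Assumed linear confidence: grows with the magnitude of the rating, never below 1. */
    static double confidence(double rating, double alpha) {
        return 1.0 + alpha * Math.abs(rating);
    }

    public static void main(String[] args) {
        double alpha = 1.0; // matches the documented default of the alpha param
        for (double r : new double[] {0.0, 3.0, -2.0}) {
            System.out.println("r=" + r + " p=" + preference(r) + " c=" + confidence(r, alpha));
        }
    }
}
```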
Note: the input rating dataset to the ALS implementation should be deterministic.
Nondeterministic data can cause failures while fitting the ALS model.
For example, an order-sensitive operation such as sampling after a repartition makes the dataset
output nondeterministic, as in dataset.repartition(2).sample(false, 0.5, 1618).
Checkpointing the sampled dataset or adding a sort before sampling can help make the dataset
deterministic.
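Why a sort helps can be shown with a plain-Java analogy that does not use Spark. In this sketch (hypothetical class and method names), a seeded sampler keeps or drops rows by position in the stream, so the same rows arriving in a different order can yield a different sample. Sorting first fixes the order and therefore the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Seeded Bernoulli sampling whose result depends on row order unless rows are sorted first. */
final class DeterministicSampleSketch {

    /** Keeps each row with probability `fraction`; the outcome depends on iteration order. */
    static List<Integer> sample(List<Integer> rows, double fraction, long seed) {
        Random rng = new Random(seed);
        List<Integer> kept = new ArrayList<>();
        for (Integer row : rows) {
            if (rng.nextDouble() < fraction) {
                kept.add(row);
            }
        }
        return kept;
    }

    /** Sorts a copy of the rows before sampling, so any input order gives the same sample. */
    static List<Integer> sortedSample(List<Integer> rows, double fraction, long seed) {
        List<Integer> sorted = new ArrayList<>(rows);
        Collections.sort(sorted);
        return sample(sorted, fraction, seed);
    }

    public static void main(String[] args) {
        List<Integer> orderA = Arrays.asList(5, 3, 1, 4, 2);
        List<Integer> orderB = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(sortedSample(orderA, 0.5, 1618L).equals(sortedSample(orderB, 0.5, 1618L)));
    }
}
```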
-
Nested Class Summary
Nested Classes
Modifier and Type Class: Description
static class
static interface: Trait for least squares solvers applied to the normal equation.
static class: Rating class for better code readability.
static class
static class
Nested classes/interfaces inherited from interface org.apache.spark.internal.Logging
org.apache.spark.internal.Logging.LogStringContext, org.apache.spark.internal.Logging.SparkShellLoggingFilter
-
Constructor Summary
Constructors -
Method Summary
Modifier and Type Method: Description
alpha(): Param for the alpha parameter in the implicit preference formulation (nonnegative).
final IntParam blockSize(): Param for block size for stacking input data in matrices.
final IntParam checkpointInterval(): Param for set checkpoint interval (>= 1) or disable checkpoint (-1).
coldStartStrategy(): Param for strategy for dealing with unknown or new users/items at prediction time.
copy: Creates a copy of this instance with the same UID and some extra params.
long estimateModelSize(Dataset<?> dataset)
finalStorageLevel(): Param for StorageLevel for ALS model factors.
fit: Fits a model to the input data.
implicitPrefs(): Param to decide whether to use implicit preference.
intermediateStorageLevel(): Param for StorageLevel for intermediate datasets.
itemCol(): Param for the column name for item ids.
static ALS load
static org.apache.spark.internal.Logging.LogStringContext LogStringContext(scala.StringContext sc)
final IntParam maxIter(): Param for maximum number of iterations (>= 0).
nonnegative(): Param for whether to apply nonnegativity constraints.
numItemBlocks(): Param for number of item blocks (positive).
numUserBlocks(): Param for number of user blocks (positive).
static org.slf4j.Logger org$apache$spark$internal$Logging$$log_()
static void org$apache$spark$internal$Logging$$log__$eq(org.slf4j.Logger x$1)
predictionCol(): Param for prediction column name.
rank(): Param for rank of the matrix factorization (positive).
ratingCol(): Param for the column name for ratings.
static MLReader<T> read()
final DoubleParam regParam(): Param for regularization parameter (>= 0).
final LongParam seed(): Param for random seed.
setAlpha(double value)
setBlockSize(int value): Set block size for stacking input data in matrices.
setCheckpointInterval(int value)
setColdStartStrategy(String value)
setFinalStorageLevel(String value)
setImplicitPrefs(boolean value)
setItemCol(String value)
setMaxIter(int value)
setNonnegative(boolean value)
setNumBlocks(int value): Sets both numUserBlocks and numItemBlocks to the specific value.
setNumItemBlocks(int value)
setNumUserBlocks(int value)
setPredictionCol(String value)
setRank(int value)
setRatingCol(String value)
setRegParam(double value)
setSeed(long value)
setUserCol(String value)
train(RDD<ALS.Rating<ID>> ratings, int rank, int numUserBlocks, int numItemBlocks, int maxIter, double regParam, boolean implicitPrefs, double alpha, boolean nonnegative, StorageLevel intermediateRDDStorageLevel, StorageLevel finalRDDStorageLevel, int checkpointInterval, long seed, scala.reflect.ClassTag<ID> evidence$1, scala.math.Ordering<ID> ord): Implementation of the ALS algorithm.
transformSchema(StructType schema): Check transform validity and derive the output schema from the input schema.
uid(): An immutable unique ID for the object and its derivatives.
userCol(): Param for the column name for user ids.
Methods inherited from class org.apache.spark.ml.PipelineStage
params
Methods inherited from class java.lang.Object
equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.apache.spark.ml.recommendation.ALSModelParams
checkIntegers, getColdStartStrategy, getItemCol, getUserCol
Methods inherited from interface org.apache.spark.ml.recommendation.ALSParams
getAlpha, getFinalStorageLevel, getImplicitPrefs, getIntermediateStorageLevel, getNonnegative, getNumItemBlocks, getNumUserBlocks, getRank, getRatingCol, validateAndTransformSchema
Methods inherited from interface org.apache.spark.ml.util.DefaultParamsWritable
write
Methods inherited from interface org.apache.spark.ml.param.shared.HasBlockSize
getBlockSize
Methods inherited from interface org.apache.spark.ml.param.shared.HasCheckpointInterval
getCheckpointInterval
Methods inherited from interface org.apache.spark.ml.param.shared.HasMaxIter
getMaxIter
Methods inherited from interface org.apache.spark.ml.param.shared.HasPredictionCol
getPredictionCol
Methods inherited from interface org.apache.spark.ml.param.shared.HasRegParam
getRegParam
Methods inherited from interface org.apache.spark.ml.util.Identifiable
toString
Methods inherited from interface org.apache.spark.internal.Logging
initializeForcefully, initializeLogIfNecessary, initializeLogIfNecessary, initializeLogIfNecessary$default$2, isTraceEnabled, log, logBasedOnLevel, logDebug, logDebug, logDebug, logDebug, logError, logError, logError, logError, logInfo, logInfo, logInfo, logInfo, logName, LogStringContext, logTrace, logTrace, logTrace, logTrace, logWarning, logWarning, logWarning, logWarning, org$apache$spark$internal$Logging$$log_, org$apache$spark$internal$Logging$$log__$eq, withLogContext
Methods inherited from interface org.apache.spark.ml.util.MLWritable
save
Methods inherited from interface org.apache.spark.ml.param.Params
clear, copyValues, defaultCopy, defaultParamMap, estimateMatadataSize, explainParam, explainParams, extractParamMap, extractParamMap, get, getDefault, getOrDefault, getParam, hasDefault, hasParam, isDefined, isSet, onParamChange, paramMap, params, set, set, set, setDefault, setDefault, shouldOwn
-
Constructor Details
-
ALS
-
ALS
public ALS()
-
-
Method Details
-
load
-
train
public static <ID> scala.Tuple2<RDD<scala.Tuple2<ID,float[]>>,RDD<scala.Tuple2<ID,float[]>>> train(RDD<ALS.Rating<ID>> ratings, int rank, int numUserBlocks, int numItemBlocks, int maxIter, double regParam, boolean implicitPrefs, double alpha, boolean nonnegative, StorageLevel intermediateRDDStorageLevel, StorageLevel finalRDDStorageLevel, int checkpointInterval, long seed, scala.reflect.ClassTag<ID> evidence$1, scala.math.Ordering<ID> ord)
Implementation of the ALS algorithm.
This implementation of the ALS factorization algorithm partitions the two sets of factors among Spark workers so as to reduce network communication by only sending one copy of each factor vector to each Spark worker on each iteration, and only if needed. This is achieved by precomputing some information about the ratings matrix to determine which users require which item factors and vice versa. See the Scaladoc for InBlock for a detailed explanation of how the precomputation is done.
In addition, since each iteration of calculating the factor matrices depends on the known ratings, which are spread across Spark partitions, a naive implementation would incur significant network communication overhead between Spark workers, as the ratings RDD would be repeatedly shuffled during each iteration. This implementation reduces that overhead by performing the shuffling operation up front, precomputing each partition's ratings dependencies and duplicating those values to the appropriate workers before starting iterations to solve for the factor matrices. See the Scaladoc for OutBlock for a detailed explanation of how the precomputation is done.
Note that the term "rating block" is a bit of a misnomer, as the ratings are not partitioned by contiguous blocks from the ratings matrix but by a hash function on the rating's location in the matrix. If it helps you to visualize the partitions, it is easier to think of the term "block" as referring to a subset of an RDD containing the ratings rather than a contiguous submatrix of the ratings matrix.
- Parameters:
ratings - (undocumented)
rank - (undocumented)
numUserBlocks - (undocumented)
numItemBlocks - (undocumented)
maxIter - (undocumented)
regParam - (undocumented)
implicitPrefs - (undocumented)
alpha - (undocumented)
nonnegative - (undocumented)
intermediateRDDStorageLevel - (undocumented)
finalRDDStorageLevel - (undocumented)
checkpointInterval - (undocumented)
seed - (undocumented)
evidence$1 - (undocumented)
ord - (undocumented)
- Returns:
- (undocumented)
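The remark that a "rating block" is a hashed subset rather than a contiguous submatrix can be made concrete with a small Java sketch. The hash used here (Integer.hashCode reduced with a non-negative modulo) is an assumption for illustration, and RatingBlockSketch and its methods are hypothetical names, not part of this class.

```java
import java.util.Arrays;

/** Assigns a rating at (user, item) to a (userBlock, itemBlock) pair by hashing the ids. */
final class RatingBlockSketch {

    /** Assumed hash partitioning: hash code reduced to the range [0, numBlocks). */
    static int blockId(int id, int numBlocks) {
        return Math.floorMod(Integer.hashCode(id), numBlocks);
    }

    /** Returns {userBlock, itemBlock} for one rating's location in the matrix. */
    static int[] ratingBlock(int user, int item, int numUserBlocks, int numItemBlocks) {
        return new int[] {blockId(user, numUserBlocks), blockId(item, numItemBlocks)};
    }

    public static void main(String[] args) {
        // Users 0 and 3 share a block with 3 user blocks although they are not adjacent rows.
        System.out.println(Arrays.toString(ratingBlock(0, 7, 3, 2)));
        System.out.println(Arrays.toString(ratingBlock(3, 7, 3, 2)));
        System.out.println(Arrays.toString(ratingBlock(1, 7, 3, 2)));
    }
}
```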
-
read
-
org$apache$spark$internal$Logging$$log_
public static org.slf4j.Logger org$apache$spark$internal$Logging$$log_() -
org$apache$spark$internal$Logging$$log__$eq
public static void org$apache$spark$internal$Logging$$log__$eq(org.slf4j.Logger x$1) -
LogStringContext
public static org.apache.spark.internal.Logging.LogStringContext LogStringContext(scala.StringContext sc) -
rank
Description copied from interface: ALSParams
Param for rank of the matrix factorization (positive). Default: 10
-
numUserBlocks
Description copied from interface: ALSParams
Param for number of user blocks (positive). Default: 10
- Specified by:
numUserBlocks in interface ALSParams
- Returns:
- (undocumented)
-
numItemBlocks
Description copied from interface: ALSParams
Param for number of item blocks (positive). Default: 10
- Specified by:
numItemBlocks in interface ALSParams
- Returns:
- (undocumented)
-
implicitPrefs
Description copied from interface: ALSParams
Param to decide whether to use implicit preference. Default: false
- Specified by:
implicitPrefs in interface ALSParams
- Returns:
- (undocumented)
-
alpha
Description copied from interface: ALSParams
Param for the alpha parameter in the implicit preference formulation (nonnegative). Default: 1.0
-
ratingCol
Description copied from interface: ALSParams
Param for the column name for ratings. Default: "rating"
-
nonnegative
Description copied from interface: ALSParams
Param for whether to apply nonnegativity constraints. Default: false
- Specified by:
nonnegative in interface ALSParams
- Returns:
- (undocumented)
-
intermediateStorageLevel
Description copied from interface: ALSParams
Param for StorageLevel for intermediate datasets. Pass in a string representation of StorageLevel. Cannot be "NONE". Default: "MEMORY_AND_DISK".
- Specified by:
intermediateStorageLevel in interface ALSParams
- Returns:
- (undocumented)
-
finalStorageLevel
Description copied from interface: ALSParams
Param for StorageLevel for ALS model factors. Pass in a string representation of StorageLevel. Default: "MEMORY_AND_DISK".
- Specified by:
finalStorageLevel in interface ALSParams
- Returns:
- (undocumented)
-
seed
Description copied from interface: HasSeed
Param for random seed.
-
checkpointInterval
Description copied from interface: HasCheckpointInterval
Param for set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.
- Specified by:
checkpointInterval in interface HasCheckpointInterval
- Returns:
- (undocumented)
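The rule stated above can be read as a simple predicate. The sketch below is an interpretation under stated assumptions rather than the library's code: iterations are assumed to be counted from 1, "every 10 iterations" is assumed to mean iterations divisible by the interval, and CheckpointSketch is a hypothetical name.

```java
/** Decides whether a given iteration would trigger a checkpoint under the documented rule. */
final class CheckpointSketch {

    /**
     * @param iter               1-based iteration number (assumption)
     * @param checkpointInterval interval >= 1, or -1 to disable checkpointing
     * @param checkpointDirSet   whether a checkpoint directory is configured
     */
    static boolean shouldCheckpoint(int iter, int checkpointInterval, boolean checkpointDirSet) {
        if (!checkpointDirSet || checkpointInterval == -1) {
            return false; // setting ignored without a directory, or explicitly disabled
        }
        return iter % checkpointInterval == 0;
    }

    public static void main(String[] args) {
        for (int iter = 1; iter <= 20; iter++) {
            if (shouldCheckpoint(iter, 10, true)) {
                System.out.println("checkpoint at iteration " + iter);
            }
        }
    }
}
```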
-
regParam
Description copied from interface: HasRegParam
Param for regularization parameter (>= 0).
- Specified by:
regParam in interface HasRegParam
- Returns:
- (undocumented)
-
maxIter
Description copied from interface: HasMaxIter
Param for maximum number of iterations (>= 0).
- Specified by:
maxIter in interface HasMaxIter
- Returns:
- (undocumented)
-
userCol
Description copied from interface: ALSModelParams
Param for the column name for user ids. Ids must be integers. Other numeric types are supported for this column, but will be cast to integers as long as they fall within the integer value range. Default: "user"
- Specified by:
userCol in interface ALSModelParams
- Returns:
- (undocumented)
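A checked cast of this kind might look like the following Java sketch. The range check reflects the description above. Rejecting values with a fractional part is an additional assumption, and IdCastSketch.toIntId is a hypothetical helper rather than a method of this class.

```java
/** Converts a numeric id to int only when the value is representable as an integer id. */
final class IdCastSketch {

    static int toIntId(double value) {
        boolean inRange = value >= Integer.MIN_VALUE && value <= Integer.MAX_VALUE;
        boolean whole = value == Math.rint(value); // assumption: fractional ids are rejected
        if (!inRange || !whole) {
            throw new IllegalArgumentException("Id is not a valid integer: " + value);
        }
        return (int) value;
    }

    public static void main(String[] args) {
        System.out.println(toIntId(42.0));
        try {
            toIntId(3.0e9); // larger than Integer.MAX_VALUE
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```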
-
itemCol
Description copied from interface: ALSModelParams
Param for the column name for item ids. Ids must be integers. Other numeric types are supported for this column, but will be cast to integers as long as they fall within the integer value range. Default: "item"
- Specified by:
itemCol in interface ALSModelParams
- Returns:
- (undocumented)
-
coldStartStrategy
Description copied from interface: ALSModelParams
Param for strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values:
- "nan": predicted value for unknown ids will be NaN.
- "drop": rows in the input DataFrame containing unknown ids will be dropped from the output DataFrame containing predictions.
Default: "nan".
- Specified by:
coldStartStrategy in interface ALSModelParams
- Returns:
- (undocumented)
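The difference between the two supported values can be sketched on a plain list of predictions, where NaN stands for a row whose user or item id was unknown. This is an illustration with hypothetical names (ColdStartSketch, applyStrategy), not the DataFrame logic used by the model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Applies a cold-start strategy to predictions where NaN marks an unknown user or item. */
final class ColdStartSketch {

    static List<Float> applyStrategy(List<Float> predictions, String strategy) {
        if (!strategy.equals("nan") && !strategy.equals("drop")) {
            throw new IllegalArgumentException("Unsupported strategy: " + strategy);
        }
        boolean drop = strategy.equals("drop");
        List<Float> out = new ArrayList<>();
        for (Float prediction : predictions) {
            if (drop && prediction.isNaN()) {
                continue; // "drop": remove rows with unknown ids
            }
            out.add(prediction); // "nan": keep every row, NaN included
        }
        return out;
    }

    public static void main(String[] args) {
        List<Float> predictions = Arrays.asList(1.0f, Float.NaN, 2.5f);
        System.out.println(applyStrategy(predictions, "nan"));
        System.out.println(applyStrategy(predictions, "drop"));
    }
}
```

Dropping is often the convenient choice when computing an evaluation metric, since a single NaN prediction would otherwise make an averaged metric NaN.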
-
blockSize
Description copied from interface: HasBlockSize
Param for block size for stacking input data in matrices. Data is stacked within partitions. If the block size is larger than the remaining data in a partition, it is adjusted to the size of that data.
- Specified by:
blockSize in interface HasBlockSize
- Returns:
- (undocumented)
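The adjustment described above amounts to clamping the last block to whatever rows remain in the partition. A minimal Java sketch, with the hypothetical names BlockStackSketch and blockRanges, assuming rows in a partition are indexed from 0:

```java
import java.util.ArrayList;
import java.util.List;

/** Splits the rows of one partition into consecutive blocks of at most blockSize rows. */
final class BlockStackSketch {

    /** Returns {start, endExclusive} row ranges; the last block shrinks to the remaining rows. */
    static List<int[]> blockRanges(int partitionSize, int blockSize) {
        if (blockSize < 1) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        List<int[]> ranges = new ArrayList<>();
        for (int start = 0; start < partitionSize; start += blockSize) {
            int end = Math.min(start + blockSize, partitionSize);
            ranges.add(new int[] {start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] range : blockRanges(10, 4)) {
            System.out.println(range[0] + ".." + range[1] + " (" + (range[1] - range[0]) + " rows)");
        }
    }
}
```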
-
predictionCol
Description copied from interface: HasPredictionCol
Param for prediction column name.
- Specified by:
predictionCol in interface HasPredictionCol
- Returns:
- (undocumented)
-
uid
Description copied from interface: Identifiable
An immutable unique ID for the object and its derivatives.
- Specified by:
uid in interface Identifiable
- Returns:
- (undocumented)
-
setRank
-
setNumUserBlocks
-
setNumItemBlocks
-
setImplicitPrefs
-
setAlpha
-
setUserCol
-
setItemCol
-
setRatingCol
-
setPredictionCol
-
setMaxIter
-
setRegParam
-
setNonnegative
-
setCheckpointInterval
-
setSeed
-
setIntermediateStorageLevel
-
setFinalStorageLevel
-
setColdStartStrategy
-
setBlockSize
Set block size for stacking input data in matrices. Default is 4096.
- Parameters:
value - (undocumented)
- Returns:
- (undocumented)
-
setNumBlocks
Sets both numUserBlocks and numItemBlocks to the specific value.
- Parameters:
value - (undocumented)
- Returns:
- (undocumented)
-
fit
Description copied from class: Estimator
Fits a model to the input data.
-
transformSchema
Description copied from class: PipelineStage
Check transform validity and derive the output schema from the input schema.
We check validity for interactions between parameters during transformSchema and raise an exception if any parameter value is invalid. Parameter value checks which do not depend on other parameters are handled by Param.validate().
Typical implementation should first conduct verification on schema change and parameter validity, including complex parameter interaction checks.
- Specified by:
transformSchema in class PipelineStage
- Parameters:
schema - (undocumented)
- Returns:
- (undocumented)
-
copy
Description copied from interface: Params
Creates a copy of this instance with the same UID and some extra params. Subclasses should implement this method and set the return type properly. See defaultCopy().
-
estimateModelSize
-