Source code for pyspark.ml.functions

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column


[docs]def vector_to_array(col, dtype="float64"): """ Converts a column of MLlib sparse/dense vectors into a column of dense arrays. .. versionadded:: 3.0.0 Parameters ---------- col : :py:class:`pyspark.sql.Column` or str Input column dtype : str, optional The data type of the output array. Valid values: "float64" or "float32". Returns ------- :py:class:`pyspark.sql.Column` The converted column of dense arrays. Examples -------- >>> from pyspark.ml.linalg import Vectors >>> from pyspark.ml.functions import vector_to_array >>> from pyspark.mllib.linalg import Vectors as OldVectors >>> df = spark.createDataFrame([ ... (Vectors.dense(1.0, 2.0, 3.0), OldVectors.dense(10.0, 20.0, 30.0)), ... (Vectors.sparse(3, [(0, 2.0), (2, 3.0)]), ... OldVectors.sparse(3, [(0, 20.0), (2, 30.0)]))], ... ["vec", "oldVec"]) >>> df1 = df.select(vector_to_array("vec").alias("vec"), ... vector_to_array("oldVec").alias("oldVec")) >>> df1.collect() [Row(vec=[1.0, 2.0, 3.0], oldVec=[10.0, 20.0, 30.0]), Row(vec=[2.0, 0.0, 3.0], oldVec=[20.0, 0.0, 30.0])] >>> df2 = df.select(vector_to_array("vec", "float32").alias("vec"), ... vector_to_array("oldVec", "float32").alias("oldVec")) >>> df2.collect() [Row(vec=[1.0, 2.0, 3.0], oldVec=[10.0, 20.0, 30.0]), Row(vec=[2.0, 0.0, 3.0], oldVec=[20.0, 0.0, 30.0])] >>> df1.schema.fields [StructField(vec,ArrayType(DoubleType,false),false), StructField(oldVec,ArrayType(DoubleType,false),false)] >>> df2.schema.fields [StructField(vec,ArrayType(FloatType,false),false), StructField(oldVec,ArrayType(FloatType,false),false)] """ sc = SparkContext._active_spark_context return Column( sc._jvm.org.apache.spark.ml.functions.vector_to_array(_to_java_column(col), dtype))
[docs]def array_to_vector(col): """ Converts a column of array of numeric type into a column of dense vectors in MLlib .. versionadded:: 3.1.0 Parameters ---------- col : :py:class:`pyspark.sql.Column` or str Input column Returns ------- :py:class:`pyspark.sql.Column` The converted column of MLlib dense vectors. Examples -------- >>> from pyspark.ml.functions import array_to_vector >>> df1 = spark.createDataFrame([([1.5, 2.5],),], schema='v1 array<double>') >>> df1.select(array_to_vector('v1').alias('vec1')).collect() [Row(vec1=DenseVector([1.5, 2.5]))] >>> df2 = spark.createDataFrame([([1.5, 3.5],),], schema='v1 array<float>') >>> df2.select(array_to_vector('v1').alias('vec1')).collect() [Row(vec1=DenseVector([1.5, 3.5]))] >>> df3 = spark.createDataFrame([([1, 3],),], schema='v1 array<int>') >>> df3.select(array_to_vector('v1').alias('vec1')).collect() [Row(vec1=DenseVector([1.0, 3.0]))] """ sc = SparkContext._active_spark_context return Column( sc._jvm.org.apache.spark.ml.functions.array_to_vector(_to_java_column(col)))
def _test(): import doctest from pyspark.sql import SparkSession import pyspark.ml.functions import sys globs = pyspark.ml.functions.__dict__.copy() spark = SparkSession.builder \ .master("local[2]") \ .appName("ml.functions tests") \ .getOrCreate() sc = spark.sparkContext globs['sc'] = sc globs['spark'] = spark (failure_count, test_count) = doctest.testmod( pyspark.ml.functions, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE) spark.stop() if failure_count: sys.exit(-1) if __name__ == "__main__": _test()