#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from abc import ABCMeta, abstractmethod
from functools import partial
from typing import Any, Callable, Generic, List, Optional

import numpy as np

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.pandas.missing.window import (
    MissingPandasLikeRolling,
    MissingPandasLikeRollingGroupby,
    MissingPandasLikeExpanding,
    MissingPandasLikeExpandingGroupby,
    MissingPandasLikeExponentialMoving,
    MissingPandasLikeExponentialMovingGroupby,
)

# For running doctests and reference resolution in PyCharm.
from pyspark import pandas as ps  # noqa: F401
from pyspark.pandas._typing import FrameLike
from pyspark.pandas.groupby import GroupBy, DataFrameGroupBy
from pyspark.pandas.internal import NATURAL_ORDER_COLUMN_NAME, SPARK_INDEX_NAME_FORMAT
from pyspark.pandas.spark import functions as SF
from pyspark.pandas.utils import scol_for
from pyspark.sql.column import Column
from pyspark.sql.types import (
    DoubleType,
)
from pyspark.sql.window import WindowSpec


class RollingAndExpanding(Generic[FrameLike], metaclass=ABCMeta):
    def __init__(self, window: WindowSpec, min_periods: int):
        self._window = window
        # This unbounded Window is later used to handle 'min_periods' for now.
        self._unbounded_window = Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(
            Window.unboundedPreceding, Window.currentRow
        )
        self._min_periods = min_periods

    @abstractmethod
    def _apply_as_series_or_frame(self, func: Callable[[Column], Column]) -> FrameLike:
        """
        Wraps a function that handles Spark column in order to support it in both
        pandas-on-Spark Series and DataFrame.
        Note that the given `func` name should be same as the API's method name.
        """
        pass

    @abstractmethod
    def count(self) -> FrameLike:
        pass

    def sum(self) -> FrameLike:
        def sum(scol: Column) -> Column:
            return F.when(
                F.row_number().over(self._unbounded_window) >= self._min_periods,
                F.sum(scol).over(self._window),
            ).otherwise(F.lit(None))

        return self._apply_as_series_or_frame(sum)

    def min(self) -> FrameLike:
        def min(scol: Column) -> Column:
            return F.when(
                F.row_number().over(self._unbounded_window) >= self._min_periods,
                F.min(scol).over(self._window),
            ).otherwise(F.lit(None))

        return self._apply_as_series_or_frame(min)

    def max(self) -> FrameLike:
        def max(scol: Column) -> Column:
            return F.when(
                F.row_number().over(self._unbounded_window) >= self._min_periods,
                F.max(scol).over(self._window),
            ).otherwise(F.lit(None))

        return self._apply_as_series_or_frame(max)

    def mean(self) -> FrameLike:
        def mean(scol: Column) -> Column:
            return F.when(
                F.row_number().over(self._unbounded_window) >= self._min_periods,
                F.mean(scol).over(self._window),
            ).otherwise(F.lit(None))

        return self._apply_as_series_or_frame(mean)

    def quantile(self, q: float, accuracy: int = 10000) -> FrameLike:
        def quantile(scol: Column) -> Column:
            return F.when(
                F.row_number().over(self._unbounded_window) >= self._min_periods,
                F.percentile_approx(scol.cast(DoubleType()), q, accuracy).over(self._window),
            ).otherwise(F.lit(None))

        return self._apply_as_series_or_frame(quantile)

    def std(self) -> FrameLike:
        def std(scol: Column) -> Column:
            return F.when(
                F.row_number().over(self._unbounded_window) >= self._min_periods,
                F.stddev(scol).over(self._window),
            ).otherwise(F.lit(None))

        return self._apply_as_series_or_frame(std)

    def var(self) -> FrameLike:
        def var(scol: Column) -> Column:
            return F.when(
                F.row_number().over(self._unbounded_window) >= self._min_periods,
                F.variance(scol).over(self._window),
            ).otherwise(F.lit(None))

        return self._apply_as_series_or_frame(var)

    def skew(self) -> FrameLike:
        def skew(scol: Column) -> Column:
            return F.when(
                F.row_number().over(self._unbounded_window) >= self._min_periods,
                SF.skew(scol).over(self._window),
            ).otherwise(F.lit(None))

        return self._apply_as_series_or_frame(skew)

    def kurt(self) -> FrameLike:
        def kurt(scol: Column) -> Column:
            return F.when(
                F.row_number().over(self._unbounded_window) >= self._min_periods,
                SF.kurt(scol).over(self._window),
            ).otherwise(F.lit(None))

        return self._apply_as_series_or_frame(kurt)


class RollingLike(RollingAndExpanding[FrameLike]):
    def __init__(
        self,
        window: int,
        min_periods: Optional[int] = None,
    ):
        if window < 0:
            raise ValueError("window must be >= 0")
        if (min_periods is not None) and (min_periods < 0):
            raise ValueError("min_periods must be >= 0")
        if min_periods is None:
            # TODO: 'min_periods' is not equivalent in pandas because it does not count NA as
            #   a value.
            min_periods = window

        window_spec = Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(
            Window.currentRow - (window - 1), Window.currentRow
        )

        super().__init__(window_spec, min_periods)

    def count(self) -> FrameLike:
        def count(scol: Column) -> Column:
            return F.count(scol).over(self._window)

        return self._apply_as_series_or_frame(count).astype("float64")  # type: ignore[attr-defined]


class Rolling(RollingLike[FrameLike]):
    def __init__(
        self,
        psdf_or_psser: FrameLike,
        window: int,
        min_periods: Optional[int] = None,
    ):
        from pyspark.pandas.frame import DataFrame
        from pyspark.pandas.series import Series

        super().__init__(window, min_periods)

        self._psdf_or_psser = psdf_or_psser

        if not isinstance(psdf_or_psser, (DataFrame, Series)):
            raise TypeError(
                "psdf_or_psser must be a series or dataframe; however, got: %s"
                % type(psdf_or_psser)
            )

    def __getattr__(self, item: str) -> Any:
        if hasattr(MissingPandasLikeRolling, item):
            property_or_func = getattr(MissingPandasLikeRolling, item)
            if isinstance(property_or_func, property):
                return property_or_func.fget(self)
            else:
                return partial(property_or_func, self)
        raise AttributeError(item)

    def _apply_as_series_or_frame(self, func: Callable[[Column], Column]) -> FrameLike:
        return self._psdf_or_psser._apply_series_op(
            lambda psser: psser._with_new_scol(func(psser.spark.column)),  # TODO: dtype?
            should_resolve=True,
        )
    def count(self) -> FrameLike:
        """
        The rolling count of any non-NaN observations inside the window.

        .. note:: the current implementation of this API uses Spark's Window without
            specifying partition specification. This leads to moving all data into a
            single partition on a single machine and could cause serious performance
            degradation. Avoid this method against very large datasets.

        Returns
        -------
        Series or DataFrame
            Return type is the same as the original object with `np.float64` dtype.

        See Also
        --------
        pyspark.pandas.Series.expanding : Calling object with Series data.
        pyspark.pandas.DataFrame.expanding : Calling object with DataFrames.
        pyspark.pandas.Series.count : Count of the full Series.
        pyspark.pandas.DataFrame.count : Count of the full DataFrame.

        Examples
        --------
        >>> s = ps.Series([2, 3, float("nan"), 10])
        >>> s.rolling(1).count()
        0    1.0
        1    1.0
        2    0.0
        3    1.0
        dtype: float64

        >>> s.rolling(3).count()
        0    1.0
        1    2.0
        2    2.0
        3    2.0
        dtype: float64

        >>> s.to_frame().rolling(1).count()
             0
        0  1.0
        1  1.0
        2  0.0
        3  1.0

        >>> s.to_frame().rolling(3).count()
             0
        0  1.0
        1  2.0
        2  2.0
        3  2.0
        """
        return super().count()
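
    # Hypothetical sketch, not the shipped behavior: RollingLike.count applies F.count
    # over the window *without* the 'min_periods' mask (see the TODO in
    # RollingLike.__init__ about NA handling). If one wanted a pandas-style,
    # min_periods-aware count, the same masking pattern used by sum/mean/etc. could be
    # reused, as outlined below. The helper name is illustrative only.
    @staticmethod
    def _demo_count_with_min_periods_mask(
        scol: Column, window: WindowSpec, unbounded: WindowSpec, min_periods: int
    ) -> Column:
        return F.when(
            F.row_number().over(unbounded) >= min_periods,
            F.count(scol).over(window),
        ).otherwise(F.lit(None))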
[docs]defsum(self)->FrameLike:""" Calculate rolling summation of given DataFrame or Series. .. note:: the current implementation of this API uses Spark's Window without specifying partition specification. This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset. Returns ------- Series or DataFrame Same type as the input, with the same index, containing the rolling summation. See Also -------- pyspark.pandas.Series.expanding : Calling object with Series data. pyspark.pandas.DataFrame.expanding : Calling object with DataFrames. pyspark.pandas.Series.sum : Reducing sum for Series. pyspark.pandas.DataFrame.sum : Reducing sum for DataFrame. Examples -------- >>> s = ps.Series([4, 3, 5, 2, 6]) >>> s 0 4 1 3 2 5 3 2 4 6 dtype: int64 >>> s.rolling(2).sum() 0 NaN 1 7.0 2 8.0 3 7.0 4 8.0 dtype: float64 >>> s.rolling(3).sum() 0 NaN 1 NaN 2 12.0 3 10.0 4 13.0 dtype: float64 For DataFrame, each rolling summation is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df A B 0 4 16 1 3 9 2 5 25 3 2 4 4 6 36 >>> df.rolling(2).sum() A B 0 NaN NaN 1 7.0 25.0 2 8.0 34.0 3 7.0 29.0 4 8.0 40.0 >>> df.rolling(3).sum() A B 0 NaN NaN 1 NaN NaN 2 12.0 50.0 3 10.0 38.0 4 13.0 65.0 """returnsuper().sum()
[docs]defmin(self)->FrameLike:""" Calculate the rolling minimum. .. note:: the current implementation of this API uses Spark's Window without specifying partition specification. This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset. Returns ------- Series or DataFrame Returned object type is determined by the caller of the rolling calculation. See Also -------- pyspark.pandas.Series.rolling : Calling object with a Series. pyspark.pandas.DataFrame.rolling : Calling object with a DataFrame. pyspark.pandas.Series.min : Similar method for Series. pyspark.pandas.DataFrame.min : Similar method for DataFrame. Examples -------- >>> s = ps.Series([4, 3, 5, 2, 6]) >>> s 0 4 1 3 2 5 3 2 4 6 dtype: int64 >>> s.rolling(2).min() 0 NaN 1 3.0 2 3.0 3 2.0 4 2.0 dtype: float64 >>> s.rolling(3).min() 0 NaN 1 NaN 2 3.0 3 2.0 4 2.0 dtype: float64 For DataFrame, each rolling minimum is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df A B 0 4 16 1 3 9 2 5 25 3 2 4 4 6 36 >>> df.rolling(2).min() A B 0 NaN NaN 1 3.0 9.0 2 3.0 9.0 3 2.0 4.0 4 2.0 4.0 >>> df.rolling(3).min() A B 0 NaN NaN 1 NaN NaN 2 3.0 9.0 3 2.0 4.0 4 2.0 4.0 """returnsuper().min()
[docs]defmax(self)->FrameLike:""" Calculate the rolling maximum. .. note:: the current implementation of this API uses Spark's Window without specifying partition specification. This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset. Returns ------- Series or DataFrame Return type is determined by the caller. See Also -------- pyspark.pandas.Series.rolling : Series rolling. pyspark.pandas.DataFrame.rolling : DataFrame rolling. pyspark.pandas.Series.max : Similar method for Series. pyspark.pandas.DataFrame.max : Similar method for DataFrame. Examples -------- >>> s = ps.Series([4, 3, 5, 2, 6]) >>> s 0 4 1 3 2 5 3 2 4 6 dtype: int64 >>> s.rolling(2).max() 0 NaN 1 4.0 2 5.0 3 5.0 4 6.0 dtype: float64 >>> s.rolling(3).max() 0 NaN 1 NaN 2 5.0 3 5.0 4 6.0 dtype: float64 For DataFrame, each rolling maximum is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df A B 0 4 16 1 3 9 2 5 25 3 2 4 4 6 36 >>> df.rolling(2).max() A B 0 NaN NaN 1 4.0 16.0 2 5.0 25.0 3 5.0 25.0 4 6.0 36.0 >>> df.rolling(3).max() A B 0 NaN NaN 1 NaN NaN 2 5.0 25.0 3 5.0 25.0 4 6.0 36.0 """returnsuper().max()
[docs]defmean(self)->FrameLike:""" Calculate the rolling mean of the values. .. note:: the current implementation of this API uses Spark's Window without specifying partition specification. This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset. Returns ------- Series or DataFrame Returned object type is determined by the caller of the rolling calculation. See Also -------- pyspark.pandas.Series.rolling : Calling object with Series data. pyspark.pandas.DataFrame.rolling : Calling object with DataFrames. pyspark.pandas.Series.mean : Equivalent method for Series. pyspark.pandas.DataFrame.mean : Equivalent method for DataFrame. Examples -------- >>> s = ps.Series([4, 3, 5, 2, 6]) >>> s 0 4 1 3 2 5 3 2 4 6 dtype: int64 >>> s.rolling(2).mean() 0 NaN 1 3.5 2 4.0 3 3.5 4 4.0 dtype: float64 >>> s.rolling(3).mean() 0 NaN 1 NaN 2 4.000000 3 3.333333 4 4.333333 dtype: float64 For DataFrame, each rolling mean is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df A B 0 4 16 1 3 9 2 5 25 3 2 4 4 6 36 >>> df.rolling(2).mean() A B 0 NaN NaN 1 3.5 12.5 2 4.0 17.0 3 3.5 14.5 4 4.0 20.0 >>> df.rolling(3).mean() A B 0 NaN NaN 1 NaN NaN 2 4.000000 16.666667 3 3.333333 12.666667 4 4.333333 21.666667 """returnsuper().mean()
    def quantile(self, quantile: float, accuracy: int = 10000) -> FrameLike:
        """
        Calculate the rolling quantile of the values.

        .. versionadded:: 3.4.0

        Parameters
        ----------
        quantile : float
            Value between 0 and 1 providing the quantile to compute.
        accuracy : int, optional
            Default accuracy of approximation. Larger value means better accuracy.
            The relative error can be deduced by 1.0 / accuracy.
            This is a pandas-on-Spark specific parameter.

        Returns
        -------
        Series or DataFrame
            Returned object type is determined by the caller of the rolling
            calculation.

        Notes
        -----
        Unlike pandas, `quantile` in pandas-on-Spark uses a distributed percentile
        approximation algorithm, so the result may differ from pandas. The
        `interpolation` parameter is not supported yet.

        The current implementation of this API uses Spark's Window without
        specifying partition specification. This leads to moving all data into a
        single partition on a single machine and could cause serious performance
        degradation. Avoid this method against very large datasets.

        See Also
        --------
        pyspark.pandas.Series.rolling : Calling rolling with Series data.
        pyspark.pandas.DataFrame.rolling : Calling rolling with DataFrames.
        pyspark.pandas.Series.quantile : Aggregating quantile for Series.
        pyspark.pandas.DataFrame.quantile : Aggregating quantile for DataFrame.

        Examples
        --------
        >>> s = ps.Series([4, 3, 5, 2, 6])
        >>> s
        0    4
        1    3
        2    5
        3    2
        4    6
        dtype: int64

        >>> s.rolling(2).quantile(0.5)
        0    NaN
        1    3.0
        2    3.0
        3    2.0
        4    2.0
        dtype: float64

        >>> s.rolling(3).quantile(0.5)
        0    NaN
        1    NaN
        2    4.0
        3    3.0
        4    5.0
        dtype: float64

        For DataFrame, each rolling quantile is computed column-wise.

        >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2})
        >>> df
           A   B
        0  4  16
        1  3   9
        2  5  25
        3  2   4
        4  6  36

        >>> df.rolling(2).quantile(0.5)
             A    B
        0  NaN  NaN
        1  3.0  9.0
        2  3.0  9.0
        3  2.0  4.0
        4  2.0  4.0

        >>> df.rolling(3).quantile(0.5)
             A     B
        0  NaN   NaN
        1  NaN   NaN
        2  4.0  16.0
        3  3.0   9.0
        4  5.0  25.0
        """
        return super().quantile(quantile, accuracy)
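
    # Illustrative helper (an assumption, not part of the API): 'accuracy' is handed
    # straight to Spark's percentile_approx, whose relative error is roughly
    # 1.0 / accuracy, so raising it trades memory/CPU for a tighter approximation.
    @staticmethod
    def _demo_quantile_accuracy(scol: Column, q: float) -> List[Column]:
        return [
            F.percentile_approx(scol.cast(DoubleType()), q, 100),     # coarse, ~1% relative error
            F.percentile_approx(scol.cast(DoubleType()), q, 100000),  # fine, ~0.001% relative error
        ]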
defstd(self)->FrameLike:""" Calculate rolling standard deviation. .. note:: the current implementation of this API uses Spark's Window without specifying partition specification. This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset. Returns ------- Series or DataFrame Returns the same object type as the caller of the rolling calculation. See Also -------- pyspark.pandas.Series.rolling : Calling object with Series data. pyspark.pandas.DataFrame.rolling : Calling object with DataFrames. pyspark.pandas.Series.std : Equivalent method for Series. pyspark.pandas.DataFrame.std : Equivalent method for DataFrame. numpy.std : Equivalent method for Numpy array. Examples -------- >>> s = ps.Series([5, 5, 6, 7, 5, 5, 5]) >>> s.rolling(3).std() 0 NaN 1 NaN 2 0.577350 3 1.000000 4 1.000000 5 1.154701 6 0.000000 dtype: float64 For DataFrame, each rolling standard deviation is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.rolling(2).std() A B 0 NaN NaN 1 0.000000 0.000000 2 0.707107 7.778175 3 0.707107 9.192388 4 1.414214 16.970563 5 0.000000 0.000000 6 0.000000 0.000000 """returnsuper().std()defvar(self)->FrameLike:""" Calculate unbiased rolling variance. .. note:: the current implementation of this API uses Spark's Window without specifying partition specification. This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset. Returns ------- Series or DataFrame Returns the same object type as the caller of the rolling calculation. See Also -------- Series.rolling : Calling object with Series data. DataFrame.rolling : Calling object with DataFrames. Series.var : Equivalent method for Series. DataFrame.var : Equivalent method for DataFrame. numpy.var : Equivalent method for Numpy array. Examples -------- >>> s = ps.Series([5, 5, 6, 7, 5, 5, 5]) >>> s.rolling(3).var() 0 NaN 1 NaN 2 0.333333 3 1.000000 4 1.000000 5 1.333333 6 0.000000 dtype: float64 For DataFrame, each unbiased rolling variance is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.rolling(2).var() A B 0 NaN NaN 1 0.0 0.0 2 0.5 60.5 3 0.5 84.5 4 2.0 288.0 5 0.0 0.0 6 0.0 0.0 """returnsuper().var()defskew(self)->FrameLike:""" Calculate unbiased rolling skew. .. note:: the current implementation of this API uses Spark's Window without specifying partition specification. This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset. Returns ------- Series or DataFrame Returns the same object type as the caller of the rolling calculation. See Also -------- pyspark.pandas.Series.rolling : Calling object with Series data. pyspark.pandas.DataFrame.rolling : Calling object with DataFrames. pyspark.pandas.Series.std : Equivalent method for Series. pyspark.pandas.DataFrame.std : Equivalent method for DataFrame. numpy.std : Equivalent method for Numpy array. Examples -------- >>> s = ps.Series([5, 5, 6, 7, 5, 1, 5, 9]) >>> s.rolling(3).skew() 0 NaN 1 NaN 2 1.732051 3 0.000000 4 0.000000 5 -0.935220 6 -1.732051 7 0.000000 dtype: float64 For DataFrame, each rolling standard deviation is computed column-wise. 
>>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.rolling(5).skew() A B 0 NaN NaN 1 NaN NaN 2 NaN NaN 3 NaN NaN 4 1.257788 1.369456 5 -1.492685 -0.526039 6 -1.492685 -0.526039 7 -0.551618 0.686072 """returnsuper().skew()defkurt(self)->FrameLike:""" Calculate unbiased rolling kurtosis. .. note:: the current implementation of this API uses Spark's Window without specifying partition specification. This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset. Returns ------- Series or DataFrame Returns the same object type as the caller of the rolling calculation. See Also -------- pyspark.pandas.Series.rolling : Calling object with Series data. pyspark.pandas.DataFrame.rolling : Calling object with DataFrames. pyspark.pandas.Series.var : Equivalent method for Series. pyspark.pandas.DataFrame.var : Equivalent method for DataFrame. numpy.var : Equivalent method for Numpy array. Examples -------- >>> s = ps.Series([5, 5, 6, 7, 5, 1, 5, 9]) >>> s.rolling(4).kurt() 0 NaN 1 NaN 2 NaN 3 -1.289256 4 -1.289256 5 2.234867 6 2.227147 7 1.500000 dtype: float64 For DataFrame, each unbiased rolling variance is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.rolling(5).kurt() A B 0 NaN NaN 1 NaN NaN 2 NaN NaN 3 NaN NaN 4 0.312500 0.906336 5 2.818047 1.016942 6 2.818047 1.016942 7 0.867769 0.389750 """returnsuper().kurt()classRollingGroupby(RollingLike[FrameLike]):def__init__(self,groupby:GroupBy[FrameLike],window:int,min_periods:Optional[int]=None,):super().__init__(window,min_periods)self._groupby=groupbyself._window=self._window.partitionBy(*[ser.spark.columnforseringroupby._groupkeys])self._unbounded_window=self._unbounded_window.partitionBy(*[ser.spark.columnforseringroupby._groupkeys])def__getattr__(self,item:str)->Any:ifhasattr(MissingPandasLikeRollingGroupby,item):property_or_func=getattr(MissingPandasLikeRollingGroupby,item)ifisinstance(property_or_func,property):returnproperty_or_func.fget(self)else:returnpartial(property_or_func,self)raiseAttributeError(item)def_apply_as_series_or_frame(self,func:Callable[[Column],Column])->FrameLike:""" Wraps a function that handles Spark column in order to support it in both pandas-on-Spark Series and DataFrame. Note that the given `func` name should be same as the API's method name. 
"""frompyspark.pandasimportDataFramegroupby=self._groupbypsdf=groupby._psdf# Here we need to include grouped key as an index, and shift previous index.# [index_column0, index_column1] -> [grouped key, index_column0, index_column1]new_index_scols:List[Column]=[]new_index_spark_column_names=[]new_index_names=[]new_index_fields=[]forgroupkeyingroupby._groupkeys:index_column_name=SPARK_INDEX_NAME_FORMAT(len(new_index_scols))new_index_scols.append(groupkey.spark.column.alias(index_column_name))new_index_spark_column_names.append(index_column_name)new_index_names.append(groupkey._column_label)new_index_fields.append(groupkey._internal.data_fields[0].copy(name=index_column_name))fornew_index_scol,index_name,index_fieldinzip(psdf._internal.index_spark_columns,psdf._internal.index_names,psdf._internal.index_fields,):index_column_name=SPARK_INDEX_NAME_FORMAT(len(new_index_scols))new_index_scols.append(new_index_scol.alias(index_column_name))new_index_spark_column_names.append(index_column_name)new_index_names.append(index_name)new_index_fields.append(index_field.copy(name=index_column_name))ifgroupby._agg_columns_selected:agg_columns=groupby._agg_columnselse:# pandas doesn't keep the groupkey as a column from 1.3 for DataFrameGroupBycolumn_labels_to_exclude=groupby._column_labels_to_exclude.copy()ifisinstance(groupby,DataFrameGroupBy):forgroupkeyingroupby._groupkeys:# type: ignore[attr-defined]column_labels_to_exclude.add(groupkey._internal.column_labels[0])agg_columns=[psdf._psser_for(label)forlabelinpsdf._internal.column_labelsiflabelnotincolumn_labels_to_exclude]applied=[]foragg_columninagg_columns:applied.append(agg_column._with_new_scol(func(agg_column.spark.column)))# TODO: dtype?# Seems like pandas filters out when grouped key is NA.cond=groupby._groupkeys[0].spark.column.isNotNull()forcingroupby._groupkeys[1:]:cond=cond|c.spark.column.isNotNull()sdf=psdf._internal.spark_frame.filter(cond).select(new_index_scols+[c.spark.columnforcinapplied])internal=psdf._internal.copy(spark_frame=sdf,index_spark_columns=[scol_for(sdf,col)forcolinnew_index_spark_column_names],index_names=new_index_names,index_fields=new_index_fields,column_labels=[c._column_labelforcinapplied],data_spark_columns=[scol_for(sdf,c._internal.data_spark_column_names[0])forcinapplied],data_fields=[c._internal.data_fields[0]forcinapplied],)returngroupby._handle_output(DataFrame(internal))defcount(self)->FrameLike:""" The rolling count of any non-NaN observations inside the window. Returns ------- Series or DataFrame Returned object type is determined by the caller of the expanding calculation. See Also -------- pyspark.pandas.Series.rolling : Calling object with Series data. pyspark.pandas.DataFrame.rolling : Calling object with DataFrames. pyspark.pandas.Series.count : Count of the full Series. pyspark.pandas.DataFrame.count : Count of the full DataFrame. Examples -------- >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5]) >>> s.groupby(s).rolling(3).count().sort_index() 2 0 1.0 1 2.0 3 2 1.0 3 2.0 4 3.0 4 5 1.0 6 2.0 7 3.0 8 3.0 5 9 1.0 10 2.0 dtype: float64 For DataFrame, each rolling count is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.groupby(df.A).rolling(2).count().sort_index() # doctest: +NORMALIZE_WHITESPACE B A 2 0 1.0 1 2.0 3 2 1.0 3 2.0 4 2.0 4 5 1.0 6 2.0 7 2.0 8 2.0 5 9 1.0 10 2.0 """returnsuper().count()defsum(self)->FrameLike:""" The rolling summation of any non-NaN observations inside the window. 
Returns ------- Series or DataFrame Returned object type is determined by the caller of the rolling calculation. See Also -------- pyspark.pandas.Series.rolling : Calling object with Series data. pyspark.pandas.DataFrame.rolling : Calling object with DataFrames. pyspark.pandas.Series.sum : Sum of the full Series. pyspark.pandas.DataFrame.sum : Sum of the full DataFrame. Examples -------- >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5]) >>> s.groupby(s).rolling(3).sum().sort_index() 2 0 NaN 1 NaN 3 2 NaN 3 NaN 4 9.0 4 5 NaN 6 NaN 7 12.0 8 12.0 5 9 NaN 10 NaN dtype: float64 For DataFrame, each rolling summation is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.groupby(df.A).rolling(2).sum().sort_index() # doctest: +NORMALIZE_WHITESPACE B A 2 0 NaN 1 8.0 3 2 NaN 3 18.0 4 18.0 4 5 NaN 6 32.0 7 32.0 8 32.0 5 9 NaN 10 50.0 """returnsuper().sum()defmin(self)->FrameLike:""" The rolling minimum of any non-NaN observations inside the window. Returns ------- Series or DataFrame Returned object type is determined by the caller of the rolling calculation. See Also -------- pyspark.pandas.Series.rolling : Calling object with Series data. pyspark.pandas.DataFrame.rolling : Calling object with DataFrames. pyspark.pandas.Series.min : Min of the full Series. pyspark.pandas.DataFrame.min : Min of the full DataFrame. Examples -------- >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5]) >>> s.groupby(s).rolling(3).min().sort_index() 2 0 NaN 1 NaN 3 2 NaN 3 NaN 4 3.0 4 5 NaN 6 NaN 7 4.0 8 4.0 5 9 NaN 10 NaN dtype: float64 For DataFrame, each rolling minimum is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.groupby(df.A).rolling(2).min().sort_index() # doctest: +NORMALIZE_WHITESPACE B A 2 0 NaN 1 4.0 3 2 NaN 3 9.0 4 9.0 4 5 NaN 6 16.0 7 16.0 8 16.0 5 9 NaN 10 25.0 """returnsuper().min()defmax(self)->FrameLike:""" The rolling maximum of any non-NaN observations inside the window. Returns ------- Series or DataFrame Returned object type is determined by the caller of the rolling calculation. See Also -------- pyspark.pandas.Series.rolling : Calling object with Series data. pyspark.pandas.DataFrame.rolling : Calling object with DataFrames. pyspark.pandas.Series.max : Max of the full Series. pyspark.pandas.DataFrame.max : Max of the full DataFrame. Examples -------- >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5]) >>> s.groupby(s).rolling(3).max().sort_index() 2 0 NaN 1 NaN 3 2 NaN 3 NaN 4 3.0 4 5 NaN 6 NaN 7 4.0 8 4.0 5 9 NaN 10 NaN dtype: float64 For DataFrame, each rolling maximum is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.groupby(df.A).rolling(2).max().sort_index() # doctest: +NORMALIZE_WHITESPACE B A 2 0 NaN 1 4.0 3 2 NaN 3 9.0 4 9.0 4 5 NaN 6 16.0 7 16.0 8 16.0 5 9 NaN 10 25.0 """returnsuper().max()defmean(self)->FrameLike:""" The rolling mean of any non-NaN observations inside the window. Returns ------- Series or DataFrame Returned object type is determined by the caller of the rolling calculation. See Also -------- pyspark.pandas.Series.rolling : Calling object with Series data. pyspark.pandas.DataFrame.rolling : Calling object with DataFrames. pyspark.pandas.Series.mean : Mean of the full Series. pyspark.pandas.DataFrame.mean : Mean of the full DataFrame. 
Examples -------- >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5]) >>> s.groupby(s).rolling(3).mean().sort_index() 2 0 NaN 1 NaN 3 2 NaN 3 NaN 4 3.0 4 5 NaN 6 NaN 7 4.0 8 4.0 5 9 NaN 10 NaN dtype: float64 For DataFrame, each rolling mean is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.groupby(df.A).rolling(2).mean().sort_index() # doctest: +NORMALIZE_WHITESPACE B A 2 0 NaN 1 4.0 3 2 NaN 3 9.0 4 9.0 4 5 NaN 6 16.0 7 16.0 8 16.0 5 9 NaN 10 25.0 """returnsuper().mean()defquantile(self,quantile:float,accuracy:int=10000)->FrameLike:""" Calculate rolling quantile. .. versionadded:: 3.4.0 Parameters ---------- quantile : float Value between 0 and 1 providing the quantile to compute. accuracy : int, optional Default accuracy of approximation. Larger value means better accuracy. The relative error can be deduced by 1.0 / accuracy. This is a panda-on-Spark specific parameter. Returns ------- Series or DataFrame Returned object type is determined by the caller of the rolling calculation. Notes ----- `quantile` in pandas-on-Spark are using distributed percentile approximation algorithm unlike pandas, the result might be different with pandas, also `interpolation` parameter is not supported yet. See Also -------- pyspark.pandas.Series.rolling : Calling rolling with Series data. pyspark.pandas.DataFrame.rolling : Calling rolling with DataFrames. pyspark.pandas.Series.quantile : Aggregating quantile for Series. pyspark.pandas.DataFrame.quantile : Aggregating quantile for DataFrame. Examples -------- >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5]) >>> s.groupby(s).rolling(3).quantile(0.5).sort_index() 2 0 NaN 1 NaN 3 2 NaN 3 NaN 4 3.0 4 5 NaN 6 NaN 7 4.0 8 4.0 5 9 NaN 10 NaN dtype: float64 For DataFrame, each rolling quantile is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.groupby(df.A).rolling(2).quantile(0.5).sort_index() B A 2 0 NaN 1 4.0 3 2 NaN 3 9.0 4 9.0 4 5 NaN 6 16.0 7 16.0 8 16.0 5 9 NaN 10 25.0 """returnsuper().quantile(quantile,accuracy)defstd(self)->FrameLike:""" Calculate rolling standard deviation. Returns ------- Series or DataFrame Returns the same object type as the caller of the rolling calculation. See Also -------- pyspark.pandas.Series.rolling : Calling object with Series data. pyspark.pandas.DataFrame.rolling : Calling object with DataFrames. pyspark.pandas.Series.std : Equivalent method for Series. pyspark.pandas.DataFrame.std : Equivalent method for DataFrame. numpy.std : Equivalent method for Numpy array. """returnsuper().std()defvar(self)->FrameLike:""" Calculate unbiased rolling variance. Returns ------- Series or DataFrame Returns the same object type as the caller of the rolling calculation. See Also -------- pyspark.pandas.Series.rolling : Calling object with Series data. pyspark.pandas.DataFrame.rolling : Calling object with DataFrames. pyspark.pandas.Series.var : Equivalent method for Series. pyspark.pandas.DataFrame.var : Equivalent method for DataFrame. numpy.var : Equivalent method for Numpy array. """returnsuper().var()defskew(self)->FrameLike:""" Calculate unbiased rolling skew. Returns ------- Series or DataFrame Returns the same object type as the caller of the rolling calculation. See Also -------- pyspark.pandas.Series.rolling : Calling object with Series data. pyspark.pandas.DataFrame.rolling : Calling object with DataFrames. pyspark.pandas.Series.std : Equivalent method for Series. pyspark.pandas.DataFrame.std : Equivalent method for DataFrame. 
numpy.std : Equivalent method for Numpy array. """returnsuper().skew()defkurt(self)->FrameLike:""" Calculate unbiased rolling kurtosis. Returns ------- Series or DataFrame Returns the same object type as the caller of the rolling calculation. See Also -------- pyspark.pandas.Series.rolling : Calling object with Series data. pyspark.pandas.DataFrame.rolling : Calling object with DataFrames. pyspark.pandas.Series.var : Equivalent method for Series. pyspark.pandas.DataFrame.var : Equivalent method for DataFrame. numpy.var : Equivalent method for Numpy array. """returnsuper().kurt()classExpandingLike(RollingAndExpanding[FrameLike]):def__init__(self,min_periods:int=1):ifmin_periods<0:raiseValueError("min_periods must be >= 0")window=Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(Window.unboundedPreceding,Window.currentRow)super().__init__(window,min_periods)defcount(self)->FrameLike:defcount(scol:Column)->Column:returnF.when(F.row_number().over(self._unbounded_window)>=self._min_periods,F.count(scol).over(self._window),).otherwise(F.lit(None))returnself._apply_as_series_or_frame(count).astype("float64")# type: ignore[attr-defined]classExpanding(ExpandingLike[FrameLike]):def__init__(self,psdf_or_psser:FrameLike,min_periods:int=1):frompyspark.pandas.frameimportDataFramefrompyspark.pandas.seriesimportSeriessuper().__init__(min_periods)ifnotisinstance(psdf_or_psser,(DataFrame,Series)):raiseTypeError("psdf_or_psser must be a series or dataframe; however, got: %s"%type(psdf_or_psser))self._psdf_or_psser=psdf_or_psserdef__getattr__(self,item:str)->Any:ifhasattr(MissingPandasLikeExpanding,item):property_or_func=getattr(MissingPandasLikeExpanding,item)ifisinstance(property_or_func,property):returnproperty_or_func.fget(self)else:returnpartial(property_or_func,self)raiseAttributeError(item)# TODO: when add 'axis' parameter, should add to here too.def__repr__(self)->str:return"Expanding [min_periods={}]".format(self._min_periods)_apply_as_series_or_frame=Rolling._apply_as_series_or_frame
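
    # Small sketch (illustration only; the classes below are hypothetical) of the
    # method-sharing pattern used just above: assigning another class's function
    # object as a class attribute reuses its implementation without inheritance,
    # and 'self' binding still works because plain functions are descriptors.
    @staticmethod
    def _demo_method_sharing() -> None:
        class _A:
            def greet(self) -> str:
                return "hello from " + type(self).__name__

        class _B:
            greet = _A.greet  # reuse _A's implementation; no subclassing involved

        assert _B().greet() == "hello from _B"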
    def count(self) -> FrameLike:
        """
        The expanding count of any non-NaN observations inside the window.

        .. note:: the current implementation of this API uses Spark's Window without
            specifying partition specification. This leads to moving all data into a
            single partition on a single machine and could cause serious performance
            degradation. Avoid this method against very large datasets.

        Returns
        -------
        Series or DataFrame
            Returned object type is determined by the caller of the expanding
            calculation.

        See Also
        --------
        pyspark.pandas.Series.expanding : Calling object with Series data.
        pyspark.pandas.DataFrame.expanding : Calling object with DataFrames.
        pyspark.pandas.Series.count : Count of the full Series.
        pyspark.pandas.DataFrame.count : Count of the full DataFrame.

        Examples
        --------
        >>> s = ps.Series([2, 3, float("nan"), 10])
        >>> s.expanding().count()
        0    1.0
        1    2.0
        2    2.0
        3    3.0
        dtype: float64

        >>> s.to_frame().expanding().count()
             0
        0  1.0
        1  2.0
        2  2.0
        3  3.0
        """
        return super().count()
    def sum(self) -> FrameLike:
        """
        Calculate expanding summation of given DataFrame or Series.

        .. note:: the current implementation of this API uses Spark's Window without
            specifying partition specification. This leads to moving all data into a
            single partition on a single machine and could cause serious performance
            degradation. Avoid this method against very large datasets.

        Returns
        -------
        Series or DataFrame
            Same type as the input, with the same index, containing the
            expanding summation.

        See Also
        --------
        pyspark.pandas.Series.expanding : Calling object with Series data.
        pyspark.pandas.DataFrame.expanding : Calling object with DataFrames.
        pyspark.pandas.Series.sum : Reducing sum for Series.
        pyspark.pandas.DataFrame.sum : Reducing sum for DataFrame.

        Examples
        --------
        >>> s = ps.Series([1, 2, 3, 4, 5])
        >>> s
        0    1
        1    2
        2    3
        3    4
        4    5
        dtype: int64

        >>> s.expanding(3).sum()
        0     NaN
        1     NaN
        2     6.0
        3    10.0
        4    15.0
        dtype: float64

        For DataFrame, each expanding summation is computed column-wise.

        >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2})
        >>> df
           A   B
        0  1   1
        1  2   4
        2  3   9
        3  4  16
        4  5  25

        >>> df.expanding(3).sum()
              A     B
        0   NaN   NaN
        1   NaN   NaN
        2   6.0  14.0
        3  10.0  30.0
        4  15.0  55.0
        """
        return super().sum()
    def min(self) -> FrameLike:
        """
        Calculate the expanding minimum.

        .. note:: the current implementation of this API uses Spark's Window without
            specifying partition specification. This leads to moving all data into a
            single partition on a single machine and could cause serious performance
            degradation. Avoid this method against very large datasets.

        Returns
        -------
        Series or DataFrame
            Returned object type is determined by the caller of the expanding
            calculation.

        See Also
        --------
        pyspark.pandas.Series.expanding : Calling object with a Series.
        pyspark.pandas.DataFrame.expanding : Calling object with a DataFrame.
        pyspark.pandas.Series.min : Similar method for Series.
        pyspark.pandas.DataFrame.min : Similar method for DataFrame.

        Examples
        --------
        Performing an expanding minimum with a window size of 3.

        >>> s = ps.Series([4, 3, 5, 2, 6])
        >>> s.expanding(3).min()
        0    NaN
        1    NaN
        2    3.0
        3    2.0
        4    2.0
        dtype: float64
        """
        return super().min()
    def max(self) -> FrameLike:
        """
        Calculate the expanding maximum.

        .. note:: the current implementation of this API uses Spark's Window without
            specifying partition specification. This leads to moving all data into a
            single partition on a single machine and could cause serious performance
            degradation. Avoid this method against very large datasets.

        Returns
        -------
        Series or DataFrame
            Return type is determined by the caller.

        See Also
        --------
        pyspark.pandas.Series.expanding : Calling object with Series data.
        pyspark.pandas.DataFrame.expanding : Calling object with DataFrames.
        pyspark.pandas.Series.max : Similar method for Series.
        pyspark.pandas.DataFrame.max : Similar method for DataFrame.

        Examples
        --------
        Performing an expanding maximum with a window size of 3.

        >>> s = ps.Series([4, 3, 5, 2, 6])
        >>> s.expanding(3).max()
        0    NaN
        1    NaN
        2    5.0
        3    5.0
        4    6.0
        dtype: float64
        """
        return super().max()
    def mean(self) -> FrameLike:
        """
        Calculate the expanding mean of the values.

        .. note:: the current implementation of this API uses Spark's Window without
            specifying partition specification. This leads to moving all data into a
            single partition on a single machine and could cause serious performance
            degradation. Avoid this method against very large datasets.

        Returns
        -------
        Series or DataFrame
            Returned object type is determined by the caller of the expanding
            calculation.

        See Also
        --------
        pyspark.pandas.Series.expanding : Calling object with Series data.
        pyspark.pandas.DataFrame.expanding : Calling object with DataFrames.
        pyspark.pandas.Series.mean : Equivalent method for Series.
        pyspark.pandas.DataFrame.mean : Equivalent method for DataFrame.

        Examples
        --------
        The examples below show expanding mean calculations with window sizes of
        two and three, respectively.

        >>> s = ps.Series([1, 2, 3, 4])
        >>> s.expanding(2).mean()
        0    NaN
        1    1.5
        2    2.0
        3    2.5
        dtype: float64

        >>> s.expanding(3).mean()
        0    NaN
        1    NaN
        2    2.0
        3    2.5
        dtype: float64
        """
        return super().mean()
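
    # Sanity-check sketch one could run locally (illustration only, an assumption of
    # equivalence that holds here only because the rolling window spans the whole
    # series): expanding is equivalent to a rolling window that always reaches back
    # to the first row, combined with the same 'min_periods' masking.
    @staticmethod
    def _demo_expanding_equals_big_rolling() -> None:
        s = ps.Series([1, 2, 3, 4])
        expanding_mean = s.expanding(2).mean().to_pandas()
        rolling_mean = s.rolling(window=len(s), min_periods=2).mean().to_pandas()
        assert expanding_mean.equals(rolling_mean)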
    def quantile(self, quantile: float, accuracy: int = 10000) -> FrameLike:
        """
        Calculate the expanding quantile of the values.

        Parameters
        ----------
        quantile : float
            Value between 0 and 1 providing the quantile to compute.
        accuracy : int, optional
            Default accuracy of approximation. Larger value means better accuracy.
            The relative error can be deduced by 1.0 / accuracy.
            This is a pandas-on-Spark specific parameter.

        Returns
        -------
        Series or DataFrame
            Returned object type is determined by the caller of the expanding
            calculation.

        Notes
        -----
        Unlike pandas, `quantile` in pandas-on-Spark uses a distributed percentile
        approximation algorithm, so the result may differ from pandas (it is close
        to what pandas returns with `interpolation` set to `lower`). The
        `interpolation` parameter is not supported yet.

        The current implementation of this API uses Spark's Window without
        specifying partition specification. This leads to moving all data into a
        single partition on a single machine and could cause serious performance
        degradation. Avoid this method against very large datasets.

        See Also
        --------
        pyspark.pandas.Series.expanding : Calling expanding with Series data.
        pyspark.pandas.DataFrame.expanding : Calling expanding with DataFrames.
        pyspark.pandas.Series.quantile : Aggregating quantile for Series.
        pyspark.pandas.DataFrame.quantile : Aggregating quantile for DataFrame.

        Examples
        --------
        The examples below show expanding quantile calculations with window sizes
        of two and three, respectively.

        >>> s = ps.Series([1, 2, 3, 4])
        >>> s.expanding(2).quantile(0.5)
        0    NaN
        1    1.0
        2    2.0
        3    2.0
        dtype: float64

        >>> s.expanding(3).quantile(0.5)
        0    NaN
        1    NaN
        2    2.0
        3    2.0
        dtype: float64
        """
        return super().quantile(quantile, accuracy)
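
    # Hedged cross-check sketch (illustration only): the Notes above say the
    # distributed approximation tends to resemble pandas' interpolation="lower".
    # One way to eyeball that locally is to compare against plain pandas; exact
    # agreement is not guaranteed, especially on larger data.
    @staticmethod
    def _demo_compare_quantile_with_pandas() -> None:
        import pandas as pd

        pser = pd.Series([1, 2, 3, 4])
        lower = pser.expanding(2).quantile(0.5, interpolation="lower")
        approx = ps.from_pandas(pser).expanding(2).quantile(0.5).to_pandas()
        print(pd.DataFrame({"pandas_lower": lower, "pandas_on_spark": approx}))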
defstd(self)->FrameLike:""" Calculate expanding standard deviation. .. note:: the current implementation of this API uses Spark's Window without specifying partition specification. This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset. Returns ------- Series or DataFrame Returns the same object type as the caller of the expanding calculation. See Also -------- pyspark.pandas.Series.expanding : Calling object with Series data. pyspark.pandas.DataFrame.expanding : Calling object with DataFrames. pyspark.pandas.Series.std : Equivalent method for Series. pyspark.pandas.DataFrame.std : Equivalent method for DataFrame. numpy.std : Equivalent method for Numpy array. Examples -------- >>> s = ps.Series([5, 5, 6, 7, 5, 5, 5]) >>> s.expanding(3).std() 0 NaN 1 NaN 2 0.577350 3 0.957427 4 0.894427 5 0.836660 6 0.786796 dtype: float64 For DataFrame, each expanding standard deviation variance is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.expanding(2).std() A B 0 NaN NaN 1 0.000000 0.000000 2 0.577350 6.350853 3 0.957427 11.412712 4 0.894427 10.630146 5 0.836660 9.928075 6 0.786796 9.327379 """returnsuper().std()defvar(self)->FrameLike:""" Calculate unbiased expanding variance. .. note:: the current implementation of this API uses Spark's Window without specifying partition specification. This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset. Returns ------- Series or DataFrame Returns the same object type as the caller of the expanding calculation. See Also -------- pyspark.pandas.Series.expanding : Calling object with Series data. pyspark.pandas.DataFrame.expanding : Calling object with DataFrames. pyspark.pandas.Series.var : Equivalent method for Series. pyspark.pandas.DataFrame.var : Equivalent method for DataFrame. numpy.var : Equivalent method for Numpy array. Examples -------- >>> s = ps.Series([5, 5, 6, 7, 5, 5, 5]) >>> s.expanding(3).var() 0 NaN 1 NaN 2 0.333333 3 0.916667 4 0.800000 5 0.700000 6 0.619048 dtype: float64 For DataFrame, each unbiased expanding variance is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.expanding(2).var() A B 0 NaN NaN 1 0.000000 0.000000 2 0.333333 40.333333 3 0.916667 130.250000 4 0.800000 113.000000 5 0.700000 98.566667 6 0.619048 87.000000 """returnsuper().var()defskew(self)->FrameLike:""" Calculate unbiased expanding skew. .. note:: the current implementation of this API uses Spark's Window without specifying partition specification. This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset. Returns ------- Series or DataFrame Returns the same object type as the caller of the expanding calculation. See Also -------- pyspark.pandas.Series.expanding : Calling object with Series data. pyspark.pandas.DataFrame.expanding : Calling object with DataFrames. pyspark.pandas.Series.std : Equivalent method for Series. pyspark.pandas.DataFrame.std : Equivalent method for DataFrame. numpy.std : Equivalent method for Numpy array. 
Examples -------- >>> s = ps.Series([5, 5, 6, 7, 5, 1, 5, 9]) >>> s.expanding(3).skew() 0 NaN 1 NaN 2 1.732051 3 0.854563 4 1.257788 5 -1.571593 6 -1.657542 7 -0.521760 dtype: float64 For DataFrame, each expanding standard deviation variance is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.expanding(5).skew() A B 0 NaN NaN 1 NaN NaN 2 NaN NaN 3 NaN NaN 4 1.257788 1.369456 5 -1.571593 -0.423309 6 -1.657542 -0.355737 7 -0.521760 1.116874 """returnsuper().skew()defkurt(self)->FrameLike:""" Calculate unbiased expanding kurtosis. .. note:: the current implementation of this API uses Spark's Window without specifying partition specification. This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset. Returns ------- Series or DataFrame Returns the same object type as the caller of the expanding calculation. See Also -------- pyspark.pandas.Series.expanding : Calling object with Series data. pyspark.pandas.DataFrame.expanding : Calling object with DataFrames. pyspark.pandas.Series.var : Equivalent method for Series. pyspark.pandas.DataFrame.var : Equivalent method for DataFrame. numpy.var : Equivalent method for Numpy array. Examples -------- >>> s = ps.Series([5, 5, 6, 7, 5, 1, 5, 9]) >>> s.expanding(4).kurt() 0 NaN 1 NaN 2 NaN 3 -1.289256 4 0.312500 5 3.419520 6 4.028185 7 2.230373 dtype: float64 For DataFrame, each unbiased expanding variance is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.expanding(5).kurt() A B 0 NaN NaN 1 NaN NaN 2 NaN NaN 3 NaN NaN 4 0.312500 0.906336 5 3.419520 1.486581 6 4.028185 1.936169 7 2.230373 2.273792 """returnsuper().kurt()classExpandingGroupby(ExpandingLike[FrameLike]):def__init__(self,groupby:GroupBy[FrameLike],min_periods:int=1):super().__init__(min_periods)self._groupby=groupbyself._window=self._window.partitionBy(*[ser.spark.columnforseringroupby._groupkeys])self._unbounded_window=self._window.partitionBy(*[ser.spark.columnforseringroupby._groupkeys])def__getattr__(self,item:str)->Any:ifhasattr(MissingPandasLikeExpandingGroupby,item):property_or_func=getattr(MissingPandasLikeExpandingGroupby,item)ifisinstance(property_or_func,property):returnproperty_or_func.fget(self)else:returnpartial(property_or_func,self)raiseAttributeError(item)_apply_as_series_or_frame=RollingGroupby._apply_as_series_or_framedefcount(self)->FrameLike:""" The expanding count of any non-NaN observations inside the window. Returns ------- Series or DataFrame Returned object type is determined by the caller of the expanding calculation. See Also -------- pyspark.pandas.Series.expanding : Calling object with Series data. pyspark.pandas.DataFrame.expanding : Calling object with DataFrames. pyspark.pandas.Series.count : Count of the full Series. pyspark.pandas.DataFrame.count : Count of the full DataFrame. Examples -------- >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5]) >>> s.groupby(s).expanding(3).count().sort_index() 2 0 NaN 1 NaN 3 2 NaN 3 NaN 4 3.0 4 5 NaN 6 NaN 7 3.0 8 4.0 5 9 NaN 10 NaN dtype: float64 For DataFrame, each expanding count is computed column-wise. 
>>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.groupby(df.A).expanding(2).count().sort_index() # doctest: +NORMALIZE_WHITESPACE B A 2 0 NaN 1 2.0 3 2 NaN 3 2.0 4 3.0 4 5 NaN 6 2.0 7 3.0 8 4.0 5 9 NaN 10 2.0 """returnsuper().count()defsum(self)->FrameLike:""" Calculate expanding summation of given DataFrame or Series. Returns ------- Series or DataFrame Same type as the input, with the same index, containing the expanding summation. See Also -------- pyspark.pandas.Series.expanding : Calling object with Series data. pyspark.pandas.DataFrame.expanding : Calling object with DataFrames. pyspark.pandas.Series.sum : Reducing sum for Series. pyspark.pandas.DataFrame.sum : Reducing sum for DataFrame. Examples -------- >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5]) >>> s.groupby(s).expanding(3).sum().sort_index() 2 0 NaN 1 NaN 3 2 NaN 3 NaN 4 9.0 4 5 NaN 6 NaN 7 12.0 8 16.0 5 9 NaN 10 NaN dtype: float64 For DataFrame, each expanding summation is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.groupby(df.A).expanding(2).sum().sort_index() # doctest: +NORMALIZE_WHITESPACE B A 2 0 NaN 1 8.0 3 2 NaN 3 18.0 4 27.0 4 5 NaN 6 32.0 7 48.0 8 64.0 5 9 NaN 10 50.0 """returnsuper().sum()defmin(self)->FrameLike:""" Calculate the expanding minimum. Returns ------- Series or DataFrame Returned object type is determined by the caller of the expanding calculation. See Also -------- pyspark.pandas.Series.expanding : Calling object with a Series. pyspark.pandas.DataFrame.expanding : Calling object with a DataFrame. pyspark.pandas.Series.min : Similar method for Series. pyspark.pandas.DataFrame.min : Similar method for DataFrame. Examples -------- >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5]) >>> s.groupby(s).expanding(3).min().sort_index() 2 0 NaN 1 NaN 3 2 NaN 3 NaN 4 3.0 4 5 NaN 6 NaN 7 4.0 8 4.0 5 9 NaN 10 NaN dtype: float64 For DataFrame, each expanding minimum is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.groupby(df.A).expanding(2).min().sort_index() # doctest: +NORMALIZE_WHITESPACE B A 2 0 NaN 1 4.0 3 2 NaN 3 9.0 4 9.0 4 5 NaN 6 16.0 7 16.0 8 16.0 5 9 NaN 10 25.0 """returnsuper().min()defmax(self)->FrameLike:""" Calculate the expanding maximum. Returns ------- Series or DataFrame Return type is determined by the caller. See Also -------- pyspark.pandas.Series.expanding : Calling object with Series data. pyspark.pandas.DataFrame.expanding : Calling object with DataFrames. pyspark.pandas.Series.max : Similar method for Series. pyspark.pandas.DataFrame.max : Similar method for DataFrame. Examples -------- >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5]) >>> s.groupby(s).expanding(3).max().sort_index() 2 0 NaN 1 NaN 3 2 NaN 3 NaN 4 3.0 4 5 NaN 6 NaN 7 4.0 8 4.0 5 9 NaN 10 NaN dtype: float64 For DataFrame, each expanding maximum is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.groupby(df.A).expanding(2).max().sort_index() # doctest: +NORMALIZE_WHITESPACE B A 2 0 NaN 1 4.0 3 2 NaN 3 9.0 4 9.0 4 5 NaN 6 16.0 7 16.0 8 16.0 5 9 NaN 10 25.0 """returnsuper().max()defmean(self)->FrameLike:""" Calculate the expanding mean of the values. Returns ------- Series or DataFrame Returned object type is determined by the caller of the expanding calculation. See Also -------- pyspark.pandas.Series.expanding : Calling object with Series data. pyspark.pandas.DataFrame.expanding : Calling object with DataFrames. 
pyspark.pandas.Series.mean : Equivalent method for Series. pyspark.pandas.DataFrame.mean : Equivalent method for DataFrame. Examples -------- >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5]) >>> s.groupby(s).expanding(3).mean().sort_index() 2 0 NaN 1 NaN 3 2 NaN 3 NaN 4 3.0 4 5 NaN 6 NaN 7 4.0 8 4.0 5 9 NaN 10 NaN dtype: float64 For DataFrame, each expanding mean is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.groupby(df.A).expanding(2).mean().sort_index() # doctest: +NORMALIZE_WHITESPACE B A 2 0 NaN 1 4.0 3 2 NaN 3 9.0 4 9.0 4 5 NaN 6 16.0 7 16.0 8 16.0 5 9 NaN 10 25.0 """returnsuper().mean()defquantile(self,quantile:float,accuracy:int=10000)->FrameLike:""" Calculate the expanding quantile of the values. .. versionadded:: 3.4.0 Parameters ---------- quantile : float Value between 0 and 1 providing the quantile to compute. accuracy : int, optional Default accuracy of approximation. Larger value means better accuracy. The relative error can be deduced by 1.0 / accuracy. This is a panda-on-Spark specific parameter. Returns ------- Series or DataFrame Returned object type is determined by the caller of the expanding calculation. Notes ----- `quantile` in pandas-on-Spark are using distributed percentile approximation algorithm unlike pandas, the result might be different with pandas, also `interpolation` parameter is not supported yet. See Also -------- pyspark.pandas.Series.expanding : Calling expanding with Series data. pyspark.pandas.DataFrame.expanding : Calling expanding with DataFrames. pyspark.pandas.Series.quantile : Aggregating quantile for Series. pyspark.pandas.DataFrame.quantile : Aggregating quantile for DataFrame. Examples -------- >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5]) >>> s.groupby(s).expanding(3).quantile(0.5).sort_index() 2 0 NaN 1 NaN 3 2 NaN 3 NaN 4 3.0 4 5 NaN 6 NaN 7 4.0 8 4.0 5 9 NaN 10 NaN dtype: float64 For DataFrame, each expanding quantile is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.groupby(df.A).expanding(2).quantile(0.5).sort_index() B A 2 0 NaN 1 4.0 3 2 NaN 3 9.0 4 9.0 4 5 NaN 6 16.0 7 16.0 8 16.0 5 9 NaN 10 25.0 """returnsuper().quantile(quantile,accuracy)defstd(self)->FrameLike:""" Calculate expanding standard deviation. Returns ------- Series or DataFrame Returns the same object type as the caller of the expanding calculation. See Also -------- pyspark.pandas.Series.expanding: Calling object with Series data. pyspark.pandas.DataFrame.expanding : Calling object with DataFrames. pyspark.pandas.Series.std : Equivalent method for Series. pyspark.pandas.DataFrame.std : Equivalent method for DataFrame. numpy.std : Equivalent method for Numpy array. """returnsuper().std()defvar(self)->FrameLike:""" Calculate unbiased expanding variance. Returns ------- Series or DataFrame Returns the same object type as the caller of the expanding calculation. See Also -------- pyspark.pandas.Series.expanding : Calling object with Series data. pyspark.pandas.DataFrame.expanding : Calling object with DataFrames. pyspark.pandas.Series.var : Equivalent method for Series. pyspark.pandas.DataFrame.var : Equivalent method for DataFrame. numpy.var : Equivalent method for Numpy array. """returnsuper().var()defskew(self)->FrameLike:""" Calculate expanding standard skew. Returns ------- Series or DataFrame Returns the same object type as the caller of the expanding calculation. See Also -------- pyspark.pandas.Series.expanding: Calling object with Series data. 
pyspark.pandas.DataFrame.expanding : Calling object with DataFrames. pyspark.pandas.Series.std : Equivalent method for Series. pyspark.pandas.DataFrame.std : Equivalent method for DataFrame. numpy.std : Equivalent method for Numpy array. """returnsuper().skew()defkurt(self)->FrameLike:""" Calculate unbiased expanding kurtosis. Returns ------- Series or DataFrame Returns the same object type as the caller of the expanding calculation. See Also -------- pyspark.pandas.Series.expanding : Calling object with Series data. pyspark.pandas.DataFrame.expanding : Calling object with DataFrames. pyspark.pandas.Series.var : Equivalent method for Series. pyspark.pandas.DataFrame.var : Equivalent method for DataFrame. numpy.var : Equivalent method for Numpy array. """returnsuper().kurt()classExponentialMovingLike(Generic[FrameLike],metaclass=ABCMeta):def__init__(self,window:WindowSpec,com:Optional[float]=None,span:Optional[float]=None,halflife:Optional[float]=None,alpha:Optional[float]=None,min_periods:Optional[int]=None,ignore_na:bool=False,):if(min_periodsisnotNone)and(min_periods<0):raiseValueError("min_periods must be >= 0")ifmin_periodsisNone:min_periods=0self._min_periods=min_periodsself._ignore_na=ignore_naself._window=window# This unbounded Window is later used to handle 'min_periods' for now.self._unbounded_window=Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(Window.unboundedPreceding,Window.currentRow)if(comisnotNone)and(notcom>=0):raiseValueError("com must be >= 0")self._com=comif(spanisnotNone)and(notspan>=1):raiseValueError("span must be >= 1")self._span=spanif(halflifeisnotNone)and(nothalflife>0):raiseValueError("halflife must be > 0")self._halflife=halflifeif(alphaisnotNone)and(not0<alpha<=1):raiseValueError("alpha must be in (0, 1]")self._alpha=alphadef_compute_unified_alpha(self)->float:unified_alpha=np.nanopt_count=0ifself._comisnotNone:unified_alpha=1.0/(1+self._com)opt_count+=1ifself._spanisnotNone:unified_alpha=2.0/(1+self._span)opt_count+=1ifself._halflifeisnotNone:unified_alpha=1.0-np.exp(-np.log(2)/self._halflife)opt_count+=1ifself._alphaisnotNone:unified_alpha=self._alphaopt_count+=1ifopt_count==0:raiseValueError("Must pass one of com, span, halflife, or alpha")ifopt_count!=1:raiseValueError("com, span, halflife, and alpha are mutually exclusive")returnunified_alpha@abstractmethoddef_apply_as_series_or_frame(self,func:Callable[[Column],Column])->FrameLike:""" Wraps a function that handles Spark column in order to support it in both pandas-on-Spark Series and DataFrame. Note that the given `func` name should be same as the API's method name. 
"""passdefmean(self)->FrameLike:unified_alpha=self._compute_unified_alpha()defmean(scol:Column)->Column:col_ewm=SF.ewm(scol,unified_alpha,self._ignore_na)returnF.when(F.count(F.when(~scol.isNull(),1).otherwise(None)).over(self._unbounded_window)>=self._min_periods,col_ewm.over(self._window),).otherwise(F.lit(None))returnself._apply_as_series_or_frame(mean)classExponentialMoving(ExponentialMovingLike[FrameLike]):def__init__(self,psdf_or_psser:FrameLike,com:Optional[float]=None,span:Optional[float]=None,halflife:Optional[float]=None,alpha:Optional[float]=None,min_periods:Optional[int]=None,ignore_na:bool=False,):frompyspark.pandas.frameimportDataFramefrompyspark.pandas.seriesimportSeriesifnotisinstance(psdf_or_psser,(DataFrame,Series)):raiseTypeError("psdf_or_psser must be a series or dataframe; however, got: %s"%type(psdf_or_psser))self._psdf_or_psser=psdf_or_psserwindow_spec=Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(Window.unboundedPreceding,Window.currentRow)super().__init__(window_spec,com,span,halflife,alpha,min_periods,ignore_na)def__getattr__(self,item:str)->Any:ifhasattr(MissingPandasLikeExponentialMoving,item):property_or_func=getattr(MissingPandasLikeExponentialMoving,item)ifisinstance(property_or_func,property):returnproperty_or_func.fget(self)else:returnpartial(property_or_func,self)raiseAttributeError(item)_apply_as_series_or_frame=Rolling._apply_as_series_or_frame
    def mean(self) -> FrameLike:
        """
        Calculate an online exponentially weighted mean.

        Notes
        -----
        There are behavior differences between pandas-on-Spark and pandas.

        * the current implementation of this API uses Spark's Window without
          specifying partition specification. This leads to moving all data into a
          single partition on a single machine and could cause serious performance
          degradation. Avoid this method against very large datasets.

        Returns
        -------
        Series or DataFrame
            Returned object type is determined by the caller of the exponentially
            weighted calculation.

        See Also
        --------
        pyspark.pandas.Series.expanding : Calling object with Series data.
        pyspark.pandas.DataFrame.expanding : Calling object with DataFrames.
        pyspark.pandas.Series.mean : Equivalent method for Series.
        pyspark.pandas.DataFrame.mean : Equivalent method for DataFrame.

        Examples
        --------
        The examples below show how to compute an exponentially weighted moving
        average.

        >>> df = ps.DataFrame({'s1': [.2, .0, .6, .2, .4, .5, .6], 's2': [2, 1, 3, 1, 0, 0, 0]})
        >>> df.ewm(com=0.1).mean()
                 s1        s2
        0  0.200000  2.000000
        1  0.016667  1.083333
        2  0.547368  2.827068
        3  0.231557  1.165984
        4  0.384688  0.105992
        5  0.489517  0.009636
        6  0.589956  0.000876

        >>> df.s2.ewm(halflife=1.5, min_periods=3).mean()
        0         NaN
        1         NaN
        2    2.182572
        3    1.663174
        4    0.979949
        5    0.593155
        6    0.364668
        Name: s2, dtype: float64
        """
        return super().mean()
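
    # Reference sketch (an assumption: this restates the textbook, pandas-style
    # adjusted EWM definition, not the internals of SF.ewm): with smoothing factor
    # alpha, the weighted mean after observation x_t is
    #   y_t = sum_{i=0..t} (1 - alpha)^i * x_{t-i} / sum_{i=0..t} (1 - alpha)^i
    # The NumPy helper below computes that directly, for eyeballing against
    # ewm(...).mean() on small NA-free inputs.
    @staticmethod
    def _demo_reference_ewm(values: List[float], alpha: float) -> List[float]:
        out = []
        for t in range(len(values)):
            weights = np.array([(1 - alpha) ** i for i in range(t + 1)])
            window = np.array(values[t::-1])  # x_t, x_{t-1}, ..., x_0
            out.append(float(np.dot(weights, window) / weights.sum()))
        return out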
# TODO: when add 'adjust' parameter, should add to here too.def__repr__(self)->str:return("ExponentialMoving [com={}, span={}, halflife={}, alpha={}, ""min_periods={}, ignore_na={}]".format(self._com,self._span,self._halflife,self._alpha,self._min_periods,self._ignore_na,))classExponentialMovingGroupby(ExponentialMovingLike[FrameLike]):def__init__(self,groupby:GroupBy[FrameLike],com:Optional[float]=None,span:Optional[float]=None,halflife:Optional[float]=None,alpha:Optional[float]=None,min_periods:Optional[int]=None,ignore_na:bool=False,):window_spec=Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(Window.unboundedPreceding,Window.currentRow)super().__init__(window_spec,com,span,halflife,alpha,min_periods,ignore_na)self._groupby=groupbyself._window=self._window.partitionBy(*[ser.spark.columnforseringroupby._groupkeys])self._unbounded_window=self._unbounded_window.partitionBy(*[ser.spark.columnforseringroupby._groupkeys])def__getattr__(self,item:str)->Any:ifhasattr(MissingPandasLikeExponentialMovingGroupby,item):property_or_func=getattr(MissingPandasLikeExponentialMovingGroupby,item)ifisinstance(property_or_func,property):returnproperty_or_func.fget(self)else:returnpartial(property_or_func,self)raiseAttributeError(item)_apply_as_series_or_frame=RollingGroupby._apply_as_series_or_framedefmean(self)->FrameLike:""" Calculate an online exponentially weighted mean. Notes ----- There are behavior differences between pandas-on-Spark and pandas. * the current implementation of this API uses Spark's Window without specifying partition specification. This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset. Returns ------- Series or DataFrame Returned object type is determined by the caller of the exponentially calculation. See Also -------- pyspark.pandas.Series.expanding : Calling object with Series data. pyspark.pandas.DataFrame.expanding : Calling object with DataFrames. pyspark.pandas.Series.mean : Equivalent method for Series. pyspark.pandas.DataFrame.mean : Equivalent method for DataFrame. Examples -------- >>> s = ps.Series([2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5]) >>> s.groupby(s).ewm(alpha=0.5).mean().sort_index() 2 0 2.0 1 2.0 3 2 3.0 3 3.0 4 3.0 4 5 4.0 6 4.0 7 4.0 8 4.0 5 9 5.0 10 5.0 dtype: float64 For DataFrame, each ewm mean is computed column-wise. >>> df = ps.DataFrame({"A": s.to_numpy(), "B": s.to_numpy() ** 2}) >>> df.groupby(df.A).ewm(alpha=0.5).mean().sort_index() # doctest: +NORMALIZE_WHITESPACE B A 2 0 4.0 1 4.0 3 2 9.0 3 9.0 4 9.0 4 5 16.0 6 16.0 7 16.0 8 16.0 5 9 25.0 10 25.0 """returnsuper().mean()# TODO: when add 'adjust' parameter, should add to here too.def__repr__(self)->str:return("ExponentialMovingGroupby [com={}, span={}, halflife={}, alpha={}, ""min_periods={}, ignore_na={}]".format(self._com,self._span,self._halflife,self._alpha,self._min_periods,self._ignore_na,))def_test()->None:importosimportdoctestimportsysfrompyspark.sqlimportSparkSessionimportpyspark.pandas.windowos.chdir(os.environ["SPARK_HOME"])globs=pyspark.pandas.window.__dict__.copy()globs["ps"]=pyspark.pandasspark=(SparkSession.builder.master("local[4]").appName("pyspark.pandas.window tests").getOrCreate())(failure_count,test_count)=doctest.testmod(pyspark.pandas.window,globs=globs,optionflags=doctest.ELLIPSIS|doctest.NORMALIZE_WHITESPACE,)spark.stop()iffailure_count:sys.exit(-1)if__name__=="__main__":_test()