#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Callable, Generic, Optional, Type, Union, TYPE_CHECKING
import warnings
from pyspark.pandas._typing import T
if TYPE_CHECKING:
    from pyspark.pandas.frame import DataFrame
    from pyspark.pandas.indexes import Index
    from pyspark.pandas.series import Series
class CachedAccessor(Generic[T]):
    """
    Property-like descriptor that builds an accessor on demand and stores it on the object.

    Parameters
    ----------
    name : str
        Namespace under which the accessor's methods, properties, etc. are reached, e.g. "foo"
        for a dataframe accessor yields the accessor ``df.foo``.
    accessor : cls
        Class with the extension methods. It is called with the accessed object as its only
        argument.

    Notes
    -----
    The accessor class's ``__init__`` is assumed to be written for one of ``Series``,
    ``DataFrame``, or ``Index``.

    This object is not meant to be instantiated directly. Instead, use
    register_dataframe_accessor, register_series_accessor, or register_index_accessor.

    The pandas-on-Spark accessor is modified based on pandas.core.accessor.
    """

    def __init__(self, name: str, accessor: Type[T]) -> None:
        self._name, self._accessor = name, accessor

    def __get__(
        self, obj: Optional[Union["DataFrame", "Series", "Index"]], cls: Type[T]
    ) -> Union[T, Type[T]]:
        """
        Return the accessor class for class-level access, or a new accessor bound to ``obj``.
        """
        if obj is not None:
            bound = self._accessor(obj)  # type: ignore[call-arg]
            # Store the built accessor on the instance under the registered name.
            # ``object.__setattr__`` is used so any ``__setattr__`` defined by ``obj``'s class
            # is bypassed.
            object.__setattr__(obj, self._name, bound)
            return bound
        # Reached through the class (e.g. ``DataFrame.foo``): expose the accessor class itself.
        return self._accessor
def _register_accessor(
    name: str, cls: Union[Type["DataFrame"], Type["Series"], Type["Index"]]
) -> Callable[[Type[T]], Type[T]]:
    """
    Build a class decorator that registers a custom accessor on ``cls``.

    Parameters
    ----------
    name : str
        Name under which the accessor should be registered. A warning is issued if this name
        conflicts with a preexisting attribute.
    cls : type
        The class (``DataFrame``, ``Series`` or ``Index``) that receives the accessor.

    Returns
    -------
    callable
        A class decorator.

    See Also
    --------
    register_dataframe_accessor: Register a custom accessor on DataFrame objects
    register_series_accessor: Register a custom accessor on Series objects
    register_index_accessor: Register a custom accessor on Index objects

    Notes
    -----
    When accessed, your accessor will be initialized with the pandas-on-Spark object the user
    is interacting with. The code signature must be:

    .. code-block:: python

        def __init__(self, pandas_on_spark_obj):
            # constructor logic
        ...

    In the pandas API, if data passed to your accessor has an incorrect dtype, it's recommended to
    raise an ``AttributeError`` for consistency purposes. In pandas-on-Spark, ``ValueError`` is more
    frequently used to annotate when a value's datatype is unexpected for a given method/function.

    Ultimately, you can structure this however you like, but pandas-on-Spark would likely do
    something like this:

    >>> ps.Series(['a', 'b']).dt
    ...
    Traceback (most recent call last):
        ...
    ValueError: Cannot call DatetimeMethods on type StringType()

    Note: This function is not meant to be used directly - instead, use register_dataframe_accessor,
    register_series_accessor, or register_index_accessor.
    """

    def decorator(accessor: Type[T]) -> Type[T]:
        # Registration still proceeds on a name clash; the user is only warned about it.
        if hasattr(cls, name):
            template = (
                "registration of accessor {0} under name '{1}' for type {2} is overriding "
                "a preexisting attribute with the same name."
            )
            warnings.warn(template.format(accessor, name, cls.__name__), UserWarning, stacklevel=2)

        # Install the descriptor on the target class and hand the accessor class back
        # unchanged so the decorated name still refers to it.
        setattr(cls, name, CachedAccessor(name, accessor))
        return accessor

    return decorator
def register_dataframe_accessor(name: str) -> Callable[[Type[T]], Type[T]]:
    """
    Register a custom accessor with a DataFrame

    Parameters
    ----------
    name : str
        name used when calling the accessor after it's registered

    Returns
    -------
    callable
        A class decorator.

    See Also
    --------
    register_series_accessor: Register a custom accessor on Series objects
    register_index_accessor: Register a custom accessor on Index objects

    Notes
    -----
    When accessed, your accessor will be initialized with the pandas-on-Spark object the user
    is interacting with. The accessor's init method should always ingest the object being accessed.
    See the examples for the init signature.

    In the pandas API, if data passed to your accessor has an incorrect dtype, it's recommended to
    raise an ``AttributeError`` for consistency purposes. In pandas-on-Spark, ``ValueError`` is more
    frequently used to annotate when a value's datatype is unexpected for a given method/function.

    Ultimately, you can structure this however you like, but pandas-on-Spark would likely do
    something like this:

    >>> ps.Series(['a', 'b']).dt
    ...
    Traceback (most recent call last):
        ...
    ValueError: Cannot call DatetimeMethods on type StringType()

    Examples
    --------
    In your library code::

        from pyspark.pandas.extensions import register_dataframe_accessor

        @register_dataframe_accessor("geo")
        class GeoAccessor:

            def __init__(self, pandas_on_spark_obj):
                self._obj = pandas_on_spark_obj
                # other constructor logic

            @property
            def center(self):
                # return the geographic center point of this DataFrame
                lat = self._obj.latitude
                lon = self._obj.longitude
                return (float(lon.mean()), float(lat.mean()))

            def plot(self):
                # plot this array's data on a map
                pass

    Then, in an ipython session::

        >>> ## Import if the accessor is in the other file.
        >>> # from my_ext_lib import GeoAccessor
        >>> psdf = ps.DataFrame({"longitude": np.linspace(0,10),
        ...                     "latitude": np.linspace(0, 20)})
        >>> psdf.geo.center  # doctest: +SKIP
        (5.0, 10.0)

        >>> psdf.geo.plot()  # doctest: +SKIP
    """
    # Imported at call time rather than at module import time (presumably to avoid a circular
    # import; the module-level import of this class is TYPE_CHECKING-only).
    from pyspark.pandas import DataFrame

    return _register_accessor(name, DataFrame)
def register_series_accessor(name: str) -> Callable[[Type[T]], Type[T]]:
    """
    Register a custom accessor with a Series object

    Parameters
    ----------
    name : str
        name used when calling the accessor after it's registered

    Returns
    -------
    callable
        A class decorator.

    See Also
    --------
    register_dataframe_accessor: Register a custom accessor on DataFrame objects
    register_index_accessor: Register a custom accessor on Index objects

    Notes
    -----
    When accessed, your accessor will be initialized with the pandas-on-Spark object the user is
    interacting with. The code signature must be::

        def __init__(self, pandas_on_spark_obj):
            # constructor logic
        ...

    In the pandas API, if data passed to your accessor has an incorrect dtype, it's recommended to
    raise an ``AttributeError`` for consistency purposes. In pandas-on-Spark, ``ValueError`` is more
    frequently used to annotate when a value's datatype is unexpected for a given method/function.

    Ultimately, you can structure this however you like, but pandas-on-Spark would likely do
    something like this:

    >>> ps.Series(['a', 'b']).dt
    ...
    Traceback (most recent call last):
        ...
    ValueError: Cannot call DatetimeMethods on type StringType()

    Examples
    --------
    In your library code::

        from pyspark.pandas.extensions import register_series_accessor

        @register_series_accessor("geo")
        class GeoAccessor:

            def __init__(self, pandas_on_spark_obj):
                self._obj = pandas_on_spark_obj

            @property
            def is_valid(self):
                # boolean check to see if series contains valid geometry
                return True

    Then, in an ipython session::

        >>> ## Import if the accessor is in the other file.
        >>> # from my_ext_lib import GeoAccessor
        >>> psdf = ps.DataFrame({"longitude": np.linspace(0,10),
        ...                     "latitude": np.linspace(0, 20)})
        >>> psdf.longitude.geo.is_valid  # doctest: +SKIP
        True
    """
    # Imported at call time rather than at module import time (presumably to avoid a circular
    # import; the module-level import of this class is TYPE_CHECKING-only).
    from pyspark.pandas import Series

    return _register_accessor(name, Series)
def register_index_accessor(name: str) -> Callable[[Type[T]], Type[T]]:
    """
    Register a custom accessor with an Index

    Parameters
    ----------
    name : str
        name used when calling the accessor after it's registered

    Returns
    -------
    callable
        A class decorator.

    See Also
    --------
    register_dataframe_accessor: Register a custom accessor on DataFrame objects
    register_series_accessor: Register a custom accessor on Series objects

    Notes
    -----
    When accessed, your accessor will be initialized with the pandas-on-Spark object the user is
    interacting with. The code signature must be::

        def __init__(self, pandas_on_spark_obj):
            # constructor logic
        ...

    In the pandas API, if data passed to your accessor has an incorrect dtype, it's recommended to
    raise an ``AttributeError`` for consistency purposes. In pandas-on-Spark, ``ValueError`` is more
    frequently used to annotate when a value's datatype is unexpected for a given method/function.

    Ultimately, you can structure this however you like, but pandas-on-Spark would likely do
    something like this:

    >>> ps.Series(['a', 'b']).dt
    ...
    Traceback (most recent call last):
        ...
    ValueError: Cannot call DatetimeMethods on type StringType()

    Examples
    --------
    In your library code::

        from pyspark.pandas.extensions import register_index_accessor

        @register_index_accessor("foo")
        class CustomAccessor:

            def __init__(self, pandas_on_spark_obj):
                self._obj = pandas_on_spark_obj
                self.item = "baz"

            @property
            def bar(self):
                # return item value
                return self.item

    Then, in an ipython session::

        >>> ## Import if the accessor is in the other file.
        >>> # from my_ext_lib import CustomAccessor
        >>> psdf = ps.DataFrame({"longitude": np.linspace(0,10),
        ...                     "latitude": np.linspace(0, 20)})
        >>> psdf.index.foo.bar  # doctest: +SKIP
        'baz'
    """
    # Imported at call time rather than at module import time (presumably to avoid a circular
    # import; the module-level import of this class is TYPE_CHECKING-only).
    from pyspark.pandas import Index

    return _register_accessor(name, Index)
def _test() -> None:
    """
    Run this module's doctests with a local Spark session active.

    The working directory is switched to ``$SPARK_HOME`` first, so that environment variable
    must be set. The process exits with status ``-1`` when any doctest fails.
    """
    import os
    import doctest
    import sys
    import numpy
    from pyspark.sql import SparkSession
    import pyspark.pandas.extensions

    module = pyspark.pandas.extensions
    os.chdir(os.environ["SPARK_HOME"])

    # Doctest namespace: a copy of the module globals plus the ``np`` / ``ps`` aliases that
    # the docstring examples use.
    namespace = {**module.__dict__, "np": numpy, "ps": pyspark.pandas}

    builder = SparkSession.builder.master("local[4]")
    session = builder.appName("pyspark.pandas.extensions tests").getOrCreate()

    outcome = doctest.testmod(
        module,
        globs=namespace,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
    )
    session.stop()

    if outcome.failed:
        sys.exit(-1)
# Run this module's doctests (via ``_test``) when the file is executed as a script.
if __name__ == "__main__":
    _test()