Source code for nested_pandas.series.packer

"""Module for converting between "flat" and "list" and "nested" representations

TODO: mask support
TODO: multi-index support
"""

# "|" for python 3.9
from __future__ import annotations

from collections.abc import Sequence

import numpy as np
import pandas as pd
import pyarrow as pa

from nested_pandas.series.dtype import NestedDtype
from nested_pandas.series.ext_array import NestedExtensionArray
from nested_pandas.series.nestedseries import NestedSeries

__all__ = ["pack", "pack_flat", "pack_lists", "pack_seq"]


N_ROWS_INFER_DTYPE = 1000


[docs] def pack( obj, name: str | None = None, *, index=None, on: None | str | list[str] = None, dtype: NestedDtype | pd.ArrowDtype | pa.DataType | None = None, ) -> NestedSeries: """Pack a "flat" dataframe or a sequence of dataframes into a "nested" series. Parameters ---------- obj : pd.DataFrame or Sequence of Input dataframe, with repeated indexes, or a sequence of dataframes or missed values. name : str, optional Name of the output series. index : convertable to pd.Index, optional Index of the output series. If obj is a pd.DataFrame, it is always nested by the original index, and this value is used to override the index after the nesting. on: str or list of str, optional Column name(s) to join on. If None, the index is used. dtype : dtype or None NestedDtype of the output series, or other type to derive from. If None, the dtype is inferred from the first non-missing dataframe. Returns ------- NestedSeries Output series. """ if isinstance(obj, pd.DataFrame): nested = pack_flat(obj, name=name, on=on) if index is not None: nested.index = index return nested return pack_seq(obj, name=name, index=index, dtype=dtype)
[docs] def pack_flat( df: pd.DataFrame, name: str | None = None, *, on: None | str | list[str] = None ) -> NestedSeries: """Make a structure of lists representation of a "flat" dataframe. For the input dataframe with repeated indexes, make a pandas.Series, where each original column is replaced by a structure of lists. The dtype of the column is `nested_pandas.NestedDtype` with the corresponding pyarrow type. The index of the output series is the unique index of the input dataframe. The Series has `.nest` accessor, see :meth:`nested_pandas.series.accessor.NestSeriesAccessor` for details. Parameters ---------- df : pd.DataFrame Input dataframe, with repeated indexes. name : str, optional Name of the NestedSeries. on : str or list of str, optional Column name(s) to join on. If None, the df's index is used. Returns ------- NestedSeries Output series, with unique indexes. See Also -------- nested_pandas.series.accessor.NestSeriesAccessor : .nest accessor for the output series. nested_pandas.series.dtype.NestedDtype : The dtype of the output series. nested_pandas.series.packer.pack_lists : Pack a dataframe of nested arrays. """ if on is not None: df = df.set_index(on) # pandas knows when index is pre-sorted, so it would do nothing if it is already sorted sorted_flat = df.sort_index(kind="stable") try: return pack_sorted_df_into_struct(sorted_flat, name=name) except ValueError: # Check if the error is due to NaN values and raise a more informative message if any(sorted_flat.index.get_level_values(i).hasnans for i in range(sorted_flat.index.nlevels)): if on is None: raise ValueError( "The index contains NaN values. " "NaN values are not supported because they cannot be used for grouping rows. " "Please remove or fill NaN values before packing." ) from None cols = [on] if isinstance(on, str) else list(on) raise ValueError( f"Column(s) {cols} contain NaN values. " "NaN values are not supported because they cannot be used for grouping rows. " "Please remove or fill NaN values before packing." ) from None raise
[docs] def pack_seq( sequence: Sequence, name: str | None = None, *, index: object = None, dtype: NestedDtype | pd.ArrowDtype | pa.DataType | None = None, ) -> NestedSeries: """Pack a sequence of "flat" dataframes into a "nested" series. Parameters ---------- sequence : Sequence of pd.DataFrame or None or pd.NA or convertible to pa.StructScalar Input sequence of dataframes or missed values. name : str, optional Name of the output series. index : pd.Index, optional Index of the output series. dtype : dtype or None NestedDtype of the output series, or other type to derive from. If None, the dtype is inferred from the first non-missing dataframe. Returns ------- NestedSeries Output series. """ if isinstance(sequence, pd.Series): # generalized check for pandas series if index is None: index = sequence.index if name is None: name = sequence.name ext_array = NestedExtensionArray.from_sequence(sequence, dtype=dtype) series = NestedSeries(ext_array, index=index, name=name, copy=False) return series
def pack_sorted_df_into_struct(df: pd.DataFrame, name: str | None = None) -> NestedSeries: """Make a structure of lists representation of a "flat" dataframe. Input dataframe must be sorted and all the columns must have pyarrow dtypes. Parameters ---------- df : pd.DataFrame Input dataframe, with repeated indexes. It must be sorted and all the columns must have pyarrow dtypes. name : str, optional Name of the NestedSeries. Returns ------- NestedSeries Output series, with unique indexes. """ if not df.index.is_monotonic_increasing: raise ValueError("The index of the input dataframe must be sorted") packed_df = view_sorted_df_as_list_arrays(df) # No need to validate the dataframe, the length of the nested arrays is forced to be the same by # the view_sorted_df_as_list_arrays function. return pack_lists(packed_df, name=name, validate=False)
[docs] def pack_lists(df: pd.DataFrame, name: str | None = None, *, validate: bool = True) -> NestedSeries: """Make a series of arrow structures from a dataframe with nested arrays. For the input dataframe with repeated indexes, make a pandas.Series, where each original column is replaced by a structure of lists. The dtype of the column is `nested_pandas.NestedDtype` with the corresponding pyarrow type. The index of the output series is the unique index of the input dataframe. The Series has `.nest` accessor, see :meth:`nested_pandas.series.accessor.NestSeriesAccessor` for details. For every row, all the nested array (aka pyarrow list) lengths must be the same. Parameters ---------- df : pd.DataFrame Input dataframe, with pyarrow list-arrays. name : str, optional Name of the NestedSeries. validate : bool, default True Whether to validate the input dataframe. Returns ------- NestedSeries Output series, with unique indexes. See Also -------- nested_pandas.series.accessor.NestSeriesAccessor : The accessor for the output series. nested_pandas.series.dtype.NestedDtype : The dtype of the output series. nested_pandas.series.packer.pack_flat : Pack a "flat" dataframe with repeated indexes. """ # When series is converted to pa.array it may be both Array and ChunkedArray # We convert it to chunked for the sake of consistency pa_arrays_maybe_chunked = {column: pa.array(df[column]) for column in df.columns} pa_chunked_arrays = { column: arr if isinstance(arr, pa.ChunkedArray) else pa.chunked_array([arr]) for column, arr in pa_arrays_maybe_chunked.items() } # If all chunk arrays have the same chunk lengths, we can build a chunked struct array with no # data copying. chunk_lengths = pa.array([[len(chunk) for chunk in arr.chunks] for arr in pa_chunked_arrays.values()]) if all(chunk_length == chunk_lengths[0] for chunk_length in chunk_lengths): chunks = [] num_chunks = next(iter(pa_chunked_arrays.values())).num_chunks for i in range(num_chunks): chunks.append( pa.StructArray.from_arrays( [arr.chunk(i) for arr in pa_chunked_arrays.values()], names=pa_chunked_arrays.keys(), ) ) struct_array = pa.chunked_array(chunks) else: # "flatten" the chunked arrays struct_array = pa.StructArray.from_arrays( [arr.combine_chunks() for arr in pa_chunked_arrays.values()], names=pa_chunked_arrays.keys(), ) ext_array = NestedExtensionArray(struct_array, validate=validate) return NestedSeries( ext_array, index=df.index, copy=False, name=name, )
def view_sorted_df_as_list_arrays(df: pd.DataFrame) -> pd.DataFrame: """Make a nested array representation of a "flat" dataframe. Parameters ---------- df : pd.DataFrame Input dataframe, with repeated indexes. It must be sorted by its index. Returns ------- pd.DataFrame Output dataframe, with unique indexes. It is a view over the input dataframe, so it would mute the input dataframe if modified. """ if not df.index.is_monotonic_increasing: raise ValueError("The index of the input dataframe must be sorted") offset_array = calculate_sorted_index_offsets(df.index) unique_index = df.index[offset_array[:-1]] series_ = { column: view_sorted_series_as_list_array(df[column], offset_array, unique_index) for column in df.columns } df = pd.DataFrame(series_) return df def view_sorted_series_as_list_array( series: NestedSeries, offset: np.ndarray | None = None, unique_index: np.ndarray | None = None, ) -> NestedSeries: """Make a nested array representation of a "flat" series. Parameters ---------- series : NestedSeries Input series, with repeated indexes. It must be sorted by its index. offset: np.ndarray or None, optional Pre-calculated offsets of the input series index. unique_index: np.ndarray or None, optional Pre-calculated unique index of the input series. If given it must be equal to `series.index.unique()` and `series.index.values[offset[:-1]]`. Returns ------- NestedSeries Output series, with unique indexes. It is a view over the input series, so it would mute the input series if modified. """ if not series.index.is_monotonic_increasing: raise ValueError("The index of the input series must be sorted") if offset is None: offset = calculate_sorted_index_offsets(series.index) if unique_index is None: unique_index = series.index[offset[:-1]] # Input series may be represented by pyarrow.ChunkedArray, in this case pa.array(series) would fail # with "TypeError: Cannot convert a 'ChunkedArray' to a 'ListArray'". # https://github.com/lincc-frameworks/nested-pandas/issues/189 flat_array = pa.array(series, from_pandas=True) if isinstance(flat_array, pa.ChunkedArray): flat_array = flat_array.combine_chunks() list_array = pa.LargeListArray.from_arrays( offset, flat_array, ) return NestedSeries( list_array, dtype=pd.ArrowDtype(list_array.type), index=unique_index, copy=False, name=series.name, ) def calculate_sorted_index_offsets(index: pd.Index) -> np.ndarray: """Calculate the offsets of the pre-sorted index values. Parameters ---------- index : pd.Index Input index, must be sorted. Returns ------- np.ndarray Output array of offsets, one element more than the number of unique index values. """ if not index.is_monotonic_increasing: raise ValueError("The index must be sorted") # pd.Index.duplicated returns False for the first occurance and True for all others. # So our offsets would be indexes of these False values with the array length in the end. offset_but_last = np.nonzero(~index.duplicated(keep="first"))[0] offset = np.append(offset_but_last, len(index)) # LargeListArray uses int64 for offsets offset = offset.astype(np.int64) return offset