"""
pandas DataFrame utility functions for data analysis and manipulation.
This module provides utilities for:
- Converting data types within DataFrames
- Analyzing column structures across multiple DataFrames
- Finding and extracting rows/columns with missing or empty data
- Data quality validation and exploration
- Integer allocation using mathematical methods
- Value rebalancing to eliminate negative values
- Proportional integer distribution across groups
Required dependencies:
pip install qufe[data]
This installs: pandas>=1.1.0, numpy>=1.17.0
"""
from typing import List, Tuple, Dict, Any, Optional, Union
import warnings
[docs]
class PandasHandler:
"""
pandas DataFrame utility handler for data analysis and manipulation.

Provides methods for converting data types, analyzing column structures,
finding missing data, data quality validation, and mathematical allocation.

Args:
    default_exclude_cols: Default columns to exclude from NA/empty checks.
        Can be overridden in individual method calls.

Raises:
    ImportError: If pandas or numpy is not installed (both are imported
        when the handler is constructed).

Example:
    >>> handler = PandasHandler(default_exclude_cols=['id', 'created_at'])
    >>> result = handler.convert_list_to_tuple_in_df(df)
"""
[docs]
def __init__(self, default_exclude_cols: Optional[List[str]] = None):
    """
    Initialize PandasHandler with dependency validation.

    Args:
        default_exclude_cols: Columns skipped by default in the NA/empty
            checks. ``None`` or an empty sequence means "exclude nothing".
            A single column name given as a plain string is treated as a
            one-element list.

    Raises:
        ImportError: If pandas or numpy cannot be imported.
    """
    self.pd = self._import_pandas()
    self.np = self._import_numpy()

    if not default_exclude_cols:
        excluded = []
    elif isinstance(default_exclude_cols, str):
        # A bare string would otherwise be used for substring matching
        # ("i" in "id") by the NA/empty checks.
        excluded = [default_exclude_cols]
    else:
        # Keep a private copy so later mutation of the caller's list does
        # not silently change this handler's defaults.
        excluded = list(default_exclude_cols)
    self.default_exclude_cols = excluded
def _import_pandas(self):
    """
    Import pandas at call time and return the module.

    Returns:
        The imported ``pandas`` module.

    Raises:
        ImportError: If pandas is not installed; the message includes the
            install hint and the original error is chained as the cause.
    """
    try:
        import pandas
    except ImportError as exc:
        raise ImportError(
            "Data processing functionality requires pandas. "
            "Install with: pip install qufe[data]"
        ) from exc
    return pandas
def _import_numpy(self):
    """
    Import numpy at call time and return the module.

    Returns:
        The imported ``numpy`` module.

    Raises:
        ImportError: If numpy is not installed; the message includes the
            install hint and the original error is chained as the cause.
    """
    try:
        import numpy
    except ImportError as exc:
        raise ImportError(
            "Data processing functionality requires numpy. "
            "Install with: pip install qufe[data]"
        ) from exc
    return numpy
def _validate_dataframe(self, df) -> None:
    """
    Ensure the given object is a pandas DataFrame.

    Args:
        df: Object to check.

    Raises:
        TypeError: If ``df`` is not an instance of ``pandas.DataFrame``.
    """
    if isinstance(df, self.pd.DataFrame):
        return
    raise TypeError("Input must be a pandas DataFrame")
[docs]
def help(self) -> None:
    """
    Print a short reference for this handler to standard output.

    The output consists of a title banner, a dependency status line, the
    list of public methods with one-line descriptions, and a block of
    usage examples.
    """
    banner = (
        "qufe.pdhandler.PandasHandler - pandas DataFrame Utilities",
        "=" * 60,
        "",
        "✓ Dependencies: INSTALLED",
        "",
    )

    # (method name, one-line description) pairs, in display order.
    methods = (
        ("convert_list_to_tuple_in_df", "Convert list values to tuples in DataFrame"),
        ("show_col_names", "Compare column names across multiple DataFrames"),
        ("show_all_na", "Extract rows and columns containing NA values"),
        ("show_all_na_or_empty_rows", "Find rows with NA or empty string values"),
        ("show_all_na_or_empty_columns", "Find columns with NA or empty string values"),
        ("allocate_integer_remainder", "Allocate fractional values as integers"),
        ("rebalance_negative_values", "Redistribute to eliminate negative values"),
        ("calculate_cumulative_balance", "Calculate running balance with segment resets"),
        ("allocate_integer_proportional", "Distribute integers proportionally by weight"),
        ("verify_proportional_allocation", "Verify proportional allocation results"),
    )
    method_lines = ["AVAILABLE METHODS:"]
    method_lines.extend(f" • {name}(): {summary}" for name, summary in methods)
    method_lines.append("")

    usage_lines = (
        "USAGE EXAMPLES:",
        " from qufe.pdhandler import PandasHandler",
        " ",
        " # Initialize handler",
        " handler = PandasHandler(default_exclude_cols=['id'])",
        " ",
        " # Compare columns across DataFrames",
        " col_dict, comparison_df = handler.show_col_names([df1, df2, df3])",
        " ",
        " # Find all NA values in subset",
        " na_subset = handler.show_all_na(df)",
        " ",
        " # Find problematic rows/columns",
        " problem_rows = handler.show_all_na_or_empty_rows(df)",
        " ",
        " # Integer allocation",
        " result = handler.allocate_integer_remainder(df, 'group', 'value')",
        " ",
        " # Rebalance negative values",
        " result = handler.rebalance_negative_values(df, 'group', 'amount')",
        " ",
        " # Cumulative balance calculation",
        " result = handler.calculate_cumulative_balance(",
        " df, 'initial', 'inflow', 'outflow', 'new_segment'",
        " )",
        " ",
        " # Proportional integer distribution",
        " result = handler.allocate_integer_proportional(",
        " df, 'group', 'total', 'weight', 'allocated'",
        " )",
    )

    for section in (banner, method_lines, usage_lines):
        for line in section:
            print(line)
[docs]
def convert_list_to_tuple_in_df(self, df) -> object:
    """
    Convert list values to tuples in DataFrame object columns.

    Only columns with ``object`` dtype that contain at least one list
    (including instances of list subclasses) are rewritten. All other
    values, including ``None``, are left unchanged. The input DataFrame is
    not modified; a copy is returned.

    Args:
        df: Input DataFrame to process (pandas.DataFrame)

    Returns:
        Copy of ``df`` with list values converted to tuples

    Raises:
        TypeError: If input is not a pandas DataFrame

    Example:
        >>> handler = PandasHandler()
        >>> df = pd.DataFrame({'col1': [[1, 2], [3, 4]], 'col2': ['a', 'b']})
        >>> result = handler.convert_list_to_tuple_in_df(df)
        >>> print(result['col1'].iloc[0])
        (1, 2)
    """
    self._validate_dataframe(df)

    def _is_list(value):
        return isinstance(value, list)

    def _as_tuple(value):
        return tuple(value) if _is_list(value) else value

    converted = df.copy()
    for col in converted.columns:
        series = converted[col]
        if series.dtype != "object":
            continue
        # Detect with the same isinstance() test that drives the conversion,
        # so list subclasses are handled the same way whether or not a
        # plain list also appears in the column.
        if series.map(_is_list).any():
            converted[col] = series.map(_as_tuple)
    return converted
[docs]
def show_col_names(self, dfs: List, print_result: bool = False) -> Tuple[Dict[str, List[str]], object]:
    """
    Compare column names across multiple DataFrames.

    Builds a table with one row per distinct column name found in any of
    the inputs. The ``'All'`` column lists every name; each ``'df_<n>'``
    column (numbered from 1 in input order) repeats the name where that
    DataFrame has the column and holds ``''`` where it does not.

    Args:
        dfs: List of DataFrames to compare (List[pandas.DataFrame])
        print_result: Whether to print the comparison table. Defaults to False.

    Returns:
        Tuple containing:
        - The comparison dictionary (keys ``'All'``, ``'df_1'``, ...; each
          value is a list aligned with the ``'All'`` list)
        - The same data as a DataFrame

    Raises:
        TypeError: If input is not a list of DataFrames

    Example:
        >>> handler = PandasHandler()
        >>> df1 = pd.DataFrame({'A': [1, 2], 'B': [3, 4]})
        >>> df2 = pd.DataFrame({'B': [5, 6], 'C': [7, 8]})
        >>> col_dict, comparison_df = handler.show_col_names([df1, df2])
    """
    if not isinstance(dfs, list) or not all(isinstance(df, self.pd.DataFrame) for df in dfs):
        raise TypeError("Input must be a list of pandas DataFrames")

    columns_by_df = {
        f'df_{position}': frame.columns.to_list()
        for position, frame in enumerate(dfs, start=1)
    }

    unique_cols = {col for cols in columns_by_df.values() for col in cols}
    try:
        all_cols = sorted(unique_cols)
    except TypeError:
        # Labels of different types (e.g. 1 and 'a') cannot be compared
        # directly; fall back to ordering by their string form.
        all_cols = sorted(unique_cols, key=lambda col: (str(col), type(col).__name__))

    comparison = {'All': all_cols}
    for df_name, cols in columns_by_df.items():
        present = set(cols)  # O(1) membership instead of scanning the list
        comparison[df_name] = [col if col in present else '' for col in all_cols]

    comparison_df = self.pd.DataFrame(data=comparison)
    if print_result:
        print(comparison_df)
    return (comparison, comparison_df)
[docs]
def show_all_na(self, df) -> object:
    """
    Return the part of a DataFrame where NA values occur.

    The result keeps only rows holding at least one NA value and only
    columns holding at least one NA value.

    Args:
        df: Input DataFrame to analyze (pandas.DataFrame)

    Returns:
        Subset containing only rows and columns with NA values

    Raises:
        TypeError: If input is not a DataFrame

    Example:
        >>> handler = PandasHandler()
        >>> import numpy as np
        >>> df = pd.DataFrame({'A': [1, np.nan], 'B': [3, 4], 'C': [np.nan, 6]})
        >>> na_subset = handler.show_all_na(df)
    """
    self._validate_dataframe(df)

    na_mask = df.isna()
    row_has_na = na_mask.any(axis='columns')
    na_column_labels = df.columns[na_mask.any()].to_list()

    # Filter rows first, then narrow to the affected columns.
    return df[row_has_na][na_column_labels]
[docs]
def show_all_na_or_empty_rows(self, df, exclude_cols: Optional[List[str]] = None) -> object:
    """
    Find rows containing NA values or empty strings.

    A row is reported when any checked column holds an NA value or the
    empty string ``''``. Excluded columns are ignored for the check but are
    still present in the returned rows.

    Args:
        df: Input DataFrame to analyze (pandas.DataFrame)
        exclude_cols: Columns to exclude from NA/empty check.
            If None, uses default_exclude_cols from initialization.
            A single column name given as a plain string is treated as a
            one-element list.

    Returns:
        Rows containing NA values or empty strings, with all original columns

    Raises:
        TypeError: If input is not a DataFrame

    Example:
        >>> handler = PandasHandler(default_exclude_cols=['id'])
        >>> df = pd.DataFrame({'A': [1, ''], 'B': [3, 4], 'id': ['x', 'y']})
        >>> problem_rows = handler.show_all_na_or_empty_rows(df)
    """
    self._validate_dataframe(df)

    if exclude_cols is None:
        exclude_cols = self.default_exclude_cols
    if isinstance(exclude_cols, str):
        # ``col not in "id"`` would be a substring test and would also drop
        # columns named "i" or "d"; wrap the name so membership is exact.
        exclude_cols = [exclude_cols]

    checked = df[[col for col in df.columns if col not in exclude_cols]]
    problem_rows = (checked.isna() | (checked == '')).any(axis=1)
    return df[problem_rows]
[docs]
def show_all_na_or_empty_columns(self, df, exclude_cols: Optional[List[str]] = None) -> object:
    """
    Find columns containing NA values or empty strings.

    A column is reported when any of its rows holds an NA value or the
    empty string ``''``. Excluded columns are never reported.

    Args:
        df: Input DataFrame to analyze (pandas.DataFrame)
        exclude_cols: Columns to exclude from NA/empty check.
            If None, uses default_exclude_cols from initialization.
            A single column name given as a plain string is treated as a
            one-element list.

    Returns:
        All rows, but only columns that contain NA values or empty strings

    Raises:
        TypeError: If input is not a DataFrame

    Example:
        >>> handler = PandasHandler(default_exclude_cols=['id'])
        >>> df = pd.DataFrame({'A': [1, 2], 'B': ['', 'x'], 'id': ['y', 'z']})
        >>> problem_cols = handler.show_all_na_or_empty_columns(df)
    """
    self._validate_dataframe(df)

    if exclude_cols is None:
        exclude_cols = self.default_exclude_cols
    if isinstance(exclude_cols, str):
        # ``col not in "id"`` would be a substring test and would also drop
        # columns named "i" or "d"; wrap the name so membership is exact.
        exclude_cols = [exclude_cols]

    # Slice once and reuse the view for both the NA and the '' test.
    checked = df[[col for col in df.columns if col not in exclude_cols]]
    problem_cols = (checked.isna() | (checked == '')).any(axis=0)
    return df.loc[:, problem_cols.index[problem_cols]]
[docs]
def allocate_integer_remainder(
    self,
    df,
    group_cols: Union[str, List[str]],
    value_col: str,
    result_col: str = 'allocated',
    keep_intermediate: bool = False,
    int_dtype: str = 'int32'
) -> object:
    """
    Allocate fractional values to integers using the Largest Remainder Method.

    Also known as Hamilton's Method or Hare-Niemeyer method, this is a
    standard mathematical approach for proportional integer allocation while
    preserving group totals.

    The method works by:
    1. Taking the floor of each fractional value as the base allocation
    2. Distributing remaining units to items with largest fractional parts
       (ties go to the row that appears first)
    3. Making the allocated integers of a group sum to the floor of the
       group's value sum

    All working values are held in local Series, so columns of ``df`` that
    happen to share a name with an intermediate (``_base``, ``_rank``, ...)
    are left alone unless ``keep_intermediate`` is True.

    Args:
        df: Input DataFrame containing the data to allocate
        group_cols: Column(s) to group by for allocation (str or list of str)
        value_col: Column containing fractional values to be allocated
        result_col: Name for the result column containing integer allocations.
            Defaults to 'allocated'.
        keep_intermediate: If True, add the intermediate columns
            (_group_sum, _base, _fraction, _base_sum, _remainder, _rank)
            to the result. Defaults to False.
        int_dtype: Integer dtype used for the base, remainder and result
            values. Defaults to 'int32'; use 'int64' when values may reach
            2^31 or more.

    Returns:
        Copy of ``df`` with integer allocations in the result column

    Raises:
        TypeError: If input is not a pandas DataFrame
        KeyError: If specified columns don't exist in DataFrame

    References:
        .. [1] Hamilton, A. (1792). "Report on Apportionment"
        .. [2] Balinski, M. L., & Young, H. P. (2001). "Fair Representation:
           Meeting the Ideal of One Man, One Vote"

    Example:
        >>> handler = PandasHandler()
        >>> data = pd.DataFrame({
        ...     'group': ['A', 'A', 'B', 'B'],
        ...     'value': [2.7, 1.8, 3.3, 2.2]
        ... })
        >>> result = handler.allocate_integer_remainder(data, 'group', 'value')
        >>> print(result[['group', 'value', 'allocated']])
    """
    self._validate_dataframe(df)

    group_cols = [group_cols] if isinstance(group_cols, str) else list(group_cols)
    missing_cols = set(group_cols + [value_col]) - set(df.columns)
    if missing_cols:
        raise KeyError(f"Columns not found in DataFrame: {missing_cols}")

    out = df.copy()
    keys = [out[col] for col in group_cols]
    values = out[value_col]

    # Floor of each value plus the fractional part that was cut off.
    group_sum = values.groupby(keys).transform('sum')
    base = self.np.floor(values).astype(int_dtype)
    fraction = values - base

    # Units still to hand out per group after the floors are allocated.
    base_sum = base.groupby(keys).transform('sum')
    remainder = self.np.floor(group_sum).astype(int_dtype) - base_sum

    # Rank 1 = largest fractional part within the group.
    rank = fraction.groupby(keys).rank(method='first', ascending=False)

    allocated = (base + (rank <= remainder).astype(int_dtype)).astype(int_dtype)

    if keep_intermediate:
        out['_group_sum'] = group_sum
        out['_base'] = base
        out['_fraction'] = fraction
        out['_base_sum'] = base_sum
        out['_remainder'] = remainder
        out['_rank'] = rank
    out[result_col] = allocated
    return out
[docs]
def rebalance_negative_values(
    self,
    df,
    group_cols: Union[str, List[str]],
    value_col: str,
    result_col: str = 'rebalanced',
    use_offset: bool = False,
    keep_intermediate: bool = False,
    int_dtype: str = 'int32'
) -> object:
    """
    Redistribute values within groups to eliminate negatives while preserving totals.

    Each group's total is re-split across its rows in proportion to
    per-row weights, then turned into integers with the largest remainder
    method (floor first, leftover units to the largest fractional parts,
    ties to the row that appears first).

    All working values are held in local Series, so columns of ``df`` that
    happen to share a name with an intermediate (``_sum``, ``_weight``, ...)
    are left alone unless ``keep_intermediate`` is True.

    Args:
        df: Input DataFrame containing values to rebalance
        group_cols: Column(s) defining groups for rebalancing (str or list of str)
        value_col: Column containing values to rebalance (may include negatives)
        result_col: Name for the result column containing rebalanced values.
            Defaults to 'rebalanced'.
        use_offset: If True, weights are the values shifted up by the
            group's (negative) minimum, so negative values take part in
            the weighting. If False, the weight of a row is its value
            clipped at zero. Defaults to False.
        keep_intermediate: If True, add the intermediate columns (_sum,
            _weight, _weight_sum, _ratio, _ideal, _base, _fraction,
            _base_sum, _remainder, _rank, plus _min and _offset when
            ``use_offset`` is True) to the result. Defaults to False.
        int_dtype: Integer dtype used for sums, weights and the result.
            Defaults to 'int32'; use 'int64' when values may reach 2^31
            or more.

    Returns:
        Copy of ``df`` with rebalanced values in the result column

    Raises:
        TypeError: If input is not a pandas DataFrame
        KeyError: If specified columns don't exist in DataFrame

    Notes:
        - Group sums and weights are cast to ``int_dtype``; fractional
          input values are truncated by that cast
        - Groups with negative total sum cannot be rebalanced (values unchanged)
        - Groups whose weights are all zero receive 0 in every row
        - The offset method shifts all values by the minimum to calculate weights

    Example:
        >>> handler = PandasHandler()
        >>> data = pd.DataFrame({
        ...     'group': ['A', 'A', 'A', 'B', 'B'],
        ...     'value': [10, -5, 15, 20, -10]
        ... })
        >>> result = handler.rebalance_negative_values(data, 'group', 'value')
        >>> print(result[['group', 'value', 'rebalanced']])
    """
    self._validate_dataframe(df)

    group_cols = [group_cols] if isinstance(group_cols, str) else list(group_cols)
    missing_cols = set(group_cols + [value_col]) - set(df.columns)
    if missing_cols:
        raise KeyError(f"Columns not found in DataFrame: {missing_cols}")

    out = df.copy()
    keys = [out[col] for col in group_cols]
    values = out[value_col]

    def group_total(series):
        # Per-row copy of the group's sum, as an integer.
        return series.groupby(keys).transform('sum').astype(int_dtype)

    # (name, Series) pairs in the order the columns are exposed when
    # keep_intermediate is True.
    steps = []

    total = group_total(values)
    steps.append(('_sum', total))

    if use_offset:
        # Shift by the group's minimum so no weight is negative.
        group_min = values.groupby(keys).transform('min').astype(int_dtype)
        offset = (-group_min).clip(lower=0).astype(int_dtype)
        weight = (values + offset).astype(int_dtype)
        steps.extend([('_min', group_min), ('_offset', offset)])
    else:
        # Only non-negative values contribute to the weights.
        weight = self.np.maximum(values, 0).astype(int_dtype)
    steps.append(('_weight', weight))

    weight_sum = group_total(weight)
    has_weight = weight_sum > 0
    # Weights are never negative, so the only rows excluded here have a
    # zero weight sum (0/0); those get ratio 0.
    ratio = (weight / weight_sum).where(has_weight, 0.0)
    ideal = total * ratio

    base = self.np.floor(ideal).astype(int_dtype)
    fraction = ideal - base
    base_sum = group_total(base)
    remainder = total - base_sum
    rank = fraction.groupby(keys).rank(method='first', ascending=False)
    steps.extend([
        ('_weight_sum', weight_sum), ('_ratio', ratio), ('_ideal', ideal),
        ('_base', base), ('_fraction', fraction), ('_base_sum', base_sum),
        ('_remainder', remainder), ('_rank', rank),
    ])

    rebalanced = (base + (rank <= remainder).astype(int_dtype)).astype(int_dtype)

    # Special case 1: nothing to weight by -> every row gets 0.
    rebalanced.loc[weight_sum == 0] = 0
    # Special case 2: a negative group total cannot be rebalanced; keep the
    # original values. Applied last so it also overrides case 1.
    negative_total = total < 0
    rebalanced.loc[negative_total] = values.loc[negative_total]

    if keep_intermediate:
        for name, series in steps:
            out[name] = series
    out[result_col] = rebalanced
    return out
[docs]
def calculate_cumulative_balance(
    self,
    df,
    initial_col: str,
    inflow_col: str,
    outflow_col: str,
    segment_marker_col: str,
    result_col: str = 'balance',
    group_cols: Optional[Union[str, List[str]]] = None,
    keep_intermediate: bool = False,
    replace_initial_col: bool = True
) -> object:
    """
    Calculate cumulative balance with segment-based reinitialization.

    Computes a running balance where each segment (marked by a flag column)
    starts from an initial value and accumulates inflows minus outflows.

    The calculation follows these rules:
    - When segment marker is 1: balance = initial + inflow - outflow
    - When segment marker is 0: balance = previous_balance + inflow - outflow
    - Each group (if specified) is calculated independently

    The computation is vectorized: segments are numbered with a cumulative
    sum of the markers, the initial value of each segment is broadcast to
    its rows, and the net flow is accumulated per segment.

    Besides the result column, a ``previous_<result_col>`` column is
    produced holding the balance each row started from (the initial value
    on segment starts, otherwise the preceding row's balance).

    Args:
        df: Input DataFrame with flow data (must be pre-sorted)
        initial_col: Column containing initial values for each segment start
        inflow_col: Column containing inflow amounts (additions)
        outflow_col: Column containing outflow amounts (subtractions)
        segment_marker_col: Column with 0/1 values marking segment starts (1=start)
        result_col: Name for the result column. Defaults to 'balance'.
        group_cols: Optional column(s) to group by for independent calculations.
            Can be string or list of strings. Defaults to None.
        keep_intermediate: If True, add the intermediate columns (_delta,
            _segment, _initial_at_segment, _segment_initial,
            _cumulative_delta, _validation) to the result.
            Defaults to False.
        replace_initial_col: If True, drop the initial column and rename
            the ``previous_<result_col>`` column to its name.
            Defaults to True.

    Returns:
        Copy of ``df`` with cumulative balance in the result column

    Raises:
        TypeError: If input is not a pandas DataFrame
        KeyError: If specified columns don't exist in DataFrame
        ValueError: If segment_marker_col contains values other than 0 or 1,
            or if the internal consistency check
            (previous balance + inflow - outflow == balance) fails

    Warns:
        UserWarning: If the first row (of the frame, or of any group)
            has segment marker 0; such rows get NaN in the result column.

    Notes:
        - DataFrame must be pre-sorted in the desired order
        - Segment markers must be 0 or 1 (1 indicates segment start)
        - Each group's first row should have segment_marker = 1
        - An empty DataFrame is returned with the extra columns added

    Example:
        >>> handler = PandasHandler()
        >>> data = pd.DataFrame({
        ...     'initial': [100, 0, 0, 150, 0],
        ...     'inflow': [10, 20, 15, 10, 25],
        ...     'outflow': [5, 30, 10, 20, 15],
        ...     'new_segment': [1, 0, 0, 1, 0]
        ... })
        >>> result = handler.calculate_cumulative_balance(
        ...     data,
        ...     initial_col='initial',
        ...     inflow_col='inflow',
        ...     outflow_col='outflow',
        ...     segment_marker_col='new_segment'
        ... )
        >>> print(result[['balance']])
        >>> # Multi-entity tracking with groups
        >>> result = handler.calculate_cumulative_balance(
        ...     data2,
        ...     initial_col='initial',
        ...     inflow_col='additions',
        ...     outflow_col='deductions',
        ...     segment_marker_col='period_start',
        ...     group_cols='entity'
        ... )
    """
    self._validate_dataframe(df)

    if group_cols is None:
        group_cols = []
    elif isinstance(group_cols, str):
        group_cols = [group_cols]
    else:
        group_cols = list(group_cols)

    required_cols = [initial_col, inflow_col, outflow_col, segment_marker_col] + group_cols
    missing_cols = set(required_cols) - set(df.columns)
    if missing_cols:
        raise KeyError(f"Columns not found in DataFrame: {missing_cols}")

    out = df.copy()
    markers = out[segment_marker_col]
    initials = out[initial_col]
    inflow = out[inflow_col]
    outflow = out[outflow_col]
    keys = [out[col] for col in group_cols]

    marker_values = set(markers.unique())
    if not marker_values.issubset({0, 1}):
        invalid_values = marker_values - {0, 1}
        raise ValueError(
            f"Segment marker column '{segment_marker_col}' contains invalid values: {invalid_values}. "
            f"Only 0 and 1 are allowed."
        )

    # Net flow per row.
    delta = inflow - outflow

    # Segment number: running count of segment starts (restarting per group).
    segment = markers.groupby(keys).cumsum() if keys else markers.cumsum()
    segment = segment.rename('_segment')
    segment_keys = keys + [segment]

    # Initial value on segment-start rows, NaN elsewhere ...
    initial_at_segment = self.pd.Series(
        self.np.where(markers.astype(bool), initials, self.np.nan),
        index=out.index,
    )
    # ... broadcast to every row of the segment ('first' skips the NaNs).
    segment_initial = initial_at_segment.groupby(segment_keys).transform('first')
    cumulative_delta = delta.groupby(segment_keys).cumsum()

    balance = segment_initial + cumulative_delta

    # Balance each row started from.
    shifted_balance = balance.groupby(keys).shift(1) if keys else balance.shift(1)
    previous_balance_col = f'previous_{result_col}'
    previous_balance = self.np.where(markers == 1, initials, shifted_balance)

    # Self-check: recompute the balance row by row and compare.
    validation = self.pd.Series(previous_balance, index=out.index) + inflow - outflow
    validation_mismatch = ~self.np.isclose(
        balance.fillna(0),
        validation.fillna(0),
        rtol=1e-9,
        atol=1e-9
    )
    if validation_mismatch.any():
        mismatch_count = validation_mismatch.sum()
        raise ValueError(
            f"Validation failed: {mismatch_count} rows have mismatch between "
            f"calculated balance and validation formula "
            f"({previous_balance_col} + {inflow_col} - {outflow_col} != {result_col})"
        )

    # Rows that precede the first segment start have no initial value.
    if keys:
        if (markers.groupby(keys).head(1) == 0).any():
            warnings.warn(
                f"Some groups have first row with {segment_marker_col}=0. "
                f"These rows will have NaN in {result_col}. "
                f"Consider ensuring each group starts with {segment_marker_col}=1.",
                UserWarning,
                stacklevel=2
            )
    elif len(out) > 0 and markers.iloc[0] == 0:
        # The length guard keeps an empty frame from raising IndexError.
        warnings.warn(
            f"First row has {segment_marker_col}=0. "
            f"This row will have NaN in {result_col}. "
            f"Consider ensuring data starts with {segment_marker_col}=1.",
            UserWarning,
            stacklevel=2
        )

    if keep_intermediate:
        out['_delta'] = delta
        out['_segment'] = segment
        out['_initial_at_segment'] = initial_at_segment
        out['_segment_initial'] = segment_initial
        out['_cumulative_delta'] = cumulative_delta
    out[result_col] = balance
    out[previous_balance_col] = previous_balance
    if keep_intermediate:
        # Only exposed on request; it is a working column like the others.
        out['_validation'] = validation

    if replace_initial_col:
        out = out.drop(columns=initial_col).rename(columns={previous_balance_col: initial_col})
    return out
[docs]
def allocate_integer_proportional(
    self,
    df,
    group_cols: Union[str, List[str]],
    total_col: str,
    weight_col: str,
    result_col: str = 'allocated',
    keep_intermediate: bool = False,
    int_dtype: str = 'int64'
) -> object:
    """
    Distribute group totals as integers proportional to weights using Hamilton's Method.

    Each group distributes its total value (taken from the first row of the
    group) as integers proportional to the weight values, so that the
    allocated integers of a group sum to that total.

    This method implements Hamilton's Method (Largest Remainder Method), which:
    1. Calculates proportional float values based on weights
    2. Takes floor of each value as base allocation
    3. Distributes remaining units to items with largest fractional
       remainders (ties go to the row that appears first)

    Args:
        df: Input DataFrame containing groups, totals, and weights
        group_cols: Column(s) defining groups for distribution (str or list of str)
        total_col: Column containing group total (must be identical within each group)
        weight_col: Column containing positive weights for proportional distribution
        result_col: Name for the result column containing integer allocations.
            Defaults to 'allocated'.
        keep_intermediate: If True, add the intermediate columns
            (_remainder, _group_id, _rank_in_group, _raw, _base,
            _remaining, _increment) to the result. ``_rank_in_group`` is
            0-based, 0 being the largest remainder. Defaults to False.
        int_dtype: Integer dtype for result column ('int64' or 'int32').
            Use 'int32' for memory savings when total values are below 2^31.
            Defaults to 'int64'.

    Returns:
        Copy of ``df`` with proportional integer allocations in the result column

    Raises:
        TypeError: If input is not a pandas DataFrame
        KeyError: If specified columns don't exist in DataFrame
        ValueError: If the weights of any group sum to zero

    Notes:
        - Assumes total_col values are identical within each group
        - Assumes weight_col contains positive values (or at least group sum > 0)
        - Uses Hamilton's Method for fair integer distribution

    Example:
        >>> handler = PandasHandler()
        >>> data = pd.DataFrame({
        ...     'group': ['A', 'A', 'A', 'B', 'B'],
        ...     'total': [100, 100, 100, 50, 50],
        ...     'weight': [30, 50, 20, 40, 60]
        ... })
        >>> result = handler.allocate_integer_proportional(
        ...     data, 'group', 'total', 'weight'
        ... )
        >>> print(result[['group', 'total', 'weight', 'allocated']])
        >>> # Verify sum per group equals total
        >>> print(result.groupby('group')['allocated'].sum())
    """
    self._validate_dataframe(df)

    group_cols = [group_cols] if isinstance(group_cols, str) else list(group_cols)
    required_cols = group_cols + [total_col, weight_col]
    missing_cols = set(required_cols) - set(df.columns)
    if missing_cols:
        raise KeyError(f"Columns not found in DataFrame: {missing_cols}")

    out = df.copy()
    keys = [out[col] for col in group_cols]
    weights = out[weight_col]

    weight_sum = weights.groupby(keys, sort=False).transform('sum')
    total_per_row = out[total_col].groupby(keys, sort=False).transform('first')

    if (weight_sum == 0).any():
        # Dividing by a zero weight sum yields NaN/inf, which cannot be cast
        # to an integer; fail with a message that names the actual cause.
        raise ValueError(
            f"Weights in '{weight_col}' sum to zero for at least one group; "
            f"cannot allocate proportionally."
        )

    # Exact proportional share, its floor, and the fractional part left over.
    raw = total_per_row * weights / weight_sum
    base = self.np.floor(raw).astype(int_dtype)
    remainder = raw - base

    # Units still to hand out per group after the floors are allocated.
    base_sum = base.groupby(keys, sort=False).transform('sum')
    remaining = (total_per_row - base_sum).astype(int_dtype)

    # 0-based position of each row when its group is ordered by remainder,
    # largest first. rank() returns the position of each row itself; a
    # plain argsort would return the ordering of positions instead.
    rank_in_group = (
        remainder.groupby(keys, sort=False).rank(method='first', ascending=False) - 1
    ).astype('int64')

    # The `remaining` best-ranked rows of each group get one extra unit.
    increment = (rank_in_group < remaining).astype(int_dtype)
    allocated = base + increment

    if keep_intermediate:
        out['_remainder'] = remainder
        out['_group_id'] = out.groupby(group_cols, sort=False).ngroup()
        out['_rank_in_group'] = rank_in_group
    out[result_col] = allocated
    if keep_intermediate:
        out['_raw'] = raw
        out['_base'] = base
        out['_remaining'] = remaining
        out['_increment'] = increment
    return out
[docs]
def verify_proportional_allocation(
    self,
    df,
    group_cols: Union[str, List[str]],
    total_col: str,
    result_col: str
) -> object:
    """
    Check that allocated values add up to the expected total in every group.

    For each group the first value of ``total_col`` is compared with the
    sum of ``result_col``. Intended for checking the output of
    allocate_integer_proportional.

    Args:
        df: DataFrame containing allocation results
        group_cols: Column(s) defining groups (str or list of str)
        total_col: Column containing expected group totals
        result_col: Column containing allocated values to verify

    Returns:
        DataFrame with one row per group (in order of first appearance) and
        the columns:
        - the group column(s)
        - expected_total: first value of total_col in the group
        - actual_total: sum of result_col in the group
        - is_valid: True where expected_total equals actual_total

    Raises:
        TypeError: If input is not a pandas DataFrame
        KeyError: If specified columns don't exist in DataFrame

    Example:
        >>> handler = PandasHandler()
        >>> result = handler.allocate_integer_proportional(
        ...     df, 'group', 'total', 'weight'
        ... )
        >>> verification = handler.verify_proportional_allocation(
        ...     result, 'group', 'total', 'allocated'
        ... )
        >>> if not verification['is_valid'].all():
        ...     print("Allocation errors found!")
        ...     print(verification[~verification['is_valid']])
    """
    self._validate_dataframe(df)

    if isinstance(group_cols, str):
        group_cols = [group_cols]

    required_cols = group_cols + [total_col, result_col]
    missing_cols = set(required_cols) - set(df.columns)
    if missing_cols:
        raise KeyError(f"Columns not found in DataFrame: {missing_cols}")

    # One pass: aggregate per group, flatten the index, give the
    # aggregates their report names.
    report = (
        df.groupby(group_cols, sort=False)
        .agg({total_col: 'first', result_col: 'sum'})
        .reset_index()
        .rename(columns={total_col: 'expected_total', result_col: 'actual_total'})
    )
    report['is_valid'] = report['expected_total'].eq(report['actual_total'])
    return report