79723174

Date: 2025-08-02 06:52:04
Score: 1.5
Natty:
Report link

This is an old question but I have not seen an answer anywhere that does not use Xarray (which is overkill for this simple example). Since I recently made such a function using pandas and base Python alone, I thought I would share it here. It's pretty lengthy to handle all foreseeable cases, but please let me know if you find an edge case that Claude Code and I haven't anticipated.

import pandas as pd
from typing import Union, Optional
import calendar


def custom_year_averages(
    data: pd.Series,
    start_month: int,
    end_month: int,
    years: Optional[Union[int, list, range]] = None
) -> pd.Series:
    """
    Compute weighted averages over custom year periods that may straddle calendar years.
    
    Parameters
    ----------
    data : pd.Series
        Time series data with DatetimeIndex. Can be daily or monthly frequency.
    start_month : int
        Starting month of the custom year (1-12).
    end_month : int
        Ending month of the custom year (1-12).
    years : int, list, range, or None
        Year(s) for which to compute averages. If None, computes for all available years.
        For straddling periods (e.g., Apr-Mar), year refers to the year of the end month.
    
    Returns
    -------
    pd.Series
        Series index by year, with weighted averages as values.
        
    Examples
    --------
    .. jupyter-execute::
    
        import pandas as pd
        import numpy as np
        
        # Create sample monthly data
        dates = pd.date_range('2020-01-01', '2023-12-31', freq='M')
        values = np.arange(len(dates))  # Sequential values for clarity
        ts = pd.Series(values, index=dates, name='monthly_values')
        
        # Example 1: Single year (int)
        # Compute Jan-Mar average for 2021 only
        avg_single = custom_year_averages(ts, 1, 3, years=2021)
        print("Single year (2021):")
        print(avg_single)
        print()
        
        # Example 2: List of specific years
        # Compute Apr-Mar averages for selected years
        avg_list = custom_year_averages(ts, 4, 3, years=[2021, 2023])
        print("Specific years [2021, 2023]:")
        print(avg_list)
        print()
        
        # Example 3: Range of years
        # Compute Oct-Sep averages for consecutive years
        avg_range = custom_year_averages(ts, 10, 9, years=range(2021, 2024))
        print("Range of years (2021-2023):")
        print(avg_range)
        print()
        
        # Example 4: All available years (None - default)
        # Compute Jan-Dec averages for all years in data
        avg_all = custom_year_averages(ts, 1, 12, years=None)
        print("All available years (None):")
        print(avg_all)
        print()
        
        # Example 5: Straddling periods with different year specifications
        # Apr-Mar periods: year refers to the March year
        avg_straddle = custom_year_averages(ts, 4, 3, years=range(2021, 2024))
        print("Straddling periods (Apr-Mar), years 2021-2023:")
        print(avg_straddle)
    
    .. jupyter-execute::
    
        # Daily data example with uneven spacing
        daily_dates = pd.to_datetime([
            '2021-01-01', '2021-01-05', '2021-01-20',
            '2021-02-01', '2021-02-15', '2021-02-28',
            '2021-03-05', '2021-03-25', '2021-03-31'
        ])
        daily_values = [10, 15, 20, 25, 30, 35, 40, 45, 50]
        daily_ts = pd.Series(daily_values, index=daily_dates, name='daily_data')
        
        # Compute weighted average (accounts for uneven spacing)
        daily_avg = custom_year_averages(daily_ts, 1, 3, years=2021)
        print("Daily data with uneven spacing (Jan-Mar 2021):")
        print(f"Weighted average: {daily_avg.iloc[0]:.2f}")
        print(f"Simple mean: {daily_ts.mean():.2f}")
        print("(Note: weighted average accounts for time intervals between observations)")
    """
    if not isinstance(data.index, pd.DatetimeIndex):
        raise ValueError("Data must have a DatetimeIndex")
    
    if not (1 <= start_month <= 12 and 1 <= end_month <= 12):
        raise ValueError("Months must be between 1 and 12")
    
    # Determine if the period straddles calendar years
    straddles_year = start_month > end_month
    
    # Get available years from data
    data_years = sorted(data.index.year.unique())
    
    if years is None:
        if straddles_year:
            # For straddling periods, we need data from both years
            years = [y for y in data_years if y > min(data_years)]
        else:
            years = data_years
    elif isinstance(years, int):
        years = [years]
    
    results = {}
    
    for year in years:
        if straddles_year:
            # Period spans two calendar years (e.g., Apr 2020 to Mar 2021)
            start_date = pd.Timestamp(year=year-1, month=start_month, day=1)
            end_date = pd.Timestamp(year=year, month=end_month, day=calendar.monthrange(year, end_month)[1])
        else:
            # Period within single calendar year
            start_date = pd.Timestamp(year=year, month=start_month, day=1)
            end_date = pd.Timestamp(year=year, month=end_month, day=calendar.monthrange(year, end_month)[1])
        
        # Filter data for this period
        mask = (data.index >= start_date) & (data.index <= end_date)
        period_data = data[mask]
        
        if len(period_data) == 0:
            continue
            
        # Calculate weighted average
        weighted_avg = _calculate_weighted_average(period_data, start_date, end_date)
        
        # Use the ending year as the index (standard for fiscal years)
        results[year] = weighted_avg
    
    # Return results only for years that have data
    return pd.Series(list(results.values()), index=list(results.keys()), name=data.name or 'weighted_average')


def _calculate_weighted_average(
    data: pd.Series, 
    period_start: pd.Timestamp, 
    period_end: pd.Timestamp
) -> float:
    """
    Calculate weighted average where weights are based on time intervals.
    
    For monthly data, each month gets equal weight regardless of days.
    For other frequencies, weights are based on time intervals.
    """
    if len(data) == 0:
        return np.nan
    
    if len(data) == 1:
        return data.iloc[0]
    
    # Sort data by index
    data = data.sort_index()
    timestamps = data.index.to_series()
    
    # Check if this looks like monthly data (all timestamps are month-ends)
    is_monthly = all(
        ts.day == calendar.monthrange(ts.year, ts.month)[1] 
        for ts in timestamps
    )
    
    if is_monthly:
        # For monthly data, give each month equal weight
        return data.mean()
    
    # For non-monthly data, use time-interval weighting
    intervals = []
    
    for i, ts in enumerate(timestamps):
        if i == 0:
            # First observation: from period start to midpoint with next
            if len(timestamps) > 1:
                interval_end = ts + (timestamps.iloc[i+1] - ts) / 2
            else:
                interval_end = period_end
            interval_start = period_start
        elif i == len(timestamps) - 1:
            # Last observation: from midpoint with previous to period end
            interval_start = timestamps.iloc[i-1] + (ts - timestamps.iloc[i-1]) / 2
            interval_end = period_end
        else:
            # Middle observations: from midpoint with previous to midpoint with next
            interval_start = timestamps.iloc[i-1] + (ts - timestamps.iloc[i-1]) / 2
            interval_end = ts + (timestamps.iloc[i+1] - ts) / 2
        
        # Ensure intervals don't extend beyond the period
        interval_start = max(interval_start, period_start)
        interval_end = min(interval_end, period_end)
        
        interval_duration = (interval_end - interval_start).total_seconds()
        intervals.append(max(interval_duration, 0))  # Ensure non-negative
    
    weights = np.array(intervals)
    
    # Handle case where all weights are zero
    if weights.sum() == 0:
        return data.mean()
    
    # Calculate weighted average
    weighted_sum = (data.values * weights).sum()
    total_weight = weights.sum()
    
    return weighted_sum / total_weight
Reasons:
  • RegEx Blacklisted phrase (2.5): please let me know
  • Long answer (-1):
  • Has code block (-0.5):
  • Low reputation (0.5):
Posted by: CommonClimate