This is an old question but I have not seen an answer anywhere that does not use Xarray (which is overkill for this simple example). Since I recently made such a function using pandas and base Python alone, I thought I would share it here. It's pretty lengthy to handle all foreseeable cases, but please let me know if you find an edge case that Claude Code and I haven't anticipated.
import pandas as pd
from typing import Union, Optional
import calendar
def custom_year_averages(
data: pd.Series,
start_month: int,
end_month: int,
years: Optional[Union[int, list, range]] = None
) -> pd.Series:
"""
Compute weighted averages over custom year periods that may straddle calendar years.
Parameters
----------
data : pd.Series
Time series data with DatetimeIndex. Can be daily or monthly frequency.
start_month : int
Starting month of the custom year (1-12).
end_month : int
Ending month of the custom year (1-12).
years : int, list, range, or None
Year(s) for which to compute averages. If None, computes for all available years.
For straddling periods (e.g., Apr-Mar), year refers to the year of the end month.
Returns
-------
pd.Series
Series index by year, with weighted averages as values.
Examples
--------
.. jupyter-execute::
import pandas as pd
import numpy as np
# Create sample monthly data
dates = pd.date_range('2020-01-01', '2023-12-31', freq='M')
values = np.arange(len(dates)) # Sequential values for clarity
ts = pd.Series(values, index=dates, name='monthly_values')
# Example 1: Single year (int)
# Compute Jan-Mar average for 2021 only
avg_single = custom_year_averages(ts, 1, 3, years=2021)
print("Single year (2021):")
print(avg_single)
print()
# Example 2: List of specific years
# Compute Apr-Mar averages for selected years
avg_list = custom_year_averages(ts, 4, 3, years=[2021, 2023])
print("Specific years [2021, 2023]:")
print(avg_list)
print()
# Example 3: Range of years
# Compute Oct-Sep averages for consecutive years
avg_range = custom_year_averages(ts, 10, 9, years=range(2021, 2024))
print("Range of years (2021-2023):")
print(avg_range)
print()
# Example 4: All available years (None - default)
# Compute Jan-Dec averages for all years in data
avg_all = custom_year_averages(ts, 1, 12, years=None)
print("All available years (None):")
print(avg_all)
print()
# Example 5: Straddling periods with different year specifications
# Apr-Mar periods: year refers to the March year
avg_straddle = custom_year_averages(ts, 4, 3, years=range(2021, 2024))
print("Straddling periods (Apr-Mar), years 2021-2023:")
print(avg_straddle)
.. jupyter-execute::
# Daily data example with uneven spacing
daily_dates = pd.to_datetime([
'2021-01-01', '2021-01-05', '2021-01-20',
'2021-02-01', '2021-02-15', '2021-02-28',
'2021-03-05', '2021-03-25', '2021-03-31'
])
daily_values = [10, 15, 20, 25, 30, 35, 40, 45, 50]
daily_ts = pd.Series(daily_values, index=daily_dates, name='daily_data')
# Compute weighted average (accounts for uneven spacing)
daily_avg = custom_year_averages(daily_ts, 1, 3, years=2021)
print("Daily data with uneven spacing (Jan-Mar 2021):")
print(f"Weighted average: {daily_avg.iloc[0]:.2f}")
print(f"Simple mean: {daily_ts.mean():.2f}")
print("(Note: weighted average accounts for time intervals between observations)")
"""
if not isinstance(data.index, pd.DatetimeIndex):
raise ValueError("Data must have a DatetimeIndex")
if not (1 <= start_month <= 12 and 1 <= end_month <= 12):
raise ValueError("Months must be between 1 and 12")
# Determine if the period straddles calendar years
straddles_year = start_month > end_month
# Get available years from data
data_years = sorted(data.index.year.unique())
if years is None:
if straddles_year:
# For straddling periods, we need data from both years
years = [y for y in data_years if y > min(data_years)]
else:
years = data_years
elif isinstance(years, int):
years = [years]
results = {}
for year in years:
if straddles_year:
# Period spans two calendar years (e.g., Apr 2020 to Mar 2021)
start_date = pd.Timestamp(year=year-1, month=start_month, day=1)
end_date = pd.Timestamp(year=year, month=end_month, day=calendar.monthrange(year, end_month)[1])
else:
# Period within single calendar year
start_date = pd.Timestamp(year=year, month=start_month, day=1)
end_date = pd.Timestamp(year=year, month=end_month, day=calendar.monthrange(year, end_month)[1])
# Filter data for this period
mask = (data.index >= start_date) & (data.index <= end_date)
period_data = data[mask]
if len(period_data) == 0:
continue
# Calculate weighted average
weighted_avg = _calculate_weighted_average(period_data, start_date, end_date)
# Use the ending year as the index (standard for fiscal years)
results[year] = weighted_avg
# Return results only for years that have data
return pd.Series(list(results.values()), index=list(results.keys()), name=data.name or 'weighted_average')
def _calculate_weighted_average(
data: pd.Series,
period_start: pd.Timestamp,
period_end: pd.Timestamp
) -> float:
"""
Calculate weighted average where weights are based on time intervals.
For monthly data, each month gets equal weight regardless of days.
For other frequencies, weights are based on time intervals.
"""
if len(data) == 0:
return np.nan
if len(data) == 1:
return data.iloc[0]
# Sort data by index
data = data.sort_index()
timestamps = data.index.to_series()
# Check if this looks like monthly data (all timestamps are month-ends)
is_monthly = all(
ts.day == calendar.monthrange(ts.year, ts.month)[1]
for ts in timestamps
)
if is_monthly:
# For monthly data, give each month equal weight
return data.mean()
# For non-monthly data, use time-interval weighting
intervals = []
for i, ts in enumerate(timestamps):
if i == 0:
# First observation: from period start to midpoint with next
if len(timestamps) > 1:
interval_end = ts + (timestamps.iloc[i+1] - ts) / 2
else:
interval_end = period_end
interval_start = period_start
elif i == len(timestamps) - 1:
# Last observation: from midpoint with previous to period end
interval_start = timestamps.iloc[i-1] + (ts - timestamps.iloc[i-1]) / 2
interval_end = period_end
else:
# Middle observations: from midpoint with previous to midpoint with next
interval_start = timestamps.iloc[i-1] + (ts - timestamps.iloc[i-1]) / 2
interval_end = ts + (timestamps.iloc[i+1] - ts) / 2
# Ensure intervals don't extend beyond the period
interval_start = max(interval_start, period_start)
interval_end = min(interval_end, period_end)
interval_duration = (interval_end - interval_start).total_seconds()
intervals.append(max(interval_duration, 0)) # Ensure non-negative
weights = np.array(intervals)
# Handle case where all weights are zero
if weights.sum() == 0:
return data.mean()
# Calculate weighted average
weighted_sum = (data.values * weights).sum()
total_weight = weights.sum()
return weighted_sum / total_weight