I ended up adapting this answer (https://stackoverflow.com/a/28945444/22825680) to create a "refreshable" replay subject (it is given `refreshSubject` as its reset signal in my example), and then wrote an operator that uses it to replace `shareReplay(1)` in these instances:
/**
 * A replay subject whose buffered values can be discarded on demand.
 *
 * Internally it keeps a stream of inner `ReplaySubject`s and exposes their
 * concatenation. `reset()` completes the current inner subject and pushes a
 * fresh, empty one: subscribers that are already attached move on to the new
 * inner subject, while late subscribers only ever see the newest inner
 * subject and therefore receive none of the values buffered before the reset.
 *
 * Adapted from https://stackoverflow.com/a/28945444/22825680
 */
class CacheSubject<T> implements SubjectLike<T>
{
    /** Replays only the most recent inner subject to late subscribers. */
    private readonly mySubjects: ReplaySubject<Observable<T>>;
    /** What subscribers actually observe: the inner subjects, one after another. */
    private readonly myConcatenatedSubjects: Observable<T>;
    /** Replay depth applied to every inner subject. */
    private readonly myBufferSize: number;
    /** Subscription to the reset signal; `undefined` when no signal was given. */
    private readonly myResetSubscription: Subscription | undefined;
    /** Inner subject that currently receives `next`/`error` notifications. */
    private myCurrentSubject: ReplaySubject<T>;

    /**
     * @param resetSignal$ Optional stream; every emission calls `reset()`.
     * @param bufferSize How many of the latest values each inner subject
     * replays to late subscribers. Defaults to 1; pass `Infinity` to replay
     * everything emitted since the last reset.
     */
    constructor(resetSignal$?: Observable<void>, bufferSize = 1)
    {
        this.myBufferSize = bufferSize;
        this.mySubjects = new ReplaySubject<Observable<T>>(1);
        this.myConcatenatedSubjects = this.mySubjects.pipe(
            concatAll(),
        );
        this.myCurrentSubject = this.createSubject();
        this.mySubjects.next(this.myCurrentSubject);

        // Subscribe last: a signal that emits synchronously calls reset(),
        // which needs the fields above to be initialised already.
        // The subscription is kept so it can be released on termination;
        // otherwise every discarded CacheSubject would stay registered on
        // the signal forever.
        this.myResetSubscription = resetSignal$?.subscribe({
            next: () =>
            {
                this.reset();
            },
        });
    }

    /**
     * Drops the buffered values by completing the current inner subject and
     * replacing it with a new, empty one.
     */
    public reset(): void
    {
        // Complete first so the concatenation can advance to the next subject.
        this.myCurrentSubject.complete();
        this.myCurrentSubject = this.createSubject();
        this.mySubjects.next(this.myCurrentSubject);
    }

    /** Forwards a value to the current inner subject. */
    public next(value: T): void
    {
        this.myCurrentSubject.next(value);
    }

    /**
     * Forwards an error to the current inner subject. An error is terminal,
     * so the reset signal is no longer listened to afterwards.
     */
    public error(err: unknown): void
    {
        this.myResetSubscription?.unsubscribe();
        this.myCurrentSubject.error(err);
    }

    /**
     * Completes the current inner subject and the stream of inner subjects,
     * and stops listening to the reset signal.
     */
    public complete(): void
    {
        this.myResetSubscription?.unsubscribe();
        this.myCurrentSubject.complete();
        this.mySubjects.complete();
        // Make current subject unreachable.
        this.myCurrentSubject = this.createSubject();
    }

    /**
     * Subscribes the observer to the concatenation of all inner subjects.
     *
     * @returns The subscription to that concatenated stream.
     */
    public subscribe(observer: Observer<T>): Subscription
    {
        return this.myConcatenatedSubjects.subscribe(observer);
    }

    /** Creates an empty inner subject with the configured replay depth. */
    private createSubject(): ReplaySubject<T>
    {
        return new ReplaySubject<T>(this.myBufferSize);
    }
}
/**
 * Multicasts the source and replays it to late subscribers, optionally
 * discarding the replayed values whenever `resetSignal$` emits.
 *
 * @param resetSignal$ Optional stream. When omitted, a plain
 * `ReplaySubject<T>(1)` is used as the connector; otherwise a `CacheSubject`
 * driven by this signal is used.
 * @returns An operator that shares a single subscription to the source.
 */
function cache<T>(
    resetSignal$?: Observable<void>,
): MonoTypeOperatorFunction<T>
{
    // Factory handed to share(); invoked whenever share needs a new connector.
    const createConnector = () =>
    {
        if (resetSignal$ == null)
        {
            return new ReplaySubject<T>(1);
        }

        return new CacheSubject<T>(resetSignal$);
    };

    return share<T>({
        connector: createConnector,
        // A source error discards the connector so a later subscriber retries.
        resetOnError: true,
        // Keep the connector (and what it replays) after the source completes.
        resetOnComplete: false,
        // Keep the connection alive even when nobody is subscribed.
        resetOnRefCountZero: false,
    });
}
// Acts like shareReplay(1) but clears its buffer when resetSignal$ emits.