From f446d77cb51af1f66a0042feec2f0907537a16ce Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Thu, 5 Jun 2025 10:49:16 -0400 Subject: [PATCH] implement QuantumKVCache --- .../backend/src/core/GlobalEventService.ts | 1 + packages/backend/src/misc/cache.ts | 233 ++++++++++ packages/backend/test/unit/misc/cache.ts | 431 ++++++++++++++++++ 3 files changed, 665 insertions(+) create mode 100644 packages/backend/test/unit/misc/cache.ts diff --git a/packages/backend/src/core/GlobalEventService.ts b/packages/backend/src/core/GlobalEventService.ts index d1a5cabd85..763ab8c086 100644 --- a/packages/backend/src/core/GlobalEventService.ts +++ b/packages/backend/src/core/GlobalEventService.ts @@ -265,6 +265,7 @@ export interface InternalEventTypes { unmute: { muterId: MiUser['id']; muteeId: MiUser['id']; }; userListMemberAdded: { userListId: MiUserList['id']; memberId: MiUser['id']; }; userListMemberRemoved: { userListId: MiUserList['id']; memberId: MiUser['id']; }; + quantumCacheUpdated: { name: string, key: string, op: 's' | 'd' }; } type EventTypesToEventPayload = EventUnionFromDictionary>>; diff --git a/packages/backend/src/misc/cache.ts b/packages/backend/src/misc/cache.ts index a6ab96c189..31e6f126b8 100644 --- a/packages/backend/src/misc/cache.ts +++ b/packages/backend/src/misc/cache.ts @@ -5,6 +5,8 @@ import * as Redis from 'ioredis'; import { bindThis } from '@/decorators.js'; +import { InternalEventService } from '@/core/InternalEventService.js'; +import { InternalEventTypes } from '@/core/GlobalEventService.js'; export class RedisKVCache { private readonly lifetime: number; @@ -322,6 +324,10 @@ export class MemoryKVCache { clearInterval(this.gcIntervalHandle); } + public get size() { + return this.cache.size; + } + public get entries() { return this.cache.entries(); } @@ -410,3 +416,230 @@ export class MemorySingleCache { return value; } } + +export interface QuantumKVOpts { + /** + * Memory cache lifetime in milliseconds. + */ + lifetime: number; + + /** + * Callback to fetch the value for a key that wasn't found in the cache. + * May be synchronous or async. + */ + fetcher: (key: string, cache: QuantumKVCache) => T | Promise; + + /** + * Optional callback when a value is created or changed in the cache, either locally or elsewhere in the cluster. + * This is called *after* the cache state is updated. + * May be synchronous or async. + */ + onSet?: (key: string, cache: QuantumKVCache) => void | Promise; + + /** + * Optional callback when a value is deleted from the cache, either locally or elsewhere in the cluster. + * This is called *after* the cache state is updated. + * May be synchronous or async. + */ + onDelete?: (key: string, cache: QuantumKVCache) => void | Promise; +} + +/** + * QuantumKVCache is a lifetime-bounded memory cache (like MemoryKVCache) with automatic cross-cluster synchronization via Redis. + * All nodes in the cluster are guaranteed to have a *subset* view of the current accurate state, though individual processes may have different items in their local cache. + * This ensures that a call to get() will never return stale data. + */ +export class QuantumKVCache implements Iterable<[key: string, value: T]> { + private readonly memoryCache: MemoryKVCache; + + private readonly fetcher: QuantumKVOpts['fetcher']; + private readonly onSet: QuantumKVOpts['onSet']; + private readonly onDelete: QuantumKVOpts['onDelete']; + + /** + * @param internalEventService Service bus to synchronize events. + * @param name Unique name of the cache - must be the same in all processes. + * @param opts Cache options + */ + constructor( + private readonly internalEventService: InternalEventService, + private readonly name: string, + opts: QuantumKVOpts, + ) { + this.memoryCache = new MemoryKVCache(opts.lifetime); + this.fetcher = opts.fetcher; + this.onSet = opts.onSet; + this.onDelete = opts.onDelete; + + this.internalEventService.on('quantumCacheUpdated', this.onQuantumCacheUpdated, { + // Ignore our own events, otherwise we'll immediately erase any set value. + ignoreLocal: true, + }); + } + + /** + * The number of items currently in memory. + * This applies to the local subset view, not the cross-cluster cache state. + */ + public get size() { + return this.memoryCache.size; + } + + /** + * Iterates all [key, value] pairs in memory. + * This applies to the local subset view, not the cross-cluster cache state. + */ + @bindThis + public *entries(): Generator<[key: string, value: T]> { + for (const entry of this.memoryCache.entries) { + yield [entry[0], entry[1].value]; + } + } + + /** + * Iterates all keys in memory. + * This applies to the local subset view, not the cross-cluster cache state. + */ + @bindThis + public *keys() { + for (const entry of this.memoryCache.entries) { + yield entry[0]; + } + } + + /** + * Iterates all values pairs in memory. + * This applies to the local subset view, not the cross-cluster cache state. + */ + @bindThis + public *values() { + for (const entry of this.memoryCache.entries) { + yield entry[1].value; + } + } + + /** + * Creates or updates a value in the cache, and erases any stale caches across the cluster. + * Fires an onSet event after the cache has been updated in all processes. + * Skips if the value is unchanged. + */ + @bindThis + public async set(key: string, value: T): Promise { + if (this.memoryCache.get(key) === value) { + return; + } + + this.memoryCache.set(key, value); + + await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', key }); + + if (this.onSet) { + await this.onSet(key, this); + } + } + + /** + * Gets or fetches a value from the cache. + * Fires an onSet event, but does not emit an update event to other processes. + */ + @bindThis + public async get(key: string): Promise { + let value = this.memoryCache.get(key); + if (value === undefined) { + value = await this.fetcher(key, this); + this.memoryCache.set(key, value); + + if (this.onSet) { + await this.onSet(key, this); + } + } + return value; + } + + /** + * Alias to get(), included for backwards-compatibility with RedisKVCache. + * @deprecated use get() instead + */ + @bindThis + public async fetch(key: string): Promise { + return await this.get(key); + } + + /** + * Returns true is a key exists in memory. + * This applies to the local subset view, not the cross-cluster cache state. + */ + @bindThis + public has(key: string): boolean { + return this.memoryCache.get(key) !== undefined; + } + + /** + * Deletes a value from the cache, and erases any stale caches across the cluster. + * Fires an onDelete event after the cache has been updated in all processes. + */ + @bindThis + public async delete(key: string): Promise { + this.memoryCache.delete(key); + + await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', key }); + + if (this.onDelete) { + await this.onDelete(key, this); + } + } + + /** + * Refreshes the value of a key from the fetcher, and erases any stale caches across the cluster. + * Fires an onSet event after the cache has been updated in all processes. + */ + @bindThis + public async refresh(key: string): Promise { + const value = await this.fetcher(key, this); + await this.set(key, value); + return value; + } + + /** + * Erases all entries from the local memory cache. + * Does not send any events or update other processes. + */ + @bindThis + public gc() { + this.memoryCache.gc(); + } + + /** + * Erases all data and disconnects from the cluster. + * This *must* be called when shutting down to prevent memory leaks! + */ + @bindThis + public dispose() { + this.internalEventService.off('quantumCacheUpdated', this.onQuantumCacheUpdated); + + this.memoryCache.dispose(); + } + + @bindThis + private async onQuantumCacheUpdated(data: InternalEventTypes['quantumCacheUpdated']): Promise { + if (data.name === this.name) { + this.memoryCache.delete(data.key); + + if (data.op === 's' && this.onSet) { + await this.onSet(data.key, this); + } + + if (data.op === 'd' && this.onDelete) { + await this.onDelete(data.key, this); + } + } + } + + /** + * Iterates all [key, value] pairs in memory. + * This applies to the local subset view, not the cross-cluster cache state. + */ + [Symbol.iterator](): Iterator<[key: string, value: T]> { + return this.entries(); + } +} diff --git a/packages/backend/test/unit/misc/cache.ts b/packages/backend/test/unit/misc/cache.ts new file mode 100644 index 0000000000..5b242c47d4 --- /dev/null +++ b/packages/backend/test/unit/misc/cache.ts @@ -0,0 +1,431 @@ +/* + * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { jest } from '@jest/globals'; +import { FakeInternalEventService } from '../../misc/FakeInternalEventService.js'; +import { QuantumKVCache, QuantumKVOpts } from '@/misc/cache.js'; + +describe(QuantumKVCache, () => { + let fakeInternalEventService: FakeInternalEventService; + let madeCaches: { dispose: () => void }[]; + + function makeCache(opts?: Partial> & { name?: string }): QuantumKVCache { + const _opts = { + name: 'test', + lifetime: Infinity, + fetcher: () => { throw new Error('not implemented'); }, + } satisfies QuantumKVOpts & { name: string }; + + if (opts) { + Object.assign(_opts, opts); + } + + const cache = new QuantumKVCache(fakeInternalEventService, _opts.name, _opts); + madeCaches.push(cache); + return cache; + } + + beforeEach(() => { + madeCaches = []; + fakeInternalEventService = new FakeInternalEventService(); + }); + + afterEach(() => { + madeCaches.forEach(cache => { + cache.dispose(); + }); + }); + + it('should connect on construct', () => { + makeCache(); + + expect(fakeInternalEventService._calls).toContainEqual(['on', ['quantumCacheUpdated', expect.anything(), { ignoreLocal: true }]]); + }); + + it('should disconnect on dispose', () => { + const cache = makeCache(); + + cache.dispose(); + + const callback = fakeInternalEventService._calls + .find(c => c[0] === 'on' && c[1][0] === 'quantumCacheUpdated') + ?.[1][1]; + expect(fakeInternalEventService._calls).toContainEqual(['off', ['quantumCacheUpdated', callback]]); + }); + + it('should store in memory cache', async () => { + const cache = makeCache(); + + await cache.set('foo', 'bar'); + await cache.set('alpha', 'omega'); + + const result1 = await cache.get('foo'); + const result2 = await cache.get('alpha'); + + expect(result1).toBe('bar'); + expect(result2).toBe('omega'); + }); + + it('should emit event when storing', async () => { + const cache = makeCache({ name: 'fake' }); + + await cache.set('foo', 'bar'); + + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', key: 'foo' }]]); + }); + + it('should call onSet when storing', async () => { + const fakeOnSet = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onSet: fakeOnSet, + }); + + await cache.set('foo', 'bar'); + + expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); + }); + + it('should not emit event when storing unchanged value', async () => { + const cache = makeCache({ name: 'fake' }); + + await cache.set('foo', 'bar'); + await cache.set('foo', 'bar'); + + expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); + }); + + it('should not call onSet when storing unchanged value', async () => { + const fakeOnSet = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onSet: fakeOnSet, + }); + + await cache.set('foo', 'bar'); + await cache.set('foo', 'bar'); + + expect(fakeOnSet).toHaveBeenCalledTimes(1); + }); + + it('should fetch when getting an unknown value', async () => { + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + }); + + const result = await cache.get('foo'); + + expect(result).toBe('value#foo'); + }); + + it('should store fetched value in memory cache', async () => { + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + }); + + await cache.get('foo'); + + const result = cache.has('foo'); + expect(result).toBe(true); + }); + + it('should call onSet when fetching', async () => { + const fakeOnSet = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + onSet: fakeOnSet, + }); + + await cache.get('foo'); + + expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); + }); + + it('should not emit event when fetching', async () => { + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + }); + + await cache.get('foo'); + + expect(fakeInternalEventService._calls).not.toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', key: 'foo' }]]); + }); + + it('should delete from memory cache', async () => { + const cache = makeCache(); + + await cache.set('foo', 'bar'); + await cache.delete('foo'); + + const result = cache.has('foo'); + expect(result).toBe(false); + }); + + it('should call onDelete when deleting', async () => { + const fakeOnDelete = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onDelete: fakeOnDelete, + }); + + await cache.set('foo', 'bar'); + await cache.delete('foo'); + + expect(fakeOnDelete).toHaveBeenCalledWith('foo', cache); + }); + + it('should emit event when deleting', async () => { + const cache = makeCache({ name: 'fake' }); + + await cache.set('foo', 'bar'); + await cache.delete('foo'); + + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 'd', key: 'foo' }]]); + }); + + it('should delete when receiving set event', async () => { + const cache = makeCache({ name: 'fake' }); + await cache.set('foo', 'bar'); + + await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 's', key: 'foo' }); + + const result = cache.has('foo'); + expect(result).toBe(false); + }); + + it('should call onSet when receiving set event', async () => { + const fakeOnSet = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onSet: fakeOnSet, + }); + + await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 's', key: 'foo' }); + + expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); + }); + + it('should delete when receiving delete event', async () => { + const cache = makeCache({ name: 'fake' }); + await cache.set('foo', 'bar'); + + await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 'd', key: 'foo' }); + + const result = cache.has('foo'); + expect(result).toBe(false); + }); + + it('should call onDelete when receiving delete event', async () => { + const fakeOnDelete = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onDelete: fakeOnDelete, + }); + await cache.set('foo', 'bar'); + + await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 'd', key: 'foo' }); + + expect(fakeOnDelete).toHaveBeenCalledWith('foo', cache); + }); + + describe('fetch', () => { + it('should perform same logic as get', async () => { + const fakeOnSet = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + onSet: fakeOnSet, + }); + + // noinspection JSDeprecatedSymbols + const result = await cache.fetch('foo'); + + expect(result).toBe('value#foo'); + expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); + expect(fakeInternalEventService._calls).not.toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', key: 'foo' }]]); + }); + }); + + describe('refresh', () => { + it('should populate the value', async () => { + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + }); + + await cache.refresh('foo'); + + const result = cache.has('foo'); + expect(result).toBe(true); + }); + + it('should return the value', async () => { + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + }); + + const result = await cache.refresh('foo'); + + expect(result).toBe('value#foo'); + }); + + it('should replace the value if it exists', async () => { + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + }); + + await cache.set('foo', 'bar'); + const result = await cache.refresh('foo'); + + expect(result).toBe('value#foo'); + }); + + it('should call onSet', async () => { + const fakeOnSet = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + onSet: fakeOnSet, + }); + + await cache.refresh('foo') + + expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); + }); + + it('should emit event', async () => { + const cache = makeCache({ + name: 'fake', + fetcher: key => `value#${key}`, + }); + + await cache.refresh('foo'); + + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', key: 'foo' }]]); + }); + }); + + describe('has', () => { + it('should return false when empty', () => { + const cache = makeCache(); + const result = cache.has('foo'); + expect(result).toBe(false); + }); + + it('should return false when value is not in memory', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = cache.has('alpha'); + + expect(result).toBe(false); + }); + + it('should return true when value is in memory', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = cache.has('foo'); + + expect(result).toBe(true); + }); + }); + + describe('size', () => { + it('should return 0 when empty', () => { + const cache = makeCache(); + expect(cache.size).toBe(0); + }); + + it('should return correct size when populated', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + expect(cache.size).toBe(1); + }); + }); + + describe('entries', () => { + it('should return empty when empty', () => { + const cache = makeCache(); + + const result = Array.from(cache.entries()); + + expect(result).toHaveLength(0); + }); + + it('should return all entries when populated', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = Array.from(cache.entries()); + + expect(result).toEqual([['foo', 'bar']]); + }); + }); + + describe('keys', () => { + it('should return empty when empty', () => { + const cache = makeCache(); + + const result = Array.from(cache.keys()); + + expect(result).toHaveLength(0); + }); + + it('should return all keys when populated', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = Array.from(cache.keys()); + + expect(result).toEqual(['foo']); + }); + }); + + describe('values', () => { + it('should return empty when empty', () => { + const cache = makeCache(); + + const result = Array.from(cache.values()); + + expect(result).toHaveLength(0); + }); + + it('should return all values when populated', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = Array.from(cache.values()); + + expect(result).toEqual(['bar']); + }); + }); + + describe('[Symbol.iterator]', () => { + it('should return empty when empty', () => { + const cache = makeCache(); + + const result = Array.from(cache); + + expect(result).toHaveLength(0); + }); + + it('should return all entries when populated', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = Array.from(cache); + + expect(result).toEqual([['foo', 'bar']]); + }); + }); +});