From 207abaff889291b3878984f81cfe37e9fc465133 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Thu, 5 Jun 2025 14:28:19 -0400 Subject: [PATCH] implement QuantumKVCache.setMany and QuantumKVCache.seleteMany --- .../src/core/ChannelFollowingService.ts | 1 - .../backend/src/core/GlobalEventService.ts | 2 +- packages/backend/src/core/UserListService.ts | 8 +- .../backend/src/core/UserMutingService.ts | 4 +- .../src/core/UserRenoteMutingService.ts | 4 +- packages/backend/src/misc/cache.ts | 87 ++++++++--- packages/backend/test/unit/misc/cache.ts | 146 +++++++++++++++--- 7 files changed, 201 insertions(+), 51 deletions(-) diff --git a/packages/backend/src/core/ChannelFollowingService.ts b/packages/backend/src/core/ChannelFollowingService.ts index 869456998b..26b023179c 100644 --- a/packages/backend/src/core/ChannelFollowingService.ts +++ b/packages/backend/src/core/ChannelFollowingService.ts @@ -17,7 +17,6 @@ import { InternalEventService } from './InternalEventService.js'; @Injectable() export class ChannelFollowingService implements OnModuleInit { - // TODO check for regs public userFollowingChannelsCache: QuantumKVCache>; constructor( diff --git a/packages/backend/src/core/GlobalEventService.ts b/packages/backend/src/core/GlobalEventService.ts index 763ab8c086..de35e9db19 100644 --- a/packages/backend/src/core/GlobalEventService.ts +++ b/packages/backend/src/core/GlobalEventService.ts @@ -265,7 +265,7 @@ export interface InternalEventTypes { unmute: { muterId: MiUser['id']; muteeId: MiUser['id']; }; userListMemberAdded: { userListId: MiUserList['id']; memberId: MiUser['id']; }; userListMemberRemoved: { userListId: MiUserList['id']; memberId: MiUser['id']; }; - quantumCacheUpdated: { name: string, key: string, op: 's' | 'd' }; + quantumCacheUpdated: { name: string, keys: string[], op: 's' | 'd' }; } type EventTypesToEventPayload = EventUnionFromDictionary>>; diff --git a/packages/backend/src/core/UserListService.ts b/packages/backend/src/core/UserListService.ts index 0240184d13..0d2220049a 100644 --- a/packages/backend/src/core/UserListService.ts +++ b/packages/backend/src/core/UserListService.ts @@ -70,16 +70,16 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit { switch (type) { case 'userListMemberAdded': { const { userListId, memberId } = body; - if (this.membersCache.has(userListId)) { - const members = await this.membersCache.get(userListId); + const members = this.membersCache.get(userListId); + if (members) { members.add(memberId); } break; } case 'userListMemberRemoved': { const { userListId, memberId } = body; - if (this.membersCache.has(userListId)) { - const members = await this.membersCache.get(userListId); + const members = this.membersCache.get(userListId); + if (members) { members.delete(memberId); } break; diff --git a/packages/backend/src/core/UserMutingService.ts b/packages/backend/src/core/UserMutingService.ts index 4f72c1863b..c15a979d0f 100644 --- a/packages/backend/src/core/UserMutingService.ts +++ b/packages/backend/src/core/UserMutingService.ts @@ -43,8 +43,6 @@ export class UserMutingService { id: In(mutings.map(m => m.id)), }); - await Promise.all(Array - .from(new Set(mutings.map(m => m.muterId))) - .map(muterId => this.cacheService.userMutingsCache.delete(muterId))); + await this.cacheService.userMutingsCache.deleteMany(mutings.map(m => m.muterId)); } } diff --git a/packages/backend/src/core/UserRenoteMutingService.ts b/packages/backend/src/core/UserRenoteMutingService.ts index 9d5ec164c8..7c0693f216 100644 --- a/packages/backend/src/core/UserRenoteMutingService.ts +++ b/packages/backend/src/core/UserRenoteMutingService.ts @@ -44,8 +44,6 @@ export class UserRenoteMutingService { id: In(mutings.map(m => m.id)), }); - await Promise.all(Array - .from(new Set(mutings.map(m => m.muterId))) - .map(muterId => this.cacheService.renoteMutingsCache.delete(muterId))); + await this.cacheService.renoteMutingsCache.deleteMany(mutings.map(m => m.muterId)); } } diff --git a/packages/backend/src/misc/cache.ts b/packages/backend/src/misc/cache.ts index 31e6f126b8..22201e243f 100644 --- a/packages/backend/src/misc/cache.ts +++ b/packages/backend/src/misc/cache.ts @@ -531,19 +531,54 @@ export class QuantumKVCache implements Iterable<[key: string, value: T]> { this.memoryCache.set(key, value); - await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', key }); + await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', keys: [key] }); if (this.onSet) { await this.onSet(key, this); } } + /** + * Creates or updates multiple value in the cache, and erases any stale caches across the cluster. + * Fires an onSet for each changed item event after the cache has been updated in all processes. + * Skips if all values are unchanged. + */ + @bindThis + public async setMany(items: Iterable<[key: string, value: T]>): Promise { + const changedKeys: string[] = []; + + for (const item of items) { + if (this.memoryCache.get(item[0]) !== item[1]) { + changedKeys.push(item[0]); + this.memoryCache.set(item[0], item[1]); + } + } + + if (changedKeys.length > 0) { + await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 's', keys: changedKeys }); + + if (this.onSet) { + for (const key of changedKeys) { + await this.onSet(key, this); + } + } + } + } + + /** + * Gets a value from the local memory cache, or returns undefined if not found. + */ + @bindThis + public get(key: string): T | undefined { + return this.memoryCache.get(key); + } + /** * Gets or fetches a value from the cache. * Fires an onSet event, but does not emit an update event to other processes. */ @bindThis - public async get(key: string): Promise { + public async fetch(key: string): Promise { let value = this.memoryCache.get(key); if (value === undefined) { value = await this.fetcher(key, this); @@ -556,15 +591,6 @@ export class QuantumKVCache implements Iterable<[key: string, value: T]> { return value; } - /** - * Alias to get(), included for backwards-compatibility with RedisKVCache. - * @deprecated use get() instead - */ - @bindThis - public async fetch(key: string): Promise { - return await this.get(key); - } - /** * Returns true is a key exists in memory. * This applies to the local subset view, not the cross-cluster cache state. @@ -582,12 +608,35 @@ export class QuantumKVCache implements Iterable<[key: string, value: T]> { public async delete(key: string): Promise { this.memoryCache.delete(key); - await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', key }); + await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', keys: [key] }); if (this.onDelete) { await this.onDelete(key, this); } } + /** + * Deletes multiple values from the cache, and erases any stale caches across the cluster. + * Fires an onDelete event for each key after the cache has been updated in all processes. + * Skips if the input is empty. + */ + @bindThis + public async deleteMany(keys: string[]): Promise { + if (keys.length === 0) { + return; + } + + for (const key of keys) { + this.memoryCache.delete(key); + } + + await this.internalEventService.emit('quantumCacheUpdated', { name: this.name, op: 'd', keys }); + + if (this.onDelete) { + for (const key of keys) { + await this.onDelete(key, this); + } + } + } /** * Refreshes the value of a key from the fetcher, and erases any stale caches across the cluster. @@ -623,14 +672,16 @@ export class QuantumKVCache implements Iterable<[key: string, value: T]> { @bindThis private async onQuantumCacheUpdated(data: InternalEventTypes['quantumCacheUpdated']): Promise { if (data.name === this.name) { - this.memoryCache.delete(data.key); + for (const key of data.keys) { + this.memoryCache.delete(key); - if (data.op === 's' && this.onSet) { - await this.onSet(data.key, this); - } + if (data.op === 's' && this.onSet) { + await this.onSet(key, this); + } - if (data.op === 'd' && this.onDelete) { - await this.onDelete(data.key, this); + if (data.op === 'd' && this.onDelete) { + await this.onDelete(key, this); + } } } } diff --git a/packages/backend/test/unit/misc/cache.ts b/packages/backend/test/unit/misc/cache.ts index 5b242c47d4..0b658618e6 100644 --- a/packages/backend/test/unit/misc/cache.ts +++ b/packages/backend/test/unit/misc/cache.ts @@ -73,7 +73,7 @@ describe(QuantumKVCache, () => { await cache.set('foo', 'bar'); - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', key: 'foo' }]]); + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }]]); }); it('should call onSet when storing', async () => { @@ -110,13 +110,13 @@ describe(QuantumKVCache, () => { expect(fakeOnSet).toHaveBeenCalledTimes(1); }); - it('should fetch when getting an unknown value', async () => { + it('should fetch an unknown value', async () => { const cache = makeCache({ name: 'fake', fetcher: key => `value#${key}`, }); - const result = await cache.get('foo'); + const result = await cache.fetch('foo'); expect(result).toBe('value#foo'); }); @@ -127,7 +127,7 @@ describe(QuantumKVCache, () => { fetcher: key => `value#${key}`, }); - await cache.get('foo'); + await cache.fetch('foo'); const result = cache.has('foo'); expect(result).toBe(true); @@ -141,7 +141,7 @@ describe(QuantumKVCache, () => { onSet: fakeOnSet, }); - await cache.get('foo'); + await cache.fetch('foo'); expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); }); @@ -152,9 +152,9 @@ describe(QuantumKVCache, () => { fetcher: key => `value#${key}`, }); - await cache.get('foo'); + await cache.fetch('foo'); - expect(fakeInternalEventService._calls).not.toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', key: 'foo' }]]); + expect(fakeInternalEventService._calls).not.toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }]]); }); it('should delete from memory cache', async () => { @@ -186,14 +186,14 @@ describe(QuantumKVCache, () => { await cache.set('foo', 'bar'); await cache.delete('foo'); - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 'd', key: 'foo' }]]); + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 'd', keys: ['foo'] }]]); }); it('should delete when receiving set event', async () => { const cache = makeCache({ name: 'fake' }); await cache.set('foo', 'bar'); - await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 's', key: 'foo' }); + await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }); const result = cache.has('foo'); expect(result).toBe(false); @@ -206,7 +206,7 @@ describe(QuantumKVCache, () => { onSet: fakeOnSet, }); - await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 's', key: 'foo' }); + await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }); expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); }); @@ -215,7 +215,7 @@ describe(QuantumKVCache, () => { const cache = makeCache({ name: 'fake' }); await cache.set('foo', 'bar'); - await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 'd', key: 'foo' }); + await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 'd', keys: ['foo'] }); const result = cache.has('foo'); expect(result).toBe(false); @@ -229,26 +229,130 @@ describe(QuantumKVCache, () => { }); await cache.set('foo', 'bar'); - await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 'd', key: 'foo' }); + await fakeInternalEventService._emitRedis('quantumCacheUpdated', { name: 'fake', op: 'd', keys: ['foo'] }); expect(fakeOnDelete).toHaveBeenCalledWith('foo', cache); }); - describe('fetch', () => { - it('should perform same logic as get', async () => { + describe('get', () => { + it('should return value if present', async () => { + const cache = makeCache(); + await cache.set('foo', 'bar'); + + const result = cache.get('foo'); + + expect(result).toBe('bar'); + }); + it('should return undefined if missing', () => { + const cache = makeCache(); + + const result = cache.get('foo'); + + expect(result).toBe(undefined); + }); + }); + + describe('setMany', () => { + it('should populate all values', async () => { + const cache = makeCache(); + + await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); + + expect(cache.has('foo')).toBe(true); + expect(cache.has('alpha')).toBe(true); + }); + + it('should emit one event', async () => { + const cache = makeCache({ + name: 'fake', + }); + + await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); + + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo', 'alpha'] }]]); + expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); + }); + + it('should call onSet for each item', async () => { const fakeOnSet = jest.fn(() => Promise.resolve()); const cache = makeCache({ name: 'fake', - fetcher: key => `value#${key}`, onSet: fakeOnSet, }); - // noinspection JSDeprecatedSymbols - const result = await cache.fetch('foo'); + await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); - expect(result).toBe('value#foo'); expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); - expect(fakeInternalEventService._calls).not.toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', key: 'foo' }]]); + expect(fakeOnSet).toHaveBeenCalledWith('alpha', cache); + }); + + it('should emit events only for changed items', async () => { + const fakeOnSet = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onSet: fakeOnSet, + }); + + await cache.set('foo', 'bar'); + fakeOnSet.mockClear(); + fakeInternalEventService._reset(); + + await cache.setMany([['foo', 'bar'], ['alpha', 'omega']]); + + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['alpha'] }]]); + expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); + expect(fakeOnSet).toHaveBeenCalledWith('alpha', cache); + expect(fakeOnSet).toHaveBeenCalledTimes(1); + }); + }); + + describe('deleteMany', () => { + it('should remove keys from memory cache', async () => { + const cache = makeCache(); + + await cache.set('foo', 'bar'); + await cache.set('alpha', 'omega'); + await cache.deleteMany(['foo', 'alpha']); + + expect(cache.has('foo')).toBe(false); + expect(cache.has('alpha')).toBe(false); + }); + + it('should emit only one event', async () => { + const cache = makeCache({ + name: 'fake', + }); + + await cache.deleteMany(['foo', 'alpha']); + + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 'd', keys: ['foo', 'alpha'] }]]); + expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(1); + }); + + it('should call onDelete for each key', async () => { + const fakeOnDelete = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onDelete: fakeOnDelete, + }); + + await cache.deleteMany(['foo', 'alpha']); + + expect(fakeOnDelete).toHaveBeenCalledWith('foo', cache); + expect(fakeOnDelete).toHaveBeenCalledWith('alpha', cache); + }); + + it('should do nothing if no keys are provided', async () => { + const fakeOnDelete = jest.fn(() => Promise.resolve()); + const cache = makeCache({ + name: 'fake', + onDelete: fakeOnDelete, + }); + + await cache.deleteMany([]); + + expect(fakeOnDelete).not.toHaveBeenCalled(); + expect(fakeInternalEventService._calls.filter(c => c[0] === 'emit')).toHaveLength(0); }); }); @@ -296,7 +400,7 @@ describe(QuantumKVCache, () => { onSet: fakeOnSet, }); - await cache.refresh('foo') + await cache.refresh('foo'); expect(fakeOnSet).toHaveBeenCalledWith('foo', cache); }); @@ -309,7 +413,7 @@ describe(QuantumKVCache, () => { await cache.refresh('foo'); - expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', key: 'foo' }]]); + expect(fakeInternalEventService._calls).toContainEqual(['emit', ['quantumCacheUpdated', { name: 'fake', op: 's', keys: ['foo'] }]]); }); });