From aa7cadbb6c5068e231638ff25922c439ff949783 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Wed, 21 May 2025 21:51:36 -0400 Subject: [PATCH] implement InternalEventService --- packages/backend/src/core/CoreModule.ts | 6 ++ .../backend/src/core/GlobalEventService.ts | 9 +- .../backend/src/core/InternalEventService.ts | 102 ++++++++++++++++++ .../test/misc/FakeInternalEventService.ts | 92 ++++++++++++++++ 4 files changed, 207 insertions(+), 2 deletions(-) create mode 100644 packages/backend/src/core/InternalEventService.ts create mode 100644 packages/backend/test/misc/FakeInternalEventService.ts diff --git a/packages/backend/src/core/CoreModule.ts b/packages/backend/src/core/CoreModule.ts index dd8e61d322..6839ba0159 100644 --- a/packages/backend/src/core/CoreModule.ts +++ b/packages/backend/src/core/CoreModule.ts @@ -41,6 +41,7 @@ import { HttpRequestService } from './HttpRequestService.js'; import { IdService } from './IdService.js'; import { ImageProcessingService } from './ImageProcessingService.js'; import { SystemAccountService } from './SystemAccountService.js'; +import { InternalEventService } from './InternalEventService.js'; import { InternalStorageService } from './InternalStorageService.js'; import { MetaService } from './MetaService.js'; import { MfmService } from './MfmService.js'; @@ -186,6 +187,7 @@ const $HashtagService: Provider = { provide: 'HashtagService', useExisting: Hash const $HttpRequestService: Provider = { provide: 'HttpRequestService', useExisting: HttpRequestService }; const $IdService: Provider = { provide: 'IdService', useExisting: IdService }; const $ImageProcessingService: Provider = { provide: 'ImageProcessingService', useExisting: ImageProcessingService }; +const $InternalEventService: Provider = { provide: 'InternalEventService', useExisting: InternalEventService }; const $InternalStorageService: Provider = { provide: 'InternalStorageService', useExisting: InternalStorageService }; const $MetaService: Provider = { provide: 'MetaService', useExisting: MetaService }; const $MfmService: Provider = { provide: 'MfmService', useExisting: MfmService }; @@ -345,6 +347,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp HttpRequestService, IdService, ImageProcessingService, + InternalEventService, InternalStorageService, MetaService, MfmService, @@ -500,6 +503,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp $HttpRequestService, $IdService, $ImageProcessingService, + $InternalEventService, $InternalStorageService, $MetaService, $MfmService, @@ -656,6 +660,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp HttpRequestService, IdService, ImageProcessingService, + InternalEventService, InternalStorageService, MetaService, MfmService, @@ -810,6 +815,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp $HttpRequestService, $IdService, $ImageProcessingService, + $InternalEventService, $InternalStorageService, $MetaService, $MfmService, diff --git a/packages/backend/src/core/GlobalEventService.ts b/packages/backend/src/core/GlobalEventService.ts index c0027ae129..d1a5cabd85 100644 --- a/packages/backend/src/core/GlobalEventService.ts +++ b/packages/backend/src/core/GlobalEventService.ts @@ -353,12 +353,12 @@ export class GlobalEventService { } @bindThis - private publish(channel: StreamChannels, type: string | null, value?: any): void { + private async publish(channel: StreamChannels, type: string | null, value?: any): Promise { const message = type == null ? value : value == null ? { type: type, body: null } : { type: type, body: value }; - this.redisForPub.publish(this.config.host, JSON.stringify({ + await this.redisForPub.publish(this.config.host, JSON.stringify({ channel: channel, message: message, })); @@ -369,6 +369,11 @@ export class GlobalEventService { this.publish('internal', type, typeof value === 'undefined' ? null : value); } + @bindThis + public async publishInternalEventAsync(type: K, value?: InternalEventTypes[K]): Promise { + await this.publish('internal', type, typeof value === 'undefined' ? null : value); + } + @bindThis public publishBroadcastStream(type: K, value?: BroadcastTypes[K]): void { this.publish('broadcast', type, typeof value === 'undefined' ? null : value); diff --git a/packages/backend/src/core/InternalEventService.ts b/packages/backend/src/core/InternalEventService.ts new file mode 100644 index 0000000000..375ee928c4 --- /dev/null +++ b/packages/backend/src/core/InternalEventService.ts @@ -0,0 +1,102 @@ +/* + * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; +import Redis from 'ioredis'; +import { DI } from '@/di-symbols.js'; +import { GlobalEventService } from '@/core/GlobalEventService.js'; +import type { GlobalEvents, InternalEventTypes } from '@/core/GlobalEventService.js'; +import { bindThis } from '@/decorators.js'; + +export type Listener = (value: InternalEventTypes[K], key: K) => void | Promise; + +export interface ListenerProps { + ignoreLocal?: boolean, +} + +@Injectable() +export class InternalEventService implements OnApplicationShutdown { + private readonly listeners = new Map, ListenerProps>>(); + + constructor( + @Inject(DI.redisForSub) + private readonly redisForSub: Redis.Redis, + + private readonly globalEventService: GlobalEventService, + ) { + this.redisForSub.on('message', this.onMessage); + } + + @bindThis + public on(type: K, listener: Listener, props?: ListenerProps): void { + let set = this.listeners.get(type); + if (!set) { + set = new Map(); + this.listeners.set(type, set); + } + + // Functionally, this is just a set with metadata on the values. + set.set(listener as Listener, props ?? {}); + } + + @bindThis + public off(type: K, listener: Listener): void { + this.listeners.get(type)?.delete(listener as Listener); + } + + @bindThis + public async emit(type: K, value: InternalEventTypes[K]): Promise { + await this.emitInternal(type, value, true); + await this.globalEventService.publishInternalEventAsync(type, { ...value, _pid: process.pid }); + } + + @bindThis + private async emitInternal(type: K, value: InternalEventTypes[K], isLocal: boolean): Promise { + const listeners = this.listeners.get(type); + if (!listeners) { + return; + } + + const promises: Promise[] = []; + for (const [listener, props] of listeners) { + if (!isLocal || !props.ignoreLocal) { + const promise = Promise.resolve(listener(value, type)); + promises.push(promise); + } + } + await Promise.all(promises); + } + + @bindThis + private async onMessage(_: string, data: string): Promise { + const obj = JSON.parse(data); + + if (obj.channel === 'internal') { + const { type, body } = obj.message as GlobalEvents['internal']['payload']; + if (!isLocalInternalEvent(body) || body._pid !== process.pid) { + await this.emitInternal(type, body as InternalEventTypes[keyof InternalEventTypes], false); + } + } + } + + @bindThis + public dispose(): void { + this.redisForSub.off('message', this.onMessage); + this.listeners.clear(); + } + + @bindThis + public onApplicationShutdown(): void { + this.dispose(); + } +} + +interface LocalInternalEvent { + _pid: number; +} + +function isLocalInternalEvent(body: object): body is LocalInternalEvent { + return '_pid' in body && typeof(body._pid) === 'number'; +} diff --git a/packages/backend/test/misc/FakeInternalEventService.ts b/packages/backend/test/misc/FakeInternalEventService.ts new file mode 100644 index 0000000000..ffe8b81d78 --- /dev/null +++ b/packages/backend/test/misc/FakeInternalEventService.ts @@ -0,0 +1,92 @@ +/* + * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import type { Listener, ListenerProps } from '@/core/InternalEventService.js'; +import type Redis from 'ioredis'; +import type { GlobalEventService, InternalEventTypes } from '@/core/GlobalEventService.js'; +import { InternalEventService } from '@/core/InternalEventService.js'; +import { bindThis } from '@/decorators.js'; + +type FakeCall = [K, Parameters]; +type FakeListener = [K, Listener, ListenerProps]; + +/** + * Minimal implementation of InternalEventService meant for use in unit tests. + * There is no redis connection, and metadata is tracked in the public _calls and _listeners arrays. + * The on/off/emit methods are fully functional and can be called in tests to invoke any registered listeners. + */ +export class FakeInternalEventService extends InternalEventService { + /** + * List of calls to public methods, in chronological order. + */ + public _calls: FakeCall[] = []; + + /** + * List of currently registered listeners. + */ + public _listeners: FakeListener[] = []; + + /** + * Resets the mock. + * Clears all listeners and tracked calls. + */ + public _reset() { + this._calls = []; + this._listeners = []; + } + + /** + * Simulates a remote event sent from another process in the cluster via redis. + */ + @bindThis + public async _emitRedis(type: K, value: InternalEventTypes[K]): Promise { + await this.emit(type, value, false); + } + + constructor() { + super( + { on: () => {} } as unknown as Redis.Redis, + {} as unknown as GlobalEventService, + ); + } + + @bindThis + public on(type: K, listener: Listener, props?: ListenerProps): void { + if (!this._listeners.some(l => l[0] === type && l[1] === listener)) { + this._listeners.push([type, listener as Listener, props ?? {}]); + } + this._calls.push(['on', [type, listener as Listener, props]]); + } + + @bindThis + public off(type: K, listener: Listener): void { + this._listeners = this._listeners.filter(l => l[0] !== type || l[1] !== listener); + this._calls.push(['off', [type, listener as Listener]]); + } + + @bindThis + public async emit(type: K, value: InternalEventTypes[K], isLocal = true): Promise { + for (const listener of this._listeners) { + if (listener[0] === type) { + if (!isLocal || !listener[2].ignoreLocal) { + await listener[1](value, type); + } + } + } + this._calls.push(['emit', [type, value]]); + } + + @bindThis + public dispose(): void { + this._listeners = []; + this._calls.push(['dispose', []]); + } + + @bindThis + public onApplicationShutdown(): void { + this._calls.push(['onApplicationShutdown', []]); + } +} +