mirror of
https://codeberg.org/yeentown/barkey.git
synced 2025-08-21 10:33:37 +00:00
implement InternalEventService
This commit is contained in:
parent
00c0bdbc94
commit
aa7cadbb6c
4 changed files with 207 additions and 2 deletions
|
@ -41,6 +41,7 @@ import { HttpRequestService } from './HttpRequestService.js';
|
||||||
import { IdService } from './IdService.js';
|
import { IdService } from './IdService.js';
|
||||||
import { ImageProcessingService } from './ImageProcessingService.js';
|
import { ImageProcessingService } from './ImageProcessingService.js';
|
||||||
import { SystemAccountService } from './SystemAccountService.js';
|
import { SystemAccountService } from './SystemAccountService.js';
|
||||||
|
import { InternalEventService } from './InternalEventService.js';
|
||||||
import { InternalStorageService } from './InternalStorageService.js';
|
import { InternalStorageService } from './InternalStorageService.js';
|
||||||
import { MetaService } from './MetaService.js';
|
import { MetaService } from './MetaService.js';
|
||||||
import { MfmService } from './MfmService.js';
|
import { MfmService } from './MfmService.js';
|
||||||
|
@ -186,6 +187,7 @@ const $HashtagService: Provider = { provide: 'HashtagService', useExisting: Hash
|
||||||
const $HttpRequestService: Provider = { provide: 'HttpRequestService', useExisting: HttpRequestService };
|
const $HttpRequestService: Provider = { provide: 'HttpRequestService', useExisting: HttpRequestService };
|
||||||
const $IdService: Provider = { provide: 'IdService', useExisting: IdService };
|
const $IdService: Provider = { provide: 'IdService', useExisting: IdService };
|
||||||
const $ImageProcessingService: Provider = { provide: 'ImageProcessingService', useExisting: ImageProcessingService };
|
const $ImageProcessingService: Provider = { provide: 'ImageProcessingService', useExisting: ImageProcessingService };
|
||||||
|
const $InternalEventService: Provider = { provide: 'InternalEventService', useExisting: InternalEventService };
|
||||||
const $InternalStorageService: Provider = { provide: 'InternalStorageService', useExisting: InternalStorageService };
|
const $InternalStorageService: Provider = { provide: 'InternalStorageService', useExisting: InternalStorageService };
|
||||||
const $MetaService: Provider = { provide: 'MetaService', useExisting: MetaService };
|
const $MetaService: Provider = { provide: 'MetaService', useExisting: MetaService };
|
||||||
const $MfmService: Provider = { provide: 'MfmService', useExisting: MfmService };
|
const $MfmService: Provider = { provide: 'MfmService', useExisting: MfmService };
|
||||||
|
@ -345,6 +347,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp
|
||||||
HttpRequestService,
|
HttpRequestService,
|
||||||
IdService,
|
IdService,
|
||||||
ImageProcessingService,
|
ImageProcessingService,
|
||||||
|
InternalEventService,
|
||||||
InternalStorageService,
|
InternalStorageService,
|
||||||
MetaService,
|
MetaService,
|
||||||
MfmService,
|
MfmService,
|
||||||
|
@ -500,6 +503,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp
|
||||||
$HttpRequestService,
|
$HttpRequestService,
|
||||||
$IdService,
|
$IdService,
|
||||||
$ImageProcessingService,
|
$ImageProcessingService,
|
||||||
|
$InternalEventService,
|
||||||
$InternalStorageService,
|
$InternalStorageService,
|
||||||
$MetaService,
|
$MetaService,
|
||||||
$MfmService,
|
$MfmService,
|
||||||
|
@ -656,6 +660,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp
|
||||||
HttpRequestService,
|
HttpRequestService,
|
||||||
IdService,
|
IdService,
|
||||||
ImageProcessingService,
|
ImageProcessingService,
|
||||||
|
InternalEventService,
|
||||||
InternalStorageService,
|
InternalStorageService,
|
||||||
MetaService,
|
MetaService,
|
||||||
MfmService,
|
MfmService,
|
||||||
|
@ -810,6 +815,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp
|
||||||
$HttpRequestService,
|
$HttpRequestService,
|
||||||
$IdService,
|
$IdService,
|
||||||
$ImageProcessingService,
|
$ImageProcessingService,
|
||||||
|
$InternalEventService,
|
||||||
$InternalStorageService,
|
$InternalStorageService,
|
||||||
$MetaService,
|
$MetaService,
|
||||||
$MfmService,
|
$MfmService,
|
||||||
|
|
|
@ -353,12 +353,12 @@ export class GlobalEventService {
|
||||||
}
|
}
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
private publish(channel: StreamChannels, type: string | null, value?: any): void {
|
private async publish(channel: StreamChannels, type: string | null, value?: any): Promise<void> {
|
||||||
const message = type == null ? value : value == null ?
|
const message = type == null ? value : value == null ?
|
||||||
{ type: type, body: null } :
|
{ type: type, body: null } :
|
||||||
{ type: type, body: value };
|
{ type: type, body: value };
|
||||||
|
|
||||||
this.redisForPub.publish(this.config.host, JSON.stringify({
|
await this.redisForPub.publish(this.config.host, JSON.stringify({
|
||||||
channel: channel,
|
channel: channel,
|
||||||
message: message,
|
message: message,
|
||||||
}));
|
}));
|
||||||
|
@ -369,6 +369,11 @@ export class GlobalEventService {
|
||||||
this.publish('internal', type, typeof value === 'undefined' ? null : value);
|
this.publish('internal', type, typeof value === 'undefined' ? null : value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@bindThis
|
||||||
|
public async publishInternalEventAsync<K extends keyof InternalEventTypes>(type: K, value?: InternalEventTypes[K]): Promise<void> {
|
||||||
|
await this.publish('internal', type, typeof value === 'undefined' ? null : value);
|
||||||
|
}
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
public publishBroadcastStream<K extends keyof BroadcastTypes>(type: K, value?: BroadcastTypes[K]): void {
|
public publishBroadcastStream<K extends keyof BroadcastTypes>(type: K, value?: BroadcastTypes[K]): void {
|
||||||
this.publish('broadcast', type, typeof value === 'undefined' ? null : value);
|
this.publish('broadcast', type, typeof value === 'undefined' ? null : value);
|
||||||
|
|
102
packages/backend/src/core/InternalEventService.ts
Normal file
102
packages/backend/src/core/InternalEventService.ts
Normal file
|
@ -0,0 +1,102 @@
|
||||||
|
/*
|
||||||
|
* SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors
|
||||||
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
|
||||||
|
import Redis from 'ioredis';
|
||||||
|
import { DI } from '@/di-symbols.js';
|
||||||
|
import { GlobalEventService } from '@/core/GlobalEventService.js';
|
||||||
|
import type { GlobalEvents, InternalEventTypes } from '@/core/GlobalEventService.js';
|
||||||
|
import { bindThis } from '@/decorators.js';
|
||||||
|
|
||||||
|
export type Listener<K extends keyof InternalEventTypes> = (value: InternalEventTypes[K], key: K) => void | Promise<void>;
|
||||||
|
|
||||||
|
export interface ListenerProps {
|
||||||
|
ignoreLocal?: boolean,
|
||||||
|
}
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class InternalEventService implements OnApplicationShutdown {
|
||||||
|
private readonly listeners = new Map<keyof InternalEventTypes, Map<Listener<keyof InternalEventTypes>, ListenerProps>>();
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
@Inject(DI.redisForSub)
|
||||||
|
private readonly redisForSub: Redis.Redis,
|
||||||
|
|
||||||
|
private readonly globalEventService: GlobalEventService,
|
||||||
|
) {
|
||||||
|
this.redisForSub.on('message', this.onMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@bindThis
|
||||||
|
public on<K extends keyof InternalEventTypes>(type: K, listener: Listener<K>, props?: ListenerProps): void {
|
||||||
|
let set = this.listeners.get(type);
|
||||||
|
if (!set) {
|
||||||
|
set = new Map();
|
||||||
|
this.listeners.set(type, set);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Functionally, this is just a set with metadata on the values.
|
||||||
|
set.set(listener as Listener<keyof InternalEventTypes>, props ?? {});
|
||||||
|
}
|
||||||
|
|
||||||
|
@bindThis
|
||||||
|
public off<K extends keyof InternalEventTypes>(type: K, listener: Listener<K>): void {
|
||||||
|
this.listeners.get(type)?.delete(listener as Listener<keyof InternalEventTypes>);
|
||||||
|
}
|
||||||
|
|
||||||
|
@bindThis
|
||||||
|
public async emit<K extends keyof InternalEventTypes>(type: K, value: InternalEventTypes[K]): Promise<void> {
|
||||||
|
await this.emitInternal(type, value, true);
|
||||||
|
await this.globalEventService.publishInternalEventAsync(type, { ...value, _pid: process.pid });
|
||||||
|
}
|
||||||
|
|
||||||
|
@bindThis
|
||||||
|
private async emitInternal<K extends keyof InternalEventTypes>(type: K, value: InternalEventTypes[K], isLocal: boolean): Promise<void> {
|
||||||
|
const listeners = this.listeners.get(type);
|
||||||
|
if (!listeners) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const promises: Promise<void>[] = [];
|
||||||
|
for (const [listener, props] of listeners) {
|
||||||
|
if (!isLocal || !props.ignoreLocal) {
|
||||||
|
const promise = Promise.resolve(listener(value, type));
|
||||||
|
promises.push(promise);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
await Promise.all(promises);
|
||||||
|
}
|
||||||
|
|
||||||
|
@bindThis
|
||||||
|
private async onMessage(_: string, data: string): Promise<void> {
|
||||||
|
const obj = JSON.parse(data);
|
||||||
|
|
||||||
|
if (obj.channel === 'internal') {
|
||||||
|
const { type, body } = obj.message as GlobalEvents['internal']['payload'];
|
||||||
|
if (!isLocalInternalEvent(body) || body._pid !== process.pid) {
|
||||||
|
await this.emitInternal(type, body as InternalEventTypes[keyof InternalEventTypes], false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@bindThis
|
||||||
|
public dispose(): void {
|
||||||
|
this.redisForSub.off('message', this.onMessage);
|
||||||
|
this.listeners.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@bindThis
|
||||||
|
public onApplicationShutdown(): void {
|
||||||
|
this.dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interface LocalInternalEvent {
|
||||||
|
_pid: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
function isLocalInternalEvent(body: object): body is LocalInternalEvent {
|
||||||
|
return '_pid' in body && typeof(body._pid) === 'number';
|
||||||
|
}
|
92
packages/backend/test/misc/FakeInternalEventService.ts
Normal file
92
packages/backend/test/misc/FakeInternalEventService.ts
Normal file
|
@ -0,0 +1,92 @@
|
||||||
|
/*
|
||||||
|
* SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors
|
||||||
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
*/
|
||||||
|
|
||||||
|
import type { Listener, ListenerProps } from '@/core/InternalEventService.js';
|
||||||
|
import type Redis from 'ioredis';
|
||||||
|
import type { GlobalEventService, InternalEventTypes } from '@/core/GlobalEventService.js';
|
||||||
|
import { InternalEventService } from '@/core/InternalEventService.js';
|
||||||
|
import { bindThis } from '@/decorators.js';
|
||||||
|
|
||||||
|
type FakeCall<K extends keyof InternalEventService> = [K, Parameters<InternalEventService[K]>];
|
||||||
|
type FakeListener<K extends keyof InternalEventTypes> = [K, Listener<K>, ListenerProps];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Minimal implementation of InternalEventService meant for use in unit tests.
|
||||||
|
* There is no redis connection, and metadata is tracked in the public _calls and _listeners arrays.
|
||||||
|
* The on/off/emit methods are fully functional and can be called in tests to invoke any registered listeners.
|
||||||
|
*/
|
||||||
|
export class FakeInternalEventService extends InternalEventService {
|
||||||
|
/**
|
||||||
|
* List of calls to public methods, in chronological order.
|
||||||
|
*/
|
||||||
|
public _calls: FakeCall<keyof InternalEventService>[] = [];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List of currently registered listeners.
|
||||||
|
*/
|
||||||
|
public _listeners: FakeListener<keyof InternalEventTypes>[] = [];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resets the mock.
|
||||||
|
* Clears all listeners and tracked calls.
|
||||||
|
*/
|
||||||
|
public _reset() {
|
||||||
|
this._calls = [];
|
||||||
|
this._listeners = [];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simulates a remote event sent from another process in the cluster via redis.
|
||||||
|
*/
|
||||||
|
@bindThis
|
||||||
|
public async _emitRedis<K extends keyof InternalEventTypes>(type: K, value: InternalEventTypes[K]): Promise<void> {
|
||||||
|
await this.emit(type, value, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
super(
|
||||||
|
{ on: () => {} } as unknown as Redis.Redis,
|
||||||
|
{} as unknown as GlobalEventService,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@bindThis
|
||||||
|
public on<K extends keyof InternalEventTypes>(type: K, listener: Listener<K>, props?: ListenerProps): void {
|
||||||
|
if (!this._listeners.some(l => l[0] === type && l[1] === listener)) {
|
||||||
|
this._listeners.push([type, listener as Listener<keyof InternalEventTypes>, props ?? {}]);
|
||||||
|
}
|
||||||
|
this._calls.push(['on', [type, listener as Listener<keyof InternalEventTypes>, props]]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@bindThis
|
||||||
|
public off<K extends keyof InternalEventTypes>(type: K, listener: Listener<K>): void {
|
||||||
|
this._listeners = this._listeners.filter(l => l[0] !== type || l[1] !== listener);
|
||||||
|
this._calls.push(['off', [type, listener as Listener<keyof InternalEventTypes>]]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@bindThis
|
||||||
|
public async emit<K extends keyof InternalEventTypes>(type: K, value: InternalEventTypes[K], isLocal = true): Promise<void> {
|
||||||
|
for (const listener of this._listeners) {
|
||||||
|
if (listener[0] === type) {
|
||||||
|
if (!isLocal || !listener[2].ignoreLocal) {
|
||||||
|
await listener[1](value, type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this._calls.push(['emit', [type, value]]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@bindThis
|
||||||
|
public dispose(): void {
|
||||||
|
this._listeners = [];
|
||||||
|
this._calls.push(['dispose', []]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@bindThis
|
||||||
|
public onApplicationShutdown(): void {
|
||||||
|
this._calls.push(['onApplicationShutdown', []]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue