limit the number of note subscriptions per connection

This commit is contained in:
Hazelnoot 2025-03-27 11:07:26 -04:00
parent bf1c9b67d6
commit 831329499d

View file

@ -23,6 +23,7 @@ import type { EventEmitter } from 'events';
import type Channel from './channel.js'; import type Channel from './channel.js';
const MAX_CHANNELS_PER_CONNECTION = 32; const MAX_CHANNELS_PER_CONNECTION = 32;
const MAX_SUBSCRIPTIONS_PER_CONNECTION = 256;
/** /**
* Main stream connection * Main stream connection
@ -34,7 +35,7 @@ export default class Connection {
private wsConnection: WebSocket.WebSocket; private wsConnection: WebSocket.WebSocket;
public subscriber: StreamEventEmitter; public subscriber: StreamEventEmitter;
private channels: Channel[] = []; private channels: Channel[] = [];
private subscribingNotes: Partial<Record<string, number>> = {}; private subscribingNotes = new Map<string, number>();
private cachedNotes: Packed<'Note'>[] = []; private cachedNotes: Packed<'Note'>[] = [];
public userProfile: MiUserProfile | null = null; public userProfile: MiUserProfile | null = null;
public following: Record<string, Pick<MiFollowing, 'withReplies'> | undefined> = {}; public following: Record<string, Pick<MiFollowing, 'withReplies'> | undefined> = {};
@ -200,9 +201,21 @@ export default class Connection {
if (!isJsonObject(payload)) return; if (!isJsonObject(payload)) return;
if (!payload.id || typeof payload.id !== 'string') return; if (!payload.id || typeof payload.id !== 'string') return;
const current = this.subscribingNotes[payload.id] ?? 0; const current = this.subscribingNotes.get(payload.id) ?? 0;
// Limit the number of distinct notes that can be subscribed to.
// If current is-zero, then this is a new note and we need to check the limit
if (current === 0 && this.subscribingNotes.size >= MAX_SUBSCRIPTIONS_PER_CONNECTION) {
// Map maintains insertion order, so first key is always the oldest
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const oldestKey = this.subscribingNotes.keys().next().value!;
this.subscribingNotes.delete(oldestKey);
this.subscriber.off(`noteStream:${oldestKey}`, this.onNoteStreamMessage);
}
const updated = current + 1; const updated = current + 1;
this.subscribingNotes[payload.id] = updated; this.subscribingNotes.set(payload.id, updated);
if (updated === 1) { if (updated === 1) {
this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage); this.subscriber.on(`noteStream:${payload.id}`, this.onNoteStreamMessage);
@ -217,12 +230,12 @@ export default class Connection {
if (!isJsonObject(payload)) return; if (!isJsonObject(payload)) return;
if (!payload.id || typeof payload.id !== 'string') return; if (!payload.id || typeof payload.id !== 'string') return;
const current = this.subscribingNotes[payload.id]; const current = this.subscribingNotes.get(payload.id);
if (current == null) return; if (current == null) return;
const updated = current - 1; const updated = current - 1;
this.subscribingNotes[payload.id] = updated; this.subscribingNotes.set(payload.id, updated);
if (updated <= 0) { if (updated <= 0) {
delete this.subscribingNotes[payload.id]; this.subscribingNotes.delete(payload.id);
this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage); this.subscriber.off(`noteStream:${payload.id}`, this.onNoteStreamMessage);
} }
} }