From c41d617e6364d34021ea10f7ee9bc081b6d3a244 Mon Sep 17 00:00:00 2001 From: Hazelnoot Date: Thu, 27 Mar 2025 19:33:32 -0400 Subject: [PATCH] limit the number of active connections per client, and limit upgrade requests by user --- .../server/api/StreamingApiServerService.ts | 70 ++++++++++++++----- 1 file changed, 52 insertions(+), 18 deletions(-) diff --git a/packages/backend/src/server/api/StreamingApiServerService.ts b/packages/backend/src/server/api/StreamingApiServerService.ts index 1c2569bf8d..c7db4549d3 100644 --- a/packages/backend/src/server/api/StreamingApiServerService.ts +++ b/packages/backend/src/server/api/StreamingApiServerService.ts @@ -28,10 +28,15 @@ import MainStreamConnection from './stream/Connection.js'; import { ChannelsService } from './stream/ChannelsService.js'; import type * as http from 'node:http'; +// Maximum number of simultaneous connections by client (user ID or IP address). +// Excess connections will be closed automatically. +const MAX_CONNECTIONS_PER_CLIENT = 32; + @Injectable() export class StreamingApiServerService { #wss: WebSocket.WebSocketServer; #connections = new Map(); + #connectionsByClient = new Map>(); // key: IP / user ID -> value: connection #cleanConnectionsIntervalId: NodeJS.Timeout | null = null; constructor( @@ -80,22 +85,6 @@ export class StreamingApiServerService { return; } - // ServerServices sets `trustProxy: true`, which inside - // fastify/request.js ends up calling `proxyAddr` in this way, - // so we do the same - const requestIp = proxyAddr(request, () => { return true; } ); - - const limitActor = getIpHash(requestIp); - if (await this.rateLimitThis(limitActor, { - key: 'wsconnect', - duration: ms('5min'), - max: 32, - })) { - socket.write('HTTP/1.1 429 Rate Limit Exceeded\r\n\r\n'); - socket.destroy(); - return; - } - const q = new URL(request.url, `http://${request.headers.host}`).searchParams; let user: MiLocalUser | null = null; @@ -133,9 +122,41 @@ export class StreamingApiServerService { return; } - const rateLimiter = () => { - const limitActor = user ?? getIpHash(requestIp); + // ServerServices sets `trustProxy: true`, which inside fastify/request.js ends up calling `proxyAddr` in this way, so we do the same. + const requestIp = proxyAddr(request, () => true ); + const limitActor = user?.id ?? getIpHash(requestIp); + if (await this.rateLimitThis(limitActor, { + key: 'wsconnect', + duration: ms('5min'), + max: 32, + })) { + socket.write('HTTP/1.1 429 Rate Limit Exceeded\r\n\r\n'); + socket.destroy(); + return; + } + // For performance and code simplicity, obtain and hold this reference for the lifetime of the connection. + // This should be safe because the map entry should only be deleted after *all* connections close. + let connectionsForClient = this.#connectionsByClient.get(limitActor); + if (!connectionsForClient) { + connectionsForClient = new Set(); + this.#connectionsByClient.set(limitActor, connectionsForClient); + } + + // Close excess connections + while (connectionsForClient.size >= MAX_CONNECTIONS_PER_CLIENT) { + // Set maintains insertion order, so first entry is the oldest. + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const oldestConnection = connectionsForClient.values().next().value!; + + // Technically, the close() handler should remove this entry. + // But if that ever fails, then we could enter an infinite loop. + // We manually remove the connection here just in case. + oldestConnection.close(1008, 'Disconnected - too many simultaneous connections'); + connectionsForClient.delete(oldestConnection); + } + + const rateLimiter = () => { // Rather high limit because when catching up at the top of a timeline, the frontend may render many many notes. // Each of which causes a message via `useNoteCapture` to ask for realtime updates of that note. return this.rateLimitThis(limitActor, { @@ -159,6 +180,19 @@ export class StreamingApiServerService { await stream.init(); this.#wss.handleUpgrade(request, socket, head, (ws) => { + connectionsForClient.add(ws); + + // Call before emit() in case it throws an error. + // We don't want to leave dangling references! + ws.once('close', () => { + connectionsForClient.delete(ws); + + // Make sure we don't leak the Set objects! + if (connectionsForClient.size < 1) { + this.#connectionsByClient.delete(limitActor); + } + }); + this.#wss.emit('connection', ws, request, { stream, user, app, });