diff --git a/packages/backend/migration/1738293576355-create_ap_fetch_log.js b/packages/backend/migration/1738293576355-create_ap_fetch_log.js new file mode 100644 index 0000000000..4371f50b4a --- /dev/null +++ b/packages/backend/migration/1738293576355-create_ap_fetch_log.js @@ -0,0 +1,19 @@ +export class CreateApFetchLog1738293576355 { + name = 'CreateApFetchLog1738293576355' + + async up(queryRunner) { + await queryRunner.query(`CREATE TABLE "ap_fetch_log" ("id" character varying(32) NOT NULL, "at" TIMESTAMP WITH TIME ZONE NOT NULL, "duration" double precision, "host" text NOT NULL, "request_uri" text NOT NULL, "object_uri" text, "accepted" boolean, "result" text, "object" jsonb, "context_hash" text, CONSTRAINT "PK_ap_fetch_log" PRIMARY KEY ("id"))`); + await queryRunner.query(`CREATE INDEX "IDX_ap_fetch_log_at" ON "ap_fetch_log" ("at") `); + await queryRunner.query(`CREATE INDEX "IDX_ap_fetch_log_host" ON "ap_fetch_log" ("host") `); + await queryRunner.query(`CREATE INDEX "IDX_ap_fetch_log_object_uri" ON "ap_fetch_log" ("object_uri") `); + await queryRunner.query(`ALTER TABLE "ap_fetch_log" ADD CONSTRAINT "FK_ap_fetch_log_context_hash" FOREIGN KEY ("context_hash") REFERENCES "ap_context"("md5") ON DELETE CASCADE ON UPDATE NO ACTION`); + } + + async down(queryRunner) { + await queryRunner.query(`ALTER TABLE "ap_fetch_log" DROP CONSTRAINT "FK_ap_fetch_log_context_hash"`); + await queryRunner.query(`DROP INDEX "public"."IDX_ap_fetch_log_object_uri"`); + await queryRunner.query(`DROP INDEX "public"."IDX_ap_fetch_log_host"`); + await queryRunner.query(`DROP INDEX "public"."IDX_ap_fetch_log_at"`); + await queryRunner.query(`DROP TABLE "ap_fetch_log"`); + } +} diff --git a/packages/backend/src/core/ApLogService.ts b/packages/backend/src/core/ApLogService.ts new file mode 100644 index 0000000000..362eba24be --- /dev/null +++ b/packages/backend/src/core/ApLogService.ts @@ -0,0 +1,189 @@ +/* + * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { createHash } from 'crypto'; +import { Inject, Injectable } from '@nestjs/common'; +import { LessThan } from 'typeorm'; +import { DI } from '@/di-symbols.js'; +import { SkApFetchLog, SkApInboxLog, SkApContext } from '@/models/_.js'; +import type { ApContextsRepository, ApFetchLogsRepository, ApInboxLogsRepository } from '@/models/_.js'; +import type { Config } from '@/config.js'; +import { JsonValue } from '@/misc/json-value.js'; +import { UtilityService } from '@/core/UtilityService.js'; +import { IdService } from '@/core/IdService.js'; +import { IActivity, IObject } from './activitypub/type.js'; + +@Injectable() +export class ApLogService { + constructor( + @Inject(DI.config) + private readonly config: Config, + + @Inject(DI.apContextsRepository) + private apContextsRepository: ApContextsRepository, + + @Inject(DI.apInboxLogsRepository) + private readonly apInboxLogsRepository: ApInboxLogsRepository, + + @Inject(DI.apFetchLogsRepository) + private readonly apFetchLogsRepository: ApFetchLogsRepository, + + private readonly utilityService: UtilityService, + private readonly idService: IdService, + ) {} + + /** + * Creates an inbox log from an activity, and saves it if pre-save is enabled. + */ + public async createInboxLog(data: Partial & { + activity: IActivity, + keyId: string, + }): Promise { + const { object: activity, context, contextHash } = extractObjectContext(data.activity); + const host = this.utilityService.extractDbHost(data.keyId); + + const log = new SkApInboxLog({ + id: this.idService.gen(), + at: new Date(), + verified: false, + accepted: false, + host, + ...data, + activity, + context, + contextHash, + }); + + if (this.config.activityLogging.preSave) { + await this.saveInboxLog(log); + } + + return log; + } + + /** + * Saves or finalizes an inbox log. + */ + public async saveInboxLog(log: SkApInboxLog): Promise { + if (log.context) { + await this.saveContext(log.context); + } + + // Will be UPDATE with preSave, and INSERT without. + await this.apInboxLogsRepository.upsert(log, ['id']); + return log; + } + + /** + * Creates a fetch log from an activity, and saves it if pre-save is enabled. + */ + public async createFetchLog(data: Partial & { + requestUri: string + host: string, + }): Promise { + const log = new SkApFetchLog({ + id: this.idService.gen(), + at: new Date(), + accepted: false, + ...data, + }); + + if (this.config.activityLogging.preSave) { + await this.saveFetchLog(log); + } + + return log; + } + + /** + * Saves or finalizes a fetch log. + */ + public async saveFetchLog(log: SkApFetchLog): Promise { + if (log.context) { + await this.saveContext(log.context); + } + + // Will be UPDATE with preSave, and INSERT without. + await this.apFetchLogsRepository.upsert(log, ['id']); + return log; + } + + private async saveContext(context: SkApContext): Promise { + // https://stackoverflow.com/a/47064558 + await this.apContextsRepository + .createQueryBuilder('activity_context') + .insert() + .into(SkApContext) + .values(context) + .orIgnore('md5') + .execute(); + } + + /** + * Deletes all expired AP logs and garbage-collects the AP context cache. + * Returns the total number of deleted rows. + */ + public async deleteExpiredLogs(): Promise { + // This is the date in UTC of the oldest log to KEEP + const oldestAllowed = new Date(Date.now() - this.config.activityLogging.maxAge); + + // Delete all logs older than the threshold. + const inboxDeleted = await this.deleteExpiredInboxLogs(oldestAllowed); + const fetchDeleted = await this.deleteExpiredFetchLogs(oldestAllowed); + + return inboxDeleted + fetchDeleted; + } + + private async deleteExpiredInboxLogs(oldestAllowed: Date): Promise { + const { affected } = await this.apInboxLogsRepository.delete({ + at: LessThan(oldestAllowed), + }); + + return affected ?? 0; + } + + private async deleteExpiredFetchLogs(oldestAllowed: Date): Promise { + const { affected } = await this.apFetchLogsRepository.delete({ + at: LessThan(oldestAllowed), + }); + + return affected ?? 0; + } +} + +export function extractObjectContext(input: T) { + const object = Object.assign({}, input, { '@context': undefined }) as Omit; + const { context, contextHash } = parseContext(input['@context']); + + return { object, context, contextHash }; +} + +export function parseContext(input: JsonValue | undefined): { contextHash: string | null, context: SkApContext | null } { + // Empty contexts are excluded for easier querying + if (input == null) { + return { + contextHash: null, + context: null, + }; + } + + const contextHash = createHash('md5').update(JSON.stringify(input)).digest('base64'); + const context = new SkApContext({ + md5: contextHash, + json: input, + }); + return { contextHash, context }; +} + +export function calculateDurationSince(startTime: bigint): number { + // Calculate the processing time with correct rounding and decimals. + // 1. Truncate nanoseconds to microseconds + // 2. Scale to 1/10 millisecond ticks. + // 3. Round to nearest tick. + // 4. Sale to milliseconds + // Example: 123,456,789 ns -> 123,456 us -> 12,345.6 ticks -> 12,346 ticks -> 123.46 ms + const endTime = process.hrtime.bigint(); + return Math.round(Number((endTime - startTime) / 1000n) / 10) / 100; +} diff --git a/packages/backend/src/core/CoreModule.ts b/packages/backend/src/core/CoreModule.ts index 8c9f419c44..47be6967d7 100644 --- a/packages/backend/src/core/CoreModule.ts +++ b/packages/backend/src/core/CoreModule.ts @@ -157,6 +157,7 @@ import { QueueService } from './QueueService.js'; import { LoggerService } from './LoggerService.js'; import { SponsorsService } from './SponsorsService.js'; import type { Provider } from '@nestjs/common'; +import { ApLogService } from '@/core/ApLogService.js'; //#region 文字列ベースでのinjection用(循環参照対応のため) const $LoggerService: Provider = { provide: 'LoggerService', useExisting: LoggerService }; @@ -166,6 +167,7 @@ const $AccountMoveService: Provider = { provide: 'AccountMoveService', useExisti const $AccountUpdateService: Provider = { provide: 'AccountUpdateService', useExisting: AccountUpdateService }; const $AnnouncementService: Provider = { provide: 'AnnouncementService', useExisting: AnnouncementService }; const $AntennaService: Provider = { provide: 'AntennaService', useExisting: AntennaService }; +const $ApLogService: Provider = { provide: 'ApLogService', useExisting: ApLogService }; const $AppLockService: Provider = { provide: 'AppLockService', useExisting: AppLockService }; const $AchievementService: Provider = { provide: 'AchievementService', useExisting: AchievementService }; const $AvatarDecorationService: Provider = { provide: 'AvatarDecorationService', useExisting: AvatarDecorationService }; @@ -322,6 +324,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp AccountUpdateService, AnnouncementService, AntennaService, + ApLogService, AppLockService, AchievementService, AvatarDecorationService, @@ -474,6 +477,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp $AccountUpdateService, $AnnouncementService, $AntennaService, + $ApLogService, $AppLockService, $AchievementService, $AvatarDecorationService, @@ -627,6 +631,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp AccountUpdateService, AnnouncementService, AntennaService, + ApLogService, AppLockService, AchievementService, AvatarDecorationService, @@ -778,6 +783,7 @@ const $SponsorsService: Provider = { provide: 'SponsorsService', useExisting: Sp $AccountUpdateService, $AnnouncementService, $AntennaService, + $ApLogService, $AppLockService, $AchievementService, $AvatarDecorationService, diff --git a/packages/backend/src/core/activitypub/ApResolverService.ts b/packages/backend/src/core/activitypub/ApResolverService.ts index a0c3a4846c..410803609c 100644 --- a/packages/backend/src/core/activitypub/ApResolverService.ts +++ b/packages/backend/src/core/activitypub/ApResolverService.ts @@ -7,7 +7,7 @@ import { Inject, Injectable } from '@nestjs/common'; import { IsNull, Not } from 'typeorm'; import type { MiLocalUser, MiRemoteUser } from '@/models/User.js'; import { InstanceActorService } from '@/core/InstanceActorService.js'; -import type { NotesRepository, PollsRepository, NoteReactionsRepository, UsersRepository, FollowRequestsRepository, MiMeta } from '@/models/_.js'; +import type { NotesRepository, PollsRepository, NoteReactionsRepository, UsersRepository, FollowRequestsRepository, MiMeta, SkApFetchLog } from '@/models/_.js'; import type { Config } from '@/config.js'; import { HttpRequestService } from '@/core/HttpRequestService.js'; import { DI } from '@/di-symbols.js'; @@ -17,7 +17,8 @@ import { LoggerService } from '@/core/LoggerService.js'; import type Logger from '@/logger.js'; import { fromTuple } from '@/misc/from-tuple.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; -import { isCollectionOrOrderedCollection } from './type.js'; +import { ApLogService, calculateDurationSince, extractObjectContext } from '@/core/ApLogService.js'; +import { getNullableApId, isCollectionOrOrderedCollection } from './type.js'; import { ApDbResolverService } from './ApDbResolverService.js'; import { ApRendererService } from './ApRendererService.js'; import { ApRequestService } from './ApRequestService.js'; @@ -43,6 +44,7 @@ export class Resolver { private apRendererService: ApRendererService, private apDbResolverService: ApDbResolverService, private loggerService: LoggerService, + private readonly apLogService: ApLogService, private recursionLimit = 256, ) { this.history = new Set(); @@ -81,6 +83,44 @@ export class Resolver { return value; } + const host = this.utilityService.extractDbHost(value); + if (this.config.activityLogging.enabled && !this.utilityService.isSelfHost(host)) { + return await this._resolveLogged(value, host); + } else { + return await this._resolve(value, host); + } + } + + private async _resolveLogged(requestUri: string, host: string): Promise { + const startTime = process.hrtime.bigint(); + + const log = await this.apLogService.createFetchLog({ + host: host, + requestUri, + }); + + try { + const result = await this._resolve(requestUri, host, log); + + log.accepted = true; + log.result = 'ok'; + + return result; + } catch (err) { + log.accepted = false; + log.result = String(err); + + throw err; + } finally { + log.duration = calculateDurationSince(startTime); + + // Save or finalize asynchronously + this.apLogService.saveFetchLog(log) + .catch(err => this.logger.error('Failed to record AP object fetch:', err)); + } + } + + private async _resolve(value: string, host: string, log?: SkApFetchLog): Promise { if (value.includes('#')) { // URLs with fragment parts cannot be resolved correctly because // the fragment part does not get transmitted over HTTP(S). @@ -98,7 +138,6 @@ export class Resolver { this.history.add(value); - const host = this.utilityService.extractDbHost(value); if (this.utilityService.isSelfHost(host)) { return await this.resolveLocal(value); } @@ -115,6 +154,20 @@ export class Resolver { ? await this.apRequestService.signedGet(value, this.user) as IObject : await this.httpRequestService.getActivityJson(value)) as IObject; + if (log) { + const { object: objectOnly, context, contextHash } = extractObjectContext(object); + const objectUri = getNullableApId(object); + + if (objectUri) { + log.objectUri = objectUri; + log.host = this.utilityService.extractDbHost(objectUri); + } + + log.object = objectOnly; + log.context = context; + log.contextHash = contextHash; + } + if ( Array.isArray(object['@context']) ? !(object['@context'] as unknown[]).includes('https://www.w3.org/ns/activitystreams') : @@ -232,6 +285,7 @@ export class ApResolverService { private apRendererService: ApRendererService, private apDbResolverService: ApDbResolverService, private loggerService: LoggerService, + private readonly apLogService: ApLogService, ) { } @@ -252,6 +306,7 @@ export class ApResolverService { this.apRendererService, this.apDbResolverService, this.loggerService, + this.apLogService, ); } } diff --git a/packages/backend/src/daemons/ApLogCleanupService.ts b/packages/backend/src/daemons/ApLogCleanupService.ts index 261c6e3517..2b6693e19e 100644 --- a/packages/backend/src/daemons/ApLogCleanupService.ts +++ b/packages/backend/src/daemons/ApLogCleanupService.ts @@ -3,14 +3,11 @@ * SPDX-License-Identifier: AGPL-3.0-only */ -import { Inject, Injectable, type OnApplicationShutdown } from '@nestjs/common'; -import { LessThan } from 'typeorm'; -import { DI } from '@/di-symbols.js'; -import type { Config } from '@/config.js'; +import { Injectable, type OnApplicationShutdown } from '@nestjs/common'; import { bindThis } from '@/decorators.js'; -import type { ApInboxLogsRepository } from '@/models/_.js'; import { LoggerService } from '@/core/LoggerService.js'; import Logger from '@/logger.js'; +import { ApLogService } from '@/core/ApLogService.js'; // 10 minutes export const scanInterval = 1000 * 60 * 10; @@ -21,12 +18,7 @@ export class ApLogCleanupService implements OnApplicationShutdown { private scanTimer: NodeJS.Timeout | null = null; constructor( - @Inject(DI.config) - private readonly config: Config, - - @Inject(DI.apInboxLogsRepository) - private readonly apInboxLogsRepository: ApInboxLogsRepository, - + private readonly apLogService: ApLogService, loggerService: LoggerService, ) { this.logger = loggerService.getLogger('activity-log-cleanup'); @@ -47,15 +39,12 @@ export class ApLogCleanupService implements OnApplicationShutdown { @bindThis private async tick(): Promise { - // This is the date in UTC of the oldest log to KEEP - const oldestAllowed = new Date(Date.now() - this.config.activityLogging.maxAge); - - // Delete all logs older than the threshold. - const { affected } = await this.apInboxLogsRepository.delete({ - at: LessThan(oldestAllowed), - }); - - this.logger.info(`Activity Log cleanup complete; removed ${affected ?? 0} expired logs.`); + try { + const affected = this.apLogService.deleteExpiredLogs(); + this.logger.info(`Activity Log cleanup complete; removed ${affected} expired logs.`); + } catch (err) { + this.logger.error('Activity Log cleanup failed:', err as Error); + } } @bindThis diff --git a/packages/backend/src/di-symbols.ts b/packages/backend/src/di-symbols.ts index 6b53d38fb7..9f4ef5e2e9 100644 --- a/packages/backend/src/di-symbols.ts +++ b/packages/backend/src/di-symbols.ts @@ -23,6 +23,7 @@ export const DI = { avatarDecorationsRepository: Symbol('avatarDecorationsRepository'), latestNotesRepository: Symbol('latestNotesRepository'), apContextsRepository: Symbol('apContextsRepository'), + apFetchLogsRepository: Symbol('apFetchLogsRepository'), apInboxLogsRepository: Symbol('apInboxLogsRepository'), noteFavoritesRepository: Symbol('noteFavoritesRepository'), noteThreadMutingsRepository: Symbol('noteThreadMutingsRepository'), diff --git a/packages/backend/src/models/RepositoryModule.ts b/packages/backend/src/models/RepositoryModule.ts index dd4ba1c0e4..78510ba588 100644 --- a/packages/backend/src/models/RepositoryModule.ts +++ b/packages/backend/src/models/RepositoryModule.ts @@ -82,6 +82,7 @@ import { MiWebhook, NoteEdit, SkApContext, + SkApFetchLog, SkApInboxLog, } from './_.js'; import type { DataSource } from 'typeorm'; @@ -134,6 +135,12 @@ const $apContextRepository: Provider = { inject: [DI.db], }; +const $apFetchLogsRepository: Provider = { + provide: DI.apFetchLogsRepository, + useFactory: (db: DataSource) => db.getRepository(SkApFetchLog).extend(miRepository as MiRepository), + inject: [DI.db], +}; + const $apInboxLogsRepository: Provider = { provide: DI.apInboxLogsRepository, useFactory: (db: DataSource) => db.getRepository(SkApInboxLog).extend(miRepository as MiRepository), @@ -541,6 +548,7 @@ const $noteScheduleRepository: Provider = { $avatarDecorationsRepository, $latestNotesRepository, $apContextRepository, + $apFetchLogsRepository, $apInboxLogsRepository, $noteFavoritesRepository, $noteThreadMutingsRepository, @@ -617,6 +625,7 @@ const $noteScheduleRepository: Provider = { $avatarDecorationsRepository, $latestNotesRepository, $apContextRepository, + $apFetchLogsRepository, $apInboxLogsRepository, $noteFavoritesRepository, $noteThreadMutingsRepository, diff --git a/packages/backend/src/models/SkApFetchLog.ts b/packages/backend/src/models/SkApFetchLog.ts new file mode 100644 index 0000000000..1e7d861b6c --- /dev/null +++ b/packages/backend/src/models/SkApFetchLog.ts @@ -0,0 +1,89 @@ +/* + * SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors + * SPDX-License-Identifier: AGPL-3.0-only + */ + +import { Column, Index, JoinColumn, ManyToOne, PrimaryColumn, Entity } from 'typeorm'; +import { SkApContext } from '@/models/SkApContext.js'; +import { id } from './util/id.js'; + +/** + * Records objects fetched via AP + */ +@Entity('ap_fetch_log') +export class SkApFetchLog { + @PrimaryColumn({ + ...id(), + primaryKeyConstraintName: 'PK_ap_fetch_log', + }) + public id: string; + + @Index('IDX_ap_fetch_log_at') + @Column('timestamptz') + public at: Date; + + /** + * Processing duration in milliseconds + */ + @Column('double precision', { nullable: true }) + public duration: number | null = null; + + /** + * DB hostname extracted from responseUri, or requestUri if fetch is incomplete + */ + @Index('IDX_ap_fetch_log_host') + @Column('text') + public host: string; + + /** + * Original requested URI + */ + @Column('text', { + name: 'request_uri', + }) + public requestUri: string; + + /** + * Canonical URI / object ID, taken from the final payload + */ + @Column('text', { + name: 'object_uri', + nullable: true, + }) + @Index('IDX_ap_fetch_log_object_uri') + public objectUri: string | null = null; + + @Column('boolean', { nullable: true }) + public accepted: boolean | null = null; + + @Column('text', { nullable: true }) + public result: string | null = null; + + @Column('jsonb', { nullable: true }) + // https://github.com/typeorm/typeorm/issues/8559 + // eslint-disable-next-line @typescript-eslint/no-explicit-any + public object: any | null = null; + + @Column({ + type: 'text', + name: 'context_hash', + nullable: true, + }) + public contextHash: string | null; + + @ManyToOne(() => SkApContext, { + onDelete: 'CASCADE', + nullable: true, + }) + @JoinColumn({ + name: 'context_hash', + foreignKeyConstraintName: 'FK_ap_fetch_log_context_hash', + }) + public context: SkApContext | null; + + constructor(data?: Partial) { + if (data) { + Object.assign(this, data); + } + } +} diff --git a/packages/backend/src/models/_.ts b/packages/backend/src/models/_.ts index dabcf89d2c..4bd6e78ef4 100644 --- a/packages/backend/src/models/_.ts +++ b/packages/backend/src/models/_.ts @@ -83,6 +83,7 @@ import { MiBubbleGameRecord } from '@/models/BubbleGameRecord.js'; import { MiReversiGame } from '@/models/ReversiGame.js'; import { MiNoteSchedule } from '@/models/NoteSchedule.js'; import { SkApInboxLog } from '@/models/SkApInboxLog.js'; +import { SkApFetchLog } from '@/models/SkApFetchLog.js'; import { SkApContext } from '@/models/SkApContext.js'; import type { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity.js'; @@ -132,6 +133,7 @@ export const miRepository = { export { SkLatestNote, SkApContext, + SkApFetchLog, SkApInboxLog, MiAbuseUserReport, MiAbuseReportNotificationRecipient, @@ -234,6 +236,7 @@ export type InstancesRepository = Repository & MiRepository & MiRepository; export type LatestNotesRepository = Repository & MiRepository; export type ApContextsRepository = Repository & MiRepository; +export type ApFetchLogsRepository = Repository & MiRepository; export type ApInboxLogsRepository = Repository & MiRepository; export type ModerationLogsRepository = Repository & MiRepository; export type MutingsRepository = Repository & MiRepository; diff --git a/packages/backend/src/postgres.ts b/packages/backend/src/postgres.ts index 9437ac936a..1a5fdc8412 100644 --- a/packages/backend/src/postgres.ts +++ b/packages/backend/src/postgres.ts @@ -86,6 +86,7 @@ import MisskeyLogger from '@/logger.js'; import { bindThis } from '@/decorators.js'; import { SkLatestNote } from '@/models/LatestNote.js'; import { SkApContext } from '@/models/SkApContext.js'; +import { SkApFetchLog } from '@/models/SkApFetchLog.js'; import { SkApInboxLog } from '@/models/SkApInboxLog.js'; pg.types.setTypeParser(20, Number); @@ -174,6 +175,7 @@ class MyCustomLogger implements Logger { export const entities = [ SkLatestNote, SkApContext, + SkApFetchLog, SkApInboxLog, MiAnnouncement, MiAnnouncementRead, diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index 4182f3e090..557a759136 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -4,7 +4,6 @@ */ import { URL } from 'node:url'; -import { createHash } from 'crypto'; import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import httpSignature from '@peertube/http-signature'; import * as Bull from 'bullmq'; @@ -30,11 +29,9 @@ import { CollapsedQueue } from '@/misc/collapsed-queue.js'; import { MiNote } from '@/models/Note.js'; import { MiMeta } from '@/models/Meta.js'; import { DI } from '@/di-symbols.js'; -import { IdService } from '@/core/IdService.js'; -import { JsonValue } from '@/misc/json-value.js'; -import { SkApInboxLog, SkApContext } from '@/models/_.js'; -import type { ApInboxLogsRepository, ApContextsRepository } from '@/models/_.js'; +import { SkApInboxLog } from '@/models/_.js'; import type { Config } from '@/config.js'; +import { ApLogService, calculateDurationSince } from '@/core/ApLogService.js'; import { QueueLoggerService } from '../QueueLoggerService.js'; import type { InboxJobData } from '../types.js'; @@ -66,13 +63,7 @@ export class InboxProcessorService implements OnApplicationShutdown { private apRequestChart: ApRequestChart, private federationChart: FederationChart, private queueLoggerService: QueueLoggerService, - private idService: IdService, - - @Inject(DI.apContextsRepository) - private apContextsRepository: ApContextsRepository, - - @Inject(DI.apInboxLogsRepository) - private apInboxLogsRepository: ApInboxLogsRepository, + private readonly apLogService: ApLogService, ) { this.logger = this.queueLoggerService.logger.createSubLogger('inbox'); this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance); @@ -89,14 +80,9 @@ export class InboxProcessorService implements OnApplicationShutdown { private async _processLogged(job: Bull.Job): Promise { const startTime = process.hrtime.bigint(); - const payload = job.data.activity; + const activity = job.data.activity; const keyId = job.data.signature.keyId; - const log = this.createLog(payload, keyId); - - // Pre-save the activity in case it leads to a hard-crash. - if (this.config.activityLogging.preSave) { - await this.recordLog(log); - } + const log = await this.apLogService.createInboxLog({ activity, keyId }); try { const result = await this._process(job, log); @@ -111,24 +97,18 @@ export class InboxProcessorService implements OnApplicationShutdown { throw err; } finally { - // Calculate the activity processing time with correct rounding and decimals. - // 1. Truncate nanoseconds to microseconds - // 2. Scale to 1/10 millisecond ticks. - // 3. Round to nearest tick. - // 4. Sale to milliseconds - // Example: 123,456,789 ns -> 123,456 us -> 12,345.6 ticks -> 12,346 ticks -> 123.46 ms - const endTime = process.hrtime.bigint(); - const duration = Math.round(Number((endTime - startTime) / 1000n) / 10) / 100; - log.duration = duration; + const duration = log.duration = calculateDurationSince(startTime); + // TODO remove this // Activities should time out after roughly 5 seconds. // A runtime longer than 10 seconds could indicate a problem or attack. if (duration > 10000) { - this.logger.warn(`Activity ${JSON.stringify(payload.id)} by "${keyId}" took ${(duration / 1000).toFixed(1)} seconds to complete`); + this.logger.warn(`Activity ${JSON.stringify(activity.id)} by "${keyId}" took ${(duration / 1000).toFixed(1)} seconds to complete`); } // Save or finalize asynchronously - this.recordLog(log).catch(err => this.logger.error('Failed to record AP activity:', err)); + this.apLogService.saveInboxLog(log) + .catch(err => this.logger.error('Failed to record AP activity:', err)); } } @@ -368,46 +348,4 @@ export class InboxProcessorService implements OnApplicationShutdown { async onApplicationShutdown(signal?: string) { await this.dispose(); } - - private createLog(payload: IActivity, keyId: string): SkApInboxLog { - const activity = Object.assign({}, payload, { '@context': undefined }) as unknown as JsonValue; - const host = this.utilityService.extractDbHost(keyId); - - const log = new SkApInboxLog({ - id: this.idService.gen(), - at: new Date(), - verified: false, - accepted: false, - activity, - keyId, - host, - }); - - const context = payload['@context']; - if (context) { - const md5 = createHash('md5').update(JSON.stringify(context)).digest('base64'); - log.contextHash = md5; - log.context = new SkApContext({ - md5, - json: context, - }); - } - - return log; - } - - private async recordLog(log: SkApInboxLog): Promise { - if (log.context) { - // https://stackoverflow.com/a/47064558 - await this.apContextsRepository - .createQueryBuilder('activity_context') - .insert() - .into(SkApContext) - .values(log.context) - .orIgnore('md5') - .execute(); - } - - await this.apInboxLogsRepository.upsert(log, ['id']); - } }