diff --git a/packages/backend/src/core/activitypub/ApInboxService.ts b/packages/backend/src/core/activitypub/ApInboxService.ts index 1eef85aeef..be1de3eab0 100644 --- a/packages/backend/src/core/activitypub/ApInboxService.ts +++ b/packages/backend/src/core/activitypub/ApInboxService.ts @@ -3,7 +3,7 @@ * SPDX-License-Identifier: AGPL-3.0-only */ -import { Inject, Injectable } from '@nestjs/common'; +import { forwardRef, Inject, Injectable } from '@nestjs/common'; import { In } from 'typeorm'; import * as Bull from 'bullmq'; import { DI } from '@/di-symbols.js'; @@ -32,7 +32,11 @@ import { AbuseReportService } from '@/core/AbuseReportService.js'; import { FederatedInstanceService } from '@/core/FederatedInstanceService.js'; import { fromTuple } from '@/misc/from-tuple.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; -import { getApHrefNullable, getApId, getApIds, getApType, getNullableApId, isAccept, isActor, isAdd, isAnnounce, isApObject, isBlock, isCollection, isCollectionOrOrderedCollection, isCreate, isDelete, isFlag, isFollow, isLike, isDislike, isMove, isPost, isReject, isRemove, isTombstone, isUndo, isUpdate, validActor, validPost } from './type.js'; +import InstanceChart from '@/core/chart/charts/instance.js'; +import FederationChart from '@/core/chart/charts/federation.js'; +import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js'; +import { InboxProcessorService } from '@/queue/processors/InboxProcessorService.js'; +import { getApHrefNullable, getApId, getApIds, getApType, getNullableApId, isAccept, isActor, isAdd, isAnnounce, isApObject, isBlock, isCollection, isCollectionOrOrderedCollection, isCreate, isDelete, isFlag, isFollow, isLike, isDislike, isMove, isPost, isReject, isRemove, isTombstone, isUndo, isUpdate, validActor, validPost, isActivity } from './type.js'; import { ApNoteService } from './models/ApNoteService.js'; import { ApLoggerService } from './ApLoggerService.js'; import { ApDbResolverService } from './ApDbResolverService.js'; @@ -41,7 +45,7 @@ import { ApAudienceService } from './ApAudienceService.js'; import { ApPersonService } from './models/ApPersonService.js'; import { ApQuestionService } from './models/ApQuestionService.js'; import type { Resolver } from './ApResolverService.js'; -import type { IAccept, IAdd, IAnnounce, IBlock, ICreate, IDelete, IFlag, IFollow, ILike, IDislike, IObject, IReject, IRemove, IUndo, IUpdate, IMove, IPost } from './type.js'; +import type { IAccept, IAdd, IAnnounce, IBlock, ICreate, IDelete, IFlag, IFollow, ILike, IDislike, IObject, IReject, IRemove, IUndo, IUpdate, IMove, IPost, IActivity } from './type.js'; @Injectable() export class ApInboxService { @@ -88,7 +92,13 @@ export class ApInboxService { private apQuestionService: ApQuestionService, private queueService: QueueService, private globalEventService: GlobalEventService, - private federatedInstanceService: FederatedInstanceService, + private readonly federatedInstanceService: FederatedInstanceService, + private readonly fetchInstanceMetadataService: FetchInstanceMetadataService, + private readonly instanceChart: InstanceChart, + private readonly federationChart: FederationChart, + + @Inject(forwardRef(() => InboxProcessorService)) + private readonly inboxProcessorService: InboxProcessorService, ) { this.logger = this.apLoggerService.logger; } @@ -310,12 +320,15 @@ export class ApInboxService { const targetUri = getApId(activityObject); if (targetUri.startsWith('bear:')) return 'skip: bearcaps url not supported.'; - const target = await resolver.resolve(activityObject).catch(e => { + // Force a fetch by passing URL only, since the target object must be trusted for announceActivity. + // We cannot just re-fetch or the resolver will throw a recursion error. + const target = await resolver.resolve(targetUri).catch(e => { this.logger.error(`Resolution failed: ${e}`); throw e; }); if (isPost(target)) return await this.announceNote(actor, activity, target); + if (isActivity(target)) return await this.announceActivity(activity, target, resolver); return `skip: unknown object type ${getApType(target)}`; } @@ -383,6 +396,68 @@ export class ApInboxService { } } + private async announceActivity(announce: IAnnounce, activity: IActivity, resolver: Resolver): Promise { + // Shouldn't happen, but just in case + if (!activity.id) { + throw new Bull.UnrecoverableError(`Cannot announce an activity with no ID: ${announce.id}`); + } + + // Since this is a new activity, we need to get a new actor. + const actorId = getApId(activity.actor); + const actor = await this.apPersonService.resolvePerson(actorId, resolver); + + // Ignore announce of our own activities + // 1. No URI/host on an MiUser == local user + // 2. Local URI on activity == local activity + if (!actor.uri || !actor.host || this.utilityService.isUriLocal(activity.id)) { + throw new Bull.UnrecoverableError(`Cannot announce a local activity: ${activity.id} (from ${announce.id})`); + } + + // Make sure that actor matches activity host. + // Activity host is already verified by resolver when fetching the activity, so that is the source of truth. + const actorHost = this.utilityService.punyHostPSLDomain(actor.uri); + const activityHost = this.utilityService.punyHostPSLDomain(activity.id); + if (actorHost !== activityHost) { + throw new Bull.UnrecoverableError(`Actor host ${actorHost} does not activity host ${activityHost} in activity ${activity.id} (from ${announce.id})`); + } + + // Update stats (adapted from InboxProcessorService) + this.federationChart.inbox(actor.host).then(); + process.nextTick(async () => { + const i = await (this.meta.enableStatsForFederatedInstances + ? this.federatedInstanceService.fetchOrRegister(actor.host) + : this.federatedInstanceService.fetch(actor.host)); + + if (i == null) return; + + this.inboxProcessorService.updateInstanceQueue.enqueue(i.id, { + latestRequestReceivedAt: new Date(), + shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding', + }); + + if (this.meta.enableChartsForFederatedInstances) { + this.instanceChart.requestReceived(i.host).then(); + } + + this.fetchInstanceMetadataService.fetchInstanceMetadata(i).then(); + }); + + // Process it! + return await this.performOneActivity(actor, activity, resolver) + .finally(() => { + // Update user (adapted from performActivity) + if (actor.lastFetchedAt == null || Date.now() - actor.lastFetchedAt.getTime() > 1000 * 60 * 60 * 24) { + setImmediate(() => { + // Don't re-use the resolver, or it may throw recursion errors. + // Instead, create a new resolver with an appropriately-reduced recursion limit. + this.apPersonService.updatePerson(actor.uri, this.apResolverService.createResolver({ + recursionLimit: resolver.getRecursionLimit() - resolver.getHistory().length, + })); + }); + } + }); + } + @bindThis private async block(actor: MiRemoteUser, activity: IBlock): Promise { // ※ activity.objectにブロック対象があり、それは存在するローカルユーザーのはず diff --git a/packages/backend/src/queue/processors/InboxProcessorService.ts b/packages/backend/src/queue/processors/InboxProcessorService.ts index e128108779..839c8c183a 100644 --- a/packages/backend/src/queue/processors/InboxProcessorService.ts +++ b/packages/backend/src/queue/processors/InboxProcessorService.ts @@ -4,7 +4,7 @@ */ import { URL } from 'node:url'; -import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; +import { forwardRef, Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import httpSignature from '@peertube/http-signature'; import * as Bull from 'bullmq'; import type Logger from '@/logger.js'; @@ -43,7 +43,7 @@ type UpdateInstanceJob = { @Injectable() export class InboxProcessorService implements OnApplicationShutdown { private logger: Logger; - private updateInstanceQueue: CollapsedQueue; + public readonly updateInstanceQueue: CollapsedQueue; constructor( @Inject(DI.meta) @@ -53,6 +53,8 @@ export class InboxProcessorService implements OnApplicationShutdown { private config: Config, private utilityService: UtilityService, + + @Inject(forwardRef(() => ApInboxService)) private apInboxService: ApInboxService, private federatedInstanceService: FederatedInstanceService, private fetchInstanceMetadataService: FetchInstanceMetadataService,