mirror of
https://codeberg.org/yeentown/barkey.git
synced 2025-07-07 04:26:58 +00:00
support Announce(Activity) activities
This commit is contained in:
parent
1eb9070e39
commit
f2bb01f7da
2 changed files with 84 additions and 7 deletions
|
@ -3,7 +3,7 @@
|
|||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import { forwardRef, Inject, Injectable } from '@nestjs/common';
|
||||
import { In } from 'typeorm';
|
||||
import * as Bull from 'bullmq';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
|
@ -32,7 +32,11 @@ import { AbuseReportService } from '@/core/AbuseReportService.js';
|
|||
import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
|
||||
import { fromTuple } from '@/misc/from-tuple.js';
|
||||
import { IdentifiableError } from '@/misc/identifiable-error.js';
|
||||
import { getApHrefNullable, getApId, getApIds, getApType, getNullableApId, isAccept, isActor, isAdd, isAnnounce, isApObject, isBlock, isCollection, isCollectionOrOrderedCollection, isCreate, isDelete, isFlag, isFollow, isLike, isDislike, isMove, isPost, isReject, isRemove, isTombstone, isUndo, isUpdate, validActor, validPost } from './type.js';
|
||||
import InstanceChart from '@/core/chart/charts/instance.js';
|
||||
import FederationChart from '@/core/chart/charts/federation.js';
|
||||
import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js';
|
||||
import { InboxProcessorService } from '@/queue/processors/InboxProcessorService.js';
|
||||
import { getApHrefNullable, getApId, getApIds, getApType, getNullableApId, isAccept, isActor, isAdd, isAnnounce, isApObject, isBlock, isCollection, isCollectionOrOrderedCollection, isCreate, isDelete, isFlag, isFollow, isLike, isDislike, isMove, isPost, isReject, isRemove, isTombstone, isUndo, isUpdate, validActor, validPost, isActivity } from './type.js';
|
||||
import { ApNoteService } from './models/ApNoteService.js';
|
||||
import { ApLoggerService } from './ApLoggerService.js';
|
||||
import { ApDbResolverService } from './ApDbResolverService.js';
|
||||
|
@ -41,7 +45,7 @@ import { ApAudienceService } from './ApAudienceService.js';
|
|||
import { ApPersonService } from './models/ApPersonService.js';
|
||||
import { ApQuestionService } from './models/ApQuestionService.js';
|
||||
import type { Resolver } from './ApResolverService.js';
|
||||
import type { IAccept, IAdd, IAnnounce, IBlock, ICreate, IDelete, IFlag, IFollow, ILike, IDislike, IObject, IReject, IRemove, IUndo, IUpdate, IMove, IPost } from './type.js';
|
||||
import type { IAccept, IAdd, IAnnounce, IBlock, ICreate, IDelete, IFlag, IFollow, ILike, IDislike, IObject, IReject, IRemove, IUndo, IUpdate, IMove, IPost, IActivity } from './type.js';
|
||||
|
||||
@Injectable()
|
||||
export class ApInboxService {
|
||||
|
@ -88,7 +92,13 @@ export class ApInboxService {
|
|||
private apQuestionService: ApQuestionService,
|
||||
private queueService: QueueService,
|
||||
private globalEventService: GlobalEventService,
|
||||
private federatedInstanceService: FederatedInstanceService,
|
||||
private readonly federatedInstanceService: FederatedInstanceService,
|
||||
private readonly fetchInstanceMetadataService: FetchInstanceMetadataService,
|
||||
private readonly instanceChart: InstanceChart,
|
||||
private readonly federationChart: FederationChart,
|
||||
|
||||
@Inject(forwardRef(() => InboxProcessorService))
|
||||
private readonly inboxProcessorService: InboxProcessorService,
|
||||
) {
|
||||
this.logger = this.apLoggerService.logger;
|
||||
}
|
||||
|
@ -310,12 +320,15 @@ export class ApInboxService {
|
|||
const targetUri = getApId(activityObject);
|
||||
if (targetUri.startsWith('bear:')) return 'skip: bearcaps url not supported.';
|
||||
|
||||
const target = await resolver.resolve(activityObject).catch(e => {
|
||||
// Force a fetch by passing URL only, since the target object must be trusted for announceActivity.
|
||||
// We cannot just re-fetch or the resolver will throw a recursion error.
|
||||
const target = await resolver.resolve(targetUri).catch(e => {
|
||||
this.logger.error(`Resolution failed: ${e}`);
|
||||
throw e;
|
||||
});
|
||||
|
||||
if (isPost(target)) return await this.announceNote(actor, activity, target);
|
||||
if (isActivity(target)) return await this.announceActivity(activity, target, resolver);
|
||||
|
||||
return `skip: unknown object type ${getApType(target)}`;
|
||||
}
|
||||
|
@ -383,6 +396,68 @@ export class ApInboxService {
|
|||
}
|
||||
}
|
||||
|
||||
private async announceActivity(announce: IAnnounce, activity: IActivity, resolver: Resolver): Promise<string | void> {
|
||||
// Shouldn't happen, but just in case
|
||||
if (!activity.id) {
|
||||
throw new Bull.UnrecoverableError(`Cannot announce an activity with no ID: ${announce.id}`);
|
||||
}
|
||||
|
||||
// Since this is a new activity, we need to get a new actor.
|
||||
const actorId = getApId(activity.actor);
|
||||
const actor = await this.apPersonService.resolvePerson(actorId, resolver);
|
||||
|
||||
// Ignore announce of our own activities
|
||||
// 1. No URI/host on an MiUser == local user
|
||||
// 2. Local URI on activity == local activity
|
||||
if (!actor.uri || !actor.host || this.utilityService.isUriLocal(activity.id)) {
|
||||
throw new Bull.UnrecoverableError(`Cannot announce a local activity: ${activity.id} (from ${announce.id})`);
|
||||
}
|
||||
|
||||
// Make sure that actor matches activity host.
|
||||
// Activity host is already verified by resolver when fetching the activity, so that is the source of truth.
|
||||
const actorHost = this.utilityService.punyHostPSLDomain(actor.uri);
|
||||
const activityHost = this.utilityService.punyHostPSLDomain(activity.id);
|
||||
if (actorHost !== activityHost) {
|
||||
throw new Bull.UnrecoverableError(`Actor host ${actorHost} does not activity host ${activityHost} in activity ${activity.id} (from ${announce.id})`);
|
||||
}
|
||||
|
||||
// Update stats (adapted from InboxProcessorService)
|
||||
this.federationChart.inbox(actor.host).then();
|
||||
process.nextTick(async () => {
|
||||
const i = await (this.meta.enableStatsForFederatedInstances
|
||||
? this.federatedInstanceService.fetchOrRegister(actor.host)
|
||||
: this.federatedInstanceService.fetch(actor.host));
|
||||
|
||||
if (i == null) return;
|
||||
|
||||
this.inboxProcessorService.updateInstanceQueue.enqueue(i.id, {
|
||||
latestRequestReceivedAt: new Date(),
|
||||
shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding',
|
||||
});
|
||||
|
||||
if (this.meta.enableChartsForFederatedInstances) {
|
||||
this.instanceChart.requestReceived(i.host).then();
|
||||
}
|
||||
|
||||
this.fetchInstanceMetadataService.fetchInstanceMetadata(i).then();
|
||||
});
|
||||
|
||||
// Process it!
|
||||
return await this.performOneActivity(actor, activity, resolver)
|
||||
.finally(() => {
|
||||
// Update user (adapted from performActivity)
|
||||
if (actor.lastFetchedAt == null || Date.now() - actor.lastFetchedAt.getTime() > 1000 * 60 * 60 * 24) {
|
||||
setImmediate(() => {
|
||||
// Don't re-use the resolver, or it may throw recursion errors.
|
||||
// Instead, create a new resolver with an appropriately-reduced recursion limit.
|
||||
this.apPersonService.updatePerson(actor.uri, this.apResolverService.createResolver({
|
||||
recursionLimit: resolver.getRecursionLimit() - resolver.getHistory().length,
|
||||
}));
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private async block(actor: MiRemoteUser, activity: IBlock): Promise<string> {
|
||||
// ※ activity.objectにブロック対象があり、それは存在するローカルユーザーのはず
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
*/
|
||||
|
||||
import { URL } from 'node:url';
|
||||
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
|
||||
import { forwardRef, Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
|
||||
import httpSignature from '@peertube/http-signature';
|
||||
import * as Bull from 'bullmq';
|
||||
import type Logger from '@/logger.js';
|
||||
|
@ -43,7 +43,7 @@ type UpdateInstanceJob = {
|
|||
@Injectable()
|
||||
export class InboxProcessorService implements OnApplicationShutdown {
|
||||
private logger: Logger;
|
||||
private updateInstanceQueue: CollapsedQueue<MiNote['id'], UpdateInstanceJob>;
|
||||
public readonly updateInstanceQueue: CollapsedQueue<MiNote['id'], UpdateInstanceJob>;
|
||||
|
||||
constructor(
|
||||
@Inject(DI.meta)
|
||||
|
@ -53,6 +53,8 @@ export class InboxProcessorService implements OnApplicationShutdown {
|
|||
private config: Config,
|
||||
|
||||
private utilityService: UtilityService,
|
||||
|
||||
@Inject(forwardRef(() => ApInboxService))
|
||||
private apInboxService: ApInboxService,
|
||||
private federatedInstanceService: FederatedInstanceService,
|
||||
private fetchInstanceMetadataService: FetchInstanceMetadataService,
|
||||
|
|
Loading…
Add table
Reference in a new issue