barkey/packages/backend/src/queue/processors/InboxProcessorService.ts

406 lines
14 KiB
TypeScript

/*
* SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/
import { URL } from 'node:url';
import { createHash } from 'crypto';
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import httpSignature from '@peertube/http-signature';
import * as Bull from 'bullmq';
import type Logger from '@/logger.js';
import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js';
import InstanceChart from '@/core/chart/charts/instance.js';
import ApRequestChart from '@/core/chart/charts/ap-request.js';
import FederationChart from '@/core/chart/charts/federation.js';
import { getApId } from '@/core/activitypub/type.js';
import type { IActivity } from '@/core/activitypub/type.js';
import type { MiRemoteUser } from '@/models/User.js';
import type { MiUserPublickey } from '@/models/UserPublickey.js';
import { ApDbResolverService } from '@/core/activitypub/ApDbResolverService.js';
import { StatusError } from '@/misc/status-error.js';
import { UtilityService } from '@/core/UtilityService.js';
import { ApPersonService } from '@/core/activitypub/models/ApPersonService.js';
import { JsonLdService } from '@/core/activitypub/JsonLdService.js';
import { ApInboxService } from '@/core/activitypub/ApInboxService.js';
import { bindThis } from '@/decorators.js';
import { IdentifiableError } from '@/misc/identifiable-error.js';
import { CollapsedQueue } from '@/misc/collapsed-queue.js';
import { MiNote } from '@/models/Note.js';
import { MiMeta } from '@/models/Meta.js';
import { DI } from '@/di-symbols.js';
import { IdService } from '@/core/IdService.js';
import { JsonValue } from '@/misc/json-value.js';
import { SkActivityLog, SkActivityContext } from '@/models/_.js';
import type { ActivityLogsRepository, ActivityContextRepository } from '@/models/_.js';
import type { Config } from '@/config.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type { InboxJobData } from '../types.js';
/**
 * Pending update for a single federated instance, as stored in the
 * collapsed update queue until it is flushed.
 */
type UpdateInstanceJob = {
	/** Time at which the most recent inbox request from the instance was seen. */
	latestRequestReceivedAt: Date;
	/** Whether the instance's suspension state should be reset when the job is flushed. */
	shouldUnsuspend: boolean;
};
/**
 * Queue processor for incoming ActivityPub inbox jobs.
 *
 * For every job it authenticates the sender (HTTP signature, with an
 * LD-Signature fallback), updates federation statistics, hands the activity to
 * `ApInboxService.performActivity`, and — when activity logging is enabled in
 * the config — records the outcome in the activity log.
 */
@Injectable()
export class InboxProcessorService implements OnApplicationShutdown {
	private logger: Logger;

	// Collapses repeated "request received" updates per key before writing them.
	// The keys passed to enqueue() are instance ids (see updateInstanceStats),
	// even though the key type is spelled as MiNote['id'].
	private updateInstanceQueue: CollapsedQueue<MiNote['id'], UpdateInstanceJob>;

	constructor(
		@Inject(DI.meta)
		private meta: MiMeta,

		@Inject(DI.config)
		private config: Config,

		private utilityService: UtilityService,
		private apInboxService: ApInboxService,
		private federatedInstanceService: FederatedInstanceService,
		private fetchInstanceMetadataService: FetchInstanceMetadataService,
		private jsonLdService: JsonLdService,
		private apPersonService: ApPersonService,
		private apDbResolverService: ApDbResolverService,
		private instanceChart: InstanceChart,
		private apRequestChart: ApRequestChart,
		private federationChart: FederationChart,
		private queueLoggerService: QueueLoggerService,
		private idService: IdService,

		@Inject(DI.activityContextRepository)
		private activityContextRepository: ActivityContextRepository,

		@Inject(DI.activityLogsRepository)
		private activityLogsRepository: ActivityLogsRepository,
	) {
		this.logger = this.queueLoggerService.logger.createSubLogger('inbox');
		// First argument is 300000 outside of tests (presumably milliseconds, i.e. 5 minutes) and 0 under NODE_ENV=test.
		this.updateInstanceQueue = new CollapsedQueue(process.env.NODE_ENV !== 'test' ? 60 * 1000 * 5 : 0, this.collapseUpdateInstanceJobs, this.performUpdateInstance);
	}

	/**
	 * Entry point for an inbox job.
	 *
	 * @param job The queued inbox job (activity plus HTTP signature).
	 * @returns A short result string; `'ok'` on success, otherwise a reason.
	 * @throws Whatever `_process` throws (e.g. `Bull.UnrecoverableError` for permanent failures).
	 */
	@bindThis
	public async process(job: Bull.Job<InboxJobData>): Promise<string> {
		if (this.config.activityLogging.enabled) {
			return await this._processLogged(job);
		} else {
			return await this._process(job);
		}
	}

	/**
	 * Runs `_process` while filling in an activity log entry with the outcome.
	 *
	 * The log entry is saved in `finally` without being awaited, so a failure to
	 * persist it is only logged and never affects the job result.
	 */
	private async _processLogged(job: Bull.Job<InboxJobData>): Promise<string> {
		const payload = job.data.activity;
		const keyId = job.data.signature.keyId;
		const log = this.createLog(payload, keyId);

		// Pre-save the activity in case it leads to a hard-crash.
		if (this.config.activityLogging.preSave) {
			await this.recordLog(log);
		}

		try {
			const startTime = process.hrtime.bigint();
			const result = await this._process(job, log);
			const endTime = process.hrtime.bigint();

			// Truncate nanoseconds to microseconds, then scale to milliseconds.
			// 123,456,789 ns -> 123,456 us -> 123.456 ms
			const duration = Number((endTime - startTime) / 1000n) / 1000;
			if (duration > 10000) {
				this.logger.warn(`Activity ${JSON.stringify(payload.id)} by "${keyId}" took ${(duration / 1000).toFixed(1)} seconds to complete`);
			}

			log.accepted = result.startsWith('ok');
			log.duration = duration;
			log.result = result;

			return result;
		} catch (err) {
			log.accepted = false;
			log.result = String(err);

			throw err;
		} finally {
			// Save or finalize asynchronously
			this.recordLog(log).catch(err => this.logger.error('Failed to record AP activity:', err));
		}
	}

	/**
	 * Authenticates and performs a single inbox activity.
	 *
	 * @param job The queued inbox job.
	 * @param log Optional log entry; when given, the verified user is recorded on it.
	 * @returns `'ok'`, or a reason string when the activity was blocked/skipped.
	 * @throws Bull.UnrecoverableError when the sender cannot be authenticated or the
	 *         activity is otherwise permanently unacceptable.
	 * @throws Error for failures that are rethrown to the queue unchanged.
	 */
	private async _process(job: Bull.Job<InboxJobData>, log?: SkActivityLog): Promise<string> {
		const signature = job.data.signature; // HTTP-signature
		let activity = job.data.activity;

		//#region Log
		const info = Object.assign({}, activity);
		delete info['@context'];
		this.logger.debug(JSON.stringify(info, null, 2));
		//#endregion

		const host = this.utilityService.toPuny(new URL(signature.keyId).hostname);

		if (!this.utilityService.isFederationAllowedHost(host)) {
			return `Blocked request: ${host}`;
		}

		const keyIdLower = signature.keyId.toLowerCase();
		if (keyIdLower.startsWith('acct:')) {
			return `Old keyId is no longer supported. ${keyIdLower}`;
		}

		// Look up the signer in the DB by the HTTP-Signature keyId.
		let authUser: {
			user: MiRemoteUser;
			key: MiUserPublickey | null;
		} | null = await this.apDbResolverService.getAuthUserFromKeyId(signature.keyId);

		// If the keyId is unknown, fall back to activity.actor
		// (original note: from the DB, or else fetched from the remote).
		if (authUser == null) {
			try {
				authUser = await this.apDbResolverService.getAuthUserFromApId(getApId(activity.actor));
			} catch (err) {
				// Non-retryable status errors (original note: 4xx) are skipped permanently;
				// retryable ones are rethrown as plain errors. Any other error type falls
				// through to the "failed to resolve user" check below.
				if (err instanceof StatusError) {
					if (!err.isRetryable) {
						throw new Bull.UnrecoverableError(`skip: Ignored deleted actors on both ends ${activity.actor} - ${err.statusCode}`);
					}
					throw new Error(`Error in actor ${activity.actor} - ${err.statusCode}`);
				}
			}
		}

		// Still unresolved: give up.
		if (authUser == null) {
			throw new Bull.UnrecoverableError('skip: failed to resolve user');
		}

		// No public key: give up as well.
		if (authUser.key == null) {
			throw new Bull.UnrecoverableError('skip: failed to resolve user publicKey');
		}

		// Verify the HTTP signature.
		let httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem);

		// maybe they changed their key? refetch it
		if (!httpSignatureValidated) {
			authUser.key = await this.apDbResolverService.refetchPublicKeyForApId(authUser.user);
			if (authUser.key != null) {
				httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem);
			}
		}

		// The signer of the signature must also match activity.actor.
		if (!httpSignatureValidated || authUser.user.uri !== getApId(activity.actor)) {
			// Even if that check failed, fall back to the LD-Signature when there is one.
			const ldSignature = activity.signature;
			if (ldSignature) {
				if (ldSignature.type !== 'RsaSignature2017') {
					throw new Bull.UnrecoverableError(`skip: unsupported LD-signature type ${ldSignature.type}`);
				}

				// ldSignature.creator looks like https://example.com/users/user#main-key;
				// resolving that user is expected to bring in its public key as well.
				if (ldSignature.creator) {
					const candidate = ldSignature.creator.replace(/#.*/, '');
					await this.apPersonService.resolvePerson(candidate).catch(() => null);
				}

				// Look up the LD-Signature user by keyId.
				authUser = await this.apDbResolverService.getAuthUserFromKeyId(ldSignature.creator);
				if (authUser == null) {
					throw new Bull.UnrecoverableError('skip: LD-Signatureのユーザーが取得できませんでした');
				}

				if (authUser.key == null) {
					throw new Bull.UnrecoverableError('skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした');
				}

				const jsonLd = this.jsonLdService.use();

				// Verify the LD-Signature; a verification error counts as "not verified".
				const verified = await jsonLd.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false);
				if (!verified) {
					throw new Bull.UnrecoverableError('skip: LD-Signatureの検証に失敗しました');
				}

				// Normalize the activity (the signature is removed for compaction and restored afterwards).
				delete activity.signature;
				try {
					activity = await jsonLd.compact(activity) as IActivity;
				} catch (e) {
					throw new Bull.UnrecoverableError(`skip: failed to compact activity: ${e}`);
				}
				// TODO: skip forwarding when normalization yields a form incompatible with the original activity
				// https://github.com/mastodon/mastodon/blob/664b0ca/app/services/activitypub/process_collection_service.rb#L24-L29
				activity.signature = ldSignature;

				// Check the actor once more, now against the LD-Signature user.
				if (authUser.user.uri !== activity.actor) {
					throw new Bull.UnrecoverableError(`skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`);
				}

				const ldHost = this.utilityService.extractDbHost(authUser.user.uri);
				if (!this.utilityService.isFederationAllowedHost(ldHost)) {
					throw new Bull.UnrecoverableError(`Blocked request: ${ldHost}`);
				}
			} else {
				throw new Bull.UnrecoverableError(`skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`);
			}
		}

		// If activity.id is present, make sure its host is the signer's host.
		if (typeof activity.id === 'string') {
			const signerHost = this.utilityService.extractDbHost(authUser.user.uri);
			const activityIdHost = this.utilityService.extractDbHost(activity.id);
			if (signerHost !== activityIdHost) {
				throw new Bull.UnrecoverableError(`skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost})`);
			}
		} else {
			// Activity ID should only be string or undefined.
			delete activity.id;
		}

		// Record verified user in log
		if (log) {
			log.verified = true;
			log.authUser = authUser.user;
			log.authUserId = authUser.user.id;
		}

		this.apRequestChart.inbox();
		this.federationChart.inbox(authUser.user.host);

		// Update instance stats in the background. The promise gets an explicit
		// catch handler so a failure here is logged instead of surfacing as an
		// unhandled rejection; it never affects the result of this job.
		const senderHost = authUser.user.host;
		process.nextTick(() => {
			this.updateInstanceStats(senderHost).catch(err => this.logger.error('Failed to update instance stats:', err));
		});

		// Process the activity.
		try {
			const result = await this.apInboxService.performActivity(authUser.user, activity);
			if (result && !result.startsWith('ok')) {
				if (result.startsWith('skip:')) {
					this.logger.info(`inbox activity ignored: id=${activity.id} reason=${result}`);
				} else {
					this.logger.warn(`inbox activity failed: id=${activity.id} reason=${result}`);
				}
				return result;
			}
		} catch (e) {
			if (e instanceof IdentifiableError) {
				if (e.id === '689ee33f-f97c-479a-ac49-1b9f8140af99') {
					return 'blocked notes with prohibited words';
				}
				if (e.id === '85ab9bd7-3a41-4530-959d-f07073900109') {
					return 'actor has been suspended';
				}
				if (e.id === 'd450b8a9-48e4-4dab-ae36-f4db763fda7c') { // invalid Note
					return e.message;
				}
			}

			if (e instanceof StatusError && !e.isRetryable) {
				return `skip: permanent error ${e.statusCode}`;
			}

			if (e instanceof IdentifiableError && !e.isRetryable) {
				if (e.message) {
					return `skip: permanent error ${e.id}: ${e.message}`;
				} else {
					return `skip: permanent error ${e.id}`;
				}
			}

			throw e;
		}

		return 'ok';
	}

	/**
	 * Looks up (or, when instance stats are enabled in meta, registers) the
	 * instance for `host`, enqueues a collapsed "request received" update for it,
	 * optionally feeds the instance chart, and triggers a metadata fetch.
	 *
	 * @param host Host of the authenticated remote user.
	 */
	private async updateInstanceStats(host: MiRemoteUser['host']): Promise<void> {
		const i = await (this.meta.enableStatsForFederatedInstances
			? this.federatedInstanceService.fetchOrRegister(host)
			: this.federatedInstanceService.fetch(host));

		if (i == null) return;

		this.updateInstanceQueue.enqueue(i.id, {
			latestRequestReceivedAt: new Date(),
			shouldUnsuspend: i.suspensionState === 'autoSuspendedForNotResponding',
		});

		if (this.meta.enableChartsForFederatedInstances) {
			this.instanceChart.requestReceived(i.host);
		}

		// Awaited so that a rejection reaches the caller's catch handler.
		await this.fetchInstanceMetadataService.fetchInstanceMetadata(i);
	}

	/**
	 * Merges two pending updates for the same key: keeps the later timestamp and
	 * requests an unsuspend if either job did.
	 */
	@bindThis
	public collapseUpdateInstanceJobs(oldJob: UpdateInstanceJob, newJob: UpdateInstanceJob): UpdateInstanceJob {
		const latestRequestReceivedAt = oldJob.latestRequestReceivedAt < newJob.latestRequestReceivedAt
			? newJob.latestRequestReceivedAt
			: oldJob.latestRequestReceivedAt;
		const shouldUnsuspend = oldJob.shouldUnsuspend || newJob.shouldUnsuspend;
		return {
			latestRequestReceivedAt,
			shouldUnsuspend,
		};
	}

	/**
	 * Writes a (collapsed) update job to the instance record.
	 *
	 * @param id Key the job was enqueued under (the instance id).
	 * @param job The collapsed job to apply.
	 */
	@bindThis
	public async performUpdateInstance(id: string, job: UpdateInstanceJob): Promise<void> {
		await this.federatedInstanceService.update(id, {
			// Use the time the request was actually received (the newest one after
			// collapsing) rather than the time the queue happens to be flushed.
			latestRequestReceivedAt: job.latestRequestReceivedAt,
			isNotResponding: false,
			// If delivery had been suspended because the server appeared to be dead, automatically resume it.
			suspensionState: job.shouldUnsuspend ? 'none' : undefined,
		});
	}

	/** Flushes every pending instance update immediately. */
	@bindThis
	public async dispose(): Promise<void> {
		await this.updateInstanceQueue.performAllNow();
	}

	/** Shutdown hook: flushes pending instance updates before the application exits. */
	@bindThis
	async onApplicationShutdown(signal?: string): Promise<void> {
		await this.dispose();
	}

	/**
	 * Builds an unsaved activity log entry for an incoming activity.
	 *
	 * The `@context` value is stripped from the stored activity and, when present,
	 * attached separately as a context record keyed by the base64 MD5 digest of
	 * its JSON serialization.
	 *
	 * @param payload The incoming activity.
	 * @param keyId The HTTP-signature keyId the request was signed with.
	 */
	private createLog(payload: IActivity, keyId: string): SkActivityLog {
		const activity = Object.assign({}, payload, { '@context': undefined }) as unknown as JsonValue;
		const host = this.utilityService.extractDbHost(keyId);

		const log = new SkActivityLog({
			id: this.idService.gen(),
			at: new Date(),
			verified: false,
			accepted: false,
			activity,
			keyId,
			host,
		});

		const context = payload['@context'];
		if (context) {
			// The digest is only used as a lookup key for identical contexts.
			const md5 = createHash('md5').update(JSON.stringify(context)).digest('base64');
			log.contextHash = md5;
			log.context = new SkActivityContext({
				md5,
				json: context,
			});
		}

		return log;
	}

	/**
	 * Persists a log entry: inserts its context record first (ignoring the insert
	 * when it conflicts), then upserts the log row by id.
	 */
	private async recordLog(log: SkActivityLog): Promise<void> {
		if (log.context) {
			// https://stackoverflow.com/a/47064558
			await this.activityContextRepository
				.createQueryBuilder('context_body')
				.insert()
				.into(SkActivityContext)
				.values(log.context)
				.orIgnore('md5')
				.execute();
		}

		await this.activityLogsRepository.upsert(log, ['id']);
	}
}