mirror of
https://codeberg.org/yeentown/barkey.git
synced 2025-04-28 09:36:56 +00:00
* 1. ed25519キーペアを発行・Personとして公開鍵を送受信 * validate additionalPublicKeys * getAuthUserFromApIdはmainを選ぶ * ✌️ * fix * signatureAlgorithm * set publicKeyCache lifetime * refresh * httpMessageSignatureAcceptable * ED25519_SIGNED_ALGORITHM * ED25519_PUBLIC_KEY_SIGNATURE_ALGORITHM * remove sign additionalPublicKeys signature requirements * httpMessageSignaturesSupported * httpMessageSignaturesImplementationLevel * httpMessageSignaturesImplementationLevel: '01' * perf(federation): Use hint for getAuthUserFromApId (#13470) * Hint for getAuthUserFromApId * とどのつまりこれでいいのか? * use @misskey-dev/node-http-message-signatures * fix * signedPost, signedGet * ap-request.tsを復活させる * remove digest prerender * fix test? * fix test * add httpMessageSignaturesImplementationLevel to FederationInstance * ManyToOne * fetchPersonWithRenewal * exactKey * ✌️ * use const * use gen-key-pair fn. from '@misskey-dev/node-http-message-signatures' * update node-http-message-signatures * fix * @misskey-dev/node-http-message-signatures@0.0.0-alpha.11 * getAuthUserFromApIdでupdatePersonの頻度を増やす * cacheRaw.date * use requiredInputs https://github.com/misskey-dev/misskey/pull/13464#discussion_r1509964359 * update @misskey-dev/node-http-message-signatures * clean up * err msg * fix(backend): fetchInstanceMetadataのLockが永遠に解除されない問題を修正 Co-authored-by: まっちゃとーにゅ <17376330+u1-liquid@users.noreply.github.com> * fix httpMessageSignaturesImplementationLevel validation * fix test * fix * comment * comment * improve test * fix * use Promise.all in genRSAAndEd25519KeyPair * refreshAndprepareEd25519KeyPair * refreshAndfindKey * commetn * refactor public keys add * digestプリレンダを復活させる RFC実装時にどうするか考える * fix, async * fix * !== true * use save * Deliver update person when new key generated (not tested) https://github.com/misskey-dev/misskey/pull/13464#issuecomment-1977049061 * 循環参照で落ちるのを解消? * fix? * Revert "fix?" This reverts commit 0082f6f8e8c5d5febd14933ba9a1ac643f70ca92. 
* a * logger * log * change logger * 秘密鍵の変更は、フラグではなく鍵を引き回すようにする * addAllKnowingSharedInboxRecipe * nanka meccha kaeta * delivre * キャッシュ有効チェックはロック取得前に行う * @misskey-dev/node-http-message-signatures@0.0.3 * PrivateKeyPem * getLocalUserPrivateKey * fix test * if * fix ap-request * update node-http-message-signatures * fix type error * update package * fix type * update package * retry no key * @misskey-dev/node-http-message-signatures@0.0.8 * fix type error * log keyid * logger * db-resolver * JSON.stringify * HTTP Signatureがなかったり使えなかったりしそうな場合にLD Signatureを活用するように * inbox-delayed use actor if no signature * ユーザーとキーの同一性チェックはhostの一致にする * log signature parse err * save array * とりあえずtryで囲っておく * fetchPersonWithRenewalでエラーが起きたら古いデータを返す * use transactionalEntityManager * fix spdx * @misskey-dev/node-http-message-signatures@0.0.10 * add comment * fix * publicKeyに配列が入ってもいいようにする https://github.com/misskey-dev/misskey/pull/13950 * define additionalPublicKeys * fix * merge fix * refreshAndprepareEd25519KeyPair → refreshAndPrepareEd25519KeyPair * remove gen-key-pair.ts * defaultMaxListeners = 512 * Revert "defaultMaxListeners = 512" This reverts commit f2c412c18057a9300540794ccbe4dfbf6d259ed6. * genRSAAndEd25519KeyPairではキーを直列に生成する? * maxConcurrency: 8 * maxConcurrency: 16 * maxConcurrency: 8 * Revert "genRSAAndEd25519KeyPairではキーを直列に生成する?" This reverts commit d0aada55c1ed5aa98f18731ec82f3ac5eb5a6c16. * maxWorkers: '90%' * Revert "maxWorkers: '90%'" This reverts commit 9e0a93f110456320d6485a871f014f7cdab29b33. * e2e/timelines.tsで個々のテストに対するtimeoutを削除, maxConcurrency: 32 * better error handling of this.userPublickeysRepository.delete * better comment * set result to keypairEntityCache * deliverJobConcurrency: 16, deliverJobPerSec: 1024, inboxJobConcurrency: 4 * inboxJobPerSec: 64 * delete request.headers['host']; * fix * // node-fetch will generate this for us. if we keep 'Host', it won't change with redirects! 
* move delete host * modify comment * modify comment * fix correct → collect * refreshAndfindKey → refreshAndFindKey * modify comment * modify attachLdSignature * getApId, InboxProcessorService * TODO * [skip ci] add CHANGELOG --------- Co-authored-by: MeiMei <30769358+mei23@users.noreply.github.com> Co-authored-by: まっちゃとーにゅ <17376330+u1-liquid@users.noreply.github.com>
159 lines
5.4 KiB
TypeScript
159 lines
5.4 KiB
TypeScript
/*
|
|
* SPDX-FileCopyrightText: syuilo and misskey-project
|
|
* SPDX-License-Identifier: AGPL-3.0-only
|
|
*/
|
|
|
|
import { Inject, Injectable } from '@nestjs/common';
|
|
import * as Bull from 'bullmq';
|
|
import { Not } from 'typeorm';
|
|
import { DI } from '@/di-symbols.js';
|
|
import type { InstancesRepository } from '@/models/_.js';
|
|
import type Logger from '@/logger.js';
|
|
import { MetaService } from '@/core/MetaService.js';
|
|
import { ApRequestService } from '@/core/activitypub/ApRequestService.js';
|
|
import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
|
|
import { FetchInstanceMetadataService } from '@/core/FetchInstanceMetadataService.js';
|
|
import { MemorySingleCache } from '@/misc/cache.js';
|
|
import type { MiInstance } from '@/models/Instance.js';
|
|
import InstanceChart from '@/core/chart/charts/instance.js';
|
|
import ApRequestChart from '@/core/chart/charts/ap-request.js';
|
|
import FederationChart from '@/core/chart/charts/federation.js';
|
|
import { StatusError } from '@/misc/status-error.js';
|
|
import { UtilityService } from '@/core/UtilityService.js';
|
|
import { bindThis } from '@/decorators.js';
|
|
import { QueueLoggerService } from '../QueueLoggerService.js';
|
|
import type { DeliverJobData } from '../types.js';
|
|
|
|
/**
 * Queue processor that delivers a single signed ActivityPub payload to a
 * remote inbox and keeps the target instance's responsiveness state and the
 * delivery charts up to date.
 */
@Injectable()
export class DeliverProcessorService {
	private logger: Logger;
	// Cache of instance rows whose suspensionState is anything other than 'none'.
	private suspendedHostsCache: MemorySingleCache<MiInstance[]>;
	// NOTE(review): not referenced anywhere in this class body.
	private latest: string | null;

	constructor(
		@Inject(DI.instancesRepository)
		private instancesRepository: InstancesRepository,

		private metaService: MetaService,
		private utilityService: UtilityService,
		private federatedInstanceService: FederatedInstanceService,
		private fetchInstanceMetadataService: FetchInstanceMetadataService,
		private apRequestService: ApRequestService,
		private instanceChart: InstanceChart,
		private apRequestChart: ApRequestChart,
		private federationChart: FederationChart,
		private queueLoggerService: QueueLoggerService,
	) {
		this.logger = this.queueLoggerService.logger.createSubLogger('deliver');
		// Presumably a lifetime in milliseconds (one hour) — confirm against MemorySingleCache.
		this.suspendedHostsCache = new MemorySingleCache<MiInstance[]>(1000 * 60 * 60);
	}

	/**
	 * Delivers the job's payload to `job.data.to`.
	 *
	 * @param job Deliver job; reads `to`, `user`, `content`, `digest`,
	 *   `privateKey` and `isSharedInbox` from `job.data`.
	 * @returns `'skip (blocked)'` when the target host is blocked,
	 *   `'skip (suspended)'` when it is in the suspended-host list, and
	 *   `'Success'` once the signed POST has completed.
	 * @throws Bull.UnrecoverableError when the remote answered with a
	 *   non-retryable `StatusError` (including 410 on a shared inbox).
	 * @throws Error when the remote answered with a retryable `StatusError`.
	 *   Any other failure (DNS error, socket error, timeout, ...) is rethrown as-is.
	 */
	@bindThis
	public async process(job: Bull.Job<DeliverJobData>): Promise<string> {
		const { host } = new URL(job.data.to);

		// Abort if the destination host is blocked.
		const meta = await this.metaService.fetch();
		const punyHost = this.utilityService.toPuny(host);
		if (this.utilityService.isBlockedHost(meta.blockedHosts, punyHost)) {
			return 'skip (blocked)';
		}

		// Abort if the destination host is suspended.
		let suspendedHosts = this.suspendedHostsCache.get();
		if (suspendedHosts == null) {
			suspendedHosts = await this.instancesRepository.find({
				where: {
					suspensionState: Not('none'),
				},
			});
			this.suspendedHostsCache.set(suspendedHosts);
		}
		if (suspendedHosts.some(x => x.host === punyHost)) {
			return 'skip (suspended)';
		}

		try {
			// Refresh the instance metadata, then read the instance record again;
			// presumably so the signature implementation level passed below
			// reflects the refreshed data.
			const _server = await this.federatedInstanceService.fetch(host);
			await this.fetchInstanceMetadataService.fetchInstanceMetadata(_server);
			const server = await this.federatedInstanceService.fetch(host);

			await this.apRequestService.signedPost(
				job.data.user,
				job.data.to,
				job.data.content,
				server.httpMessageSignaturesImplementationLevel,
				job.data.digest,
				job.data.privateKey,
			);

			// Update stats.
			// The state reset is deliberately not awaited: a failure here must not
			// land in the catch block below and turn a successful delivery into a retry.
			if (server.isNotResponding) {
				this.runDetached(`clear not-responding state of ${host}`, async () => {
					await this.federatedInstanceService.update(server.id, {
						isNotResponding: false,
						notRespondingSince: null,
					});
				});
			}

			this.apRequestChart.deliverSucc();
			this.federationChart.deliverd(server.host, true);

			if (meta.enableChartsForFederatedInstances) {
				this.instanceChart.requestSent(server.host, true);
			}

			return 'Success';
		} catch (res) {
			// Update stats in the background so the job outcome below is decided
			// by the delivery error alone.
			this.runDetached(
				`record failed delivery to ${host}`,
				() => this.recordDeliveryFailure(host, meta.enableChartsForFederatedInstances),
			);

			if (res instanceof StatusError) {
				// 4xx
				if (!res.isRetryable) {
					// The remote explicitly states that it has shut down, so stop delivering to it.
					if (job.data.isSharedInbox && res.statusCode === 410) {
						this.runDetached(`mark ${host} as gone`, async () => {
							const instance = await this.federatedInstanceService.fetch(host);
							await this.federatedInstanceService.update(instance.id, {
								suspensionState: 'goneSuspended',
							});
						});
						throw new Bull.UnrecoverableError(`${host} is gone`);
					}
					throw new Bull.UnrecoverableError(`${res.statusCode} ${res.statusMessage}`);
				}

				// 5xx etc.
				throw new Error(`${res.statusCode} ${res.statusMessage}`);
			} else {
				// DNS error, socket error, timeout ...
				throw res;
			}
		}
	}

	/**
	 * Records a failed delivery: bumps the failure charts and advances the
	 * instance's not-responding / suspension state.
	 *
	 * @param host Host name taken from the delivery URL.
	 * @param recordInstanceChart Whether the per-instance chart is updated too.
	 */
	private async recordDeliveryFailure(host: string, recordInstanceChart: boolean): Promise<void> {
		const SUSPEND_AFTER_MS = 1000 * 60 * 60 * 24 * 7; // one week

		const instance = await this.federatedInstanceService.fetch(host);

		// Charts are recorded before the instance row is touched so that a
		// failing update cannot suppress them.
		this.apRequestChart.deliverFail();
		this.federationChart.deliverd(instance.host, false);

		if (recordInstanceChart) {
			this.instanceChart.requestSent(instance.host, false);
		}

		if (!instance.isNotResponding) {
			// First failure: start the not-responding clock.
			await this.federatedInstanceService.update(instance.id, {
				isNotResponding: true,
				notRespondingSince: new Date(),
			});
		} else if (instance.notRespondingSince == null) {
			// isNotResponding is true but notRespondingSince is null: set it now.
			// notRespondingSince is a newer field, so older rows may lack it.
			await this.federatedInstanceService.update(instance.id, {
				notRespondingSince: new Date(),
			});
		} else if (
			instance.suspensionState === 'none' &&
			instance.notRespondingSince.getTime() <= Date.now() - SUSPEND_AFTER_MS
		) {
			// Unreachable for a week or longer: suspend.
			await this.federatedInstanceService.update(instance.id, {
				suspensionState: 'autoSuspendedForNotResponding',
			});
		}
	}

	/**
	 * Starts `task` without awaiting it and logs a rejection instead of
	 * leaving it unhandled.
	 *
	 * @param description Short phrase used in the error log ("Failed to <description>").
	 * @param task Background work; it is invoked synchronously.
	 */
	private runDetached(description: string, task: () => Promise<void>): void {
		task().catch((err: unknown) => {
			const reason = err instanceof Error ? err.message : String(err);
			this.logger.error(`Failed to ${description}: ${reason}`);
		});
	}
}
|