reduce log spam from QueueProcessorService

Hazelnoot 2025-05-25 14:51:10 -04:00
parent 3e9ca84347
commit 22653efdc4
3 changed files with 141 additions and 50 deletions


@@ -0,0 +1,60 @@
/*
* SPDX-FileCopyrightText: hazelnoot and other Sharkey contributors
* SPDX-License-Identifier: AGPL-3.0-only
*/
import * as Bull from 'bullmq';
import { AbortError, FetchError } from 'node-fetch';
import { StatusError } from '@/misc/status-error.js';
import { IdentifiableError } from '@/misc/identifiable-error.js';
import { renderInlineError } from '@/misc/render-inline-error.js';
import { CaptchaError, captchaErrorCodes } from '@/core/CaptchaService.js';

export function renderFullError(e?: unknown): unknown {
if (e === undefined) return 'undefined';
if (e === null) return 'null';
if (e instanceof Error) {
if (isSimpleError(e)) {
return renderInlineError(e);
}
const data: ErrorData = {};
if (e.stack) data.stack = e.stack;
if (e.message) data.message = e.message;
if (e.name) data.name = e.name;
// mix "cause" and "errors"
if (e instanceof AggregateError && e.errors.length > 0) {
const causes = e.errors.map(inner => renderFullError(inner));
if (e.cause) {
causes.push(renderFullError(e.cause));
}
data.cause = causes;
} else if (e.cause) {
data.cause = renderFullError(e.cause);
}
return data;
}
return e;
}

function isSimpleError(e: Error): boolean {
if (e instanceof Bull.UnrecoverableError) return true;
if (e instanceof AbortError || e.name === 'AbortError') return true;
if (e instanceof FetchError || e.name === 'FetchError') return true;
if (e instanceof StatusError) return true;
if (e instanceof IdentifiableError) return true;
if (e instanceof CaptchaError && e.code !== captchaErrorCodes.unknown) return true;
return false;
}

interface ErrorData {
stack?: Error['stack'];
message?: Error['message'];
name?: Error['name'];
cause?: Error['cause'] | Error['cause'][];
}
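For illustration (the calls below are hypothetical and not part of the commit), the new helper collapses well-known error types into a single inline string via renderInlineError, and expands anything else into a structured ErrorData object that a logger can attach as data:

import { renderFullError } from '@/misc/render-full-error.js';

// An unrecognized error expands to { name, message, stack, cause }, with the
// cause chain rendered recursively (AggregateError.errors are merged into cause).
const detailed = renderFullError(new Error('outer failure', { cause: new Error('inner failure') }));

// Non-Error values pass through unchanged; undefined and null become the
// strings 'undefined' and 'null' so a log line is never empty.
const passthrough = renderFullError('plain string reason');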


@@ -34,6 +34,9 @@ function renderTo(err: unknown, parts: string[]): void {
}
function printError(err: unknown): string {
if (err === undefined) return 'undefined';
if (err === null) return 'null';
if (err instanceof IdentifiableError) {
if (err.message) {
return `${err.name} ${err.id}: ${err.message}`;


@@ -6,15 +6,12 @@
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import * as Bull from 'bullmq';
import * as Sentry from '@sentry/node';
import { AbortError, FetchError } from 'node-fetch';
import type { Config } from '@/config.js';
import { DI } from '@/di-symbols.js';
import type Logger from '@/logger.js';
import { bindThis } from '@/decorators.js';
import { CheckModeratorsActivityProcessorService } from '@/queue/processors/CheckModeratorsActivityProcessorService.js';
import { StatusError } from '@/misc/status-error.js';
import { IdentifiableError } from '@/misc/identifiable-error.js';
import { renderInlineError } from '@/misc/render-inline-error.js';
import { renderFullError } from '@/misc/render-full-error.js';
import { UserWebhookDeliverProcessorService } from './processors/UserWebhookDeliverProcessorService.js';
import { SystemWebhookDeliverProcessorService } from './processors/SystemWebhookDeliverProcessorService.js';
import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
@@ -76,7 +73,9 @@ function getJobInfo(job: Bull.Job | undefined, increment = false): string {
const currentAttempts = job.attemptsMade + (increment ? 1 : 0);
const maxAttempts = job.opts.attempts ?? 0;
return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`;
return job.name
? `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated} name=${job.name}`
: `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`;
}
@Injectable()
@@ -137,35 +136,6 @@ export class QueueProcessorService implements OnApplicationShutdown {
) {
this.logger = this.queueLoggerService.logger;
function renderError(e?: Error) {
// For some reason, e sometimes comes through as undefined
if (!e) return '?';
if (e instanceof Bull.UnrecoverableError || e instanceof AbortError || e.name === 'AbortError' || e instanceof FetchError || e.name === 'FetchError' || e instanceof StatusError || e instanceof IdentifiableError || e instanceof FetchError) {
return renderInlineError(e);
}
return {
stack: e.stack,
message: e.message,
name: e.name,
};
}
function renderJob(job?: Bull.Job) {
if (!job) return '?';
const info: Record<string, string> = {
info: getJobInfo(job),
data: job.data,
};
if (job.name) info.name = job.name;
if (job.failedReason) info.failedReason = job.failedReason;
return info;
}
//#region system
{
const processer = (job: Bull.Job) => {
@@ -199,7 +169,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err: Error) => {
logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: System: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
@@ -207,7 +177,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
.on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -264,7 +234,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: DB: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
@@ -272,7 +242,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
.on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -304,7 +274,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => {
logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: Deliver: ${err.name}: ${err.message}`, {
level: 'error',
@@ -312,7 +282,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
.on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -344,7 +314,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
.on('failed', (job, err) => {
logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job: renderJob(job), e: renderError(err) });
this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: Inbox: ${err.name}: ${err.message}`, {
level: 'error',
@@ -352,7 +322,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
.on('error', (err: Error) => logger.error('inbox error:', renderError(err)))
.on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -384,7 +354,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => {
logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: UserWebhookDeliver: ${err.name}: ${err.message}`, {
level: 'error',
@@ -392,7 +362,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
.on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -424,7 +394,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => {
logger.error(`failed(${err.name}: ${err.message}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`);
this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: SystemWebhookDeliver: ${err.name}: ${err.message}`, {
level: 'error',
@@ -432,7 +402,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
.on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -471,7 +441,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: Relationship: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
@@ -479,7 +449,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
.on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
@@ -512,7 +482,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
logger.error(`failed(${err.name}: ${err.message}) id=${job?.id ?? '?'}`, { job: renderJob(job), e: renderError(err) });
this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: ObjectStorage: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
@@ -520,13 +490,15 @@ export class QueueProcessorService implements OnApplicationShutdown {
});
}
})
.on('error', (err: Error) => logger.error(`error ${err.name}: ${err.message}`, { e: renderError(err) }))
.on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
//#region ended poll notification
{
const logger = this.logger.createSubLogger('endedPollNotification');
this.endedPollNotificationQueueWorker = new Bull.Worker(QUEUE.ENDED_POLL_NOTIFICATION, (job) => {
if (this.config.sentryForBackend) {
return Sentry.startSpan({ name: 'Queue: EndedPollNotification' }, () => this.endedPollNotificationProcessorService.process(job));
@@ -537,19 +509,75 @@ export class QueueProcessorService implements OnApplicationShutdown {
...baseWorkerOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION),
autorun: false,
});
this.endedPollNotificationQueueWorker
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: EndedPollNotification: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
.on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
//#region schedule note post
{
const logger = this.logger.createSubLogger('scheduleNotePost');
this.schedulerNotePostQueueWorker = new Bull.Worker(QUEUE.SCHEDULE_NOTE_POST, (job) => this.scheduleNotePostProcessorService.process(job), {
...baseWorkerOptions(this.config, QUEUE.SCHEDULE_NOTE_POST),
autorun: false,
});
this.schedulerNotePostQueueWorker
.on('active', (job) => logger.debug(`active id=${job.id}`))
.on('completed', (job, result) => logger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => {
this.logError(logger, err, job);
if (config.sentryForBackend) {
Sentry.captureMessage(`Queue: ${QUEUE.SCHEDULE_NOTE_POST}: ${job?.name ?? '?'}: ${err.name}: ${err.message}`, {
level: 'error',
extra: { job, err },
});
}
})
.on('error', (err: Error) => this.logError(logger, err))
.on('stalled', (jobId) => logger.warn(`stalled id=${jobId}`));
}
//#endregion
}

private logError(logger: Logger, err: unknown, job?: Bull.Job | null): void {
const parts: string[] = [];
// Render job
if (job) {
parts.push('job [');
parts.push(getJobInfo(job));
parts.push('] failed: ');
} else {
parts.push('job failed: ');
}
// Render error
const fullError = renderFullError(err);
const errorText = typeof(fullError) === 'string' ? fullError : undefined;
if (errorText) {
parts.push(errorText);
} else if (job?.failedReason) {
parts.push(job.failedReason);
}
const message = parts.join('');
const data = typeof(fullError) !== 'string' ? { err: fullError } : undefined;
logger.error(message, data);
}
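Every queue now routes its 'failed' and 'error' events through this single helper, so a failure produces one compact line along these lines (hypothetical values and error text), with the full ErrorData attached as structured log data only when the error is not one of the simple, self-describing types:

job [id=1234 attempts=3/8 age=42s name=deliver] failed: StatusError: 410 Gone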
@bindThis
public async start(): Promise<void> {
await Promise.all([