mirror of
				https://codeberg.org/yeentown/barkey.git
				synced 2025-11-04 07:24:13 +00:00 
			
		
		
		
	Merge branch 'develop' of https://github.com/misskey-dev/misskey into develop
This commit is contained in:
		
						commit
						0ed2a220f4
					
				
					 9 changed files with 59 additions and 30 deletions
				
			
		| 
						 | 
				
			
			@ -3,7 +3,6 @@
 | 
			
		|||
 * SPDX-License-Identifier: AGPL-3.0-only
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
import { setTimeout } from 'node:timers/promises';
 | 
			
		||||
import { Global, Inject, Module } from '@nestjs/common';
 | 
			
		||||
import * as Redis from 'ioredis';
 | 
			
		||||
import { DataSource } from 'typeorm';
 | 
			
		||||
| 
						 | 
				
			
			@ -12,6 +11,7 @@ import { DI } from './di-symbols.js';
 | 
			
		|||
import { Config, loadConfig } from './config.js';
 | 
			
		||||
import { createPostgresDataSource } from './postgres.js';
 | 
			
		||||
import { RepositoryModule } from './models/RepositoryModule.js';
 | 
			
		||||
import { allSettled } from './misc/promise-tracker.js';
 | 
			
		||||
import type { Provider, OnApplicationShutdown } from '@nestjs/common';
 | 
			
		||||
 | 
			
		||||
const $config: Provider = {
 | 
			
		||||
| 
						 | 
				
			
			@ -94,14 +94,9 @@ export class GlobalModule implements OnApplicationShutdown {
 | 
			
		|||
	) { }
 | 
			
		||||
 | 
			
		||||
	public async dispose(): Promise<void> {
 | 
			
		||||
		if (process.env.NODE_ENV === 'test') {
 | 
			
		||||
			// XXX:
 | 
			
		||||
			// Shutting down the existing connections causes errors on Jest as
 | 
			
		||||
			// Misskey has asynchronous postgres/redis connections that are not
 | 
			
		||||
			// awaited.
 | 
			
		||||
			// Let's wait for some random time for them to finish.
 | 
			
		||||
			await setTimeout(5000);
 | 
			
		||||
		}
 | 
			
		||||
		// Wait for all potential DB queries
 | 
			
		||||
		await allSettled();
 | 
			
		||||
		// And then disconnect from DB
 | 
			
		||||
		await Promise.all([
 | 
			
		||||
			this.db.destroy(),
 | 
			
		||||
			this.redisClient.disconnect(),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -58,6 +58,7 @@ import { FanoutTimelineService } from '@/core/FanoutTimelineService.js';
 | 
			
		|||
import { UtilityService } from '@/core/UtilityService.js';
 | 
			
		||||
import { UserBlockingService } from '@/core/UserBlockingService.js';
 | 
			
		||||
import { isReply } from '@/misc/is-reply.js';
 | 
			
		||||
import { trackPromise } from '@/misc/promise-tracker.js';
 | 
			
		||||
 | 
			
		||||
type NotificationType = 'reply' | 'renote' | 'quote' | 'mention';
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -676,7 +677,7 @@ export class NoteCreateService implements OnApplicationShutdown {
 | 
			
		|||
						this.relayService.deliverToRelays(user, noteActivity);
 | 
			
		||||
					}
 | 
			
		||||
 | 
			
		||||
					dm.execute();
 | 
			
		||||
					trackPromise(dm.execute());
 | 
			
		||||
				})();
 | 
			
		||||
			}
 | 
			
		||||
			//#endregion
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -14,6 +14,7 @@ import { IdService } from '@/core/IdService.js';
 | 
			
		|||
import { GlobalEventService } from '@/core/GlobalEventService.js';
 | 
			
		||||
import type { NoteUnreadsRepository, MutingsRepository, NoteThreadMutingsRepository } from '@/models/_.js';
 | 
			
		||||
import { bindThis } from '@/decorators.js';
 | 
			
		||||
import { trackPromise } from '@/misc/promise-tracker.js';
 | 
			
		||||
 | 
			
		||||
@Injectable()
 | 
			
		||||
export class NoteReadService implements OnApplicationShutdown {
 | 
			
		||||
| 
						 | 
				
			
			@ -107,7 +108,7 @@ export class NoteReadService implements OnApplicationShutdown {
 | 
			
		|||
 | 
			
		||||
			// TODO: ↓まとめてクエリしたい
 | 
			
		||||
 | 
			
		||||
			this.noteUnreadsRepository.countBy({
 | 
			
		||||
			trackPromise(this.noteUnreadsRepository.countBy({
 | 
			
		||||
				userId: userId,
 | 
			
		||||
				isMentioned: true,
 | 
			
		||||
			}).then(mentionsCount => {
 | 
			
		||||
| 
						 | 
				
			
			@ -115,9 +116,9 @@ export class NoteReadService implements OnApplicationShutdown {
 | 
			
		|||
					// 全て既読になったイベントを発行
 | 
			
		||||
					this.globalEventService.publishMainStream(userId, 'readAllUnreadMentions');
 | 
			
		||||
				}
 | 
			
		||||
			});
 | 
			
		||||
			}));
 | 
			
		||||
 | 
			
		||||
			this.noteUnreadsRepository.countBy({
 | 
			
		||||
			trackPromise(this.noteUnreadsRepository.countBy({
 | 
			
		||||
				userId: userId,
 | 
			
		||||
				isSpecified: true,
 | 
			
		||||
			}).then(specifiedCount => {
 | 
			
		||||
| 
						 | 
				
			
			@ -125,7 +126,7 @@ export class NoteReadService implements OnApplicationShutdown {
 | 
			
		|||
					// 全て既読になったイベントを発行
 | 
			
		||||
					this.globalEventService.publishMainStream(userId, 'readAllUnreadSpecifiedNotes');
 | 
			
		||||
				}
 | 
			
		||||
			});
 | 
			
		||||
			}));
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -20,6 +20,7 @@ import { CacheService } from '@/core/CacheService.js';
 | 
			
		|||
import type { Config } from '@/config.js';
 | 
			
		||||
import { UserListService } from '@/core/UserListService.js';
 | 
			
		||||
import type { FilterUnionByProperty } from '@/types.js';
 | 
			
		||||
import { trackPromise } from '@/misc/promise-tracker.js';
 | 
			
		||||
 | 
			
		||||
@Injectable()
 | 
			
		||||
export class NotificationService implements OnApplicationShutdown {
 | 
			
		||||
| 
						 | 
				
			
			@ -74,7 +75,18 @@ export class NotificationService implements OnApplicationShutdown {
 | 
			
		|||
	}
 | 
			
		||||
 | 
			
		||||
	@bindThis
 | 
			
		||||
	public async createNotification<T extends MiNotification['type']>(
 | 
			
		||||
	public createNotification<T extends MiNotification['type']>(
 | 
			
		||||
		notifieeId: MiUser['id'],
 | 
			
		||||
		type: T,
 | 
			
		||||
		data: Omit<FilterUnionByProperty<MiNotification, 'type', T>, 'type' | 'id' | 'createdAt' | 'notifierId'>,
 | 
			
		||||
		notifierId?: MiUser['id'] | null,
 | 
			
		||||
	) {
 | 
			
		||||
		trackPromise(
 | 
			
		||||
			this.#createNotificationInternal(notifieeId, type, data, notifierId),
 | 
			
		||||
		);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	async #createNotificationInternal<T extends MiNotification['type']>(
 | 
			
		||||
		notifieeId: MiUser['id'],
 | 
			
		||||
		type: T,
 | 
			
		||||
		data: Omit<FilterUnionByProperty<MiNotification, 'type', T>, 'type' | 'id' | 'createdAt' | 'notifierId'>,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,12 +3,12 @@
 | 
			
		|||
 * SPDX-License-Identifier: AGPL-3.0-only
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
import { setTimeout } from 'node:timers/promises';
 | 
			
		||||
import { Inject, Module, OnApplicationShutdown } from '@nestjs/common';
 | 
			
		||||
import * as Bull from 'bullmq';
 | 
			
		||||
import { DI } from '@/di-symbols.js';
 | 
			
		||||
import type { Config } from '@/config.js';
 | 
			
		||||
import { QUEUE, baseQueueOptions } from '@/queue/const.js';
 | 
			
		||||
import { allSettled } from '@/misc/promise-tracker.js';
 | 
			
		||||
import type { Provider } from '@nestjs/common';
 | 
			
		||||
import type { DeliverJobData, InboxJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData } from '../queue/types.js';
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -106,14 +106,9 @@ export class QueueModule implements OnApplicationShutdown {
 | 
			
		|||
	) {}
 | 
			
		||||
 | 
			
		||||
	public async dispose(): Promise<void> {
 | 
			
		||||
		if (process.env.NODE_ENV === 'test') {
 | 
			
		||||
			// XXX:
 | 
			
		||||
			// Shutting down the existing connections causes errors on Jest as
 | 
			
		||||
			// Misskey has asynchronous postgres/redis connections that are not
 | 
			
		||||
			// awaited.
 | 
			
		||||
			// Let's wait for some random time for them to finish.
 | 
			
		||||
			await setTimeout(5000);
 | 
			
		||||
		}
 | 
			
		||||
		// Wait for all potential queue jobs
 | 
			
		||||
		await allSettled();
 | 
			
		||||
		// And then close all queues
 | 
			
		||||
		await Promise.all([
 | 
			
		||||
			this.systemQueue.close(),
 | 
			
		||||
			this.endedPollNotificationQueue.close(),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -28,6 +28,7 @@ import { UserBlockingService } from '@/core/UserBlockingService.js';
 | 
			
		|||
import { CustomEmojiService } from '@/core/CustomEmojiService.js';
 | 
			
		||||
import { RoleService } from '@/core/RoleService.js';
 | 
			
		||||
import { FeaturedService } from '@/core/FeaturedService.js';
 | 
			
		||||
import { trackPromise } from '@/misc/promise-tracker.js';
 | 
			
		||||
 | 
			
		||||
const FALLBACK = '❤';
 | 
			
		||||
const PER_NOTE_REACTION_USER_PAIR_CACHE_MAX = 16;
 | 
			
		||||
| 
						 | 
				
			
			@ -268,7 +269,7 @@ export class ReactionService {
 | 
			
		|||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			dm.execute();
 | 
			
		||||
			trackPromise(dm.execute());
 | 
			
		||||
		}
 | 
			
		||||
		//#endregion
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -316,7 +317,7 @@ export class ReactionService {
 | 
			
		|||
				dm.addDirectRecipe(reactee as MiRemoteUser);
 | 
			
		||||
			}
 | 
			
		||||
			dm.addFollowersRecipe();
 | 
			
		||||
			dm.execute();
 | 
			
		||||
			trackPromise(dm.execute());
 | 
			
		||||
		}
 | 
			
		||||
		//#endregion
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -144,7 +144,7 @@ class DeliverManager {
 | 
			
		|||
		}
 | 
			
		||||
 | 
			
		||||
		// deliver
 | 
			
		||||
		this.queueService.deliverMany(this.actor, this.activity, inboxes);
 | 
			
		||||
		await this.queueService.deliverMany(this.actor, this.activity, inboxes);
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										23
									
								
								packages/backend/src/misc/promise-tracker.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										23
									
								
								packages/backend/src/misc/promise-tracker.ts
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
				
			
			@ -0,0 +1,23 @@
 | 
			
		|||
/*
 | 
			
		||||
 * SPDX-FileCopyrightText: syuilo and other misskey contributors
 | 
			
		||||
 * SPDX-License-Identifier: AGPL-3.0-only
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
const promiseRefs: Set<WeakRef<Promise<unknown>>> = new Set();
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * This tracks promises that other modules decided not to wait for,
 | 
			
		||||
 * and makes sure they are all settled before fully closing down the server.
 | 
			
		||||
 */
 | 
			
		||||
export function trackPromise(promise: Promise<unknown>) {
 | 
			
		||||
	if (process.env.NODE_ENV !== 'test') {
 | 
			
		||||
		return;
 | 
			
		||||
	}
 | 
			
		||||
	const ref = new WeakRef(promise);
 | 
			
		||||
	promiseRefs.add(ref);
 | 
			
		||||
	promise.finally(() => promiseRefs.delete(ref));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export async function allSettled(): Promise<void> {
 | 
			
		||||
	await Promise.allSettled([...promiseRefs].map(r => r.deref()));
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -14,6 +14,7 @@ import { NoteEntityService } from '@/core/entities/NoteEntityService.js';
 | 
			
		|||
import { IdService } from '@/core/IdService.js';
 | 
			
		||||
import { FanoutTimelineService } from '@/core/FanoutTimelineService.js';
 | 
			
		||||
import { GlobalEventService } from '@/core/GlobalEventService.js';
 | 
			
		||||
import { trackPromise } from '@/misc/promise-tracker.js';
 | 
			
		||||
import { ApiError } from '../../error.js';
 | 
			
		||||
 | 
			
		||||
export const meta = {
 | 
			
		||||
| 
						 | 
				
			
			@ -92,7 +93,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
 | 
			
		|||
 | 
			
		||||
			antenna.isActive = true;
 | 
			
		||||
			antenna.lastUsedAt = new Date();
 | 
			
		||||
			this.antennasRepository.update(antenna.id, antenna);
 | 
			
		||||
			trackPromise(this.antennasRepository.update(antenna.id, antenna));
 | 
			
		||||
 | 
			
		||||
			if (needPublishEvent) {
 | 
			
		||||
				this.globalEventService.publishInternalEvent('antennaUpdated', antenna);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		
		Reference in a new issue