mirror of
https://codeberg.org/yeentown/barkey.git
synced 2025-07-07 12:36:57 +00:00
convert many RedisKVCaches to QuantumKVCache or MemoryKVCache
This commit is contained in:
parent
1f2742ddd7
commit
46a6612dc0
14 changed files with 126 additions and 127 deletions
|
@ -7,12 +7,13 @@ import { Inject, Injectable } from '@nestjs/common';
|
|||
import * as Redis from 'ioredis';
|
||||
import { IsNull } from 'typeorm';
|
||||
import type { BlockingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiFollowing, MiNote } from '@/models/_.js';
|
||||
import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js';
|
||||
import { MemoryKVCache, QuantumKVCache, RedisKVCache } from '@/misc/cache.js';
|
||||
import type { MiLocalUser, MiUser } from '@/models/User.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
import { UserEntityService } from '@/core/entities/UserEntityService.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import type { GlobalEvents } from '@/core/GlobalEventService.js';
|
||||
import type { GlobalEvents, InternalEventTypes } from '@/core/GlobalEventService.js';
|
||||
import { InternalEventService } from '@/core/InternalEventService.js';
|
||||
import type { OnApplicationShutdown } from '@nestjs/common';
|
||||
|
||||
export interface FollowStats {
|
||||
|
@ -39,12 +40,12 @@ export class CacheService implements OnApplicationShutdown {
|
|||
public localUserByNativeTokenCache: MemoryKVCache<MiLocalUser | null>;
|
||||
public localUserByIdCache: MemoryKVCache<MiLocalUser>;
|
||||
public uriPersonCache: MemoryKVCache<MiUser | null>;
|
||||
public userProfileCache: RedisKVCache<MiUserProfile>;
|
||||
public userMutingsCache: RedisKVCache<Set<string>>;
|
||||
public userBlockingCache: RedisKVCache<Set<string>>;
|
||||
public userBlockedCache: RedisKVCache<Set<string>>; // NOTE: 「被」Blockキャッシュ
|
||||
public renoteMutingsCache: RedisKVCache<Set<string>>;
|
||||
public userFollowingsCache: RedisKVCache<Record<string, Pick<MiFollowing, 'withReplies'> | undefined>>;
|
||||
public userProfileCache: QuantumKVCache<MiUserProfile>;
|
||||
public userMutingsCache: QuantumKVCache<Set<string>>;
|
||||
public userBlockingCache: QuantumKVCache<Set<string>>;
|
||||
public userBlockedCache: QuantumKVCache<Set<string>>; // NOTE: 「被」Blockキャッシュ
|
||||
public renoteMutingsCache: QuantumKVCache<Set<string>>;
|
||||
public userFollowingsCache: QuantumKVCache<Record<string, Pick<MiFollowing, 'withReplies'> | undefined>>;
|
||||
private readonly userFollowStatsCache = new MemoryKVCache<FollowStats>(1000 * 60 * 10); // 10 minutes
|
||||
private readonly translationsCache: RedisKVCache<CachedTranslationEntity>;
|
||||
|
||||
|
@ -74,6 +75,7 @@ export class CacheService implements OnApplicationShutdown {
|
|||
private followingsRepository: FollowingsRepository,
|
||||
|
||||
private userEntityService: UserEntityService,
|
||||
private readonly internalEventService: InternalEventService,
|
||||
) {
|
||||
//this.onMessage = this.onMessage.bind(this);
|
||||
|
||||
|
@ -82,49 +84,33 @@ export class CacheService implements OnApplicationShutdown {
|
|||
this.localUserByIdCache = new MemoryKVCache<MiLocalUser>(1000 * 60 * 5); // 5m
|
||||
this.uriPersonCache = new MemoryKVCache<MiUser | null>(1000 * 60 * 5); // 5m
|
||||
|
||||
this.userProfileCache = new RedisKVCache<MiUserProfile>(this.redisClient, 'userProfile', {
|
||||
this.userProfileCache = new QuantumKVCache(this.internalEventService, 'userProfile', {
|
||||
lifetime: 1000 * 60 * 30, // 30m
|
||||
memoryCacheLifetime: 1000 * 60, // 1m
|
||||
fetcher: (key) => this.userProfilesRepository.findOneByOrFail({ userId: key }),
|
||||
toRedisConverter: (value) => JSON.stringify(value),
|
||||
fromRedisConverter: (value) => JSON.parse(value), // TODO: date型の考慮
|
||||
});
|
||||
|
||||
this.userMutingsCache = new RedisKVCache<Set<string>>(this.redisClient, 'userMutings', {
|
||||
this.userMutingsCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userMutings', {
|
||||
lifetime: 1000 * 60 * 30, // 30m
|
||||
memoryCacheLifetime: 1000 * 60, // 1m
|
||||
fetcher: (key) => this.mutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))),
|
||||
toRedisConverter: (value) => JSON.stringify(Array.from(value)),
|
||||
fromRedisConverter: (value) => new Set(JSON.parse(value)),
|
||||
});
|
||||
|
||||
this.userBlockingCache = new RedisKVCache<Set<string>>(this.redisClient, 'userBlocking', {
|
||||
this.userBlockingCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userBlocking', {
|
||||
lifetime: 1000 * 60 * 30, // 30m
|
||||
memoryCacheLifetime: 1000 * 60, // 1m
|
||||
fetcher: (key) => this.blockingsRepository.find({ where: { blockerId: key }, select: ['blockeeId'] }).then(xs => new Set(xs.map(x => x.blockeeId))),
|
||||
toRedisConverter: (value) => JSON.stringify(Array.from(value)),
|
||||
fromRedisConverter: (value) => new Set(JSON.parse(value)),
|
||||
});
|
||||
|
||||
this.userBlockedCache = new RedisKVCache<Set<string>>(this.redisClient, 'userBlocked', {
|
||||
this.userBlockedCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userBlocked', {
|
||||
lifetime: 1000 * 60 * 30, // 30m
|
||||
memoryCacheLifetime: 1000 * 60, // 1m
|
||||
fetcher: (key) => this.blockingsRepository.find({ where: { blockeeId: key }, select: ['blockerId'] }).then(xs => new Set(xs.map(x => x.blockerId))),
|
||||
toRedisConverter: (value) => JSON.stringify(Array.from(value)),
|
||||
fromRedisConverter: (value) => new Set(JSON.parse(value)),
|
||||
});
|
||||
|
||||
this.renoteMutingsCache = new RedisKVCache<Set<string>>(this.redisClient, 'renoteMutings', {
|
||||
this.renoteMutingsCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'renoteMutings', {
|
||||
lifetime: 1000 * 60 * 30, // 30m
|
||||
memoryCacheLifetime: 1000 * 60, // 1m
|
||||
fetcher: (key) => this.renoteMutingsRepository.find({ where: { muterId: key }, select: ['muteeId'] }).then(xs => new Set(xs.map(x => x.muteeId))),
|
||||
toRedisConverter: (value) => JSON.stringify(Array.from(value)),
|
||||
fromRedisConverter: (value) => new Set(JSON.parse(value)),
|
||||
});
|
||||
|
||||
this.userFollowingsCache = new RedisKVCache<Record<string, Pick<MiFollowing, 'withReplies'> | undefined>>(this.redisClient, 'userFollowings', {
|
||||
this.userFollowingsCache = new QuantumKVCache<Record<string, Pick<MiFollowing, 'withReplies'> | undefined>>(this.internalEventService, 'userFollowings', {
|
||||
lifetime: 1000 * 60 * 30, // 30m
|
||||
memoryCacheLifetime: 1000 * 60, // 1m
|
||||
fetcher: (key) => this.followingsRepository.find({ where: { followerId: key }, select: ['followeeId', 'withReplies'] }).then(xs => {
|
||||
const obj: Record<string, Pick<MiFollowing, 'withReplies'> | undefined> = {};
|
||||
for (const x of xs) {
|
||||
|
@ -132,8 +118,6 @@ export class CacheService implements OnApplicationShutdown {
|
|||
}
|
||||
return obj;
|
||||
}),
|
||||
toRedisConverter: (value) => JSON.stringify(value),
|
||||
fromRedisConverter: (value) => JSON.parse(value),
|
||||
});
|
||||
|
||||
this.translationsCache = new RedisKVCache<CachedTranslationEntity>(this.redisClient, 'translations', {
|
||||
|
@ -143,20 +127,21 @@ export class CacheService implements OnApplicationShutdown {
|
|||
|
||||
// NOTE: チャンネルのフォロー状況キャッシュはChannelFollowingServiceで行っている
|
||||
|
||||
this.redisForSub.on('message', this.onMessage);
|
||||
this.internalEventService.on('userChangeSuspendedState', this.onUserEvent);
|
||||
this.internalEventService.on('userChangeDeletedState', this.onUserEvent);
|
||||
this.internalEventService.on('remoteUserUpdated', this.onUserEvent);
|
||||
this.internalEventService.on('localUserUpdated', this.onUserEvent);
|
||||
this.internalEventService.on('userChangeSuspendedState', this.onUserEvent);
|
||||
this.internalEventService.on('userTokenRegenerated', this.onTokenEvent);
|
||||
this.internalEventService.on('follow', this.onFollowEvent);
|
||||
this.internalEventService.on('unfollow', this.onFollowEvent);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private async onMessage(_: string, data: string): Promise<void> {
|
||||
const obj = JSON.parse(data);
|
||||
|
||||
if (obj.channel === 'internal') {
|
||||
const { type, body } = obj.message as GlobalEvents['internal']['payload'];
|
||||
switch (type) {
|
||||
case 'userChangeSuspendedState':
|
||||
case 'userChangeDeletedState':
|
||||
case 'remoteUserUpdated':
|
||||
case 'localUserUpdated': {
|
||||
private async onUserEvent<E extends 'userChangeSuspendedState' | 'userChangeDeletedState' | 'remoteUserUpdated' | 'localUserUpdated'>(body: InternalEventTypes[E]): Promise<void> {
|
||||
{
|
||||
{
|
||||
{
|
||||
const user = await this.usersRepository.findOneBy({ id: body.id });
|
||||
if (user == null) {
|
||||
this.userByIdCache.delete(body.id);
|
||||
|
@ -178,20 +163,32 @@ export class CacheService implements OnApplicationShutdown {
|
|||
this.localUserByIdCache.set(user.id, user);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'userTokenRegenerated': {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async onTokenEvent<E extends 'userTokenRegenerated'>(body: InternalEventTypes[E]): Promise<void> {
|
||||
{
|
||||
{
|
||||
{
|
||||
const user = await this.usersRepository.findOneByOrFail({ id: body.id }) as MiLocalUser;
|
||||
this.localUserByNativeTokenCache.delete(body.oldToken);
|
||||
this.localUserByNativeTokenCache.set(body.newToken, user);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async onFollowEvent<E extends 'follow' | 'unfollow'>(body: InternalEventTypes[E], type: E): Promise<void> {
|
||||
{
|
||||
switch (type) {
|
||||
case 'follow': {
|
||||
const follower = this.userByIdCache.get(body.followerId);
|
||||
if (follower) follower.followingCount++;
|
||||
const followee = this.userByIdCache.get(body.followeeId);
|
||||
if (followee) followee.followersCount++;
|
||||
this.userFollowingsCache.delete(body.followerId);
|
||||
await this.userFollowingsCache.delete(body.followerId);
|
||||
this.userFollowStatsCache.delete(body.followerId);
|
||||
this.userFollowStatsCache.delete(body.followeeId);
|
||||
break;
|
||||
|
@ -201,13 +198,11 @@ export class CacheService implements OnApplicationShutdown {
|
|||
if (follower) follower.followingCount--;
|
||||
const followee = this.userByIdCache.get(body.followeeId);
|
||||
if (followee) followee.followersCount--;
|
||||
this.userFollowingsCache.delete(body.followerId);
|
||||
await this.userFollowingsCache.delete(body.followerId);
|
||||
this.userFollowStatsCache.delete(body.followerId);
|
||||
this.userFollowStatsCache.delete(body.followeeId);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -300,7 +295,14 @@ export class CacheService implements OnApplicationShutdown {
|
|||
|
||||
@bindThis
|
||||
public dispose(): void {
|
||||
this.redisForSub.off('message', this.onMessage);
|
||||
this.internalEventService.off('userChangeSuspendedState', this.onUserEvent);
|
||||
this.internalEventService.off('userChangeDeletedState', this.onUserEvent);
|
||||
this.internalEventService.off('remoteUserUpdated', this.onUserEvent);
|
||||
this.internalEventService.off('localUserUpdated', this.onUserEvent);
|
||||
this.internalEventService.off('userChangeSuspendedState', this.onUserEvent);
|
||||
this.internalEventService.off('userTokenRegenerated', this.onTokenEvent);
|
||||
this.internalEventService.off('follow', this.onFollowEvent);
|
||||
this.internalEventService.off('unfollow', this.onFollowEvent);
|
||||
this.userByIdCache.dispose();
|
||||
this.localUserByNativeTokenCache.dispose();
|
||||
this.localUserByIdCache.dispose();
|
||||
|
|
|
@ -9,14 +9,16 @@ import { DI } from '@/di-symbols.js';
|
|||
import type { ChannelFollowingsRepository } from '@/models/_.js';
|
||||
import { MiChannel } from '@/models/_.js';
|
||||
import { IdService } from '@/core/IdService.js';
|
||||
import { GlobalEvents, GlobalEventService } from '@/core/GlobalEventService.js';
|
||||
import { GlobalEvents, GlobalEventService, InternalEventTypes } from '@/core/GlobalEventService.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import type { MiLocalUser } from '@/models/User.js';
|
||||
import { RedisKVCache } from '@/misc/cache.js';
|
||||
import { QuantumKVCache, RedisKVCache } from '@/misc/cache.js';
|
||||
import { InternalEventService } from './InternalEventService.js';
|
||||
|
||||
@Injectable()
|
||||
export class ChannelFollowingService implements OnModuleInit {
|
||||
public userFollowingChannelsCache: RedisKVCache<Set<string>>;
|
||||
// TODO check for regs
|
||||
public userFollowingChannelsCache: QuantumKVCache<Set<string>>;
|
||||
|
||||
constructor(
|
||||
@Inject(DI.redis)
|
||||
|
@ -27,19 +29,18 @@ export class ChannelFollowingService implements OnModuleInit {
|
|||
private channelFollowingsRepository: ChannelFollowingsRepository,
|
||||
private idService: IdService,
|
||||
private globalEventService: GlobalEventService,
|
||||
private readonly internalEventService: InternalEventService,
|
||||
) {
|
||||
this.userFollowingChannelsCache = new RedisKVCache<Set<string>>(this.redisClient, 'userFollowingChannels', {
|
||||
this.userFollowingChannelsCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userFollowingChannels', {
|
||||
lifetime: 1000 * 60 * 30, // 30m
|
||||
memoryCacheLifetime: 1000 * 60, // 1m
|
||||
fetcher: (key) => this.channelFollowingsRepository.find({
|
||||
where: { followerId: key },
|
||||
select: ['followeeId'],
|
||||
}).then(xs => new Set(xs.map(x => x.followeeId))),
|
||||
toRedisConverter: (value) => JSON.stringify(Array.from(value)),
|
||||
fromRedisConverter: (value) => new Set(JSON.parse(value)),
|
||||
});
|
||||
|
||||
this.redisForSub.on('message', this.onMessage);
|
||||
this.internalEventService.on('followChannel', this.onMessage);
|
||||
this.internalEventService.on('unfollowChannel', this.onMessage);
|
||||
}
|
||||
|
||||
onModuleInit() {
|
||||
|
@ -79,18 +80,15 @@ export class ChannelFollowingService implements OnModuleInit {
|
|||
}
|
||||
|
||||
@bindThis
|
||||
private async onMessage(_: string, data: string): Promise<void> {
|
||||
const obj = JSON.parse(data);
|
||||
|
||||
if (obj.channel === 'internal') {
|
||||
const { type, body } = obj.message as GlobalEvents['internal']['payload'];
|
||||
private async onMessage<E extends 'followChannel' | 'unfollowChannel'>(body: InternalEventTypes[E], type: E): Promise<void> {
|
||||
{
|
||||
switch (type) {
|
||||
case 'followChannel': {
|
||||
this.userFollowingChannelsCache.refresh(body.userId);
|
||||
await this.userFollowingChannelsCache.delete(body.userId);
|
||||
break;
|
||||
}
|
||||
case 'unfollowChannel': {
|
||||
this.userFollowingChannelsCache.delete(body.userId);
|
||||
await this.userFollowingChannelsCache.delete(body.userId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -99,6 +97,8 @@ export class ChannelFollowingService implements OnModuleInit {
|
|||
|
||||
@bindThis
|
||||
public dispose(): void {
|
||||
this.internalEventService.off('followChannel', this.onMessage);
|
||||
this.internalEventService.off('unfollowChannel', this.onMessage);
|
||||
this.userFollowingChannelsCache.dispose();
|
||||
}
|
||||
|
||||
|
|
|
@ -12,7 +12,8 @@ import type { Packed } from '@/misc/json-schema.js';
|
|||
import { getNoteSummary } from '@/misc/get-note-summary.js';
|
||||
import type { MiMeta, MiSwSubscription, SwSubscriptionsRepository } from '@/models/_.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { RedisKVCache } from '@/misc/cache.js';
|
||||
import { QuantumKVCache, RedisKVCache } from '@/misc/cache.js';
|
||||
import { InternalEventService } from '@/core/InternalEventService.js';
|
||||
|
||||
// Defined also packages/sw/types.ts#L13
|
||||
type PushNotificationsTypes = {
|
||||
|
@ -48,7 +49,7 @@ function truncateBody<T extends keyof PushNotificationsTypes>(type: T, body: Pus
|
|||
|
||||
@Injectable()
|
||||
export class PushNotificationService implements OnApplicationShutdown {
|
||||
private subscriptionsCache: RedisKVCache<MiSwSubscription[]>;
|
||||
private subscriptionsCache: QuantumKVCache<MiSwSubscription[]>;
|
||||
|
||||
constructor(
|
||||
@Inject(DI.config)
|
||||
|
@ -62,13 +63,11 @@ export class PushNotificationService implements OnApplicationShutdown {
|
|||
|
||||
@Inject(DI.swSubscriptionsRepository)
|
||||
private swSubscriptionsRepository: SwSubscriptionsRepository,
|
||||
private readonly internalEventService: InternalEventService,
|
||||
) {
|
||||
this.subscriptionsCache = new RedisKVCache<MiSwSubscription[]>(this.redisClient, 'userSwSubscriptions', {
|
||||
this.subscriptionsCache = new QuantumKVCache<MiSwSubscription[]>(this.internalEventService, 'userSwSubscriptions', {
|
||||
lifetime: 1000 * 60 * 60 * 1, // 1h
|
||||
memoryCacheLifetime: 1000 * 60 * 3, // 3m
|
||||
fetcher: (key) => this.swSubscriptionsRepository.findBy({ userId: key }),
|
||||
toRedisConverter: (value) => JSON.stringify(value),
|
||||
fromRedisConverter: (value) => JSON.parse(value),
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -114,8 +113,8 @@ export class PushNotificationService implements OnApplicationShutdown {
|
|||
endpoint: subscription.endpoint,
|
||||
auth: subscription.auth,
|
||||
publickey: subscription.publickey,
|
||||
}).then(() => {
|
||||
this.refreshCache(userId);
|
||||
}).then(async () => {
|
||||
await this.refreshCache(userId);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
@ -123,8 +122,8 @@ export class PushNotificationService implements OnApplicationShutdown {
|
|||
}
|
||||
|
||||
@bindThis
|
||||
public refreshCache(userId: string): void {
|
||||
this.subscriptionsCache.refresh(userId);
|
||||
public async refreshCache(userId: string): Promise<void> {
|
||||
await this.subscriptionsCache.refresh(userId);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
|
|
@ -77,8 +77,10 @@ export class UserBlockingService implements OnModuleInit {
|
|||
|
||||
await this.blockingsRepository.insert(blocking);
|
||||
|
||||
this.cacheService.userBlockingCache.refresh(blocker.id);
|
||||
this.cacheService.userBlockedCache.refresh(blockee.id);
|
||||
await Promise.all([
|
||||
this.cacheService.userBlockingCache.delete(blocker.id),
|
||||
this.cacheService.userBlockedCache.delete(blockee.id),
|
||||
]);
|
||||
|
||||
this.globalEventService.publishInternalEvent('blockingCreated', {
|
||||
blockerId: blocker.id,
|
||||
|
@ -168,8 +170,10 @@ export class UserBlockingService implements OnModuleInit {
|
|||
|
||||
await this.blockingsRepository.delete(blocking.id);
|
||||
|
||||
this.cacheService.userBlockingCache.refresh(blocker.id);
|
||||
this.cacheService.userBlockedCache.refresh(blockee.id);
|
||||
await Promise.all([
|
||||
this.cacheService.userBlockingCache.delete(blocker.id),
|
||||
this.cacheService.userBlockedCache.delete(blockee.id),
|
||||
]);
|
||||
|
||||
this.globalEventService.publishInternalEvent('blockingDeleted', {
|
||||
blockerId: blocker.id,
|
||||
|
|
|
@ -29,6 +29,7 @@ import { AccountMoveService } from '@/core/AccountMoveService.js';
|
|||
import { UtilityService } from '@/core/UtilityService.js';
|
||||
import type { ThinUser } from '@/queue/types.js';
|
||||
import { LoggerService } from '@/core/LoggerService.js';
|
||||
import { InternalEventService } from '@/core/InternalEventService.js';
|
||||
import type Logger from '../logger.js';
|
||||
|
||||
type Local = MiLocalUser | {
|
||||
|
@ -86,6 +87,7 @@ export class UserFollowingService implements OnModuleInit {
|
|||
private accountMoveService: AccountMoveService,
|
||||
private perUserFollowingChart: PerUserFollowingChart,
|
||||
private instanceChart: InstanceChart,
|
||||
private readonly internalEventService: InternalEventService,
|
||||
|
||||
loggerService: LoggerService,
|
||||
) {
|
||||
|
@ -264,7 +266,8 @@ export class UserFollowingService implements OnModuleInit {
|
|||
}
|
||||
});
|
||||
|
||||
this.cacheService.userFollowingsCache.refresh(follower.id);
|
||||
// Handled by CacheService
|
||||
//this.cacheService.userFollowingsCache.refresh(follower.id);
|
||||
|
||||
const requestExist = await this.followRequestsRepository.exists({
|
||||
where: {
|
||||
|
@ -291,7 +294,7 @@ export class UserFollowingService implements OnModuleInit {
|
|||
}, followee.id);
|
||||
}
|
||||
|
||||
this.globalEventService.publishInternalEvent('follow', { followerId: follower.id, followeeId: followee.id });
|
||||
await this.internalEventService.emit('follow', { followerId: follower.id, followeeId: followee.id });
|
||||
|
||||
const [followeeUser, followerUser] = await Promise.all([
|
||||
this.usersRepository.findOneByOrFail({ id: followee.id }),
|
||||
|
@ -381,7 +384,8 @@ export class UserFollowingService implements OnModuleInit {
|
|||
|
||||
await this.followingsRepository.delete(following.id);
|
||||
|
||||
this.cacheService.userFollowingsCache.refresh(follower.id);
|
||||
// Handled by CacheService
|
||||
// this.cacheService.userFollowingsCache.refresh(follower.id);
|
||||
|
||||
this.decrementFollowing(following.follower, following.followee);
|
||||
|
||||
|
@ -412,7 +416,7 @@ export class UserFollowingService implements OnModuleInit {
|
|||
follower: MiUser,
|
||||
followee: MiUser,
|
||||
): Promise<void> {
|
||||
this.globalEventService.publishInternalEvent('unfollow', { followerId: follower.id, followeeId: followee.id });
|
||||
await this.internalEventService.emit('unfollow', { followerId: follower.id, followeeId: followee.id });
|
||||
|
||||
// Neither followee nor follower has moved.
|
||||
if (!follower.movedToUri && !followee.movedToUri) {
|
||||
|
|
|
@ -7,14 +7,14 @@ import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
|
|||
import * as Redis from 'ioredis';
|
||||
import type { MiUser } from '@/models/User.js';
|
||||
import type { UserKeypairsRepository } from '@/models/_.js';
|
||||
import { RedisKVCache } from '@/misc/cache.js';
|
||||
import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js';
|
||||
import type { MiUserKeypair } from '@/models/UserKeypair.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
|
||||
@Injectable()
|
||||
export class UserKeypairService implements OnApplicationShutdown {
|
||||
private cache: RedisKVCache<MiUserKeypair>;
|
||||
private cache: MemoryKVCache<MiUserKeypair>;
|
||||
|
||||
constructor(
|
||||
@Inject(DI.redis)
|
||||
|
@ -23,18 +23,12 @@ export class UserKeypairService implements OnApplicationShutdown {
|
|||
@Inject(DI.userKeypairsRepository)
|
||||
private userKeypairsRepository: UserKeypairsRepository,
|
||||
) {
|
||||
this.cache = new RedisKVCache<MiUserKeypair>(this.redisClient, 'userKeypair', {
|
||||
lifetime: 1000 * 60 * 60 * 24, // 24h
|
||||
memoryCacheLifetime: 1000 * 60 * 60, // 1h
|
||||
fetcher: (key) => this.userKeypairsRepository.findOneByOrFail({ userId: key }),
|
||||
toRedisConverter: (value) => JSON.stringify(value),
|
||||
fromRedisConverter: (value) => JSON.parse(value),
|
||||
});
|
||||
this.cache = new MemoryKVCache<MiUserKeypair>(1000 * 60 * 60 * 24); // 24h
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async getUserKeypair(userId: MiUser['id']): Promise<MiUserKeypair> {
|
||||
return await this.cache.fetch(userId);
|
||||
return await this.cache.fetch(userId, () => this.userKeypairsRepository.findOneByOrFail({ userId }));
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
|
|
@ -11,21 +11,22 @@ import type { MiUser } from '@/models/User.js';
|
|||
import type { MiUserList } from '@/models/UserList.js';
|
||||
import type { MiUserListMembership } from '@/models/UserListMembership.js';
|
||||
import { IdService } from '@/core/IdService.js';
|
||||
import type { GlobalEvents } from '@/core/GlobalEventService.js';
|
||||
import type { GlobalEvents, InternalEventTypes } from '@/core/GlobalEventService.js';
|
||||
import { GlobalEventService } from '@/core/GlobalEventService.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
import { UserEntityService } from '@/core/entities/UserEntityService.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { QueueService } from '@/core/QueueService.js';
|
||||
import { RedisKVCache } from '@/misc/cache.js';
|
||||
import { QuantumKVCache, RedisKVCache } from '@/misc/cache.js';
|
||||
import { RoleService } from '@/core/RoleService.js';
|
||||
import { SystemAccountService } from '@/core/SystemAccountService.js';
|
||||
import { InternalEventService } from '@/core/InternalEventService.js';
|
||||
|
||||
@Injectable()
|
||||
export class UserListService implements OnApplicationShutdown, OnModuleInit {
|
||||
public static TooManyUsersError = class extends Error {};
|
||||
|
||||
public membersCache: RedisKVCache<Set<string>>;
|
||||
public membersCache: QuantumKVCache<Set<string>>;
|
||||
private roleService: RoleService;
|
||||
|
||||
constructor(
|
||||
|
@ -48,16 +49,15 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit {
|
|||
private globalEventService: GlobalEventService,
|
||||
private queueService: QueueService,
|
||||
private systemAccountService: SystemAccountService,
|
||||
private readonly internalEventService: InternalEventService,
|
||||
) {
|
||||
this.membersCache = new RedisKVCache<Set<string>>(this.redisClient, 'userListMembers', {
|
||||
this.membersCache = new QuantumKVCache<Set<string>>(this.internalEventService, 'userListMembers', {
|
||||
lifetime: 1000 * 60 * 30, // 30m
|
||||
memoryCacheLifetime: 1000 * 60, // 1m
|
||||
fetcher: (key) => this.userListMembershipsRepository.find({ where: { userListId: key }, select: ['userId'] }).then(xs => new Set(xs.map(x => x.userId))),
|
||||
toRedisConverter: (value) => JSON.stringify(Array.from(value)),
|
||||
fromRedisConverter: (value) => new Set(JSON.parse(value)),
|
||||
});
|
||||
|
||||
this.redisForSub.on('message', this.onMessage);
|
||||
this.internalEventService.on('userListMemberAdded', this.onMessage);
|
||||
this.internalEventService.on('userListMemberRemoved', this.onMessage);
|
||||
}
|
||||
|
||||
async onModuleInit() {
|
||||
|
@ -65,24 +65,21 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit {
|
|||
}
|
||||
|
||||
@bindThis
|
||||
private async onMessage(_: string, data: string): Promise<void> {
|
||||
const obj = JSON.parse(data);
|
||||
|
||||
if (obj.channel === 'internal') {
|
||||
const { type, body } = obj.message as GlobalEvents['internal']['payload'];
|
||||
private async onMessage<E extends 'userListMemberAdded' | 'userListMemberRemoved'>(body: InternalEventTypes[E], type: E): Promise<void> {
|
||||
{
|
||||
switch (type) {
|
||||
case 'userListMemberAdded': {
|
||||
const { userListId, memberId } = body;
|
||||
const members = await this.membersCache.get(userListId);
|
||||
if (members) {
|
||||
if (this.membersCache.has(userListId)) {
|
||||
const members = await this.membersCache.get(userListId);
|
||||
members.add(memberId);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'userListMemberRemoved': {
|
||||
const { userListId, memberId } = body;
|
||||
const members = await this.membersCache.get(userListId);
|
||||
if (members) {
|
||||
if (this.membersCache.has(userListId)) {
|
||||
const members = await this.membersCache.get(userListId);
|
||||
members.delete(memberId);
|
||||
}
|
||||
break;
|
||||
|
@ -150,7 +147,8 @@ export class UserListService implements OnApplicationShutdown, OnModuleInit {
|
|||
|
||||
@bindThis
|
||||
public dispose(): void {
|
||||
this.redisForSub.off('message', this.onMessage);
|
||||
this.internalEventService.off('userListMemberAdded', this.onMessage);
|
||||
this.internalEventService.off('userListMemberRemoved', this.onMessage);
|
||||
this.membersCache.dispose();
|
||||
}
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ export class UserMutingService {
|
|||
muteeId: target.id,
|
||||
});
|
||||
|
||||
this.cacheService.userMutingsCache.refresh(user.id);
|
||||
await this.cacheService.userMutingsCache.delete(user.id);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
@ -43,9 +43,8 @@ export class UserMutingService {
|
|||
id: In(mutings.map(m => m.id)),
|
||||
});
|
||||
|
||||
const muterIds = [...new Set(mutings.map(m => m.muterId))];
|
||||
for (const muterId of muterIds) {
|
||||
this.cacheService.userMutingsCache.refresh(muterId);
|
||||
}
|
||||
await Promise.all(Array
|
||||
.from(new Set(mutings.map(m => m.muterId)))
|
||||
.map(muterId => this.cacheService.userMutingsCache.delete(muterId)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ export class UserRenoteMutingService {
|
|||
muteeId: target.id,
|
||||
});
|
||||
|
||||
await this.cacheService.renoteMutingsCache.refresh(user.id);
|
||||
await this.cacheService.renoteMutingsCache.delete(user.id);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
@ -44,9 +44,8 @@ export class UserRenoteMutingService {
|
|||
id: In(mutings.map(m => m.id)),
|
||||
});
|
||||
|
||||
const muterIds = [...new Set(mutings.map(m => m.muterId))];
|
||||
for (const muterId of muterIds) {
|
||||
await this.cacheService.renoteMutingsCache.refresh(muterId);
|
||||
}
|
||||
await Promise.all(Array
|
||||
.from(new Set(mutings.map(m => m.muterId)))
|
||||
.map(muterId => this.cacheService.renoteMutingsCache.delete(muterId)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,7 +47,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
|||
alwaysMarkNsfw: true,
|
||||
});
|
||||
|
||||
await this.cacheService.userProfileCache.refresh(ps.userId);
|
||||
await this.cacheService.userProfileCache.delete(ps.userId);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -617,7 +617,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
|||
|
||||
const updatedProfile = await this.userProfilesRepository.findOneByOrFail({ userId: user.id });
|
||||
|
||||
this.cacheService.userProfileCache.set(user.id, updatedProfile);
|
||||
await this.cacheService.userProfileCache.set(user.id, updatedProfile);
|
||||
|
||||
// Publish meUpdated event
|
||||
this.globalEventService.publishMainStream(user.id, 'meUpdated', iObj);
|
||||
|
|
|
@ -104,7 +104,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
|||
sendReadMessage: ps.sendReadMessage,
|
||||
});
|
||||
|
||||
this.pushNotificationService.refreshCache(me.id);
|
||||
await this.pushNotificationService.refreshCache(me.id);
|
||||
|
||||
return {
|
||||
state: 'subscribed' as const,
|
||||
|
|
|
@ -46,7 +46,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
|||
});
|
||||
|
||||
if (me) {
|
||||
this.pushNotificationService.refreshCache(me.id);
|
||||
await this.pushNotificationService.refreshCache(me.id);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -86,7 +86,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
|||
sendReadMessage: swSubscription.sendReadMessage,
|
||||
});
|
||||
|
||||
this.pushNotificationService.refreshCache(me.id);
|
||||
await this.pushNotificationService.refreshCache(me.id);
|
||||
|
||||
return {
|
||||
userId: swSubscription.userId,
|
||||
|
|
Loading…
Add table
Reference in a new issue