mirror of
				https://codeberg.org/yeentown/barkey.git
				synced 2025-10-25 18:54:52 +00:00 
			
		
		
		
	resolve collection items in parallel
This commit is contained in:
		
							parent
							
								
									3da3ce9a40
								
							
						
					
					
						commit
						a3f9ff68fa
					
				
					 1 changed files with 44 additions and 38 deletions
				
			
		|  | @ -5,6 +5,7 @@ | ||||||
| 
 | 
 | ||||||
| import { Inject, Injectable } from '@nestjs/common'; | import { Inject, Injectable } from '@nestjs/common'; | ||||||
| import { IsNull, Not } from 'typeorm'; | import { IsNull, Not } from 'typeorm'; | ||||||
|  | import promiseLimit from 'promise-limit'; | ||||||
| import type { MiLocalUser, MiRemoteUser } from '@/models/User.js'; | import type { MiLocalUser, MiRemoteUser } from '@/models/User.js'; | ||||||
| import type { NotesRepository, PollsRepository, NoteReactionsRepository, UsersRepository, FollowRequestsRepository, MiMeta, SkApFetchLog } from '@/models/_.js'; | import type { NotesRepository, PollsRepository, NoteReactionsRepository, UsersRepository, FollowRequestsRepository, MiMeta, SkApFetchLog } from '@/models/_.js'; | ||||||
| import type { Config } from '@/config.js'; | import type { Config } from '@/config.js'; | ||||||
|  | @ -64,13 +65,16 @@ export class Resolver { | ||||||
| 		return this.recursionLimit; | 		return this.recursionLimit; | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	public async resolveCollection(value: string, allowAnonymous?: boolean): Promise<AnyCollection & IObjectWithId>; | 	public async resolveCollection(value: string | IObjectWithId, allowAnonymous?: boolean, sentFromUri?: string): Promise<AnyCollection & IObjectWithId>; | ||||||
| 	public async resolveCollection(value: string | IObject, allowAnonymous?: boolean): Promise<AnyCollection>; | 	public async resolveCollection(value: string | IObject, allowAnonymous: boolean | undefined, sentFromUri: string): Promise<AnyCollection & IObjectWithId>; | ||||||
|  | 	public async resolveCollection(value: string | IObject, allowAnonymous?: boolean, sentFromUri?: string): Promise<AnyCollection>; | ||||||
| 	@bindThis | 	@bindThis | ||||||
| 	public async resolveCollection(value: string | IObject, allowAnonymous?: boolean): Promise<AnyCollection> { | 	public async resolveCollection(value: string | IObject, allowAnonymous?: boolean, sentFromUri?: string): Promise<AnyCollection> { | ||||||
| 		const collection = typeof value === 'string' | 		const collection = typeof value === 'string' | ||||||
| 			? await this.resolve(value, allowAnonymous) | 			? sentFromUri | ||||||
| 			: value; | 				? await this.secureResolve(value, sentFromUri, allowAnonymous) | ||||||
|  | 				: await this.resolve(value, allowAnonymous) | ||||||
|  | 			: value; // TODO try and remove this eventually, as it's a major security foot-gun
 | ||||||
| 
 | 
 | ||||||
| 		if (isCollectionOrOrderedCollection(collection)) { | 		if (isCollectionOrOrderedCollection(collection)) { | ||||||
| 			return collection; | 			return collection; | ||||||
|  | @ -87,65 +91,66 @@ export class Resolver { | ||||||
| 	 * @param collection Collection to resolve from - can be a URL or object of any supported collection type. | 	 * @param collection Collection to resolve from - can be a URL or object of any supported collection type. | ||||||
| 	 * @param limit Maximum number of items to resolve. If null or undefined (default), then items will be resolved until reaching the recursion limit. | 	 * @param limit Maximum number of items to resolve. If null or undefined (default), then items will be resolved until reaching the recursion limit. | ||||||
| 	 * @param allowAnonymousItems If true, collection items can be anonymous (lack an ID). If false (default), then an error is thrown when reaching an item without ID. | 	 * @param allowAnonymousItems If true, collection items can be anonymous (lack an ID). If false (default), then an error is thrown when reaching an item without ID. | ||||||
|  | 	 * @param concurrency Maximum number of items to resolve at once. (default: 4) | ||||||
| 	 */ | 	 */ | ||||||
| 	@bindThis | 	@bindThis | ||||||
| 	public async resolveCollectionItems(collection: string | IObject, limit?: number | null, allowAnonymousItems?: boolean): Promise<IObjectWithId[]> { | 	public async resolveCollectionItems(collection: string | IObjectWithId, limit?: number | null, allowAnonymousItems?: boolean, concurrency = 4): Promise<IObjectWithId[]> { | ||||||
| 		const items: IObjectWithId[] = []; | 		const resolvedItems: IObjectWithId[] = []; | ||||||
| 
 | 
 | ||||||
| 		const collectionObj = await this.resolveCollection(collection); |  | ||||||
| 		await this.resolveCollectionItemsTo(collectionObj, limit ?? undefined, allowAnonymousItems, items); |  | ||||||
| 
 |  | ||||||
| 		return items; |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	private async resolveCollectionItemsTo(current: AnyCollection | null, limit: number | undefined, allowAnonymousItems: boolean | undefined, destination: IObjectWithId[]): Promise<void> { |  | ||||||
| 		// This is pulled up to avoid code duplication below
 | 		// This is pulled up to avoid code duplication below
 | ||||||
| 		const iterate = async(items: ApObject): Promise<void> => { | 		const iterate = async(items: ApObject, current: AnyCollection & IObjectWithId) => { | ||||||
| 			for (const item of toArray(items)) { | 			const sentFrom = current.id; | ||||||
| 				// Stop when we reach the fetch limit
 | 			const itemArr = toArray(items); | ||||||
| 				if (this.history.size > this.recursionLimit) break; | 			const itemLimit = limit ?? Number.MAX_SAFE_INTEGER; | ||||||
| 
 | 			const allowAnonymous = allowAnonymousItems ?? false; | ||||||
| 				// Stop when we reach the item limit
 | 			await this.resolveItemArray(itemArr, sentFrom, itemLimit, concurrency, allowAnonymous, resolvedItems); | ||||||
| 				if (limit != null && limit < 1) break; |  | ||||||
| 
 |  | ||||||
| 				// Use secureResolve whenever possible, to avoid re-fetching items that were included inline.
 |  | ||||||
| 				const resolved = current?.id |  | ||||||
| 					? await this.secureResolve(item, current.id, allowAnonymousItems) |  | ||||||
| 					: await this.resolve(getApId(item), allowAnonymousItems); |  | ||||||
| 				destination.push(resolved); |  | ||||||
| 
 |  | ||||||
| 				// Decrement the outer variable directly, because the code below checks it too
 |  | ||||||
| 				if (limit != null) limit--; |  | ||||||
| 			} |  | ||||||
| 		}; | 		}; | ||||||
| 
 | 
 | ||||||
| 		while (current != null) { | 		let current: (AnyCollection & IObjectWithId) | null = await this.resolveCollection(collection); | ||||||
|  | 		do { | ||||||
| 			// Iterate all items in the current page
 | 			// Iterate all items in the current page
 | ||||||
| 			if (current.items) { | 			if (current.items) { | ||||||
| 				await iterate(current.items); | 				await iterate(current.items, current); | ||||||
| 			} | 			} | ||||||
| 			if (current.orderedItems) { | 			if (current.orderedItems) { | ||||||
| 				await iterate(current.orderedItems); | 				await iterate(current.orderedItems, current); | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
| 			if (this.history.size >= this.recursionLimit) { | 			if (this.history.size >= this.recursionLimit) { | ||||||
| 				// Stop when we reach the fetch limit
 | 				// Stop when we reach the fetch limit
 | ||||||
| 				current = null; | 				current = null; | ||||||
| 			} else if (limit != null && limit < 1) { | 			} else if (limit != null && resolvedItems.length >= limit) { | ||||||
| 				// Stop when we reach the item limit
 | 				// Stop when we reach the item limit
 | ||||||
| 				current = null; | 				current = null; | ||||||
| 			} else if (isCollection(current) || isOrderedCollection(current)) { | 			} else if (isCollection(current) || isOrderedCollection(current)) { | ||||||
| 				// Continue to first page
 | 				// Continue to first page
 | ||||||
| 				current = current.first ? await this.resolveCollection(current.first, true) : null; | 				current = current.first ? await this.resolveCollection(current.first, true, current.id) : null; | ||||||
| 			} else if (isCollectionPage(current) || isOrderedCollectionPage(current)) { | 			} else if (isCollectionPage(current) || isOrderedCollectionPage(current)) { | ||||||
| 				// Continue to next page
 | 				// Continue to next page
 | ||||||
| 				current = current.next ? await this.resolveCollection(current.next, true) : null; | 				current = current.next ? await this.resolveCollection(current.next, true, current.id) : null; | ||||||
| 			} else { | 			} else { | ||||||
| 				// Stop in all other conditions
 | 				// Stop in all other conditions
 | ||||||
| 				current = null; | 				current = null; | ||||||
| 			} | 			} | ||||||
|  | 		} while (current != null); | ||||||
|  | 
 | ||||||
|  | 		return resolvedItems; | ||||||
| 	} | 	} | ||||||
| 	} | 
 | ||||||
|  | 	private async resolveItemArray(source: (string | IObject)[], sentFrom: string, itemLimit: number, concurrency: number, allowAnonymousItems: boolean, destination: IObjectWithId[]): Promise<void> { | ||||||
|  | 		const recursionLimit = this.recursionLimit - this.history.size; | ||||||
|  | 		const batchLimit = Math.min(source.length, recursionLimit, itemLimit); | ||||||
|  | 
 | ||||||
|  | 		const limiter = promiseLimit<IObjectWithId>(concurrency); | ||||||
|  | 		const batch = await Promise.all(source | ||||||
|  | 			.slice(0, batchLimit) | ||||||
|  | 			.map(item => limiter(async () => { | ||||||
|  | 				// Use secureResolve to avoid re-fetching items that were included inline.
 | ||||||
|  | 				return await this.secureResolve(item, sentFrom, allowAnonymousItems); | ||||||
|  | 			}))); | ||||||
|  | 
 | ||||||
|  | 		destination.push(...batch); | ||||||
|  | 	}; | ||||||
| 
 | 
 | ||||||
| 	/** | 	/** | ||||||
| 	 * Securely resolves an AP object or URL that has been sent from another instance. | 	 * Securely resolves an AP object or URL that has been sent from another instance. | ||||||
|  | @ -185,6 +190,7 @@ export class Resolver { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	public async resolve(value: string | [string], allowAnonymous?: boolean): Promise<IObjectWithId>; | 	public async resolve(value: string | [string], allowAnonymous?: boolean): Promise<IObjectWithId>; | ||||||
|  | 	public async resolve(value: string | IObjectWithId | [string | IObjectWithId], allowAnonymous?: boolean): Promise<IObjectWithId>; | ||||||
| 	public async resolve(value: string | IObject | [string | IObject], allowAnonymous?: boolean): Promise<IObject>; | 	public async resolve(value: string | IObject | [string | IObject], allowAnonymous?: boolean): Promise<IObject>; | ||||||
| 	/** | 	/** | ||||||
| 	 * Resolves a URL or object to an AP object. | 	 * Resolves a URL or object to an AP object. | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		
		Reference in a new issue