@@ -129,26 +129,29 @@ public void Stop()
129129 processingQueue . CompleteAdding ( ) ;
130130 }
131131
132- private async Task ParseLoop ( ) {
133- using ( var rateLimitParsing = new SemaphoreSlim ( MaxThreads ) ) {
134- while ( await processingQueue . OutputAvailableAsync ( ) ) {
135- try {
136- var file = await processingQueue . TakeAsync ( ) ;
137- await rateLimitParsing . Locked ( ( ) => {
138- _ = WorkerPool . RunBackground ( async ( ) => {
139- var replay = _analyzer . Analyze ( file ) ;
140- if ( replay != null && file . UploadStatus == UploadStatus . Preprocessed ) {
141- await FingerprintingQueue . EnqueueAsync ( ( replay , file ) ) ;
142- }
143- } ) ;
144- } ) ;
145- }
146- catch ( Exception ex ) {
147- _log . Error ( ex , "Error in parse loop" ) ;
148- }
132+ private async Task ParseLoop ( )
133+ {
134+ //OutputAvailableAsync will keep returning true
135+ //untill all data is processed and processQueue.CompleteAdding is called
136+ while ( await processingQueue . OutputAvailableAsync ( ) ) {
137+ try {
138+ var file = await processingQueue . TakeAsync ( ) ;
139+ //don't wait for completion of background pool task.
140+ //it's internally limited to a fixed number of low-priority threads
141+ //so we can throw as much work on there as we want without choking it
142+ _ = WorkerPool . RunBackground ( async ( ) => {
143+ var replay = _analyzer . Analyze ( file ) ;
144+ if ( replay != null && file . UploadStatus == UploadStatus . Preprocessed ) {
145+ await FingerprintingQueue . EnqueueAsync ( ( replay , file ) ) ;
146+ }
147+ } ) ;
148+ }
149+ catch ( Exception ex ) {
150+ _log . Error ( ex , "Error in parse loop" ) ;
149151 }
150152 }
151153 }
154+
152155 private async Task FingerprintLoop ( ) {
153156 while ( true ) {
154157 var UnFingerprinted = await FingerprintingQueue . DequeueAsync ( ) ;
0 commit comments