Fix mutex scoping, map payload in-place
This commit is contained in:
		
							parent
							
								
									af570c6581
								
							
						
					
					
						commit
						c3d5de600d
					
				
					 6 changed files with 50 additions and 49 deletions
				
			
		
							
								
								
									
										2
									
								
								.env
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										2
									
								
								.env
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,2 @@ | ||||||
|  | HOSTNAME=localhost:8079 | ||||||
|  | PORT=8079 | ||||||
							
								
								
									
										2
									
								
								Cargo.lock
									
										
									
										generated
									
									
									
								
							
							
						
						
									
										2
									
								
								Cargo.lock
									
										
									
										generated
									
									
									
								
							|  | @ -1933,7 +1933,7 @@ checksum = "b5eb417147ba9860a96cfe72a0b93bf88fee1744b5636ec99ab20c1aa9376581" | ||||||
| 
 | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "relay" | name = "relay" | ||||||
| version = "0.2.0" | version = "0.2.1" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "activitystreams", |  "activitystreams", | ||||||
|  "activitystreams-ext", |  "activitystreams-ext", | ||||||
|  |  | ||||||
|  | @ -1,7 +1,7 @@ | ||||||
| [package] | [package] | ||||||
| name = "relay" | name = "relay" | ||||||
| description = "A simple activitypub relay" | description = "A simple activitypub relay" | ||||||
| version = "0.2.0" | version = "0.2.1" | ||||||
| authors = ["asonix <asonix@asonix.dog>"] | authors = ["asonix <asonix@asonix.dog>"] | ||||||
| license-file = "LICENSE" | license-file = "LICENSE" | ||||||
| readme = "README.md" | readme = "README.md" | ||||||
|  |  | ||||||
|  | @ -1,14 +1,12 @@ | ||||||
| use actix_web::{ | use actix_web::{ | ||||||
|     dev::{Payload, Service, ServiceRequest, Transform}, |     dev::{Payload, Service, ServiceRequest, Transform}, | ||||||
|     http::StatusCode, |     http::{Method, StatusCode}, | ||||||
|     web::BytesMut, |     web::BytesMut, | ||||||
|     HttpMessage, HttpResponse, ResponseError, |     HttpMessage, HttpResponse, ResponseError, | ||||||
| }; | }; | ||||||
| use futures::{ | use futures::{ | ||||||
|     channel::mpsc::channel, |     future::{ok, LocalBoxFuture, Ready, TryFutureExt}, | ||||||
|     future::{ok, try_join, LocalBoxFuture, Ready}, |     stream::{once, TryStreamExt}, | ||||||
|     sink::SinkExt, |  | ||||||
|     stream::StreamExt, |  | ||||||
| }; | }; | ||||||
| use log::{error, info}; | use log::{error, info}; | ||||||
| use std::task::{Context, Poll}; | use std::task::{Context, Poll}; | ||||||
|  | @ -68,40 +66,23 @@ where | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     fn call(&mut self, mut req: S::Request) -> Self::Future { |     fn call(&mut self, mut req: S::Request) -> Self::Future { | ||||||
|         if self.0 { |         if self.0 && req.method() == Method::POST { | ||||||
|             let (mut tx, rx) = channel(0); |             let pl = req.take_payload(); | ||||||
| 
 |             req.set_payload(Payload::Stream(Box::pin(once( | ||||||
|             let mut pl = req.take_payload(); |                 pl.try_fold(BytesMut::new(), |mut acc, bytes| async { | ||||||
|             req.set_payload(Payload::Stream(Box::pin(rx))); |                     acc.extend(bytes); | ||||||
|  |                     Ok(acc) | ||||||
|  |                 }) | ||||||
|  |                 .map_ok(|bytes| { | ||||||
|  |                     let bytes = bytes.freeze(); | ||||||
|  |                     info!("{}", String::from_utf8_lossy(&bytes)); | ||||||
|  |                     bytes | ||||||
|  |                 }), | ||||||
|  |             )))); | ||||||
| 
 | 
 | ||||||
|             let fut = self.1.call(req); |             let fut = self.1.call(req); | ||||||
| 
 | 
 | ||||||
|             let payload_fut = async move { |             Box::pin(async move { fut.await }) | ||||||
|                 let mut bytes = BytesMut::new(); |  | ||||||
| 
 |  | ||||||
|                 while let Some(res) = pl.next().await { |  | ||||||
|                     let b = res.map_err(|e| { |  | ||||||
|                         error!("Payload error, {}", e); |  | ||||||
|                         DebugError |  | ||||||
|                     })?; |  | ||||||
|                     bytes.extend(b); |  | ||||||
|                 } |  | ||||||
| 
 |  | ||||||
|                 info!("{}", String::from_utf8_lossy(bytes.as_ref())); |  | ||||||
| 
 |  | ||||||
|                 tx.send(Ok(bytes.freeze())).await.map_err(|e| { |  | ||||||
|                     error!("Error sending bytes, {}", e); |  | ||||||
|                     DebugError |  | ||||||
|                 })?; |  | ||||||
| 
 |  | ||||||
|                 Ok(()) as Result<(), actix_web::Error> |  | ||||||
|             }; |  | ||||||
| 
 |  | ||||||
|             Box::pin(async move { |  | ||||||
|                 let (res, _) = try_join(fut, payload_fut).await?; |  | ||||||
| 
 |  | ||||||
|                 Ok(res) |  | ||||||
|             }) |  | ||||||
|         } else { |         } else { | ||||||
|             let fut = self.1.call(req); |             let fut = self.1.call(req); | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -63,7 +63,7 @@ impl MyVerify { | ||||||
|             actor_id |             actor_id | ||||||
|         } else { |         } else { | ||||||
|             self.0 |             self.0 | ||||||
|                 .fetch_json::<PublicKeyResponse>(public_key_id.as_str()) |                 .fetch::<PublicKeyResponse>(public_key_id.as_str()) | ||||||
|                 .await? |                 .await? | ||||||
|                 .actor_id() |                 .actor_id() | ||||||
|                 .ok_or_else(|| MyError::MissingId)? |                 .ok_or_else(|| MyError::MissingId)? | ||||||
|  |  | ||||||
|  | @ -39,11 +39,20 @@ impl Breakers { | ||||||
| 
 | 
 | ||||||
|     async fn fail(&self, url: &Url) { |     async fn fail(&self, url: &Url) { | ||||||
|         if let Some(domain) = url.domain() { |         if let Some(domain) = url.domain() { | ||||||
|             if let Some(breaker) = self.inner.read().await.get(domain) { |             let should_write = { | ||||||
|                 let owned_breaker = Arc::clone(&breaker); |                 let read = self.inner.read().await; | ||||||
|                 drop(breaker); | 
 | ||||||
|                 owned_breaker.lock().await.fail(); |                 if let Some(breaker) = read.get(domain) { | ||||||
|             } else { |                     let owned_breaker = Arc::clone(&breaker); | ||||||
|  |                     drop(breaker); | ||||||
|  |                     owned_breaker.lock().await.fail(); | ||||||
|  |                     false | ||||||
|  |                 } else { | ||||||
|  |                     true | ||||||
|  |                 } | ||||||
|  |             }; | ||||||
|  | 
 | ||||||
|  |             if should_write { | ||||||
|                 let mut hm = self.inner.write().await; |                 let mut hm = self.inner.write().await; | ||||||
|                 let breaker = hm |                 let breaker = hm | ||||||
|                     .entry(domain.to_owned()) |                     .entry(domain.to_owned()) | ||||||
|  | @ -55,11 +64,20 @@ impl Breakers { | ||||||
| 
 | 
 | ||||||
|     async fn succeed(&self, url: &Url) { |     async fn succeed(&self, url: &Url) { | ||||||
|         if let Some(domain) = url.domain() { |         if let Some(domain) = url.domain() { | ||||||
|             if let Some(breaker) = self.inner.read().await.get(domain) { |             let should_write = { | ||||||
|                 let owned_breaker = Arc::clone(&breaker); |                 let read = self.inner.read().await; | ||||||
|                 drop(breaker); | 
 | ||||||
|                 owned_breaker.lock().await.succeed(); |                 if let Some(breaker) = read.get(domain) { | ||||||
|             } else { |                     let owned_breaker = Arc::clone(&breaker); | ||||||
|  |                     drop(breaker); | ||||||
|  |                     owned_breaker.lock().await.succeed(); | ||||||
|  |                     false | ||||||
|  |                 } else { | ||||||
|  |                     true | ||||||
|  |                 } | ||||||
|  |             }; | ||||||
|  | 
 | ||||||
|  |             if should_write { | ||||||
|                 let mut hm = self.inner.write().await; |                 let mut hm = self.inner.write().await; | ||||||
|                 let breaker = hm |                 let breaker = hm | ||||||
|                     .entry(domain.to_owned()) |                     .entry(domain.to_owned()) | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		
		Reference in a new issue