diff --git a/.env b/.env new file mode 100644 index 0000000..49cf5c0 --- /dev/null +++ b/.env @@ -0,0 +1,2 @@ +HOSTNAME=localhost:8079 +PORT=8079 diff --git a/Cargo.lock b/Cargo.lock index b7302cb..cedd4e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1933,7 +1933,7 @@ checksum = "b5eb417147ba9860a96cfe72a0b93bf88fee1744b5636ec99ab20c1aa9376581" [[package]] name = "relay" -version = "0.2.0" +version = "0.2.1" dependencies = [ "activitystreams", "activitystreams-ext", diff --git a/Cargo.toml b/Cargo.toml index 68f3214..ff905ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "relay" description = "A simple activitypub relay" -version = "0.2.0" +version = "0.2.1" authors = ["asonix "] license-file = "LICENSE" readme = "README.md" diff --git a/src/middleware/payload.rs b/src/middleware/payload.rs index 700d409..2a0d210 100644 --- a/src/middleware/payload.rs +++ b/src/middleware/payload.rs @@ -1,14 +1,12 @@ use actix_web::{ dev::{Payload, Service, ServiceRequest, Transform}, - http::StatusCode, + http::{Method, StatusCode}, web::BytesMut, HttpMessage, HttpResponse, ResponseError, }; use futures::{ - channel::mpsc::channel, - future::{ok, try_join, LocalBoxFuture, Ready}, - sink::SinkExt, - stream::StreamExt, + future::{ok, LocalBoxFuture, Ready, TryFutureExt}, + stream::{once, TryStreamExt}, }; use log::{error, info}; use std::task::{Context, Poll}; @@ -68,40 +66,23 @@ where } fn call(&mut self, mut req: S::Request) -> Self::Future { - if self.0 { - let (mut tx, rx) = channel(0); - - let mut pl = req.take_payload(); - req.set_payload(Payload::Stream(Box::pin(rx))); + if self.0 && req.method() == Method::POST { + let pl = req.take_payload(); + req.set_payload(Payload::Stream(Box::pin(once( + pl.try_fold(BytesMut::new(), |mut acc, bytes| async { + acc.extend(bytes); + Ok(acc) + }) + .map_ok(|bytes| { + let bytes = bytes.freeze(); + info!("{}", String::from_utf8_lossy(&bytes)); + bytes + }), + )))); let fut = self.1.call(req); - let payload_fut = async move { - let mut bytes = BytesMut::new(); - - while let Some(res) = pl.next().await { - let b = res.map_err(|e| { - error!("Payload error, {}", e); - DebugError - })?; - bytes.extend(b); - } - - info!("{}", String::from_utf8_lossy(bytes.as_ref())); - - tx.send(Ok(bytes.freeze())).await.map_err(|e| { - error!("Error sending bytes, {}", e); - DebugError - })?; - - Ok(()) as Result<(), actix_web::Error> - }; - - Box::pin(async move { - let (res, _) = try_join(fut, payload_fut).await?; - - Ok(res) - }) + Box::pin(async move { fut.await }) } else { let fut = self.1.call(req); diff --git a/src/middleware/verifier.rs b/src/middleware/verifier.rs index 8f838d7..67261a0 100644 --- a/src/middleware/verifier.rs +++ b/src/middleware/verifier.rs @@ -63,7 +63,7 @@ impl MyVerify { actor_id } else { self.0 - .fetch_json::(public_key_id.as_str()) + .fetch::(public_key_id.as_str()) .await? .actor_id() .ok_or_else(|| MyError::MissingId)? diff --git a/src/requests.rs b/src/requests.rs index c138c5c..8ab206c 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -39,11 +39,20 @@ impl Breakers { async fn fail(&self, url: &Url) { if let Some(domain) = url.domain() { - if let Some(breaker) = self.inner.read().await.get(domain) { - let owned_breaker = Arc::clone(&breaker); - drop(breaker); - owned_breaker.lock().await.fail(); - } else { + let should_write = { + let read = self.inner.read().await; + + if let Some(breaker) = read.get(domain) { + let owned_breaker = Arc::clone(&breaker); + drop(breaker); + owned_breaker.lock().await.fail(); + false + } else { + true + } + }; + + if should_write { let mut hm = self.inner.write().await; let breaker = hm .entry(domain.to_owned()) @@ -55,11 +64,20 @@ impl Breakers { async fn succeed(&self, url: &Url) { if let Some(domain) = url.domain() { - if let Some(breaker) = self.inner.read().await.get(domain) { - let owned_breaker = Arc::clone(&breaker); - drop(breaker); - owned_breaker.lock().await.succeed(); - } else { + let should_write = { + let read = self.inner.read().await; + + if let Some(breaker) = read.get(domain) { + let owned_breaker = Arc::clone(&breaker); + drop(breaker); + owned_breaker.lock().await.succeed(); + false + } else { + true + } + }; + + if should_write { let mut hm = self.inner.write().await; let breaker = hm .entry(domain.to_owned())