Use async-cpupool
This commit is contained in:
		
							parent
							
								
									708e7da301
								
							
						
					
					
						commit
						8540e93469
					
				
					 4 changed files with 33 additions and 127 deletions
				
			
		
							
								
								
									
										4
									
								
								Cargo.lock
									
										
									
										generated
									
									
									
								
							
							
						
						
									
										4
									
								
								Cargo.lock
									
										
									
										generated
									
									
									
								
							| 
						 | 
				
			
			@ -1746,9 +1746,9 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
 | 
			
		|||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "lru"
 | 
			
		||||
version = "0.11.1"
 | 
			
		||||
version = "0.12.1"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "a4a83fb7698b3643a0e34f9ae6f2e8f0178c0fd42f8b59d493aa271ff3a5bf21"
 | 
			
		||||
checksum = "2994eeba8ed550fd9b47a0b38f0242bc3344e496483c6180b69139cc2fa5d1d7"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "hashbrown 0.14.2",
 | 
			
		||||
]
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -37,7 +37,7 @@ console-subscriber = { version = "0.2", optional = true }
 | 
			
		|||
dashmap = "5.1.0"
 | 
			
		||||
dotenv = "0.15.0"
 | 
			
		||||
flume = "0.11.0"
 | 
			
		||||
lru = "0.11.0"
 | 
			
		||||
lru = "0.12.0"
 | 
			
		||||
metrics = "0.21.0"
 | 
			
		||||
metrics-exporter-prometheus = { version = "0.12.0", default-features = false, features = [
 | 
			
		||||
  "http-listener",
 | 
			
		||||
| 
						 | 
				
			
			@ -49,6 +49,7 @@ opentelemetry = "0.21"
 | 
			
		|||
opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] }
 | 
			
		||||
opentelemetry-otlp = "0.14"
 | 
			
		||||
pin-project-lite = "0.2.9"
 | 
			
		||||
# pinned to metrics-util
 | 
			
		||||
quanta = "0.11.0"
 | 
			
		||||
rand = "0.8"
 | 
			
		||||
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "stream"]}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										11
									
								
								src/main.rs
									
										
									
									
									
								
							
							
						
						
									
										11
									
								
								src/main.rs
									
										
									
									
									
								
							| 
						 | 
				
			
			@ -311,11 +311,11 @@ async fn do_server_main(
 | 
			
		|||
        }
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    let verify_spawner = Spawner::build("verify-cpu", verify_threads)?;
 | 
			
		||||
    let sign_spawner = Spawner::build("sign-cpu", signature_threads)?;
 | 
			
		||||
    let verify_spawner = Spawner::build("verify-cpu", verify_threads.try_into()?);
 | 
			
		||||
    let sign_spawner = Spawner::build("sign-cpu", signature_threads.try_into()?);
 | 
			
		||||
 | 
			
		||||
    let key_id = config.generate_url(UrlKind::MainKey).to_string();
 | 
			
		||||
    let state = State::build(db.clone(), key_id, sign_spawner, client).await?;
 | 
			
		||||
    let state = State::build(db.clone(), key_id, sign_spawner.clone(), client).await?;
 | 
			
		||||
 | 
			
		||||
    if let Some((token, admin_handle)) = config.telegram_info() {
 | 
			
		||||
        tracing::warn!("Creating telegram handler");
 | 
			
		||||
| 
						 | 
				
			
			@ -325,6 +325,8 @@ async fn do_server_main(
 | 
			
		|||
    let keys = config.open_keys()?;
 | 
			
		||||
 | 
			
		||||
    let bind_address = config.bind_address();
 | 
			
		||||
    let sign_spawner2 = sign_spawner.clone();
 | 
			
		||||
    let verify_spawner2 = verify_spawner.clone();
 | 
			
		||||
    let server = HttpServer::new(move || {
 | 
			
		||||
        let job_server =
 | 
			
		||||
            create_workers(state.clone(), actors.clone(), media.clone(), config.clone());
 | 
			
		||||
| 
						 | 
				
			
			@ -410,6 +412,9 @@ async fn do_server_main(
 | 
			
		|||
        server.bind(bind_address)?.run().await?;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    sign_spawner2.close().await;
 | 
			
		||||
    verify_spawner2.close().await;
 | 
			
		||||
 | 
			
		||||
    tracing::warn!("Server closed");
 | 
			
		||||
 | 
			
		||||
    Ok(())
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										142
									
								
								src/spawner.rs
									
										
									
									
									
								
							
							
						
						
									
										142
									
								
								src/spawner.rs
									
										
									
									
									
								
							| 
						 | 
				
			
			@ -1,107 +1,31 @@
 | 
			
		|||
use async_cpupool::CpuPool;
 | 
			
		||||
use http_signature_normalization_actix::{Canceled, Spawn};
 | 
			
		||||
use std::{
 | 
			
		||||
    panic::AssertUnwindSafe,
 | 
			
		||||
    sync::Arc,
 | 
			
		||||
    thread::JoinHandle,
 | 
			
		||||
    time::{Duration, Instant},
 | 
			
		||||
};
 | 
			
		||||
use std::time::Duration;
 | 
			
		||||
 | 
			
		||||
fn spawner_thread(
 | 
			
		||||
    receiver: flume::Receiver<Box<dyn FnOnce() + Send>>,
 | 
			
		||||
    name: &'static str,
 | 
			
		||||
    id: usize,
 | 
			
		||||
) {
 | 
			
		||||
    let guard = MetricsGuard::guard(name, id);
 | 
			
		||||
 | 
			
		||||
    while let Ok(f) = receiver.recv() {
 | 
			
		||||
        let start = Instant::now();
 | 
			
		||||
        metrics::increment_counter!(format!("relay.{name}.operation.start"), "id" => id.to_string());
 | 
			
		||||
        let res = std::panic::catch_unwind(AssertUnwindSafe(f));
 | 
			
		||||
        metrics::increment_counter!(format!("relay.{name}.operation.end"), "complete" => res.is_ok().to_string(), "id" => id.to_string());
 | 
			
		||||
        metrics::histogram!(format!("relay.{name}.operation.duration"), start.elapsed().as_secs_f64(), "complete" => res.is_ok().to_string(), "id" => id.to_string());
 | 
			
		||||
 | 
			
		||||
        if let Err(e) = res {
 | 
			
		||||
            tracing::warn!("{name} fn panicked: {e:?}");
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    guard.disarm();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Clone, Debug)]
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
pub(crate) struct Spawner {
 | 
			
		||||
    name: &'static str,
 | 
			
		||||
    sender: Option<flume::Sender<Box<dyn FnOnce() + Send>>>,
 | 
			
		||||
    threads: Option<Arc<Vec<JoinHandle<()>>>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct MetricsGuard {
 | 
			
		||||
    name: &'static str,
 | 
			
		||||
    id: usize,
 | 
			
		||||
    start: Instant,
 | 
			
		||||
    armed: bool,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl MetricsGuard {
 | 
			
		||||
    fn guard(name: &'static str, id: usize) -> Self {
 | 
			
		||||
        metrics::increment_counter!(format!("relay.{name}.launched"), "id" => id.to_string());
 | 
			
		||||
 | 
			
		||||
        Self {
 | 
			
		||||
            name,
 | 
			
		||||
            id,
 | 
			
		||||
            start: Instant::now(),
 | 
			
		||||
            armed: true,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn disarm(mut self) {
 | 
			
		||||
        self.armed = false;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Drop for MetricsGuard {
 | 
			
		||||
    fn drop(&mut self) {
 | 
			
		||||
        metrics::increment_counter!(format!("relay.{}.closed", self.name), "clean" => (!self.armed).to_string(), "id" => self.id.to_string());
 | 
			
		||||
        metrics::histogram!(format!("relay.{}.duration", self.name), self.start.elapsed().as_secs_f64(), "clean" => (!self.armed).to_string(), "id" => self.id.to_string());
 | 
			
		||||
        tracing::warn!("Stopping {} - {}", self.name, self.id);
 | 
			
		||||
    }
 | 
			
		||||
    pool: CpuPool,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Spawner {
 | 
			
		||||
    pub(crate) fn build(name: &'static str, threads: usize) -> std::io::Result<Self> {
 | 
			
		||||
        let (sender, receiver) = flume::bounded(8);
 | 
			
		||||
    pub(crate) fn build(name: &'static str, threads: u16) -> Self {
 | 
			
		||||
        let pool = CpuPool::configure()
 | 
			
		||||
            .name(name)
 | 
			
		||||
            .max_threads(threads)
 | 
			
		||||
            .build()
 | 
			
		||||
            .expect("valid configuration");
 | 
			
		||||
 | 
			
		||||
        tracing::warn!("Launching {threads} {name}s");
 | 
			
		||||
        Spawner { pool }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
        let threads = (0..threads)
 | 
			
		||||
            .map(|i| {
 | 
			
		||||
                let receiver = receiver.clone();
 | 
			
		||||
                std::thread::Builder::new()
 | 
			
		||||
                    .name(format!("{name}-{i}"))
 | 
			
		||||
                    .spawn(move || {
 | 
			
		||||
                        spawner_thread(receiver, name, i);
 | 
			
		||||
                    })
 | 
			
		||||
            })
 | 
			
		||||
            .collect::<Result<Vec<_>, _>>()?;
 | 
			
		||||
 | 
			
		||||
        Ok(Spawner {
 | 
			
		||||
            name,
 | 
			
		||||
            sender: Some(sender),
 | 
			
		||||
            threads: Some(Arc::new(threads)),
 | 
			
		||||
        })
 | 
			
		||||
    pub(crate) async fn close(self) {
 | 
			
		||||
        self.pool.close().await;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Drop for Spawner {
 | 
			
		||||
    fn drop(&mut self) {
 | 
			
		||||
        self.sender.take();
 | 
			
		||||
 | 
			
		||||
        if let Some(threads) = self.threads.take().and_then(Arc::into_inner) {
 | 
			
		||||
            tracing::warn!("Joining {}s", self.name);
 | 
			
		||||
            for thread in threads {
 | 
			
		||||
                let _ = thread.join();
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
impl std::fmt::Debug for Spawner {
 | 
			
		||||
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 | 
			
		||||
        f.debug_struct("Spawner").finish()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -144,22 +68,9 @@ impl Spawn for Spawner {
 | 
			
		|||
        Func: FnOnce() -> Out + Send + 'static,
 | 
			
		||||
        Out: Send + 'static,
 | 
			
		||||
    {
 | 
			
		||||
        let sender = self.sender.as_ref().expect("Sender exists").clone();
 | 
			
		||||
        let pool = self.pool.clone();
 | 
			
		||||
 | 
			
		||||
        Box::pin(async move {
 | 
			
		||||
            let (tx, rx) = flume::bounded(1);
 | 
			
		||||
 | 
			
		||||
            let _ = sender
 | 
			
		||||
                .send_async(Box::new(move || {
 | 
			
		||||
                    if tx.try_send((func)()).is_err() {
 | 
			
		||||
                        tracing::warn!("Requestor hung up");
 | 
			
		||||
                        metrics::increment_counter!("relay.spawner.disconnected");
 | 
			
		||||
                    }
 | 
			
		||||
                }))
 | 
			
		||||
                .await;
 | 
			
		||||
 | 
			
		||||
            timer(rx.recv_async()).await.map_err(|_| Canceled)
 | 
			
		||||
        })
 | 
			
		||||
        Box::pin(async move { timer(pool.spawn(func)).await.map_err(|_| Canceled) })
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -171,21 +82,10 @@ impl http_signature_normalization_reqwest::Spawn for Spawner {
 | 
			
		|||
        Func: FnOnce() -> Out + Send + 'static,
 | 
			
		||||
        Out: Send + 'static,
 | 
			
		||||
    {
 | 
			
		||||
        let sender = self.sender.as_ref().expect("Sender exists").clone();
 | 
			
		||||
        let pool = self.pool.clone();
 | 
			
		||||
 | 
			
		||||
        Box::pin(async move {
 | 
			
		||||
            let (tx, rx) = flume::bounded(1);
 | 
			
		||||
 | 
			
		||||
            let _ = sender
 | 
			
		||||
                .send_async(Box::new(move || {
 | 
			
		||||
                    if tx.try_send((func)()).is_err() {
 | 
			
		||||
                        tracing::warn!("Requestor hung up");
 | 
			
		||||
                        metrics::increment_counter!("relay.spawner.disconnected");
 | 
			
		||||
                    }
 | 
			
		||||
                }))
 | 
			
		||||
                .await;
 | 
			
		||||
 | 
			
		||||
            timer(rx.recv_async())
 | 
			
		||||
            timer(pool.spawn(func))
 | 
			
		||||
                .await
 | 
			
		||||
                .map_err(|_| http_signature_normalization_reqwest::Canceled)
 | 
			
		||||
        })
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		
		Reference in a new issue