Clean db code, switch to deadpool
This commit is contained in:
		
							parent
							
								
									4d07476fe7
								
							
						
					
					
						commit
						82e8042c66
					
				
					 9 changed files with 149 additions and 163 deletions
				
			
		
							
								
								
									
										82
									
								
								Cargo.lock
									
										
									
										generated
									
									
									
								
							
							
						
						
									
										82
									
								
								Cargo.lock
									
										
									
										generated
									
									
									
								
							| 
						 | 
				
			
			@ -470,8 +470,9 @@ dependencies = [
 | 
			
		|||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "background-jobs"
 | 
			
		||||
version = "0.8.0-alpha.1"
 | 
			
		||||
source = "git+https://git.asonix.dog/Aardwolf/background-jobs#f8fa1bb5ef71724053c8fe68b75d900a51cb4f45"
 | 
			
		||||
version = "0.8.0-alpha.2"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "fb38c4a5de33324650e9023829b0f4129eb5418b29f5dfe69a52100ff5bc50d7"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "background-jobs-actix",
 | 
			
		||||
 "background-jobs-core",
 | 
			
		||||
| 
						 | 
				
			
			@ -479,8 +480,9 @@ dependencies = [
 | 
			
		|||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "background-jobs-actix"
 | 
			
		||||
version = "0.8.0-alpha.0"
 | 
			
		||||
source = "git+https://git.asonix.dog/Aardwolf/background-jobs#f8fa1bb5ef71724053c8fe68b75d900a51cb4f45"
 | 
			
		||||
version = "0.8.0-alpha.1"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "2bb7f892dcd3ee34aab169d60587232d47aa054e4401c3067a64a6871eda806a"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "actix",
 | 
			
		||||
 "actix-rt",
 | 
			
		||||
| 
						 | 
				
			
			@ -501,7 +503,8 @@ dependencies = [
 | 
			
		|||
[[package]]
 | 
			
		||||
name = "background-jobs-core"
 | 
			
		||||
version = "0.8.0-alpha.0"
 | 
			
		||||
source = "git+https://git.asonix.dog/Aardwolf/background-jobs#f8fa1bb5ef71724053c8fe68b75d900a51cb4f45"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "1c3447d183c7f1c6e2f9c564860712fb5b11ffa9be12caa28791674b865c85fb"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "actix",
 | 
			
		||||
 "anyhow",
 | 
			
		||||
| 
						 | 
				
			
			@ -556,30 +559,6 @@ version = "0.12.0"
 | 
			
		|||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "7d5ca2cd0adc3f48f9e9ea5a6bbdf9ccc0bfade884847e484d452414c7ccffb3"
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "bb8"
 | 
			
		||||
version = "0.4.1"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "7744a35a99f0ae3a6f5681f5af800e9074c658b1d0d314e9f0c3166455a1c3f6"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "async-trait",
 | 
			
		||||
 "futures",
 | 
			
		||||
 "tokio",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "bb8-postgres"
 | 
			
		||||
version = "0.4.0"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "39a233af6ea3952e20d01863c87b4f6689b2f806249688b0908b5f02d4fa41ac"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "async-trait",
 | 
			
		||||
 "bb8",
 | 
			
		||||
 "futures",
 | 
			
		||||
 "tokio",
 | 
			
		||||
 "tokio-postgres",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "bit-vec"
 | 
			
		||||
version = "0.6.1"
 | 
			
		||||
| 
						 | 
				
			
			@ -781,6 +760,16 @@ dependencies = [
 | 
			
		|||
 "maybe-uninit",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "crossbeam-queue"
 | 
			
		||||
version = "0.2.1"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "c695eeca1e7173472a32221542ae469b3e9aac3a4fc81f7696bcad82029493db"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "cfg-if",
 | 
			
		||||
 "crossbeam-utils",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "crossbeam-utils"
 | 
			
		||||
version = "0.7.2"
 | 
			
		||||
| 
						 | 
				
			
			@ -802,6 +791,36 @@ dependencies = [
 | 
			
		|||
 "subtle 1.0.0",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "deadpool"
 | 
			
		||||
version = "0.5.1"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "38ce52b0b1ad88ed0b2be2bc3c65ad39dd1a5d9633b1a8a314fc017fbe0027d2"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "async-trait",
 | 
			
		||||
 "config",
 | 
			
		||||
 "crossbeam-queue",
 | 
			
		||||
 "num_cpus",
 | 
			
		||||
 "serde 1.0.106",
 | 
			
		||||
 "tokio",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "deadpool-postgres"
 | 
			
		||||
version = "0.5.5"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
checksum = "45575d9acf1535dddcfc5841fd8f1776287bdc328f8d9e76531f4dfd2eb9788f"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "async-trait",
 | 
			
		||||
 "config",
 | 
			
		||||
 "deadpool",
 | 
			
		||||
 "futures",
 | 
			
		||||
 "log",
 | 
			
		||||
 "serde 1.0.106",
 | 
			
		||||
 "tokio",
 | 
			
		||||
 "tokio-postgres",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "derive_more"
 | 
			
		||||
version = "0.99.5"
 | 
			
		||||
| 
						 | 
				
			
			@ -1953,11 +1972,11 @@ dependencies = [
 | 
			
		|||
 "anyhow",
 | 
			
		||||
 "async-trait",
 | 
			
		||||
 "background-jobs",
 | 
			
		||||
 "background-jobs-core",
 | 
			
		||||
 "base64 0.12.0",
 | 
			
		||||
 "bb8-postgres",
 | 
			
		||||
 "bytes",
 | 
			
		||||
 "config",
 | 
			
		||||
 "deadpool",
 | 
			
		||||
 "deadpool-postgres",
 | 
			
		||||
 "dotenv",
 | 
			
		||||
 "env_logger",
 | 
			
		||||
 "futures",
 | 
			
		||||
| 
						 | 
				
			
			@ -1978,6 +1997,7 @@ dependencies = [
 | 
			
		|||
 "structopt",
 | 
			
		||||
 "thiserror",
 | 
			
		||||
 "tokio",
 | 
			
		||||
 "tokio-postgres",
 | 
			
		||||
 "ttl_cache",
 | 
			
		||||
 "uuid",
 | 
			
		||||
]
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -21,12 +21,12 @@ actix-webfinger = "0.3.0-alpha.3"
 | 
			
		|||
activitystreams = "0.5.0"
 | 
			
		||||
ammonia = "3.1.0"
 | 
			
		||||
async-trait = "0.1.24"
 | 
			
		||||
background-jobs = { version = "0.8.0-alpha.1", git = "https://git.asonix.dog/Aardwolf/background-jobs", default-features = false, features = ["background-jobs-actix"] }
 | 
			
		||||
background-jobs-core = { version = "0.8.0-alpha.0", git = "https://git.asonix.dog/Aardwolf/background-jobs" }
 | 
			
		||||
background-jobs = "0.8.0-alpha.2"
 | 
			
		||||
bytes = "0.5.4"
 | 
			
		||||
base64 = "0.12"
 | 
			
		||||
bb8-postgres = { version = "0.4.0", features = ["with-serde_json-1", "with-uuid-0_8", "with-chrono-0_4"] }
 | 
			
		||||
config = "0.10.1"
 | 
			
		||||
deadpool = "0.5.1"
 | 
			
		||||
deadpool-postgres = "0.5.5"
 | 
			
		||||
dotenv = "0.15.0"
 | 
			
		||||
env_logger = "0.7.1"
 | 
			
		||||
futures = "0.3.4"
 | 
			
		||||
| 
						 | 
				
			
			@ -46,6 +46,7 @@ sha2 = "0.8"
 | 
			
		|||
structopt = "0.3.12"
 | 
			
		||||
thiserror = "1.0"
 | 
			
		||||
tokio = { version = "0.2.13", features = ["sync"] }
 | 
			
		||||
tokio-postgres = { version = "0.5.1", features = ["with-serde_json-1", "with-uuid-0_8", "with-chrono-0_4"] }
 | 
			
		||||
ttl_cache = "0.5.1"
 | 
			
		||||
uuid = { version = "0.8", features = ["v4", "serde"] }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -17,7 +17,7 @@ pub struct Config {
 | 
			
		|||
    database_url: String,
 | 
			
		||||
    pretty_log: bool,
 | 
			
		||||
    publish_blocks: bool,
 | 
			
		||||
    connections_per_core: u32,
 | 
			
		||||
    max_connections: usize,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub enum UrlKind {
 | 
			
		||||
| 
						 | 
				
			
			@ -46,7 +46,7 @@ impl Config {
 | 
			
		|||
            .set_default("https", false)?
 | 
			
		||||
            .set_default("pretty_log", true)?
 | 
			
		||||
            .set_default("publish_blocks", false)?
 | 
			
		||||
            .set_default("connections_per_core", 2)?
 | 
			
		||||
            .set_default("max_connections", 2)?
 | 
			
		||||
            .merge(Environment::new())?;
 | 
			
		||||
 | 
			
		||||
        Ok(config.try_into()?)
 | 
			
		||||
| 
						 | 
				
			
			@ -56,8 +56,8 @@ impl Config {
 | 
			
		|||
        self.pretty_log
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn connections_per_core(&self) -> u32 {
 | 
			
		||||
        self.connections_per_core
 | 
			
		||||
    pub fn max_connections(&self) -> usize {
 | 
			
		||||
        self.max_connections
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn validate_signatures(&self) -> bool {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,6 +1,5 @@
 | 
			
		|||
use crate::{db::Db, error::MyError};
 | 
			
		||||
use activitystreams::primitives::XsdAnyUri;
 | 
			
		||||
use bb8_postgres::tokio_postgres::types::Json;
 | 
			
		||||
use log::{debug, error};
 | 
			
		||||
use std::{
 | 
			
		||||
    collections::{HashMap, HashSet},
 | 
			
		||||
| 
						 | 
				
			
			@ -8,6 +7,7 @@ use std::{
 | 
			
		|||
    time::{Duration, SystemTime},
 | 
			
		||||
};
 | 
			
		||||
use tokio::sync::RwLock;
 | 
			
		||||
use tokio_postgres::types::Json;
 | 
			
		||||
use uuid::Uuid;
 | 
			
		||||
 | 
			
		||||
pub type ListenersCache = Arc<RwLock<HashSet<XsdAnyUri>>>;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										188
									
								
								src/db.rs
									
										
									
									
									
								
							
							
						
						
									
										188
									
								
								src/db.rs
									
										
									
									
									
								
							| 
						 | 
				
			
			@ -1,20 +1,15 @@
 | 
			
		|||
use crate::error::MyError;
 | 
			
		||||
use activitystreams::primitives::XsdAnyUri;
 | 
			
		||||
use bb8_postgres::{
 | 
			
		||||
    bb8,
 | 
			
		||||
    tokio_postgres::{
 | 
			
		||||
        error::{Error, SqlState},
 | 
			
		||||
        row::Row,
 | 
			
		||||
        Client, Config, NoTls,
 | 
			
		||||
    },
 | 
			
		||||
    PostgresConnectionManager,
 | 
			
		||||
};
 | 
			
		||||
use deadpool_postgres::{Manager, Pool};
 | 
			
		||||
use log::{info, warn};
 | 
			
		||||
use rsa::RSAPrivateKey;
 | 
			
		||||
use rsa_pem::KeyExt;
 | 
			
		||||
use std::{collections::HashSet, convert::TryInto};
 | 
			
		||||
 | 
			
		||||
pub type Pool = bb8::Pool<PostgresConnectionManager<NoTls>>;
 | 
			
		||||
use std::collections::HashSet;
 | 
			
		||||
use tokio_postgres::{
 | 
			
		||||
    error::{Error, SqlState},
 | 
			
		||||
    row::Row,
 | 
			
		||||
    Client, Config, NoTls,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
pub struct Db {
 | 
			
		||||
| 
						 | 
				
			
			@ -22,19 +17,15 @@ pub struct Db {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
impl Db {
 | 
			
		||||
    pub async fn build(config: &crate::config::Config) -> Result<Self, MyError> {
 | 
			
		||||
        let cpus: u32 = num_cpus::get().try_into()?;
 | 
			
		||||
        let max_conns = cpus * config.connections_per_core();
 | 
			
		||||
 | 
			
		||||
    pub fn build(config: &crate::config::Config) -> Result<Self, MyError> {
 | 
			
		||||
        let max_conns = config.max_connections();
 | 
			
		||||
        let config: Config = config.database_url().parse()?;
 | 
			
		||||
        let manager = PostgresConnectionManager::new(config, NoTls);
 | 
			
		||||
 | 
			
		||||
        let pool = bb8::Pool::builder()
 | 
			
		||||
            .max_size(max_conns)
 | 
			
		||||
            .build(manager)
 | 
			
		||||
            .await?;
 | 
			
		||||
        let manager = Manager::new(config, NoTls);
 | 
			
		||||
 | 
			
		||||
        Ok(Db { pool })
 | 
			
		||||
        Ok(Db {
 | 
			
		||||
            pool: Pool::new(manager, max_conns),
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn pool(&self) -> &Pool {
 | 
			
		||||
| 
						 | 
				
			
			@ -42,16 +33,33 @@ impl Db {
 | 
			
		|||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn remove_listener(&self, inbox: XsdAnyUri) -> Result<(), MyError> {
 | 
			
		||||
        let conn = self.pool.get().await?;
 | 
			
		||||
        info!("DELETE FROM listeners WHERE actor_id = {};", inbox.as_str());
 | 
			
		||||
        self.pool
 | 
			
		||||
            .get()
 | 
			
		||||
            .await?
 | 
			
		||||
            .execute(
 | 
			
		||||
                "DELETE FROM listeners WHERE actor_id = $1::TEXT;",
 | 
			
		||||
                &[&inbox.as_str()],
 | 
			
		||||
            )
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        remove_listener(&conn, &inbox).await?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn add_listener(&self, inbox: XsdAnyUri) -> Result<(), MyError> {
 | 
			
		||||
        let conn = self.pool.get().await?;
 | 
			
		||||
        info!(
 | 
			
		||||
            "INSERT INTO listeners (actor_id, created_at) VALUES ($1::TEXT, 'now'); [{}]",
 | 
			
		||||
            inbox.as_str(),
 | 
			
		||||
        );
 | 
			
		||||
        self.pool
 | 
			
		||||
            .get()
 | 
			
		||||
            .await?
 | 
			
		||||
            .execute(
 | 
			
		||||
                "INSERT INTO listeners (actor_id, created_at) VALUES ($1::TEXT, 'now');",
 | 
			
		||||
                &[&inbox.as_str()],
 | 
			
		||||
            )
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        add_listener(&conn, &inbox).await?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -98,33 +106,64 @@ impl Db {
 | 
			
		|||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn hydrate_blocks(&self) -> Result<HashSet<String>, MyError> {
 | 
			
		||||
        let conn = self.pool.get().await?;
 | 
			
		||||
        info!("SELECT domain_name FROM blocks");
 | 
			
		||||
        let rows = self
 | 
			
		||||
            .pool
 | 
			
		||||
            .get()
 | 
			
		||||
            .await?
 | 
			
		||||
            .query("SELECT domain_name FROM blocks", &[])
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        Ok(hydrate_blocks(&conn).await?)
 | 
			
		||||
        parse_rows(rows)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn hydrate_whitelists(&self) -> Result<HashSet<String>, MyError> {
 | 
			
		||||
        let conn = self.pool.get().await?;
 | 
			
		||||
        info!("SELECT domain_name FROM whitelists");
 | 
			
		||||
        let rows = self
 | 
			
		||||
            .pool
 | 
			
		||||
            .get()
 | 
			
		||||
            .await?
 | 
			
		||||
            .query("SELECT domain_name FROM whitelists", &[])
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        Ok(hydrate_whitelists(&conn).await?)
 | 
			
		||||
        parse_rows(rows)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn hydrate_listeners(&self) -> Result<HashSet<XsdAnyUri>, MyError> {
 | 
			
		||||
        let conn = self.pool.get().await?;
 | 
			
		||||
        info!("SELECT actor_id FROM listeners");
 | 
			
		||||
        let rows = self
 | 
			
		||||
            .pool
 | 
			
		||||
            .get()
 | 
			
		||||
            .await?
 | 
			
		||||
            .query("SELECT actor_id FROM listeners", &[])
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        Ok(hydrate_listeners(&conn).await?)
 | 
			
		||||
        parse_rows(rows)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn hydrate_private_key(&self) -> Result<Option<RSAPrivateKey>, MyError> {
 | 
			
		||||
        let conn = self.pool.get().await?;
 | 
			
		||||
        info!("SELECT value FROM settings WHERE key = 'private_key'");
 | 
			
		||||
        let rows = self
 | 
			
		||||
            .pool
 | 
			
		||||
            .get()
 | 
			
		||||
            .await?
 | 
			
		||||
            .query("SELECT value FROM settings WHERE key = 'private_key'", &[])
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        Ok(hydrate_private_key(&conn).await?)
 | 
			
		||||
        if let Some(row) = rows.into_iter().next() {
 | 
			
		||||
            let key_str: String = row.get(0);
 | 
			
		||||
            return Ok(Some(KeyExt::from_pem_pkcs8(&key_str)?));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(None)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn update_private_key(&self, private_key: &RSAPrivateKey) -> Result<(), MyError> {
 | 
			
		||||
        let conn = self.pool.get().await?;
 | 
			
		||||
        let pem_pkcs8 = private_key.to_pem_pkcs8()?;
 | 
			
		||||
 | 
			
		||||
        Ok(update_private_key(&conn, private_key).await?)
 | 
			
		||||
        info!("INSERT INTO settings (key, value, created_at) VALUES ('private_key', $1::TEXT, 'now');");
 | 
			
		||||
        self.pool.get().await?.execute("INSERT INTO settings (key, value, created_at) VALUES ('private_key', $1::TEXT, 'now');", &[&pem_pkcs8]).await?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -148,28 +187,6 @@ pub async fn listen(client: &Client) -> Result<(), Error> {
 | 
			
		|||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn hydrate_private_key(client: &Client) -> Result<Option<RSAPrivateKey>, MyError> {
 | 
			
		||||
    info!("SELECT value FROM settings WHERE key = 'private_key'");
 | 
			
		||||
    let rows = client
 | 
			
		||||
        .query("SELECT value FROM settings WHERE key = 'private_key'", &[])
 | 
			
		||||
        .await?;
 | 
			
		||||
 | 
			
		||||
    if let Some(row) = rows.into_iter().next() {
 | 
			
		||||
        let key_str: String = row.get(0);
 | 
			
		||||
        return Ok(Some(KeyExt::from_pem_pkcs8(&key_str)?));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    Ok(None)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn update_private_key(client: &Client, key: &RSAPrivateKey) -> Result<(), MyError> {
 | 
			
		||||
    let pem_pkcs8 = key.to_pem_pkcs8()?;
 | 
			
		||||
 | 
			
		||||
    info!("INSERT INTO settings (key, value, created_at) VALUES ('private_key', $1::TEXT, 'now');");
 | 
			
		||||
    client.execute("INSERT INTO settings (key, value, created_at) VALUES ('private_key', $1::TEXT, 'now');", &[&pem_pkcs8]).await?;
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn add_block(client: &Client, domain: &str) -> Result<(), Error> {
 | 
			
		||||
    info!(
 | 
			
		||||
        "INSERT INTO blocks (domain_name, created_at) VALUES ($1::TEXT, 'now'); [{}]",
 | 
			
		||||
| 
						 | 
				
			
			@ -230,60 +247,7 @@ async fn remove_whitelist(client: &Client, domain: &str) -> Result<(), Error> {
 | 
			
		|||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn remove_listener(client: &Client, listener: &XsdAnyUri) -> Result<(), Error> {
 | 
			
		||||
    info!(
 | 
			
		||||
        "DELETE FROM listeners WHERE actor_id = {};",
 | 
			
		||||
        listener.as_str()
 | 
			
		||||
    );
 | 
			
		||||
    client
 | 
			
		||||
        .execute(
 | 
			
		||||
            "DELETE FROM listeners WHERE actor_id = $1::TEXT;",
 | 
			
		||||
            &[&listener.as_str()],
 | 
			
		||||
        )
 | 
			
		||||
        .await?;
 | 
			
		||||
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn add_listener(client: &Client, listener: &XsdAnyUri) -> Result<(), Error> {
 | 
			
		||||
    info!(
 | 
			
		||||
        "INSERT INTO listeners (actor_id, created_at) VALUES ($1::TEXT, 'now'); [{}]",
 | 
			
		||||
        listener.as_str(),
 | 
			
		||||
    );
 | 
			
		||||
    client
 | 
			
		||||
        .execute(
 | 
			
		||||
            "INSERT INTO listeners (actor_id, created_at) VALUES ($1::TEXT, 'now');",
 | 
			
		||||
            &[&listener.as_str()],
 | 
			
		||||
        )
 | 
			
		||||
        .await?;
 | 
			
		||||
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn hydrate_blocks(client: &Client) -> Result<HashSet<String>, Error> {
 | 
			
		||||
    info!("SELECT domain_name FROM blocks");
 | 
			
		||||
    let rows = client.query("SELECT domain_name FROM blocks", &[]).await?;
 | 
			
		||||
 | 
			
		||||
    parse_rows(rows)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn hydrate_whitelists(client: &Client) -> Result<HashSet<String>, Error> {
 | 
			
		||||
    info!("SELECT domain_name FROM whitelists");
 | 
			
		||||
    let rows = client
 | 
			
		||||
        .query("SELECT domain_name FROM whitelists", &[])
 | 
			
		||||
        .await?;
 | 
			
		||||
 | 
			
		||||
    parse_rows(rows)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn hydrate_listeners(client: &Client) -> Result<HashSet<XsdAnyUri>, Error> {
 | 
			
		||||
    info!("SELECT actor_id FROM listeners");
 | 
			
		||||
    let rows = client.query("SELECT actor_id FROM listeners", &[]).await?;
 | 
			
		||||
 | 
			
		||||
    parse_rows(rows)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn parse_rows<T, E>(rows: Vec<Row>) -> Result<HashSet<T>, Error>
 | 
			
		||||
fn parse_rows<T, E>(rows: Vec<Row>) -> Result<HashSet<T>, MyError>
 | 
			
		||||
where
 | 
			
		||||
    T: std::str::FromStr<Err = E> + Eq + std::hash::Hash,
 | 
			
		||||
    E: std::fmt::Display,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										15
									
								
								src/error.rs
									
										
									
									
									
								
							
							
						
						
									
										15
									
								
								src/error.rs
									
										
									
									
									
								
							| 
						 | 
				
			
			@ -4,6 +4,7 @@ use actix_web::{
 | 
			
		|||
    http::StatusCode,
 | 
			
		||||
    HttpResponse,
 | 
			
		||||
};
 | 
			
		||||
use deadpool::managed::{PoolError, TimeoutType};
 | 
			
		||||
use log::error;
 | 
			
		||||
use rsa_pem::KeyError;
 | 
			
		||||
use std::{convert::Infallible, fmt::Debug, io::Error};
 | 
			
		||||
| 
						 | 
				
			
			@ -17,7 +18,7 @@ pub enum MyError {
 | 
			
		|||
    Config(#[from] config::ConfigError),
 | 
			
		||||
 | 
			
		||||
    #[error("Error in db, {0}")]
 | 
			
		||||
    DbError(#[from] bb8_postgres::tokio_postgres::error::Error),
 | 
			
		||||
    DbError(#[from] tokio_postgres::error::Error),
 | 
			
		||||
 | 
			
		||||
    #[error("Couldn't parse key, {0}")]
 | 
			
		||||
    Key(#[from] KeyError),
 | 
			
		||||
| 
						 | 
				
			
			@ -76,8 +77,8 @@ pub enum MyError {
 | 
			
		|||
    #[error("Couldn't flush buffer")]
 | 
			
		||||
    FlushBuffer,
 | 
			
		||||
 | 
			
		||||
    #[error("Timed out while waiting on db pool")]
 | 
			
		||||
    DbTimeout,
 | 
			
		||||
    #[error("Timed out while waiting on db pool, {0:?}")]
 | 
			
		||||
    DbTimeout(TimeoutType),
 | 
			
		||||
 | 
			
		||||
    #[error("Invalid algorithm provided to verifier")]
 | 
			
		||||
    Algorithm,
 | 
			
		||||
| 
						 | 
				
			
			@ -136,14 +137,14 @@ where
 | 
			
		|||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T> From<bb8_postgres::bb8::RunError<T>> for MyError
 | 
			
		||||
impl<T> From<PoolError<T>> for MyError
 | 
			
		||||
where
 | 
			
		||||
    T: Into<MyError>,
 | 
			
		||||
{
 | 
			
		||||
    fn from(e: bb8_postgres::bb8::RunError<T>) -> Self {
 | 
			
		||||
    fn from(e: PoolError<T>) -> Self {
 | 
			
		||||
        match e {
 | 
			
		||||
            bb8_postgres::bb8::RunError::User(e) => e.into(),
 | 
			
		||||
            bb8_postgres::bb8::RunError::TimedOut => MyError::DbTimeout,
 | 
			
		||||
            PoolError::Backend(e) => e.into(),
 | 
			
		||||
            PoolError::Timeout(t) => MyError::DbTimeout(t),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,7 +1,7 @@
 | 
			
		|||
use crate::{db::Db, error::MyError};
 | 
			
		||||
use background_jobs_core::{JobInfo, Stats};
 | 
			
		||||
use bb8_postgres::tokio_postgres::types::Json;
 | 
			
		||||
use background_jobs::{dev::JobInfo, Stats};
 | 
			
		||||
use log::debug;
 | 
			
		||||
use tokio_postgres::types::Json;
 | 
			
		||||
use uuid::Uuid;
 | 
			
		||||
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
| 
						 | 
				
			
			@ -16,7 +16,7 @@ impl Storage {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
#[async_trait::async_trait]
 | 
			
		||||
impl background_jobs_core::Storage for Storage {
 | 
			
		||||
impl background_jobs::dev::Storage for Storage {
 | 
			
		||||
    type Error = MyError;
 | 
			
		||||
 | 
			
		||||
    async fn generate_id(&self) -> Result<Uuid, MyError> {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -47,7 +47,7 @@ async fn main() -> Result<(), anyhow::Error> {
 | 
			
		|||
        env_logger::init();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    let db = Db::build(&config).await?;
 | 
			
		||||
    let db = Db::build(&config)?;
 | 
			
		||||
 | 
			
		||||
    let args = Args::new();
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -5,10 +5,10 @@ use crate::{
 | 
			
		|||
};
 | 
			
		||||
use activitystreams::primitives::XsdAnyUri;
 | 
			
		||||
use actix::clock::{delay_for, Duration};
 | 
			
		||||
use bb8_postgres::tokio_postgres::{tls::NoTls, AsyncMessage, Config};
 | 
			
		||||
use futures::stream::{poll_fn, StreamExt};
 | 
			
		||||
use log::{debug, error, warn};
 | 
			
		||||
use std::{collections::HashMap, sync::Arc};
 | 
			
		||||
use tokio_postgres::{tls::NoTls, AsyncMessage, Config};
 | 
			
		||||
use uuid::Uuid;
 | 
			
		||||
 | 
			
		||||
pub trait Listener {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		
		Reference in a new issue