From 3bfa2c0e45f95767c7ba625aff661c4db7229aa8 Mon Sep 17 00:00:00 2001
From: asonix
Date: Sun, 22 Mar 2020 22:52:42 -0500
Subject: [PATCH] Add local jobs, query connected servers for info

---
 src/jobs/instance.rs          | 110 +++++++++++++++++++
 src/jobs/mod.rs               |  73 ++++++++++---
 src/jobs/nodeinfo.rs          | 170 +++++++++++++++++++++++++++++
 src/jobs/process_listeners.rs |  47 ++++++++
 src/main.rs                   |  10 +-
 src/node.rs                   | 194 ++++++++++++++++++++++++++++++++++
 src/notify.rs                 |  25 +++--
 src/state.rs                  |  10 +-
 templates/index.rs.html       |  23 ++--
 9 files changed, 624 insertions(+), 38 deletions(-)
 create mode 100644 src/jobs/instance.rs
 create mode 100644 src/jobs/nodeinfo.rs
 create mode 100644 src/jobs/process_listeners.rs
 create mode 100644 src/node.rs

diff --git a/src/jobs/instance.rs b/src/jobs/instance.rs
new file mode 100644
index 0000000..965fa0d
--- /dev/null
+++ b/src/jobs/instance.rs
@@ -0,0 +1,110 @@
+use crate::jobs::JobState;
+use activitystreams::primitives::XsdAnyUri;
+use anyhow::Error;
+use background_jobs::{Job, Processor};
+use std::{future::Future, pin::Pin};
+use tokio::sync::oneshot;
+
+#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
+pub struct QueryInstance {
+    listener: XsdAnyUri,
+}
+
+impl QueryInstance {
+    pub fn new(listener: XsdAnyUri) -> Self {
+        QueryInstance { listener }
+    }
+
+    async fn perform(mut self, state: JobState) -> Result<(), Error> {
+        let listener = self.listener.clone();
+
+        let url = self.listener.as_url_mut();
+        url.set_fragment(None);
+        url.set_query(None);
+        url.set_path("api/v1/instance");
+
+        let instance = state
+            .requests
+            .fetch::<Instance>(self.listener.as_str())
+            .await?;
+
+        let description = if instance.description.is_empty() {
+            instance.short_description
+        } else {
+            instance.description
+        };
+
+        if let Some(contact) = instance.contact {
+            state
+                .node_cache
+                .set_contact(
+                    listener.clone(),
+                    contact.username,
+                    contact.display_name,
+                    contact.url,
+                    contact.avatar,
+                )
+                .await;
+        }
+
+        state
+            .node_cache
+            .set_instance(
+                listener,
+                instance.title,
+                description,
+                instance.version,
+                instance.registrations,
+                instance.approval_required,
+            )
+            .await;
+
+        Ok(())
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct InstanceProcessor;
+
+impl Job for QueryInstance {
+    type State = JobState;
+    type Processor = InstanceProcessor;
+    type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
+
+    fn run(self, state: Self::State) -> Self::Future {
+        let (tx, rx) = oneshot::channel();
+
+        actix::spawn(async move {
+            let _ = tx.send(self.perform(state).await);
+        });
+
+        Box::pin(async move { rx.await? })
+    }
+}
+
+impl Processor for InstanceProcessor {
+    type Job = QueryInstance;
+
+    const NAME: &'static str = "InstanceProcessor";
+    const QUEUE: &'static str = "default";
+}
+
+#[derive(serde::Deserialize)]
+struct Instance {
+    title: String,
+    short_description: String,
+    description: String,
+    version: String,
+    registrations: bool,
+    approval_required: bool,
+
+    contact: Option<Contact>,
+}
+
+#[derive(serde::Deserialize)]
+struct Contact {
+    username: String,
+    display_name: String,
+    url: XsdAnyUri,
+    avatar: XsdAnyUri,
+}
diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs
index 12a9d33..ea91551 100644
--- a/src/jobs/mod.rs
+++ b/src/jobs/mod.rs
@@ -1,66 +1,105 @@
 mod deliver;
 mod deliver_many;
+mod instance;
+mod nodeinfo;
+mod process_listeners;
 mod storage;
 
-pub use self::{deliver::Deliver, deliver_many::DeliverMany};
+pub use self::{
+    deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance, nodeinfo::QueryNodeinfo,
+};
 
 use crate::{
     db::Db,
     error::MyError,
-    jobs::{deliver::DeliverProcessor, deliver_many::DeliverManyProcessor, storage::Storage},
+    jobs::{
+        deliver::DeliverProcessor,
+        deliver_many::DeliverManyProcessor,
+        instance::InstanceProcessor,
+        nodeinfo::NodeinfoProcessor,
+        process_listeners::{Listeners, ListenersProcessor},
+        storage::Storage,
+    },
+    node::NodeCache,
    requests::Requests,
     state::State,
 };
-use background_jobs::{Job, QueueHandle, WorkerConfig};
+use background_jobs::{memory_storage::Storage as MemoryStorage, Job, QueueHandle, WorkerConfig};
+use std::time::Duration;
 
 pub fn create_server(db: Db) -> JobServer {
-    JobServer::new(background_jobs::create_server(Storage::new(db)))
+    let local = background_jobs::create_server(MemoryStorage::new());
+    let shared = background_jobs::create_server(Storage::new(db));
+
+    local.every(Duration::from_secs(60 * 5), Listeners);
+
+    JobServer::new(shared, local)
 }
 
 pub fn create_workers(state: State, job_server: JobServer) {
-    let queue_handle = job_server.queue_handle();
+    let state2 = state.clone();
+    let job_server2 = job_server.clone();
 
-    WorkerConfig::new(move || JobState::new(state.requests(), job_server.clone()))
+    let remote_handle = job_server.remote.clone();
+    let local_handle = job_server.local.clone();
+
+    WorkerConfig::new(move || JobState::new(state.clone(), job_server.clone()))
         .register(DeliverProcessor)
         .register(DeliverManyProcessor)
         .set_processor_count("default", 4)
-        .start(queue_handle);
+        .start(remote_handle);
+
+    WorkerConfig::new(move || JobState::new(state2.clone(), job_server2.clone()))
+        .register(NodeinfoProcessor)
+        .register(InstanceProcessor)
+        .register(ListenersProcessor)
+        .set_processor_count("default", 4)
+        .start(local_handle);
 }
 
 #[derive(Clone)]
 pub struct JobState {
     requests: Requests,
+    state: State,
+    node_cache: NodeCache,
     job_server: JobServer,
 }
 
 #[derive(Clone)]
 pub struct JobServer {
-    inner: QueueHandle,
+    remote: QueueHandle,
+    local: QueueHandle,
 }
 
 impl JobState {
-    fn new(requests: Requests, job_server: JobServer) -> Self {
+    fn new(state: State, job_server: JobServer) -> Self {
         JobState {
-            requests,
+            requests: state.requests(),
+            node_cache: state.node_cache(),
+            state,
             job_server,
         }
     }
 }
 
 impl JobServer {
-    fn new(queue_handle: QueueHandle) -> Self {
+    fn new(remote_handle: QueueHandle, local_handle: QueueHandle) -> Self {
         JobServer {
-            inner: queue_handle,
+            remote: remote_handle,
+            local: local_handle,
         }
     }
 
-    pub fn queue_handle(&self) -> QueueHandle {
-        self.inner.clone()
-    }
-
     pub fn queue<J>(&self, job: J) -> Result<(), MyError>
     where
         J: Job,
     {
-        self.inner.queue(job).map_err(MyError::Queue)
+        self.remote.queue(job).map_err(MyError::Queue)
+    }
+
+    pub fn queue_local<J>(&self, job: J) -> Result<(), MyError>
+    where
+        J: Job,
+    {
+        self.local.queue(job).map_err(MyError::Queue)
     }
 }
diff --git a/src/jobs/nodeinfo.rs b/src/jobs/nodeinfo.rs
new file mode 100644
index 0000000..d60c144
--- /dev/null
+++ b/src/jobs/nodeinfo.rs
@@ -0,0 +1,170 @@
+use crate::jobs::JobState;
+use activitystreams::primitives::XsdAnyUri;
+use anyhow::Error;
+use background_jobs::{Job, Processor};
+use std::{future::Future, pin::Pin};
+use tokio::sync::oneshot;
+
+#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
+pub struct QueryNodeinfo {
+    listener: XsdAnyUri,
+}
+
+impl QueryNodeinfo {
+    pub fn new(listener: XsdAnyUri) -> Self {
+        QueryNodeinfo { listener }
+    }
+
+    async fn perform(mut self, state: JobState) -> Result<(), Error> {
+        let listener = self.listener.clone();
+
+        let url = self.listener.as_url_mut();
+        url.set_fragment(None);
+        url.set_query(None);
+        url.set_path(".well-known/nodeinfo");
+
+        let well_known = state
+            .requests
+            .fetch::<WellKnown>(self.listener.as_str())
+            .await?;
+
+        let href = if let Some(link) = well_known.links.into_iter().next() {
+            link.href
+        } else {
+            return Ok(());
+        };
+
+        let nodeinfo = state.requests.fetch::<Nodeinfo>(&href).await?;
+
+        state
+            .node_cache
+            .set_info(
+                listener,
+                nodeinfo.software.name,
+                nodeinfo.software.version,
+                nodeinfo.open_registrations,
+            )
+            .await;
+        Ok(())
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct NodeinfoProcessor;
+
+impl Job for QueryNodeinfo {
+    type State = JobState;
+    type Processor = NodeinfoProcessor;
+    type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
+
+    fn run(self, state: Self::State) -> Self::Future {
+        let (tx, rx) = oneshot::channel();
+
+        actix::spawn(async move {
+            let _ = tx.send(self.perform(state).await);
+        });
+
+        Box::pin(async move { rx.await? })
+    }
+}
+
+impl Processor for NodeinfoProcessor {
+    type Job = QueryNodeinfo;
+
+    const NAME: &'static str = "NodeinfoProcessor";
+    const QUEUE: &'static str = "default";
+}
+
+#[derive(serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct Nodeinfo {
+    #[allow(dead_code)]
+    version: SupportedVersion,
+
+    software: Software,
+    open_registrations: bool,
+}
+
+#[derive(serde::Deserialize)]
+struct Software {
+    name: String,
+    version: String,
+}
+
+#[derive(serde::Deserialize)]
+struct WellKnown {
+    links: Vec<Link>,
+}
+
+#[derive(serde::Deserialize)]
+struct Link {
+    #[allow(dead_code)]
+    rel: SupportedNodeinfo,
+
+    href: String,
+}
+
+struct SupportedVersion;
+struct SupportedNodeinfo;
+
+static SUPPORTED_VERSION: &'static str = "2.0";
+static SUPPORTED_NODEINFO: &'static str = "http://nodeinfo.diaspora.software/ns/schema/2.0";
+
+struct SupportedVersionVisitor;
+struct SupportedNodeinfoVisitor;
+
+impl<'de> serde::de::Visitor<'de> for SupportedVersionVisitor {
+    type Value = SupportedVersion;
+
+    fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "the string '{}'", SUPPORTED_VERSION)
+    }
+
+    fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
+    where
+        E: serde::de::Error,
+    {
+        if s == SUPPORTED_VERSION {
+            Ok(SupportedVersion)
+        } else {
+            Err(serde::de::Error::custom("Invalid nodeinfo version"))
+        }
+    }
+}
+
+impl<'de> serde::de::Visitor<'de> for SupportedNodeinfoVisitor {
+    type Value = SupportedNodeinfo;
+
+    fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "the string '{}'", SUPPORTED_NODEINFO)
+    }
+
+    fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
+    where
+        E: serde::de::Error,
+    {
+        if s == SUPPORTED_NODEINFO {
+            Ok(SupportedNodeinfo)
+        } else {
+            Err(serde::de::Error::custom("Invalid nodeinfo version"))
+        }
+    }
+}
+
+impl<'de> serde::de::Deserialize<'de> for SupportedVersion {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::de::Deserializer<'de>,
+    {
+        deserializer.deserialize_str(SupportedVersionVisitor)
+    }
+}
+
+impl<'de> serde::de::Deserialize<'de> for SupportedNodeinfo {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::de::Deserializer<'de>,
+    {
+        deserializer.deserialize_str(SupportedNodeinfoVisitor)
+    }
+}
diff --git a/src/jobs/process_listeners.rs b/src/jobs/process_listeners.rs
new file mode 100644
index 0000000..2969457
--- /dev/null
+++ b/src/jobs/process_listeners.rs
@@ -0,0 +1,47 @@
+use crate::jobs::{instance::QueryInstance, nodeinfo::QueryNodeinfo, JobState};
+use anyhow::Error;
+use background_jobs::{Job, Processor};
+use std::{future::Future, pin::Pin};
+use tokio::sync::oneshot;
+
+#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
+pub struct Listeners;
+
+#[derive(Clone, Debug)]
+pub struct ListenersProcessor;
+
+impl Listeners {
+    async fn perform(self, state: JobState) -> Result<(), Error> {
+        for listener in state.state.listeners().await {
+            state
+                .job_server
+                .queue_local(QueryInstance::new(listener.clone()))?;
+            state.job_server.queue_local(QueryNodeinfo::new(listener))?;
+        }
+
+        Ok(())
+    }
+}
+
+impl Job for Listeners {
+    type State = JobState;
+    type Processor = ListenersProcessor;
+    type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
+
+    fn run(self, state: Self::State) -> Self::Future {
+        let (tx, rx) = oneshot::channel();
+
+        actix::spawn(async move {
+            let _ = tx.send(self.perform(state).await);
+        });
+
+        Box::pin(async move { rx.await? })
+    }
+}
+
+impl Processor for ListenersProcessor {
+    type Job = Listeners;
+
+    const NAME: &'static str = "ProcessListenersProcessor";
+    const QUEUE: &'static str = "default";
+}
diff --git a/src/main.rs b/src/main.rs
index 169d515..521e5eb 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -18,6 +18,7 @@ mod db;
 mod error;
 mod inbox;
 mod jobs;
+mod node;
 mod nodeinfo;
 mod notify;
 mod rehydrate;
@@ -42,11 +43,11 @@ async fn index(
     state: web::Data<State>,
     config: web::Data<Config>,
 ) -> Result<HttpResponse, MyError> {
-    let listeners = state.listeners().await;
+    let nodes = state.node_cache().nodes().await;
 
     let mut buf = BufWriter::new(Vec::new());
 
-    templates::index(&mut buf, &listeners, &config)?;
+    templates::index(&mut buf, &nodes, &config)?;
     let buf = buf.into_inner().map_err(|e| {
         error!("Error rendering template, {}", e.error());
         MyError::FlushBuffer
@@ -111,11 +112,10 @@ async fn main() -> Result<(), anyhow::Error> {
     }
 
     let state = State::hydrate(config.clone(), &db).await?;
+    let job_server = create_server(db.clone());
 
     rehydrate::spawn(db.clone(), state.clone());
-    notify::spawn(state.clone(), &config)?;
-
-    let job_server = create_server(db.clone());
+    notify::spawn(state.clone(), job_server.clone(), &config)?;
 
     if args.jobs_only() {
         for _ in 0..num_cpus::get() {
diff --git a/src/node.rs b/src/node.rs
new file mode 100644
index 0000000..b5f5ff7
--- /dev/null
+++ b/src/node.rs
@@ -0,0 +1,194 @@
+use activitystreams::primitives::XsdAnyUri;
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
+use tokio::sync::RwLock;
+
+pub type ListenersCache = Arc<RwLock<HashSet<XsdAnyUri>>>;
+
+#[derive(Clone)]
+pub struct NodeCache {
+    listeners: ListenersCache,
+    nodes: Arc<RwLock<HashMap<XsdAnyUri, Node>>>,
+}
+
+impl NodeCache {
+    pub fn new(listeners: ListenersCache) -> Self {
+        NodeCache {
+            listeners,
+            nodes: Arc::new(RwLock::new(HashMap::new())),
+        }
+    }
+
+    pub async fn nodes(&self) -> Vec<Node> {
+        let listeners: HashSet<_> = self.listeners.read().await.clone();
+
+        self.nodes
+            .read()
+            .await
+            .iter()
+            .filter_map(|(k, v)| {
+                if listeners.contains(k) {
+                    Some(v.clone())
+                } else {
+                    None
+                }
+            })
+            .collect()
+    }
+
+    pub async fn set_info(
+        &self,
+        listener: XsdAnyUri,
+        software: String,
+        version: String,
+        reg: bool,
+    ) {
+        if !self.listeners.read().await.contains(&listener) {
+            let mut nodes = self.nodes.write().await;
+            nodes.remove(&listener);
+            return;
+        }
+
+        let mut write_guard = self.nodes.write().await;
+        let node = write_guard
+            .entry(listener.clone())
+            .or_insert(Node::new(listener));
+        node.set_info(software, version, reg);
+    }
+
+    pub async fn set_instance(
+        &self,
+        listener: XsdAnyUri,
+        title: String,
+        description: String,
+        version: String,
+        reg: bool,
+        requires_approval: bool,
+    ) {
+        if !self.listeners.read().await.contains(&listener) {
+            let mut nodes = self.nodes.write().await;
+            nodes.remove(&listener);
+            return;
+        }
+
+        let mut write_guard = self.nodes.write().await;
+        let node = write_guard
+            .entry(listener.clone())
+            .or_insert(Node::new(listener));
+        node.set_instance(title, description, version, reg, requires_approval);
+    }
+
+    pub async fn set_contact(
+        &self,
+        listener: XsdAnyUri,
+        username: String,
+        display_name: String,
+        url: XsdAnyUri,
+        avatar: XsdAnyUri,
+    ) {
+        if !self.listeners.read().await.contains(&listener) {
+            let mut nodes = self.nodes.write().await;
+            nodes.remove(&listener);
+            return;
+        }
+
+        let mut write_guard = self.nodes.write().await;
+        let node = write_guard
+            .entry(listener.clone())
+            .or_insert(Node::new(listener));
+        node.set_contact(username, display_name, url, avatar);
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct Node {
+    pub base: XsdAnyUri,
+    pub info: Option<Info>,
+    pub instance: Option<Instance>,
+    pub contact: Option<Contact>,
+}
+
+impl Node {
+    pub fn new(mut uri: XsdAnyUri) -> Self {
+        let url = uri.as_mut();
+        url.set_fragment(None);
+        url.set_query(None);
+        url.set_path("");
+
+        Node {
+            base: uri,
+            info: None,
+            instance: None,
+            contact: None,
+        }
+    }
+
+    fn set_info(&mut self, software: String, version: String, reg: bool) -> &mut Self {
+        self.info = Some(Info {
+            software,
+            version,
+            reg,
+        });
+        self
+    }
+
+    fn set_instance(
+        &mut self,
+        title: String,
+        description: String,
+        version: String,
+        reg: bool,
+        requires_approval: bool,
+    ) -> &mut Self {
+        self.instance = Some(Instance {
+            title,
+            description,
+            version,
+            reg,
+            requires_approval,
+        });
+        self
+    }
+
+    fn set_contact(
+        &mut self,
+        username: String,
+        display_name: String,
+        url: XsdAnyUri,
+        avatar: XsdAnyUri,
+    ) -> &mut Self {
+        self.contact = Some(Contact {
+            username,
+            display_name,
+            url,
+            avatar,
+        });
+        self
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct Info {
+    pub software: String,
+    pub version: String,
+    pub reg: bool,
+}
+
+#[derive(Clone, Debug)]
+pub struct Instance {
+    pub title: String,
+    pub description: String,
+    pub version: String,
+    pub reg: bool,
+    pub requires_approval: bool,
+}
+
+#[derive(Clone, Debug)]
+pub struct Contact {
+    pub username: String,
+    pub display_name: String,
+    pub url: XsdAnyUri,
+    pub avatar: XsdAnyUri,
+}
diff --git a/src/notify.rs b/src/notify.rs
index e2f5da1..2e65fc1 100644
--- a/src/notify.rs
+++ b/src/notify.rs
@@ -1,4 +1,9 @@
-use crate::{db::listen, error::MyError, state::State};
+use crate::{
+    db::listen,
+    error::MyError,
+    jobs::{JobServer, QueryInstance, QueryNodeinfo},
+    state::State,
+};
 use activitystreams::primitives::XsdAnyUri;
 use actix::clock::{delay_for, Duration};
 use bb8_postgres::tokio_postgres::{tls::NoTls, AsyncMessage, Config, Notification};
@@ -9,7 +14,7 @@ use futures::{
 use log::{debug, error, info, warn};
 use std::sync::Arc;
 
-async fn handle_notification(state: State, notif: Notification) {
+async fn handle_notification(state: State, job_server: JobServer, notif: Notification) {
     match notif.channel() {
         "new_blocks" => {
             info!("Caching block of {}", notif.payload());
@@ -22,7 +27,9 @@ async fn handle_notification(state: State, notif: Notification) {
         "new_listeners" => {
            if let Ok(uri) = notif.payload().parse::<XsdAnyUri>() {
                 info!("Caching listener {}", uri);
-                state.cache_listener(uri).await;
+                state.cache_listener(uri.clone()).await;
+                let _ = job_server.queue_local(QueryInstance::new(uri.clone()));
+                let _ = job_server.queue_local(QueryNodeinfo::new(uri));
             }
         }
         "rm_blocks" => {
@@ -43,12 +50,14 @@ async fn handle_notification(state: State, notif: Notification) {
     };
 }
 
-pub fn spawn(state: State, config: &crate::config::Config) -> Result<(), MyError> {
+pub fn spawn(
+    state: State,
+    job_server: JobServer,
+    config: &crate::config::Config,
+) -> Result<(), MyError> {
     let config: Config = config.database_url().parse()?;
 
     actix::spawn(async move {
-        let mut client;
-
         loop {
             let (new_client, mut conn) = match config.connect(NoTls).await {
                 Ok((client, conn)) => (client, conn),
@@ -59,7 +68,7 @@ pub fn spawn(state: State, config: &crate::config::Config) -> Result<(), MyError
                 }
             };
 
-            client = Arc::new(new_client);
+            let client = Arc::new(new_client);
             let new_client = client.clone();
 
             actix::spawn(async move {
@@ -88,7 +97,7 @@ pub fn spawn(state: State, config: &crate::config::Config) -> Result<(), MyError
             });
 
             while let Some(n) = stream.next().await {
-                actix::spawn(handle_notification(state.clone(), n));
+                actix::spawn(handle_notification(state.clone(), job_server.clone(), n));
             }
 
             drop(client);
diff --git a/src/state.rs b/src/state.rs
index 40e2b0f..3e2e7ae 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -3,6 +3,7 @@ use crate::{
     config::{Config, UrlKind},
     db::Db,
     error::MyError,
+    node::NodeCache,
     requests::Requests,
 };
 use activitystreams::primitives::XsdAnyUri;
@@ -28,9 +29,14 @@ pub struct State {
     blocks: Arc<RwLock<HashSet<String>>>,
     whitelists: Arc<RwLock<HashSet<String>>>,
     listeners: Arc<RwLock<HashSet<XsdAnyUri>>>,
+    node_cache: NodeCache,
 }
 
 impl State {
+    pub fn node_cache(&self) -> NodeCache {
+        self.node_cache.clone()
+    }
+
     pub fn requests(&self) -> Requests {
         Requests::new(
             self.config.generate_url(UrlKind::MainKey),
@@ -191,6 +197,7 @@ impl State {
         let (blocks, whitelists, listeners, private_key) = try_join!(f1, f2, f3, f4)?;
 
         let public_key = private_key.to_public_key();
+        let listeners = Arc::new(RwLock::new(listeners));
 
         Ok(State {
             public_key,
@@ -200,7 +207,8 @@ impl State {
             actor_id_cache: Arc::new(RwLock::new(LruCache::new(1024 * 8))),
             blocks: Arc::new(RwLock::new(blocks)),
             whitelists: Arc::new(RwLock::new(whitelists)),
-            listeners: Arc::new(RwLock::new(listeners)),
+            listeners: listeners.clone(),
+            node_cache: NodeCache::new(listeners),
         })
     }
 }
diff --git a/templates/index.rs.html b/templates/index.rs.html
index e97af0b..fa67175 100644
--- a/templates/index.rs.html
+++ b/templates/index.rs.html
@@ -1,7 +1,6 @@
-@use crate::{config::{Config, UrlKind}, templates::statics::index_css};
-@use activitystreams::primitives::XsdAnyUri;
+@use crate::{config::{Config, UrlKind}, templates::statics::index_css, node::Node};
 
-@(listeners: &[XsdAnyUri], config: &Config)
+@(nodes: &[Node], config: &Config)
 
 <!doctype html>
 <html>
@@ -17,13 +16,23 @@
         <p>Connected Servers:</p>
-        @if listeners.is_empty() {
+        @if nodes.is_empty() {
         <p>There are no connected servers at this time.</p>
         } else {
         <ul>
-            @for listener in listeners {
-            @if let Some(domain) = listener.as_url().domain() {
-            <li>@domain</li>
+            @for node in nodes {
+            @if let Some(domain) = node.base.as_url().domain() {
+            <li>
+                <p>@domain</p>
+                @if let Some(info) = node.info.as_ref() {
+                <p>Running @info.software version @info.version</p>
+                @if info.reg {
+                <p>Registration is open</p>
+                } else {
+                <p>Registration is closed</p>
+                }
+                }
+            </li>
             }
             }