Listen isn't always reliable, rehydrate every 10 minutes
This commit is contained in:
		
							parent
							
								
									65ce77898a
								
							
						
					
					
						commit
						ef13e93140
					
				
					 3 changed files with 53 additions and 1 deletions
				
			
		|  | @ -10,6 +10,7 @@ mod error; | ||||||
| mod inbox; | mod inbox; | ||||||
| mod nodeinfo; | mod nodeinfo; | ||||||
| mod notify; | mod notify; | ||||||
|  | mod rehydrate; | ||||||
| mod requests; | mod requests; | ||||||
| mod responses; | mod responses; | ||||||
| mod state; | mod state; | ||||||
|  | @ -80,6 +81,8 @@ async fn main() -> Result<(), anyhow::Error> { | ||||||
| 
 | 
 | ||||||
|     let state = State::hydrate(config.clone(), &db).await?; |     let state = State::hydrate(config.clone(), &db).await?; | ||||||
| 
 | 
 | ||||||
|  |     rehydrate::spawn(db.clone(), state.clone()); | ||||||
|  | 
 | ||||||
|     let _ = notify::NotifyHandler::start_handler(state.clone(), pg_config.clone()); |     let _ = notify::NotifyHandler::start_handler(state.clone(), pg_config.clone()); | ||||||
| 
 | 
 | ||||||
|     let bind_address = config.bind_address(); |     let bind_address = config.bind_address(); | ||||||
|  |  | ||||||
							
								
								
									
										24
									
								
								src/rehydrate.rs
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										24
									
								
								src/rehydrate.rs
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,24 @@ | ||||||
|  | use crate::{db::Db, state::State}; | ||||||
|  | use actix::{ | ||||||
|  |     clock::{interval_at, Duration, Instant}, | ||||||
|  |     Arbiter, | ||||||
|  | }; | ||||||
|  | use log::error; | ||||||
|  | 
 | ||||||
|  | pub fn spawn(db: Db, state: State) { | ||||||
|  |     Arbiter::spawn(async move { | ||||||
|  |         let start = Instant::now(); | ||||||
|  |         let duration = Duration::from_secs(60 * 10); | ||||||
|  | 
 | ||||||
|  |         let mut interval = interval_at(start, duration); | ||||||
|  | 
 | ||||||
|  |         loop { | ||||||
|  |             interval.tick().await; | ||||||
|  | 
 | ||||||
|  |             match state.rehydrate(&db).await { | ||||||
|  |                 Err(e) => error!("Error rehydrating, {}", e), | ||||||
|  |                 _ => (), | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     }); | ||||||
|  | } | ||||||
							
								
								
									
										27
									
								
								src/state.rs
									
										
									
									
									
								
							
							
						
						
									
										27
									
								
								src/state.rs
									
										
									
									
									
								
							|  | @ -7,7 +7,7 @@ use crate::{ | ||||||
| }; | }; | ||||||
| use activitystreams::primitives::XsdAnyUri; | use activitystreams::primitives::XsdAnyUri; | ||||||
| use actix_web::web; | use actix_web::web; | ||||||
| use futures::try_join; | use futures::{join, try_join}; | ||||||
| use log::info; | use log::info; | ||||||
| use lru::LruCache; | use lru::LruCache; | ||||||
| use rand::thread_rng; | use rand::thread_rng; | ||||||
|  | @ -135,6 +135,31 @@ impl State { | ||||||
|         write_guard.insert(listener); |         write_guard.insert(listener); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     pub async fn rehydrate(&self, db: &Db) -> Result<(), MyError> { | ||||||
|  |         let f1 = db.hydrate_blocks(); | ||||||
|  |         let f2 = db.hydrate_whitelists(); | ||||||
|  |         let f3 = db.hydrate_listeners(); | ||||||
|  | 
 | ||||||
|  |         let (blocks, whitelists, listeners) = try_join!(f1, f2, f3)?; | ||||||
|  | 
 | ||||||
|  |         join!( | ||||||
|  |             async move { | ||||||
|  |                 let mut write_guard = self.listeners.write().await; | ||||||
|  |                 *write_guard = listeners; | ||||||
|  |             }, | ||||||
|  |             async move { | ||||||
|  |                 let mut write_guard = self.whitelists.write().await; | ||||||
|  |                 *write_guard = whitelists; | ||||||
|  |             }, | ||||||
|  |             async move { | ||||||
|  |                 let mut write_guard = self.blocks.write().await; | ||||||
|  |                 *write_guard = blocks; | ||||||
|  |             } | ||||||
|  |         ); | ||||||
|  | 
 | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     pub async fn hydrate(config: Config, db: &Db) -> Result<Self, MyError> { |     pub async fn hydrate(config: Config, db: &Db) -> Result<Self, MyError> { | ||||||
|         let f1 = db.hydrate_blocks(); |         let f1 = db.hydrate_blocks(); | ||||||
|         let f2 = db.hydrate_whitelists(); |         let f2 = db.hydrate_whitelists(); | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		
		Reference in a new issue