Don't use actor for notify, fix migration dockerfile
This commit is contained in:
		
							parent
							
								
									5e09fadc3b
								
							
						
					
					
						commit
						7510ab5b94
					
				
					 4 changed files with 72 additions and 140 deletions
				
			
		|  | @ -1,5 +1,11 @@ | |||
| FROM asonix/diesel-cli:v1.4.0-r0-arm64v8 | ||||
| FROM asonix/diesel-cli:v1.4.0-r1-arm64v8 | ||||
| 
 | ||||
| COPY migrations /migrations | ||||
| USER root | ||||
| RUN \ | ||||
|  apt-get install -y tini && \ | ||||
|  chown -R diesel:diesel /migrations | ||||
| 
 | ||||
| USER diesel | ||||
| ENTRYPOINT ["/usr/bin/tini"] | ||||
| CMD ["diesel", "migration", "run", "--migration-dir", "/migrations"] | ||||
|  |  | |||
							
								
								
									
										7
									
								
								build.sh
									
										
									
									
									
								
							
							
						
						
									
										7
									
								
								build.sh
									
										
									
									
									
								
							|  | @ -33,8 +33,8 @@ cross build \ | |||
|     --release | ||||
| 
 | ||||
| mkdir -p artifacts | ||||
| rm -rf artifacts/relay | ||||
| cp ./target/aarch64-unknown-linux-musl/release/relay artifacts/relay | ||||
| cp -r ./migrations artifacts/migrations | ||||
| 
 | ||||
| # from `sudo docker run --rm --privileged multiarch/qemu-user-static --reset -p yes` | ||||
| docker build \ | ||||
|  | @ -52,7 +52,10 @@ docker push "asonix/relay:${VERSION}-arm64v8" | |||
| docker push "asonix/relay:latest-arm64v8" | ||||
| docker push "asonix/relay:latest" | ||||
| 
 | ||||
| if [ "${MIGRATIONS}" = "" ]; then | ||||
| if [ "${MIGRATIONS}" = "migrations" ]; then | ||||
|     rm -rf artifacts/migrations | ||||
|     cp -r ./migrations artifacts/migrations | ||||
| 
 | ||||
|     docker build \ | ||||
|         --pull \ | ||||
|         --no-cache \ | ||||
|  |  | |||
|  | @ -33,6 +33,7 @@ use self::{ | |||
|     db::Db, | ||||
|     error::MyError, | ||||
|     jobs::{create_server, create_workers}, | ||||
|     notify::notify_loop, | ||||
|     state::State, | ||||
|     templates::statics::StaticFile, | ||||
|     webfinger::RelayResolver, | ||||
|  | @ -111,7 +112,7 @@ async fn main() -> Result<(), anyhow::Error> { | |||
| 
 | ||||
|     let job_server = create_server(db.clone()); | ||||
| 
 | ||||
|     let _ = notify::NotifyHandler::start_handler(state.clone(), pg_config.clone()); | ||||
|     let _ = notify_loop(state.clone(), pg_config.clone()); | ||||
| 
 | ||||
|     let bind_address = config.bind_address(); | ||||
|     HttpServer::new(move || { | ||||
|  |  | |||
							
								
								
									
										194
									
								
								src/notify.rs
									
										
									
									
									
								
							
							
						
						
									
										194
									
								
								src/notify.rs
									
										
									
									
									
								
							|  | @ -1,61 +1,75 @@ | |||
| use crate::state::State; | ||||
| use crate::{db::listen, state::State}; | ||||
| use activitystreams::primitives::XsdAnyUri; | ||||
| use actix::prelude::*; | ||||
| use bb8_postgres::tokio_postgres::{tls::NoTls, AsyncMessage, Client, Config, Notification}; | ||||
| use actix::clock::{delay_for, Duration}; | ||||
| use bb8_postgres::tokio_postgres::{tls::NoTls, AsyncMessage, Config, Notification}; | ||||
| use futures::{ | ||||
|     future::ready, | ||||
|     stream::{poll_fn, StreamExt}, | ||||
| }; | ||||
| use log::{debug, error, info, warn}; | ||||
| use tokio::sync::mpsc; | ||||
| use std::sync::Arc; | ||||
| 
 | ||||
| #[derive(Message)] | ||||
| #[rtype(result = "()")] | ||||
| pub enum Notify { | ||||
|     Msg(Notification), | ||||
|     Done, | ||||
| } | ||||
| 
 | ||||
| pub struct NotifyHandler { | ||||
|     client: Option<Client>, | ||||
|     state: State, | ||||
|     config: Config, | ||||
| } | ||||
| 
 | ||||
| impl NotifyHandler { | ||||
|     fn new(state: State, config: Config) -> Self { | ||||
|         NotifyHandler { | ||||
|             state, | ||||
|             config, | ||||
|             client: None, | ||||
| async fn handle_notification(state: &State, notif: Notification) { | ||||
|     match notif.channel() { | ||||
|         "new_blocks" => { | ||||
|             info!("Caching block of {}", notif.payload()); | ||||
|             state.cache_block(notif.payload().to_owned()).await; | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     pub fn start_handler(state: State, config: Config) -> Addr<Self> { | ||||
|         Supervisor::start(|_| Self::new(state, config)) | ||||
|     } | ||||
|         "new_whitelists" => { | ||||
|             info!("Caching whitelist of {}", notif.payload()); | ||||
|             state.cache_whitelist(notif.payload().to_owned()).await; | ||||
|         } | ||||
|         "new_listeners" => { | ||||
|             if let Ok(uri) = notif.payload().parse::<XsdAnyUri>() { | ||||
|                 info!("Caching listener {}", uri); | ||||
|                 state.cache_listener(uri).await; | ||||
|             } | ||||
|         } | ||||
|         "rm_blocks" => { | ||||
|             info!("Busting block cache for {}", notif.payload()); | ||||
|             state.bust_block(notif.payload()).await; | ||||
|         } | ||||
|         "rm_whitelists" => { | ||||
|             info!("Busting whitelist cache for {}", notif.payload()); | ||||
|             state.bust_whitelist(notif.payload()).await; | ||||
|         } | ||||
|         "rm_listeners" => { | ||||
|             if let Ok(uri) = notif.payload().parse::<XsdAnyUri>() { | ||||
|                 info!("Busting listener cache for {}", uri); | ||||
|                 state.bust_listener(&uri).await; | ||||
|             } | ||||
|         } | ||||
|         _ => (), | ||||
|     }; | ||||
| } | ||||
| 
 | ||||
| impl Actor for NotifyHandler { | ||||
|     type Context = Context<Self>; | ||||
| pub fn notify_loop(state: State, config: Config) { | ||||
|     actix::spawn(async move { | ||||
|         let mut client; | ||||
| 
 | ||||
|     fn started(&mut self, ctx: &mut Self::Context) { | ||||
|         info!("Starting notify handler"); | ||||
|         let config = self.config.clone(); | ||||
| 
 | ||||
|         let fut = async move { | ||||
|             let (client, mut conn) = match config.connect(NoTls).await { | ||||
|         loop { | ||||
|             let (new_client, mut conn) = match config.connect(NoTls).await { | ||||
|                 Ok((client, conn)) => (client, conn), | ||||
|                 Err(e) => { | ||||
|                     error!("Error establishing DB Connection, {}", e); | ||||
|                     return Err(()); | ||||
|                     delay_for(Duration::new(5, 0)).await; | ||||
|                     continue; | ||||
|                 } | ||||
|             }; | ||||
| 
 | ||||
|             client = Arc::new(new_client); | ||||
|             let new_client = client.clone(); | ||||
| 
 | ||||
|             actix::spawn(async move { | ||||
|                 if let Err(e) = listen(&new_client).await { | ||||
|                     error!("Error listening for updates, {}", e); | ||||
|                 } | ||||
|             }); | ||||
| 
 | ||||
|             let mut stream = poll_fn(move |cx| conn.poll_message(cx)).filter_map(|m| match m { | ||||
|                 Ok(AsyncMessage::Notification(n)) => { | ||||
|                     debug!("Handling Notification, {:?}", n); | ||||
|                     ready(Some(Notify::Msg(n))) | ||||
|                     ready(Some(n)) | ||||
|                 } | ||||
|                 Ok(AsyncMessage::Notice(e)) => { | ||||
|                     debug!("Handling Notice, {:?}", e); | ||||
|  | @ -71,104 +85,12 @@ impl Actor for NotifyHandler { | |||
|                 } | ||||
|             }); | ||||
| 
 | ||||
|             let (mut tx, rx) = mpsc::channel(256); | ||||
| 
 | ||||
|             Arbiter::spawn(async move { | ||||
|                 debug!("Spawned stream handler"); | ||||
|                 while let Some(n) = stream.next().await { | ||||
|                     match tx.send(n).await { | ||||
|                         Err(e) => error!("Error forwarding notification, {}", e), | ||||
|                         _ => (), | ||||
|                     }; | ||||
|                 } | ||||
|                 warn!("Stream handler ended"); | ||||
|                 let _ = tx.send(Notify::Done).await; | ||||
|             }); | ||||
| 
 | ||||
|             Ok((client, rx)) | ||||
|         }; | ||||
| 
 | ||||
|         let fut = fut.into_actor(self).map(|res, actor, ctx| match res { | ||||
|             Ok((client, stream)) => { | ||||
|                 Self::add_stream(stream, ctx); | ||||
|                 let f = async move { | ||||
|                     match crate::db::listen(&client).await { | ||||
|                         Err(e) => { | ||||
|                             error!("Error listening, {}", e); | ||||
|                             Err(()) | ||||
|                         } | ||||
|                         Ok(_) => Ok(client), | ||||
|                     } | ||||
|                 }; | ||||
| 
 | ||||
|                 ctx.wait(f.into_actor(actor).map(|res, actor, ctx| match res { | ||||
|                     Ok(client) => { | ||||
|                         actor.client = Some(client); | ||||
|                     } | ||||
|                     Err(_) => { | ||||
|                         ctx.stop(); | ||||
|                     } | ||||
|                 })); | ||||
|             while let Some(n) = stream.next().await { | ||||
|                 handle_notification(&state, n).await; | ||||
|             } | ||||
|             Err(_) => { | ||||
|                 ctx.stop(); | ||||
|             } | ||||
|         }); | ||||
| 
 | ||||
|         ctx.wait(fut); | ||||
|         info!("Listener starting"); | ||||
|     } | ||||
|             drop(client); | ||||
|             warn!("Restarting listener task"); | ||||
|         } | ||||
|     }); | ||||
| } | ||||
| 
 | ||||
| impl StreamHandler<Notify> for NotifyHandler { | ||||
|     fn handle(&mut self, notify: Notify, ctx: &mut Self::Context) { | ||||
|         let notif = match notify { | ||||
|             Notify::Msg(notif) => notif, | ||||
|             Notify::Done => { | ||||
|                 warn!("Stopping notify handler"); | ||||
|                 ctx.stop(); | ||||
|                 return; | ||||
|             } | ||||
|         }; | ||||
| 
 | ||||
|         let state = self.state.clone(); | ||||
| 
 | ||||
|         let fut = async move { | ||||
|             match notif.channel() { | ||||
|                 "new_blocks" => { | ||||
|                     info!("Caching block of {}", notif.payload()); | ||||
|                     state.cache_block(notif.payload().to_owned()).await; | ||||
|                 } | ||||
|                 "new_whitelists" => { | ||||
|                     info!("Caching whitelist of {}", notif.payload()); | ||||
|                     state.cache_whitelist(notif.payload().to_owned()).await; | ||||
|                 } | ||||
|                 "new_listeners" => { | ||||
|                     if let Ok(uri) = notif.payload().parse::<XsdAnyUri>() { | ||||
|                         info!("Caching listener {}", uri); | ||||
|                         state.cache_listener(uri).await; | ||||
|                     } | ||||
|                 } | ||||
|                 "rm_blocks" => { | ||||
|                     info!("Busting block cache for {}", notif.payload()); | ||||
|                     state.bust_block(notif.payload()).await; | ||||
|                 } | ||||
|                 "rm_whitelists" => { | ||||
|                     info!("Busting whitelist cache for {}", notif.payload()); | ||||
|                     state.bust_whitelist(notif.payload()).await; | ||||
|                 } | ||||
|                 "rm_listeners" => { | ||||
|                     if let Ok(uri) = notif.payload().parse::<XsdAnyUri>() { | ||||
|                         info!("Busting listener cache for {}", uri); | ||||
|                         state.bust_listener(&uri).await; | ||||
|                     } | ||||
|                 } | ||||
|                 _ => (), | ||||
|             } | ||||
|         }; | ||||
| 
 | ||||
|         ctx.spawn(fut.into_actor(self)); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl Supervised for NotifyHandler {} | ||||
|  |  | |||
		Loading…
	
	Add table
		
		Reference in a new issue