Rework misskey fetch to reuse deliver plumbing
Only count server errors towards failed breakers
This commit is contained in:
		
							parent
							
								
									667d586160
								
							
						
					
					
						commit
						a1ea5d676c
					
				
					 8 changed files with 67 additions and 78 deletions
				
			
		| 
						 | 
					@ -71,7 +71,7 @@ impl ActorCache {
 | 
				
			||||||
        id: &IriString,
 | 
					        id: &IriString,
 | 
				
			||||||
        requests: &Requests,
 | 
					        requests: &Requests,
 | 
				
			||||||
    ) -> Result<Actor, Error> {
 | 
					    ) -> Result<Actor, Error> {
 | 
				
			||||||
        let accepted_actor = requests.fetch::<AcceptedActors>(id.as_str()).await?;
 | 
					        let accepted_actor = requests.fetch::<AcceptedActors>(id).await?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let input_authority = id.authority_components().ok_or(ErrorKind::MissingDomain)?;
 | 
					        let input_authority = id.authority_components().ok_or(ErrorKind::MissingDomain)?;
 | 
				
			||||||
        let accepted_actor_id = accepted_actor
 | 
					        let accepted_actor_id = accepted_actor
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -42,7 +42,7 @@ impl QueryContact {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let contact = match state
 | 
					        let contact = match state
 | 
				
			||||||
            .requests
 | 
					            .requests
 | 
				
			||||||
            .fetch::<AcceptedActors>(self.contact_id.as_str())
 | 
					            .fetch::<AcceptedActors>(&self.contact_id)
 | 
				
			||||||
            .await
 | 
					            .await
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            Ok(contact) => contact,
 | 
					            Ok(contact) => contact,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -35,7 +35,7 @@ impl Deliver {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    #[tracing::instrument(name = "Deliver", skip(state))]
 | 
					    #[tracing::instrument(name = "Deliver", skip(state))]
 | 
				
			||||||
    async fn permform(self, state: JobState) -> Result<(), Error> {
 | 
					    async fn permform(self, state: JobState) -> Result<(), Error> {
 | 
				
			||||||
        if let Err(e) = state.requests.deliver(self.to, &self.data).await {
 | 
					        if let Err(e) = state.requests.deliver(&self.to, &self.data).await {
 | 
				
			||||||
            if e.is_breaker() {
 | 
					            if e.is_breaker() {
 | 
				
			||||||
                tracing::debug!("Not trying due to failed breaker");
 | 
					                tracing::debug!("Not trying due to failed breaker");
 | 
				
			||||||
                return Ok(());
 | 
					                return Ok(());
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -41,14 +41,14 @@ impl QueryInstance {
 | 
				
			||||||
                let mastodon_instance_uri = iri!(format!("{scheme}://{authority}/api/v1/instance"));
 | 
					                let mastodon_instance_uri = iri!(format!("{scheme}://{authority}/api/v1/instance"));
 | 
				
			||||||
                state
 | 
					                state
 | 
				
			||||||
                    .requests
 | 
					                    .requests
 | 
				
			||||||
                    .fetch_json::<Instance>(mastodon_instance_uri.as_str())
 | 
					                    .fetch_json::<Instance>(&mastodon_instance_uri)
 | 
				
			||||||
                    .await
 | 
					                    .await
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            InstanceApiType::Misskey => {
 | 
					            InstanceApiType::Misskey => {
 | 
				
			||||||
                let msky_meta_uri = iri!(format!("{scheme}://{authority}/api/meta"));
 | 
					                let msky_meta_uri = iri!(format!("{scheme}://{authority}/api/meta"));
 | 
				
			||||||
                state
 | 
					                state
 | 
				
			||||||
                    .requests
 | 
					                    .requests
 | 
				
			||||||
                    .fetch_json_msky::<MisskeyMeta>(msky_meta_uri.as_str())
 | 
					                    .fetch_json_msky::<MisskeyMeta>(&msky_meta_uri)
 | 
				
			||||||
                    .await
 | 
					                    .await
 | 
				
			||||||
                    .map(|res| res.into())
 | 
					                    .map(|res| res.into())
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -43,7 +43,7 @@ impl QueryNodeinfo {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let well_known = match state
 | 
					        let well_known = match state
 | 
				
			||||||
            .requests
 | 
					            .requests
 | 
				
			||||||
            .fetch_json::<WellKnown>(well_known_uri.as_str())
 | 
					            .fetch_json::<WellKnown>(&well_known_uri)
 | 
				
			||||||
            .await
 | 
					            .await
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            Ok(well_known) => well_known,
 | 
					            Ok(well_known) => well_known,
 | 
				
			||||||
| 
						 | 
					@ -55,7 +55,7 @@ impl QueryNodeinfo {
 | 
				
			||||||
        };
 | 
					        };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let href = if let Some(link) = well_known.links.into_iter().find(|l| l.rel.is_supported()) {
 | 
					        let href = if let Some(link) = well_known.links.into_iter().find(|l| l.rel.is_supported()) {
 | 
				
			||||||
            link.href
 | 
					            iri!(&link.href)
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            return Ok(());
 | 
					            return Ok(());
 | 
				
			||||||
        };
 | 
					        };
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -67,11 +67,7 @@ impl MyVerify {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            actor_id
 | 
					            actor_id
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            match self
 | 
					            match self.0.fetch::<PublicKeyResponse>(&public_key_id).await {
 | 
				
			||||||
                .0
 | 
					 | 
				
			||||||
                .fetch::<PublicKeyResponse>(public_key_id.as_str())
 | 
					 | 
				
			||||||
                .await
 | 
					 | 
				
			||||||
            {
 | 
					 | 
				
			||||||
                Ok(res) => res.actor_id().ok_or(ErrorKind::MissingId),
 | 
					                Ok(res) => res.actor_id().ok_or(ErrorKind::MissingId),
 | 
				
			||||||
                Err(e) => {
 | 
					                Err(e) => {
 | 
				
			||||||
                    if e.is_gone() {
 | 
					                    if e.is_gone() {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
							
								
								
									
										123
									
								
								src/requests.rs
									
										
									
									
									
								
							
							
						
						
									
										123
									
								
								src/requests.rs
									
										
									
									
									
								
							| 
						 | 
					@ -229,7 +229,7 @@ impl Requests {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.reset_err();
 | 
					        self.reset_err();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if !res.status().is_success() {
 | 
					        if res.status().is_server_error() {
 | 
				
			||||||
            self.breakers.fail(&parsed_url);
 | 
					            self.breakers.fail(&parsed_url);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if let Ok(bytes) = res.body().await {
 | 
					            if let Ok(bytes) = res.body().await {
 | 
				
			||||||
| 
						 | 
					@ -250,7 +250,7 @@ impl Requests {
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    #[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
 | 
					    #[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
 | 
				
			||||||
    pub(crate) async fn fetch_json<T>(&self, url: &str) -> Result<T, Error>
 | 
					    pub(crate) async fn fetch_json<T>(&self, url: &IriString) -> Result<T, Error>
 | 
				
			||||||
    where
 | 
					    where
 | 
				
			||||||
        T: serde::de::DeserializeOwned,
 | 
					        T: serde::de::DeserializeOwned,
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
| 
						 | 
					@ -258,75 +258,40 @@ impl Requests {
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    #[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
 | 
					    #[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
 | 
				
			||||||
    pub(crate) async fn fetch_json_msky<T>(&self, url: &str) -> Result<T, Error>
 | 
					    pub(crate) async fn fetch_json_msky<T>(&self, url: &IriString) -> Result<T, Error>
 | 
				
			||||||
    where
 | 
					    where
 | 
				
			||||||
        T: serde::de::DeserializeOwned,
 | 
					        T: serde::de::DeserializeOwned,
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        self.do_fetch_msky(url, "application/json").await
 | 
					        let mut res = self
 | 
				
			||||||
 | 
					            .do_deliver(
 | 
				
			||||||
 | 
					                url,
 | 
				
			||||||
 | 
					                &serde_json::json!({}),
 | 
				
			||||||
 | 
					                "application/json",
 | 
				
			||||||
 | 
					                "application/json",
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					            .await?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let body = res
 | 
				
			||||||
 | 
					            .body()
 | 
				
			||||||
 | 
					            .await
 | 
				
			||||||
 | 
					            .map_err(|e| ErrorKind::ReceiveResponse(url.to_string(), e.to_string()))?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        Ok(serde_json::from_slice(body.as_ref())?)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    #[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))]
 | 
					    #[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))]
 | 
				
			||||||
    pub(crate) async fn fetch<T>(&self, url: &str) -> Result<T, Error>
 | 
					    pub(crate) async fn fetch<T>(&self, url: &IriString) -> Result<T, Error>
 | 
				
			||||||
    where
 | 
					    where
 | 
				
			||||||
        T: serde::de::DeserializeOwned,
 | 
					        T: serde::de::DeserializeOwned,
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        self.do_fetch(url, "application/activity+json").await
 | 
					        self.do_fetch(url, "application/activity+json").await
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async fn do_fetch<T>(&self, url: &str, accept: &str) -> Result<T, Error>
 | 
					    async fn do_fetch<T>(&self, url: &IriString, accept: &str) -> Result<T, Error>
 | 
				
			||||||
    where
 | 
					    where
 | 
				
			||||||
        T: serde::de::DeserializeOwned,
 | 
					        T: serde::de::DeserializeOwned,
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        self.do_fetch_inner(url, accept, false).await
 | 
					        let mut res = self.do_fetch_response(url, accept).await?;
 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    async fn do_fetch_msky<T>(&self, url: &str, accept: &str) -> Result<T, Error>
 | 
					 | 
				
			||||||
    where
 | 
					 | 
				
			||||||
        T: serde::de::DeserializeOwned,
 | 
					 | 
				
			||||||
    {
 | 
					 | 
				
			||||||
        self.do_fetch_inner(url, accept, true).await
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    async fn do_fetch_inner<T>(&self, url: &str, accept: &str, use_post: bool) -> Result<T, Error>
 | 
					 | 
				
			||||||
    where
 | 
					 | 
				
			||||||
        T: serde::de::DeserializeOwned,
 | 
					 | 
				
			||||||
    {
 | 
					 | 
				
			||||||
        let parsed_url = url.parse::<IriString>()?;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if !self.breakers.should_try(&parsed_url) {
 | 
					 | 
				
			||||||
            return Err(ErrorKind::Breaker.into());
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        let signer = self.signer();
 | 
					 | 
				
			||||||
        let span = tracing::Span::current();
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        let client: Client = self.client.borrow().clone();
 | 
					 | 
				
			||||||
        let client_req = match use_post {
 | 
					 | 
				
			||||||
            true => client.post(url),
 | 
					 | 
				
			||||||
            false => client.get(url),
 | 
					 | 
				
			||||||
        };
 | 
					 | 
				
			||||||
        let client_signed = client_req
 | 
					 | 
				
			||||||
            .insert_header(("Accept", accept))
 | 
					 | 
				
			||||||
            .insert_header(Date(SystemTime::now().into()))
 | 
					 | 
				
			||||||
            .signature(
 | 
					 | 
				
			||||||
                self.config.clone(),
 | 
					 | 
				
			||||||
                self.key_id.clone(),
 | 
					 | 
				
			||||||
                move |signing_string| {
 | 
					 | 
				
			||||||
                    span.record("signing_string", signing_string);
 | 
					 | 
				
			||||||
                    span.in_scope(|| signer.sign(signing_string))
 | 
					 | 
				
			||||||
                },
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
            .await?;
 | 
					 | 
				
			||||||
        let res = match use_post {
 | 
					 | 
				
			||||||
            true => {
 | 
					 | 
				
			||||||
                let dummy = serde_json::json!({});
 | 
					 | 
				
			||||||
                client_signed.send_json(&dummy)
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
            false => client_signed.send(),
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        .await;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        let mut res = self.check_response(&parsed_url, res).await?;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let body = res
 | 
					        let body = res
 | 
				
			||||||
            .body()
 | 
					            .body()
 | 
				
			||||||
| 
						 | 
					@ -337,8 +302,16 @@ impl Requests {
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    #[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))]
 | 
					    #[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))]
 | 
				
			||||||
    pub(crate) async fn fetch_response(&self, url: IriString) -> Result<ClientResponse, Error> {
 | 
					    pub(crate) async fn fetch_response(&self, url: &IriString) -> Result<ClientResponse, Error> {
 | 
				
			||||||
        if !self.breakers.should_try(&url) {
 | 
					        self.do_fetch_response(url, "*/*").await
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    pub(crate) async fn do_fetch_response(
 | 
				
			||||||
 | 
					        &self,
 | 
				
			||||||
 | 
					        url: &IriString,
 | 
				
			||||||
 | 
					        accept: &str,
 | 
				
			||||||
 | 
					    ) -> Result<ClientResponse, Error> {
 | 
				
			||||||
 | 
					        if !self.breakers.should_try(url) {
 | 
				
			||||||
            return Err(ErrorKind::Breaker.into());
 | 
					            return Err(ErrorKind::Breaker.into());
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -348,7 +321,7 @@ impl Requests {
 | 
				
			||||||
        let client: Client = self.client.borrow().clone();
 | 
					        let client: Client = self.client.borrow().clone();
 | 
				
			||||||
        let res = client
 | 
					        let res = client
 | 
				
			||||||
            .get(url.as_str())
 | 
					            .get(url.as_str())
 | 
				
			||||||
            .insert_header(("Accept", "*/*"))
 | 
					            .insert_header(("Accept", accept))
 | 
				
			||||||
            .insert_header(Date(SystemTime::now().into()))
 | 
					            .insert_header(Date(SystemTime::now().into()))
 | 
				
			||||||
            .no_decompress()
 | 
					            .no_decompress()
 | 
				
			||||||
            .signature(
 | 
					            .signature(
 | 
				
			||||||
| 
						 | 
					@ -363,7 +336,7 @@ impl Requests {
 | 
				
			||||||
            .send()
 | 
					            .send()
 | 
				
			||||||
            .await;
 | 
					            .await;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let res = self.check_response(&url, res).await?;
 | 
					        let res = self.check_response(url, res).await?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        Ok(res)
 | 
					        Ok(res)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
| 
						 | 
					@ -373,7 +346,27 @@ impl Requests {
 | 
				
			||||||
        skip_all,
 | 
					        skip_all,
 | 
				
			||||||
        fields(inbox = inbox.to_string().as_str(), signing_string)
 | 
					        fields(inbox = inbox.to_string().as_str(), signing_string)
 | 
				
			||||||
    )]
 | 
					    )]
 | 
				
			||||||
    pub(crate) async fn deliver<T>(&self, inbox: IriString, item: &T) -> Result<(), Error>
 | 
					    pub(crate) async fn deliver<T>(&self, inbox: &IriString, item: &T) -> Result<(), Error>
 | 
				
			||||||
 | 
					    where
 | 
				
			||||||
 | 
					        T: serde::ser::Serialize + std::fmt::Debug,
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        self.do_deliver(
 | 
				
			||||||
 | 
					            inbox,
 | 
				
			||||||
 | 
					            item,
 | 
				
			||||||
 | 
					            "application/activity+json",
 | 
				
			||||||
 | 
					            "application/activity+json",
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        .await?;
 | 
				
			||||||
 | 
					        Ok(())
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async fn do_deliver<T>(
 | 
				
			||||||
 | 
					        &self,
 | 
				
			||||||
 | 
					        inbox: &IriString,
 | 
				
			||||||
 | 
					        item: &T,
 | 
				
			||||||
 | 
					        content_type: &str,
 | 
				
			||||||
 | 
					        accept: &str,
 | 
				
			||||||
 | 
					    ) -> Result<ClientResponse, Error>
 | 
				
			||||||
    where
 | 
					    where
 | 
				
			||||||
        T: serde::ser::Serialize + std::fmt::Debug,
 | 
					        T: serde::ser::Serialize + std::fmt::Debug,
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
| 
						 | 
					@ -388,8 +381,8 @@ impl Requests {
 | 
				
			||||||
        let client: Client = self.client.borrow().clone();
 | 
					        let client: Client = self.client.borrow().clone();
 | 
				
			||||||
        let (req, body) = client
 | 
					        let (req, body) = client
 | 
				
			||||||
            .post(inbox.as_str())
 | 
					            .post(inbox.as_str())
 | 
				
			||||||
            .insert_header(("Accept", "application/activity+json"))
 | 
					            .insert_header(("Accept", accept))
 | 
				
			||||||
            .insert_header(("Content-Type", "application/activity+json"))
 | 
					            .insert_header(("Content-Type", content_type))
 | 
				
			||||||
            .insert_header(Date(SystemTime::now().into()))
 | 
					            .insert_header(Date(SystemTime::now().into()))
 | 
				
			||||||
            .signature_with_digest(
 | 
					            .signature_with_digest(
 | 
				
			||||||
                self.config.clone(),
 | 
					                self.config.clone(),
 | 
				
			||||||
| 
						 | 
					@ -406,9 +399,9 @@ impl Requests {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let res = req.send_body(body).await;
 | 
					        let res = req.send_body(body).await;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.check_response(&inbox, res).await?;
 | 
					        let res = self.check_response(inbox, res).await?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        Ok(())
 | 
					        Ok(res)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn signer(&self) -> Signer {
 | 
					    fn signer(&self) -> Signer {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -11,7 +11,7 @@ pub(crate) async fn route(
 | 
				
			||||||
    let uuid = uuid.into_inner();
 | 
					    let uuid = uuid.into_inner();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if let Some(url) = media.get_url(uuid).await? {
 | 
					    if let Some(url) = media.get_url(uuid).await? {
 | 
				
			||||||
        let res = requests.fetch_response(url).await?;
 | 
					        let res = requests.fetch_response(&url).await?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let mut response = HttpResponse::build(res.status());
 | 
					        let mut response = HttpResponse::build(res.status());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		
		Reference in a new issue