Update deps, use tokio_postgres' Json type
This commit is contained in:
		
							parent
							
								
									69ec2baf79
								
							
						
					
					
						commit
						7c8fbdd965
					
				
					 2 changed files with 52 additions and 46 deletions
				
			
		
							
								
								
									
										42
									
								
								Cargo.lock
									
										
									
										generated
									
									
									
								
							
							
						
						
									
										42
									
								
								Cargo.lock
									
										
									
										generated
									
									
									
								
							|  | @ -2,9 +2,9 @@ | ||||||
| # It is not intended for manual editing. | # It is not intended for manual editing. | ||||||
| [[package]] | [[package]] | ||||||
| name = "activitystreams" | name = "activitystreams" | ||||||
| version = "0.5.0-alpha.11" | version = "0.5.0-alpha.16" | ||||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
| checksum = "f0fb876395ae7a1dd1c7d7de38f2cdb583918db16b46ee5b75d2e9bf7af1ef9f" | checksum = "e7173513c9d586a1157f375835777e3b50498b6b7aab4411a7098b455ba995f0" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "activitystreams-derive", |  "activitystreams-derive", | ||||||
|  "chrono", |  "chrono", | ||||||
|  | @ -17,9 +17,9 @@ dependencies = [ | ||||||
| 
 | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "activitystreams-derive" | name = "activitystreams-derive" | ||||||
| version = "0.5.0-alpha.4" | version = "0.5.0-alpha.8" | ||||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
| checksum = "c2bc640808dceb2efac81e6bcb77a7f4e2e76af7fb60e88f966b48123b625d2f" | checksum = "c7ff4a2be3b67d763e78794f622ef2d53da077521229774837f61963c4067b36" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "proc-macro2", |  "proc-macro2", | ||||||
|  "quote", |  "quote", | ||||||
|  | @ -390,9 +390,9 @@ dependencies = [ | ||||||
| 
 | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "anyhow" | name = "anyhow" | ||||||
| version = "1.0.27" | version = "1.0.28" | ||||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
| checksum = "013a6e0a2cbe3d20f9c60b65458f7a7f7a5e636c5d0f45a5a6aee5d4b1f01785" | checksum = "d9a60d744a80c30fcb657dfe2c1b22bcb3e814c1a1e3674f32bf5820b570fbff" | ||||||
| 
 | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "arc-swap" | name = "arc-swap" | ||||||
|  | @ -411,9 +411,9 @@ dependencies = [ | ||||||
| 
 | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "async-trait" | name = "async-trait" | ||||||
| version = "0.1.27" | version = "0.1.29" | ||||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
| checksum = "991d0a1a3e790c835fd54ab41742a59251338d8c7577fe7d7f0170c7072be708" | checksum = "bab5c215748dc1ad11a145359b1067107ae0f8ca5e99844fa64067ed5bf198e3" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "proc-macro2", |  "proc-macro2", | ||||||
|  "quote", |  "quote", | ||||||
|  | @ -1079,9 +1079,9 @@ dependencies = [ | ||||||
| 
 | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "h2" | name = "h2" | ||||||
| version = "0.2.3" | version = "0.2.4" | ||||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
| checksum = "7938e6aa2a31df4e21f224dc84704bd31c089a6d1355c535b03667371cccc843" | checksum = "377038bf3c89d18d6ca1431e7a5027194fbd724ca10592b9487ede5e8e144f42" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "bytes", |  "bytes", | ||||||
|  "fnv", |  "fnv", | ||||||
|  | @ -1093,7 +1093,7 @@ dependencies = [ | ||||||
|  "log", |  "log", | ||||||
|  "slab", |  "slab", | ||||||
|  "tokio", |  "tokio", | ||||||
|  "tokio-util 0.2.0", |  "tokio-util 0.3.1", | ||||||
| ] | ] | ||||||
| 
 | 
 | ||||||
| [[package]] | [[package]] | ||||||
|  | @ -1830,9 +1830,9 @@ dependencies = [ | ||||||
| 
 | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "proc-macro-hack" | name = "proc-macro-hack" | ||||||
| version = "0.5.14" | version = "0.5.15" | ||||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
| checksum = "fcfdefadc3d57ca21cf17990a28ef4c0f7c61383a28cb7604cf4a18e6ede1420" | checksum = "0d659fe7c6d27f25e9d80a1a094c223f5246f6a6596453e09d7229bf42750b63" | ||||||
| 
 | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "proc-macro-nested" | name = "proc-macro-nested" | ||||||
|  | @ -1842,9 +1842,9 @@ checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694" | ||||||
| 
 | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "proc-macro2" | name = "proc-macro2" | ||||||
| version = "1.0.9" | version = "1.0.10" | ||||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
| checksum = "6c09721c6781493a2a492a96b5a5bf19b65917fe6728884e7c44dd0c60ca3435" | checksum = "df246d292ff63439fea9bc8c0a270bed0e390d5ebd4db4ba15aba81111b5abe3" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "unicode-xid", |  "unicode-xid", | ||||||
| ] | ] | ||||||
|  | @ -2293,9 +2293,9 @@ checksum = "5c2fb2ec9bcd216a5b0d0ccf31ab17b5ed1d627960edff65bbe95d3ce221cefc" | ||||||
| 
 | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "socket2" | name = "socket2" | ||||||
| version = "0.3.11" | version = "0.3.12" | ||||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
| checksum = "e8b74de517221a2cb01a53349cf54182acdc31a074727d3079068448c0676d85" | checksum = "03088793f677dce356f3ccc2edb1b314ad191ab702a5de3faf49304f7e104918" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "cfg-if", |  "cfg-if", | ||||||
|  "libc", |  "libc", | ||||||
|  | @ -2598,9 +2598,9 @@ dependencies = [ | ||||||
| 
 | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "tokio" | name = "tokio" | ||||||
| version = "0.2.13" | version = "0.2.15" | ||||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
| checksum = "0fa5e81d6bc4e67fe889d5783bd2a128ab2e0cfa487e0be16b6a8d177b101616" | checksum = "619cdb2245c40c42d563089b72e80c5df659513d667a017598439ef7a7b1ffe1" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "bytes", |  "bytes", | ||||||
|  "fnv", |  "fnv", | ||||||
|  | @ -2949,9 +2949,9 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" | ||||||
| 
 | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "winapi-util" | name = "winapi-util" | ||||||
| version = "0.1.3" | version = "0.1.4" | ||||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
| checksum = "4ccfbf554c6ad11084fb7517daca16cfdcaccbdadba4fc336f032a8b12c2ad80" | checksum = "fa515c5163a99cc82bab70fd3bfdd36d827be85de63737b40fcef2ce084a436e" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "winapi 0.3.8", |  "winapi 0.3.8", | ||||||
| ] | ] | ||||||
|  |  | ||||||
|  | @ -1,5 +1,6 @@ | ||||||
| use crate::{db::Db, error::MyError}; | use crate::{db::Db, error::MyError}; | ||||||
| use background_jobs_core::{JobInfo, Stats}; | use background_jobs_core::{JobInfo, Stats}; | ||||||
|  | use bb8_postgres::tokio_postgres::types::Json; | ||||||
| use log::debug; | use log::debug; | ||||||
| use uuid::Uuid; | use uuid::Uuid; | ||||||
| 
 | 
 | ||||||
|  | @ -24,17 +25,13 @@ impl background_jobs_core::Storage for Storage { | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     async fn save_job(&self, job: JobInfo) -> Result<(), MyError> { |     async fn save_job(&self, job: JobInfo) -> Result<(), MyError> { | ||||||
|         let id = job.id(); |         debug!( | ||||||
|         let queue = job.queue().to_owned(); |             "Inserting job {} status {} for queue {}", | ||||||
|         let timeout = job.timeout(); |             job.id(), | ||||||
|         let updated = job.updated_at().naive_utc(); |             job.status(), | ||||||
|         let status = job.status().to_string(); |             job.queue() | ||||||
|         let next_queue = job.next_queue().map(|q| q.naive_utc()); |         ); | ||||||
|         let value = serde_json::to_value(job)?; |         self.db.pool().get().await?.execute( | ||||||
| 
 |  | ||||||
|         let conn = self.db.pool().get().await?; |  | ||||||
|         debug!("Inserting job {} status {} for queue {}", id, status, queue); |  | ||||||
|         conn.execute( |  | ||||||
|             "INSERT INTO jobs
 |             "INSERT INTO jobs
 | ||||||
|                 (job_id, job_queue, job_timeout, job_updated, job_status, job_next_run, job_value, created_at) |                 (job_id, job_queue, job_timeout, job_updated, job_status, job_next_run, job_value, created_at) | ||||||
|              VALUES |              VALUES | ||||||
|  | @ -45,7 +42,7 @@ impl background_jobs_core::Storage for Storage { | ||||||
|                 job_status = $5::TEXT, |                 job_status = $5::TEXT, | ||||||
|                 job_next_run = $6::TIMESTAMP, |                 job_next_run = $6::TIMESTAMP, | ||||||
|                 job_value = $7::JSONB;",
 |                 job_value = $7::JSONB;",
 | ||||||
|             &[&id, &queue, &timeout, &updated, &status, &next_queue, &value], |             &[&job.id(), &job.queue(), &job.timeout(), &job.updated_at().naive_utc(), &job.status().to_string(), &job.next_queue().map(|q| q.naive_utc()), &Json(&job)], | ||||||
|         ) |         ) | ||||||
|         .await?; |         .await?; | ||||||
| 
 | 
 | ||||||
|  | @ -53,13 +50,16 @@ impl background_jobs_core::Storage for Storage { | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     async fn fetch_job(&self, id: Uuid) -> Result<Option<JobInfo>, MyError> { |     async fn fetch_job(&self, id: Uuid) -> Result<Option<JobInfo>, MyError> { | ||||||
|         let conn = self.db.pool().get().await?; |  | ||||||
|         debug!( |         debug!( | ||||||
|             "SELECT job_value FROM jobs WHERE job_id = $1::UUID LIMIT 1; [{}]", |             "SELECT job_value FROM jobs WHERE job_id = $1::UUID LIMIT 1; [{}]", | ||||||
|             id |             id | ||||||
|         ); |         ); | ||||||
|         let rows = conn |         let row_opt = self | ||||||
|             .query( |             .db | ||||||
|  |             .pool() | ||||||
|  |             .get() | ||||||
|  |             .await? | ||||||
|  |             .query_opt( | ||||||
|                 "SELECT job_value
 |                 "SELECT job_value
 | ||||||
|                  FROM jobs |                  FROM jobs | ||||||
|                  WHERE job_id = $1::UUID |                  WHERE job_id = $1::UUID | ||||||
|  | @ -68,20 +68,23 @@ impl background_jobs_core::Storage for Storage { | ||||||
|             ) |             ) | ||||||
|             .await?; |             .await?; | ||||||
| 
 | 
 | ||||||
|         let row = if let Some(row) = rows.into_iter().next() { |         let row = if let Some(row) = row_opt { | ||||||
|             row |             row | ||||||
|         } else { |         } else { | ||||||
|             return Ok(None); |             return Ok(None); | ||||||
|         }; |         }; | ||||||
| 
 | 
 | ||||||
|         let value = row.try_get(0)?; |         let value: Json<JobInfo> = row.try_get(0)?; | ||||||
| 
 | 
 | ||||||
|         Ok(Some(serde_json::from_value(value)?)) |         Ok(Some(value.0)) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     async fn fetch_job_from_queue(&self, queue: &str) -> Result<Option<JobInfo>, MyError> { |     async fn fetch_job_from_queue(&self, queue: &str) -> Result<Option<JobInfo>, MyError> { | ||||||
|         let conn = self.db.pool().get().await?; |         let row_opt = self | ||||||
|         let row = conn |             .db | ||||||
|  |             .pool() | ||||||
|  |             .get() | ||||||
|  |             .await? | ||||||
|             .query_opt( |             .query_opt( | ||||||
|                 "UPDATE jobs
 |                 "UPDATE jobs
 | ||||||
|                  SET |                  SET | ||||||
|  | @ -117,15 +120,15 @@ impl background_jobs_core::Storage for Storage { | ||||||
|             ) |             ) | ||||||
|             .await?; |             .await?; | ||||||
| 
 | 
 | ||||||
|         let row = if let Some(row) = row { |         let row = if let Some(row) = row_opt { | ||||||
|             row |             row | ||||||
|         } else { |         } else { | ||||||
|             return Ok(None); |             return Ok(None); | ||||||
|         }; |         }; | ||||||
| 
 | 
 | ||||||
|         let value = row.try_get(0)?; |         let value: Json<JobInfo> = row.try_get(0)?; | ||||||
|  |         let job = value.0; | ||||||
| 
 | 
 | ||||||
|         let job: JobInfo = serde_json::from_value(value)?; |  | ||||||
|         debug!("Found job {} in queue {}", job.id(), queue); |         debug!("Found job {} in queue {}", job.id(), queue); | ||||||
| 
 | 
 | ||||||
|         Ok(Some(job)) |         Ok(Some(job)) | ||||||
|  | @ -142,9 +145,12 @@ impl background_jobs_core::Storage for Storage { | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     async fn delete_job(&self, id: Uuid) -> Result<(), MyError> { |     async fn delete_job(&self, id: Uuid) -> Result<(), MyError> { | ||||||
|         let conn = self.db.pool().get().await?; |  | ||||||
|         debug!("Deleting job {}", id); |         debug!("Deleting job {}", id); | ||||||
|         conn.execute("DELETE FROM jobs WHERE job_id = $1::UUID;", &[&id]) |         self.db | ||||||
|  |             .pool() | ||||||
|  |             .get() | ||||||
|  |             .await? | ||||||
|  |             .execute("DELETE FROM jobs WHERE job_id = $1::UUID;", &[&id]) | ||||||
|             .await?; |             .await?; | ||||||
| 
 | 
 | ||||||
|         Ok(()) |         Ok(()) | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		
		Reference in a new issue