Use memory job store because why not
parent 51ef5c2796
commit db5284e162
3 changed files with 5 additions and 177 deletions
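In short: the Postgres-backed job store is deleted and the job server now uses the in-memory storage bundled with background_jobs, so creating it no longer needs a database handle (queued jobs are held in process memory and lost on restart). A minimal sketch of the new wiring, assembled from the hunks below (JobServer and Listeners are this repository's own types; the trailing comment stands in for the unchanged remainder of the function):

use background_jobs::{memory_storage::Storage, Job, QueueHandle, WorkerConfig};
use std::time::Duration;

pub fn create_server() -> JobServer {
    // The in-memory store is constructed without a Db handle.
    let shared = background_jobs::create_server(Storage::new());

    // The periodic Listeners job is registered exactly as before.
    shared.every(Duration::from_secs(60 * 5), Listeners);

    // ...rest of create_server is unchanged by this commit...
}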
@@ -5,7 +5,6 @@ mod deliver_many;
 mod instance;
 mod nodeinfo;
 mod process_listeners;
-mod storage;
 
 pub use self::{
     cache_media::CacheMedia, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance,
@@ -17,14 +16,14 @@ use crate::{
     data::{ActorCache, Media, NodeCache, State},
     db::Db,
     error::MyError,
-    jobs::{process_listeners::Listeners, storage::Storage},
+    jobs::process_listeners::Listeners,
     requests::Requests,
 };
-use background_jobs::{Job, QueueHandle, WorkerConfig};
+use background_jobs::{memory_storage::Storage, Job, QueueHandle, WorkerConfig};
 use std::time::Duration;
 
-pub fn create_server(db: Db) -> JobServer {
-    let shared = background_jobs::create_server(Storage::new(db));
+pub fn create_server() -> JobServer {
+    let shared = background_jobs::create_server(Storage::new());
 
     shared.every(Duration::from_secs(60 * 5), Listeners);
 
@@ -1,171 +0,0 @@
-use crate::{db::Db, error::MyError};
-use background_jobs::{dev::JobInfo, Stats};
-use log::debug;
-use tokio_postgres::types::Json;
-use uuid::Uuid;
-
-#[derive(Clone)]
-pub struct Storage {
-    db: Db,
-}
-
-impl Storage {
-    pub fn new(db: Db) -> Self {
-        Storage { db }
-    }
-}
-
-#[async_trait::async_trait]
-impl background_jobs::dev::Storage for Storage {
-    type Error = MyError;
-
-    async fn generate_id(&self) -> Result<Uuid, MyError> {
-        // TODO: Ensure unique job id
-        Ok(Uuid::new_v4())
-    }
-
-    async fn save_job(&self, job: JobInfo) -> Result<(), MyError> {
-        debug!(
-            "Inserting job {} status {} for queue {}",
-            job.id(),
-            job.status(),
-            job.queue()
-        );
-        self.db.pool().get().await?.execute(
-            "INSERT INTO jobs
-                (job_id, job_queue, job_timeout, job_updated, job_status, job_next_run, job_value, created_at)
-            VALUES
-                ($1::UUID, $2::TEXT, $3::BIGINT, $4::TIMESTAMP, $5::TEXT, $6::TIMESTAMP, $7::JSONB, 'now')
-            ON CONFLICT (job_id)
-            DO UPDATE SET
-                job_updated = $4::TIMESTAMP,
-                job_status = $5::TEXT,
-                job_next_run = $6::TIMESTAMP,
-                job_value = $7::JSONB;",
-            &[&job.id(), &job.queue(), &job.timeout(), &job.updated_at().naive_utc(), &job.status().to_string(), &job.next_queue().map(|q| q.naive_utc()), &Json(&job)],
-        )
-        .await?;
-
-        Ok(())
-    }
-
-    async fn fetch_job(&self, id: Uuid) -> Result<Option<JobInfo>, MyError> {
-        debug!(
-            "SELECT job_value FROM jobs WHERE job_id = $1::UUID LIMIT 1; [{}]",
-            id
-        );
-        let row_opt = self
-            .db
-            .pool()
-            .get()
-            .await?
-            .query_opt(
-                "SELECT job_value
-                FROM jobs
-                WHERE job_id = $1::UUID
-                LIMIT 1;",
-                &[&id],
-            )
-            .await?;
-
-        let row = if let Some(row) = row_opt {
-            row
-        } else {
-            return Ok(None);
-        };
-
-        let value: Json<JobInfo> = row.try_get(0)?;
-
-        Ok(Some(value.0))
-    }
-
-    async fn fetch_job_from_queue(&self, queue: &str) -> Result<Option<JobInfo>, MyError> {
-        let row_opt = self
-            .db
-            .pool()
-            .get()
-            .await?
-            .query_opt(
-                "UPDATE jobs
-                SET
-                    job_status = 'Running',
-                    job_updated = 'now'
-                WHERE
-                    job_id = (
-                        SELECT job_id
-                        FROM jobs
-                        WHERE
-                            job_queue = $1::TEXT
-                        AND
-                            (
-                                job_next_run IS NULL
-                            OR
-                                job_next_run < now()
-                            )
-                        AND
-                            (
-                                job_status = 'Pending'
-                            OR
-                                (
-                                    job_status = 'Running'
-                                AND
-                                    NOW() > (INTERVAL '1 millisecond' * job_timeout + job_updated)
-                                )
-                            )
-                        LIMIT 1
-                        FOR UPDATE SKIP LOCKED
-                    )
-                RETURNING job_value;",
-                &[&queue],
-            )
-            .await?;
-
-        let row = if let Some(row) = row_opt {
-            row
-        } else {
-            return Ok(None);
-        };
-
-        let value: Json<JobInfo> = row.try_get(0)?;
-        let job = value.0;
-
-        debug!("Found job {} in queue {}", job.id(), queue);
-
-        Ok(Some(job))
-    }
-
-    async fn queue_job(&self, _queue: &str, _id: Uuid) -> Result<(), MyError> {
-        // Queue Job is a no-op, since jobs are always in their queue
-        Ok(())
-    }
-
-    async fn run_job(&self, _id: Uuid, _runner_id: Uuid) -> Result<(), MyError> {
-        // Run Job is a no-op, since jobs are marked running at fetch
-        Ok(())
-    }
-
-    async fn delete_job(&self, id: Uuid) -> Result<(), MyError> {
-        debug!("Deleting job {}", id);
-        self.db
-            .pool()
-            .get()
-            .await?
-            .execute("DELETE FROM jobs WHERE job_id = $1::UUID;", &[&id])
-            .await?;
-
-        Ok(())
-    }
-
-    async fn get_stats(&self) -> Result<Stats, MyError> {
-        // TODO: Stats are unimplemented
-        Ok(Stats::default())
-    }
-
-    async fn update_stats<F>(&self, _f: F) -> Result<(), MyError>
-    where
-        F: Fn(Stats) -> Stats + Send,
-    {
-        // TODO: Stats are unimplemented
-        Ok(())
-    }
-}
@@ -71,7 +71,7 @@ async fn main() -> Result<(), anyhow::Error> {
     let media = Media::new(db.clone());
     let state = State::hydrate(config.clone(), &db).await?;
     let actors = ActorCache::new(db.clone());
-    let job_server = create_server(db.clone());
+    let job_server = create_server();
 
     notify::Notifier::new(config.database_url().parse()?)
         .register(notify::NewBlocks(state.clone()))