Просмотр исходного кода

Cron jobs support in main runner

Mauricio Cassola 2 лет назад
Родитель
Сommit
ac1d005ae3
4 измененных файлов с 71 добавлено и 12 удалено
  1. 17 4
      Cargo.lock
  2. 1 0
      Cargo.toml
  3. 14 0
      src/db.rs
  4. 39 8
      src/db/jobs.rs

+ 17 - 4
Cargo.lock

@@ -1294,6 +1294,17 @@ version = "0.3.25"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae"
 
+[[package]]
+name = "postgres-derive"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d0c2c18e40b92144b05e6f3ae9d1ee931f0d1afa9410ac8b97486c6eaaf91201"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "postgres-native-tls"
 version = "0.5.0"
@@ -1309,9 +1320,9 @@ dependencies = [
 
 [[package]]
 name = "postgres-protocol"
-version = "0.6.3"
+version = "0.6.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "79ec03bce71f18b4a27c4c64c6ba2ddf74686d69b91d8714fb32ead3adaed713"
+checksum = "878c6cbf956e03af9aa8204b407b9cbf47c072164800aa918c516cd4b056c50c"
 dependencies = [
  "base64",
  "byteorder",
@@ -1327,13 +1338,14 @@ dependencies = [
 
 [[package]]
 name = "postgres-types"
-version = "0.2.2"
+version = "0.2.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "04619f94ba0cc80999f4fc7073607cb825bc739a883cb6d20900fc5e009d6b0d"
+checksum = "73d946ec7d256b04dfadc4e6a3292324e6f417124750fc5c0950f981b703a0f1"
 dependencies = [
  "bytes",
  "chrono",
  "fallible-iterator",
+ "postgres-derive",
  "postgres-protocol",
  "serde",
  "serde_json",
@@ -2059,6 +2071,7 @@ dependencies = [
  "openssl",
  "parser",
  "postgres-native-tls",
+ "postgres-types",
  "rand",
  "regex",
  "reqwest",

+ 1 - 0
Cargo.toml

@@ -42,6 +42,7 @@ tower = { version = "0.4.13", features = ["util", "limit", "buffer", "load-shed"
 github-graphql = { path = "github-graphql" }
 rand = "0.8.5"
 ignore = "0.4.18"
+postgres-types = { version = "0.2.4", features = ["derive"] }
 
 [dependencies.serde]
 version = "1"

+ 14 - 0
src/db.rs

@@ -195,6 +195,15 @@ pub async fn run_scheduled_jobs(db: &DbClient) -> anyhow::Result<()> {
                 println!("job succesfully executed (id={})", job.id);
                 tracing::trace!("job succesfully executed (id={})", job.id);
 
+                if let Some(frequency) = job.frequency {
+                    let duration = get_duration_from_cron(frequency, job.frequency_unit.as_ref().unwrap());
+                    let new_expected_time = job.expected_time.checked_add_signed(duration).unwrap();
+
+                    insert_job(&db, &job.name, &new_expected_time, &Some(frequency), &job.frequency_unit, &job.metadata).await?;
+                    println!("job succesfully reinserted (name={})", job.name);
+                    tracing::trace!("job succesfully reinserted (name={})", job.name);
+                }
+
                 delete_job(&db, &job.id).await?;
             },
             Err(e) => {
@@ -245,12 +254,17 @@ CREATE TABLE issue_data (
     data JSONB,
     PRIMARY KEY (repo, issue_number, key)
 );
+",
+"
+CREATE TYPE frequency_unit AS ENUM ('days', 'hours', 'minutes', 'seconds');
 ",
     "
 CREATE TABLE jobs (
     id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
     name TEXT NOT NULL,
     expected_time TIMESTAMP WITH TIME ZONE NOT NULL,
+    frequency INTEGER,
+    frequency_unit frequency_unit,
     metadata JSONB,
     executed_at TIMESTAMP WITH TIME ZONE,
     error_message TEXT

+ 39 - 8
src/db/jobs.rs

@@ -1,32 +1,50 @@
 //! The `jobs` table provides a way to have scheduled jobs
 use anyhow::{Result, Context as _};
-use chrono::{DateTime, FixedOffset};
+use chrono::{DateTime, FixedOffset, Duration};
 use tokio_postgres::{Client as DbClient};
 use uuid::Uuid;
 use serde::{Deserialize, Serialize};
+use postgres_types::{ToSql, FromSql};
 
 #[derive(Serialize, Deserialize, Debug)]
 pub struct Job {
     pub id: Uuid,
     pub name: String,
     pub expected_time: DateTime<FixedOffset>,
+    pub frequency: Option<i32>,
+    pub frequency_unit: Option<FrequencyUnit>,
     pub metadata: serde_json::Value,
     pub executed_at: Option<DateTime<FixedOffset>>,
     pub error_message: Option<String>,
 }
 
+#[derive(Serialize, Deserialize, Debug, ToSql, FromSql)]
+#[postgres(name = "frequency_unit")]
+pub enum FrequencyUnit {
+    #[postgres(name = "days")]
+    Days,
+    #[postgres(name = "hours")]
+    Hours,
+    #[postgres(name = "minutes")]
+    Minutes,
+    #[postgres(name = "seconds")]
+    Seconds,
+}
+
 pub async fn insert_job(
     db: &DbClient, 
     name: &String,
     expected_time: &DateTime<FixedOffset>,
+    frequency: &Option<i32>,
+    frequency_unit: &Option<FrequencyUnit>,
     metadata: &serde_json::Value
 ) -> Result<()> {
     tracing::trace!("insert_job(name={})", name);
     
     db.execute(
-        "INSERT INTO jobs (name, expected_time, metadata) VALUES ($1, $2, $3) 
+        "INSERT INTO jobs (name, expected_time, frequency, frequency_unit, metadata) VALUES ($1, $2, $3, $4, $5) 
             ON CONFLICT (name, expected_time) DO UPDATE SET metadata = EXCLUDED.metadata",
-        &[&name, &expected_time, &metadata],
+        &[&name, &expected_time, &frequency, &frequency_unit, &metadata],
     )
     .await
     .context("Inserting job")?;
@@ -91,19 +109,32 @@ pub async fn get_jobs_to_execute(db: &DbClient) -> Result<Vec<Job>>  {
         let id: Uuid = job.get(0);
         let name: String = job.get(1);
         let expected_time: DateTime<FixedOffset> = job.get(2);
-        let metadata: serde_json::Value = job.get(3);
-        let executed_at: Option<DateTime<FixedOffset>> = job.get(4);
-        let error_message: Option<String> = job.get(5);
+        let frequency: Option<i32> = job.get(3);
+        let frequency_unit: Option<FrequencyUnit> = job.get(4);
+        let metadata: serde_json::Value = job.get(5);
+        let executed_at: Option<DateTime<FixedOffset>> = job.get(6);
+        let error_message: Option<String> = job.get(7);
 
         data.push(Job {
             id,
             name,
             expected_time,
-            metadata: metadata,
-            executed_at: executed_at,
+            frequency,
+            frequency_unit,
+            metadata,
+            executed_at,
             error_message
         });
     }
 
     Ok(data)
 }
+
+pub fn get_duration_from_cron(cron_period: i32, cron_unit: &FrequencyUnit) -> Duration {
+    match cron_unit {
+        FrequencyUnit::Days => Duration::days(cron_period as i64),
+        FrequencyUnit::Hours => Duration::hours(cron_period as i64),
+        FrequencyUnit::Minutes => Duration::minutes(cron_period as i64),
+        FrequencyUnit::Seconds => Duration::seconds(cron_period as i64),
+    }
+}