浏览代码

Rename to jobs and some refactors

Mauricio Cassola 2 年之前
父节点
当前提交
0593e070a4
共有 7 个文件被更改，包括 170 次插入和 156 次删除
  1. +29 −23 src/db.rs
  2. +0 −103 src/db/events.rs
  3. +109 −0 src/db/jobs.rs
  4. +1 −1 src/handlers.rs
  5. +0 −21 src/handlers/events.rs
  6. +20 −0 src/handlers/jobs.rs
  7. +11 −8 src/main.rs

+ 29 - 23
src/db.rs

@@ -4,10 +4,10 @@ use postgres_native_tls::MakeTlsConnector;
 use std::sync::{Arc, Mutex};
 use tokio::sync::{OwnedSemaphorePermit, Semaphore};
 use tokio_postgres::Client as DbClient;
-use crate::db::events::*;
-use crate::handlers::events::handle_event;
+use crate::db::jobs::*;
+use crate::handlers::jobs::handle_job;
 
-pub mod events;
+pub mod jobs;
 pub mod issue_data;
 pub mod notifications;
 pub mod rustc_commits;
@@ -182,26 +182,26 @@ pub async fn run_migrations(client: &DbClient) -> anyhow::Result<()> {
     Ok(())
 }
 
-pub async fn run_scheduled_events(db: &DbClient) -> anyhow::Result<()> {
-    let events = get_events_to_execute(&db).await.unwrap();
-    println!("events to execute: {:#?}", events);
-    tracing::trace!("events to execute: {:#?}", events);
+pub async fn run_scheduled_jobs(db: &DbClient) -> anyhow::Result<()> {
+    let jobs = get_jobs_to_execute(&db).await.unwrap();
+    println!("jobs to execute: {:#?}", jobs);
+    tracing::trace!("jobs to execute: {:#?}", jobs);
 
-    for event in events.iter() {
-        update_event_executed_at(&db, &event.event_id).await?;
+    for job in jobs.iter() {
+        update_job_executed_at(&db, &job.id).await?;
 
-        match handle_event(&event.event_name, &event.event_metadata).await {
+        match handle_job(&job.name, &job.metadata).await {
             Ok(_) => {
-                println!("event succesfully executed (id={})", event.event_id);
-                tracing::trace!("event succesfully executed (id={})", event.event_id);
+                println!("job succesfully executed (id={})", job.id);
+                tracing::trace!("job succesfully executed (id={})", job.id);
 
-                delete_event(&db, &event.event_id).await?;
+                delete_job(&db, &job.id).await?;
             },
             Err(e) => {
-                println!("event failed on execution (id={:?}, error={:?})", event.event_id, e);
-                tracing::trace!("event failed on execution (id={:?}, error={:?})", event.event_id, e);
+                println!("job failed on execution (id={:?}, error={:?})", job.id, e);
+                tracing::trace!("job failed on execution (id={:?}, error={:?})", job.id, e);
 
-                update_event_failed_message(&db, &event.event_id, &e.to_string()).await?;
+                update_job_error_message(&db, &job.id, &e.to_string()).await?;
             },
         }
     }
@@ -247,13 +247,19 @@ CREATE TABLE issue_data (
 );
 ",
     "
-CREATE TABLE events (
-    event_id UUID PRIMARY KEY,
-    event_name TEXT NOT NULL,
-    expected_event_time TIMESTAMP WITH TIME ZONE NOT NULL,
-    event_metadata JSONB,
-    executed_at TIMESTAMP WITH TIME ZONE NOT NULL,
-    failed TEXT
+CREATE TABLE jobs (
+    id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
+    name TEXT NOT NULL,
+    expected_time TIMESTAMP WITH TIME ZONE NOT NULL,
+    metadata JSONB,
+    executed_at TIMESTAMP WITH TIME ZONE,
+    error_message TEXT
 );
 ",
+    "
+CREATE UNIQUE INDEX jobs_name_expected_time_unique_index 
+    ON jobs (
+        name, expected_time
+    );
+"
 ];

+ 0 - 103
src/db/events.rs

@@ -1,103 +0,0 @@
-//! The `events` table provides a way to have scheduled events
-use anyhow::{Result, Context as _};
-use chrono::{DateTime, FixedOffset};
-use tokio_postgres::{Client as DbClient};
-use uuid::Uuid;
-use serde::{Deserialize, Serialize};
-
-#[derive(Serialize, Deserialize, Debug)]
-pub struct Event {
-    pub event_id: Uuid,
-    pub event_name: String,
-    pub expected_event_time: DateTime<FixedOffset>,
-    pub event_metadata: serde_json::Value,
-    pub executed_at: DateTime<FixedOffset>,
-    pub failed: Option<String>,
-}
-
-pub async fn insert_event(db: &DbClient, event: &Event) -> Result<()> {
-    tracing::trace!("insert_event(id={})", event.event_id);
-    
-    db.execute(
-        "INSERT INTO events (event_id, event_name, expected_event_time, event_metadata, executed_at, failed) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING",
-        &[&event.event_id, &event.event_name, &event.expected_event_time, &"", &event.executed_at, &event.failed],
-    )
-    .await
-    .context("inserting event")?;
-
-    Ok(())
-}
-
-pub async fn delete_event(db: &DbClient, event_id: &Uuid) -> Result<()> {
-    tracing::trace!("delete_event(id={})", event_id);
-    
-    db.execute(
-        "DELETE FROM events WHERE event_id = $1",
-        &[&event_id],
-    )
-    .await
-    .context("deleting event")?;
-
-    Ok(())
-}
-
-pub async fn update_event_failed_message(db: &DbClient, event_id: &Uuid, message: &String) -> Result<()> {
-    tracing::trace!("update_event_failed_message(id={})", event_id);
-    
-    db.execute(
-        "UPDATE events SET failed = $2 WHERE event_id = $1",
-        &[&event_id, &message],
-    )
-    .await
-    .context("updating event failed message")?;
-
-    Ok(())
-}
-
-pub async fn update_event_executed_at(db: &DbClient, event_id: &Uuid) -> Result<()> {
-    tracing::trace!("update_event_executed_at(id={})", event_id);
-    
-    db.execute(
-        "UPDATE events SET executed_at = now() WHERE event_id = $1",
-        &[&event_id],
-    )
-    .await
-    .context("updating event executed at")?;
-
-    Ok(())
-}
-
-// Selects all events with:
-//  - event_time's in the past 
-//  - failed is null or executed_at is at least 60 minutes ago (intended to make repeat executions rare enough)
-pub async fn get_events_to_execute(db: &DbClient) -> Result<Vec<Event>>  {
-    let events = db
-        .query(
-            "
-        SELECT * FROM events WHERE expected_event_time <= now() AND (failed IS NULL OR executed_at <= now() - INTERVAL '60 minutes')",
-            &[],
-        )
-        .await
-        .context("Getting events data")?;
-
-    let mut data = Vec::with_capacity(events.len());
-    for event in events {
-        let event_id: Uuid = event.get(0);
-        let event_name: String = event.get(1);
-        let expected_event_time: DateTime<FixedOffset> = event.get(2);
-        let event_metadata: serde_json::Value = event.get(3);
-        let executed_at: DateTime<FixedOffset> = event.get(4);
-        let failed: Option<String> = event.get(5);
-
-        data.push(Event {
-            event_id,
-            event_name,
-            expected_event_time,
-            event_metadata,
-            executed_at,
-            failed
-        });
-    }
-
-    Ok(data)
-}

+ 109 - 0
src/db/jobs.rs

@@ -0,0 +1,109 @@
+//! The `jobs` table provides a way to have scheduled jobs
+use anyhow::{Result, Context as _};
+use chrono::{DateTime, FixedOffset};
+use tokio_postgres::{Client as DbClient};
+use uuid::Uuid;
+use serde::{Deserialize, Serialize};
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct Job {
+    pub id: Uuid,
+    pub name: String,
+    pub expected_time: DateTime<FixedOffset>,
+    pub metadata: serde_json::Value,
+    pub executed_at: Option<DateTime<FixedOffset>>,
+    pub error_message: Option<String>,
+}
+
+pub async fn insert_job(
+    db: &DbClient, 
+    name: &String,
+    expected_time: &DateTime<FixedOffset>,
+    metadata: &serde_json::Value
+) -> Result<()> {
+    tracing::trace!("insert_job(name={})", name);
+    
+    db.execute(
+        "INSERT INTO jobs (name, expected_time, metadata) VALUES ($1, $2, $3) 
+            ON CONFLICT (name, expected_time) DO UPDATE SET metadata = EXCLUDED.metadata",
+        &[&name, &expected_time, &metadata],
+    )
+    .await
+    .context("Inserting job")?;
+
+    Ok(())
+}
+
+pub async fn delete_job(db: &DbClient, id: &Uuid) -> Result<()> {
+    tracing::trace!("delete_job(id={})", id);
+    
+    db.execute(
+        "DELETE FROM jobs WHERE id = $1",
+        &[&id],
+    )
+    .await
+    .context("Deleting job")?;
+
+    Ok(())
+}
+
+pub async fn update_job_error_message(db: &DbClient, id: &Uuid, message: &String) -> Result<()> {
+    tracing::trace!("update_job_error_message(id={})", id);
+    
+    db.execute(
+        "UPDATE jobs SET error_message = $2 WHERE id = $1",
+        &[&id, &message],
+    )
+    .await
+    .context("Updating job error message")?;
+
+    Ok(())
+}
+
+pub async fn update_job_executed_at(db: &DbClient, id: &Uuid) -> Result<()> {
+    tracing::trace!("update_job_executed_at(id={})", id);
+    
+    db.execute(
+        "UPDATE jobs SET executed_at = now() WHERE id = $1",
+        &[&id],
+    )
+    .await
+    .context("Updating job executed at")?;
+
+    Ok(())
+}
+
+// Selects all jobs with:
+//  - expected_time in the past 
+//  - error_message is null or executed_at is at least 60 minutes ago (intended to make repeat executions rare enough)
+pub async fn get_jobs_to_execute(db: &DbClient) -> Result<Vec<Job>>  {
+    let jobs = db
+        .query(
+            "
+        SELECT * FROM jobs WHERE expected_time <= now() AND (error_message IS NULL OR executed_at <= now() - INTERVAL '60 minutes')",
+            &[],
+        )
+        .await
+        .context("Getting jobs data")?;
+
+    let mut data = Vec::with_capacity(jobs.len());
+    for job in jobs {
+        let id: Uuid = job.get(0);
+        let name: String = job.get(1);
+        let expected_time: DateTime<FixedOffset> = job.get(2);
+        let metadata: serde_json::Value = job.get(3);
+        let executed_at: Option<DateTime<FixedOffset>> = job.get(4);
+        let error_message: Option<String> = job.get(5);
+
+        data.push(Job {
+            id,
+            name,
+            expected_time,
+            metadata: metadata,
+            executed_at: executed_at,
+            error_message
+        });
+    }
+
+    Ok(data)
+}

+ 1 - 1
src/handlers.rs

@@ -43,7 +43,7 @@ mod review_submitted;
 mod rfc_helper;
 mod rustc_commits;
 mod shortcut;
-pub mod events;
+pub mod jobs;
 
 pub async fn handle(ctx: &Context, event: &Event) -> Vec<HandlerError> {
     let config = config::get(&ctx.github, event.repo()).await;

+ 0 - 21
src/handlers/events.rs

@@ -1,21 +0,0 @@
-// Function to match the scheduled event function with its corresponding handler.
-// In case you want to add a new one, just add a new clause to the match with 
-// the event name and the corresponding function.
-
-// The metadata is a serde_json::Value, please visit: https://docs.rs/serde_json/latest/serde_json/value/enum.Value.html
-// to refer on how to get values from there.
-// Example of accessing an integer id in the metadata:
-//    event_metadata["id"].as_i64().unwrap();
-
-pub async fn handle_event(event_name: &String, event_metadata: &serde_json::Value) -> anyhow::Result<()> {
-    match event_name {
-      _ => default(&event_name, &event_metadata)
-    }
-}
-
-fn default(event_name: &String, event_metadata: &serde_json::Value) -> anyhow::Result<()> {
-  println!("handle_event fall in default cause: (name={:?}, metadata={:?})", event_name, event_metadata);
-  tracing::trace!("handle_event fall in default cause: (name={:?}, metadata={:?})", event_name, event_metadata);
-
-  Ok(())
-}

+ 20 - 0
src/handlers/jobs.rs

@@ -0,0 +1,20 @@
+// Function to match the scheduled job function with its corresponding handler.
+// In case you want to add a new one, just add a new clause to the match with 
+// the job name and the corresponding function.
+
+// The metadata is a serde_json::Value
+// Please refer to https://docs.rs/serde_json/latest/serde_json/value/fn.from_value.html
+// on how to interpret it as an instance of type T, implementing Deserialize.
+
+pub async fn handle_job(name: &String, metadata: &serde_json::Value) -> anyhow::Result<()> {
+    match name {
+      _ => default(&name, &metadata)
+    }
+}
+
+fn default(name: &String, metadata: &serde_json::Value) -> anyhow::Result<()> {
+  println!("handle_job fell into default case: (name={:?}, metadata={:?})", name, metadata);
+  tracing::trace!("handle_job fell into default case: (name={:?}, metadata={:?})", name, metadata);
+
+  Ok(())
+}

+ 11 - 8
src/main.rs

@@ -6,13 +6,14 @@ use futures::StreamExt;
 use hyper::{header, Body, Request, Response, Server, StatusCode};
 use reqwest::Client;
 use route_recognizer::Router;
-use std::{env, net::SocketAddr, sync::Arc};
+use std::{env, net::SocketAddr, sync::Arc, time::Duration};
 use tower::{Service, ServiceExt};
 use tracing as log;
 use tracing::Instrument;
 use triagebot::{db, github, handlers::Context, notification_listing, payload, EventName};
-use std::{time::Duration, thread};
-use tokio::task;
+use tokio::{task, time::sleep};
+
+const JOB_PROCESSING_CADENCE_IN_SECS: u64 = 60;
 
 async fn handle_agenda_request(req: String) -> anyhow::Result<String> {
     if req == "/agenda/lang/triage" {
@@ -239,15 +240,17 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
         .await
         .context("database migrations")?;  
 
+    // spawning a background task that will run the scheduled jobs
+    // every JOB_PROCESSING_CADENCE_IN_SECS
     task::spawn(async move {
         let pool = db::ClientPool::new();
 
-        loop {
-            thread::sleep(Duration::from_secs(60)); // every one minute
-
-            db::run_scheduled_events(&*pool.get().await)
+        loop { 
+            db::run_scheduled_jobs(&*pool.get().await)
                 .await
-                .context("database scheduled_events").unwrap(); 
+                .context("run database scheduled jobs").unwrap(); 
+
+            sleep(Duration::from_secs(JOB_PROCESSING_CADENCE_IN_SECS)).await;
         }
     });