Procházet zdrojové kódy

Run scheduled events once at the beginning

Mauricio Cassola před 2 roky
rodič
revize
e716b515d5
5 změnil soubory, kde provedl 122 přidání a 14 odebrání
  1. 2 0
      Cargo.lock
  2. 2 2
      Cargo.toml
  3. 24 0
      src/db.rs
  4. 78 12
      src/db/events.rs
  5. 16 0
      src/main.rs

+ 2 - 0
Cargo.lock

@@ -1337,6 +1337,7 @@ dependencies = [
  "postgres-protocol",
  "serde",
  "serde_json",
+ "uuid",
 ]
 
 [[package]]
@@ -2238,6 +2239,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
 dependencies = [
  "getrandom",
+ "serde",
 ]
 
 [[package]]

+ 2 - 2
Cargo.toml

@@ -23,13 +23,13 @@ hyper = { version = "0.14.4", features = ["server", "stream"]}
 tokio = { version = "1.7.1", features = ["macros", "time", "rt"] }
 futures = { version = "0.3", default-features = false, features = ["std"] }
 async-trait = "0.1.31"
-uuid = { version = "0.8", features = ["v4"] }
+uuid = { version = "0.8", features = ["v4", "serde"] }
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
 url = "2.1.0"
 once_cell = "1"
 chrono = { version = "0.4", features = ["serde"] }
-tokio-postgres = { version = "0.7.2", features = ["with-chrono-0_4", "with-serde_json-1"] }
+tokio-postgres = { version = "0.7.2", features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-0_8"] }
 postgres-native-tls = "0.5.0"
 native-tls = "0.2"
 serde_path_to_error = "0.1.2"

+ 24 - 0
src/db.rs

@@ -4,6 +4,7 @@ use postgres_native_tls::MakeTlsConnector;
 use std::sync::{Arc, Mutex};
 use tokio::sync::{OwnedSemaphorePermit, Semaphore};
 use tokio_postgres::Client as DbClient;
+use crate::db::events::*;
 
 pub mod events;
 pub mod issue_data;
@@ -180,6 +181,29 @@ pub async fn run_migrations(client: &DbClient) -> anyhow::Result<()> {
     Ok(())
 }
 
+pub async fn run_scheduled_events(db: &DbClient) -> anyhow::Result<()> {
+    // table lock ????
+    
+    let events = get_events_to_execute(&db).await;
+    println!("events to execute: {:#?}", events);
+
+    for event in events.unwrap().iter() {
+        update_event_executed_at(&db, &event.event_id).await;
+        match call_event_handler_based_on_event_name {
+            Ok(r) => {
+                tracing::trace!("event succesfully executed (id={})", event.event_id);
+                delete_event(&db, &event.event_id).await;
+            },
+            Err(e) => {
+                tracing::trace!("event failed on execution (id={:?}, error={:?})", event.event_id, e);
+                update_event_failed_message(&db, &event.event_id, &e).await;
+            },
+        }
+    }
+
+    Ok(())
+}
+
 static MIGRATIONS: &[&str] = &[
     "
 CREATE TABLE notifications (

+ 78 - 12
src/db/events.rs

@@ -1,37 +1,103 @@
 //! The `events` table provides a way to have scheduled events
-
-use anyhow::{Result};
+use anyhow::{Result, Context as _};
 use chrono::{DateTime, FixedOffset};
 use tokio_postgres::{Client as DbClient};
 use uuid::Uuid;
+use serde::{Deserialize, Serialize};
 
-#[derive(Debug)]
+#[derive(Serialize, Deserialize, Debug)]
 pub struct Event {
     pub event_id: Uuid,
     pub event_name: String,
     pub expected_event_time: DateTime<FixedOffset>,
-    pub event_metadata: String,
+    // pub event_metadata: String,
     pub executed_at: DateTime<FixedOffset>,
     pub failed: Option<String>,
 }
 
-pub async fn insert_failed(db: &DbClient) -> Result<()> {
-    unimplemented!();
+pub async fn insert_event(db: &DbClient, event: &Event) -> Result<()> {
+    tracing::trace!("insert_event(id={})", event.event_id);
+    
+    db.execute(
+        "INSERT INTO events (event_id, event_name, expected_event_time, event_metadata, executed_at, failed) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING",
+        &[&event.event_id, &event.event_name, &event.expected_event_time, &"", &event.executed_at, &event.failed],
+    )
+    .await
+    .context("inserting event")?;
+
+    Ok(())
+}
+
+pub async fn delete_event(db: &DbClient, event_id: &Uuid) -> Result<()> {
+    tracing::trace!("delete_event(id={})", event_id);
+    
+    db.execute(
+        "DELETE FROM events WHERE event_id = $1",
+        &[&event_id],
+    )
+    .await
+    .context("deleting event")?;
+
+    Ok(())
 }
 
-pub async fn delete_event(db: &DbClient) -> Result<()> {
-    unimplemented!();
+pub async fn update_event_failed_message(db: &DbClient, event_id: &Uuid, message: &String) -> Result<()> {
+    tracing::trace!("update_event_failed_message(id={})", event_id);
+    
+    db.execute(
+        "UPDATE events SET failed = $2 WHERE event_id = $1",
+        &[&event_id, &message],
+    )
+    .await
+    .context("updating event failed message")?;
+
+    Ok(())
 }
 
-pub async fn get_events_to_execute(db: &DbClient) -> Result<Vec<Event>> {
+pub async fn update_event_executed_at(db: &DbClient, event_id: &Uuid) -> Result<()> {
+    tracing::trace!("update_event_executed_at(id={})", event_id);
+    
+    db.execute(
+        "UPDATE events SET executed_at = now() WHERE event_id = $1",
+        &[&event_id],
+    )
+    .await
+    .context("updating event executed at")?;
+
+    Ok(())
+}
+
+// Selects all events with:
+//  - event_time's in the past 
+//  - failed is null or executed_at is at least 60 minutes ago (intended to make repeat executions rare enough)
+pub async fn get_events_to_execute(db: &DbClient) -> Result<Vec<Event>>  {
     let events = db
         .query(
             "
-        SELECT * FROM events",
+        SELECT * FROM events WHERE expected_event_time <= now() AND (failed IS NULL OR executed_at <= now() - INTERVAL '60 minutes')",
             &[],
         )
         .await
-        .unwrap();
+        .context("Getting events data")?;
+
+    let mut data = Vec::with_capacity(events.len());
+    for event in events {
+        let event_id: Uuid = event.get(0);
+        let event_name: String = event.get(1);
+        let expected_event_time: DateTime<FixedOffset> = event.get(2);
+        // let event_metadata: String = event.get(3);
+        let executed_at: DateTime<FixedOffset> = event.get(4);
+        let failed: Option<String> = event.get(5);
+
+        data.push(Event {
+            event_id,
+            event_name,
+            expected_event_time,
+            // event_metadata,
+            executed_at,
+            failed
+        });
+    }
 
-    Ok(vec![])
+    Ok(data)
 }

+ 16 - 0
src/main.rs

@@ -11,6 +11,8 @@ use tower::{Service, ServiceExt};
 use tracing as log;
 use tracing::Instrument;
 use triagebot::{db, github, handlers::Context, notification_listing, payload, EventName};
+// use std::{time::Duration, thread};
+// use tokio::task;
 
 async fn handle_agenda_request(req: String) -> anyhow::Result<String> {
     if req == "/agenda/lang/triage" {
@@ -237,6 +239,20 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
         .await
         .context("database migrations")?;
 
+    db::run_scheduled_events(&*pool.get().await)
+        .await
+        .context("database scheduled_events")?;
+
+    // task::spawn(async move {
+    //     loop {
+    //         thread::sleep(Duration::from_secs(60)); // every one minute
+
+    //         db::run_scheduled_events(&*pool.get().await)
+    //             .await
+    //             .context("database scheduled_events")?; 
+    //     }
+    // });
+
     let client = Client::new();
     let gh = github::GithubClient::new_with_default_token(client.clone());
     let oc = octocrab::OctocrabBuilder::new()