浏览代码

Create Job trait and centralize scheduled jobs

Jack Huey 1 年之前
父节点
当前提交
b3af9c113b
共有 6 个文件被更改,包括 150 次插入,73 次删除
  1. 23 4
      src/db.rs
  2. 0 1
      src/handlers.rs
  3. 35 32
      src/handlers/docs_update.rs
  4. 13 9
      src/handlers/rustc_commits.rs
  5. 75 25
      src/jobs.rs
  6. 4 2
      src/main.rs

+ 23 - 4
src/db.rs

@@ -1,5 +1,4 @@
-use crate::handlers::jobs::handle_job;
-use crate::{db::jobs::*, handlers::Context};
+use crate::{db::jobs::*, handlers::Context, jobs::jobs};
 use anyhow::Context as _;
 use anyhow::Context as _;
 use chrono::Utc;
 use chrono::Utc;
 use native_tls::{Certificate, TlsConnector};
 use native_tls::{Certificate, TlsConnector};
@@ -188,9 +187,9 @@ pub async fn schedule_jobs(db: &DbClient, jobs: Vec<JobSchedule>) -> anyhow::Res
         let mut upcoming = job.schedule.upcoming(Utc).take(1);
         let mut upcoming = job.schedule.upcoming(Utc).take(1);
 
 
         if let Some(scheduled_at) = upcoming.next() {
         if let Some(scheduled_at) = upcoming.next() {
-            if let Err(_) = get_job_by_name_and_scheduled_at(&db, &job.name, &scheduled_at).await {
+            if let Err(_) = get_job_by_name_and_scheduled_at(&db, job.name, &scheduled_at).await {
                // means there's no job already in the db with that name and scheduled_at
                // means there's no job already in the db with that name and scheduled_at
-                insert_job(&db, &job.name, &scheduled_at, &job.metadata).await?;
+                insert_job(&db, job.name, &scheduled_at, &job.metadata).await?;
             }
             }
         }
         }
     }
     }
@@ -220,6 +219,26 @@ pub async fn run_scheduled_jobs(ctx: &Context, db: &DbClient) -> anyhow::Result<
     Ok(())
     Ok(())
 }
 }
 
 
+// Try to handle a specific job
+async fn handle_job(
+    ctx: &Context,
+    name: &String,
+    metadata: &serde_json::Value,
+) -> anyhow::Result<()> {
+    for job in jobs() {
+        if &job.name() == &name {
+            return job.run(ctx, metadata).await;
+        }
+    }
+    tracing::trace!(
+        "handle_job fell into default case: (name={:?}, metadata={:?})",
+        name,
+        metadata
+    );
+
+    Ok(())
+}
+
 static MIGRATIONS: &[&str] = &[
 static MIGRATIONS: &[&str] = &[
     "
     "
 CREATE TABLE notifications (
 CREATE TABLE notifications (

+ 0 - 1
src/handlers.rs

@@ -29,7 +29,6 @@ mod close;
 pub mod docs_update;
 pub mod docs_update;
 mod github_releases;
 mod github_releases;
 mod glacier;
 mod glacier;
-pub mod jobs;
 mod major_change;
 mod major_change;
 mod mentions;
 mod mentions;
 mod milestone_prs;
 mod milestone_prs;

+ 35 - 32
src/handlers/docs_update.rs

@@ -1,13 +1,12 @@
 //! A scheduled job to post a PR to update the documentation on rust-lang/rust.
 //! A scheduled job to post a PR to update the documentation on rust-lang/rust.
 
 
-use crate::db::jobs::JobSchedule;
 use crate::github::{self, GitTreeEntry, GithubClient, Issue, Repository};
 use crate::github::{self, GitTreeEntry, GithubClient, Issue, Repository};
+use crate::jobs::Job;
 use anyhow::Context;
 use anyhow::Context;
 use anyhow::Result;
 use anyhow::Result;
-use cron::Schedule;
+use async_trait::async_trait;
 use reqwest::Client;
 use reqwest::Client;
 use std::fmt::Write;
 use std::fmt::Write;
-use std::str::FromStr;
 
 
 /// This is the repository where the commits will be created.
 /// This is the repository where the commits will be created.
 const WORK_REPO: &str = "rustbot/rust";
 const WORK_REPO: &str = "rustbot/rust";
@@ -28,38 +27,42 @@ const SUBMODULES: &[&str] = &[
 
 
 const TITLE: &str = "Update books";
 const TITLE: &str = "Update books";
 
 
-pub fn job() -> JobSchedule {
-    JobSchedule {
-        name: "docs_update",
-        // Around 9am Pacific time on every Monday.
-        schedule: Schedule::from_str("0 00 17 * * Mon *").unwrap(),
-        metadata: serde_json::Value::Null,
-    }
-}
+pub struct DocsUpdateJob;
 
 
-pub async fn handle_job() -> Result<()> {
-    // Only run every other week. Doing it every week can be a bit noisy, and
-    // (rarely) a PR can take longer than a week to merge (like if there are
-    // CI issues). `Schedule` does not allow expressing this, so check it
-    // manually.
-    //
-    // This is set to run the first week after a release, and the week just
-    // before a release. That allows getting the latest changes in the next
-    // release, accounting for possibly taking a few days for the PR to land.
-    let today = chrono::Utc::today().naive_utc();
-    let base = chrono::naive::NaiveDate::from_ymd(2015, 12, 10);
-    let duration = today.signed_duration_since(base);
-    let weeks = duration.num_weeks();
-    if weeks % 2 != 0 {
-        tracing::trace!("skipping job, this is an odd week");
-        return Ok(());
+#[async_trait]
+impl Job for DocsUpdateJob {
+    fn name(&self) -> &'static str {
+        "docs_update"
     }
     }
 
 
-    tracing::trace!("starting docs-update");
-    docs_update()
-        .await
-        .context("failed to process docs update")?;
-    Ok(())
+    async fn run(
+        &self,
+        _ctx: &super::Context,
+        _metadata: &serde_json::Value,
+    ) -> anyhow::Result<()> {
+        // Only run every other week. Doing it every week can be a bit noisy, and
+        // (rarely) a PR can take longer than a week to merge (like if there are
+        // CI issues). `Schedule` does not allow expressing this, so check it
+        // manually.
+        //
+        // This is set to run the first week after a release, and the week just
+        // before a release. That allows getting the latest changes in the next
+        // release, accounting for possibly taking a few days for the PR to land.
+        let today = chrono::Utc::today().naive_utc();
+        let base = chrono::naive::NaiveDate::from_ymd(2015, 12, 10);
+        let duration = today.signed_duration_since(base);
+        let weeks = duration.num_weeks();
+        if weeks % 2 != 0 {
+            tracing::trace!("skipping job, this is an odd week");
+            return Ok(());
+        }
+
+        tracing::trace!("starting docs-update");
+        docs_update()
+            .await
+            .context("failed to process docs update")?;
+        Ok(())
+    }
 }
 }
 
 
 pub async fn docs_update() -> Result<Option<Issue>> {
 pub async fn docs_update() -> Result<Option<Issue>> {

+ 13 - 9
src/handlers/rustc_commits.rs

@@ -1,14 +1,13 @@
-use crate::db::jobs::JobSchedule;
 use crate::db::rustc_commits;
 use crate::db::rustc_commits;
 use crate::db::rustc_commits::get_missing_commits;
 use crate::db::rustc_commits::get_missing_commits;
+use crate::jobs::Job;
 use crate::{
 use crate::{
     github::{self, Event},
     github::{self, Event},
     handlers::Context,
     handlers::Context,
 };
 };
-use cron::Schedule;
+use async_trait::async_trait;
 use std::collections::VecDeque;
 use std::collections::VecDeque;
 use std::convert::TryInto;
 use std::convert::TryInto;
-use std::str::FromStr;
 use tracing as log;
 use tracing as log;
 
 
 const BORS_GH_ID: i64 = 3372342;
 const BORS_GH_ID: i64 = 3372342;
@@ -153,12 +152,17 @@ pub async fn synchronize_commits_inner(ctx: &Context, starter: Option<(String, u
     }
     }
 }
 }
 
 
-pub fn job() -> JobSchedule {
-    JobSchedule {
-        name: "rustc_commits",
-        // Every 30 minutes...
-        schedule: Schedule::from_str("* 0,30 * * * * *").unwrap(),
-        metadata: serde_json::Value::Null,
+pub struct RustcCommitsJob;
+
+#[async_trait]
+impl Job for RustcCommitsJob {
+    fn name(&self) -> &'static str {
+        "rustc_commits"
+    }
+
+    async fn run(&self, ctx: &super::Context, _metadata: &serde_json::Value) -> anyhow::Result<()> {
+        synchronize_commits_inner(ctx, None).await;
+        Ok(())
     }
     }
 }
 }
 
 

+ 75 - 25
src/jobs.rs

@@ -1,4 +1,11 @@
-//! SCHEDULED JOBS
+//! # Scheduled Jobs
+//!
+//! Scheduled jobs essentially come in two flavors: automatically repeating
+//! (cron) jobs and one-off jobs.
+//!
+//! The core trait here is the `Job` trait, which *must* define the name of the
+//! job (to be used as an identifier in the database) and the function to run
+//! when the job runs.
 //!
 //!
 //! The metadata is a serde_json::Value
 //! The metadata is a serde_json::Value
 //! Please refer to https://docs.rs/serde_json/latest/serde_json/value/fn.from_value.html
 //! Please refer to https://docs.rs/serde_json/latest/serde_json/value/fn.from_value.html
@@ -7,33 +14,42 @@
 //! The schedule is a cron::Schedule
 //! The schedule is a cron::Schedule
 //! Please refer to https://docs.rs/cron/latest/cron/struct.Schedule.html for further info
 //! Please refer to https://docs.rs/cron/latest/cron/struct.Schedule.html for further info
 //!
 //!
-//! For example, if we want to sends a Zulip message every Friday at 11:30am ET into #t-release
-//! with a @T-release meeting! content, we should create some JobSchedule like:
+//! ## Example, sending a zulip message once a week
 //!
 //!
+//! To give an example, let's imagine we want to send a Zulip message every
+//! Friday at 11:30am ET into #t-release with a "@T-release meeting!" content.
+//!
+//! To begin, let's create a generic zulip message Job:
 //!    #[derive(Serialize, Deserialize)]
 //!    #[derive(Serialize, Deserialize)]
 //!    struct ZulipMetadata {
 //!    struct ZulipMetadata {
 //!      pub message: String
 //!      pub message: String
+//!      pub channel: String,
 //!    }
 //!    }
+//!    struct ZulipMessageJob;
+//!    impl Job for ZulipMessageJob { ... }
 //!
 //!
-//!    let metadata = serde_json::value::to_value(ZulipMetadata {
-//!      message: "@T-release meeting!".to_string()
-//!    }).unwrap();
-//!
-//!    let schedule = Schedule::from_str("0 30 11 * * FRI *").unwrap();
-//!    
-//!    let new_job = JobSchedule {
-//!      name: "send_zulip_message".to_owned(),
-//!      schedule: schedule,
-//!      metadata: metadata
-//!    }
-//!
-//! and include it in the below vector in jobs():
+//! (Imagine that this job requires a channel and a message in the metadata.)
 //!
 //!
-//!   jobs.push(new_job);
-//!
-//! ... fianlly, add the corresponding "send_zulip_message" handler in src/handlers/jobs.rs
+//! If we wanted to have a default scheduled message, we could add the following to
+//! `default_jobs`:
+//!     JobSchedule {
+//!         name: ZulipMessageJob.name(),
+//!         schedule: Schedule::from_str("0 30 11 * * FRI *").unwrap(),
+//!         metadata: serde_json::value::to_value(ZulipMetadata {
+//!             message: "@T-release meeting!".to_string(),
+//!             channel: "T-release".to_string(),
+//!         }).unwrap(),
+//!     }
+
+use std::str::FromStr;
 
 
-use crate::db::jobs::JobSchedule;
+use async_trait::async_trait;
+use cron::Schedule;
+
+use crate::{
+    db::jobs::JobSchedule,
+    handlers::{docs_update::DocsUpdateJob, rustc_commits::RustcCommitsJob, Context},
+};
 
 
 // How often new cron-based jobs will be placed in the queue.
 // How often new cron-based jobs will be placed in the queue.
 // This is the minimum period *between* a single cron task's executions.
 // This is the minimum period *between* a single cron task's executions.
@@ -43,16 +59,50 @@ pub const JOB_SCHEDULING_CADENCE_IN_SECS: u64 = 1800;
 // This is the granularity at which events will occur.
 // This is the granularity at which events will occur.
 pub const JOB_PROCESSING_CADENCE_IN_SECS: u64 = 60;
 pub const JOB_PROCESSING_CADENCE_IN_SECS: u64 = 60;
 
 
-pub fn jobs() -> Vec<JobSchedule> {
-    // Add to this vector any new cron task you want (as explained above)
+// The default jobs to schedule, repeatedly.
+pub fn jobs() -> Vec<Box<dyn Job + Send + Sync>> {
+    vec![Box::new(DocsUpdateJob), Box::new(RustcCommitsJob)]
+}
+
+pub fn default_jobs() -> Vec<JobSchedule> {
     vec![
     vec![
-        crate::handlers::docs_update::job(),
-        crate::handlers::rustc_commits::job(),
+        JobSchedule {
+            name: DocsUpdateJob.name(),
+            // Around 9am Pacific time on every Monday.
+            schedule: Schedule::from_str("0 00 17 * * Mon *").unwrap(),
+            metadata: serde_json::Value::Null,
+        },
+        JobSchedule {
+            name: RustcCommitsJob.name(),
+            // Every 30 minutes...
+            schedule: Schedule::from_str("* 0,30 * * * * *").unwrap(),
+            metadata: serde_json::Value::Null,
+        },
     ]
     ]
 }
 }
 
 
+#[async_trait]
+pub trait Job {
+    fn name(&self) -> &str;
+
+    async fn run(&self, ctx: &Context, metadata: &serde_json::Value) -> anyhow::Result<()>;
+}
+
 #[test]
 #[test]
 fn jobs_defined() {
 fn jobs_defined() {
+    // This checks that we don't panic (during schedule parsing) and that all names are unique
     // Checks we don't panic here, mostly for the schedule parsing.
     // Checks we don't panic here, mostly for the schedule parsing.
-    drop(jobs());
+    let all_jobs = jobs();
+    let mut all_job_names: Vec<_> = all_jobs.into_iter().map(|j| j.name().to_string()).collect();
+    all_job_names.sort();
+    let mut unique_all_job_names = all_job_names.clone();
+    unique_all_job_names.sort();
+    unique_all_job_names.dedup();
+    assert_eq!(all_job_names, unique_all_job_names);
+
+    // Also ensure that our default jobs are real jobs
+    let default_jobs = default_jobs();
+    default_jobs
+        .iter()
+        .for_each(|j| assert!(all_job_names.contains(&j.name.to_string())));
 }
 }

+ 4 - 2
src/main.rs

@@ -11,7 +11,9 @@ use tokio::{task, time};
 use tower::{Service, ServiceExt};
 use tower::{Service, ServiceExt};
 use tracing as log;
 use tracing as log;
 use tracing::Instrument;
 use tracing::Instrument;
-use triagebot::jobs::{jobs, JOB_PROCESSING_CADENCE_IN_SECS, JOB_SCHEDULING_CADENCE_IN_SECS};
+use triagebot::jobs::{
+    default_jobs, JOB_PROCESSING_CADENCE_IN_SECS, JOB_SCHEDULING_CADENCE_IN_SECS,
+};
 use triagebot::{db, github, handlers::Context, notification_listing, payload, EventName};
 use triagebot::{db, github, handlers::Context, notification_listing, payload, EventName};
 
 
 async fn handle_agenda_request(req: String) -> anyhow::Result<String> {
 async fn handle_agenda_request(req: String) -> anyhow::Result<String> {
@@ -320,7 +322,7 @@ fn spawn_job_scheduler() {
 
 
                 loop {
                 loop {
                     interval.tick().await;
                     interval.tick().await;
-                    db::schedule_jobs(&*pool.get().await, jobs())
+                    db::schedule_jobs(&*pool.get().await, default_jobs())
                         .await
                         .await
                         .context("database schedule jobs")
                         .context("database schedule jobs")
                         .unwrap();
                         .unwrap();