
Automatically re-scan GitHub commits every 30 minutes

This lets us recover from faults in the GitHub API more quickly.
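For reference, a minimal standalone sketch (using the same `cron` and `chrono` crates the codebase already pulls in) of how the new schedule string parses and which fire times it yields; this is an illustration, not code from the commit:

use chrono::Utc;
use cron::Schedule;
use std::str::FromStr;

fn main() {
    // Same expression as in src/handlers/rustc_commits.rs below. The `cron` crate
    // reads seven fields (sec min hour day-of-month month day-of-week year), so with
    // `*` in the seconds field this matches any second within minutes 0 and 30 of
    // each hour; in other words, the job becomes due roughly every 30 minutes.
    let schedule = Schedule::from_str("* 0,30 * * * * *").expect("invalid cron expression");
    for next in schedule.upcoming(Utc).take(3) {
        println!("next due: {}", next);
    }
}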
Mark Rousskov, 2 years ago
commit 8f2edd2c5b
6 changed files with 67 additions and 24 deletions
  1. src/db.rs (+3 -3)
  2. src/handlers.rs (+1 -1)
  3. src/handlers/jobs.rs (+11 -1)
  4. src/handlers/rustc_commits.rs (+29 -5)
  5. src/jobs.rs (+7 -0)
  6. src/main.rs (+16 -14)

+ 3 - 3
src/db.rs

@@ -1,5 +1,5 @@
-use crate::db::jobs::*;
 use crate::handlers::jobs::handle_job;
+use crate::{db::jobs::*, handlers::Context};
 use anyhow::Context as _;
 use chrono::Utc;
 use native_tls::{Certificate, TlsConnector};
@@ -198,14 +198,14 @@ pub async fn schedule_jobs(db: &DbClient, jobs: Vec<JobSchedule>) -> anyhow::Res
     Ok(())
 }
 
-pub async fn run_scheduled_jobs(db: &DbClient) -> anyhow::Result<()> {
+pub async fn run_scheduled_jobs(ctx: &Context, db: &DbClient) -> anyhow::Result<()> {
     let jobs = get_jobs_to_execute(&db).await.unwrap();
     tracing::trace!("jobs to execute: {:#?}", jobs);
 
     for job in jobs.iter() {
         update_job_executed_at(&db, &job.id).await?;
 
-        match handle_job(&job.name, &job.metadata).await {
+        match handle_job(&ctx, &job.name, &job.metadata).await {
             Ok(_) => {
                 tracing::trace!("job successfully executed (id={})", job.id);
                 delete_job(&db, &job.id).await?;

+ 1 - 1
src/handlers.rs

@@ -43,7 +43,7 @@ mod prioritize;
 mod relabel;
 mod review_submitted;
 mod rfc_helper;
-mod rustc_commits;
+pub mod rustc_commits;
 mod shortcut;
 
 pub async fn handle(ctx: &Context, event: &Event) -> Vec<HandlerError> {

+ 11 - 1
src/handlers/jobs.rs

@@ -4,9 +4,19 @@
 
 // Further info can be found in src/jobs.rs
 
-pub async fn handle_job(name: &String, metadata: &serde_json::Value) -> anyhow::Result<()> {
+use super::Context;
+
+pub async fn handle_job(
+    ctx: &Context,
+    name: &String,
+    metadata: &serde_json::Value,
+) -> anyhow::Result<()> {
     match name.as_str() {
         "docs_update" => super::docs_update::handle_job().await,
+        "rustc_commits" => {
+            super::rustc_commits::synchronize_commits_inner(ctx, None).await;
+            Ok(())
+        }
         _ => default(&name, &metadata),
     }
 }

+ 29 - 5
src/handlers/rustc_commits.rs

@@ -1,11 +1,14 @@
+use crate::db::jobs::JobSchedule;
 use crate::db::rustc_commits;
 use crate::db::rustc_commits::get_missing_commits;
 use crate::{
     github::{self, Event},
     handlers::Context,
 };
+use cron::Schedule;
 use std::collections::VecDeque;
 use std::convert::TryInto;
+use std::str::FromStr;
 use tracing as log;
 
 const BORS_GH_ID: i64 = 3372342;
@@ -80,16 +83,28 @@ pub async fn handle(ctx: &Context, event: &Event) -> anyhow::Result<()> {
 /// Fetch commits that are not present in the database.
 async fn synchronize_commits(ctx: &Context, sha: &str, pr: u32) {
     log::trace!("synchronize_commits for sha={:?}, pr={}", sha, pr);
+    synchronize_commits_inner(ctx, Some((sha.to_owned(), pr))).await;
+}
+
+pub async fn synchronize_commits_inner(ctx: &Context, starter: Option<(String, u32)>) {
     let db = ctx.db.get().await;
-    let mut pr = Some(pr);
 
     // List of roots to be resolved. Each root and its parents will be recursively resolved
     // until an existing commit is found.
     let mut to_be_resolved = VecDeque::new();
-    to_be_resolved.push_back(sha.to_string());
-    to_be_resolved.extend(get_missing_commits(&db).await);
+    if let Some((sha, pr)) = starter {
+        to_be_resolved.push_back((sha.to_string(), Some(pr)));
+    }
+    to_be_resolved.extend(
+        get_missing_commits(&db)
+            .await
+            .into_iter()
+            .map(|c| (c, None::<u32>)),
+    );
+    log::info!("synchronize_commits for {:?}", to_be_resolved);
 
-    while let Some(sha) = to_be_resolved.pop_front() {
+    let db = ctx.db.get().await;
+    while let Some((sha, mut pr)) = to_be_resolved.pop_front() {
         let mut gc = match ctx.github.rust_commit(&sha).await {
             Some(c) => c,
             None => {
@@ -130,7 +145,7 @@ async fn synchronize_commits(ctx: &Context, sha: &str, pr: u32) {
         match res {
             Ok(()) => {
                 if !rustc_commits::has_commit(&db, &parent_sha).await {
-                    to_be_resolved.push_back(parent_sha)
+                    to_be_resolved.push_back((parent_sha, None))
                 }
             }
             Err(e) => log::error!("Failed to record commit {:?}", e),
@@ -138,6 +153,15 @@ async fn synchronize_commits(ctx: &Context, sha: &str, pr: u32) {
     }
 }
 
+pub fn job() -> JobSchedule {
+    JobSchedule {
+        name: "rustc_commits".to_string(),
+        // Every 30 minutes...
+        schedule: Schedule::from_str("* 0,30 * * * * *").unwrap(),
+        metadata: serde_json::Value::Null,
+    }
+}
+
 #[derive(Debug, serde::Deserialize)]
 struct BorsMessage {
     #[serde(rename = "type")]
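
The synchronization loop above is a plain work-queue traversal. A minimal stand-alone sketch of that pattern (the `known` set and `parent_of` closure are stand-ins for the database and the GitHub API, not real triagebot code):

use std::collections::{HashSet, VecDeque};

// Simplified illustration of the loop above: drain a queue of commit SHAs,
// pushing each unknown parent back onto the queue until only already-known
// commits remain.
fn resolve_ancestry(
    mut known: HashSet<String>,
    roots: Vec<String>,
    parent_of: impl Fn(&str) -> Option<String>,
) -> HashSet<String> {
    let mut to_be_resolved: VecDeque<String> = roots.into();
    while let Some(sha) = to_be_resolved.pop_front() {
        if !known.insert(sha.clone()) {
            continue; // already recorded
        }
        if let Some(parent) = parent_of(&sha) {
            if !known.contains(&parent) {
                to_be_resolved.push_back(parent);
            }
        }
    }
    known
}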

+ 7 - 0
src/jobs.rs

@@ -47,6 +47,13 @@ pub fn jobs() -> Vec<JobSchedule> {
     // Add to this vector any new cron task you want (as explained above)
     let mut jobs: Vec<JobSchedule> = Vec::new();
     jobs.push(crate::handlers::docs_update::job());
+    jobs.push(crate::handlers::rustc_commits::job());
 
     jobs
 }
+
+#[test]
+fn jobs_defined() {
+    // Checks we don't panic here, mostly for the schedule parsing.
+    drop(jobs());
+}
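
For context, wiring up another scheduled task follows the same shape: push a `JobSchedule` into this vector and handle its name in `handlers::jobs::handle_job`. A purely hypothetical sketch (the `cache_cleanup` name and hourly schedule are invented for illustration, not part of this commit):

use crate::db::jobs::JobSchedule;
use cron::Schedule;
use std::str::FromStr;

// Hypothetical example only.
pub fn cache_cleanup_job() -> JobSchedule {
    JobSchedule {
        // Must match an arm in handle_job, e.g. "cache_cleanup" => ...
        name: "cache_cleanup".to_string(),
        // Top of every hour (sec min hour day-of-month month day-of-week year).
        schedule: Schedule::from_str("0 0 * * * * *").unwrap(),
        metadata: serde_json::Value::Null,
    }
}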

+ 16 - 14
src/main.rs

@@ -267,10 +267,25 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
         }
     });
 
+    let client = Client::new();
+    let gh = github::GithubClient::new_with_default_token(client.clone());
+    let oc = octocrab::OctocrabBuilder::new()
+        .personal_token(github::default_token_from_env())
+        .build()
+        .expect("Failed to build octograb.");
+    let ctx = Arc::new(Context {
+        username: String::from("rustbot"),
+        db: pool,
+        github: gh,
+        octocrab: oc,
+    });
+
     // spawning a background task that will run the scheduled jobs
     // every JOB_PROCESSING_CADENCE_IN_SECS
+    let ctx2 = ctx.clone();
     task::spawn(async move {
         loop {
+            let ctx = ctx2.clone();
             let res = task::spawn(async move {
                 let pool = db::ClientPool::new();
                 let mut interval =
@@ -278,7 +293,7 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
 
                 loop {
                     interval.tick().await;
-                    db::run_scheduled_jobs(&*pool.get().await)
+                    db::run_scheduled_jobs(&ctx, &*pool.get().await)
                         .await
                         .context("run database scheduled jobs")
                         .unwrap();
@@ -295,19 +310,6 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
         }
     });
 
-    let client = Client::new();
-    let gh = github::GithubClient::new_with_default_token(client.clone());
-    let oc = octocrab::OctocrabBuilder::new()
-        .personal_token(github::default_token_from_env())
-        .build()
-        .expect("Failed to build octograb.");
-    let ctx = Arc::new(Context {
-        username: String::from("rustbot"),
-        db: pool,
-        github: gh,
-        octocrab: oc,
-    });
-
     let agenda = tower::ServiceBuilder::new()
         .buffer(10)
         .layer_fn(|input| {