|
@@ -10,8 +10,9 @@ use tokio::{task, time};
|
|
use tower::{Service, ServiceExt};
|
|
use tower::{Service, ServiceExt};
|
|
use tracing as log;
|
|
use tracing as log;
|
|
use tracing::Instrument;
|
|
use tracing::Instrument;
|
|
|
|
+use triagebot::handlers::pull_requests_assignment_update::PullRequestAssignmentUpdate;
|
|
use triagebot::jobs::{
|
|
use triagebot::jobs::{
|
|
- default_jobs, JOB_PROCESSING_CADENCE_IN_SECS, JOB_SCHEDULING_CADENCE_IN_SECS,
|
|
|
|
|
|
+ default_jobs, Job, JOB_PROCESSING_CADENCE_IN_SECS, JOB_SCHEDULING_CADENCE_IN_SECS,
|
|
};
|
|
};
|
|
use triagebot::{db, github, handlers::Context, notification_listing, payload, EventName};
|
|
use triagebot::{db, github, handlers::Context, notification_listing, payload, EventName};
|
|
|
|
|
|
@@ -261,6 +262,14 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
|
|
octocrab: oc,
|
|
octocrab: oc,
|
|
});
|
|
});
|
|
|
|
|
|
|
|
+ // Run all jobs that don't have a schedule (one-off jobs)
|
|
|
|
+ // TODO: Ideally JobSchedule.schedule should become an `Option<Schedule>`
|
|
|
|
+ // and here we run all those with schedule=None
|
|
|
|
+ if !is_scheduled_jobs_disabled() {
|
|
|
|
+ spawn_job_oneoffs(ctx.clone()).await;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Run all jobs that have a schedule (recurring jobs)
|
|
if !is_scheduled_jobs_disabled() {
|
|
if !is_scheduled_jobs_disabled() {
|
|
spawn_job_scheduler();
|
|
spawn_job_scheduler();
|
|
spawn_job_runner(ctx.clone());
|
|
spawn_job_runner(ctx.clone());
|
|
@@ -310,6 +319,35 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
|
|
Ok(())
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+/// Spawns a background tokio task which runs all jobs having no schedule
|
|
|
|
+/// i.e. manually executed at the end of the triagebot startup
|
|
|
|
+// - jobs are not guaranteed to start in sequence (care is to be taken to ensure thet are completely independent one from the other)
|
|
|
|
+// - the delay between jobs start is not guaranteed to be precise
|
|
|
|
+async fn spawn_job_oneoffs(ctx: Arc<Context>) {
|
|
|
|
+ let jobs: Vec<Box<dyn Job + Send + Sync>> = vec![Box::new(PullRequestAssignmentUpdate)];
|
|
|
|
+
|
|
|
|
+ for (idx, job) in jobs.into_iter().enumerate() {
|
|
|
|
+ let ctx = ctx.clone();
|
|
|
|
+ task::spawn(async move {
|
|
|
|
+ // Allow some spacing between starting jobs
|
|
|
|
+ let delay = idx as u64 * 2;
|
|
|
|
+ time::sleep(time::Duration::from_secs(delay)).await;
|
|
|
|
+ match job.run(&ctx, &serde_json::Value::Null).await {
|
|
|
|
+ Ok(_) => {
|
|
|
|
+ log::trace!("job successfully executed (name={})", &job.name());
|
|
|
|
+ }
|
|
|
|
+ Err(e) => {
|
|
|
|
+ log::error!(
|
|
|
|
+ "job failed on execution (name={:?}, error={:?})",
|
|
|
|
+ job.name(),
|
|
|
|
+ e
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
/// Spawns a background tokio task which runs continuously to queue up jobs
|
|
/// Spawns a background tokio task which runs continuously to queue up jobs
|
|
/// to be run by the job runner.
|
|
/// to be run by the job runner.
|
|
///
|
|
///
|