Browse Source

Import pull request assignments into triagebot

General overview at: rust-lang#1753

- Added a new DB table with the fields to track how many PRs are
  assigned to a contributor
- Initial DB table population with a one-off job, manually run.
apiraino 1 year ago
parent
commit
a75633d258

+ 26 - 0
github-graphql/PullRequestsOpen.gql

@@ -0,0 +1,26 @@
+# Reference query for the `pull_requests_open` module in github-graphql/src/lib.rs.
+# Pages (100 PRs at a time, via `after`) through a repository's OPEN pull
+# requests, fetching each PR's assignees (login + databaseId) and up to 5 labels.
+# NOTE(review): the Rust consumer also filters on `isDraft`, which is not
+# selected here — confirm this reference query matches the generated fragment.
+query PullRequestsOpen ($repo_owner: String!, $repo_name: String!, $after: String) {
+  repository(owner: $repo_owner, name: $repo_name) {
+    pullRequests(first: 100, after: $after, states:OPEN) {
+      pageInfo {
+        hasNextPage
+        endCursor
+      }
+      nodes {
+        number
+        updatedAt
+        createdAt
+        assignees(first: 10) {
+          nodes {
+            login
+            # databaseId is nullable in the GitHub schema
+            databaseId
+          }
+        }
+        labels(first:5, orderBy:{field:NAME, direction:DESC}) {
+          nodes {
+            name
+          }
+        }
+      }
+    }
+  }
+}

+ 31 - 0
github-graphql/README.md

@@ -0,0 +1,31 @@
+# How to use GraphQL with Rust
+
+## GUI Clients (Electron apps)
+
+Use a client to experiment and build your GraphQL query/mutation.
+
+https://insomnia.rest/download
+
+https://docs.usebruno.com
+
+Once you're happy with the result, save your query in a `<query>.gql` file in this directory. It will serve as
+documentation on how to reproduce the Rust boilerplate.
+
+## Cynic CLI
+
+Introspect a schema and save it locally:
+
+```sh
+cynic introspect \
+    -H "User-Agent: cynic/3.4.3" \
+    -H "Authorization: Bearer [GITHUB_TOKEN]" \
+    "https://api.github.com/graphql" \
+    -o schemas/github.graphql
+```
+
+Execute a GraphQL query/mutation and save locally the Rust boilerplate:
+
+```sh
+cynic querygen --schema schemas/github.graphql --query query.gql
+```
+

+ 42 - 0
github-graphql/src/lib.rs

@@ -89,6 +89,7 @@ pub mod queries {
     #[derive(cynic::QueryFragment, Debug)]
     pub struct User {
         pub login: String,
+        pub database_id: Option<i32>,
     }
 
     #[derive(cynic::QueryFragment, Debug)]
@@ -385,3 +386,44 @@ pub mod project_items {
         pub date: Option<Date>,
     }
 }
+
+/// Cynic types for the `PullRequestsOpen` GraphQL query, which pages through
+/// all OPEN pull requests of a repository (not just those waiting on review).
+/// Reference query: see file github-graphql/PullRequestsOpen.gql
+pub mod pull_requests_open {
+    use crate::queries::{LabelConnection, PullRequestConnection, UserConnection};
+
+    use super::queries::DateTime;
+    use super::schema;
+
+    /// Query variables: repository coordinates plus the pagination cursor
+    /// (`after` is `None` for the first page).
+    #[derive(cynic::QueryVariables, Clone, Debug)]
+    pub struct PullRequestsOpenVariables<'a> {
+        pub repo_owner: &'a str,
+        pub repo_name: &'a str,
+        pub after: Option<String>,
+    }
+
+    /// Root of the query; `repository` is `None` if the repo doesn't exist
+    /// or is not visible to the token.
+    #[derive(cynic::QueryFragment, Debug)]
+    #[cynic(graphql_type = "Query", variables = "PullRequestsOpenVariables")]
+    pub struct PullRequestsOpen {
+        #[arguments(owner: $repo_owner, name: $repo_name)]
+        pub repository: Option<Repository>,
+    }
+
+    /// One page (up to 100) of the repository's OPEN pull requests.
+    #[derive(cynic::QueryFragment, Debug)]
+    #[cynic(variables = "PullRequestsOpenVariables")]
+    pub struct Repository {
+        #[arguments(first: 100, after: $after, states: "OPEN")]
+        pub pull_requests: PullRequestConnection,
+    }
+
+    /// Per-PR selection: number, timestamps, first 10 assignees and first
+    /// 5 labels (name only, sorted by name descending).
+    #[derive(cynic::QueryFragment, Debug)]
+    pub struct PullRequest {
+        pub number: i32,
+        pub updated_at: DateTime,
+        pub created_at: DateTime,
+        #[arguments(first: 10)]
+        pub assignees: UserConnection,
+        #[arguments(first: 5, orderBy: { direction: "DESC", field: "NAME" })]
+        pub labels: Option<LabelConnection>,
+    }
+}

+ 10 - 1
src/db.rs

@@ -320,9 +320,18 @@ CREATE TABLE jobs (
 );
 ",
     "
-CREATE UNIQUE INDEX jobs_name_scheduled_at_unique_index 
+CREATE UNIQUE INDEX jobs_name_scheduled_at_unique_index
     ON jobs (
         name, scheduled_at
     );
 ",
+    "
+CREATE table review_prefs (
+    id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
+    user_id BIGINT REFERENCES users(user_id),
+    assigned_prs INT[] NOT NULL DEFAULT array[]::INT[]
+);",
+    "
+CREATE UNIQUE INDEX review_prefs_user_id ON review_prefs(user_id);
+ ",
 ];

+ 2 - 1
src/db/notifications.rs

@@ -15,7 +15,8 @@ pub struct Notification {
     pub team_name: Option<String>,
 }
 
-pub async fn record_username(db: &DbClient, user_id: u64, username: String) -> anyhow::Result<()> {
+/// Add a new user (if not existing)
+pub async fn record_username(db: &DbClient, user_id: u64, username: &str) -> anyhow::Result<()> {
     db.execute(
         "INSERT INTO users (user_id, username) VALUES ($1, $2) ON CONFLICT DO NOTHING",
         &[&(user_id as i64), &username],

+ 84 - 0
src/github.rs

@@ -2592,6 +2592,90 @@ async fn project_items_by_status(
     Ok(all_items)
 }
 
+/// Retrieve all pull requests in status OPEN that are not drafts.
+///
+/// Pages through the repository's open PRs via the GraphQL API, drops
+/// drafts and rollup PRs, and returns one `(assignee, pr_number)` pair
+/// per assignee of each remaining PR, sorted by assignee id.
+pub async fn retrieve_pull_requests(
+    repo: &Repository,
+    client: &GithubClient,
+) -> anyhow::Result<Vec<(User, i32)>> {
+    use cynic::QueryBuilder;
+    use github_graphql::pull_requests_open::{PullRequestsOpen, PullRequestsOpenVariables};
+
+    let mut prs = vec![];
+
+    let mut vars = PullRequestsOpenVariables {
+        repo_owner: repo.owner(),
+        repo_name: repo.name(),
+        after: None,
+    };
+    loop {
+        let query = PullRequestsOpen::build(vars.clone());
+        let req = client.post(&client.graphql_url).json(&query);
+
+        let data: cynic::GraphQlResponse<PullRequestsOpen> = client.json(req).await?;
+        if let Some(errors) = data.errors {
+            anyhow::bail!("There were graphql errors. {:?}", errors);
+        }
+        let repository = data
+            .data
+            .ok_or_else(|| anyhow::anyhow!("No data returned."))?
+            .repository
+            .ok_or_else(|| anyhow::anyhow!("No repository."))?;
+        prs.extend(repository.pull_requests.nodes);
+
+        let page_info = repository.pull_requests.page_info;
+        if !page_info.has_next_page || page_info.end_cursor.is_none() {
+            break;
+        }
+        vars.after = page_info.end_cursor;
+    }
+
+    let mut prs_processed: Vec<(User, i32)> = vec![];
+    for pr in prs {
+        // Draft PRs are not ready for review: skip them.
+        if pr.is_draft {
+            continue;
+        }
+
+        // Exclude rollup PRs: they are aggregates, not individually reviewed.
+        let is_rollup = pr
+            .labels
+            .map(|l| l.nodes)
+            .unwrap_or_default()
+            .iter()
+            .any(|label| label.name == "rollup");
+        if is_rollup {
+            continue;
+        }
+
+        for user in &pr.assignees.nodes {
+            // `databaseId` is nullable in the GitHub GraphQL schema: skip
+            // such an (unexpected) assignee rather than panicking.
+            let user_id = match user.database_id {
+                Some(id) => id as u64,
+                None => continue,
+            };
+            prs_processed.push((
+                User {
+                    login: user.login.clone(),
+                    id: Some(user_id),
+                },
+                pr.number,
+            ));
+        }
+    }
+    prs_processed.sort_by_key(|(user, _)| user.id);
+
+    Ok(prs_processed)
+}
+
 pub enum DesignMeetingStatus {
     Proposed,
     Scheduled,

+ 1 - 0
src/handlers.rs

@@ -39,6 +39,7 @@ mod notification;
 mod notify_zulip;
 mod ping;
 mod prioritize;
+pub mod pull_requests_assignment_update;
 mod relabel;
 mod review_requested;
 mod review_submitted;

+ 1 - 1
src/handlers/notification.rs

@@ -92,7 +92,7 @@ pub async fn handle(ctx: &Context, event: &Event) -> anyhow::Result<()> {
                 continue;
             }
 
-            if let Err(err) = notifications::record_username(&client, user.id.unwrap(), user.login)
+            if let Err(err) = notifications::record_username(&client, user.id.unwrap(), &user.login)
                 .await
                 .context("failed to record username")
             {

+ 72 - 0
src/handlers/pull_requests_assignment_update.rs

@@ -0,0 +1,72 @@
+use std::collections::HashMap;
+
+use crate::db::notifications::record_username;
+use crate::github::retrieve_pull_requests;
+use crate::jobs::Job;
+use anyhow::Context as _;
+use async_trait::async_trait;
+use tokio_postgres::Client as DbClient;
+
+pub struct PullRequestAssignmentUpdate;
+
+#[async_trait]
+impl Job for PullRequestAssignmentUpdate {
+    fn name(&self) -> &'static str {
+        "pull_request_assignment_update"
+    }
+
+    async fn run(&self, ctx: &super::Context, _metadata: &serde_json::Value) -> anyhow::Result<()> {
+        let db = ctx.db.get().await;
+        let gh = &ctx.github;
+
+        tracing::trace!("starting pull_request_assignment_update");
+
+        let rust_repo = gh.repository("rust-lang/rust").await?;
+        let prs = retrieve_pull_requests(&rust_repo, &gh).await?;
+
+        // delete all PR assignments before populating
+        init_table(&db).await?;
+
+        // aggregate by user first
+        let aggregated = prs.into_iter().fold(HashMap::new(), |mut acc, (user, pr)| {
+            let (_, prs) = acc
+                .entry(user.id.unwrap())
+                .or_insert_with(|| (user, Vec::new()));
+            prs.push(pr);
+            acc
+        });
+
+        // populate the table
+        for (_user_id, (assignee, prs)) in &aggregated {
+            let assignee_id = assignee.id.expect("checked");
+            let _ = record_username(&db, assignee_id, &assignee.login).await;
+            create_team_member_workqueue(&db, assignee_id, &prs).await?;
+        }
+
+        Ok(())
+    }
+}
+
+/// Truncate the review prefs table
+async fn init_table(db: &DbClient) -> anyhow::Result<u64> {
+    let res = db
+        .execute("UPDATE review_prefs SET assigned_prs='{}';", &[])
+        .await?;
+    Ok(res)
+}
+
+/// Create a team member work queue
+async fn create_team_member_workqueue(
+    db: &DbClient,
+    user_id: u64,
+    prs: &Vec<i32>,
+) -> anyhow::Result<u64, anyhow::Error> {
+    let q = "
+INSERT INTO review_prefs (user_id, assigned_prs) VALUES ($1, $2)
+ON CONFLICT (user_id)
+DO UPDATE SET assigned_prs = $2
+WHERE review_prefs.user_id=$1";
+    db.execute(q, &[&(user_id as i64), prs])
+        .await
+        .context("Insert DB error")
+}

+ 5 - 5
src/jobs.rs

@@ -58,12 +58,12 @@ use crate::{
     },
 };
 
-// How often new cron-based jobs will be placed in the queue.
-// This is the minimum period *between* a single cron task's executions.
+/// How often new cron-based jobs will be placed in the queue.
+/// This is the minimum period *between* a single cron task's executions.
 pub const JOB_SCHEDULING_CADENCE_IN_SECS: u64 = 1800;
 
-// How often the database is inspected for jobs which need to execute.
-// This is the granularity at which events will occur.
+/// How often the database is inspected for jobs which need to execute.
+/// This is the granularity at which events will occur.
 pub const JOB_PROCESSING_CADENCE_IN_SECS: u64 = 60;
 
 // The default jobs to schedule, repeatedly.
@@ -119,7 +119,7 @@ fn jobs_defined() {
     unique_all_job_names.dedup();
     assert_eq!(all_job_names, unique_all_job_names);
 
-    // Also ensure that our defalt jobs are release jobs
+    // Also ensure that our default jobs are release jobs
     let default_jobs = default_jobs();
     default_jobs
         .iter()

+ 39 - 1
src/main.rs

@@ -10,8 +10,9 @@ use tokio::{task, time};
 use tower::{Service, ServiceExt};
 use tracing as log;
 use tracing::Instrument;
+use triagebot::handlers::pull_requests_assignment_update::PullRequestAssignmentUpdate;
 use triagebot::jobs::{
-    default_jobs, JOB_PROCESSING_CADENCE_IN_SECS, JOB_SCHEDULING_CADENCE_IN_SECS,
+    default_jobs, Job, JOB_PROCESSING_CADENCE_IN_SECS, JOB_SCHEDULING_CADENCE_IN_SECS,
 };
 use triagebot::{db, github, handlers::Context, notification_listing, payload, EventName};
 
@@ -261,6 +262,14 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
         octocrab: oc,
     });
 
+    // Run all jobs that don't have a schedule (one-off jobs)
+    // TODO: Ideally JobSchedule.schedule should become an `Option<Schedule>`
+    // and here we run all those with schedule=None
+    if !is_scheduled_jobs_disabled() {
+        spawn_job_oneoffs(ctx.clone()).await;
+    }
+
+    // Run all jobs that have a schedule (recurring jobs)
     if !is_scheduled_jobs_disabled() {
         spawn_job_scheduler();
         spawn_job_runner(ctx.clone());
@@ -310,6 +319,35 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
     Ok(())
 }
 
+/// Spawns background tokio tasks which run all jobs having no schedule,
+/// i.e. one-off jobs manually executed at the end of the triagebot startup.
+///
+/// Caveats:
+/// - jobs are not guaranteed to start in sequence (care must be taken to
+///   ensure they are completely independent from one another)
+/// - the delay between job starts is not guaranteed to be precise
+async fn spawn_job_oneoffs(ctx: Arc<Context>) {
+    let jobs: Vec<Box<dyn Job + Send + Sync>> = vec![Box::new(PullRequestAssignmentUpdate)];
+
+    for (idx, job) in jobs.into_iter().enumerate() {
+        let ctx = ctx.clone();
+        task::spawn(async move {
+            // Allow some spacing between starting jobs (2s per job index)
+            let delay = idx as u64 * 2;
+            time::sleep(time::Duration::from_secs(delay)).await;
+            match job.run(&ctx, &serde_json::Value::Null).await {
+                Ok(_) => {
+                    log::trace!("job successfully executed (name={})", &job.name());
+                }
+                Err(e) => {
+                    log::error!(
+                        "job failed on execution (name={:?}, error={:?})",
+                        job.name(),
+                        e
+                    );
+                }
+            }
+        });
+    }
+}
+
 /// Spawns a background tokio task which runs continuously to queue up jobs
 /// to be run by the job runner.
 ///