pull_requests_assignment_update.rs 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
use std::collections::HashMap;
use std::convert::TryFrom;

use crate::db::notifications::record_username;
use crate::github::retrieve_pull_requests;
use crate::jobs::Job;
use crate::ReviewPrefs;
use anyhow::Context as _;
use async_trait::async_trait;
use tokio_postgres::Client as DbClient;
  9. pub struct PullRequestAssignmentUpdate;
  10. #[async_trait]
  11. impl Job for PullRequestAssignmentUpdate {
  12. fn name(&self) -> &'static str {
  13. "pull_request_assignment_update"
  14. }
  15. async fn run(&self, ctx: &super::Context, _metadata: &serde_json::Value) -> anyhow::Result<()> {
  16. let db = ctx.db.get().await;
  17. let gh = &ctx.github;
  18. tracing::trace!("starting pull_request_assignment_update");
  19. let dragonos_repo = gh.repository("DragonOS-Community/DragonOS").await?;
  20. let prs = retrieve_pull_requests(&dragonos_repo, &gh).await?;
  21. // delete all PR assignments before populating
  22. init_table(&db).await?;
  23. // aggregate by user first
  24. let aggregated = prs.into_iter().fold(HashMap::new(), |mut acc, (user, pr)| {
  25. let (_, prs) = acc
  26. .entry(user.id.unwrap())
  27. .or_insert_with(|| (user, Vec::new()));
  28. prs.push(pr);
  29. acc
  30. });
  31. // populate the table
  32. for (_user_id, (assignee, prs)) in &aggregated {
  33. let assignee_id = assignee.id.expect("checked");
  34. let _ = record_username(&db, assignee_id, &assignee.login).await;
  35. create_team_member_workqueue(&db, assignee_id, &prs).await?;
  36. }
  37. Ok(())
  38. }
  39. }
  40. /// Truncate the review prefs table
  41. async fn init_table(db: &DbClient) -> anyhow::Result<u64> {
  42. let res = db
  43. .execute("UPDATE review_prefs SET assigned_prs='{}';", &[])
  44. .await?;
  45. Ok(res)
  46. }
  47. /// Create a team member work queue
  48. async fn create_team_member_workqueue(
  49. db: &DbClient,
  50. user_id: u64,
  51. prs: &Vec<i32>,
  52. ) -> anyhow::Result<u64, anyhow::Error> {
  53. let q = "
  54. INSERT INTO review_prefs (user_id, assigned_prs) VALUES ($1, $2)
  55. ON CONFLICT (user_id)
  56. DO UPDATE SET assigned_prs = $2
  57. WHERE review_prefs.user_id=$1";
  58. db.execute(q, &[&(user_id as i64), prs])
  59. .await
  60. .context("Insert DB error")
  61. }
  62. /// Get pull request assignments for a team member
  63. pub async fn get_review_prefs(db: &DbClient, user_id: u64) -> anyhow::Result<ReviewPrefs> {
  64. let q = "
  65. SELECT username,r.*
  66. FROM review_prefs r
  67. JOIN users on r.user_id=users.user_id
  68. WHERE r.user_id = $1;";
  69. let row = db
  70. .query_one(q, &[&(user_id as i64)])
  71. .await
  72. .context("Error retrieving review preferences")
  73. .unwrap();
  74. Ok(row.into())
  75. }