Bläddra i källkod

Switch to use locking for issue data.

Eric Huss 2 år sedan
förälder
incheckning
5919d5de47
2 ändrade filer med 71 tillägg och 40 borttagningar
  1. 64 32
      src/db/issue_data.rs
  2. 7 8
      src/handlers/mentions.rs

+ 64 - 32
src/db/issue_data.rs

@@ -4,44 +4,76 @@
 //! Each issue has a unique "key" where you can store data under. Typically
 //! that key should be the name of the handler. The data can be anything that
 //! can be serialized to JSON.
+//!
+//! Note that this uses crude locking, so try to keep the duration between
+//! loading and saving to a minimum.
 
 use crate::github::Issue;
 use anyhow::{Context, Result};
 use serde::{Deserialize, Serialize};
 use tokio_postgres::types::Json;
-use tokio_postgres::Client as DbClient;
+use tokio_postgres::{Client as DbClient, Transaction};
 
-pub async fn load<T: for<'a> Deserialize<'a>>(
-    db: &DbClient,
-    issue: &Issue,
-    key: &str,
-) -> Result<Option<T>> {
-    let repo = issue.repository().to_string();
-    let data = db
-        .query_opt(
-            "SELECT data FROM issue_data WHERE \
-            repo = $1 AND issue_number = $2 AND key = $3",
-            &[&repo, &(issue.number as i32), &key],
-        )
-        .await
-        .context("selecting issue data")?
-        .map(|row| row.get::<usize, Json<T>>(0).0);
-    Ok(data)
+pub struct IssueData<'db, T>
+where
+    T: for<'a> Deserialize<'a> + Serialize + Default + std::fmt::Debug + Sync,
+{
+    transaction: Transaction<'db>,
+    repo: String,
+    issue_number: i32,
+    key: String,
+    pub data: T,
 }
 
-pub async fn save<T: Serialize + std::fmt::Debug + Sync>(
-    db: &DbClient,
-    issue: &Issue,
-    key: &str,
-    data: &T,
-) -> Result<()> {
-    let repo = issue.repository().to_string();
-    db.execute(
-        "INSERT INTO issue_data (repo, issue_number, key, data) VALUES ($1, $2, $3, $4) \
-         ON CONFLICT (repo, issue_number, key) DO UPDATE SET data=EXCLUDED.data",
-        &[&repo, &(issue.number as i32), &key, &Json(data)],
-    )
-    .await
-    .context("inserting issue data")?;
-    Ok(())
+impl<'db, T> IssueData<'db, T>
+where
+    T: for<'a> Deserialize<'a> + Serialize + Default + std::fmt::Debug + Sync,
+{
+    pub async fn load(
+        db: &'db mut DbClient,
+        issue: &Issue,
+        key: &str,
+    ) -> Result<IssueData<'db, T>> {
+        let repo = issue.repository().to_string();
+        let issue_number = issue.number as i32;
+        let transaction = db.transaction().await?;
+        transaction
+            .execute("LOCK TABLE issue_data", &[])
+            .await
+            .context("locking issue data")?;
+        let data = transaction
+            .query_opt(
+                "SELECT data FROM issue_data WHERE \
+                 repo = $1 AND issue_number = $2 AND key = $3",
+                &[&repo, &issue_number, &key],
+            )
+            .await
+            .context("selecting issue data")?
+            .map(|row| row.get::<usize, Json<T>>(0).0)
+            .unwrap_or_default();
+        Ok(IssueData {
+            transaction,
+            repo,
+            issue_number,
+            key: key.to_string(),
+            data,
+        })
+    }
+
+    pub async fn save(self) -> Result<()> {
+        self.transaction
+            .execute(
+                "INSERT INTO issue_data (repo, issue_number, key, data) \
+                 VALUES ($1, $2, $3, $4) \
+                 ON CONFLICT (repo, issue_number, key) DO UPDATE SET data=EXCLUDED.data",
+                &[&self.repo, &self.issue_number, &self.key, &Json(&self.data)],
+            )
+            .await
+            .context("inserting issue data")?;
+        self.transaction
+            .commit()
+            .await
+            .context("committing issue data")?;
+        Ok(())
+    }
 }

+ 7 - 8
src/handlers/mentions.rs

@@ -4,7 +4,7 @@
 
 use crate::{
     config::{MentionsConfig, MentionsPathConfig},
-    db::issue_data,
+    db::issue_data::IssueData,
     github::{files_changed, IssuesAction, IssuesEvent},
     handlers::Context,
 };
@@ -78,14 +78,13 @@ pub(super) async fn handle_input(
     event: &IssuesEvent,
     input: MentionsInput,
 ) -> anyhow::Result<()> {
-    let client = ctx.db.get().await;
-    let mut state: MentionState = issue_data::load(&client, &event.issue, MENTIONS_KEY)
-        .await?
-        .unwrap_or_default();
+    let mut client = ctx.db.get().await;
+    let mut state: IssueData<'_, MentionState> =
+        IssueData::load(&mut client, &event.issue, MENTIONS_KEY).await?;
     // Build the message to post to the issue.
     let mut result = String::new();
     for to_mention in &input.paths {
-        if state.paths.iter().any(|p| p == to_mention) {
+        if state.data.paths.iter().any(|p| p == to_mention) {
             // Avoid duplicate mentions.
             continue;
         }
@@ -100,7 +99,7 @@ pub(super) async fn handle_input(
         if !reviewers.is_empty() {
             write!(result, "\n\ncc {}", reviewers.join(", ")).unwrap();
         }
-        state.paths.push(to_mention.to_string());
+        state.data.paths.push(to_mention.to_string());
     }
     if !result.is_empty() {
         event
@@ -108,7 +107,7 @@ pub(super) async fn handle_input(
             .post_comment(&ctx.github, &result)
             .await
             .context("failed to post mentions comment")?;
-        issue_data::save(&client, &event.issue, MENTIONS_KEY, &state).await?;
+        state.save().await?;
     }
     Ok(())
 }