|
@@ -30,14 +30,71 @@ pub async fn record_ping(db: &DbClient, notification: &Notification) -> anyhow::
|
|
|
Ok(())
|
|
|
}
|
|
|
|
|
|
-pub async fn delete_ping(db: &DbClient, user_id: i64, origin_url: &str) -> anyhow::Result<()> {
|
|
|
- db.execute(
|
|
|
- "DELETE FROM notifications WHERE user_id = $1 and origin_url = $2",
|
|
|
- &[&user_id, &origin_url],
|
|
|
- )
|
|
|
- .await
|
|
|
- .context("delete notification query")?;
|
|
|
+#[derive(Copy, Clone)]
|
|
|
+pub enum Identifier<'a> {
|
|
|
+ Url(&'a str),
|
|
|
+ Index(std::num::NonZeroUsize),
|
|
|
+}
|
|
|
|
|
|
+pub async fn delete_ping(
|
|
|
+ db: &mut DbClient,
|
|
|
+ user_id: i64,
|
|
|
+ identifier: Identifier<'_>,
|
|
|
+) -> anyhow::Result<()> {
|
|
|
+ match identifier {
|
|
|
+ Identifier::Url(origin_url) => {
|
|
|
+ db.execute(
|
|
|
+ "DELETE FROM notifications WHERE user_id = $1 and origin_url = $2",
|
|
|
+ &[&user_id, &origin_url],
|
|
|
+ )
|
|
|
+ .await
|
|
|
+ .context("delete notification query")?;
|
|
|
+ }
|
|
|
+ Identifier::Index(idx) => loop {
|
|
|
+ let t = db
|
|
|
+ .build_transaction()
|
|
|
+ .isolation_level(tokio_postgres::IsolationLevel::Serializable)
|
|
|
+ .start()
|
|
|
+ .await
|
|
|
+ .context("begin transaction")?;
|
|
|
+
|
|
|
+ let notifications = t
|
|
|
+ .query(
|
|
|
+ "select notification_id, idx, user_id
|
|
|
+ from notifications
|
|
|
+ where user_id = $1
|
|
|
+ order by idx desc nulls last, time desc;",
|
|
|
+ &[&user_id],
|
|
|
+ )
|
|
|
+ .await
|
|
|
+ .context("failed to get ordering")?;
|
|
|
+
|
|
|
+ let notification_id: i64 = notifications[idx.get() - 1].get(0);
|
|
|
+
|
|
|
+ t.execute(
|
|
|
+ "DELETE FROM notifications WHERE notification_id = $1",
|
|
|
+ &[¬ification_id],
|
|
|
+ )
|
|
|
+ .await
|
|
|
+ .context(format!(
|
|
|
+ "Failed to delete notification with id {}",
|
|
|
+ notification_id
|
|
|
+ ))?;
|
|
|
+
|
|
|
+ if let Err(e) = t.commit().await {
|
|
|
+ if e.code().map_or(false, |c| {
|
|
|
+ *c == tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE
|
|
|
+ }) {
|
|
|
+ log::trace!("serialization failure, restarting deletion");
|
|
|
+ continue;
|
|
|
+ } else {
|
|
|
+ return Err(e).context("transaction commit failure");
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ },
|
|
|
+ }
|
|
|
Ok(())
|
|
|
}
|
|
|
|
|
@@ -68,7 +125,7 @@ pub async fn move_indices(
|
|
|
"select notification_id, idx, user_id
|
|
|
from notifications
|
|
|
where user_id = $1
|
|
|
- order by idx desc, time desc;",
|
|
|
+ order by idx desc nulls last, time desc;",
|
|
|
&[&user_id],
|
|
|
)
|
|
|
.await
|