123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360 |
- use anyhow::Context as _;
- use chrono::{DateTime, FixedOffset};
- use tokio_postgres::Client as DbClient;
- use tracing as log;
- pub struct Notification {
- pub user_id: u64,
- pub origin_url: String,
- pub origin_html: String,
- pub short_description: Option<String>,
- pub time: DateTime<FixedOffset>,
- /// If this is Some, then the notification originated in a team-wide ping
- /// (e.g., @rust-lang/libs). The String is the team name (e.g., libs).
- pub team_name: Option<String>,
- }
- /// Add a new user (if not existing)
- pub async fn record_username(db: &DbClient, user_id: u64, username: &str) -> anyhow::Result<()> {
- db.execute(
- "INSERT INTO users (user_id, username) VALUES ($1, $2) ON CONFLICT DO NOTHING",
- &[&(user_id as i64), &username],
- )
- .await
- .context("inserting user id / username")?;
- Ok(())
- }
- pub async fn record_ping(db: &DbClient, notification: &Notification) -> anyhow::Result<()> {
- db.execute("INSERT INTO notifications (user_id, origin_url, origin_html, time, short_description, team_name, idx)
- VALUES (
- $1, $2, $3, $4, $5, $6,
- (SELECT max(notifications.idx) + 1 from notifications where notifications.user_id = $1)
- )",
- &[&(notification.user_id as i64), ¬ification.origin_url, ¬ification.origin_html, ¬ification.time, ¬ification.short_description, ¬ification.team_name],
- ).await.context("inserting notification")?;
- Ok(())
- }
- #[derive(Copy, Clone)]
- pub enum Identifier<'a> {
- Url(&'a str),
- Index(std::num::NonZeroU32),
- /// Glob identifier (`all` or `*`).
- All,
- }
- pub async fn delete_ping(
- db: &mut DbClient,
- user_id: u64,
- identifier: Identifier<'_>,
- ) -> anyhow::Result<Vec<NotificationData>> {
- match identifier {
- Identifier::Url(origin_url) => {
- let rows = db
- .query(
- "DELETE FROM notifications WHERE user_id = $1 and origin_url = $2
- RETURNING origin_html, time, short_description, metadata",
- &[&(user_id as i64), &origin_url],
- )
- .await
- .context("delete notification query")?;
- Ok(rows
- .into_iter()
- .map(|row| {
- let origin_text: String = row.get(0);
- let time: DateTime<FixedOffset> = row.get(1);
- let short_description: Option<String> = row.get(2);
- let metadata: Option<String> = row.get(3);
- NotificationData {
- origin_url: origin_url.to_owned(),
- origin_text,
- time,
- short_description,
- metadata,
- }
- })
- .collect())
- }
- Identifier::Index(idx) => loop {
- let t = db
- .build_transaction()
- .isolation_level(tokio_postgres::IsolationLevel::Serializable)
- .start()
- .await
- .context("begin transaction")?;
- let notifications = t
- .query(
- "select notification_id, idx, user_id
- from notifications
- where user_id = $1
- order by idx asc nulls last;",
- &[&(user_id as i64)],
- )
- .await
- .context("failed to get ordering")?;
- let notification_id: i64 = notifications
- .get((idx.get() - 1) as usize)
- .ok_or_else(|| anyhow::anyhow!("No such notification with index {}", idx.get()))?
- .get(0);
- let row = t
- .query_one(
- "DELETE FROM notifications WHERE notification_id = $1
- RETURNING origin_url, origin_html, time, short_description, metadata",
- &[¬ification_id],
- )
- .await
- .context(format!(
- "Failed to delete notification with id {}",
- notification_id
- ))?;
- let origin_url: String = row.get(0);
- let origin_text: String = row.get(1);
- let time: DateTime<FixedOffset> = row.get(2);
- let short_description: Option<String> = row.get(3);
- let metadata: Option<String> = row.get(4);
- let deleted_notification = NotificationData {
- origin_url,
- origin_text,
- time,
- short_description,
- metadata,
- };
- if let Err(e) = t.commit().await {
- if e.code().map_or(false, |c| {
- *c == tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE
- }) {
- log::trace!("serialization failure, restarting deletion");
- continue;
- } else {
- return Err(e).context("transaction commit failure");
- }
- } else {
- return Ok(vec![deleted_notification]);
- }
- },
- Identifier::All => {
- let rows = db
- .query(
- "DELETE FROM notifications WHERE user_id = $1
- RETURNING origin_url, origin_html, time, short_description, metadata",
- &[&(user_id as i64)],
- )
- .await
- .context("delete all notifications query")?;
- Ok(rows
- .into_iter()
- .map(|row| {
- let origin_url: String = row.get(0);
- let origin_text: String = row.get(1);
- let time: DateTime<FixedOffset> = row.get(2);
- let short_description: Option<String> = row.get(3);
- let metadata: Option<String> = row.get(4);
- NotificationData {
- origin_url,
- origin_text,
- time,
- short_description,
- metadata,
- }
- })
- .collect())
- }
- }
- }
- #[derive(Debug)]
- pub struct NotificationData {
- pub origin_url: String,
- pub origin_text: String,
- pub short_description: Option<String>,
- pub time: DateTime<FixedOffset>,
- pub metadata: Option<String>,
- }
- pub async fn move_indices(
- db: &mut DbClient,
- user_id: u64,
- from: u32,
- to: u32,
- ) -> anyhow::Result<()> {
- let from = usize::try_from(from)?;
- let to = usize::try_from(to)?;
- loop {
- let t = db
- .build_transaction()
- .isolation_level(tokio_postgres::IsolationLevel::Serializable)
- .start()
- .await
- .context("begin transaction")?;
- let notifications = t
- .query(
- "select notification_id, idx, user_id
- from notifications
- where user_id = $1
- order by idx asc nulls last;",
- &[&(user_id as i64)],
- )
- .await
- .context("failed to get initial ordering")?;
- let mut notifications = notifications
- .into_iter()
- .map(|n| n.get(0))
- .collect::<Vec<i64>>();
- if notifications.get(from).is_none() {
- anyhow::bail!(
- "`from` index not present, must be less than {}",
- notifications.len()
- );
- }
- if notifications.get(to).is_none() {
- anyhow::bail!(
- "`to` index not present, must be less than {}",
- notifications.len()
- );
- }
- if from < to {
- notifications[from..=to].rotate_left(1);
- } else if to < from {
- notifications[to..=from].rotate_right(1);
- }
- for (idx, id) in notifications.into_iter().enumerate() {
- t.execute(
- "update notifications SET idx = $2
- where notification_id = $1",
- &[&id, &(idx as i32)],
- )
- .await
- .context("update notification id")?;
- }
- if let Err(e) = t.commit().await {
- if e.code().map_or(false, |c| {
- *c == tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE
- }) {
- log::trace!("serialization failure, restarting index movement");
- continue;
- } else {
- return Err(e).context("transaction commit failure");
- }
- } else {
- break;
- }
- }
- Ok(())
- }
- pub async fn add_metadata(
- db: &mut DbClient,
- user_id: u64,
- idx: u32,
- metadata: Option<&str>,
- ) -> anyhow::Result<()> {
- let idx = usize::try_from(idx)?;
- loop {
- let t = db
- .build_transaction()
- .isolation_level(tokio_postgres::IsolationLevel::Serializable)
- .start()
- .await
- .context("begin transaction")?;
- let notifications = t
- .query(
- "select notification_id, idx, user_id
- from notifications
- where user_id = $1
- order by idx asc nulls last;",
- &[&(user_id as i64)],
- )
- .await
- .context("failed to get initial ordering")?;
- let notifications = notifications
- .into_iter()
- .map(|n| n.get(0))
- .collect::<Vec<i64>>();
- match notifications.get(idx) {
- None => anyhow::bail!(
- "index not present, must be less than {}",
- notifications.len()
- ),
- Some(id) => {
- t.execute(
- "update notifications SET metadata = $2
- where notification_id = $1",
- &[&id, &metadata],
- )
- .await
- .context("update notification id")?;
- }
- }
- if let Err(e) = t.commit().await {
- if e.code().map_or(false, |c| {
- *c == tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE
- }) {
- log::trace!("serialization failure, restarting index movement");
- continue;
- } else {
- return Err(e).context("transaction commit failure");
- }
- } else {
- break;
- }
- }
- Ok(())
- }
- pub async fn get_notifications(
- db: &DbClient,
- username: &str,
- ) -> anyhow::Result<Vec<NotificationData>> {
- let notifications = db
- .query(
- "
- select username, origin_url, origin_html, time, short_description, idx, metadata
- from notifications
- join users on notifications.user_id = users.user_id
- where username = $1
- order by notifications.idx asc nulls last;",
- &[&username],
- )
- .await
- .context("Getting notification data")?;
- let mut data = Vec::new();
- for notification in notifications {
- let origin_url: String = notification.get(1);
- let origin_text: String = notification.get(2);
- let time: DateTime<FixedOffset> = notification.get(3);
- let short_description: Option<String> = notification.get(4);
- let metadata: Option<String> = notification.get(6);
- data.push(NotificationData {
- origin_url,
- origin_text,
- short_description,
- time,
- metadata,
- });
- }
- Ok(data)
- }
|