notifications.rs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. use anyhow::Context as _;
  2. use chrono::{DateTime, FixedOffset};
  3. use tokio_postgres::Client as DbClient;
  4. pub struct Notification {
  5. pub user_id: i64,
  6. pub origin_url: String,
  7. pub origin_html: String,
  8. pub short_description: Option<String>,
  9. pub time: DateTime<FixedOffset>,
  10. /// If this is Some, then the notification originated in a team-wide ping
  11. /// (e.g., @rust-lang/libs). The String is the team name (e.g., libs).
  12. pub team_name: Option<String>,
  13. }
  14. pub async fn record_username(db: &DbClient, user_id: i64, username: String) -> anyhow::Result<()> {
  15. db.execute(
  16. "INSERT INTO users (user_id, username) VALUES ($1, $2) ON CONFLICT DO NOTHING",
  17. &[&user_id, &username],
  18. )
  19. .await
  20. .context("inserting user id / username")?;
  21. Ok(())
  22. }
  23. pub async fn record_ping(db: &DbClient, notification: &Notification) -> anyhow::Result<()> {
  24. db.execute("INSERT INTO notifications (user_id, origin_url, origin_html, time, short_description, team_name, idx)
  25. VALUES (
  26. $1, $2, $3, $4, $5, $6,
  27. (SELECT max(notifications.idx) + 1 from notifications where notifications.user_id = $1)
  28. )",
  29. &[&notification.user_id, &notification.origin_url, &notification.origin_html, &notification.time, &notification.short_description, &notification.team_name],
  30. ).await.context("inserting notification")?;
  31. Ok(())
  32. }
  /// Selects which notification(s) of a user an operation applies to.
  #[derive(Copy, Clone, Debug, PartialEq, Eq)]
  pub enum Identifier<'a> {
      /// Every notification whose `origin_url` equals this string.
      Url(&'a str),
      /// The notification at this one-based position in the user's list,
      /// ordered by `idx` (rows without an `idx` come last).
      Index(std::num::NonZeroUsize),
  }
  38. pub async fn delete_ping(
  39. db: &mut DbClient,
  40. user_id: i64,
  41. identifier: Identifier<'_>,
  42. ) -> anyhow::Result<Vec<NotificationData>> {
  43. match identifier {
  44. Identifier::Url(origin_url) => {
  45. let rows = db
  46. .query(
  47. "DELETE FROM notifications WHERE user_id = $1 and origin_url = $2
  48. RETURNING origin_html, time, short_description, metadata",
  49. &[&user_id, &origin_url],
  50. )
  51. .await
  52. .context("delete notification query")?;
  53. if rows.is_empty() {
  54. anyhow::bail!("Did not delete any notifications");
  55. }
  56. Ok(rows
  57. .into_iter()
  58. .map(|row| {
  59. let origin_text: String = row.get(0);
  60. let time: DateTime<FixedOffset> = row.get(1);
  61. let short_description: Option<String> = row.get(2);
  62. let metadata: Option<String> = row.get(3);
  63. NotificationData {
  64. origin_url: origin_url.to_owned(),
  65. origin_text,
  66. time,
  67. short_description,
  68. metadata,
  69. }
  70. })
  71. .collect())
  72. }
  73. Identifier::Index(idx) => loop {
  74. let t = db
  75. .build_transaction()
  76. .isolation_level(tokio_postgres::IsolationLevel::Serializable)
  77. .start()
  78. .await
  79. .context("begin transaction")?;
  80. let notifications = t
  81. .query(
  82. "select notification_id, idx, user_id
  83. from notifications
  84. where user_id = $1
  85. order by idx asc nulls last;",
  86. &[&user_id],
  87. )
  88. .await
  89. .context("failed to get ordering")?;
  90. let notification_id: i64 = notifications
  91. .get(idx.get() - 1)
  92. .ok_or_else(|| anyhow::anyhow!("No such notification with index {}", idx.get()))?
  93. .get(0);
  94. let row = t
  95. .query_one(
  96. "DELETE FROM notifications WHERE notification_id = $1
  97. RETURNING origin_url, origin_html, time, short_description, metadata",
  98. &[&notification_id],
  99. )
  100. .await
  101. .context(format!(
  102. "Failed to delete notification with id {}",
  103. notification_id
  104. ))?;
  105. let origin_url: String = row.get(0);
  106. let origin_text: String = row.get(1);
  107. let time: DateTime<FixedOffset> = row.get(2);
  108. let short_description: Option<String> = row.get(3);
  109. let metadata: Option<String> = row.get(4);
  110. let deleted_notification = NotificationData {
  111. origin_url,
  112. origin_text,
  113. time,
  114. short_description,
  115. metadata,
  116. };
  117. if let Err(e) = t.commit().await {
  118. if e.code().map_or(false, |c| {
  119. *c == tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE
  120. }) {
  121. log::trace!("serialization failure, restarting deletion");
  122. continue;
  123. } else {
  124. return Err(e).context("transaction commit failure");
  125. }
  126. } else {
  127. return Ok(vec![deleted_notification]);
  128. }
  129. },
  130. }
  131. }
  132. #[derive(Debug)]
  133. pub struct NotificationData {
  134. pub origin_url: String,
  135. pub origin_text: String,
  136. pub short_description: Option<String>,
  137. pub time: DateTime<FixedOffset>,
  138. pub metadata: Option<String>,
  139. }
  140. pub async fn move_indices(
  141. db: &mut DbClient,
  142. user_id: i64,
  143. from: usize,
  144. to: usize,
  145. ) -> anyhow::Result<()> {
  146. loop {
  147. let t = db
  148. .build_transaction()
  149. .isolation_level(tokio_postgres::IsolationLevel::Serializable)
  150. .start()
  151. .await
  152. .context("begin transaction")?;
  153. let notifications = t
  154. .query(
  155. "select notification_id, idx, user_id
  156. from notifications
  157. where user_id = $1
  158. order by idx asc nulls last;",
  159. &[&user_id],
  160. )
  161. .await
  162. .context("failed to get initial ordering")?;
  163. let mut notifications = notifications
  164. .into_iter()
  165. .map(|n| n.get(0))
  166. .collect::<Vec<i64>>();
  167. if notifications.get(from).is_none() {
  168. anyhow::bail!(
  169. "`from` index not present, must be less than {}",
  170. notifications.len()
  171. );
  172. }
  173. if notifications.get(to).is_none() {
  174. anyhow::bail!(
  175. "`to` index not present, must be less than {}",
  176. notifications.len()
  177. );
  178. }
  179. if from < to {
  180. notifications[from..=to].rotate_left(1);
  181. } else if to < from {
  182. notifications[to..=from].rotate_right(1);
  183. }
  184. for (idx, id) in notifications.into_iter().enumerate() {
  185. t.execute(
  186. "update notifications SET idx = $2
  187. where notification_id = $1",
  188. &[&id, &(idx as i32)],
  189. )
  190. .await
  191. .context("update notification id")?;
  192. }
  193. if let Err(e) = t.commit().await {
  194. if e.code().map_or(false, |c| {
  195. *c == tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE
  196. }) {
  197. log::trace!("serialization failure, restarting index movement");
  198. continue;
  199. } else {
  200. return Err(e).context("transaction commit failure");
  201. }
  202. } else {
  203. break;
  204. }
  205. }
  206. Ok(())
  207. }
  208. pub async fn add_metadata(
  209. db: &mut DbClient,
  210. user_id: i64,
  211. idx: usize,
  212. metadata: Option<&str>,
  213. ) -> anyhow::Result<()> {
  214. loop {
  215. let t = db
  216. .build_transaction()
  217. .isolation_level(tokio_postgres::IsolationLevel::Serializable)
  218. .start()
  219. .await
  220. .context("begin transaction")?;
  221. let notifications = t
  222. .query(
  223. "select notification_id, idx, user_id
  224. from notifications
  225. where user_id = $1
  226. order by idx asc nulls last;",
  227. &[&user_id],
  228. )
  229. .await
  230. .context("failed to get initial ordering")?;
  231. let notifications = notifications
  232. .into_iter()
  233. .map(|n| n.get(0))
  234. .collect::<Vec<i64>>();
  235. match notifications.get(idx) {
  236. None => anyhow::bail!(
  237. "index not present, must be less than {}",
  238. notifications.len()
  239. ),
  240. Some(id) => {
  241. t.execute(
  242. "update notifications SET metadata = $2
  243. where notification_id = $1",
  244. &[&id, &metadata],
  245. )
  246. .await
  247. .context("update notification id")?;
  248. }
  249. }
  250. if let Err(e) = t.commit().await {
  251. if e.code().map_or(false, |c| {
  252. *c == tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE
  253. }) {
  254. log::trace!("serialization failure, restarting index movement");
  255. continue;
  256. } else {
  257. return Err(e).context("transaction commit failure");
  258. }
  259. } else {
  260. break;
  261. }
  262. }
  263. Ok(())
  264. }
  265. pub async fn get_notifications(
  266. db: &DbClient,
  267. username: &str,
  268. ) -> anyhow::Result<Vec<NotificationData>> {
  269. let notifications = db
  270. .query(
  271. "
  272. select username, origin_url, origin_html, time, short_description, idx, metadata
  273. from notifications
  274. join users on notifications.user_id = users.user_id
  275. where username = $1
  276. order by notifications.idx asc nulls last;",
  277. &[&username],
  278. )
  279. .await
  280. .context("Getting notification data")?;
  281. let mut data = Vec::new();
  282. for notification in notifications {
  283. let origin_url: String = notification.get(1);
  284. let origin_text: String = notification.get(2);
  285. let time: DateTime<FixedOffset> = notification.get(3);
  286. let short_description: Option<String> = notification.get(4);
  287. let metadata: Option<String> = notification.get(6);
  288. data.push(NotificationData {
  289. origin_url,
  290. origin_text,
  291. short_description,
  292. time,
  293. metadata,
  294. });
  295. }
  296. Ok(data)
  297. }