notifications.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. use anyhow::Context as _;
  2. use chrono::{DateTime, FixedOffset};
  3. use tokio_postgres::Client as DbClient;
  4. pub struct Notification {
  5. pub user_id: i64,
  6. pub origin_url: String,
  7. pub origin_html: String,
  8. pub short_description: Option<String>,
  9. pub time: DateTime<FixedOffset>,
  10. /// If this is Some, then the notification originated in a team-wide ping
  11. /// (e.g., @rust-lang/libs). The String is the team name (e.g., libs).
  12. pub team_name: Option<String>,
  13. }
  14. pub async fn record_username(db: &DbClient, user_id: i64, username: String) -> anyhow::Result<()> {
  15. db.execute(
  16. "INSERT INTO users (user_id, username) VALUES ($1, $2) ON CONFLICT DO NOTHING",
  17. &[&user_id, &username],
  18. )
  19. .await
  20. .context("inserting user id / username")?;
  21. Ok(())
  22. }
  23. pub async fn record_ping(db: &DbClient, notification: &Notification) -> anyhow::Result<()> {
  24. db.execute("INSERT INTO notifications (user_id, origin_url, origin_html, time, short_description, team_name, idx)
  25. VALUES (
  26. $1, $2, $3, $4, $5, $6,
  27. (SELECT max(notifications.idx) + 1 from notifications where notifications.user_id = $1)
  28. )",
  29. &[&notification.user_id, &notification.origin_url, &notification.origin_html, &notification.time, &notification.short_description, &notification.team_name],
  30. ).await.context("inserting notification")?;
  31. Ok(())
  32. }
  33. #[derive(Copy, Clone)]
  34. pub enum Identifier<'a> {
  35. Url(&'a str),
  36. Index(std::num::NonZeroUsize),
  37. /// Glob identifier (`all` or `*`).
  38. All,
  39. }
  40. pub async fn delete_ping(
  41. db: &mut DbClient,
  42. user_id: i64,
  43. identifier: Identifier<'_>,
  44. ) -> anyhow::Result<Vec<NotificationData>> {
  45. match identifier {
  46. Identifier::Url(origin_url) => {
  47. let rows = db
  48. .query(
  49. "DELETE FROM notifications WHERE user_id = $1 and origin_url = $2
  50. RETURNING origin_html, time, short_description, metadata",
  51. &[&user_id, &origin_url],
  52. )
  53. .await
  54. .context("delete notification query")?;
  55. Ok(rows
  56. .into_iter()
  57. .map(|row| {
  58. let origin_text: String = row.get(0);
  59. let time: DateTime<FixedOffset> = row.get(1);
  60. let short_description: Option<String> = row.get(2);
  61. let metadata: Option<String> = row.get(3);
  62. NotificationData {
  63. origin_url: origin_url.to_owned(),
  64. origin_text,
  65. time,
  66. short_description,
  67. metadata,
  68. }
  69. })
  70. .collect())
  71. }
  72. Identifier::Index(idx) => loop {
  73. let t = db
  74. .build_transaction()
  75. .isolation_level(tokio_postgres::IsolationLevel::Serializable)
  76. .start()
  77. .await
  78. .context("begin transaction")?;
  79. let notifications = t
  80. .query(
  81. "select notification_id, idx, user_id
  82. from notifications
  83. where user_id = $1
  84. order by idx asc nulls last;",
  85. &[&user_id],
  86. )
  87. .await
  88. .context("failed to get ordering")?;
  89. let notification_id: i64 = notifications
  90. .get(idx.get() - 1)
  91. .ok_or_else(|| anyhow::anyhow!("No such notification with index {}", idx.get()))?
  92. .get(0);
  93. let row = t
  94. .query_one(
  95. "DELETE FROM notifications WHERE notification_id = $1
  96. RETURNING origin_url, origin_html, time, short_description, metadata",
  97. &[&notification_id],
  98. )
  99. .await
  100. .context(format!(
  101. "Failed to delete notification with id {}",
  102. notification_id
  103. ))?;
  104. let origin_url: String = row.get(0);
  105. let origin_text: String = row.get(1);
  106. let time: DateTime<FixedOffset> = row.get(2);
  107. let short_description: Option<String> = row.get(3);
  108. let metadata: Option<String> = row.get(4);
  109. let deleted_notification = NotificationData {
  110. origin_url,
  111. origin_text,
  112. time,
  113. short_description,
  114. metadata,
  115. };
  116. if let Err(e) = t.commit().await {
  117. if e.code().map_or(false, |c| {
  118. *c == tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE
  119. }) {
  120. log::trace!("serialization failure, restarting deletion");
  121. continue;
  122. } else {
  123. return Err(e).context("transaction commit failure");
  124. }
  125. } else {
  126. return Ok(vec![deleted_notification]);
  127. }
  128. },
  129. Identifier::All => {
  130. let rows = db
  131. .query(
  132. "DELETE FROM notifications WHERE user_id = $1
  133. RETURNING origin_url, origin_html, time, short_description, metadata",
  134. &[&user_id],
  135. )
  136. .await
  137. .context("delete all notifications query")?;
  138. Ok(rows
  139. .into_iter()
  140. .map(|row| {
  141. let origin_url: String = row.get(0);
  142. let origin_text: String = row.get(1);
  143. let time: DateTime<FixedOffset> = row.get(2);
  144. let short_description: Option<String> = row.get(3);
  145. let metadata: Option<String> = row.get(4);
  146. NotificationData {
  147. origin_url,
  148. origin_text,
  149. time,
  150. short_description,
  151. metadata,
  152. }
  153. })
  154. .collect())
  155. }
  156. }
  157. }
  158. #[derive(Debug)]
  159. pub struct NotificationData {
  160. pub origin_url: String,
  161. pub origin_text: String,
  162. pub short_description: Option<String>,
  163. pub time: DateTime<FixedOffset>,
  164. pub metadata: Option<String>,
  165. }
  166. pub async fn move_indices(
  167. db: &mut DbClient,
  168. user_id: i64,
  169. from: usize,
  170. to: usize,
  171. ) -> anyhow::Result<()> {
  172. loop {
  173. let t = db
  174. .build_transaction()
  175. .isolation_level(tokio_postgres::IsolationLevel::Serializable)
  176. .start()
  177. .await
  178. .context("begin transaction")?;
  179. let notifications = t
  180. .query(
  181. "select notification_id, idx, user_id
  182. from notifications
  183. where user_id = $1
  184. order by idx asc nulls last;",
  185. &[&user_id],
  186. )
  187. .await
  188. .context("failed to get initial ordering")?;
  189. let mut notifications = notifications
  190. .into_iter()
  191. .map(|n| n.get(0))
  192. .collect::<Vec<i64>>();
  193. if notifications.get(from).is_none() {
  194. anyhow::bail!(
  195. "`from` index not present, must be less than {}",
  196. notifications.len()
  197. );
  198. }
  199. if notifications.get(to).is_none() {
  200. anyhow::bail!(
  201. "`to` index not present, must be less than {}",
  202. notifications.len()
  203. );
  204. }
  205. if from < to {
  206. notifications[from..=to].rotate_left(1);
  207. } else if to < from {
  208. notifications[to..=from].rotate_right(1);
  209. }
  210. for (idx, id) in notifications.into_iter().enumerate() {
  211. t.execute(
  212. "update notifications SET idx = $2
  213. where notification_id = $1",
  214. &[&id, &(idx as i32)],
  215. )
  216. .await
  217. .context("update notification id")?;
  218. }
  219. if let Err(e) = t.commit().await {
  220. if e.code().map_or(false, |c| {
  221. *c == tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE
  222. }) {
  223. log::trace!("serialization failure, restarting index movement");
  224. continue;
  225. } else {
  226. return Err(e).context("transaction commit failure");
  227. }
  228. } else {
  229. break;
  230. }
  231. }
  232. Ok(())
  233. }
  234. pub async fn add_metadata(
  235. db: &mut DbClient,
  236. user_id: i64,
  237. idx: usize,
  238. metadata: Option<&str>,
  239. ) -> anyhow::Result<()> {
  240. loop {
  241. let t = db
  242. .build_transaction()
  243. .isolation_level(tokio_postgres::IsolationLevel::Serializable)
  244. .start()
  245. .await
  246. .context("begin transaction")?;
  247. let notifications = t
  248. .query(
  249. "select notification_id, idx, user_id
  250. from notifications
  251. where user_id = $1
  252. order by idx asc nulls last;",
  253. &[&user_id],
  254. )
  255. .await
  256. .context("failed to get initial ordering")?;
  257. let notifications = notifications
  258. .into_iter()
  259. .map(|n| n.get(0))
  260. .collect::<Vec<i64>>();
  261. match notifications.get(idx) {
  262. None => anyhow::bail!(
  263. "index not present, must be less than {}",
  264. notifications.len()
  265. ),
  266. Some(id) => {
  267. t.execute(
  268. "update notifications SET metadata = $2
  269. where notification_id = $1",
  270. &[&id, &metadata],
  271. )
  272. .await
  273. .context("update notification id")?;
  274. }
  275. }
  276. if let Err(e) = t.commit().await {
  277. if e.code().map_or(false, |c| {
  278. *c == tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE
  279. }) {
  280. log::trace!("serialization failure, restarting index movement");
  281. continue;
  282. } else {
  283. return Err(e).context("transaction commit failure");
  284. }
  285. } else {
  286. break;
  287. }
  288. }
  289. Ok(())
  290. }
  291. pub async fn get_notifications(
  292. db: &DbClient,
  293. username: &str,
  294. ) -> anyhow::Result<Vec<NotificationData>> {
  295. let notifications = db
  296. .query(
  297. "
  298. select username, origin_url, origin_html, time, short_description, idx, metadata
  299. from notifications
  300. join users on notifications.user_id = users.user_id
  301. where username = $1
  302. order by notifications.idx asc nulls last;",
  303. &[&username],
  304. )
  305. .await
  306. .context("Getting notification data")?;
  307. let mut data = Vec::new();
  308. for notification in notifications {
  309. let origin_url: String = notification.get(1);
  310. let origin_text: String = notification.get(2);
  311. let time: DateTime<FixedOffset> = notification.get(3);
  312. let short_description: Option<String> = notification.get(4);
  313. let metadata: Option<String> = notification.get(6);
  314. data.push(NotificationData {
  315. origin_url,
  316. origin_text,
  317. short_description,
  318. time,
  319. metadata,
  320. });
  321. }
  322. Ok(data)
  323. }