notifications.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. use anyhow::Context as _;
  2. use chrono::{DateTime, FixedOffset};
  3. use tokio_postgres::Client as DbClient;
  4. use tracing as log;
  5. pub struct Notification {
  6. pub user_id: u64,
  7. pub origin_url: String,
  8. pub origin_html: String,
  9. pub short_description: Option<String>,
  10. pub time: DateTime<FixedOffset>,
  11. /// If this is Some, then the notification originated in a team-wide ping
  12. /// (e.g., @rust-lang/libs). The String is the team name (e.g., libs).
  13. pub team_name: Option<String>,
  14. }
  15. /// Add a new user (if not existing)
  16. pub async fn record_username(db: &DbClient, user_id: u64, username: &str) -> anyhow::Result<()> {
  17. db.execute(
  18. "INSERT INTO users (user_id, username) VALUES ($1, $2) ON CONFLICT DO NOTHING",
  19. &[&(user_id as i64), &username],
  20. )
  21. .await
  22. .context("inserting user id / username")?;
  23. Ok(())
  24. }
  25. pub async fn record_ping(db: &DbClient, notification: &Notification) -> anyhow::Result<()> {
  26. db.execute("INSERT INTO notifications (user_id, origin_url, origin_html, time, short_description, team_name, idx)
  27. VALUES (
  28. $1, $2, $3, $4, $5, $6,
  29. (SELECT max(notifications.idx) + 1 from notifications where notifications.user_id = $1)
  30. )",
  31. &[&(notification.user_id as i64), &notification.origin_url, &notification.origin_html, &notification.time, &notification.short_description, &notification.team_name],
  32. ).await.context("inserting notification")?;
  33. Ok(())
  34. }
  35. #[derive(Copy, Clone)]
  36. pub enum Identifier<'a> {
  37. Url(&'a str),
  38. Index(std::num::NonZeroU32),
  39. /// Glob identifier (`all` or `*`).
  40. All,
  41. }
  42. pub async fn delete_ping(
  43. db: &mut DbClient,
  44. user_id: u64,
  45. identifier: Identifier<'_>,
  46. ) -> anyhow::Result<Vec<NotificationData>> {
  47. match identifier {
  48. Identifier::Url(origin_url) => {
  49. let rows = db
  50. .query(
  51. "DELETE FROM notifications WHERE user_id = $1 and origin_url = $2
  52. RETURNING origin_html, time, short_description, metadata",
  53. &[&(user_id as i64), &origin_url],
  54. )
  55. .await
  56. .context("delete notification query")?;
  57. Ok(rows
  58. .into_iter()
  59. .map(|row| {
  60. let origin_text: String = row.get(0);
  61. let time: DateTime<FixedOffset> = row.get(1);
  62. let short_description: Option<String> = row.get(2);
  63. let metadata: Option<String> = row.get(3);
  64. NotificationData {
  65. origin_url: origin_url.to_owned(),
  66. origin_text,
  67. time,
  68. short_description,
  69. metadata,
  70. }
  71. })
  72. .collect())
  73. }
  74. Identifier::Index(idx) => loop {
  75. let t = db
  76. .build_transaction()
  77. .isolation_level(tokio_postgres::IsolationLevel::Serializable)
  78. .start()
  79. .await
  80. .context("begin transaction")?;
  81. let notifications = t
  82. .query(
  83. "select notification_id, idx, user_id
  84. from notifications
  85. where user_id = $1
  86. order by idx asc nulls last;",
  87. &[&(user_id as i64)],
  88. )
  89. .await
  90. .context("failed to get ordering")?;
  91. let notification_id: i64 = notifications
  92. .get((idx.get() - 1) as usize)
  93. .ok_or_else(|| anyhow::anyhow!("No such notification with index {}", idx.get()))?
  94. .get(0);
  95. let row = t
  96. .query_one(
  97. "DELETE FROM notifications WHERE notification_id = $1
  98. RETURNING origin_url, origin_html, time, short_description, metadata",
  99. &[&notification_id],
  100. )
  101. .await
  102. .context(format!(
  103. "Failed to delete notification with id {}",
  104. notification_id
  105. ))?;
  106. let origin_url: String = row.get(0);
  107. let origin_text: String = row.get(1);
  108. let time: DateTime<FixedOffset> = row.get(2);
  109. let short_description: Option<String> = row.get(3);
  110. let metadata: Option<String> = row.get(4);
  111. let deleted_notification = NotificationData {
  112. origin_url,
  113. origin_text,
  114. time,
  115. short_description,
  116. metadata,
  117. };
  118. if let Err(e) = t.commit().await {
  119. if e.code().map_or(false, |c| {
  120. *c == tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE
  121. }) {
  122. log::trace!("serialization failure, restarting deletion");
  123. continue;
  124. } else {
  125. return Err(e).context("transaction commit failure");
  126. }
  127. } else {
  128. return Ok(vec![deleted_notification]);
  129. }
  130. },
  131. Identifier::All => {
  132. let rows = db
  133. .query(
  134. "DELETE FROM notifications WHERE user_id = $1
  135. RETURNING origin_url, origin_html, time, short_description, metadata",
  136. &[&(user_id as i64)],
  137. )
  138. .await
  139. .context("delete all notifications query")?;
  140. Ok(rows
  141. .into_iter()
  142. .map(|row| {
  143. let origin_url: String = row.get(0);
  144. let origin_text: String = row.get(1);
  145. let time: DateTime<FixedOffset> = row.get(2);
  146. let short_description: Option<String> = row.get(3);
  147. let metadata: Option<String> = row.get(4);
  148. NotificationData {
  149. origin_url,
  150. origin_text,
  151. time,
  152. short_description,
  153. metadata,
  154. }
  155. })
  156. .collect())
  157. }
  158. }
  159. }
  160. #[derive(Debug)]
  161. pub struct NotificationData {
  162. pub origin_url: String,
  163. pub origin_text: String,
  164. pub short_description: Option<String>,
  165. pub time: DateTime<FixedOffset>,
  166. pub metadata: Option<String>,
  167. }
  168. pub async fn move_indices(
  169. db: &mut DbClient,
  170. user_id: u64,
  171. from: u32,
  172. to: u32,
  173. ) -> anyhow::Result<()> {
  174. let from = usize::try_from(from)?;
  175. let to = usize::try_from(to)?;
  176. loop {
  177. let t = db
  178. .build_transaction()
  179. .isolation_level(tokio_postgres::IsolationLevel::Serializable)
  180. .start()
  181. .await
  182. .context("begin transaction")?;
  183. let notifications = t
  184. .query(
  185. "select notification_id, idx, user_id
  186. from notifications
  187. where user_id = $1
  188. order by idx asc nulls last;",
  189. &[&(user_id as i64)],
  190. )
  191. .await
  192. .context("failed to get initial ordering")?;
  193. let mut notifications = notifications
  194. .into_iter()
  195. .map(|n| n.get(0))
  196. .collect::<Vec<i64>>();
  197. if notifications.get(from).is_none() {
  198. anyhow::bail!(
  199. "`from` index not present, must be less than {}",
  200. notifications.len()
  201. );
  202. }
  203. if notifications.get(to).is_none() {
  204. anyhow::bail!(
  205. "`to` index not present, must be less than {}",
  206. notifications.len()
  207. );
  208. }
  209. if from < to {
  210. notifications[from..=to].rotate_left(1);
  211. } else if to < from {
  212. notifications[to..=from].rotate_right(1);
  213. }
  214. for (idx, id) in notifications.into_iter().enumerate() {
  215. t.execute(
  216. "update notifications SET idx = $2
  217. where notification_id = $1",
  218. &[&id, &(idx as i32)],
  219. )
  220. .await
  221. .context("update notification id")?;
  222. }
  223. if let Err(e) = t.commit().await {
  224. if e.code().map_or(false, |c| {
  225. *c == tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE
  226. }) {
  227. log::trace!("serialization failure, restarting index movement");
  228. continue;
  229. } else {
  230. return Err(e).context("transaction commit failure");
  231. }
  232. } else {
  233. break;
  234. }
  235. }
  236. Ok(())
  237. }
  238. pub async fn add_metadata(
  239. db: &mut DbClient,
  240. user_id: u64,
  241. idx: u32,
  242. metadata: Option<&str>,
  243. ) -> anyhow::Result<()> {
  244. let idx = usize::try_from(idx)?;
  245. loop {
  246. let t = db
  247. .build_transaction()
  248. .isolation_level(tokio_postgres::IsolationLevel::Serializable)
  249. .start()
  250. .await
  251. .context("begin transaction")?;
  252. let notifications = t
  253. .query(
  254. "select notification_id, idx, user_id
  255. from notifications
  256. where user_id = $1
  257. order by idx asc nulls last;",
  258. &[&(user_id as i64)],
  259. )
  260. .await
  261. .context("failed to get initial ordering")?;
  262. let notifications = notifications
  263. .into_iter()
  264. .map(|n| n.get(0))
  265. .collect::<Vec<i64>>();
  266. match notifications.get(idx) {
  267. None => anyhow::bail!(
  268. "index not present, must be less than {}",
  269. notifications.len()
  270. ),
  271. Some(id) => {
  272. t.execute(
  273. "update notifications SET metadata = $2
  274. where notification_id = $1",
  275. &[&id, &metadata],
  276. )
  277. .await
  278. .context("update notification id")?;
  279. }
  280. }
  281. if let Err(e) = t.commit().await {
  282. if e.code().map_or(false, |c| {
  283. *c == tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE
  284. }) {
  285. log::trace!("serialization failure, restarting index movement");
  286. continue;
  287. } else {
  288. return Err(e).context("transaction commit failure");
  289. }
  290. } else {
  291. break;
  292. }
  293. }
  294. Ok(())
  295. }
  296. pub async fn get_notifications(
  297. db: &DbClient,
  298. username: &str,
  299. ) -> anyhow::Result<Vec<NotificationData>> {
  300. let notifications = db
  301. .query(
  302. "
  303. select username, origin_url, origin_html, time, short_description, idx, metadata
  304. from notifications
  305. join users on notifications.user_id = users.user_id
  306. where username = $1
  307. order by notifications.idx asc nulls last;",
  308. &[&username],
  309. )
  310. .await
  311. .context("Getting notification data")?;
  312. let mut data = Vec::new();
  313. for notification in notifications {
  314. let origin_url: String = notification.get(1);
  315. let origin_text: String = notification.get(2);
  316. let time: DateTime<FixedOffset> = notification.get(3);
  317. let short_description: Option<String> = notification.get(4);
  318. let metadata: Option<String> = notification.get(6);
  319. data.push(NotificationData {
  320. origin_url,
  321. origin_text,
  322. short_description,
  323. time,
  324. metadata,
  325. });
  326. }
  327. Ok(data)
  328. }