events.rs 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. //! The `events` table provides a way to have scheduled events
  2. use anyhow::{Result, Context as _};
  3. use chrono::{DateTime, FixedOffset};
  4. use tokio_postgres::{Client as DbClient};
  5. use uuid::Uuid;
  6. use serde::{Deserialize, Serialize};
  7. #[derive(Serialize, Deserialize, Debug)]
  8. pub struct Event {
  9. pub event_id: Uuid,
  10. pub event_name: String,
  11. pub expected_event_time: DateTime<FixedOffset>,
  12. // pub event_metadata: String,
  13. pub executed_at: DateTime<FixedOffset>,
  14. pub failed: Option<String>,
  15. }
  16. pub async fn insert_event(db: &DbClient, event: &Event) -> Result<()> {
  17. tracing::trace!("insert_event(id={})", event.event_id);
  18. db.execute(
  19. "INSERT INTO events (event_id, event_name, expected_event_time, event_metadata, executed_at, failed) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING",
  20. &[&event.event_id, &event.event_name, &event.expected_event_time, &"", &event.executed_at, &event.failed],
  21. )
  22. .await
  23. .context("inserting event")?;
  24. Ok(())
  25. }
  26. pub async fn delete_event(db: &DbClient, event_id: &Uuid) -> Result<()> {
  27. tracing::trace!("delete_event(id={})", event_id);
  28. db.execute(
  29. "DELETE FROM events WHERE event_id = $1",
  30. &[&event_id],
  31. )
  32. .await
  33. .context("deleting event")?;
  34. Ok(())
  35. }
  36. pub async fn update_event_failed_message(db: &DbClient, event_id: &Uuid, message: &String) -> Result<()> {
  37. tracing::trace!("update_event_failed_message(id={})", event_id);
  38. db.execute(
  39. "UPDATE events SET failed = $2 WHERE event_id = $1",
  40. &[&event_id, &message],
  41. )
  42. .await
  43. .context("updating event failed message")?;
  44. Ok(())
  45. }
  46. pub async fn update_event_executed_at(db: &DbClient, event_id: &Uuid) -> Result<()> {
  47. tracing::trace!("update_event_executed_at(id={})", event_id);
  48. db.execute(
  49. "UPDATE events SET executed_at = now() WHERE event_id = $1",
  50. &[&event_id],
  51. )
  52. .await
  53. .context("updating event executed at")?;
  54. Ok(())
  55. }
  56. // Selects all events with:
  57. // - event_time's in the past
  58. // - failed is null or executed_at is at least 60 minutes ago (intended to make repeat executions rare enough)
  59. pub async fn get_events_to_execute(db: &DbClient) -> Result<Vec<Event>> {
  60. let events = db
  61. .query(
  62. "
  63. SELECT * FROM events WHERE expected_event_time <= now() AND (failed IS NULL OR executed_at <= now() - INTERVAL '60 minutes')",
  64. &[],
  65. )
  66. .await
  67. .context("Getting events data")?;
  68. let mut data = Vec::with_capacity(events.len());
  69. for event in events {
  70. let event_id: Uuid = event.get(0);
  71. let event_name: String = event.get(1);
  72. let expected_event_time: DateTime<FixedOffset> = event.get(2);
  73. // let event_metadata: String = event.get(3);
  74. let executed_at: DateTime<FixedOffset> = event.get(4);
  75. let failed: Option<String> = event.get(5);
  76. data.push(Event {
  77. event_id,
  78. event_name,
  79. expected_event_time,
  80. // event_metadata,
  81. executed_at,
  82. failed
  83. });
  84. }
  85. Ok(data)
  86. }