Browse Source

Add retry logic when hitting rate-limits

Paul Daniel Faria 4 years ago
parent
commit
48419b9fd5
2 changed files with 125 additions and 38 deletions
  1. 115 35
      src/github.rs
  2. 10 3
      src/prioritization/mod.rs

+ 115 - 35
src/github.rs

@@ -2,9 +2,10 @@ use anyhow::Context;
 
 
 use chrono::{DateTime, FixedOffset, Utc};
 use chrono::{DateTime, FixedOffset, Utc};
 use futures::stream::{FuturesUnordered, StreamExt};
 use futures::stream::{FuturesUnordered, StreamExt};
+use futures::{future::BoxFuture, FutureExt};
 use once_cell::sync::OnceCell;
 use once_cell::sync::OnceCell;
 use reqwest::header::{AUTHORIZATION, USER_AGENT};
 use reqwest::header::{AUTHORIZATION, USER_AGENT};
-use reqwest::{Client, RequestBuilder, Response, StatusCode};
+use reqwest::{Client, Request, RequestBuilder, Response, StatusCode};
 use std::{
 use std::{
     fmt,
     fmt,
     time::{Duration, SystemTime},
     time::{Duration, SystemTime},
@@ -18,51 +19,130 @@ pub struct User {
 
 
 impl GithubClient {
 impl GithubClient {
     async fn _send_req(&self, req: RequestBuilder) -> Result<(Response, String), reqwest::Error> {
     async fn _send_req(&self, req: RequestBuilder) -> Result<(Response, String), reqwest::Error> {
+        const MAX_ATTEMPTS: usize = 2;
         log::debug!("_send_req with {:?}", req);
         log::debug!("_send_req with {:?}", req);
         let req = req.build()?;
         let req = req.build()?;
-
         let req_dbg = format!("{:?}", req);
         let req_dbg = format!("{:?}", req);
 
 
         let mut resp = self.client.execute(req.try_clone().unwrap()).await?;
         let mut resp = self.client.execute(req.try_clone().unwrap()).await?;
+        if let Some(sleep) = Self::needs_retry(&resp).await {
+            eprintln!(
+                "Need to retry request {}. Sleeping for {}",
+                req_dbg,
+                sleep.as_secs()
+            );
+            resp = self.retry(req, sleep, MAX_ATTEMPTS).await?;
+        }
+
+        if let Err(e) = resp.error_for_status_ref() {
+            return Err(e);
+        }
+
+        Ok((resp, req_dbg))
+    }
+
+    async fn needs_retry(resp: &Response) -> Option<Duration> {
+        const REMAINING: &str = "X-RateLimit-Remaining";
+        const RESET: &str = "X-RateLimit-Reset";
+
+        if resp.status().is_success() {
+            return None;
+        }
+
+        let headers = resp.headers();
+        if !(headers.contains_key(REMAINING) && headers.contains_key(RESET)) {
+            return None;
+        }
+
+        // Weird github api behavior. It asks us to retry but also has a remaining count above 1
+        // Try again immediately and hope for the best...
+        if headers[REMAINING] != "0" {
+            return Some(Duration::from_secs(0));
+        }
+
+        let reset_time = headers[RESET].to_str().unwrap().parse::<u64>().unwrap();
+        Some(Duration::from_secs(Self::calc_sleep(reset_time) + 10))
+    }
 
 
-        if resp.status().as_u16() == 403 {
-            let headers = dbg!(resp.headers());
-            if headers.contains_key("X-RateLimit-Remaining")
-                && headers.contains_key("X-RateLimit-Reset")
-            {
-                if headers["X-RateLimit-Remaining"] == "0" {
-                    let epoch_time = dbg!(SystemTime::now()
-                        .duration_since(SystemTime::UNIX_EPOCH)
-                        .unwrap()
-                        .as_secs());
-                    let reset_time = dbg!(headers["X-RateLimit-Reset"]
-                        .to_str()
-                        .unwrap()
-                        .parse::<u64>()
-                        .unwrap());
-                    tokio::time::delay_for(Duration::from_secs(dbg!(
-                        reset_time.saturating_sub(epoch_time) + 5
-                    )))
-                    .await;
-                    dbg!(SystemTime::now()
-                        .duration_since(SystemTime::UNIX_EPOCH)
-                        .unwrap()
-                        .as_secs());
-                    resp = self.client.execute(req.try_clone().unwrap()).await?;
-                    if resp.status().as_u16() == 403 {
-                        dbg!(resp.headers());
-                        tokio::time::delay_for(Duration::from_secs(60)).await;
-                        resp = self.client.execute(req).await?;
-                        dbg!(resp.headers());
-                    }
+    fn calc_sleep(reset_time: u64) -> u64 {
+        let epoch_time = SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .unwrap()
+            .as_secs();
+        reset_time.saturating_sub(epoch_time)
+    }
+
+    fn retry(
+        &self,
+        req: Request,
+        sleep: Duration,
+        remaining_attempts: usize,
+    ) -> BoxFuture<Result<Response, reqwest::Error>> {
+        #[derive(Debug, serde::Deserialize)]
+        struct RateLimit {
+            pub limit: u64,
+            pub remaining: u64,
+            pub reset: u64,
+        }
+
+        #[derive(Debug, serde::Deserialize)]
+        struct RateLimitResponse {
+            pub resources: Resources,
+        }
+
+        #[derive(Debug, serde::Deserialize)]
+        struct Resources {
+            pub core: RateLimit,
+            pub search: RateLimit,
+            pub graphql: RateLimit,
+            pub source_import: RateLimit,
+        }
+
+        async move {
+            eprintln!("Sleeping for {}", sleep.as_secs());
+            tokio::time::delay_for(sleep).await;
+            eprintln!("Done sleeping");
+
+            // check rate limit
+            let rate_resp = self
+                .client
+                .execute(
+                    self.client
+                        .get("https://api.github.com/rate_limit")
+                        .configure(self)
+                        .build()
+                        .unwrap(),
+                )
+                .await?;
+            let search_rate_limit = rate_resp
+                .json::<RateLimitResponse>()
+                .await?
+                .resources
+                .search;
+            eprintln!("search rate limit info: {:?}", search_rate_limit);
+
+            // If we still don't have any more remaining attempts, try sleeping for the remaining
+            // period of time
+            if search_rate_limit.remaining == 0 {
+                let sleep = Self::calc_sleep(search_rate_limit.reset);
+                if sleep > 0 {
+                    tokio::time::delay_for(Duration::from_secs(sleep)).await;
                 }
                 }
             }
             }
-        }
 
 
-        resp.error_for_status_ref()?;
+            let resp = self.client.execute(req.try_clone().unwrap()).await?;
+            if let Some(sleep) = Self::needs_retry(&resp).await {
+                if remaining_attempts > 0 {
+                    drop(resp);
+                    return self.retry(req, sleep, remaining_attempts - 1).await;
+                }
+            }
 
 
-        Ok((resp, req_dbg))
+            Ok(resp)
+        }
+        .boxed()
     }
     }
+
     async fn send_req(&self, req: RequestBuilder) -> anyhow::Result<Vec<u8>> {
     async fn send_req(&self, req: RequestBuilder) -> anyhow::Result<Vec<u8>> {
         let (mut resp, req_dbg) = self._send_req(req).await?;
         let (mut resp, req_dbg) = self._send_req(req).await?;
 
 

+ 10 - 3
src/prioritization/mod.rs

@@ -85,10 +85,17 @@ impl<'a> Action for Step<'a> {
                         let issues_search_result = block_on(
                         let issues_search_result = block_on(
                             repository
                             repository
                                 .get_issues(&gh, &filters.iter().map(|s| s.as_ref()).collect()),
                                 .get_issues(&gh, &filters.iter().map(|s| s.as_ref()).collect()),
-                        )
-                        .unwrap();
+                        );
+
+                        if let Err(err) = issues_search_result {
+                            eprintln!("ERROR: {}", err);
+                            err.chain()
+                                .skip(1)
+                                .for_each(|cause| eprintln!("because: {}", cause));
+                            std::process::exit(1);
+                        }
 
 
-                        (*name, issues_search_result.items)
+                        (*name, issues_search_result.unwrap().items)
                     })
                     })
                     .collect::<Vec<_>>()
                     .collect::<Vec<_>>()
             })
             })