use anyhow::Context; use async_trait::async_trait; use chrono::{DateTime, FixedOffset, Utc}; use futures::stream::{FuturesUnordered, StreamExt}; use futures::{future::BoxFuture, FutureExt}; use hyper::header::HeaderValue; use once_cell::sync::OnceCell; use reqwest::header::{AUTHORIZATION, USER_AGENT}; use reqwest::{Client, Request, RequestBuilder, Response, StatusCode}; use std::{ fmt, time::{Duration, SystemTime}, }; pub mod graphql; #[derive(Debug, PartialEq, Eq, serde::Deserialize)] pub struct User { pub login: String, pub id: Option, } impl GithubClient { async fn _send_req(&self, req: RequestBuilder) -> anyhow::Result<(Response, String)> { const MAX_ATTEMPTS: usize = 2; log::debug!("_send_req with {:?}", req); let req_dbg = format!("{:?}", req); let req = req .build() .with_context(|| format!("building reqwest {}", req_dbg))?; let mut resp = self.client.execute(req.try_clone().unwrap()).await?; if let Some(sleep) = Self::needs_retry(&resp).await { resp = self.retry(req, sleep, MAX_ATTEMPTS).await?; } resp.error_for_status_ref()?; Ok((resp, req_dbg)) } async fn needs_retry(resp: &Response) -> Option { const REMAINING: &str = "X-RateLimit-Remaining"; const RESET: &str = "X-RateLimit-Reset"; if resp.status().is_success() { return None; } let headers = resp.headers(); if !(headers.contains_key(REMAINING) && headers.contains_key(RESET)) { return None; } // Weird github api behavior. It asks us to retry but also has a remaining count above 1 // Try again immediately and hope for the best... if headers[REMAINING] != "0" { return Some(Duration::from_secs(0)); } let reset_time = headers[RESET].to_str().unwrap().parse::().unwrap(); Some(Duration::from_secs(Self::calc_sleep(reset_time) + 10)) } fn calc_sleep(reset_time: u64) -> u64 { let epoch_time = SystemTime::UNIX_EPOCH.elapsed().unwrap().as_secs(); reset_time.saturating_sub(epoch_time) } fn retry( &self, req: Request, sleep: Duration, remaining_attempts: usize, ) -> BoxFuture> { #[derive(Debug, serde::Deserialize)] struct RateLimit { pub limit: u64, pub remaining: u64, pub reset: u64, } #[derive(Debug, serde::Deserialize)] struct RateLimitResponse { pub resources: Resources, } #[derive(Debug, serde::Deserialize)] struct Resources { pub core: RateLimit, pub search: RateLimit, pub graphql: RateLimit, pub source_import: RateLimit, } log::warn!( "Retrying after {} seconds, remaining attepts {}", sleep.as_secs(), remaining_attempts, ); async move { tokio::time::sleep(sleep).await; // check rate limit let rate_resp = self .client .execute( self.client .get("https://api.github.com/rate_limit") .configure(self) .build() .unwrap(), ) .await?; let rate_limit_response = rate_resp.json::().await?; // Check url for search path because github has different rate limits for the search api let rate_limit = if req .url() .path_segments() .map(|mut segments| matches!(segments.next(), Some("search"))) .unwrap_or(false) { rate_limit_response.resources.search } else { rate_limit_response.resources.core }; // If we still don't have any more remaining attempts, try sleeping for the remaining // period of time if rate_limit.remaining == 0 { let sleep = Self::calc_sleep(rate_limit.reset); if sleep > 0 { tokio::time::sleep(Duration::from_secs(sleep)).await; } } let resp = self.client.execute(req.try_clone().unwrap()).await?; if let Some(sleep) = Self::needs_retry(&resp).await { if remaining_attempts > 0 { return self.retry(req, sleep, remaining_attempts - 1).await; } } Ok(resp) } .boxed() } async fn send_req(&self, req: RequestBuilder) -> anyhow::Result> { let (mut resp, req_dbg) = self._send_req(req).await?; let mut body = Vec::new(); while let Some(chunk) = resp.chunk().await.transpose() { let chunk = chunk .context("reading stream failed") .map_err(anyhow::Error::from) .context(req_dbg.clone())?; body.extend_from_slice(&chunk); } Ok(body) } pub async fn json(&self, req: RequestBuilder) -> anyhow::Result where T: serde::de::DeserializeOwned, { let (resp, req_dbg) = self._send_req(req).await?; Ok(resp.json().await.context(req_dbg)?) } } impl User { pub async fn current(client: &GithubClient) -> anyhow::Result { client.json(client.get("https://api.github.com/user")).await } pub async fn is_team_member<'a>(&'a self, client: &'a GithubClient) -> anyhow::Result { log::trace!("Getting team membership for {:?}", self.login); let permission = crate::team_data::teams(client).await?; let map = permission.teams; let is_triager = map .get("wg-triage") .map_or(false, |w| w.members.iter().any(|g| g.github == self.login)); let is_pri_member = map .get("wg-prioritization") .map_or(false, |w| w.members.iter().any(|g| g.github == self.login)); let is_async_member = map .get("wg-async-foundations") .map_or(false, |w| w.members.iter().any(|g| g.github == self.login)); let in_all = map["all"].members.iter().any(|g| g.github == self.login); log::trace!( "{:?} is all?={:?}, triager?={:?}, prioritizer?={:?}, async?={:?}", self.login, in_all, is_triager, is_pri_member, is_async_member, ); Ok(in_all || is_triager || is_pri_member || is_async_member) } // Returns the ID of the given user, if the user is in the `all` team. pub async fn get_id<'a>(&'a self, client: &'a GithubClient) -> anyhow::Result> { let permission = crate::team_data::teams(client).await?; let map = permission.teams; Ok(map["all"] .members .iter() .find(|g| g.github == self.login) .map(|u| u.github_id)) } } pub async fn get_team( client: &GithubClient, team: &str, ) -> anyhow::Result> { let permission = crate::team_data::teams(client).await?; let mut map = permission.teams; Ok(map.swap_remove(team)) } #[derive(PartialEq, Eq, Debug, Clone, serde::Deserialize)] pub struct Label { pub name: String, } impl Label { async fn exists<'a>(&'a self, repo_api_prefix: &'a str, client: &'a GithubClient) -> bool { #[allow(clippy::redundant_pattern_matching)] let url = format!("{}/labels/{}", repo_api_prefix, self.name); match client.send_req(client.get(&url)).await { Ok(_) => true, // XXX: Error handling if the request failed for reasons beyond 'label didn't exist' Err(_) => false, } } } #[derive(Debug, serde::Deserialize)] pub struct PullRequestDetails { // none for now } #[derive(Debug, serde::Deserialize)] pub struct Issue { pub number: u64, #[serde(deserialize_with = "opt_string")] pub body: String, created_at: chrono::DateTime, pub updated_at: chrono::DateTime, #[serde(default)] pub merge_commit_sha: Option, pub title: String, pub html_url: String, pub user: User, pub labels: Vec