♻️ refactor(rule_processor): enhance Gmail message handling with chunk processing

- introduce `process_in_chunks` for batch operations to respect API limits
- refactor `batch_trash` to use `process_in_chunks` for efficient trashing
- add documentation for `process_in_chunks` explaining chunking and API limits
This commit is contained in:
Jeremiah Russell
2025-10-30 08:02:18 +00:00
committed by Jeremiah Russell
parent 01c337d94c
commit fc85f46e80

View File

@@ -307,27 +307,6 @@ pub trait RuleProcessor {
/// Requires the `https://mail.google.com/` scope or broader. /// Requires the `https://mail.google.com/` scope or broader.
fn batch_delete(&mut self) -> impl std::future::Future<Output = Result<()>> + Send; fn batch_delete(&mut self) -> impl std::future::Future<Output = Result<()>> + Send;
/// Calls the Gmail API to permanently deletes a slice from the list of messages.
///
/// # Returns
///
/// * `Ok(())` - All messages successfully deleted
/// * `Err(_)` - Gmail API error, network failure, or insufficient permissions
///
/// # Safety
///
/// ⚠️ **DESTRUCTIVE OPERATION** - This permanently removes messages from Gmail.
/// Deleted messages cannot be recovered. Use [`batch_trash`](Self::batch_trash)
/// for recoverable deletion.
///
/// # Gmail API Requirements
///
/// Requires the `https://mail.google.com/` scope or broader.
fn call_batch_delete(
&self,
ids: &[String],
) -> impl std::future::Future<Output = Result<()>> + Send;
/// Calls the Gmail API to move a slice of the prepared messages to the Gmail /// Calls the Gmail API to move a slice of the prepared messages to the Gmail
/// trash folder. /// trash folder.
/// ///
@@ -349,6 +328,40 @@ pub trait RuleProcessor {
/// Requires the `https://www.googleapis.com/auth/gmail.modify` scope. /// Requires the `https://www.googleapis.com/auth/gmail.modify` scope.
fn batch_trash(&mut self) -> impl std::future::Future<Output = Result<()>> + Send; fn batch_trash(&mut self) -> impl std::future::Future<Output = Result<()>> + Send;
/// Chunk the message lists to respect API limits and call required action.
///
/// # Returns
///
/// * `Ok(())` - All messages successfully deleted
/// * `Err(_)` - Gmail API error, network failure, or insufficient permissions
///
fn process_in_chunks(
&self,
message_ids: Vec<String>,
action: EolAction,
) -> impl std::future::Future<Output = Result<()>> + Send;
/// Calls the Gmail API to permanently deletes a slice from the list of messages.
///
/// # Returns
///
/// * `Ok(())` - All messages successfully deleted
/// * `Err(_)` - Gmail API error, network failure, or insufficient permissions
///
/// # Safety
///
/// ⚠️ **DESTRUCTIVE OPERATION** - This permanently removes messages from Gmail.
/// Deleted messages cannot be recovered. Use [`batch_trash`](Self::batch_trash)
/// for recoverable deletion.
///
/// # Gmail API Requirements
///
/// Requires the `https://mail.google.com/` scope or broader.
fn call_batch_delete(
&self,
ids: &[String],
) -> impl std::future::Future<Output = Result<()>> + Send;
/// Moves all prepared messages to the Gmail trash folder. /// Moves all prepared messages to the Gmail trash folder.
/// ///
/// Messages moved to trash can be recovered within 30 days through the Gmail /// Messages moved to trash can be recovered within 30 days through the Gmail
@@ -540,6 +553,13 @@ impl RuleProcessor for GmailClient {
self.log_messages("Message with subject `", "` moved to trash") self.log_messages("Message with subject `", "` moved to trash")
.await?; .await?;
self.process_in_chunks(message_ids, EolAction::Trash)
.await?;
Ok(())
}
async fn process_in_chunks(&self, message_ids: Vec<String>, action: EolAction) -> Result<()> {
let (chunks, remainder) = message_ids.as_chunks::<1000>(); let (chunks, remainder) = message_ids.as_chunks::<1000>();
log::trace!( log::trace!(
"Message list chopped into {} chunks with {} ids in the remainder", "Message list chopped into {} chunks with {} ids in the remainder",
@@ -547,16 +567,21 @@ impl RuleProcessor for GmailClient {
remainder.len() remainder.len()
); );
let act = async |action, list| match action {
EolAction::Trash => self.call_batch_trash(list).await,
EolAction::Delete => self.call_batch_delete(list).await,
};
if !chunks.is_empty() { if !chunks.is_empty() {
for (i, chunk) in chunks.iter().enumerate() { for (i, chunk) in chunks.iter().enumerate() {
log::trace!("Processing chunk {i}"); log::trace!("Processing chunk {i}");
self.call_batch_delete(chunk).await?; act(action, chunk).await?;
} }
} }
if !remainder.is_empty() { if !remainder.is_empty() {
log::trace!("Processing remainder."); log::trace!("Processing remainder.");
self.call_batch_delete(remainder).await?; act(action, remainder).await?;
} }
Ok(()) Ok(())
@@ -886,6 +911,14 @@ mod tests {
async fn call_batch_trash(&self, _ids: &[String]) -> Result<()> { async fn call_batch_trash(&self, _ids: &[String]) -> Result<()> {
Ok(()) Ok(())
} }
async fn process_in_chunks(
&self,
_message_ids: Vec<String>,
_action: EolAction,
) -> Result<()> {
Ok(())
}
} }
let mut processor = MockProcessor { let mut processor = MockProcessor {