Skip to content

Commit

Permalink
Use row level locking for payouts (#926)
Browse files Browse the repository at this point in the history
  • Loading branch information
Geometrically authored Jun 12, 2024
1 parent 6bbd8c9 commit beaaed6
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 8 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions src/queue/payouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ use serde_json::Value;
use sqlx::postgres::PgQueryResult;
use sqlx::PgPool;
use std::collections::HashMap;
use tokio::sync::{Mutex, RwLock};
use tokio::sync::RwLock;

pub struct PayoutsQueue {
credential: RwLock<Option<PayPalCredentials>>,
payout_options: RwLock<Option<PayoutMethods>>,
pub payouts_locks: Mutex<()>,
}

#[derive(Clone)]
Expand All @@ -48,7 +47,6 @@ impl PayoutsQueue {
PayoutsQueue {
credential: RwLock::new(None),
payout_options: RwLock::new(None),
payouts_locks: Mutex::new(()),
}
}

Expand Down
31 changes: 26 additions & 5 deletions src/routes/v3/payouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,14 @@ pub async fn paypal_webhook(
.await?;

if let Some(result) = result {
let _guard = payouts.payouts_locks.lock().await;
sqlx::query!(
"
SELECT balance FROM users WHERE id = $1 FOR UPDATE
",
result.user_id
)
.fetch_optional(&mut *transaction)
.await?;

sqlx::query!(
"
Expand Down Expand Up @@ -194,7 +201,6 @@ pub async fn tremendous_webhook(
req: HttpRequest,
pool: web::Data<PgPool>,
redis: web::Data<RedisPool>,
payouts: web::Data<PayoutsQueue>,
body: String,
) -> Result<HttpResponse, ApiError> {
let signature = req
Expand Down Expand Up @@ -247,7 +253,14 @@ pub async fn tremendous_webhook(
.await?;

if let Some(result) = result {
let _guard = payouts.payouts_locks.lock().await;
sqlx::query!(
"
SELECT balance FROM users WHERE id = $1 FOR UPDATE
",
result.user_id
)
.fetch_optional(&mut *transaction)
.await?;

sqlx::query!(
"
Expand Down Expand Up @@ -367,8 +380,6 @@ pub async fn create_payout(
));
}

let _guard = payouts_queue.payouts_locks.lock().await;

if user.balance < body.amount || body.amount < Decimal::ZERO {
return Err(ApiError::InvalidInput(
"You do not have enough funds to make this payout!".to_string(),
Expand Down Expand Up @@ -398,6 +409,16 @@ pub async fn create_payout(
}

let mut transaction = pool.begin().await?;

sqlx::query!(
"
SELECT balance FROM users WHERE id = $1 FOR UPDATE
",
user.id.0
)
.fetch_optional(&mut *transaction)
.await?;

let payout_id = generate_payout_id(&mut transaction).await?;

let payout_item = match body.method {
Expand Down

0 comments on commit beaaed6

Please sign in to comment.