sqlx_core/
sync.rs

1// For types with identical signatures that don't require runtime support,
2// we can just arbitrarily pick one to use based on what's enabled.
3//
4// We'll generally lean towards Tokio's types as those are more featureful
5// (including `tokio-console` support) and more widely deployed.
6
7pub struct AsyncSemaphore {
8    // We use the semaphore from futures-intrusive as the one from async-std
9    // is missing the ability to add arbitrary permits, and is not guaranteed to be fair:
10    // * https://github.com/smol-rs/async-lock/issues/22
11    // * https://github.com/smol-rs/async-lock/issues/23
12    //
13    // We're on the look-out for a replacement, however, as futures-intrusive is not maintained
14    // and there are some soundness concerns (although it turns out any intrusive future is unsound
15    // in MIRI due to the necessitated mutable aliasing):
16    // https://github.com/launchbadge/sqlx/issues/1668
17    #[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
18    inner: futures_intrusive::sync::Semaphore,
19
20    #[cfg(feature = "_rt-tokio")]
21    inner: tokio::sync::Semaphore,
22}
23
24impl AsyncSemaphore {
25    #[track_caller]
26    pub fn new(fair: bool, permits: usize) -> Self {
27        if cfg!(not(any(feature = "_rt-async-std", feature = "_rt-tokio"))) {
28            crate::rt::missing_rt((fair, permits));
29        }
30
31        AsyncSemaphore {
32            #[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
33            inner: futures_intrusive::sync::Semaphore::new(fair, permits),
34            #[cfg(feature = "_rt-tokio")]
35            inner: {
36                debug_assert!(fair, "Tokio only has fair permits");
37                tokio::sync::Semaphore::new(permits)
38            },
39        }
40    }
41
42    pub fn permits(&self) -> usize {
43        #[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
44        return self.inner.permits();
45
46        #[cfg(feature = "_rt-tokio")]
47        return self.inner.available_permits();
48
49        #[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
50        crate::rt::missing_rt(())
51    }
52
53    pub async fn acquire(&self, permits: u32) -> AsyncSemaphoreReleaser<'_> {
54        #[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
55        return AsyncSemaphoreReleaser {
56            inner: self.inner.acquire(permits as usize).await,
57        };
58
59        #[cfg(feature = "_rt-tokio")]
60        return AsyncSemaphoreReleaser {
61            inner: self
62                .inner
63                // Weird quirk: `tokio::sync::Semaphore` mostly uses `usize` for permit counts,
64                // but `u32` for this and `try_acquire_many()`.
65                .acquire_many(permits)
66                .await
67                .expect("BUG: we do not expose the `.close()` method"),
68        };
69
70        #[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
71        crate::rt::missing_rt(permits)
72    }
73
74    pub fn try_acquire(&self, permits: u32) -> Option<AsyncSemaphoreReleaser<'_>> {
75        #[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
76        return Some(AsyncSemaphoreReleaser {
77            inner: self.inner.try_acquire(permits as usize)?,
78        });
79
80        #[cfg(feature = "_rt-tokio")]
81        return Some(AsyncSemaphoreReleaser {
82            inner: self.inner.try_acquire_many(permits).ok()?,
83        });
84
85        #[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
86        crate::rt::missing_rt(permits)
87    }
88
89    pub fn release(&self, permits: usize) {
90        #[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
91        return self.inner.release(permits);
92
93        #[cfg(feature = "_rt-tokio")]
94        return self.inner.add_permits(permits);
95
96        #[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
97        crate::rt::missing_rt(permits)
98    }
99}
100
101pub struct AsyncSemaphoreReleaser<'a> {
102    // We use the semaphore from futures-intrusive as the one from async-std
103    // is missing the ability to add arbitrary permits, and is not guaranteed to be fair:
104    // * https://github.com/smol-rs/async-lock/issues/22
105    // * https://github.com/smol-rs/async-lock/issues/23
106    //
107    // We're on the look-out for a replacement, however, as futures-intrusive is not maintained
108    // and there are some soundness concerns (although it turns out any intrusive future is unsound
109    // in MIRI due to the necessitated mutable aliasing):
110    // https://github.com/launchbadge/sqlx/issues/1668
111    #[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
112    inner: futures_intrusive::sync::SemaphoreReleaser<'a>,
113
114    #[cfg(feature = "_rt-tokio")]
115    inner: tokio::sync::SemaphorePermit<'a>,
116
117    #[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
118    _phantom: std::marker::PhantomData<&'a ()>,
119}
120
121impl AsyncSemaphoreReleaser<'_> {
122    pub fn disarm(self) {
123        #[cfg(feature = "_rt-tokio")]
124        {
125            self.inner.forget();
126        }
127
128        #[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
129        {
130            let mut this = self;
131            this.inner.disarm();
132        }
133
134        #[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
135        crate::rt::missing_rt(())
136    }
137}