Skip to main content

tokio_stream/wrappers/
interval.rs

1use crate::Stream;
2use futures_core::stream::FusedStream;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tokio::time::{Instant, Interval};
6
7/// A wrapper around [`Interval`] that implements [`Stream`].
8///
9/// # Example
10///
11/// ```
12/// use tokio::time::{Duration, Instant, interval};
13/// use tokio_stream::wrappers::IntervalStream;
14/// use tokio_stream::StreamExt;
15///
16/// # #[tokio::main(flavor = "current_thread")]
17/// # async fn main() {
18/// let start = Instant::now();
19/// let interval = interval(Duration::from_millis(10));
20/// let mut stream = IntervalStream::new(interval);
21/// for _ in 0..3 {
22///     if let Some(instant) = stream.next().await {
23///         println!("elapsed: {:.1?}", instant.duration_since(start));
24///     }
25/// }
26/// # }
27/// ```
28///
29/// [`Interval`]: struct@tokio::time::Interval
30/// [`Stream`]: trait@crate::Stream
31#[derive(Debug)]
32#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
33pub struct IntervalStream {
34    inner: Interval,
35}
36
37impl IntervalStream {
38    /// Create a new `IntervalStream`.
39    pub fn new(interval: Interval) -> Self {
40        Self { inner: interval }
41    }
42
43    /// Get back the inner `Interval`.
44    pub fn into_inner(self) -> Interval {
45        self.inner
46    }
47}
48
49impl Stream for IntervalStream {
50    type Item = Instant;
51
52    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Instant>> {
53        self.inner.poll_tick(cx).map(Some)
54    }
55
56    fn size_hint(&self) -> (usize, Option<usize>) {
57        (usize::MAX, None)
58    }
59}
60
61impl FusedStream for IntervalStream {
62    fn is_terminated(&self) -> bool {
63        false
64    }
65}
66
67impl AsRef<Interval> for IntervalStream {
68    fn as_ref(&self) -> &Interval {
69        &self.inner
70    }
71}
72
73impl AsMut<Interval> for IntervalStream {
74    fn as_mut(&mut self) -> &mut Interval {
75        &mut self.inner
76    }
77}