tokio/time/interval.rs
1use crate::time::{sleep_until, Duration, Instant, Sleep};
2use crate::util::trace;
3
4use std::future::{poll_fn, Future};
5use std::panic::Location;
6use std::pin::Pin;
7use std::task::{ready, Context, Poll};
8
9/// Creates new [`Interval`] that yields with interval of `period`. The first
10/// tick completes immediately. The default [`MissedTickBehavior`] is
11/// [`Burst`](MissedTickBehavior::Burst), but this can be configured
12/// by calling [`set_missed_tick_behavior`](Interval::set_missed_tick_behavior).
13///
14/// An interval will tick indefinitely. At any time, the [`Interval`] value can
15/// be dropped. This cancels the interval.
16///
17/// This function is equivalent to
18/// [`interval_at(Instant::now(), period)`](interval_at).
19///
20/// # Panics
21///
22/// This function panics if `period` is zero.
23///
24/// # Examples
25///
26/// ```
27/// use tokio::time::{self, Duration};
28///
29/// # #[tokio::main(flavor = "current_thread")]
30/// # async fn main() {
31/// let mut interval = time::interval(Duration::from_millis(10));
32///
33/// interval.tick().await; // ticks immediately
34/// interval.tick().await; // ticks after 10ms
35/// interval.tick().await; // ticks after 10ms
36///
37/// // approximately 20ms have elapsed.
38/// # }
39/// ```
40///
41/// A simple example using `interval` to execute a task every two seconds.
42///
43/// The difference between `interval` and [`sleep`] is that an [`Interval`]
44/// measures the time since the last tick, which means that [`.tick().await`]
45/// may wait for a shorter time than the duration specified for the interval
46/// if some time has passed between calls to [`.tick().await`].
47///
48/// If the tick in the example below was replaced with [`sleep`], the task
49/// would only be executed once every three seconds, and not every two
50/// seconds.
51///
52/// ```
53/// use tokio::time;
54///
55/// async fn task_that_takes_a_second() {
56/// println!("hello");
57/// time::sleep(time::Duration::from_secs(1)).await
58/// }
59///
60/// # #[tokio::main(flavor = "current_thread")]
61/// # async fn main() {
62/// let mut interval = time::interval(time::Duration::from_secs(2));
63/// for _i in 0..5 {
64/// interval.tick().await;
65/// task_that_takes_a_second().await;
66/// }
67/// # }
68/// ```
69///
70/// [`sleep`]: crate::time::sleep()
71/// [`.tick().await`]: Interval::tick
72#[track_caller]
73pub fn interval(period: Duration) -> Interval {
74 assert!(period > Duration::new(0, 0), "`period` must be non-zero.");
75 internal_interval_at(Instant::now(), period, trace::caller_location())
76}
77
78/// Creates new [`Interval`] that yields with interval of `period` with the
79/// first tick completing at `start`. The default [`MissedTickBehavior`] is
80/// [`Burst`](MissedTickBehavior::Burst), but this can be configured
81/// by calling [`set_missed_tick_behavior`](Interval::set_missed_tick_behavior).
82///
83/// An interval will tick indefinitely. At any time, the [`Interval`] value can
84/// be dropped. This cancels the interval.
85///
86/// # Panics
87///
88/// This function panics if `period` is zero.
89///
90/// # Examples
91///
92/// ```
93/// use tokio::time::{interval_at, Duration, Instant};
94///
95/// # #[tokio::main(flavor = "current_thread")]
96/// # async fn main() {
97/// let start = Instant::now() + Duration::from_millis(50);
98/// let mut interval = interval_at(start, Duration::from_millis(10));
99///
100/// interval.tick().await; // ticks after 50ms
101/// interval.tick().await; // ticks after 10ms
102/// interval.tick().await; // ticks after 10ms
103///
104/// // approximately 70ms have elapsed.
105/// # }
106/// ```
107#[track_caller]
108pub fn interval_at(start: Instant, period: Duration) -> Interval {
109 assert!(period > Duration::new(0, 0), "`period` must be non-zero.");
110 internal_interval_at(start, period, trace::caller_location())
111}
112
113#[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))]
114fn internal_interval_at(
115 start: Instant,
116 period: Duration,
117 location: Option<&'static Location<'static>>,
118) -> Interval {
119 #[cfg(all(tokio_unstable, feature = "tracing"))]
120 let resource_span = {
121 let location = location.expect("should have location if tracing");
122
123 tracing::trace_span!(
124 parent: None,
125 "runtime.resource",
126 concrete_type = "Interval",
127 kind = "timer",
128 loc.file = location.file(),
129 loc.line = location.line(),
130 loc.col = location.column(),
131 )
132 };
133
134 #[cfg(all(tokio_unstable, feature = "tracing"))]
135 let delay = resource_span.in_scope(|| Box::pin(sleep_until(start)));
136
137 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
138 let delay = Box::pin(sleep_until(start));
139
140 Interval {
141 delay,
142 period,
143 missed_tick_behavior: MissedTickBehavior::default(),
144 #[cfg(all(tokio_unstable, feature = "tracing"))]
145 resource_span,
146 }
147}
148
149/// Defines the behavior of an [`Interval`] when it misses a tick.
150///
151/// Sometimes, an [`Interval`]'s tick is missed. For example, consider the
152/// following:
153///
154/// ```
155/// use tokio::time::{self, Duration};
156/// # async fn task_that_takes_one_to_three_millis() {}
157///
158/// # #[tokio::main(flavor = "current_thread")]
159/// # async fn main() {
160/// // ticks every 2 milliseconds
161/// let mut interval = time::interval(Duration::from_millis(2));
162/// for _ in 0..5 {
163/// interval.tick().await;
164/// // if this takes more than 2 milliseconds, a tick will be delayed
165/// task_that_takes_one_to_three_millis().await;
166/// }
167/// # }
168/// ```
169///
170/// Generally, a tick is missed if too much time is spent without calling
171/// [`Interval::tick()`].
172///
173/// By default, when a tick is missed, [`Interval`] fires ticks as quickly as it
174/// can until it is "caught up" in time to where it should be.
175/// `MissedTickBehavior` can be used to specify a different behavior for
176/// [`Interval`] to exhibit. Each variant represents a different strategy.
177///
178/// With all tick behaviors, the next tick after a missed tick will always
179/// complete immediately.
180///
181/// Note that because the executor cannot guarantee exact precision with timers,
182/// these strategies will only apply when the delay is greater than 5
183/// milliseconds.
184#[derive(Debug, Clone, Copy, PartialEq, Eq)]
185pub enum MissedTickBehavior {
186 /// Ticks as fast as possible until caught up.
187 ///
188 /// When this strategy is used, [`Interval`] schedules ticks "normally" (the
189 /// same as it would have if the ticks hadn't been delayed), which results
190 /// in it firing ticks as fast as possible until it is caught up in time to
191 /// where it should be. Unlike [`Delay`] and [`Skip`], the ticks yielded
192 /// when `Burst` is used (the [`Instant`]s that [`tick`](Interval::tick)
193 /// yields) aren't different than they would have been if a tick had not
194 /// been missed. Like [`Skip`], and unlike [`Delay`], the ticks may be
195 /// shortened.
196 ///
197 /// This looks something like this:
198 /// ```text
199 /// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
200 /// Actual ticks: | work -----| delay | work | work | work -| work -----|
201 /// ```
202 ///
203 /// In code:
204 ///
205 /// ```
206 /// use tokio::time::{interval, Duration};
207 /// # async fn task_that_takes_200_millis() {}
208 ///
209 /// # #[tokio::main(flavor = "current_thread")]
210 /// # async fn main() {
211 /// let mut interval = interval(Duration::from_millis(50));
212 ///
213 /// // First tick resolves immediately after creation
214 /// interval.tick().await;
215 ///
216 /// task_that_takes_200_millis().await;
217 /// // The `Interval` has missed a tick
218 ///
219 /// // Since we have exceeded our timeout, this will resolve immediately
220 /// interval.tick().await;
221 ///
222 /// // Since we are more than 100ms after the start of `interval`, this will
223 /// // also resolve immediately.
224 /// interval.tick().await;
225 ///
226 /// // Also resolves immediately, because it was supposed to resolve at
227 /// // 150ms after the start of `interval`
228 /// interval.tick().await;
229 ///
230 /// // Resolves immediately
231 /// interval.tick().await;
232 ///
233 /// // Since we have gotten to 200ms after the start of `interval`, this
234 /// // will resolve after 50ms
235 /// interval.tick().await;
236 /// # }
237 /// ```
238 ///
239 /// This is the default behavior when [`Interval`] is created with
240 /// [`interval`] and [`interval_at`].
241 ///
242 /// [`Delay`]: MissedTickBehavior::Delay
243 /// [`Skip`]: MissedTickBehavior::Skip
244 Burst,
245
246 /// Tick at multiples of `period` from when [`tick`] was called, rather than
247 /// from `start`.
248 ///
249 /// When this strategy is used and [`Interval`] has missed a tick, instead
250 /// of scheduling ticks to fire at multiples of `period` from `start` (the
251 /// time when the first tick was fired), it schedules all future ticks to
252 /// happen at a regular `period` from the point when [`tick`] was called.
253 /// Unlike [`Burst`] and [`Skip`], ticks are not shortened, and they aren't
254 /// guaranteed to happen at a multiple of `period` from `start` any longer.
255 ///
256 /// This looks something like this:
257 /// ```text
258 /// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
259 /// Actual ticks: | work -----| delay | work -----| work -----| work -----|
260 /// ```
261 ///
262 /// In code:
263 ///
264 /// ```
265 /// use tokio::time::{interval, Duration, MissedTickBehavior};
266 /// # async fn task_that_takes_more_than_50_millis() {}
267 ///
268 /// # #[tokio::main(flavor = "current_thread")]
269 /// # async fn main() {
270 /// let mut interval = interval(Duration::from_millis(50));
271 /// interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
272 ///
273 /// task_that_takes_more_than_50_millis().await;
274 /// // The `Interval` has missed a tick
275 ///
276 /// // Since we have exceeded our timeout, this will resolve immediately
277 /// interval.tick().await;
278 ///
279 /// // But this one, rather than also resolving immediately, as might happen
280 /// // with the `Burst` or `Skip` behaviors, will not resolve until
281 /// // 50ms after the call to `tick` up above. That is, in `tick`, when we
282 /// // recognize that we missed a tick, we schedule the next tick to happen
283 /// // 50ms (or whatever the `period` is) from right then, not from when
284 /// // were *supposed* to tick
285 /// interval.tick().await;
286 /// # }
287 /// ```
288 ///
289 /// [`Burst`]: MissedTickBehavior::Burst
290 /// [`Skip`]: MissedTickBehavior::Skip
291 /// [`tick`]: Interval::tick
292 Delay,
293
294 /// Skips missed ticks and tick on the next multiple of `period` from
295 /// `start`.
296 ///
297 /// When this strategy is used, [`Interval`] schedules the next tick to fire
298 /// at the next-closest tick that is a multiple of `period` away from
299 /// `start` (the point where [`Interval`] first ticked). Like [`Burst`], all
300 /// ticks remain multiples of `period` away from `start`. Like [`Delay`],
301 /// the number of fired ticks are no longer the same as they would have been
302 /// if ticks had not been missed, but unlike [`Delay`], and like [`Burst`],
303 /// the ticks may be shortened to be less than one `period` away from each
304 /// other.
305 ///
306 /// This looks something like this:
307 /// ```text
308 /// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
309 /// Actual ticks: | work -----| delay | work ---| work -----| work -----|
310 /// ```
311 ///
312 /// In code:
313 ///
314 /// ```
315 /// use tokio::time::{interval, Duration, MissedTickBehavior};
316 /// # async fn task_that_takes_75_millis() {}
317 ///
318 /// # #[tokio::main(flavor = "current_thread")]
319 /// # async fn main() {
320 /// let mut interval = interval(Duration::from_millis(50));
321 /// interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
322 ///
323 /// task_that_takes_75_millis().await;
324 /// // The `Interval` has missed a tick
325 ///
326 /// // Since we have exceeded our timeout, this will resolve immediately
327 /// interval.tick().await;
328 ///
329 /// // This one will resolve after 25ms, 100ms after the start of
330 /// // `interval`, which is the closest multiple of `period` from the start
331 /// // of `interval` after the call to `tick` up above.
332 /// interval.tick().await;
333 /// # }
334 /// ```
335 ///
336 /// [`Burst`]: MissedTickBehavior::Burst
337 /// [`Delay`]: MissedTickBehavior::Delay
338 Skip,
339}
340
341impl MissedTickBehavior {
342 /// If a tick is missed, this method is called to determine when the next tick should happen.
343 fn next_timeout(&self, timeout: Instant, now: Instant, period: Duration) -> Instant {
344 match self {
345 Self::Burst => timeout + period,
346 Self::Delay => now + period,
347 Self::Skip => {
348 now + period
349 - Duration::from_nanos(
350 ((now - timeout).as_nanos() % period.as_nanos())
351 .try_into()
352 // This operation is practically guaranteed not to
353 // fail, as in order for it to fail, `period` would
354 // have to be longer than `now - timeout`, and both
355 // would have to be longer than 584 years.
356 //
357 // If it did fail, there's not a good way to pass
358 // the error along to the user, so we just panic.
359 .expect(
360 "too much time has elapsed since the interval was supposed to tick",
361 ),
362 )
363 }
364 }
365 }
366}
367
368impl Default for MissedTickBehavior {
369 /// Returns [`MissedTickBehavior::Burst`].
370 ///
371 /// For most usecases, the [`Burst`] strategy is what is desired.
372 /// Additionally, to preserve backwards compatibility, the [`Burst`]
373 /// strategy must be the default. For these reasons,
374 /// [`MissedTickBehavior::Burst`] is the default for [`MissedTickBehavior`].
375 /// See [`Burst`] for more details.
376 ///
377 /// [`Burst`]: MissedTickBehavior::Burst
378 fn default() -> Self {
379 Self::Burst
380 }
381}
382
383/// Interval returned by [`interval`] and [`interval_at`].
384///
385/// This type allows you to wait on a sequence of instants with a certain
386/// duration between each instant. Unlike calling [`sleep`] in a loop, this lets
387/// you count the time spent between the calls to [`sleep`] as well.
388///
389/// An `Interval` can be turned into a `Stream` with [`IntervalStream`].
390///
391/// [`IntervalStream`]: https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/struct.IntervalStream.html
392/// [`sleep`]: crate::time::sleep()
393#[derive(Debug)]
394pub struct Interval {
395 /// Future that completes the next time the `Interval` yields a value.
396 delay: Pin<Box<Sleep>>,
397
398 /// The duration between values yielded by `Interval`.
399 period: Duration,
400
401 /// The strategy `Interval` should use when a tick is missed.
402 missed_tick_behavior: MissedTickBehavior,
403
404 #[cfg(all(tokio_unstable, feature = "tracing"))]
405 resource_span: tracing::Span,
406}
407
408impl Interval {
409 /// Completes when the next instant in the interval has been reached.
410 ///
411 /// # Cancel safety
412 ///
413 /// This method is cancellation safe. If `tick` is used as the branch in a `tokio::select!` and
414 /// another branch completes first, then no tick has been consumed.
415 ///
416 /// # Examples
417 ///
418 /// ```
419 /// use tokio::time;
420 ///
421 /// use std::time::Duration;
422 ///
423 /// # #[tokio::main(flavor = "current_thread")]
424 /// # async fn main() {
425 /// let mut interval = time::interval(Duration::from_millis(10));
426 ///
427 /// interval.tick().await;
428 /// // approximately 0ms have elapsed. The first tick completes immediately.
429 /// interval.tick().await;
430 /// interval.tick().await;
431 ///
432 /// // approximately 20ms have elapsed.
433 /// # }
434 /// ```
435 pub async fn tick(&mut self) -> Instant {
436 #[cfg(all(tokio_unstable, feature = "tracing"))]
437 let resource_span = self.resource_span.clone();
438 #[cfg(all(tokio_unstable, feature = "tracing"))]
439 let instant = trace::async_op(
440 || poll_fn(|cx| self.poll_tick(cx)),
441 resource_span,
442 "Interval::tick",
443 "poll_tick",
444 false,
445 );
446 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
447 let instant = poll_fn(|cx| self.poll_tick(cx));
448
449 instant.await
450 }
451
452 /// Polls for the next instant in the interval to be reached.
453 ///
454 /// This method can return the following values:
455 ///
456 /// * `Poll::Pending` if the next instant has not yet been reached.
457 /// * `Poll::Ready(instant)` if the next instant has been reached.
458 ///
459 /// When this method returns `Poll::Pending`, the current task is scheduled
460 /// to receive a wakeup when the instant has elapsed. Note that on multiple
461 /// calls to `poll_tick`, only the [`Waker`](std::task::Waker) from the
462 /// [`Context`] passed to the most recent call is scheduled to receive a
463 /// wakeup.
464 pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<Instant> {
465 // Wait for the delay to be done
466 ready!(Pin::new(&mut self.delay).poll(cx));
467
468 // Get the time when we were scheduled to tick
469 let timeout = self.delay.deadline();
470
471 let now = Instant::now();
472
473 // If a tick was not missed, and thus we are being called before the
474 // next tick is due, just schedule the next tick normally, one `period`
475 // after `timeout`
476 //
477 // However, if a tick took excessively long and we are now behind,
478 // schedule the next tick according to how the user specified with
479 // `MissedTickBehavior`
480 let next = if now > timeout + Duration::from_millis(5) {
481 self.missed_tick_behavior
482 .next_timeout(timeout, now, self.period)
483 } else {
484 timeout
485 .checked_add(self.period)
486 .unwrap_or_else(Instant::far_future)
487 };
488
489 // When we arrive here, the internal delay returned `Poll::Ready`.
490 // Reset the delay but do not register it. It should be registered with
491 // the next call to [`poll_tick`].
492 self.delay.as_mut().reset_without_reregister(next);
493
494 // Return the time when we were scheduled to tick
495 Poll::Ready(timeout)
496 }
497
498 /// Resets the interval to complete one period after the current time.
499 ///
500 /// This method ignores [`MissedTickBehavior`] strategy.
501 ///
502 /// This is equivalent to calling `reset_at(Instant::now() + period)`.
503 ///
504 /// # Examples
505 ///
506 /// ```
507 /// use tokio::time;
508 ///
509 /// use std::time::Duration;
510 ///
511 /// # #[tokio::main(flavor = "current_thread")]
512 /// # async fn main() {
513 /// let mut interval = time::interval(Duration::from_millis(100));
514 ///
515 /// interval.tick().await;
516 ///
517 /// time::sleep(Duration::from_millis(50)).await;
518 /// interval.reset();
519 ///
520 /// interval.tick().await;
521 /// interval.tick().await;
522 ///
523 /// // approximately 250ms have elapsed.
524 /// # }
525 /// ```
526 pub fn reset(&mut self) {
527 self.delay.as_mut().reset(Instant::now() + self.period);
528 }
529
530 /// Resets the interval immediately.
531 ///
532 /// This method ignores [`MissedTickBehavior`] strategy.
533 ///
534 /// This is equivalent to calling `reset_at(Instant::now())`.
535 ///
536 /// # Examples
537 ///
538 /// ```
539 /// use tokio::time;
540 ///
541 /// use std::time::Duration;
542 ///
543 /// # #[tokio::main(flavor = "current_thread")]
544 /// # async fn main() {
545 /// let mut interval = time::interval(Duration::from_millis(100));
546 ///
547 /// interval.tick().await;
548 ///
549 /// time::sleep(Duration::from_millis(50)).await;
550 /// interval.reset_immediately();
551 ///
552 /// interval.tick().await;
553 /// interval.tick().await;
554 ///
555 /// // approximately 150ms have elapsed.
556 /// # }
557 /// ```
558 pub fn reset_immediately(&mut self) {
559 self.delay.as_mut().reset(Instant::now());
560 }
561
562 /// Resets the interval after the specified [`std::time::Duration`].
563 ///
564 /// This method ignores [`MissedTickBehavior`] strategy.
565 ///
566 /// This is equivalent to calling `reset_at(Instant::now() + after)`.
567 ///
568 /// # Examples
569 ///
570 /// ```
571 /// use tokio::time;
572 ///
573 /// use std::time::Duration;
574 ///
575 /// # #[tokio::main(flavor = "current_thread")]
576 /// # async fn main() {
577 /// let mut interval = time::interval(Duration::from_millis(100));
578 /// interval.tick().await;
579 ///
580 /// time::sleep(Duration::from_millis(50)).await;
581 ///
582 /// let after = Duration::from_millis(20);
583 /// interval.reset_after(after);
584 ///
585 /// interval.tick().await;
586 /// interval.tick().await;
587 ///
588 /// // approximately 170ms have elapsed.
589 /// # }
590 /// ```
591 pub fn reset_after(&mut self, after: Duration) {
592 self.delay.as_mut().reset(Instant::now() + after);
593 }
594
595 /// Resets the interval to a [`crate::time::Instant`] deadline.
596 ///
597 /// Sets the next tick to expire at the given instant. If the instant is in
598 /// the past, then the [`MissedTickBehavior`] strategy will be used to
599 /// catch up. If the instant is in the future, then the next tick will
600 /// complete at the given instant, even if that means that it will sleep for
601 /// longer than the duration of this [`Interval`]. If the [`Interval`] had
602 /// any missed ticks before calling this method, then those are discarded.
603 ///
604 /// # Examples
605 ///
606 /// ```
607 /// use tokio::time::{self, Instant};
608 ///
609 /// use std::time::Duration;
610 ///
611 /// # #[tokio::main(flavor = "current_thread")]
612 /// # async fn main() {
613 /// let mut interval = time::interval(Duration::from_millis(100));
614 /// interval.tick().await;
615 ///
616 /// time::sleep(Duration::from_millis(50)).await;
617 ///
618 /// let deadline = Instant::now() + Duration::from_millis(30);
619 /// interval.reset_at(deadline);
620 ///
621 /// interval.tick().await;
622 /// interval.tick().await;
623 ///
624 /// // approximately 180ms have elapsed.
625 /// # }
626 /// ```
627 pub fn reset_at(&mut self, deadline: Instant) {
628 self.delay.as_mut().reset(deadline);
629 }
630
631 /// Returns the [`MissedTickBehavior`] strategy currently being used.
632 pub fn missed_tick_behavior(&self) -> MissedTickBehavior {
633 self.missed_tick_behavior
634 }
635
636 /// Sets the [`MissedTickBehavior`] strategy that should be used.
637 pub fn set_missed_tick_behavior(&mut self, behavior: MissedTickBehavior) {
638 self.missed_tick_behavior = behavior;
639 }
640
641 /// Returns the period of the interval.
642 pub fn period(&self) -> Duration {
643 self.period
644 }
645}