forked from tokio-rs/tokio-metrics
/
task.rs
2452 lines (2371 loc) · 104 KB
/
task.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
use std::sync::Arc;
use std::task::{Context, Poll};
#[cfg(not(feature = "rt"))]
use std::time::{Duration, Instant};

use futures_util::task::{ArcWake, AtomicWaker};
use pin_project_lite::pin_project;
#[cfg(feature = "rt")]
use tokio::time::{Duration, Instant};
use tokio_stream::Stream;
/// Monitors key metrics of instrumented tasks.
///
/// ### Basic Usage
/// A [`TaskMonitor`] tracks key [metrics][TaskMetrics] of async tasks that have been
/// [instrumented][`TaskMonitor::instrument`] with the monitor.
///
/// In the below example, a [`TaskMonitor`] is [constructed][TaskMonitor::new] and used to
/// [instrument][TaskMonitor::instrument] three worker tasks; meanwhile, a fourth task
/// prints [metrics][TaskMetrics] in 500ms [intervals][TaskMonitor::intervals].
/// ```
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() {
/// // construct a metrics monitor
/// let metrics_monitor = tokio_metrics::TaskMonitor::new();
///
/// // print task metrics every 500ms
/// {
/// let metrics_monitor = metrics_monitor.clone();
/// tokio::spawn(async move {
/// for interval in metrics_monitor.intervals() {
/// // pretty-print the metric interval
/// println!("{:?}", interval);
/// // wait 500ms
/// tokio::time::sleep(Duration::from_millis(500)).await;
/// }
/// });
/// }
///
/// // instrument some tasks and await them
/// // note that the same TaskMonitor can be used for multiple tasks
/// tokio::join![
/// metrics_monitor.instrument(do_work()),
/// metrics_monitor.instrument(do_work()),
/// metrics_monitor.instrument(do_work())
/// ];
/// }
///
/// async fn do_work() {
/// for _ in 0..25 {
/// tokio::task::yield_now().await;
/// tokio::time::sleep(Duration::from_millis(100)).await;
/// }
/// }
/// ```
///
/// ### What should I instrument?
/// In most cases, you should construct a *distinct* [`TaskMonitor`] for each kind of key task.
///
/// #### Instrumenting a web application
/// For instance, a web service should have a distinct [`TaskMonitor`] for each endpoint. Within
/// each endpoint, it's prudent to additionally instrument major sub-tasks, each with their own
/// distinct [`TaskMonitor`]s. [*Why are my tasks slow?*](#why-are-my-tasks-slow) explores a
/// debugging scenario for a web service that takes this approach to instrumentation. This
/// approach is exemplified in the below example:
/// ```no_run
/// // The unabridged version of this snippet is in the examples directory of this crate.
///
/// #[tokio::main]
/// async fn main() {
/// // construct a TaskMonitor for root endpoint
/// let monitor_root = tokio_metrics::TaskMonitor::new();
///
/// // construct TaskMonitors for create_users endpoint
/// let monitor_create_user = CreateUserMonitors {
/// // monitor for the entire endpoint
/// route: tokio_metrics::TaskMonitor::new(),
/// // monitor for database insertion subtask
/// insert: tokio_metrics::TaskMonitor::new(),
/// };
///
/// // build our application with two instrumented endpoints
/// let app = axum::Router::new()
/// // `GET /` goes to `root`
/// .route("/", axum::routing::get({
/// let monitor = monitor_root.clone();
/// move || monitor.instrument(async { "Hello, World!" })
/// }))
/// // `POST /users` goes to `create_user`
/// .route("/users", axum::routing::post({
/// let monitors = monitor_create_user.clone();
/// let route = monitors.route.clone();
/// move |payload| {
/// route.instrument(create_user(payload, monitors))
/// }
/// }));
///
/// // print task metrics for each endpoint every 1s
/// let metrics_frequency = std::time::Duration::from_secs(1);
/// tokio::spawn(async move {
/// let root_intervals = monitor_root.intervals();
/// let create_user_route_intervals =
/// monitor_create_user.route.intervals();
/// let create_user_insert_intervals =
/// monitor_create_user.insert.intervals();
/// let create_user_intervals =
/// create_user_route_intervals.zip(create_user_insert_intervals);
///
/// let intervals = root_intervals.zip(create_user_intervals);
/// for (root_route, (create_user_route, create_user_insert)) in intervals {
/// println!("root_route = {:#?}", root_route);
/// println!("create_user_route = {:#?}", create_user_route);
/// println!("create_user_insert = {:#?}", create_user_insert);
/// tokio::time::sleep(metrics_frequency).await;
/// }
/// });
///
/// // run the server
/// let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 3000));
/// axum::Server::bind(&addr)
/// .serve(app.into_make_service())
/// .await
/// .unwrap();
/// }
///
/// async fn create_user(
/// axum::Json(payload): axum::Json<CreateUser>,
/// monitors: CreateUserMonitors,
/// ) -> impl axum::response::IntoResponse {
/// let user = User { id: 1337, username: payload.username, };
/// // instrument inserting the user into the db:
/// let _ = monitors.insert.instrument(insert_user(user.clone())).await;
/// (axum::http::StatusCode::CREATED, axum::Json(user))
/// }
///
/// /* definitions of CreateUserMonitors, CreateUser and User omitted for brevity */
///
/// #
/// # #[derive(Clone)]
/// # struct CreateUserMonitors {
/// # // monitor for the entire endpoint
/// # route: tokio_metrics::TaskMonitor,
/// # // monitor for database insertion subtask
/// # insert: tokio_metrics::TaskMonitor,
/// # }
/// #
/// # #[derive(serde::Deserialize)] struct CreateUser { username: String, }
/// # #[derive(Clone, serde::Serialize)] struct User { id: u64, username: String, }
/// #
/// // insert the user into the database
/// async fn insert_user(_: User) {
/// /* implementation details elided */
/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
/// }
/// ```
///
/// ### Why are my tasks slow?
/// **Scenario:** You track key, high-level metrics about the customer response time. An alarm warns
/// you that P90 latency for an endpoint exceeds your targets. What is causing the increase?
///
/// #### Identifying the high-level culprits
/// A set of tasks will appear to execute more slowly if:
/// - they are taking longer to poll (i.e., they consume too much CPU time)
/// - they are waiting longer to be polled (e.g., they're waiting longer in tokio's scheduling
/// queues)
/// - they are waiting longer on external events to complete (e.g., asynchronous network requests)
///
/// The culprits, at a high level, may be some combination of these sources of latency. Fortunately,
/// you have instrumented the key tasks of each of your endpoints with distinct [`TaskMonitor`]s.
/// Using the monitors on the endpoint experiencing elevated latency, you begin by answering:
/// - [*Are my tasks taking longer to poll?*](#are-my-tasks-taking-longer-to-poll)
/// - [*Are my tasks spending more time waiting to be polled?*](#are-my-tasks-spending-more-time-waiting-to-be-polled)
/// - [*Are my tasks spending more time waiting on external events to complete?*](#are-my-tasks-spending-more-time-waiting-on-external-events-to-complete)
///
/// ##### Are my tasks taking longer to poll?
/// - **Did [`mean_poll_duration`][TaskMetrics::mean_poll_duration] increase?**
/// This metric reflects the mean poll duration. If it increased, it means that, on average,
/// individual polls tended to take longer. However, this does not necessarily imply increased
/// task latency: An increase in poll durations could be offset by fewer polls.
/// - **Did [`slow_poll_ratio`][TaskMetrics::slow_poll_ratio] increase?**
/// This metric reflects the proportion of polls that were 'slow'. If it increased, it means that
/// a greater proportion of polls performed excessive computation before yielding. This does not
/// necessarily imply increased task latency: An increase in the proportion of slow polls could be
/// offset by fewer or faster polls.
/// - **Did [`mean_slow_poll_duration`][TaskMetrics::mean_slow_poll_duration] increase?**
/// This metric reflects the mean duration of slow polls. If it increased, it means that, on
/// average, slow polls got slower. This does not necessarily imply increased task latency: An
/// increase in average slow poll duration could be offset by fewer or faster polls.
///
/// If so, [*why are my tasks taking longer to poll?*](#why-are-my-tasks-taking-longer-to-poll)
///
/// ##### Are my tasks spending more time waiting to be polled?
/// - **Did [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] increase?**
/// This metric reflects the mean delay between the instant a task is first instrumented and the
/// instant it is first polled. If it increases, it means that, on average, tasks spent longer
/// waiting to be initially run.
/// - **Did [`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration] increase?**
/// This metric reflects the mean duration that tasks spent in the scheduled state. The
/// 'scheduled' state of a task is the duration between the instant a task is awoken and the
/// instant it is subsequently polled. If this metric increases, it means that, on average, tasks
/// spent longer in tokio's queues before being polled.
///
/// If so, [*why are my tasks spending more time waiting to be polled?*](#why-are-my-tasks-spending-more-time-waiting-to-be-polled)
///
/// ##### Are my tasks spending more time waiting on external events to complete?
/// - **Did [`mean_idle_duration`][TaskMetrics::mean_idle_duration] increase?**
/// This metric reflects the mean duration that tasks spent in the idle state. The idle state is
/// the duration spanning the instant a task completes a poll, and the instant that it is next
/// awoken. Tasks inhabit this state when they are waiting for task-external events to complete
/// (e.g., an asynchronous sleep, a network request, file I/O, etc.). If this metric increases,
/// tasks, in aggregate, spent more time waiting for task-external events to complete.
///
/// If so, [*why are my tasks spending more time waiting on external events to complete?*](#why-are-my-tasks-spending-more-time-waiting-on-external-events-to-complete)
///
/// #### Digging deeper
/// Having [established the high-level culprits](#identifying-the-high-level-culprits), you now
/// search for further explanation...
///
/// ##### Why are my tasks taking longer to poll?
/// You observed that [your tasks are taking longer to poll](#are-my-tasks-taking-longer-to-poll).
/// The culprit is likely some combination of:
/// - **Your tasks are accidentally blocking.** Common culprits include:
/// 1. Using the Rust standard library's [filesystem](https://doc.rust-lang.org/std/fs/) or
/// [networking](https://doc.rust-lang.org/std/net/) APIs.
/// These APIs are synchronous; use tokio's [filesystem](https://docs.rs/tokio/latest/tokio/fs/)
/// and [networking](https://docs.rs/tokio/latest/tokio/net/) APIs, instead.
/// 2. Calling [`block_on`](https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.block_on).
/// 3. Invoking `println!` or other synchronous logging routines.
/// Invocations of `println!` involve acquiring an exclusive lock on stdout, followed by a
/// synchronous write to stdout.
/// - **Your tasks are computationally expensive.** Common culprits include:
/// 1. TLS/cryptographic routines
/// 2. doing a lot of processing on bytes
/// 3. calling non-Tokio resources
///
/// ##### Why are my tasks spending more time waiting to be polled?
/// You observed that [your tasks are spending more time waiting to be polled](#are-my-tasks-spending-more-time-waiting-to-be-polled)
/// suggesting some combination of:
/// - Your application is inflating the time elapsed between instrumentation and first poll.
/// - Your tasks are being scheduled into tokio's injection queue.
/// - Other tasks are spending too long without yielding, thus backing up tokio's queues.
///
/// Start by asking: [*Is time-to-first-poll unusually high?*](#is-time-to-first-poll-unusually-high)
///
/// ##### Why are my tasks spending more time waiting on external events to complete?
/// You observed that [your tasks are spending more time waiting on external events to
/// complete](#are-my-tasks-spending-more-time-waiting-on-external-events-to-complete). But what
/// event? Fortunately, within the task experiencing increased idle times, you monitored several
/// sub-tasks with distinct [`TaskMonitor`]s. For each of these sub-tasks, you [*you try to identify
/// the performance culprits...*](#identifying-the-high-level-culprits)
///
/// #### Digging even deeper
///
/// ##### Is time-to-first-poll unusually high?
/// Contrast these two metrics:
/// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
/// This metric reflects the mean delay between the instant a task is first instrumented and the
/// instant it is *first* polled.
/// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
/// This metric reflects the mean delay between the instant when tasks were awoken and the
/// instant they were subsequently polled.
///
/// If the former metric exceeds the latter (or increased unexpectedly more than the latter), then
/// start by investigating [*if your application is artificially delaying the time-to-first-poll*](#is-my-application-delaying-the-time-to-first-poll).
///
/// Otherwise, investigate [*if other tasks are polling too long without yielding*](#are-other-tasks-polling-too-long-without-yielding).
///
/// ##### Is my application delaying the time-to-first-poll?
/// You observed that [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] increased, more
/// than [`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]. Your application may be
/// needlessly inflating the time elapsed between instrumentation and first poll. Are you
/// constructing (and instrumenting) tasks separately from awaiting or spawning them?
///
/// For instance, in the below example, the application induces 1 second delay between when `task`
/// is instrumented and when it is awaited:
/// ```rust
/// #[tokio::main]
/// async fn main() {
/// use tokio::time::Duration;
/// let monitor = tokio_metrics::TaskMonitor::new();
///
/// let task = monitor.instrument(async move {});
///
/// let one_sec = Duration::from_secs(1);
/// tokio::time::sleep(one_sec).await;
///
/// let _ = tokio::spawn(task).await;
///
/// assert!(monitor.cumulative().total_first_poll_delay >= one_sec);
/// }
/// ```
///
/// Otherwise, [`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay] might be unusually high
/// because [*your application is spawning key tasks into tokio's injection queue...*](#is-my-application-spawning-more-tasks-into-tokios-injection-queue)
///
/// ##### Is my application spawning more tasks into tokio's injection queue?
/// Tasks awoken from threads *not* managed by the tokio runtime are scheduled with a slower,
/// global "injection" queue.
///
/// You may be notifying runtime tasks from off-runtime. For instance, Given the following:
/// ```ignore
/// #[tokio::main]
/// async fn main() {
/// for _ in 0..100 {
/// let (tx, rx) = oneshot::channel();
/// tokio::spawn(async move {
/// tx.send(());
/// })
///
/// rx.await;
/// }
/// }
/// ```
/// One would expect this to run efficiently, however, the main task is run *off* the main runtime
/// and the spawned tasks are *on* runtime, which means the snippet will run much slower than:
/// ```ignore
/// #[tokio::main]
/// async fn main() {
/// tokio::spawn(async {
/// for _ in 0..100 {
/// let (tx, rx) = oneshot::channel();
/// tokio::spawn(async move {
/// tx.send(());
/// })
///
/// rx.await;
/// }
/// }).await;
/// }
/// ```
/// The slowdown is caused by a higher time between the `rx` task being notified (in `tx.send()`)
/// and the task being polled.
///
/// ##### Are other tasks polling too long without yielding?
/// You suspect that your tasks are slow because they're backed up in tokio's scheduling queues. For
/// *each* of your application's [`TaskMonitor`]s you check to see [*if their associated tasks are
/// taking longer to poll...*](#are-my-tasks-taking-longer-to-poll)
///
/// ### Limitations
/// The [`TaskMetrics`] type uses [`u64`] to represent both event counters and durations (measured
/// in nanoseconds). Consequently, event counters are accurate for ≤ [`u64::MAX`] events, and
/// durations are accurate for ≤ [`u64::MAX`] nanoseconds.
///
/// The counters and durations of [`TaskMetrics`] produced by [`TaskMonitor::cumulative`] increase
/// monotonically with each successive invocation of [`TaskMonitor::cumulative`]. Upon overflow,
/// counters and durations wrap.
///
/// The counters and durations of [`TaskMetrics`] produced by [`TaskMonitor::intervals`] are
/// calculated by computing the difference of metrics in successive invocations of
/// [`TaskMonitor::cumulative`]. If, within a monitoring interval, an event occurs more than
/// [`u64::MAX`] times, or a monitored duration exceeds [`u64::MAX`] nanoseconds, the metrics for
/// that interval will overflow and not be accurate.
///
/// ##### Examples at the limits
/// Consider the [`TaskMetrics::total_first_poll_delay`] metric. This metric accurately reflects
/// delays between instrumentation and first-poll ≤ [`u64::MAX`] nanoseconds:
/// ```
/// use tokio::time::Duration;
///
/// #[tokio::main(flavor = "current_thread", start_paused = true)]
/// async fn main() {
/// let monitor = tokio_metrics::TaskMonitor::new();
/// let mut interval = monitor.intervals();
/// let mut next_interval = || interval.next().unwrap();
///
/// // construct and instrument a task, but do not `await` it
/// let task = monitor.instrument(async {});
///
/// // this is the maximum duration representable by tokio_metrics
/// let max_duration = Duration::from_nanos(u64::MAX);
///
/// // let's advance the clock by this amount and poll `task`
/// let _ = tokio::time::advance(max_duration).await;
/// task.await;
///
/// // durations ≤ `max_duration` are accurately reflected in this metric
/// assert_eq!(next_interval().total_first_poll_delay, max_duration);
/// assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
/// }
/// ```
/// If the total delay between instrumentation and first poll exceeds [`u64::MAX`] nanoseconds,
/// [`total_first_poll_delay`][TaskMetrics::total_first_poll_delay] will overflow:
/// ```
/// # use tokio::time::Duration;
/// #
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
/// # async fn main() {
/// # let monitor = tokio_metrics::TaskMonitor::new();
/// #
/// // construct and instrument a task, but do not `await` it
/// let task_a = monitor.instrument(async {});
/// let task_b = monitor.instrument(async {});
///
/// // this is the maximum duration representable by tokio_metrics
/// let max_duration = Duration::from_nanos(u64::MAX);
///
/// // let's advance the clock by 1.5x this amount and await both tasks
/// let _ = tokio::time::advance(3 * (max_duration / 2)).await;
/// task_a.await;
/// task_b.await;
///
/// // the `total_first_poll_delay` has overflowed
/// assert!(monitor.cumulative().total_first_poll_delay < max_duration);
/// # }
/// ```
/// If *many* tasks are spawned, it will take far less than a [`u64::MAX`]-nanosecond delay to bring
/// this metric to the precipice of overflow:
/// ```
/// # use tokio::time::Duration;
/// #
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
/// # async fn main() {
/// # let monitor = tokio_metrics::TaskMonitor::new();
/// # let mut interval = monitor.intervals();
/// # let mut next_interval = || interval.next().unwrap();
/// #
/// // construct and instrument u16::MAX tasks, but do not `await` them
/// let first_poll_count = u16::MAX as u64;
/// let mut tasks = Vec::with_capacity(first_poll_count as usize);
/// for _ in 0..first_poll_count { tasks.push(monitor.instrument(async {})); }
///
/// // this is the maximum duration representable by tokio_metrics
/// let max_duration = u64::MAX;
///
/// // let's advance the clock just enough such that all of the time-to-first-poll
/// // delays summed nearly equals `max_duration`, less some remainder...
/// let iffy_delay = max_duration / (first_poll_count as u64);
/// let small_remainder = max_duration % first_poll_count;
/// let _ = tokio::time::advance(Duration::from_nanos(iffy_delay)).await;
///
/// // ...then poll all of the instrumented tasks:
/// for task in tasks { task.await; }
///
/// // `total_first_poll_delay` is at the precipice of overflowing!
/// assert_eq!(
/// next_interval().total_first_poll_delay.as_nanos(),
/// (max_duration - small_remainder) as u128
/// );
/// assert_eq!(
/// monitor.cumulative().total_first_poll_delay.as_nanos(),
/// (max_duration - small_remainder) as u128
/// );
/// # }
/// ```
/// Frequent, interval-sampled metrics will retain their accuracy, even if the cumulative
/// metrics counter overflows at most once in the midst of an interval:
/// ```
/// # use tokio::time::Duration;
/// # use tokio_metrics::TaskMonitor;
/// #
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
/// # async fn main() {
/// # let monitor = TaskMonitor::new();
/// # let mut interval = monitor.intervals();
/// # let mut next_interval = || interval.next().unwrap();
/// #
/// let first_poll_count = u16::MAX as u64;
/// let batch_size = first_poll_count / 3;
///
/// let max_duration_ns = u64::MAX;
/// let iffy_delay_ns = max_duration_ns / first_poll_count;
///
/// // Instrument `batch_size` number of tasks, wait for `delay` nanoseconds,
/// // then await the instrumented tasks.
/// async fn run_batch(monitor: &TaskMonitor, batch_size: usize, delay: u64) {
/// let mut tasks = Vec::with_capacity(batch_size);
/// for _ in 0..batch_size { tasks.push(monitor.instrument(async {})); }
/// let _ = tokio::time::advance(Duration::from_nanos(delay)).await;
/// for task in tasks { task.await; }
/// }
///
/// // this is how much `total_first_poll_delay` will
/// // increase with each batch we run
/// let batch_delay = iffy_delay_ns * batch_size;
///
/// // run batches 1, 2, and 3
/// for i in 1..=3 {
/// run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
/// assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
/// assert_eq!(i * batch_delay as u128, monitor.cumulative().total_first_poll_delay.as_nanos());
/// }
///
/// /* now, the `total_first_poll_delay` counter is at the precipice of overflow */
/// assert_eq!(monitor.cumulative().total_first_poll_delay.as_nanos(), max_duration_ns as u128);
///
/// // run batch 4
/// run_batch(&monitor, batch_size as usize, iffy_delay_ns).await;
/// // the interval counter remains accurate
/// assert_eq!(1 * batch_delay as u128, next_interval().total_first_poll_delay.as_nanos());
/// // but the cumulative counter has overflowed
/// assert_eq!(batch_delay as u128 - 1, monitor.cumulative().total_first_poll_delay.as_nanos());
/// # }
/// ```
/// If a cumulative metric overflows *more than once* in the midst of an interval,
/// its interval-sampled counterpart will also overflow.
#[derive(Clone, Debug)]
pub struct TaskMonitor {
    // Shared counter storage. Cloning a `TaskMonitor` clones this `Arc`, so all
    // clones (and every task instrumented through any of them) update the same
    // underlying metrics.
    metrics: Arc<RawMetrics>,
}
pin_project! {
    /// An async task that has been instrumented with [`TaskMonitor::instrument`].
    #[derive(Debug)]
    pub struct Instrumented<T> {
        // The inner future being instrumented; pinned because polling `T`
        // requires `Pin<&mut T>`.
        #[pin]
        task: T,
        // Set to true once the task has been polled for the first time; used to
        // record first-poll metrics exactly once.
        did_poll_once: bool,
        // The instant, tracked as nanoseconds since `instrumented_at`, at which
        // the future finished its last poll (i.e., when it most recently
        // entered the idle state).
        idled_at: u64,
        // State shared between the task and its instrumented waker.
        state: Arc<State>,
    }

    impl<T> PinnedDrop for Instrumented<T> {
        fn drop(this: Pin<&mut Self>) {
            // Count every drop, whether the task ran to completion or was
            // cancelled; `dropped_count` therefore covers both cases.
            this.state.metrics.dropped_count.fetch_add(1, SeqCst);
        }
    }
}
/// Key metrics of [instrumented][`TaskMonitor::instrument`] tasks.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, Default)]
pub struct TaskMetrics {
/// The number of tasks instrumented.
///
/// ##### Examples
/// ```
/// #[tokio::main]
/// async fn main() {
/// let monitor = tokio_metrics::TaskMonitor::new();
/// let mut interval = monitor.intervals();
/// let mut next_interval = || interval.next().unwrap();
///
/// // 0 tasks have been instrumented
/// assert_eq!(next_interval().instrumented_count, 0);
///
/// monitor.instrument(async {});
///
/// // 1 task has been instrumented
/// assert_eq!(next_interval().instrumented_count, 1);
///
/// monitor.instrument(async {});
/// monitor.instrument(async {});
///
/// // 2 tasks have been instrumented
/// assert_eq!(next_interval().instrumented_count, 2);
///
/// // since the last interval was produced, 0 tasks have been instrumented
/// assert_eq!(next_interval().instrumented_count, 0);
/// }
/// ```
pub instrumented_count: u64,
/// The number of tasks dropped.
///
/// ##### Examples
/// ```
/// #[tokio::main]
/// async fn main() {
/// let monitor = tokio_metrics::TaskMonitor::new();
/// let mut interval = monitor.intervals();
/// let mut next_interval = || interval.next().unwrap();
///
/// // 0 tasks have been dropped
/// assert_eq!(next_interval().dropped_count, 0);
///
/// let _task = monitor.instrument(async {});
///
/// // 0 tasks have been dropped
/// assert_eq!(next_interval().dropped_count, 0);
///
/// monitor.instrument(async {}).await;
/// drop(monitor.instrument(async {}));
///
/// // 2 tasks have been dropped
/// assert_eq!(next_interval().dropped_count, 2);
///
/// // since the last interval was produced, 0 tasks have been dropped
/// assert_eq!(next_interval().dropped_count, 0);
/// }
/// ```
pub dropped_count: u64,
/// The number of tasks polled for the first time.
///
/// ##### Derived metrics
/// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
/// The mean duration elapsed between the instant tasks are instrumented, and the instant they
/// are first polled.
///
/// ##### Examples
/// In the below example, no tasks are instrumented or polled in the first sampling interval;
/// one task is instrumented (but not polled) in the second sampling interval; that task is
/// awaited to completion (and, thus, polled at least once) in the third sampling interval; no
/// additional tasks are polled for the first time within the fourth sampling interval:
/// ```
/// #[tokio::main]
/// async fn main() {
/// let metrics_monitor = tokio_metrics::TaskMonitor::new();
/// let mut interval = metrics_monitor.intervals();
/// let mut next_interval = || interval.next().unwrap();
///
/// // no tasks have been constructed, instrumented, and polled at least once
/// assert_eq!(next_interval().first_poll_count, 0);
///
/// let task = metrics_monitor.instrument(async {});
///
/// // `task` has been constructed and instrumented, but has not yet been polled
/// assert_eq!(next_interval().first_poll_count, 0);
///
/// // poll `task` to completion
/// task.await;
///
/// // `task` has been constructed, instrumented, and polled at least once
/// assert_eq!(next_interval().first_poll_count, 1);
///
/// // since the last interval was produced, 0 tasks have been constructed, instrumented and polled
/// assert_eq!(next_interval().first_poll_count, 0);
///
/// }
/// ```
pub first_poll_count: u64,
/// The total duration elapsed between the instant tasks are instrumented, and the instant they
/// are first polled.
///
/// ##### Derived metrics
/// - **[`mean_first_poll_delay`][TaskMetrics::mean_first_poll_delay]**
/// The mean duration elapsed between the instant tasks are instrumented, and the instant they
/// are first polled.
///
/// ##### Examples
/// In the below example, 0 tasks have been instrumented or polled within the first sampling
/// interval, a total of 500ms elapse between the instrumentation and polling of tasks within
/// the second sampling interval, and a total of 350ms elapse between the instrumentation and
/// polling of tasks within the third sampling interval:
/// ```
/// use tokio::time::Duration;
///
/// #[tokio::main(flavor = "current_thread", start_paused = true)]
/// async fn main() {
/// let monitor = tokio_metrics::TaskMonitor::new();
/// let mut interval = monitor.intervals();
/// let mut next_interval = || interval.next().unwrap();
///
/// // no tasks have yet been created, instrumented, or polled
/// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
/// assert_eq!(next_interval().total_first_poll_delay, Duration::ZERO);
///
/// // constructs and instruments a task, pauses a given duration, then awaits the task
/// async fn instrument_pause_await(monitor: &tokio_metrics::TaskMonitor, pause: Duration) {
/// let task = monitor.instrument(async move {});
/// tokio::time::sleep(pause).await;
/// task.await;
/// }
///
/// // construct and await a task that pauses for 500ms between instrumentation and first poll
/// let task_a_pause_time = Duration::from_millis(500);
/// instrument_pause_await(&monitor, task_a_pause_time).await;
///
/// assert_eq!(next_interval().total_first_poll_delay, task_a_pause_time);
/// assert_eq!(monitor.cumulative().total_first_poll_delay, task_a_pause_time);
///
/// // construct and await a task that pauses for 250ms between instrumentation and first poll
/// let task_b_pause_time = Duration::from_millis(250);
/// instrument_pause_await(&monitor, task_b_pause_time).await;
///
/// // construct and await a task that pauses for 100ms between instrumentation and first poll
/// let task_c_pause_time = Duration::from_millis(100);
/// instrument_pause_await(&monitor, task_c_pause_time).await;
///
/// assert_eq!(
/// next_interval().total_first_poll_delay,
/// task_b_pause_time + task_c_pause_time
/// );
/// assert_eq!(
/// monitor.cumulative().total_first_poll_delay,
/// task_a_pause_time + task_b_pause_time + task_c_pause_time
/// );
/// }
/// ```
///
/// ##### When is this metric recorded?
/// The delay between instrumentation and first poll is not recorded until the first poll
/// actually occurs:
/// ```
/// # use tokio::time::Duration;
/// #
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
/// # async fn main() {
/// # let monitor = tokio_metrics::TaskMonitor::new();
/// # let mut interval = monitor.intervals();
/// # let mut next_interval = || interval.next().unwrap();
/// #
/// // we construct and instrument a task, but do not `await` it
/// let task = monitor.instrument(async {});
///
/// // let's sleep for 1s before we poll `task`
/// let one_sec = Duration::from_secs(1);
/// let _ = tokio::time::sleep(one_sec).await;
///
/// // although 1s has now elapsed since the instrumentation of `task`,
/// // this is not reflected in `total_first_poll_delay`...
/// assert_eq!(next_interval().total_first_poll_delay, Duration::ZERO);
/// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
///
/// // ...and won't be until `task` is actually polled
/// task.await;
///
/// // now, the 1s delay is reflected in `total_first_poll_delay`:
/// assert_eq!(next_interval().total_first_poll_delay, one_sec);
/// assert_eq!(monitor.cumulative().total_first_poll_delay, one_sec);
/// # }
/// ```
///
/// ##### What if first-poll-delay is very large?
/// The first-poll-delay of *individual* tasks saturates at `u64::MAX` nanoseconds. However, if
/// the *total* first-poll-delay *across* monitored tasks exceeds `u64::MAX` nanoseconds, this
/// metric will wrap around:
/// ```
/// use tokio::time::Duration;
///
/// #[tokio::main(flavor = "current_thread", start_paused = true)]
/// async fn main() {
/// let monitor = tokio_metrics::TaskMonitor::new();
///
/// // construct and instrument a task, but do not `await` it
/// let task = monitor.instrument(async {});
///
/// // this is the maximum duration representable by tokio_metrics
/// let max_duration = Duration::from_nanos(u64::MAX);
///
/// // let's advance the clock by double this amount and await `task`
/// let _ = tokio::time::advance(max_duration * 2).await;
/// task.await;
///
/// // the time-to-first-poll of `task` saturates at `max_duration`
/// assert_eq!(monitor.cumulative().total_first_poll_delay, max_duration);
///
/// // ...but note that the metric *will* wrap around if more tasks are involved
/// let task = monitor.instrument(async {});
/// let _ = tokio::time::advance(Duration::from_nanos(1)).await;
/// task.await;
/// assert_eq!(monitor.cumulative().total_first_poll_delay, Duration::ZERO);
/// }
/// ```
pub total_first_poll_delay: Duration,
/// The total number of times that tasks idled, waiting to be awoken.
///
/// An idle is recorded as occurring if a non-zero duration elapses between the instant a
/// task completes a poll, and the instant that it is next awoken.
///
/// ##### Derived metrics
/// - **[`mean_idle_duration`][TaskMetrics::mean_idle_duration]**
/// The mean duration of idles.
///
/// ##### Examples
/// ```
/// #[tokio::main(flavor = "current_thread", start_paused = true)]
/// async fn main() {
/// let monitor = tokio_metrics::TaskMonitor::new();
/// let mut interval = monitor.intervals();
/// let mut next_interval = move || interval.next().unwrap();
/// let one_sec = std::time::Duration::from_secs(1);
///
/// monitor.instrument(async {}).await;
///
/// assert_eq!(next_interval().total_idled_count, 0);
/// assert_eq!(monitor.cumulative().total_idled_count, 0);
///
/// monitor.instrument(async move {
/// tokio::time::sleep(one_sec).await;
/// }).await;
///
/// assert_eq!(next_interval().total_idled_count, 1);
/// assert_eq!(monitor.cumulative().total_idled_count, 1);
///
/// monitor.instrument(async {
/// tokio::time::sleep(one_sec).await;
/// tokio::time::sleep(one_sec).await;
/// }).await;
///
/// assert_eq!(next_interval().total_idled_count, 2);
/// assert_eq!(monitor.cumulative().total_idled_count, 3);
/// }
/// ```
pub total_idled_count: u64,
/// The total duration that tasks idled.
///
/// An idle is recorded as occurring if a non-zero duration elapses between the instant a
/// task completes a poll, and the instant that it is next awoken.
///
/// ##### Derived metrics
/// - **[`mean_idle_duration`][TaskMetrics::mean_idle_duration]**
/// The mean duration of idles.
///
/// ##### Examples
/// ```
/// #[tokio::main(flavor = "current_thread", start_paused = true)]
/// async fn main() {
/// let monitor = tokio_metrics::TaskMonitor::new();
/// let mut interval = monitor.intervals();
/// let mut next_interval = move || interval.next().unwrap();
/// let one_sec = std::time::Duration::from_secs(1);
/// let two_sec = std::time::Duration::from_secs(2);
///
/// assert_eq!(next_interval().total_idle_duration.as_nanos(), 0);
/// assert_eq!(monitor.cumulative().total_idle_duration.as_nanos(), 0);
///
/// monitor.instrument(async move {
/// tokio::time::sleep(one_sec).await;
/// }).await;
///
/// assert_eq!(next_interval().total_idle_duration, one_sec);
/// assert_eq!(monitor.cumulative().total_idle_duration, one_sec);
///
/// monitor.instrument(async move {
/// tokio::time::sleep(two_sec).await;
/// }).await;
///
/// assert_eq!(next_interval().total_idle_duration, two_sec);
/// assert_eq!(monitor.cumulative().total_idle_duration, one_sec + two_sec);
/// }
/// ```
pub total_idle_duration: Duration,
/// The total number of times that tasks were awoken (and then, presumably, scheduled for
/// execution).
///
/// ##### Derived metrics
/// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
/// The mean duration that tasks spent waiting to be executed after awakening.
///
/// ##### Examples
/// In the below example, a task yields to the scheduler a varying number of times between
/// sampling intervals; this metric is equal to the number of times the task yielded:
/// ```
/// #[tokio::main]
/// async fn main(){
/// let metrics_monitor = tokio_metrics::TaskMonitor::new();
///
/// // [A] no tasks have been created, instrumented, and polled more than once
/// assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 0);
///
/// // [B] a `task` is created and instrumented
/// let task = {
/// let monitor = metrics_monitor.clone();
/// metrics_monitor.instrument(async move {
/// let mut interval = monitor.intervals();
/// let mut next_interval = move || interval.next().unwrap();
///
/// // [E] `task` has not yet yielded to the scheduler, and
/// // thus has not yet been scheduled since its first `poll`
/// assert_eq!(next_interval().total_scheduled_count, 0);
///
/// tokio::task::yield_now().await; // yield to the scheduler
///
/// // [F] `task` has yielded to the scheduler once (and thus been
/// // scheduled once) since the last sampling interval
/// assert_eq!(next_interval().total_scheduled_count, 1);
///
/// tokio::task::yield_now().await; // yield to the scheduler
/// tokio::task::yield_now().await; // yield to the scheduler
/// tokio::task::yield_now().await; // yield to the scheduler
///
/// // [G] `task` has yielded to the scheduler thrice (and thus been
/// // scheduled thrice) since the last sampling interval
/// assert_eq!(next_interval().total_scheduled_count, 3);
///
/// tokio::task::yield_now().await; // yield to the scheduler
///
/// next_interval
/// })
/// };
///
/// // [C] `task` has not yet been polled at all
/// assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
/// assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 0);
///
/// // [D] poll `task` to completion
/// let mut next_interval = task.await;
///
/// // [H] `task` has been scheduled once since the last sample
/// assert_eq!(next_interval().total_scheduled_count, 1);
///
/// // [I] `task` has not been scheduled since the last sample
/// assert_eq!(next_interval().total_scheduled_count, 0);
///
/// // [J] `task` has yielded to the scheduler a total of five times
/// assert_eq!(metrics_monitor.cumulative().total_scheduled_count, 5);
/// }
/// ```
pub total_scheduled_count: u64,
/// The total duration that tasks spent waiting to be polled after awakening.
///
/// ##### Derived metrics
/// - **[`mean_scheduled_duration`][TaskMetrics::mean_scheduled_duration]**
/// The mean duration that tasks spent waiting to be executed after awakening.
///
/// ##### Examples
/// In the below example, a task that yields endlessly is raced against a task that blocks the
/// executor for 1 second; the yielding task spends approximately 1 second waiting to
/// be scheduled. In the next sampling interval, a task that yields endlessly is raced against a
/// task that blocks the executor for half a second; the yielding task spends approximately half
/// a second waiting to be scheduled.
/// ```
/// use tokio::time::Duration;
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() {
/// let metrics_monitor = tokio_metrics::TaskMonitor::new();
/// let mut interval = metrics_monitor.intervals();
/// let mut next_interval = || interval.next().unwrap();
///
/// // construct and instrument a task that yields endlessly
/// let endless_task = metrics_monitor.instrument(async {
/// loop { tokio::task::yield_now().await }
/// });
///
/// // construct and spawn a task that blocks the executor for 1 second
/// let one_sec_task = tokio::spawn(async {
/// std::thread::sleep(Duration::from_millis(1000))
/// });
///
/// // race `endless_task` against `one_sec_task`
/// tokio::select! {
/// biased;
/// _ = endless_task => { unreachable!() }
/// _ = one_sec_task => {}
/// }
///
/// // `endless_task` will have spent approximately one second waiting
/// let total_scheduled_duration = next_interval().total_scheduled_duration;
/// assert!(total_scheduled_duration >= Duration::from_millis(1000));
/// assert!(total_scheduled_duration <= Duration::from_millis(1100));
///
/// // construct and instrument a task that yields endlessly
/// let endless_task = metrics_monitor.instrument(async {
/// loop { tokio::task::yield_now().await }
/// });
///
/// // construct and spawn a task that blocks the executor for half a second
/// let half_sec_task = tokio::spawn(async {
/// std::thread::sleep(Duration::from_millis(500))
/// });
///
/// // race `endless_task` against `half_sec_task`
/// tokio::select! {
/// biased;
/// _ = endless_task => { unreachable!() }
/// _ = half_sec_task => {}
/// }
///
/// // `endless_task` will have spent approximately half a second waiting
/// let total_scheduled_duration = next_interval().total_scheduled_duration;
/// assert!(total_scheduled_duration >= Duration::from_millis(500));
/// assert!(total_scheduled_duration <= Duration::from_millis(600));
/// }
/// ```
pub total_scheduled_duration: Duration,
/// The total number of times that tasks were polled.
///
/// ##### Definition
/// This metric is equal to [`total_fast_poll_count`][TaskMetrics::total_fast_poll_count]
/// + [`total_slow_poll_count`][TaskMetrics::total_slow_poll_count].
///
/// ##### Derived metrics
/// - **[`mean_poll_duration`][TaskMetrics::mean_poll_duration]**
/// The mean duration of polls.
///
/// ##### Examples
/// In the below example, a task with multiple yield points is awaited to completion; this
/// metric reflects the number of `await`s within each sampling interval:
/// ```
/// #[tokio::main]
/// async fn main() {
/// let metrics_monitor = tokio_metrics::TaskMonitor::new();
///
/// // [A] no tasks have been created, instrumented, and polled more than once
/// assert_eq!(metrics_monitor.cumulative().first_poll_count, 0);
///
/// // [B] a `task` is created and instrumented
/// let task = {