Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a method on Receiver that gives information on queue status in broadcast channel #4542

Merged

Conversation

b-naber
Copy link
Contributor

@b-naber b-naber commented Feb 25, 2022

Attempts to fix #4405

This PR adds a method recv_with_queue_status to Receiver that returns (in a successful call) a tuple consisting of the received message and the number of messages that have been sent into the channel but have yet to be received by this Receiver.

@github-actions github-actions bot added the R-loom Run loom tests on this PR label Feb 25, 2022
@b-naber b-naber force-pushed the queue-status-receiver-bounded-broadcast branch from f418506 to 0518874 Compare February 25, 2022 09:31
@Darksonn Darksonn added A-tokio Area: The main tokio crate M-sync Module: tokio/sync labels Feb 25, 2022
tokio/src/sync/broadcast.rs Outdated Show resolved Hide resolved
tokio/src/sync/broadcast.rs Outdated Show resolved Hide resolved
@b-naber b-naber force-pushed the queue-status-receiver-bounded-broadcast branch 2 times, most recently from 28bffd0 to 2fff4dc Compare February 25, 2022 17:38
@b-naber
Copy link
Contributor Author

b-naber commented Feb 25, 2022

@Darksonn Updated the PR to what we discussed on discord.

@b-naber b-naber force-pushed the queue-status-receiver-bounded-broadcast branch from 2fff4dc to 7aae066 Compare February 25, 2022 19:17
@b-naber b-naber force-pushed the queue-status-receiver-bounded-broadcast branch from 7aae066 to b36ce25 Compare February 26, 2022 10:16
/// assert_eq!(rx1.num_msgs(), 0);
/// }
/// ```
pub fn num_msgs(&self) -> u64 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is num_msgs the right name for this function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, but I think it's clearer than len (which is mostly used for container like data structures). Plus clippy complained that this missed an empty method given that it had a len method, where again I'm not sure it makes sense to speak of a Receiver as being empty, though some people might like that interface. num_msgs is probably not great either, I was thinking of something like num_queued_messages, but thought it might have been too verbose. I'm open to choose whatever you think is the most plausible choice.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I posted this question for discussion on #tokio-dev on discord.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be inclined to just call this len --- crossbeam::channel has a len method, and I think the meaning of referring to a channel's "length" is relatively unambiguous. Alternatively, if we don't want to call it a "length", i would suggest num_queued --- I don't think adding "messages" to the method name conveys as much information...

/// assert_eq!(rx1.num_msgs(), 0);
/// }
/// ```
pub fn num_msgs(&self) -> u64 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is u64 the right return type here? The usize type seems more reasonable to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose that because tail.pos has that type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the other hand, the capacity argument to the constructor for the channel takes an usize.

(The reason that tail.pos has the type u64 is that it needs to fit the total number of messages that will be sent on the channel, not just the total number of messages in-flight at any one time.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be usize.

@carllerche
Copy link
Member

carllerche commented Mar 2, 2022

I would strongly suggest sticking w/ prior-art for naming here. I would vote for len() and capacity().

Edit: To clarify a bit more, it isn't a hard rule that we must copy existing fn names, but it should be the default choice and we would need a strong reason to deviate. I don't see a strong argument against using len() and capacity() here.

@b-naber b-naber force-pushed the queue-status-receiver-bounded-broadcast branch from 878b8de to 6ba3fbd Compare March 3, 2022 15:40
@b-naber b-naber force-pushed the queue-status-receiver-bounded-broadcast branch from 6ba3fbd to 841963f Compare March 3, 2022 15:57
Comment on lines 697 to 699
/// If the returned value from `len` is larger or equal to the capacity of
/// the channel any call to [`recv`] will return an `Err(RecvError::Lagged)`
/// and any call to [`try_recv`] will return an `Err(TryRecvError::Lagged)`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you also get a lagged error if they are equal? I don't think so.

Maybe a test would be in order.

Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

@Darksonn Darksonn merged commit 2f944df into tokio-rs:master Mar 7, 2022
@b-naber b-naber deleted the queue-status-receiver-bounded-broadcast branch March 7, 2022 12:53
hawkw added a commit that referenced this pull request Apr 27, 2022
# 1.18.0 (April 27, 2022)

This release adds a number of new APIs in `tokio::net`, `tokio::signal`, and
`tokio::sync`. In addition, it adds new unstable APIs to `tokio::task` (`Id`s
for uniquely identifying a task, and `AbortHandle` for remotely cancelling a
task), as well as a number of bugfixes.

### Fixed

- blocking: add missing `#[track_caller]` for `spawn_blocking` ([#4616])
- macros: fix `select` macro to process 64 branches ([#4519])
- net: fix `try_io` methods not calling Mio's `try_io` internally ([#4582])
- runtime: recover when OS fails to spawn a new thread ([#4485])

### Added

- macros: support setting a custom crate name for `#[tokio::main]` and
  `#[tokio::test]` ([#4613])
- net: add `UdpSocket::peer_addr` ([#4611])
- net: add `try_read_buf` method for named pipes ([#4626])
- signal: add `SignalKind` `Hash`/`Eq` impls and `c_int` conversion ([#4540])
- signal: add support for signals up to `SIGRTMAX` ([#4555])
- sync: add `watch::Sender::send_modify` method ([#4310])
- sync: add `broadcast::Receiver::len` method ([#4542])
- sync: add `watch::Receiver::same_channel` method ([#4581])
- sync: implement `Clone` for `RecvError` types ([#4560])

### Changed

- update `nix` to 0.24, limit features ([#4631])
- update `mio` to 0.8.1 ([#4582])
- macros: rename `tokio::select!`'s internal `util` module ([#4543])
- runtime: use `Vec::with_capacity` when building runtime ([#4553])

### Documented

- improve docs for `tokio_unstable` ([#4524])
- runtime: include more documentation for thread_pool/worker ([#4511])
- runtime: update `Handle::current`'s docs to mention `EnterGuard` ([#4567])
- time: clarify platform specific timer resolution ([#4474])
- signal: document that `Signal::recv` is cancel-safe ([#4634])
- sync: `UnboundedReceiver` close docs ([#4548])

### Unstable

The following changes only apply when building with `--cfg tokio_unstable`:

- task: add `task::Id` type ([#4630])
- task: add `AbortHandle` type for cancelling tasks in a `JoinSet` ([#4530],
  [#4640])
- task: fix missing `doc(cfg(...))` attributes for `JoinSet` ([#4531])
- task: fix broken link in `AbortHandle` RustDoc ([#4545])
- metrics: add initial IO driver metrics ([#4507])

[#4616]: #4616
[#4519]: #4519
[#4582]: #4582
[#4485]: #4485
[#4613]: #4613
[#4611]: #4611
[#4626]: #4626
[#4540]: #4540
[#4555]: #4555
[#4310]: #4310
[#4542]: #4542
[#4581]: #4581
[#4560]: #4560
[#4631]: #4631
[#4582]: #4582
[#4543]: #4543
[#4553]: #4553
[#4524]: #4524
[#4511]: #4511
[#4567]: #4567
[#4474]: #4474
[#4634]: #4634
[#4548]: #4548
[#4630]: #4630
[#4530]: #4530
[#4640]: #4640
[#4531]: #4531
[#4545]: #4545
[#4507]: #4507
hawkw added a commit that referenced this pull request Apr 27, 2022
# 1.18.0 (April 27, 2022)

This release adds a number of new APIs in `tokio::net`, `tokio::signal`, and
`tokio::sync`. In addition, it adds new unstable APIs to `tokio::task` (`Id`s
for uniquely identifying a task, and `AbortHandle` for remotely cancelling a
task), as well as a number of bugfixes.

### Fixed

- blocking: add missing `#[track_caller]` for `spawn_blocking` ([#4616])
- macros: fix `select` macro to process 64 branches ([#4519])
- net: fix `try_io` methods not calling Mio's `try_io` internally ([#4582])
- runtime: recover when OS fails to spawn a new thread ([#4485])

### Added

- macros: support setting a custom crate name for `#[tokio::main]` and
  `#[tokio::test]` ([#4613])
- net: add `UdpSocket::peer_addr` ([#4611])
- net: add `try_read_buf` method for named pipes ([#4626])
- signal: add `SignalKind` `Hash`/`Eq` impls and `c_int` conversion ([#4540])
- signal: add support for signals up to `SIGRTMAX` ([#4555])
- sync: add `watch::Sender::send_modify` method ([#4310])
- sync: add `broadcast::Receiver::len` method ([#4542])
- sync: add `watch::Receiver::same_channel` method ([#4581])
- sync: implement `Clone` for `RecvError` types ([#4560])

### Changed

- update `nix` to 0.24, limit features ([#4631])
- update `mio` to 0.8.1 ([#4582])
- macros: rename `tokio::select!`'s internal `util` module ([#4543])
- runtime: use `Vec::with_capacity` when building runtime ([#4553])

### Documented

- improve docs for `tokio_unstable` ([#4524])
- runtime: include more documentation for thread_pool/worker ([#4511])
- runtime: update `Handle::current`'s docs to mention `EnterGuard` ([#4567])
- time: clarify platform specific timer resolution ([#4474])
- signal: document that `Signal::recv` is cancel-safe ([#4634])
- sync: `UnboundedReceiver` close docs ([#4548])

### Unstable

The following changes only apply when building with `--cfg tokio_unstable`:

- task: add `task::Id` type ([#4630])
- task: add `AbortHandle` type for cancelling tasks in a `JoinSet` ([#4530],
  [#4640])
- task: fix missing `doc(cfg(...))` attributes for `JoinSet` ([#4531])
- task: fix broken link in `AbortHandle` RustDoc ([#4545])
- metrics: add initial IO driver metrics ([#4507])

[#4616]: #4616
[#4519]: #4519
[#4582]: #4582
[#4485]: #4485
[#4613]: #4613
[#4611]: #4611
[#4626]: #4626
[#4540]: #4540
[#4555]: #4555
[#4310]: #4310
[#4542]: #4542
[#4581]: #4581
[#4560]: #4560
[#4631]: #4631
[#4582]: #4582
[#4543]: #4543
[#4553]: #4553
[#4524]: #4524
[#4511]: #4511
[#4567]: #4567
[#4474]: #4474
[#4634]: #4634
[#4548]: #4548
[#4630]: #4630
[#4530]: #4530
[#4640]: #4640
[#4531]: #4531
[#4545]: #4545
[#4507]: #4507
hawkw added a commit that referenced this pull request Apr 27, 2022
# 1.18.0 (April 27, 2022)

This release adds a number of new APIs in `tokio::net`, `tokio::signal`, and
`tokio::sync`. In addition, it adds new unstable APIs to `tokio::task` (`Id`s
for uniquely identifying a task, and `AbortHandle` for remotely cancelling a
task), as well as a number of bugfixes.

### Fixed

- blocking: add missing `#[track_caller]` for `spawn_blocking` ([#4616])
- macros: fix `select` macro to process 64 branches ([#4519])
- net: fix `try_io` methods not calling Mio's `try_io` internally ([#4582])
- runtime: recover when OS fails to spawn a new thread ([#4485])

### Added

- macros: support setting a custom crate name for `#[tokio::main]` and
  `#[tokio::test]` ([#4613])
- net: add `UdpSocket::peer_addr` ([#4611])
- net: add `try_read_buf` method for named pipes ([#4626])
- signal: add `SignalKind` `Hash`/`Eq` impls and `c_int` conversion ([#4540])
- signal: add support for signals up to `SIGRTMAX` ([#4555])
- sync: add `watch::Sender::send_modify` method ([#4310])
- sync: add `broadcast::Receiver::len` method ([#4542])
- sync: add `watch::Receiver::same_channel` method ([#4581])
- sync: implement `Clone` for `RecvError` types ([#4560])

### Changed

- update `nix` to 0.24, limit features ([#4631])
- update `mio` to 0.8.1 ([#4582])
- macros: rename `tokio::select!`'s internal `util` module ([#4543])
- runtime: use `Vec::with_capacity` when building runtime ([#4553])

### Documented

- improve docs for `tokio_unstable` ([#4524])
- runtime: include more documentation for thread_pool/worker ([#4511])
- runtime: update `Handle::current`'s docs to mention `EnterGuard` ([#4567])
- time: clarify platform specific timer resolution ([#4474])
- signal: document that `Signal::recv` is cancel-safe ([#4634])
- sync: `UnboundedReceiver` close docs ([#4548])

### Unstable

The following changes only apply when building with `--cfg tokio_unstable`:

- task: add `task::Id` type ([#4630])
- task: add `AbortHandle` type for cancelling tasks in a `JoinSet` ([#4530],
  [#4640])
- task: fix missing `doc(cfg(...))` attributes for `JoinSet` ([#4531])
- task: fix broken link in `AbortHandle` RustDoc ([#4545])
- metrics: add initial IO driver metrics ([#4507])

[#4616]: #4616
[#4519]: #4519
[#4582]: #4582
[#4485]: #4485
[#4613]: #4613
[#4611]: #4611
[#4626]: #4626
[#4540]: #4540
[#4555]: #4555
[#4310]: #4310
[#4542]: #4542
[#4581]: #4581
[#4560]: #4560
[#4631]: #4631
[#4582]: #4582
[#4543]: #4543
[#4553]: #4553
[#4524]: #4524
[#4511]: #4511
[#4567]: #4567
[#4474]: #4474
[#4634]: #4634
[#4548]: #4548
[#4630]: #4630
[#4530]: #4530
[#4640]: #4640
[#4531]: #4531
[#4545]: #4545
[#4507]: #4507

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
crapStone pushed a commit to Calciumdibromid/CaBr2 that referenced this pull request May 1, 2022
This PR contains the following updates:

| Package | Type | Update | Change |
|---|---|---|---|
| [tokio](https://tokio.rs) ([source](https://github.com/tokio-rs/tokio)) | dependencies | minor | `1.17.0` -> `1.18.0` |
| [tokio](https://tokio.rs) ([source](https://github.com/tokio-rs/tokio)) | dev-dependencies | minor | `1.17.0` -> `1.18.0` |

---

### Release Notes

<details>
<summary>tokio-rs/tokio</summary>

### [`v1.18.0`](https://github.com/tokio-rs/tokio/releases/tokio-1.18.0)

[Compare Source](tokio-rs/tokio@tokio-1.17.0...tokio-1.18.0)

##### 1.18.0 (April 27, 2022)

This release adds a number of new APIs in `tokio::net`, `tokio::signal`, and
`tokio::sync`. In addition, it adds new unstable APIs to `tokio::task` (`Id`s
for uniquely identifying a task, and `AbortHandle` for remotely cancelling a
task), as well as a number of bugfixes.

##### Fixed

-   blocking: add missing `#[track_caller]` for `spawn_blocking` ([#&#8203;4616](tokio-rs/tokio#4616))
-   macros: fix `select` macro to process 64 branches ([#&#8203;4519](tokio-rs/tokio#4519))
-   net: fix `try_io` methods not calling Mio's `try_io` internally ([#&#8203;4582](tokio-rs/tokio#4582))
-   runtime: recover when OS fails to spawn a new thread ([#&#8203;4485](tokio-rs/tokio#4485))

##### Added

-   macros: support setting a custom crate name for `#[tokio::main]` and
    `#[tokio::test]` ([#&#8203;4613](tokio-rs/tokio#4613))
-   net: add `UdpSocket::peer_addr` ([#&#8203;4611](tokio-rs/tokio#4611))
-   net: add `try_read_buf` method for named pipes ([#&#8203;4626](tokio-rs/tokio#4626))
-   signal: add `SignalKind` `Hash`/`Eq` impls and `c_int` conversion ([#&#8203;4540](tokio-rs/tokio#4540))
-   signal: add support for signals up to `SIGRTMAX` ([#&#8203;4555](tokio-rs/tokio#4555))
-   sync: add `watch::Sender::send_modify` method ([#&#8203;4310](tokio-rs/tokio#4310))
-   sync: add `broadcast::Receiver::len` method ([#&#8203;4542](tokio-rs/tokio#4542))
-   sync: add `watch::Receiver::same_channel` method ([#&#8203;4581](tokio-rs/tokio#4581))
-   sync: implement `Clone` for `RecvError` types ([#&#8203;4560](tokio-rs/tokio#4560))

##### Changed

-   update `mio` to 0.8.1 ([#&#8203;4582](tokio-rs/tokio#4582))
-   macros: rename `tokio::select!`'s internal `util` module ([#&#8203;4543](tokio-rs/tokio#4543))
-   runtime: use `Vec::with_capacity` when building runtime ([#&#8203;4553](tokio-rs/tokio#4553))

##### Documented

-   improve docs for `tokio_unstable` ([#&#8203;4524](tokio-rs/tokio#4524))
-   runtime: include more documentation for thread_pool/worker ([#&#8203;4511](tokio-rs/tokio#4511))
-   runtime: update `Handle::current`'s docs to mention `EnterGuard` ([#&#8203;4567](tokio-rs/tokio#4567))
-   time: clarify platform specific timer resolution ([#&#8203;4474](tokio-rs/tokio#4474))
-   signal: document that `Signal::recv` is cancel-safe ([#&#8203;4634](tokio-rs/tokio#4634))
-   sync: `UnboundedReceiver` close docs ([#&#8203;4548](tokio-rs/tokio#4548))

##### Unstable

The following changes only apply when building with `--cfg tokio_unstable`:

-   task: add `task::Id` type ([#&#8203;4630](tokio-rs/tokio#4630))
-   task: add `AbortHandle` type for cancelling tasks in a `JoinSet` ([#&#8203;4530](tokio-rs/tokio#4530)],
    \[[#&#8203;4640](tokio-rs/tokio#4640))
-   task: fix missing `doc(cfg(...))` attributes for `JoinSet` ([#&#8203;4531](tokio-rs/tokio#4531))
-   task: fix broken link in `AbortHandle` RustDoc ([#&#8203;4545](tokio-rs/tokio#4545))
-   metrics: add initial IO driver metrics ([#&#8203;4507](tokio-rs/tokio#4507))

</details>

---

### Configuration

📅 **Schedule**: At any time (no schedule defined).

🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied.

♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox.

🔕 **Ignore**: Close this PR and you won't be reminded about these updates again.

---

 - [ ] <!-- rebase-check -->If you want to rebase/retry this PR, click this checkbox.

---

This PR has been generated by [Renovate Bot](https://github.com/renovatebot/renovate).

Co-authored-by: cabr2-bot <cabr2.help@gmail.com>
Reviewed-on: https://codeberg.org/Calciumdibromid/CaBr2/pulls/1327
Reviewed-by: crapStone <crapstone@noreply.codeberg.org>
Co-authored-by: Calciumdibromid Bot <cabr2_bot@noreply.codeberg.org>
Co-committed-by: Calciumdibromid Bot <cabr2_bot@noreply.codeberg.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate M-sync Module: tokio/sync R-loom Run loom tests on this PR
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Ability to know the used capacity of a broadcast channel
5 participants