Cloning Addr bypasses mailbox capacity checks for the first message sent #329

Open
jpeel opened this issue Feb 7, 2020 · 0 comments

Cloning an Addr clones its AddressSender. AddressSender's clone code clones the inner member variable of the clone target, but it sets maybe_parked to false and initializes sender_task as empty. As a result, the first message sent through the cloned Addr bypasses the mailbox capacity check. AddressSender's sender method has the same problem: it also returns an AddressSender with similarly reset maybe_parked and sender_task fields.
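
To make the mechanism concrete, here is a minimal std-only sketch. It is a toy model, not actix's actual code: `ToyMailbox`, `ToySender`, and both helper functions are hypothetical names, and the parking rule (a sender parks once the queue reaches capacity) is a simplifying assumption. The only part modeled on the report is that `clone` shares the mailbox but resets the parked flag.

```rust
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::rc::Rc;

/// Toy stand-in for the shared mailbox state (hypothetical, not actix's type).
struct ToyMailbox {
    capacity: usize,
    queue: VecDeque<u32>,
}

/// Toy stand-in for AddressSender: a shared mailbox plus a per-sender parked flag.
struct ToySender {
    inner: Rc<RefCell<ToyMailbox>>,
    maybe_parked: Cell<bool>,
}

impl ToySender {
    fn new(capacity: usize) -> Self {
        let mailbox = ToyMailbox { capacity, queue: VecDeque::new() };
        ToySender {
            inner: Rc::new(RefCell::new(mailbox)),
            maybe_parked: Cell::new(false),
        }
    }

    /// Rejects the message only if this sender is parked and the mailbox is still full.
    fn try_send(&self, msg: u32) -> Result<(), u32> {
        let mut mailbox = self.inner.borrow_mut();
        if self.maybe_parked.get() {
            if mailbox.queue.len() >= mailbox.capacity {
                return Err(msg);
            }
            self.maybe_parked.set(false);
        }
        mailbox.queue.push_back(msg);
        // Assumption of this model: park once the mailbox has reached capacity.
        if mailbox.queue.len() >= mailbox.capacity {
            self.maybe_parked.set(true);
        }
        Ok(())
    }

    fn queued(&self) -> usize {
        self.inner.borrow().queue.len()
    }
}

impl Clone for ToySender {
    /// Models the reported behavior: the mailbox is shared, the parked state is reset.
    fn clone(&self) -> Self {
        ToySender {
            inner: Rc::clone(&self.inner),
            maybe_parked: Cell::new(false),
        }
    }
}

/// Sends `n` messages, each through a fresh clone; returns how many were accepted.
fn accepted_via_clones(sender: &ToySender, n: u32) -> usize {
    (0..n).filter(|&i| sender.clone().try_send(i).is_ok()).count()
}

/// Sends `n` messages through the same sender; returns how many were accepted.
fn accepted_direct(sender: &ToySender, n: u32) -> usize {
    (0..n).filter(|&i| sender.try_send(i).is_ok()).count()
}
```

In this model, with a capacity of 1 and nothing draining the queue, `accepted_direct(&sender, 5)` accepts one message, while `accepted_via_clones(&sender, 5)` accepts all five because every clone starts unparked.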

I suggest that the clone function for AddressSender also clone the maybe_parked and sender_task fields. It may be better to clone sender_task and then set maybe_parked from sender_task's is_parked state. From my reading of the code, this shouldn't cause a problem, because these fields are only used within AddressSender to determine whether it is parked.
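
A rough illustration of this suggestion, again as a std-only toy model rather than actix code. `SharingSender` and `handle_one` are hypothetical names, and the sketch assumes that "cloning" the fields means the clone shares the same parked state as the original (modeled here with `Rc<Cell<bool>>`). Task wake-ups are left out entirely.

```rust
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::rc::Rc;

/// Toy stand-in for AddressSender (hypothetical; not actix's real type).
struct SharingSender {
    capacity: usize,
    queue: Rc<RefCell<VecDeque<u32>>>,
    // Shared between clones, so a clone inherits the parked state.
    maybe_parked: Rc<Cell<bool>>,
}

impl SharingSender {
    fn new(capacity: usize) -> Self {
        SharingSender {
            capacity,
            queue: Rc::new(RefCell::new(VecDeque::new())),
            maybe_parked: Rc::new(Cell::new(false)),
        }
    }

    /// Rejects the message if the sender is parked and the mailbox is still full.
    fn try_send(&self, msg: u32) -> Result<(), u32> {
        let mut queue = self.queue.borrow_mut();
        if self.maybe_parked.get() {
            if queue.len() >= self.capacity {
                return Err(msg);
            }
            self.maybe_parked.set(false);
        }
        queue.push_back(msg);
        // Assumption of this model: park once the mailbox has reached capacity.
        if queue.len() >= self.capacity {
            self.maybe_parked.set(true);
        }
        Ok(())
    }

    /// Simulates the actor handling one queued message.
    fn handle_one(&self) -> Option<u32> {
        self.queue.borrow_mut().pop_front()
    }
}

impl Clone for SharingSender {
    /// The clone shares both the queue and the parked flag with its source.
    fn clone(&self) -> Self {
        SharingSender {
            capacity: self.capacity,
            queue: Rc::clone(&self.queue),
            maybe_parked: Rc::clone(&self.maybe_parked),
        }
    }
}
```

With a capacity of 1, `sender.clone().try_send(1)` succeeds and a second `sender.clone().try_send(2)` returns `Err(2)`. After `handle_one()` drains the queue, a send through a new clone succeeds again.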

The fix for AddressSender's sender method is more complicated: it would entail checking whether the mailbox is full and setting maybe_parked and sender_task accordingly.
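
One way to picture this, under the same toy-model assumptions: a newly created sender takes its initial parked state from the current fullness of the mailbox instead of starting as `false`. `Mailbox`, `NewSender`, and `sender_for` are hypothetical names, and the task bookkeeping a real fix would need is omitted.

```rust
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::rc::Rc;

/// Toy mailbox (hypothetical; not actix's real type).
struct Mailbox {
    capacity: usize,
    queue: RefCell<VecDeque<u32>>,
}

impl Mailbox {
    fn is_full(&self) -> bool {
        self.queue.borrow().len() >= self.capacity
    }
}

/// Toy sender handed out by `sender_for`.
struct NewSender {
    mailbox: Rc<Mailbox>,
    maybe_parked: Cell<bool>,
}

/// Toy counterpart of a `sender` method: the parked flag starts from the
/// mailbox's current fullness rather than from `false`.
fn sender_for(mailbox: &Rc<Mailbox>) -> NewSender {
    NewSender {
        mailbox: Rc::clone(mailbox),
        maybe_parked: Cell::new(mailbox.is_full()),
    }
}

impl NewSender {
    /// Rejects the message if the sender is parked and the mailbox is still full.
    fn try_send(&self, msg: u32) -> Result<(), u32> {
        if self.maybe_parked.get() {
            if self.mailbox.is_full() {
                return Err(msg);
            }
            self.maybe_parked.set(false);
        }
        self.mailbox.queue.borrow_mut().push_back(msg);
        // Assumption of this model: park once the mailbox has reached capacity.
        self.maybe_parked.set(self.mailbox.is_full());
        Ok(())
    }
}
```

In this sketch, a sender created while the mailbox is full rejects its first message, and a sender created after the queue has drained accepts it.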

The code below, written for actix 0.8.3, reproduces the issue. I can make a 0.9 version if needed, but the cloning code in the master branch of actix still has this problem. Run as is, try_send never fails, because the Addr is cloned before every send. If the clone invocation at line 90 is commented out, try_send sometimes fails because the mailbox is full.

use actix::prelude::*;
use std::sync::atomic::{AtomicIsize, Ordering};

fn main() {
    let system = System::builder()
        .name("testsystem")
        .stop_on_panic(true)
        .build();

    let slow_actor_arbiter = Arbiter::new();
    let slow_actor_addr = Actor::start_in_arbiter(&slow_actor_arbiter, |_| SlowActor {});

    let sender_arbiter = Arbiter::new();
    let sender_addr = Actor::start_in_arbiter(&sender_arbiter, move |_| Sender { slow_actor_addr });

    Arbiter::spawn(sender_addr.send(Start {}).map_err(|e| {
        println!("sender send error {:?}", e);
    }));
    system.run().unwrap();
}

static MSG_COUNT: AtomicIsize = AtomicIsize::new(0);

struct Msg {
    pub handled: bool,
    _no_literals: (),
}

impl Msg {
    fn new() -> Self {
        MSG_COUNT.fetch_add(1, Ordering::SeqCst);
        Self {
            handled: false,
            _no_literals: (),
        }
    }
}

impl Drop for Msg {
    fn drop(&mut self) {
        let c = MSG_COUNT.fetch_sub(1, Ordering::SeqCst);
        println!("Handled: {:?}, Remaining messages: {}", self.handled, c - 1);
    }
}

impl Message for Msg {
    type Result = ();
}

struct SlowActor {}

impl Actor for SlowActor {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        ctx.set_mailbox_capacity(1);
    }
}

impl Handler<Msg> for SlowActor {
    type Result = <Msg as Message>::Result;
    fn handle(&mut self, mut msg: Msg, _ctx: &mut Self::Context) -> Self::Result {
        println!("handling");
        std::thread::sleep(std::time::Duration::from_millis(1000));
        msg.handled = true;
    }
}

struct Sender {
    slow_actor_addr: Addr<SlowActor>,
}

impl Actor for Sender {
    type Context = Context<Self>;
}

struct Start {}

impl Message for Start {
    type Result = ();
}

impl Handler<Start> for Sender {
    type Result = <Start as Message>::Result;

    fn handle(&mut self, _msg: Start, _ctx: &mut Self::Context) -> Self::Result {
        loop {
            std::thread::sleep(std::time::Duration::from_millis(500));
            self.slow_actor_addr
                .clone()
                .try_send(Msg::new())
                .map_err(|e| {
                    println!("Unable to send {:?}", e);
                })
                .ok();
        }
    }
}