source: source reader will stall after replacing if previous reader has finished #6300

Closed
BugenZhao opened this issue Nov 10, 2022 · 2 comments · Fixed by #6304
Labels
component/streaming Stream processing related issue. type/bug Something isn't working

Comments

@BugenZhao
Member

BugenZhao commented Nov 10, 2022

The SelectWithStrategy tracks whether each of its two sides has terminated. Once a side has terminated, it is never polled again, as an optimization.

match this.internal_state {
    InternalState::Start => {
        let next_side = (this.clos)(this.state);
        poll_inner(&mut this, next_side, cx)
    }
    InternalState::LeftFinished => match this.stream2.poll_next(cx) {
        Poll::Ready(None) => {
            *this.internal_state = InternalState::BothFinished;
            Poll::Ready(None)
        }
        a => a,
    },
    InternalState::RightFinished => match this.stream1.poll_next(cx) {
        Poll::Ready(None) => {
            *this.internal_state = InternalState::BothFinished;
            Poll::Ready(None)
        }
        a => a,
    },
    InternalState::BothFinished => Poll::Ready(None),
}

If the source reader has already finished before a split change (or scaling), the SelectWithStrategy will not poll that side again, even after we replace the reader with a new, active one. As a result, we never get any chunks from this source. The futures documentation actually warns about this:

Note that care must be taken to avoid tampering with the state of the stream which may otherwise confuse this combinator.

/// Replace the source stream with a new one for given `stream`. Used for split change.
pub fn replace_source_stream(&mut self, stream: BoxSourceWithStateStream) {
    *self.inner.get_mut().1 = Self::source_stream(stream).map_ok(Either::Right).boxed();
}
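
To make the failure mode concrete, here is a minimal synchronous sketch that models the same four-state machine with plain iterators instead of streams. It is not the `futures` API: `TwoWaySelect`, `State`, `replace_right`, `replace_right_and_reset` and `run` are hypothetical names invented for illustration, and the round-robin strategy is an assumption. The reset variant shows one possible remedy and is not necessarily what the actual fix does.

```rust
// Simplified model of a two-sided select combinator (hypothetical, std only).
type BoxIter = Box<dyn Iterator<Item = u32>>;

#[derive(Clone, Copy, Debug, PartialEq)]
enum State {
    Start,
    LeftFinished,
    RightFinished,
    BothFinished,
}

struct TwoWaySelect {
    left: BoxIter,
    right: BoxIter,
    state: State,
    prefer_left: bool, // assumed round-robin strategy
}

impl TwoWaySelect {
    fn new(left: BoxIter, right: BoxIter) -> Self {
        Self { left, right, state: State::Start, prefer_left: true }
    }

    // Like `replace_source_stream` in the issue: swaps the right side only,
    // leaving the remembered termination state untouched.
    fn replace_right(&mut self, right: BoxIter) {
        self.right = right;
    }

    // Hypothetical remedy: also forget that the right side had finished.
    fn replace_right_and_reset(&mut self, right: BoxIter) {
        self.right = right;
        self.state = match self.state {
            State::RightFinished => State::Start,
            State::BothFinished => State::LeftFinished,
            other => other,
        };
    }
}

impl Iterator for TwoWaySelect {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        match self.state {
            State::Start => {
                let left_first = self.prefer_left;
                self.prefer_left = !left_first;
                let (first, second, first_done) = if left_first {
                    (&mut self.left, &mut self.right, State::LeftFinished)
                } else {
                    (&mut self.right, &mut self.left, State::RightFinished)
                };
                if let Some(item) = first.next() {
                    return Some(item);
                }
                // The preferred side ended: remember that and try the other side.
                self.state = first_done;
                match second.next() {
                    None => {
                        self.state = State::BothFinished;
                        None
                    }
                    item => item,
                }
            }
            // A finished side is never consulted again, even if it was replaced.
            State::LeftFinished => match self.right.next() {
                None => {
                    self.state = State::BothFinished;
                    None
                }
                item => item,
            },
            State::RightFinished => match self.left.next() {
                None => {
                    self.state = State::BothFinished;
                    None
                }
                item => item,
            },
            State::BothFinished => None,
        }
    }
}

// Drains the short right side, replaces it, and returns (before, after).
fn run(reset: bool) -> (Vec<u32>, Vec<u32>) {
    let mut select = TwoWaySelect::new(
        Box::new(vec![10, 20, 30, 40].into_iter()), // stands in for the barrier side
        Box::new(vec![1].into_iter()),              // a source that finishes early
    );
    let before: Vec<u32> = select.by_ref().take(4).collect();
    let new_right: BoxIter = Box::new(vec![2, 3].into_iter());
    if reset {
        select.replace_right_and_reset(new_right);
    } else {
        select.replace_right(new_right);
    }
    (before, select.collect())
}

fn main() {
    println!("without reset: {:?}", run(false));
    println!("with reset:    {:?}", run(true));
}
```

Without the reset, the items of the replacement reader (`2` and `3` here) are never emitted because the state stays `RightFinished`. With the reset, they are interleaved with the remaining left-side items as expected.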

The deterministic scaling test sets the event num for the Nexmark source, so the source finishes after emitting all events. This is also why we get incorrect results if we allow scaling the source in the chaos test.

// FIXME: we should already support reschedule source fragment, but there might be a bug
// of nexmark generator recovery.
let has_source = identity_contains("StreamSource")(f);

@BugenZhao BugenZhao added type/bug Something isn't working component/streaming Stream processing related issue. labels Nov 10, 2022
@BugenZhao BugenZhao self-assigned this Nov 10, 2022
@github-actions github-actions bot added this to the release-0.1.14 milestone Nov 10, 2022
@BugenZhao BugenZhao changed the title scale: source reader will stall if previous reader has finished source: source reader will stall after replacing if previous reader has finished Nov 10, 2022
@tabVersion
Contributor

tabVersion commented Nov 11, 2022

If the source reader has finished before split changing (or scaling)

what does it mean "a source reader has finished"? The source always consumes an endless stream.

@BugenZhao
Member Author

If the source reader has finished before split changing (or scaling)

what does it mean "a source reader has finished"? The source always consumes an endless stream.

In production, yes. However, the nexmark source will "finish" if we set the event num for it.

#5657 (comment)

@mergify mergify bot closed this as completed in #6304 Nov 11, 2022