New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fixes #3307 case #3312
fixes #3307 case #3312
Conversation
public void testFluxIterableSlowPath() { | ||
Iterable<String> iterable = mock(Iterable.class); | ||
Mockito.when(iterable.spliterator()).thenReturn(mock(Spliterator.class)); | ||
Flux.fromIterable(iterable).limitRate(1).next().block(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this case is covered in the factory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right. duplication. fixed
|
||
public class FluxIterableTest { | ||
|
||
final Iterable<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); | ||
|
||
static Stream<Function<Flux, Flux>> factory() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider not duplicating the code present in both this file and the flatten scenario. Otherwise, best to add a note as a comment that these should be kept in sync upon change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
if (cancelled) { | ||
if (!b) { | ||
s.onComplete(); | ||
onCloseWithDropError(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just noticed onClose is never actually provided. Maybe it's worth creating an issue to remove the implementation for it? It looks like the constructor accepting onClose is an internal one, so this wouldn't be a breaking change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is used by FluxStream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh. FluxIterable#subscribe
with onClose
is a static method used by FluxStream
. I stand corrected, I only looked at constructors of FluxIterable
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
correct
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com> Signed-off-by: OlegDokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
} | ||
catch (Throwable ex) { | ||
Operators.error(s, ex); | ||
isc.onCloseWithDropError(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it expected even before onSubscribe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
look at Operators.error(s, ex);
it looks like the following
s.onSubscribe(EmptySubscription.INSTANCE);
s.onError(e);
so it is after
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in that case, instantiated IterableSubscription is rather an existing thing that can close the resource instead of writing
if (onClose != null) {
try {
onClose.run();
}
catch (Throwable t) {
Operators.onErrorDropped(t, s.currentContext());
}
}
@OlegDokuka this PR seems to have been merged on a maintenance branch, please ensure the change is merge-forwarded to intermediate maintenance branches and up to |
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
closes #3307
Signed-off-by: Oleh Dokuka odokuka@vmware.com