Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi.createBy().repeating() doesn't like flatMap #689

Closed
glandais opened this issue Sep 20, 2021 · 6 comments · Fixed by #691
Closed

Multi.createBy().repeating() doesn't like flatMap #689

glandais opened this issue Sep 20, 2021 · 6 comments · Fixed by #691
Assignees
Labels
bug Something isn't working
Milestone

Comments

@glandais
Copy link

glandais commented Sep 20, 2021

Context

I've used Multi.createBy().repeating() for creating a Multi from an InputStream (read line by line).
Using Mutiny 1.0.0

Description

If I want to parallelize work on this Multi, I'm getting java.lang.IllegalStateException: Invalid subscription state - already have a subscription for upstream

Additional details

Sample code :

        AtomicLong line = new AtomicLong();
        var count = Multi.createBy().repeating()
                .supplier(line::incrementAndGet)
                .until(l -> l.equals(1_000_000L))
                .flatMap(l -> Multi.createFrom().item(l * 2))
                .emitOn(Infrastructure.getDefaultExecutor())
                .collect().with(Collectors.counting())
                .await().indefinitely();
        System.out.println(count);
@jponge
Copy link
Member

jponge commented Sep 20, 2021

I ran this jbang script (forgive the mutli typo 😆 ):

///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS io.smallrye.reactive:mutiny:1.0.0

import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.infrastructure.Infrastructure;

public class mutli {

    public static void main(String... args) {
        AtomicLong line = new AtomicLong();
        var count = Multi.createBy().repeating()
                .supplier(line::incrementAndGet)
                .until(l -> l.equals(1_000_000L))
                .flatMap(l -> Multi.createFrom().item(l * 2))
                .emitOn(Infrastructure.getDefaultExecutor())
                .collect().with(Collectors.counting())
                .await().indefinitely();
        System.out.println(count);
        System.out.println("Done");
    }
}

For a while I got:

❯ jbang run mutli.java
[jbang] Building jar...
999999
Done

And then I started to get errors so there is indeed a bug somewhere:

❯ jbang run mutli.java
[jbang] Building jar...
Exception in thread "main" java.lang.IllegalStateException: Invalid subscription state - already have a subscription for upstream
	at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.onSubscribe(UniToMultiPublisher.java:64)
	at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:26)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.request(UniToMultiPublisher.java:58)
	at io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber.request(SwitchableSubscriptionSubscriber.java:159)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.drainLoop(MultiFlatMapOp.java:450)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.drain(MultiFlatMapOp.java:266)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.request(MultiFlatMapOp.java:140)
	at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.request(MultiOperatorProcessor.java:87)
	at io.smallrye.mutiny.operators.multi.MultiEmitOnOp$MultiEmitOnProcessor.run(MultiEmitOnOp.java:208)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

BTW may I ask why you want to await indefinitely?

@jponge jponge added the bug Something isn't working label Sep 20, 2021
@jponge jponge added this to the 1.1.0 milestone Sep 20, 2021
@glandais
Copy link
Author

Just to be sure that all the Multi is fully processed, as I did not wanted to add test dependency for AssertSubscriber

I've not looked into repeating() stuff, but I guess it's somewhere in it, as there is no Uni in the code.

@jponge
Copy link
Member

jponge commented Sep 20, 2021

I've tried this and it fails after a random number of elements:

        AtomicLong line = new AtomicLong();
        Multi.createBy().repeating()
                .supplier(line::incrementAndGet)
                .until(l -> l.equals(1_000_000L))
                .flatMap(l -> Multi.createFrom().item(l * 2))
                .emitOn(Infrastructure.getDefaultExecutor())
                .subscribe().with(System.out::println, Throwable::printStackTrace);

as in:

786
788
790
792
794
796
798
800
java.lang.IllegalStateException: Invalid subscription state - already have a subscription for upstream
	at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.onSubscribe(UniToMultiPublisher.java:64)
	at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:26)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.request(UniToMultiPublisher.java:58)
	at io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber.request(SwitchableSubscriptionSubscriber.java:159)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.drainLoop(MultiFlatMapOp.java:450)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.drain(MultiFlatMapOp.java:266)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.request(MultiFlatMapOp.java:140)
	at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.request(MultiOperatorProcessor.java:87)
	at io.smallrye.mutiny.operators.multi.MultiEmitOnOp$MultiEmitOnProcessor.run(MultiEmitOnOp.java:208)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

@jponge
Copy link
Member

jponge commented Sep 21, 2021

BTW you are not doing anything in parallel here, just the last bit of the pipeline. See https://smallrye.io/smallrye-mutiny/guides/emit-subscription

@jponge
Copy link
Member

jponge commented Sep 21, 2021

Working on a fix

@jponge
Copy link
Member

jponge commented Sep 21, 2021

@glandais I've ported your sample to #691

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants