Combined Unis do not get canceled if one of them emits a failure #677

Closed
DarioArena87 opened this issue Sep 8, 2021 · 3 comments
Labels: invalid (This doesn't seem right)

@DarioArena87

Hi, I was trying to execute some concurrent calls and found an unexpected behaviour. The JavaDoc of UniAndGroupIterable<O> unis(java.lang.Iterable<? extends Uni<?>> unis) in class UniZip says:

The produced Uni fires a failure event if one of the Unis fires a failure. This will cause the other Unis to be cancelled, except if UniAndGroupIterable.collectFailures() is invoked, which delays the failure event until all Unis have fired an item or failure event.

But I found that when I combine some Unis without calling collectFailures() and one of them fails, the rest of them are not cancelled and continue executing.
I made a simple, naive test that should reproduce the issue:

package it.cabel.ecltitoli;

import io.smallrye.mutiny.Uni;
import org.junit.jupiter.api.Test;

import java.util.*;
import java.util.function.Function;

public class TestUniCombineAll {

    @Test
    void test() {

        List<Uni<Object>> emitters = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            int finalI = i;
            Uni<Object> emitter = Uni.createFrom().emitter(uniEmitter -> {
                safeSleep(finalI * 100);
                System.out.println("Complete " + finalI);
                uniEmitter.complete(finalI);
            });
            emitters.add(emitter);

        }

        emitters.add(Uni.createFrom().emitter(uniEmitter -> {
            safeSleep(350);
            System.out.println("Fail!!!");
            uniEmitter.fail(new Exception("This is a test failure"));
        }));

        Collections.shuffle(emitters);

        Uni.combine().all().unis(emitters)
            .combinedWith(Object.class, Function.identity())
            .subscribe().with(
                completed -> System.out.println("Completed " + completed.size()),
                fail -> System.out.println("Failed: " + fail.getMessage())
           );

    }

    private void safeSleep(int millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

And if you run it a few times the output is similar to this (it may change a little bit due to Collections.shuffle randomness):

Complete 5
Complete 4
Complete 0
Complete 8
Complete 1
Complete 7
Fail!!!
Failed: This is a test failure
Complete 2
Complete 3
Complete 6
Complete 9

Maybe there is something I'm missing, but I was expecting that after a failure all the remaining Unis (the ones that are still sleeping when the failure occurs) would be cancelled.
Is this a bug, or am I doing something wrong? The Mutiny library I'm using is the one that comes with Quarkus 2.2.1.Final (I think it is version 1.0.0).

jponge added this to the 1.1.0 milestone Sep 9, 2021
jponge self-assigned this Sep 9, 2021
jponge added the invalid (This doesn't seem right) label Oct 4, 2021
@jponge (Member) commented Oct 4, 2021

I haven't been able to reproduce this behaviour with the following quick-and-dirty extrapolation of the test case:

    @RepeatedTest(100)
    void reproducer_677() {
        List<Uni<Integer>> unis = new ArrayList<>();

        for (int i = 0; i < 20; i++) {
            int finalI = i;
            // add each sleeping emitter to the list that will be combined
            unis.add(Uni.createFrom().emitter(uniEmitter -> {
                safeSleep(finalI * 100);
                System.out.println("Complete " + finalI);
                uniEmitter.complete(finalI);
            }));
        }

        unis.add(Uni.createFrom().emitter(uniEmitter -> {
            safeSleep(350);
            System.out.println("Fail!!!");
            uniEmitter.fail(new Exception("This is a test failure"));
        }));

        Collections.shuffle(unis);

        AtomicBoolean success = new AtomicBoolean();
        AtomicBoolean failed = new AtomicBoolean();
        Uni.combine().all().unis(unis)
                .combinedWith(Object.class, Function.identity())
                .subscribe().with(
                        completed -> {
                            success.set(true);
                            System.out.println("Completed " + completed.size());
                        },
                        fail -> {
                            failed.set(true);
                            System.out.println("Failed: " + fail.getMessage());
                        }
                );

        assertThat(success).isFalse();
        assertThat(failed).isTrue();
    }

    private void safeSleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

The combination consistently fails.

I suspect that you intended to run concurrent operations, but please note that everything is being executed on the same (subscription) thread. So each Uni blocks the thread to sleep, then passes the torch to the next Uni that sleeps, etc.
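For reference, here is a minimal sketch (not part of the original reproducers) of what a genuinely concurrent variant could look like: each emitter's subscription is shifted to a worker thread with runSubscriptionOn, here using Mutiny's Infrastructure.getDefaultExecutor(); the safeSleep helper is the one from the snippets above.

// Sketch only: make the sleeping emitters run concurrently by subscribing each
// of them on a worker thread instead of the caller's (subscription) thread.
List<Uni<Integer>> unis = new ArrayList<>();
for (int i = 0; i < 10; i++) {
    int finalI = i;
    unis.add(Uni.createFrom().<Integer>emitter(uniEmitter -> {
        safeSleep(finalI * 100);
        uniEmitter.complete(finalI);
    }).runSubscriptionOn(Infrastructure.getDefaultExecutor()));
}

With the subscriptions running in parallel, the combined Uni fires its failure as soon as the failing Uni fails and cancels the other subscriptions; note, though, that cancellation does not by itself interrupt an emitter that is already blocked in Thread.sleep(), so it can only stop work that has not started yet or that checks for cancellation cooperatively.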

jponge closed this as completed Oct 4, 2021
@DarioArena87 (Author)

Hi @jponge, maybe I explained myself badly, but even in my previous example the overall combination fails: when I subscribe, after a while I receive a failure event that gets printed by the fail -> System.out.println("Failed: " + fail.getMessage()) lambda passed as the second parameter to the subscribe() method. My concern was more about the fact that the operations that are blocked when the failure event is emitted are not cancelled automatically and continue to execute.
I know it is a bad practice, but if the concurrent tasks perform operations like a heavy computation, then in the case of a failure all the remaining computations, successful or not, are "wasted". Is there any way to achieve this behaviour?
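One possible way to avoid that wasted work (a sketch under the same assumptions as the snippet above, not something confirmed in this thread) is to run the heavy Uni on a worker thread, flip a cancellation flag from onCancellation(), and have the computation check the flag periodically so it can stop once the combined Uni has failed.

// Illustrative sketch: cooperative cancellation of a long-running computation.
// Assumes java.util.concurrent.atomic.AtomicBoolean, Mutiny's Infrastructure,
// and the safeSleep helper from the snippets above.
AtomicBoolean cancelled = new AtomicBoolean();
Uni<Integer> heavy = Uni.createFrom().<Integer>emitter(em -> {
    for (int step = 0; step < 100; step++) {
        if (cancelled.get()) {
            return; // the subscription was cancelled: stop the remaining work
        }
        safeSleep(10); // stand-in for one slice of the heavy computation
    }
    em.complete(42);
})
.runSubscriptionOn(Infrastructure.getDefaultExecutor())
.onCancellation().invoke(() -> cancelled.set(true));

When the combined Uni fires its failure it cancels the remaining subscriptions (as described in the JavaDoc quoted at the top), the onCancellation() callback flips the flag, and the already-running computation can bail out instead of running to completion.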

@jponge (Member) commented Oct 5, 2021

I haven't seen the blocked operations print anything when I ran it. Perhaps you were using an older version?
