parEvalMap deadlocks when Stream's F has extra error channels #3199

Open
Daenyth opened this issue Apr 4, 2023 · 3 comments
@Daenyth
Contributor

Daenyth commented Apr 4, 2023

Tested and reproduced on versions 3.2.7 and 3.6.1.

This happens because parEvalMapUnordered passes the result of each evalMap action back to the controlling stream through a callback in F.
When F has a failure mode other than raiseError, the Either[Throwable, Result] => ... callback is never invoked, so the controlling stream waits forever and deadlocks.
The fix for this probably involves using Outcome instead of the Either-based callback.

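  // Assumes a munit CatsEffectSuite and the kind-projector plugin (for the `*` syntax).
  import cats.data.{Ior, IorT}
  import cats.effect.IO
  import cats.effect.testkit.TestControl
  import cats.syntax.all._

  test("Stream of IorT deadlocks on Ior.Left") {
    val s = fs2
      .Stream(1)
      .covary[IorT[IO, String, *]]
      .parEvalMapUnordered(Int.MaxValue)(_ =>
        // Fails on the Ior channel, not through IO.raiseError
        IorT(IO.pure("fail".leftIor[Int]))
      )
      .compile
      .drain
      .value

    // Expected: Ior.left("fail"). Observed: the program never terminates.
    TestControl.executeEmbed(s).assertEquals(Ior.left("fail"))
  }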

This fails the same way when using EitherT instead of IorT.
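
To make the callback problem described above concrete, here is a minimal toy model in plain Scala. It is not fs2's implementation: `Toy`, `attempt`, and `runWorker` are hypothetical names invented for this sketch, and the model only assumes that the worker reports its outcome through an `Either[Throwable, A]` callback after a Throwable-based `attempt`.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

// Toy model only: none of these names are fs2 or cats-effect APIs.
object CallbackModel {

  // An effect with two failure channels: a thrown exception, or a typed Left[String].
  final case class Toy[A](run: () => Either[String, A]) {
    // A Left short-circuits here, so `f` is never called.
    def flatMap[B](f: A => Toy[B]): Toy[B] =
      Toy[B](() => run().flatMap(a => f(a).run()))

    // Throwable-based attempt: captures exceptions, but knows nothing about Left.
    def attempt: Toy[Either[Throwable, A]] =
      Toy[Either[Throwable, A]] { () =>
        try run().map(a => Right(a): Either[Throwable, A])
        catch { case NonFatal(t) => Right(Left(t)) }
      }
  }

  // Runs `action`, then reports the outcome through an Either-based callback.
  def runWorker[A](action: Toy[A])(cb: Either[Throwable, A] => Unit): Either[String, Unit] =
    action.attempt.flatMap(r => Toy[Unit](() => Right(cb(r)))).run()

  def main(args: Array[String]): Unit = {
    val seen = ListBuffer.empty[String]
    def record(label: String): Either[Throwable, Int] => Unit = { _ => seen += label; () }

    runWorker(Toy[Int](() => Right(1)))(record("ok"))
    runWorker(Toy[Int](() => throw new RuntimeException("boom")))(record("thrown"))
    val stuck = runWorker(Toy[Int](() => Left("fail")))(record("left"))

    println(seen.toList) // List(ok, thrown): the callback never fired for "left"
    println(stuck)       // Left(fail)
  }
}
```

In this model the `Left("fail")` case skips the callback entirely, which corresponds to the controlling stream never hearing back from the worker.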

Thanks to @armanbilge and @ChristopherDavenport for your help in debugging!

@armanbilge
Member

armanbilge commented Apr 4, 2023

> The fix for this probably involves using Outcome instead of the Either-based callback.

Interesting, not sure I understand how that would work :)


So to me this feels like a variant of typelevel/cats#4308. Specifically, see my comment typelevel/cats#4308 (comment).

parEvalMapUnordered expects a lawful F[_]: Concurrent, and its behavior is only specified with respect to Concurrent operations/laws.

IorT[IO, String, *] and EitherT[IO, String, *] do not have a lawful Concurrent in terms of the String error channel. So parEvalMapUnordered picks up an alternative implementation in terms of Throwable, but this implementation does not respect errors raised on the Ior/Either.

Ideally we should be using a GenConcurrent[IorT[IO, String, *], String] instance for parEvalMapUnordered in this case. However, as described in typelevel/cats#4308 (comment), such a thing cannot lawfully exist.

@hamnis
Contributor

hamnis commented May 15, 2023

This also happens on 3.7.0 and CE 3.5.0.

It seems to be triggered with translate and doobie ConnectionIO.

@Daenyth
Contributor Author

Daenyth commented May 16, 2023

That's unrelated; it is due to the way WeakAsync uses fromFuture. The deadlock in this issue happens when F has an error channel that can't be translated to IO.raiseError. ConnectionIO is safe from this issue.
