Something like Future.sequentialRun/chunkRun #71

Open

exoego opened this issue Mar 2, 2021 · 5 comments

Comments

@exoego

exoego commented Mar 2, 2021

In parallel processing, I think it is a relatively common use case to limit the number of tasks running concurrently.

For example, say we have 10,000 files and want to upload each of them somewhere five at a time, to avoid taking the service down by uploading all 10,000 files at once.

The Scala standard library currently offers Future.sequence, whose name sounds suitable for this purpose, but unfortunately it is not: by the time it is called, the futures passed to it are typically already running, so it cannot limit concurrency.

For this use case, my colleagues and I often find ourselves defining utilities like the ones below:

import scala.concurrent.{ExecutionContext, Future}

// Runs `op` on each item strictly one after another, chaining each call on the previous Future.
def sequentialRun[T, U](items: IterableOnce[T])
                       (op: T => Future[U])
                       (implicit ec: ExecutionContext): Future[Seq[U]] = {
  items.iterator
    .foldLeft(Future.successful(Vector.empty[U])) { (f, item) =>
      f.flatMap { acc =>
        op(item).map(acc :+ _)
      }
    }
}

// Runs `op` on the items in chunks of `chunkSize`: each chunk runs in parallel,
// and the next chunk starts only after the previous one has completed.
def chunkRun[T, U](chunkSize: Int, items: Seq[T])
                  (op: T => Future[U])
                  (implicit ec: ExecutionContext): Future[Seq[U]] = {
  sequentialRun(items.grouped(chunkSize)) { chunk =>
    Future.traverse(chunk)(op)
  }.map(_.flatten)
}


// use-site
chunkRun(5, files) { file =>
  asynchronouslyUploadToSomewhere(file)
}

I think it would be helpful if scala-library-next offered a similar feature.

@julienrf
Contributor

julienrf commented Mar 2, 2021

Another way to achieve this is to use an ExecutionContext with a bounded number of threads. Unfortunately, there is currently no super simple Scala-ish way to do it. Here is what I do to create an ExecutionContext with at most one thread (to make things run sequentially), and another with five threads:

import java.util.concurrent.Executors
import com.google.common.util.concurrent.ThreadFactoryBuilder // from Guava
import scala.concurrent.ExecutionContext

// Daemonic so that it won't prevent the application from shutting down
val daemonicThreadFactory = new ThreadFactoryBuilder().setDaemon(true).build()
val oneThread = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor(daemonicThreadFactory))
val fiveThreads = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(5, daemonicThreadFactory))

// then…
def asynchronouslyUploadToSomewhere(file: File): Future[Uploaded] = Future {
  ...
}(fiveThreads /* <- THIS IS THE IMPORTANT PART */)

// Then I can just use the good old `traverse`
Future.traverse(files)(asynchronouslyUploadToSomewhere)

So, the responsibility of chunking / grouping the tasks and collecting their results is delegated to the underlying scheduler.

It is a bit less flexible than what you propose because the parallelism level is fixed once and for all in the execution context (i.e., there is no chunkSize parameter in my version), and you may not want to create a separate execution context for every parallelism level you need.

Another issue that I see with the current API is that it is a bit cumbersome to create the execution context. As a matter of comparison, here is the Monix equivalent:

- // Daemonic so that it won't prevent the application from shutting down
- val daemonicThreadFactory = new ThreadFactoryBuilder().setDaemon(true).build()
- val oneThread = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor(daemonicThreadFactory))
+ val oneThread = Scheduler.fixedPool("sequential", poolSize = 1)
- val fiveThreads = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(5, daemonicThreadFactory))
+ val fiveThreads = Scheduler.fixedPool("five-threads", poolSize = 5)

That being said, I agree that it would be useful to have a variant of Future.traverse that works chunk by chunk, and that takes the size of the chunks as a parameter.

object Future {
  def traverse[A, B](as: Seq[A], parallelismLevel: Int)(f: A => Future[B]): Future[Seq[B]]
}
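
For what it's worth, here is a minimal sketch of how such a variant could be implemented in terms of grouping. This is illustrative only: the name traverseInChunks and the explicit implicit ExecutionContext parameter are assumptions, not a committed API.

import scala.concurrent.{ExecutionContext, Future}

// Sketch: run `f` over `as` in chunks of `parallelismLevel`; each chunk runs
// in parallel via the regular `traverse`, and chunks are chained sequentially.
def traverseInChunks[A, B](as: Seq[A], parallelismLevel: Int)
                          (f: A => Future[B])
                          (implicit ec: ExecutionContext): Future[Seq[B]] =
  as.grouped(parallelismLevel)
    .foldLeft(Future.successful(Vector.empty[B])) { (acc, chunk) =>
      acc.flatMap(done => Future.traverse(chunk)(f).map(done ++ _))
    }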

@makingthematrix

@julienrf I was just thinking about something like this. In wire-signals we made a subclass of ExecutionContext, DispatchQueue, which lets us control the number of parallel operations:
https://github.com/wireapp/wire-signals/blob/main/core/src/main/scala/com/wire/signals/DispatchQueue.scala
But we don't use it much, except for a special case when we want to limit the "chunk" down to one.

@viktorklang

Having thought about this for a while, I think that instead of creating a proliferation of methods overloaded with a parallelism parameter, the "correct" fix would be to make it easier to limit the parallelism of ExecutionContexts. Of course, this is a bit more difficult than the simplest solution, since we also need to take blocking/BlockContext into account.

Being able to do something to the effect of `implicit val newEc = ExecutionContext.limitParallelism(ec, 5)` would be nice.

@viktorklang

viktorklang commented Mar 2, 2021

I haven't given BlockContext any thought yet, but perhaps something like this could work (I will have to study all possible interactions):

def limitParallelism(executionContext: ExecutionContext, maxParallelism: Int): ExecutionContext = {
  require(maxParallelism > 0)
  require(executionContext ne null)

  // The AtomicInteger's value is the number of currently available permits.
  new java.util.concurrent.atomic.AtomicInteger(maxParallelism) with ExecutionContext {
    private[this] final val queue = new java.util.concurrent.ConcurrentLinkedQueue[Runnable]()
    override final def reportFailure(cause: Throwable): Unit = executionContext.reportFailure(cause)
    override final def execute(runnable: Runnable): Unit = {
      queue.add(runnable)
      schedule()
    }

    // Starts a worker on the underlying context if a permit is available.
    private[this] final def schedule(): Unit = {
      val permits = get()
      if (permits > 0) {
        if (compareAndSet(permits, permits - 1)) {
          executionContext.execute(new Runnable {
            override final def run(): Unit =
              queue.poll() match {
                case null => incrementAndGet() // queue drained: return the permit
                case some =>
                  try some.run()
                  finally executionContext.execute(this) // resubmit this worker to run the next queued task
              }
          })
        } else schedule() // CAS failed, retry
      }
    }
  }
}
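
For illustration, usage could look like the following. This is a sketch assuming the limitParallelism above is in scope; process is a stand-in for real asynchronous work such as an upload.

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Wrap the global context so that at most 5 tasks run at a time.
implicit val limitedEc: ExecutionContext =
  limitParallelism(ExecutionContext.global, maxParallelism = 5)

// A stand-in for some asynchronous work, e.g. an upload.
def process(i: Int): Future[Int] = Future { Thread.sleep(100); i * 2 }

// The plain `Future.traverse` now runs at most 5 `process` calls at once.
val results = Await.result(Future.traverse(1 to 20)(process), Duration.Inf)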

@alexklibisz

> For this use case, my colleagues and I often find ourselves defining utilities like the ones below:

I've written this utility a few times. Totally agree it's worth having in the stdlib or a closely-adjacent library that doesn't require a full effect system.

I'll mention another option that hasn't come up so far: introduce an asynchronous semaphore.

If you have a semaphore like this:

import scala.concurrent.{ExecutionContext, Future}

final class AsyncSemaphore(permits: Int) {
  def acquire(): Future[Unit] = ???
  def release(): Future[Unit] = ???

  def withPermit[A](f: () => Future[A])(implicit ec: ExecutionContext): Future[A] =
    for {
      _ <- acquire()
      // Lift to a Future[Try[A]] so a failure doesn't short-circuit past release();
      // going through Future.unit also catches exceptions thrown synchronously by f.
      ta <- Future.unit.flatMap(_ => f()).transformWith(Future.successful)
      _ <- release()
      a <- Future.fromTry(ta) // Drop back to a Future[A].
    } yield a
}
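
For reference, one possible shape for acquire/release is a permit counter plus a FIFO queue of promises, guarded by synchronized. This is a sketch under those assumptions (the name SimpleAsyncSemaphore is illustrative, and it is not necessarily what the Scastie link below does):

import scala.collection.mutable
import scala.concurrent.{Future, Promise}

final class SimpleAsyncSemaphore(permits: Int) {
  private[this] var available = permits
  private[this] val waiters = mutable.Queue.empty[Promise[Unit]]

  // Completes immediately if a permit is free, otherwise queues a waiter.
  def acquire(): Future[Unit] = synchronized {
    if (available > 0) { available -= 1; Future.unit }
    else { val p = Promise[Unit](); waiters.enqueue(p); p.future }
  }

  // Hands the permit directly to the oldest waiter, if any.
  def release(): Future[Unit] = synchronized {
    if (waiters.nonEmpty) waiters.dequeue().success(())
    else available += 1
    Future.unit
  }
}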

Then you can use it to constrain parallelism as follows:

val as: Seq[A] = ??? // Lots of elements, need to constrain parallelism
val f: A => Future[B] = ??? // Apply this to each A.
val n = 16 // This many at a time.

val sem = new AsyncSemaphore(n)
val results = Future.sequence(as.map(a => sem.withPermit(() => f(a))))

Here's a Scastie example of one possible implementation of this semaphore: https://scastie.scala-lang.org/G4Of7yLARza1C7e0OGRSyQ
