Skip to content

Commit

Permalink
Resolves #160 - Split larger SQL parameters into multiple queries
Browse files Browse the repository at this point in the history
  • Loading branch information
tdroxler authored and simerplaha committed Mar 9, 2022
1 parent c21cfa4 commit 9623b11
Show file tree
Hide file tree
Showing 7 changed files with 299 additions and 21 deletions.
Expand Up @@ -16,7 +16,6 @@

package org.alephium.explorer.persistence.queries

import slick.dbio.DBIOAction
import slick.jdbc.{PositionedParameters, SetParameter, SQLActionBuilder}

import org.alephium.explorer.persistence.DBActionW
Expand All @@ -27,11 +26,7 @@ object InputQueries {

/** Inserts inputs or ignore rows with primary key conflict */
def insertInputs(inputs: Iterable[InputEntity]): DBActionW[Int] =
if (inputs.isEmpty) {
DBIOAction.successful(0)
} else {
val placeholder = paramPlaceholder(rows = inputs.size, columns = 8)

QueryUtil.splitUpdates(rows = inputs, queryRowParams = 8) { (inputs, placeholder) =>
val query =
s"""
|INSERT INTO inputs ("block_hash",
Expand Down
Expand Up @@ -16,7 +16,6 @@

package org.alephium.explorer.persistence.queries

import slick.dbio.DBIOAction
import slick.jdbc.{PositionedParameters, SetParameter, SQLActionBuilder}

import org.alephium.explorer.persistence.DBActionW
Expand All @@ -27,11 +26,7 @@ object OutputQueries {

/** Inserts outputs or ignore rows with primary key conflict */
def insertOutputs(outputs: Iterable[OutputEntity]): DBActionW[Int] =
if (outputs.isEmpty) {
DBIOAction.successful(0)
} else {
val placeholder = paramPlaceholder(rows = outputs.size, columns = 10)

QueryUtil.splitUpdates(rows = outputs, queryRowParams = 10) { (outputs, placeholder) =>
val query =
s"""
|INSERT INTO outputs ("block_hash",
Expand Down
@@ -0,0 +1,95 @@
// Copyright 2018 The Alephium Authors
// This file is part of the alephium project.
//
// The library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the library. If not, see <http://www.gnu.org/licenses/>.

package org.alephium.explorer.persistence.queries

import scala.annotation.tailrec

import slick.dbio.DBIOAction

import org.alephium.explorer.persistence.DBActionW
import org.alephium.explorer.persistence.schema.CustomSetParameter.paramPlaceholder

object QueryUtil {

  /** Maximum number of parameters allowed by Postgres per query.
    *
    * Postgres's extended-protocol Bind message carries the parameter count as a
    * signed Int16, hence [[Short.MaxValue]].
    *
    * TODO: This should be passed as a configuration instead of hardcoding it
    * and should be passed as parameter to the function using this value.
    * */
  val postgresDefaultParameterLimit: Short = Short.MaxValue

  //no-op action returned when there are no rows to insert (update count 0)
  private val emptyUpdates = DBIOAction.successful(0)

  /** Splits update queries into batches limited by the total number of parameters allowed per query.
    *
    * Note: batches are combined with `andThen`, so the returned update count is
    * that of the LAST batch only, not the sum of all batches.
    *
    * @param rows           All rows to persist
    * @param queryRowParams Number of parameters ('?') in a single row of the query
    * @param queryMaxParams Maximum number of parameters allowed for one query
    * @param queryBuilder   Builds the update action for one batch, given the batch's
    *                       rows and their placeholder string
    * @tparam R type of rows
    * @return All batched update actions combined into one
    */
  @SuppressWarnings(Array("org.wartremover.warts.DefaultArguments"))
  def splitUpdates[R](rows: Iterable[R],
                      queryRowParams: Int,
                      queryMaxParams: Short = postgresDefaultParameterLimit)(
      queryBuilder: (Iterable[R], String) => DBActionW[Int]): DBActionW[Int] =
    splitFoldLeft[R, DBActionW[Int]](initialQuery = emptyUpdates,
                                     queryRowParams = queryRowParams,
                                     queryMaxParams = queryMaxParams,
                                     rows = rows) { (rows, placeholder, action) =>
      action andThen queryBuilder(rows, placeholder)
    }

  /**
    * Splits queries into batches limited by the total number of parameters allowed per query.
    *
    * @param rows Query parameters for all rows
    * @param initialQuery Returned when `rows` is empty
    * @param queryRowParams Number of parameters in a single row of a query. Or the number of '?' in a single row.
    * @param queryMaxParams Maximum number of parameters for each query. Eg: [[postgresDefaultParameterLimit]]
    * @param foldLeft Builds the next query from (batch rows, batch placeholder, previous query).
    *                 Similar to foldLeft on a collection type.
    * @tparam R type of rows
    * @tparam Q type of query
    * @return A single query of type [[Q]]
    * @throws IllegalArgumentException when `queryRowParams` is not positive
    */
  def splitFoldLeft[R, Q](rows: Iterable[R],
                          initialQuery: Q,
                          queryRowParams: Int,
                          queryMaxParams: Short)(foldLeft: (Iterable[R], String, Q) => Q): Q = {
    //a non-positive row width would divide by zero or produce zero-sized batches below
    require(queryRowParams > 0, s"queryRowParams ($queryRowParams) must be positive")

    //maximum number of rows per query. Guaranteed >= 1 so every iteration makes
    //progress; without the guard, queryRowParams > queryMaxParams would yield
    //maxRows == 0 and recurse forever on splitAt(0).
    val maxRows = (queryMaxParams / queryRowParams) max 1

    //remainingCount is threaded through to avoid recomputing Iterable.size
    //(an O(n) traversal) on every iteration.
    @tailrec
    def build(rowsRemaining: Iterable[R], remainingCount: Int, previousQuery: Q): Q =
      if (rowsRemaining.isEmpty) {
        previousQuery
      } else {
        //number of rows for this query
        val queryRows = remainingCount min maxRows
        //generate placeholder string
        val placeholder = paramPlaceholder(rows = queryRows, columns = queryRowParams)

        //thisBatch = rows for this query
        //remaining = rows not processed in this query
        val (thisBatch, remaining) = rowsRemaining.splitAt(queryRows)
        val nextResult             = foldLeft(thisBatch, placeholder, previousQuery)

        //process remaining
        build(rowsRemaining = remaining,
              remainingCount = remainingCount - queryRows,
              previousQuery  = nextResult)
      }

    build(rowsRemaining = rows, remainingCount = rows.size, previousQuery = initialQuery)
  }
}
1 change: 1 addition & 0 deletions app/src/test/resources/big_block.json

Large diffs are not rendered by default.

Expand Up @@ -17,20 +17,23 @@
package org.alephium.explorer.persistence.dao

import scala.concurrent.ExecutionContext
import scala.io.Source

import org.scalacheck.Arbitrary.arbitrary
import org.scalacheck.Gen
import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.scalatest.time.{Minutes, Span}

import org.alephium.api.model
import org.alephium.api.{model, ApiModelCodec}
import org.alephium.explorer.{AlephiumSpec, Generators}
import org.alephium.explorer.api.model.{BlockEntry, Pagination}
import org.alephium.explorer.persistence.{DatabaseFixture, DBRunner}
import org.alephium.explorer.persistence.model._
import org.alephium.explorer.persistence.schema._
import org.alephium.explorer.service.BlockFlowClient
import org.alephium.util.TimeStamp
import org.alephium.explorer.util.TestUtils._
import org.alephium.json.Json._
import org.alephium.util.{Duration, TimeStamp}

class BlockDaoSpec extends AlephiumSpec with ScalaFutures with Generators with Eventually {
implicit val executionContext: ExecutionContext = ExecutionContext.global
Expand Down Expand Up @@ -120,16 +123,28 @@ class BlockDaoSpec extends AlephiumSpec with ScalaFutures with Generators with E
}
}

it should "Recreate issue #162 - not throw exception when inserting a big block" in new Fixture {
using(Source.fromResource("big_block.json")) { source =>
val rawBlock = source.getLines().mkString
val blockEntry = read[model.BlockEntry](rawBlock)
val block = BlockFlowClient.blockProtocolToEntity(blockEntry)
blockDao.insertSQL(block).futureValue is ()
}
}

trait Fixture
extends InputSchema
with OutputSchema
with BlockHeaderSchema
with BlockDepsSchema
with TransactionSchema
with DatabaseFixture
with DBRunner {
override val config = databaseConfig
val blockDao = BlockDao(groupNum, databaseConfig)
with DBRunner
with ApiModelCodec {
override val config = databaseConfig
val blockflowFetchMaxAge: Duration = Duration.ofMinutesUnsafe(30)

val blockDao = BlockDao(groupNum, databaseConfig)
val blockflow: Seq[Seq[model.BlockEntry]] =
blockFlowGen(maxChainSize = 5, startTimestamp = TimeStamp.now()).sample.get
val blocksProtocol: Seq[model.BlockEntry] = blockflow.flatten
Expand Down
Expand Up @@ -26,6 +26,7 @@ import slick.jdbc.JdbcProfile

import org.alephium.explorer.{AlephiumSpec, Generators}
import org.alephium.explorer.persistence.{DatabaseFixture, DBRunner}
import org.alephium.explorer.persistence.model.InputEntity
import org.alephium.explorer.persistence.queries.InputQueries._
import org.alephium.explorer.persistence.schema.InputSchema

Expand All @@ -38,21 +39,39 @@ class InputQueriesSpec extends AlephiumSpec with ScalaFutures {

import config.profile.api._

forAll(Gen.listOf(updatedInputEntityGen())) { existingAndUpdates =>
def runTest(existingAndUpdated: Seq[(InputEntity, InputEntity)]) = {
//fresh table
run(inputsTable.delete).futureValue

val existing = existingAndUpdates.map(_._1) //existing inputs
val ignored = existingAndUpdates.map(_._2) //updated inputs
val existing = existingAndUpdated.map(_._1) //existing inputs
val ignored = existingAndUpdated.map(_._2) //updated inputs

//insert existing
run(insertInputs(existing)).futureValue is existing.size
run(insertInputs(existing)).futureValue
run(inputsTable.result).futureValue should contain allElementsOf existing

//insert should ignore existing inputs
run(insertInputs(ignored)).futureValue is 0
run(inputsTable.result).futureValue should contain allElementsOf existing
}

info("Test with random data size generated by ScalaCheck")
forAll(Gen.listOf(updatedInputEntityGen()))(runTest)

/** The following two tests insert larger queries
* See <a href="https://github.com/alephium/explorer-backend/issues/160">#160</a>
**/
info(s"Large: Test with fixed '${QueryUtil.postgresDefaultParameterLimit}' data size")
Gen
.listOfN(QueryUtil.postgresDefaultParameterLimit.toInt, updatedInputEntityGen())
.sample
.foreach(runTest)

info(s"Large: Test with fixed '${QueryUtil.postgresDefaultParameterLimit + 1}' data size")
Gen
.listOfN(QueryUtil.postgresDefaultParameterLimit + 1, updatedInputEntityGen())
.sample
.foreach(runTest)
}

trait Fixture extends DatabaseFixture with DBRunner with Generators with InputSchema {
Expand Down

0 comments on commit 9623b11

Please sign in to comment.