Skip to content

Commit

Permalink
Merge pull request #152 from alephium/136-batch-write-blocks
Browse files Browse the repository at this point in the history
Batch write blocks with SQL
  • Loading branch information
simerplaha committed Mar 3, 2022
2 parents f951934 + 8054b4c commit c21cfa4
Show file tree
Hide file tree
Showing 28 changed files with 1,252 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ trait BlockDao {
height: Height): Future[Seq[BlockEntry]]
def insert(block: BlockEntity): Future[Unit]
def insertAll(blocks: Seq[BlockEntity]): Future[Unit]
def insertSQL(block: BlockEntity): Future[Unit]
def insertAllSQL(blocks: Seq[BlockEntity]): Future[Unit]
def listMainChain(pagination: Pagination): Future[(Seq[BlockEntry.Lite], Int)]
def listMainChainSQL(pagination: Pagination): Future[(Seq[BlockEntry.Lite], Int)]
def listIncludingForks(from: TimeStamp, to: TimeStamp): Future[Seq[BlockEntry.Lite]]
Expand Down Expand Up @@ -119,6 +121,14 @@ object BlockDao {
run(DBIOAction.sequence(blocks.map(b => insertAction(b, groupNum)))).map(_ => ())
}

/** Inserts a single block transactionally via SQL */
def insertSQL(block: BlockEntity): Future[Unit] =
insertAllSQL(Seq(block))

/** Inserts a multiple blocks transactionally via SQL */
def insertAllSQL(blocks: Seq[BlockEntity]): Future[Unit] =
run(insertBlockEntity(blocks, groupNum)).map(_ => ())

def listMainChain(pagination: Pagination): Future[(Seq[BlockEntry.Lite], Int)] = {
val mainChain = blockHeadersTable.filter(_.mainChain)
val action =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2018 The Alephium Authors
// This file is part of the alephium project.
//
// The library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the library. If not, see <http://www.gnu.org/licenses/>.

package org.alephium.explorer.persistence.model

import org.alephium.explorer.api.model.BlockEntry

/**
* Class for defining rows in table [[org.alephium.explorer.persistence.schema.BlockDepsSchema]]
*/
final case class BlockDepEntity(hash: BlockEntry.Hash, dep: BlockEntry.Hash, order: Int)
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,14 @@ final case class BlockEntity(
)
}

/** Builds entries for block_deps table */
def toBlockDepEntities(): Seq[BlockDepEntity] =
deps.zipWithIndex map {
case (dep, i) =>
BlockDepEntity(hash = hash, dep = dep, order = i)
}

@inline def toBlockHeader(groupNum: Int): BlockHeader =
BlockHeader.fromEntity(this, groupNum)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2018 The Alephium Authors
// This file is part of the alephium project.
//
// The library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the library. If not, see <http://www.gnu.org/licenses/>.

package org.alephium.explorer.persistence.queries

import slick.dbio.DBIOAction
import slick.jdbc.{PositionedParameters, SetParameter, SQLActionBuilder}

import org.alephium.explorer.persistence.DBActionW
import org.alephium.explorer.persistence.model.BlockDepEntity
import org.alephium.explorer.persistence.schema.CustomSetParameter._

object BlockDepQueries {

/**
* Insert block_deps or ignore if there is a primary key conflict.
*
* Slick creates the following `INSERT` using string interpolation. Here
* the same is achieved by manually creating the [[SQLActionBuilder]] so
* our inserts can write multiple rows within a single `INSERT` statement.
*
* <a href="https://scala-slick.org/doc/3.3.3/sql.html#splicing-literal-values">Splicing</a>
* is not used to insert values so these queries are still cacheable prepared-statements.
*/
def insertBlockDeps(deps: Iterable[BlockDepEntity]): DBActionW[Int] =
if (deps.isEmpty) {
DBIOAction.successful(0)
} else {
//generate '?' placeholders for the parameterised SQL query
val placeholder = paramPlaceholder(rows = deps.size, columns = 3)
val query =
s"""
|INSERT INTO block_deps ("hash", "dep", "order")
|VALUES $placeholder
|ON CONFLICT ON CONSTRAINT hash_deps_pk
| DO NOTHING
|""".stripMargin

//set parameters following the insert order defined by the query above
val parameters: SetParameter[Unit] =
(_: Unit, params: PositionedParameters) =>
deps foreach { dep =>
params >> dep.hash
params >> dep.dep
params >> dep.order
}

//Return builder generated by Slick's string interpolation
SQLActionBuilder(
queryParts = query,
unitPConv = parameters
).asUpdate
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@

package org.alephium.explorer.persistence.queries

import scala.collection.mutable.ListBuffer

import com.typesafe.scalalogging.StrictLogging
import slick.basic.DatabaseConfig
import slick.dbio.DBIOAction
import slick.jdbc.JdbcProfile
import slick.jdbc.{JdbcProfile, PositionedParameters, SetParameter, SQLActionBuilder}

import org.alephium.explorer.api.model._
import org.alephium.explorer.persistence._
import org.alephium.explorer.persistence.model._
import org.alephium.explorer.persistence.queries.BlockDepQueries.insertBlockDeps
import org.alephium.explorer.persistence.queries.InputQueries.insertInputs
import org.alephium.explorer.persistence.queries.OutputQueries.insertOutputs
import org.alephium.explorer.persistence.schema._
import org.alephium.explorer.persistence.schema.CustomSetParameter._

trait BlockQueries
extends BlockHeaderSchema
Expand Down Expand Up @@ -84,7 +90,7 @@ trait BlockQueries
def insertAction(block: BlockEntity, groupNum: Int): DBActionRWT[Unit] =
(for {
_ <- DBIOAction.sequence(block.deps.zipWithIndex.map {
case (dep, i) => blockDepsTable.insertOrUpdate((block.hash, dep, i))
case (dep, i) => blockDepsTable.insertOrUpdate(BlockDepEntity(block.hash, dep, i))
})
_ <- insertTransactionFromBlockQuery(block)
_ <- blockHeadersTable.insertOrUpdate(BlockHeader.fromEntity(block, groupNum)).filter(_ > 0)
Expand Down Expand Up @@ -213,4 +219,88 @@ trait BlockQueries
.result
.headOption
}

/** Inserts block_headers or ignore them if there is a primary key conflict */
// scalastyle:off magic.number
def insertBlockHeaders(blocks: Iterable[BlockHeader]): DBActionW[Int] =
if (blocks.isEmpty) {
DBIOAction.successful(0)
} else {
val placeholder = paramPlaceholder(rows = blocks.size, columns = 14)

val query =
s"""
|insert into $block_headers ("hash",
| "timestamp",
| "chain_from",
| "chain_to",
| "height",
| "main_chain",
| "nonce",
| "version",
| "dep_state_hash",
| "txs_hash",
| "txs_count",
| "target",
| "hashrate",
| "parent")
|values $placeholder
|ON CONFLICT ON CONSTRAINT block_headers_pkey
| DO NOTHING
|""".stripMargin

val parameters: SetParameter[Unit] =
(_: Unit, params: PositionedParameters) =>
blocks foreach { block =>
params >> block.hash
params >> block.timestamp
params >> block.chainFrom
params >> block.chainTo
params >> block.height
params >> block.mainChain
params >> block.nonce
params >> block.version
params >> block.depStateHash
params >> block.txsHash
params >> block.txsCount
params >> block.target
params >> block.hashrate
params >> block.parent
}

SQLActionBuilder(
queryParts = query,
unitPConv = parameters
).asUpdate
}
// scalastyle:on magic.number

/** Transactionally write blocks */
@SuppressWarnings(
Array("org.wartremover.warts.MutableDataStructures", "org.wartremover.warts.NonUnitStatements"))
def insertBlockEntity(blocks: Iterable[BlockEntity], groupNum: Int): DBActionRWT[Int] = {
val blockDeps = ListBuffer.empty[BlockDepEntity]
val transactions = ListBuffer.empty[TransactionEntity]
val inputs = ListBuffer.empty[InputEntity]
val outputs = ListBuffer.empty[OutputEntity]
val blockHeaders = ListBuffer.empty[BlockHeader]

//build data for all insert queries in single iteration
blocks foreach { block =>
if (block.height.value != 0) blockDeps addAll block.toBlockDepEntities()
transactions addAll block.transactions
inputs addAll block.inputs
outputs addAll block.outputs
blockHeaders addOne block.toBlockHeader(groupNum)
}

val query =
insertBlockDeps(blockDeps) andThen
insertTransactions(transactions) andThen
insertInputs(inputs) andThen
insertOutputs(outputs) andThen
insertBlockHeaders(blockHeaders)

query.transactionally
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2018 The Alephium Authors
// This file is part of the alephium project.
//
// The library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the library. If not, see <http://www.gnu.org/licenses/>.

package org.alephium.explorer.persistence.queries

import slick.dbio.DBIOAction
import slick.jdbc.{PositionedParameters, SetParameter, SQLActionBuilder}

import org.alephium.explorer.persistence.DBActionW
import org.alephium.explorer.persistence.model.InputEntity
import org.alephium.explorer.persistence.schema.CustomSetParameter._

object InputQueries {

/** Inserts inputs or ignore rows with primary key conflict */
def insertInputs(inputs: Iterable[InputEntity]): DBActionW[Int] =
if (inputs.isEmpty) {
DBIOAction.successful(0)
} else {
val placeholder = paramPlaceholder(rows = inputs.size, columns = 8)

val query =
s"""
|INSERT INTO inputs ("block_hash",
| "tx_hash",
| "timestamp",
| "hint",
| "output_ref_key",
| "unlock_script",
| "main_chain",
| "order")
|VALUES $placeholder
|ON CONFLICT
| ON CONSTRAINT inputs_pk
| DO NOTHING
|""".stripMargin

val parameters: SetParameter[Unit] =
(_: Unit, params: PositionedParameters) =>
inputs foreach { input =>
params >> input.blockHash
params >> input.txHash
params >> input.timestamp
params >> input.hint
params >> input.outputRefKey
params >> input.unlockScript
params >> input.mainChain
params >> input.order
}

SQLActionBuilder(
queryParts = query,
unitPConv = parameters
).asUpdate
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2018 The Alephium Authors
// This file is part of the alephium project.
//
// The library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the library. If not, see <http://www.gnu.org/licenses/>.

package org.alephium.explorer.persistence.queries

import slick.dbio.DBIOAction
import slick.jdbc.{PositionedParameters, SetParameter, SQLActionBuilder}

import org.alephium.explorer.persistence.DBActionW
import org.alephium.explorer.persistence.model.OutputEntity
import org.alephium.explorer.persistence.schema.CustomSetParameter._

object OutputQueries {

/** Inserts outputs or ignore rows with primary key conflict */
def insertOutputs(outputs: Iterable[OutputEntity]): DBActionW[Int] =
if (outputs.isEmpty) {
DBIOAction.successful(0)
} else {
val placeholder = paramPlaceholder(rows = outputs.size, columns = 10)

val query =
s"""
|INSERT INTO outputs ("block_hash",
| "tx_hash",
| "timestamp",
| "hint",
| "key",
| "amount",
| "address",
| "main_chain",
| "lock_time",
| "order")
|VALUES $placeholder
|ON CONFLICT
| ON CONSTRAINT outputs_pk
| DO NOTHING
|""".stripMargin

val parameters: SetParameter[Unit] =
(_: Unit, params: PositionedParameters) =>
outputs foreach { output =>
params >> output.blockHash
params >> output.txHash
params >> output.timestamp
params >> output.hint
params >> output.key
params >> output.amount
params >> output.address
params >> output.mainChain
params >> output.lockTime
params >> output.order
}

SQLActionBuilder(
queryParts = query,
unitPConv = parameters
).asUpdate
}
}

0 comments on commit c21cfa4

Please sign in to comment.