Skip to content

Commit

Permalink
fix: 🐛 detect removed files from the restui list with git
Browse files Browse the repository at this point in the history
The git provider does not track changes in the specification list
  • Loading branch information
MaethorNaur committed Sep 24, 2020
1 parent b8521a1 commit 4d036ad
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 92 deletions.
2 changes: 1 addition & 1 deletion project/BaseSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object BaseSettings {
javacOptions ++= Seq("-source", "11"),
fork in Test := true,
cancelable in Global := true,
addCompilerPlugin("org.scalameta" % "semanticdb-scalac" % "4.3.20" cross CrossVersion.full),
addCompilerPlugin("org.scalameta" % "semanticdb-scalac" % "4.3.22" cross CrossVersion.full),
scalacOptions += "-Yrangepos",
test in assembly := {},
scalacOptions in Compile ++= ScalacOptions.options,
Expand Down
6 changes: 3 additions & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ object Dependencies {
}

object Akka {
private val akkaVersion = "2.6.8"
private val akkaVersion = "2.6.9"
private val akkaHttpVersion = "10.2.0"
private val alpakka = "2.0.1"
private val sl4j = "com.typesafe.akka" %% "akka-slf4j" % akkaVersion
Expand All @@ -41,7 +41,7 @@ object Dependencies {
}

object Testing {
private val scalaTest = "org.scalatest" %% "scalatest" % "3.2.1" % Test
private val scalaTest = "org.scalatest" %% "scalatest" % "3.2.2" % Test
private val scalamock = "org.scalamock" %% "scalamock" % "5.0.0" % Test
val all = Seq(scalaTest, scalamock)
}
Expand All @@ -55,7 +55,7 @@ object Dependencies {
lazy val providerDocker = libraryDependencies ++= common ++ Akka.all ++ Circe.all ++ Seq(Akka.unixDomain)

lazy val providerKubernetes = libraryDependencies ++= common ++ Akka.all ++
Seq("io.skuber" %% "skuber" % "2.5.0")
Seq("io.skuber" %% "skuber" % "2.6.0")

lazy val providerGit = libraryDependencies ++= common ++ Akka.all ++ Circe.all
lazy val providerWebhook = libraryDependencies ++= common ++ Akka.all ++ Circe.all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class GitProvider extends Provider with LazyLogging {
VCS
.source(settings, Http().singleRequest(_))
.mapMaterializedValue(_ => NotUsed)
.map(service => name -> ServiceEvent.ServiceUp(service))
.map(event => name -> event)
}

}
173 changes: 98 additions & 75 deletions providers/git/src/main/scala/restui/providers/git/git/Git.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import java.nio.file.{FileSystemException, Files, Path, Paths}
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._
import scala.util._
import scala.util.chaining._

import akka.stream.SourceShape
import akka.stream.scaladsl.{Broadcast, Flow => AkkaFlow, GraphDSL, Merge, Source => AkkaSource}
import cats.syntax.either._
import com.typesafe.scalalogging.LazyLogging
import io.circe.generic.auto._
import io.circe.yaml.parser
import restui.Concurrency
import restui.models.{Metadata, Service}
import restui.models.{Metadata, Service, ServiceEvent}
import restui.providers.git._
import restui.providers.git.git.data._
import restui.providers.git.process.{Process, ProcessArgs}
Expand All @@ -24,59 +24,81 @@ import restui.providers.git.settings.{Location, RepositorySettings}
object Git extends LazyLogging {
private val GitCmd = "git"
private val DefaultBranch = "master"
private type RepositoryWithSha = (Repository, Option[String])
private type Files = (Repository, List[(Option[String], Path)])
private type FilesWithSha = (Files, Option[String])
private type RepositoryWithSha = (Repository, Option[String])
private type Files = (Repository, List[(Option[String], Path)])
private type FileEvents = (Repository, List[GitFileEvent])
private type FilesWithSha = (Files, Option[String])
private type FilesWithShaWithEvent = (FileEvents, Option[String])

private val outboundFlow: Flow[FilesWithSha, Service] =
AkkaFlow[FilesWithSha].flatMapMerge(Concurrency.AvailableCore, { case (files, _) => retrieveSpecificationFiles(files) })
private val outboundFlow: Flow[FilesWithShaWithEvent, ServiceEvent] =
AkkaFlow[FilesWithShaWithEvent].flatMapConcat { case (files, _) => retrieveSpecificationFiles(files) }

private val cloneOrFetch: Flow[RepositoryWithSha, FilesWithSha] =
AkkaFlow[RepositoryWithSha].flatMapMerge(
Concurrency.AvailableCore,
{
case (repository, hash) =>
val source = hash match {
case None =>
cloneRepository(repository).map { repository =>
val localFiles = Files
.walk(repository.directory.get.toPath)
.iterator
.asScala
.to(LazyList)
.filter(Files.isRegularFile(_))
.map(path => None -> path.normalize)
.toList
repository -> localFiles
}
case Some(sha1) => pullRepository(repository).flatMapConcat(changedFiles(_, sha1))
}
source.map(_ -> hash)
}
)
AkkaFlow[RepositoryWithSha].flatMapConcat {
case (repository, hash) =>
val source = hash match {
case None =>
cloneRepository(repository).map { repository =>
val localFiles = Files
.walk(repository.directory.get.toPath)
.iterator
.asScala
.to(LazyList)
.filter(Files.isRegularFile(_))
.map(path => None -> path.normalize)
.toList
repository -> localFiles

}
case Some(sha1) => pullRepository(repository).flatMapConcat(changedFiles(_, sha1))
}
source.map(_ -> hash)
}

private def changedFiles(repository: Repository, sha1: String): Source[Files] =
execute("diff" :: "--name-only" :: sha1 :: "HEAD" :: Nil, repository.directory).flatMapConcat {
case Right(files) =>
val repoPath = repository.directory.get.toPath
AkkaSource.single(repository -> files.map(file => None -> repoPath.resolve(Paths.get(file)).normalize))
case Left(exception) =>
logger.warn(s"Error during changed: $exception")
AkkaSource.empty[Files]
}

private val findSpecificationFiles: Flow[FilesWithSha, FilesWithSha] = AkkaFlow[FilesWithSha].map {
private val findSpecificationFiles: Flow[FilesWithSha, FilesWithShaWithEvent] = AkkaFlow[FilesWithSha].map {
case ((repository, files), sha1) =>
val repositoryWithNewPath = findRestUIConfig(repository.directory.get.toPath).fold(repository) {
case RestUI(serviceName, specificationPaths) => repository.copy(specificationPaths = specificationPaths, serviceName = serviceName)
}
(repositoryWithNewPath -> filterSpecificationsFiles(repositoryWithNewPath, files) -> sha1)
val repoPath = repository.directory.get.toPath

val toAdd = filterSpecificationsFiles(repositoryWithNewPath, files)
val toDelete = repository.specificationPaths.filter { spec =>
!repositoryWithNewPath.specificationPaths.exists(newSpec => newSpec.path == spec.path)
}.map { spec =>
spec.path
.pipe(repoPath.resolve)
.normalize
.pipe(GitFileEvent.Deleted)
}
val events = toDelete ++ toAdd
(repositoryWithNewPath -> events -> sha1)
}

private val latestSha1: Flow[FilesWithSha, FilesWithSha] = AkkaFlow[FilesWithSha].flatMapMerge(
Concurrency.AvailableCore,
{
case ((repository, files), _) =>
execute("rev-parse" :: "--verify" :: repository.branch :: Nil, repository.directory).flatMapConcat {
case Right(sha1 :: _) => AkkaSource.single(repository -> files, Some(sha1))
case Left(exception) =>
logger.warn("Error during latest sha1", exception)
AkkaSource.empty[FilesWithSha]
}
}
)
private val latestSha1: Flow[FilesWithShaWithEvent, FilesWithShaWithEvent] = AkkaFlow[FilesWithShaWithEvent].flatMapConcat {
case ((repository, files), _) =>
execute("rev-parse" :: "--verify" :: repository.branch :: Nil, repository.directory).flatMapConcat {
case Right(sha1 :: _) => AkkaSource.single(repository -> files, Some(sha1))
case Right(Nil) =>
logger.warn("Error during latest sha1")
AkkaSource.empty[FilesWithShaWithEvent]
case Left(exception) =>
logger.warn("Error during latest sha1", exception)
AkkaSource.empty[FilesWithShaWithEvent]
}
}

def fromSettings(cacheDuration: FiniteDuration, repositories: Seq[RepositorySettings]): Source[Service] =
def fromSettings(cacheDuration: FiniteDuration, repositories: Seq[RepositorySettings]): Source[ServiceEvent] =
fromSource(
cacheDuration,
AkkaSource(repositories.collect {
Expand All @@ -85,17 +107,17 @@ object Git extends LazyLogging {
})
)

def fromSource(cacheDuration: FiniteDuration, repositories: Source[Repository]): Source[Service] =
def fromSource(cacheDuration: FiniteDuration, repositories: Source[Repository]): Source[ServiceEvent] =
AkkaSource.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._

val delayFlow: Flow[FilesWithSha, RepositoryWithSha] =
AkkaFlow[FilesWithSha].delay(cacheDuration).map { case ((repository, _), sha1) => repository -> sha1 }
val delayFlow: Flow[FilesWithShaWithEvent, RepositoryWithSha] =
AkkaFlow[FilesWithShaWithEvent].delay(cacheDuration).map { case ((repository, _), sha1) => repository -> sha1 }
val init = builder.add(repositories.map(_ -> None))
val outbound = builder.add(outboundFlow.async)
val delay = builder.add(delayFlow)
val merge = builder.add(Merge[RepositoryWithSha](2))
val broadcast = builder.add(Broadcast[FilesWithSha](2, eagerCancel = true))
val broadcast = builder.add(Broadcast[FilesWithShaWithEvent](2, eagerCancel = true))

// format: OFF
init ~> merge ~> cloneOrFetch ~> findSpecificationFiles ~> latestSha1 ~> broadcast ~> outbound
Expand Down Expand Up @@ -123,16 +145,6 @@ object Git extends LazyLogging {
AkkaSource.empty[Repository]
}

private def changedFiles(repository: Repository, sha1: String): Source[Files] =
execute("diff" :: "--name-only" :: sha1 :: "HEAD" :: Nil, repository.directory).flatMapConcat {
case Right(files) =>
val repoPath = repository.directory.get.toPath
AkkaSource.single(repository -> files.map(file => None -> repoPath.resolve(Paths.get(file)).normalize))
case Left(exception) =>
logger.warn(s"Error during changed: $exception")
AkkaSource.empty[Files]
}

private def execute(args: List[String], cwd: Option[File] = None): Source[Either[String, List[String]]] =
AkkaSource.single(ProcessArgs(GitCmd :: args, cwd)).via(Process.execute)

Expand All @@ -151,7 +163,7 @@ object Git extends LazyLogging {
None
}

private def filterSpecificationsFiles(repo: Repository, files: List[(Option[String], Path)]): List[(Option[String], Path)] = {
private def filterSpecificationsFiles(repo: Repository, files: List[(Option[String], Path)]): List[GitFileEvent] = {
val repoPath = repo.directory.get.toPath
val specificationPaths = repo.specificationPaths.map {
case UnnamedSpecification(path) => None -> repoPath.resolve(path).normalize
Expand All @@ -163,20 +175,24 @@ object Git extends LazyLogging {
specificationPaths.find {
case (_, specificationPath) =>
file.startsWith(specificationPath)
}.map { case (name, _) => name -> file }
}.map {
case (name, _) =>
GitFileEvent.Upserted(name, file)
}
}
}
}

private def retrieveSpecificationFiles(repoWithFiles: Files): Source[Service] = {
private def retrieveSpecificationFiles(repoWithFiles: FileEvents): Source[ServiceEvent] = {
val (repo, files) = repoWithFiles

val uri = akka.http.scaladsl.model.Uri(repo.uri)
val nameFromUri = uri.path.toString.substring(1)

AkkaSource(files)
.flatMapMerge(Concurrency.AvailableCore, loadFile(_).async)
.flatMapConcat(loadFile(_).async)
.map {
case (maybeName, path, content) =>
val uri = akka.http.scaladsl.model.Uri(repo.uri)
val nameFromUri = uri.path.toString.substring(1)
case Right((maybeName, path, content)) =>
val serviceName = maybeName.getOrElse(repo.serviceName.getOrElse(nameFromUri))
val filePath = repo.directory.get.toPath.relativize(path).toString
val id = s"$nameFromUri:$filePath"
Expand All @@ -186,20 +202,27 @@ object Git extends LazyLogging {
Metadata.Provider -> provider,
Metadata.File -> filePath
)
Service(id, serviceName, content, metadata)
ServiceEvent.ServiceUp(Service(id, serviceName, content, metadata))
case Left(path) =>
val filePath = repo.directory.get.toPath.relativize(path).toString
val id = s"$nameFromUri:$filePath"
ServiceEvent.ServiceDown(id)
}
.async

}

private def loadFile(file: (Option[String], Path)): Source[(Option[String], Path, String)] = {
val (name, path) = file
Try(new String(Files.readAllBytes(path), StandardCharsets.UTF_8)) match {
case Success(content) =>
AkkaSource.single((name, path, content))
case Failure(exception) =>
logger.warn(s"Error while reading $path", exception)
AkkaSource.empty[(Option[String], Path, String)]
private def loadFile(event: GitFileEvent): Source[Either[Path, (Option[String], Path, String)]] =
event match {
case GitFileEvent.Deleted(path) =>
AkkaSource.single(Left(path))
case GitFileEvent.Upserted(maybeName, path) =>
Try(new String(Files.readAllBytes(path), StandardCharsets.UTF_8)) match {
case Success(content) =>
AkkaSource.single(Right((maybeName, path, content)))
case Failure(exception) =>
logger.warn(s"Error while reading $path", exception)
AkkaSource.single(Left(path))
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package restui.providers.git.git

import java.io.File
import java.nio.file.Path

import cats.syntax.functor._
import io.circe.generic.auto._
Expand All @@ -12,11 +13,20 @@ package object data {
specificationPaths: List[Specification],
directory: Option[File] = None,
serviceName: Option[String] = None)
trait Specification

trait Specification extends Product with Serializable {
val path: String
}
final case class UnnamedSpecification(path: String) extends Specification
final case class NamedSpecification(name: String, path: String) extends Specification
final case class RestUI(name: Option[String], specifications: List[Specification])

trait GitFileEvent extends Product with Serializable
object GitFileEvent {
final case class Deleted(path: Path) extends GitFileEvent
final case class Upserted(maybeName: Option[String], path: Path) extends GitFileEvent
}

object RestUI {
implicit val decoder: Decoder[RestUI] = (cursor: HCursor) =>
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import scala.concurrent.ExecutionContext

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Merge, Source => AkkaSource}
import restui.models.Service
import restui.models.ServiceEvent
import restui.providers.git._
import restui.providers.git.git.Git
import restui.providers.git.github.{Github, GithubClient}
Expand All @@ -13,12 +13,12 @@ import restui.providers.git.settings.{GitSettings, GithubSettings, Settings}
object VCS {
def source(settings: Settings, requestExecutor: RequestExecutor)(implicit
actorSystem: ActorSystem,
executionContext: ExecutionContext): Source[Service] =
executionContext: ExecutionContext): Source[ServiceEvent] =
settings.vcs.map {
case githubSettings: GithubSettings =>
Git.fromSource(settings.cacheDuration, Github(GithubClient(githubSettings, requestExecutor)))
case GitSettings(repositories) => Git.fromSettings(settings.cacheDuration, repositories)
}.fold(AkkaSource.empty[Service]) { (acc, source) =>
}.fold(AkkaSource.empty[ServiceEvent]) { (acc, source) =>
AkkaSource.combine(acc, source)(Merge(_))
}
}

0 comments on commit 4d036ad

Please sign in to comment.