Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolving multiple related issues #275

Merged
merged 2 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/src/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ FROM openjdk:8-jre-alpine

RUN apk --no-cache add curl

RUN mkdir -p /opt/app
RUN mkdir -p /opt/app \
&& mkdir -p /data/config
COPY app.jar /opt/app/app.jar

EXPOSE 9393
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package io.orangebuffalo.simpleaccounting.domain.documents.storage.gdrive.impl

import io.orangebuffalo.simpleaccounting.services.integration.oauth2.OAuth2WebClientBuilderProvider
import io.orangebuffalo.simpleaccounting.domain.documents.storage.DocumentStorageException
import io.orangebuffalo.simpleaccounting.domain.documents.storage.StorageAuthorizationRequiredException
import io.orangebuffalo.simpleaccounting.domain.documents.storage.gdrive.OAUTH2_CLIENT_REGISTRATION_ID
import kotlinx.coroutines.reactive.awaitFirst
import io.orangebuffalo.simpleaccounting.services.integration.oauth2.OAuth2WebClientBuilderProvider
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingle
import org.springframework.beans.factory.annotation.Value
import org.springframework.core.io.buffer.DataBuffer
import org.springframework.http.HttpStatus
Expand Down Expand Up @@ -61,7 +61,7 @@ class GoogleDriveApiAdapter(
}
.bodyToMono(GDriveFile::class.java)
.map { driveFile -> driveFile.toUploadFileResponse() }
.awaitFirst()
.awaitSingle()
}

suspend fun downloadFile(fileId: String): Flux<DataBuffer> {
Expand Down Expand Up @@ -98,7 +98,7 @@ class GoogleDriveApiAdapter(
"Error while retrieving folder $folderName for $parentFolderId: $errorJson"
}
.bodyToMono(GDriveFiles::class.java)
.awaitFirst()
.awaitSingle()

return if (matchingFolders.files.isEmpty()) null else matchingFolders.files[0].id
}
Expand Down Expand Up @@ -126,7 +126,7 @@ class GoogleDriveApiAdapter(
}
.bodyToMono(GDriveFile::class.java)
.map { driveFile -> driveFile.toFolderResponse() }
.awaitFirst()
.awaitSingle()

suspend fun getFolderById(folderId: String): FolderResponse? {
return createWebClient()
Expand Down Expand Up @@ -155,7 +155,7 @@ class GoogleDriveApiAdapter(
errorDescriptor: (errorJson: String?) -> String
): ClientResponse {
val clientResponse = try {
this.exchange().awaitFirst()
this.exchange().awaitSingle()
} catch (e: OAuth2AuthorizationException) {
throw StorageAuthorizationRequiredException(cause = e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,8 @@
package io.orangebuffalo.simpleaccounting.services.integration

import io.orangebuffalo.simpleaccounting.services.business.PlatformUserService
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*
import mu.KotlinLogging
import org.springframework.stereotype.Service
import java.util.*
Expand All @@ -21,118 +15,37 @@ private val logger = KotlinLogging.logger {}
class PushNotificationService(
private val platformUserService: PlatformUserService
) {
suspend fun subscribeToEventsForCurrentUser(): Flow<PushNotificationMessage> = coroutineScope {
val currentUser = platformUserService.getCurrentUser()
val subscriber = PushNotificationsSubscriber(currentUser.id!!)

// subscribe in background to get back to the client as fast as we can
launch {
pushNotificationsBus.send(AddSubscriberCommand(subscriber))
}

flow {
logger.trace { "Starting consuming messages for ${subscriber.id}" }

try {
subscriber.listeningChannel.consumeEach { pushNotificationMessage ->
logger.trace { "Received $pushNotificationMessage in ${subscriber.id}" }

emit(pushNotificationMessage)
}
} finally {
logger.trace { "Cancelling subscriber ${subscriber.id}" }

// unsubscribe and release resources when user cancelled the flow (typically closed the connection)
pushNotificationsBus.send(RemoveSubscriberCommand(subscriber))
private val notificationsFlow = MutableSharedFlow<PushNotificationMessage>(
extraBufferCapacity = 500,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)

suspend fun subscribeToEventsForCurrentUser(): Flow<PushNotificationMessage> {
val currentUserId = platformUserService.getCurrentUser().id
val subscriberId = UUID.randomUUID().toString()
logger.trace { "Subscribing $subscriberId (user: $currentUserId)" }
return notificationsFlow
.filter { message ->
message.userId == null || message.userId == currentUserId
}
.onEach { message ->
logger.trace { "Received $message in $subscriberId" }
}
}
}

suspend fun sendPushNotification(
eventName: String,
userId: Long? = null,
data: Any? = null
) {
pushNotificationsBus.send(BroadcastNotificationCommand(PushNotificationMessage(eventName, userId, data)))
notificationsFlow.emit(PushNotificationMessage(eventName, userId, data))
}

/**
* An actor responsible for managing subscribers and delivering push events to the subscribers
*/
private companion object PushMessagesOrchestrator {
private val subscribers = mutableListOf<PushNotificationsSubscriber>()

private val pushNotificationsBus = GlobalScope.actor<OrchestratorCommand> {
consumeEach { command ->
when (command) {
is AddSubscriberCommand -> addSubscriber(command)
is RemoveSubscriberCommand -> removeSubscriber(command)
is BroadcastNotificationCommand -> broadcastNotification(command)
}
}
}

private fun addSubscriber(command: AddSubscriberCommand) {
val subscriber = command.subscriber
subscribers.add(command.subscriber)
logger.debug { "Registered new subscriber ${subscriber.id} (${subscriber.userId})" }
}

private fun removeSubscriber(command: RemoveSubscriberCommand) {
val subscriber = command.subscriber
logger.debug { "Removing subscriber ${subscriber.id}" }

subscriber.listeningChannel.close()
subscribers.remove(subscriber)
}

private suspend fun broadcastNotification(command: BroadcastNotificationCommand) {
val message = command.message

logger.debug { "Received broadcast request for $message" }

subscribers.removeIf { it.listeningChannel.isClosedForSend }

subscribers.asSequence()
.filter { subscriber ->
message.userId == null || message.userId == subscriber.userId
}
.forEach { subscriber ->
logger.debug { "Sending message to ${subscriber.id}" }

subscriber.listeningChannel.send(message)
}

logger.debug { "All subscribers have been notified" }
}
}
suspend fun getActiveSubscribersCount() = notificationsFlow.subscriptionCount.first()
}

data class PushNotificationMessage(
val eventName: String,
val userId: Long?,
val data: Any?
)

/**
* A subscriber for push notifications, providing a [listeningChannel] to retrieve events from.
* The channel is conflated, so that broadcasting is not suspended and all subscribers have a chance
* to react to the message.
*/
private class PushNotificationsSubscriber(val userId: Long) {

val listeningChannel = Channel<PushNotificationMessage>(capacity = Channel.CONFLATED)

val id = UUID.randomUUID().toString()

override fun hashCode(): Int = id.hashCode()

override fun equals(other: Any?): Boolean =
other is PushNotificationsSubscriber && other.id == this.id
}

private sealed class OrchestratorCommand

private class AddSubscriberCommand(val subscriber: PushNotificationsSubscriber) : OrchestratorCommand()
private class RemoveSubscriberCommand(val subscriber: PushNotificationsSubscriber) : OrchestratorCommand()
private class BroadcastNotificationCommand(val message: PushNotificationMessage) : OrchestratorCommand()
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import io.orangebuffalo.simpleaccounting.services.integration.oauth2.impl.Client
import io.orangebuffalo.simpleaccounting.services.integration.oauth2.impl.PersistentOAuth2AuthorizedClient
import io.orangebuffalo.simpleaccounting.services.integration.withDbContext
import io.orangebuffalo.simpleaccounting.services.persistence.entities.PlatformUser
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingle
import org.springframework.context.ApplicationEventPublisher
import org.springframework.security.oauth2.client.endpoint.OAuth2AuthorizationCodeGrantRequest
import org.springframework.security.oauth2.client.endpoint.ReactiveOAuth2AccessTokenResponseClient
Expand Down Expand Up @@ -157,7 +157,7 @@ class OAuth2ClientAuthorizationProvider(
)

val tokenResponse = try {
accessTokenResponseClient.getTokenResponse(codeGrantRequest).awaitFirst()
accessTokenResponseClient.getTokenResponse(codeGrantRequest).awaitSingle()
} catch (e: OAuth2AuthorizationException) {
publishFailedAuthEvent(savedRequest)
throw e
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.orangebuffalo.simpleaccounting.services.security

import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactor.mono
import org.springframework.security.authentication.AbstractAuthenticationToken
import org.springframework.security.core.Authentication
Expand Down Expand Up @@ -45,7 +45,7 @@ suspend fun <T : Any?> runAs(principal: SpringSecurityPrincipal, block: suspend
block()
}.subscriberContext(
ReactiveSecurityContextHolder.withAuthentication(ProgrammaticAuthentication(principal))
).awaitFirst()
).awaitSingle()
}

class ProgrammaticAuthentication(val user: SpringSecurityPrincipal) : AbstractAuthenticationToken(user.authorities) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ import io.orangebuffalo.simpleaccounting.services.security.jwt.JwtService
import io.orangebuffalo.simpleaccounting.services.security.remeberme.RefreshAuthenticationToken
import io.orangebuffalo.simpleaccounting.services.security.remeberme.RefreshTokenService
import io.orangebuffalo.simpleaccounting.services.security.remeberme.TOKEN_LIFETIME_IN_DAYS
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitSingle
import org.springframework.http.HttpHeaders
import org.springframework.http.ResponseCookie
import org.springframework.http.ResponseEntity
import org.springframework.security.authentication.*
import org.springframework.security.authentication.BadCredentialsException
import org.springframework.security.authentication.InsufficientAuthenticationException
import org.springframework.security.authentication.ReactiveAuthenticationManager
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken
import org.springframework.security.core.Authentication
import org.springframework.web.bind.annotation.*
import java.time.Duration
Expand All @@ -30,7 +33,7 @@ class AuthenticationApiController(
@PostMapping("login")
suspend fun login(@Valid @RequestBody loginRequest: LoginRequest): ResponseEntity<TokenResponse> {
val authenticationToken = UsernamePasswordAuthenticationToken(loginRequest.userName, loginRequest.password)
val authentication = authenticationManager.authenticate(authenticationToken).awaitFirst()
val authentication = authenticationManager.authenticate(authenticationToken).awaitSingle()
val principal = authentication.principal as SecurityPrincipal
val jwtToken = jwtService.buildJwtToken(principal)

Expand Down Expand Up @@ -70,7 +73,7 @@ class AuthenticationApiController(
RefreshAuthenticationToken(
refreshToken
)
authenticationManager.authenticate(authenticationToken).awaitFirst()
authenticationManager.authenticate(authenticationToken).awaitSingle()
}
else -> throw InsufficientAuthenticationException("Not authenticated")
}
Expand Down
21 changes: 12 additions & 9 deletions backend/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ spring:
- "https://www.googleapis.com/auth/drive.file"
redirect-uri: "https://simple-accounting.orange-buffalo.io/oauth-callback"

resources:
add-mappings: false

jpa:
properties:
hibernate.create_empty_composites.enabled: true
web:
resources:
add-mappings: false

server:
port: 9393
Expand All @@ -42,8 +42,6 @@ simpleaccounting:
---

spring:
profiles: development

datasource:
url: jdbc:h2:./local-dev/dev
password: killallhumans
Expand All @@ -55,6 +53,9 @@ spring:
registration:
google-drive:
redirect-uri: "http://localhost:9393/oauth-callback"
config:
activate:
on-profile: development

simpleaccounting:
documents:
Expand All @@ -65,8 +66,6 @@ simpleaccounting:
---

spring:
profiles: ci-tests

datasource:
url: jdbc:h2:mem:ci-tests

Expand All @@ -78,12 +77,13 @@ spring:
client-id: "noop"
client-secret: "noop"
redirect-uri: "http://ci-tests/noop"
config:
activate:
on-profile: ci-tests

---

spring:
profiles: load-tests

datasource:
url: jdbc:h2:/tmp/sa-load-tests

Expand All @@ -95,3 +95,6 @@ spring:
client-id: "noop"
client-secret: "noop"
redirect-uri: "http://ci-tests/noop"
config:
activate:
on-profile: load-tests