Skip to content

Commit

Permalink
Netty: ApplicationStarted event is fired before the server starts acc…
Browse files Browse the repository at this point in the history
…epting connections (#3286)

KTOR-4259 Add ServerReady event, which is fired when an engine is ready to accept connections
  • Loading branch information
Stexxe committed Dec 7, 2022
1 parent eb86355 commit 04a9791
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 6 deletions.
Expand Up @@ -4,6 +4,7 @@

package io.ktor.server.cio

import io.ktor.events.*
import io.ktor.http.cio.*
import io.ktor.server.application.*
import io.ktor.server.cio.backend.*
Expand Down Expand Up @@ -57,6 +58,7 @@ public class CIOApplicationEngine(

runBlocking {
startupJob.await()
environment.monitor.raiseCatching(ServerReady, environment, environment.log)

if (wait) {
serverJob.join()
Expand Down
1 change: 1 addition & 0 deletions ktor-server/ktor-server-core/api/ktor-server-core.api
Expand Up @@ -107,6 +107,7 @@ public final class io/ktor/server/application/DefaultApplicationEventsKt {
public static final fun getApplicationStopPreparing ()Lio/ktor/events/EventDefinition;
public static final fun getApplicationStopped ()Lio/ktor/events/EventDefinition;
public static final fun getApplicationStopping ()Lio/ktor/events/EventDefinition;
public static final fun getServerReady ()Lio/ktor/events/EventDefinition;
}

public class io/ktor/server/application/DuplicateApplicationPluginException : java/lang/Exception {
Expand Down
Expand Up @@ -22,6 +22,11 @@ public val ApplicationStarting: EventDefinition<Application> = EventDefinition()
*/
public val ApplicationStarted: EventDefinition<Application> = EventDefinition()

/**
* Fired when the server is ready to accept connections
*/
public val ServerReady: EventDefinition<ApplicationEnvironment> = EventDefinition()

/**
* Event definition for an event that is fired when the application is going to stop
*/
Expand Down
Expand Up @@ -202,11 +202,7 @@ public class ApplicationEngineEnvironmentReloading(
}

private fun safeRiseEvent(event: EventDefinition<Application>, application: Application) {
try {
monitor.raise(event, application)
} catch (cause: Throwable) {
log.error("One or more of the handlers thrown an exception", cause)
}
monitor.raiseCatching(event, application)
}

private fun destroyApplication() {
Expand Down
Expand Up @@ -4,6 +4,7 @@

package io.ktor.server.jetty

import io.ktor.events.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import kotlinx.coroutines.*
Expand Down Expand Up @@ -54,6 +55,8 @@ public open class JettyApplicationEngineBase(
.map { it.second.withPort((it.first as ServerConnector).localPort) }
resolvedConnectors.complete(connectors)

environment.monitor.raiseCatching(ServerReady, environment, environment.log)

if (wait) {
server.join()
stop(1, 5, TimeUnit.SECONDS)
Expand Down
Expand Up @@ -4,6 +4,7 @@

package io.ktor.server.netty

import io.ktor.events.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.util.network.*
Expand Down Expand Up @@ -222,6 +223,8 @@ public class NettyApplicationEngine(
throw cause
}

environment.monitor.raiseCatching(ServerReady, environment, environment.log)

cancellationDeferred = stopServerOnCancellation()

if (wait) {
Expand Down Expand Up @@ -310,7 +313,7 @@ public class EventLoopGroupProxy(
private fun markParkingProhibited() {
try {
prohibitParkingFunction?.invoke(null)
} catch (cause: Throwable) {
} catch (_: Throwable) {
}
}
}
Expand Down
Expand Up @@ -4,7 +4,15 @@

package io.ktor.server.testing.suites

import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.request.*
import io.ktor.http.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.test.dispatcher.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Assert.*
Expand Down Expand Up @@ -36,4 +44,40 @@ abstract class ConnectionTestSuite(val engine: ApplicationEngineFactory<*, *>) {
assertFalse(addresses.any { it.port == 0 })
server.stop(50, 1000)
}

@OptIn(DelicateCoroutinesApi::class)
@Test
fun testServerReadyEvent() = runBlocking {
val serverStarted = CompletableDeferred<Unit>()
val serverPort = withContext(Dispatchers.IO) { ServerSocket(0).use { it.localPort } }
val env = applicationEngineEnvironment {
connector { port = serverPort }

module {
routing {
get("/") {
call.respond(HttpStatusCode.OK)
}
}
}
}

val server = embeddedServer(engine, env)

server.environment.monitor.subscribe(ServerReady) {
serverStarted.complete(Unit)
}

GlobalScope.launch {
server.start(true)
}

withTimeout(5000) {
serverStarted.join()
val response = HttpClient(CIO).get("http://127.0.0.1:$serverPort/")
assertEquals(HttpStatusCode.OK, response.status)
}

server.stop(50, 100)
}
}
Expand Up @@ -4,6 +4,7 @@

package io.ktor.server.tomcat

import io.ktor.events.*
import io.ktor.http.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
Expand Down Expand Up @@ -147,6 +148,7 @@ public class TomcatApplicationEngine(
val connectors = server.service.findConnectors().zip(environment.connectors)
.map { it.second.withPort(it.first.localPort) }
resolvedConnectors.complete(connectors)
environment.monitor.raiseCatching(ServerReady, environment, environment.log)

cancellationDeferred = stopServerOnCancellation()
if (wait) {
Expand Down
5 changes: 5 additions & 0 deletions ktor-shared/ktor-events/api/ktor-events.api
Expand Up @@ -9,3 +9,8 @@ public final class io/ktor/events/Events {
public final fun unsubscribe (Lio/ktor/events/EventDefinition;Lkotlin/jvm/functions/Function1;)V
}

public final class io/ktor/events/EventsKt {
public static final fun raiseCatching (Lio/ktor/events/Events;Lio/ktor/events/EventDefinition;Ljava/lang/Object;Lorg/slf4j/Logger;)V
public static synthetic fun raiseCatching$default (Lio/ktor/events/Events;Lio/ktor/events/EventDefinition;Ljava/lang/Object;Lorg/slf4j/Logger;ILjava/lang/Object;)V
}

12 changes: 12 additions & 0 deletions ktor-shared/ktor-events/common/src/io/ktor/events/Events.kt
Expand Up @@ -6,6 +6,7 @@ package io.ktor.events

import io.ktor.util.*
import io.ktor.util.collections.*
import io.ktor.util.logging.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*

Expand Down Expand Up @@ -61,6 +62,17 @@ public class Events {
}
}

/**
* Raises an event the same way as [Events.raise] but catches an exception and logs it if the [logger] is provided
*/
public fun <T> Events.raiseCatching(definition: EventDefinition<T>, value: T, logger: Logger? = null) {
try {
raise(definition, value)
} catch (cause: Throwable) {
logger?.error("Some handlers have thrown an exception", cause)
}
}

/**
* Specifies signature for the event handler
*/
Expand Down

0 comments on commit 04a9791

Please sign in to comment.