-
Notifications
You must be signed in to change notification settings - Fork 0
/
ModelServerFluent.scala
129 lines (107 loc) · 5 KB
/
ModelServerFluent.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package com.lightbend.scala.standard.modelserver.scala
import java.util.{HashMap, Properties}

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import akka.util.Timeout

import com.lightbend.java.configuration.kafka.ApplicationKafkaParameters
import com.lightbend.kafka.scala.streams.StreamsBuilderS
import com.lightbend.scala.modelServer.model.{DataRecord, ModelToServe, ModelWithDescriptor}
import com.lightbend.scala.standard.modelserver.scala.queriablestate.QueriesResource
import com.lightbend.scala.standard.modelserver.scala.store.ModelStateSerde

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.state.Stores
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

import scala.concurrent.duration._
import scala.util.control.NonFatal
/**
 * Standalone model-serving application built on Kafka Streams.
 *
 * Reads scoring data from DATA_TOPIC and model updates from MODELS_TOPIC,
 * keeps the current model in a queryable in-memory state store, and exposes
 * that store over an embedded Akka HTTP endpoint.
 */
object ModelServerFluent {

  // Port for the embedded HTTP service that serves queryable state.
  private val port = 8888

  import ApplicationKafkaParameters._

  /**
   * Entry point: builds the streams topology, installs error handling,
   * starts the streams and the REST proxy, and registers a shutdown hook.
   */
  def main(args: Array[String]): Unit = {
    println("Using kafka brokers at " + KAFKA_BROKER)

    val streamsConfiguration = new Properties
    // Give the Streams application a unique name. The name must be unique in the Kafka cluster
    // against which the application is run.
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-model-server")
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, DATA_GROUP)
    // Where to find Kafka broker(s).
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER)
    // Provide the details of our embedded http service that we'll use to connect to this streams
    // instance and discover locations of stores.
    streamsConfiguration.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "127.0.0.1:" + port)
    // Default serdes are raw byte arrays; each stream deserializes explicitly.
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass)
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass)
    // Add a topic config by prefixing with topic
    // streamsConfiguration.put(StreamsConfig.topicPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest")

    // Create topology
    val streams = createStreams(streamsConfiguration)

    // Log (rather than silently drop) any exception that kills a stream thread.
    streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
      override def uncaughtException(t: Thread, e: Throwable): Unit =
        println("Uncaught exception on thread " + t + " " + e.toString)
    })

    // Start streams
    streams.start()

    // Start the Restful proxy for servicing remote access to state stores
    startRestProxy(streams, port)

    // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams.
    // NonFatal keeps fatal errors (OOM, interrupts) propagating; the previous
    // version swallowed every Exception without a trace.
    sys.addShutdownHook {
      try {
        streams.close()
      } catch {
        case NonFatal(e) =>
          println("Ignoring exception while closing streams: " + e.getMessage)
      }
    }
  }

  /**
   * Builds the processing topology.
   *
   * The data stream deserializes records, scores them via [[DataProcessor]]
   * against the model held in the state store, and logs the outcome. The model
   * stream deserializes model updates and installs them via [[ModelProcessor]].
   *
   * @param streamsConfiguration Kafka Streams configuration properties.
   * @return an unstarted [[KafkaStreams]] instance for the built topology.
   */
  private def createStreams(streamsConfiguration: Properties): KafkaStreams = {
    // Store definition: in-memory key/value store with change-logging enabled
    // (empty log config means broker defaults apply).
    val logConfig = new HashMap[String, String]
    val storeSupplier = Stores.inMemoryKeyValueStore(STORE_NAME)
    val storeBuilder = Stores.keyValueStoreBuilder(storeSupplier, Serdes.Integer, new ModelStateSerde).withLoggingEnabled(logConfig)

    // Create Stream builder
    val builder = new StreamsBuilderS

    // Data input streams
    val data = builder.stream[Array[Byte], Array[Byte]](DATA_TOPIC)
    val models = builder.stream[Array[Byte], Array[Byte]](MODELS_TOPIC)

    // DataStore
    builder.addStateStore(storeBuilder)

    // Data Processor: drop records that fail deserialization, then score.
    data
      .mapValues(value => DataRecord.fromByteArray(value))
      .filter((key, value) => value.isSuccess)
      .transform(() => new DataProcessor, STORE_NAME)
      .mapValues(value => {
        if (value.processed) println(s"Calculated quality - ${value.result} calculated in ${value.duration} ms")
        else println("No model available - skipping")
        value
      })

    // Models Processor: drop undecodable updates, then install the new model.
    models
      .mapValues(value => ModelToServe.fromByteArray(value))
      .filter((key, value) => value.isSuccess)
      .mapValues(value => ModelWithDescriptor.fromModelToServe(value.get))
      .filter((key, value) => value.isSuccess)
      .process(() => new ModelProcessor, STORE_NAME)

    // Create and build topology (no `return` — the last expression is the value)
    val topology = builder.build
    println(topology.describe)
    new KafkaStreams(topology, streamsConfiguration)
  }

  /**
   * Starts the HTTP endpoint serving queryable-state requests.
   *
   * @param streams running [[KafkaStreams]] instance whose stores are queried.
   * @param port    port to bind the HTTP server to. (The previous version
   *                shadowed this parameter with a local `val port = 8888`,
   *                silently ignoring the caller's argument.)
   */
  private def startRestProxy(streams: KafkaStreams, port: Int) = {
    implicit val system = ActorSystem("ModelServing")
    implicit val materializer = ActorMaterializer()
    implicit val executionContext = system.dispatcher
    implicit val timeout = Timeout(10.seconds)

    val host = "127.0.0.1"
    val routes: Route = QueriesResource.storeRoutes(streams, port)

    // Handle the bind failure explicitly; the previous version passed two
    // arguments to println, which auto-tupled and printed a tuple.
    Http().bindAndHandle(routes, host, port) map { binding =>
      println(s"Starting models observer on port ${binding.localAddress}")
    } recover {
      case NonFatal(ex) =>
        println(s"Models observer could not bind to $host:$port - ${ex.getMessage}")
    }
  }
}