-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathRestConnector.scala
95 lines (81 loc) · 3.59 KB
/
RestConnector.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package io.cloudstate.connector
import java.util.concurrent.TimeUnit
import java.util.logging.Logger
import com.twitter.finagle.http.{Method, Request, Response}
import com.twitter.finagle.service.{Backoff, RetryBudget}
import com.twitter.finagle.{Http, Service, http}
import com.twitter.util._
import io.cloudstate.connector.Connector.{RestRequest, RestResponse}
import io.cloudstate.javasupport.EntityId
import io.cloudstate.javasupport.crdt._
import com.twitter.conversions.DurationOps._
import scala.util.{Failure, Success, Try}
/**
 * CRDT (Conflict-free Replicated Data Type) entity that we use as a REST connector.
 * Using this type of entity we can replicate the state between the other replicas in the cluster.
 *
 * We use this entity as a StatefulService that can be deployed in the same cluster as the
 * main program, and this program can invoke the connector through the DSL created in the
 * proto file using gRPC over localhost/port, which is pretty much the same as if it were in the
 * same JVM. The great advantage is that we can deploy this connector independently without
 * affecting the other service.
 */
@CrdtEntity
class RestConnector(ctx: CrdtCreationContext, @EntityId val entityId: String) {

  private val logger: Logger = Logger.getLogger(classOf[RestConnector].getName)

  // CRDT-backed record of every endpoint this connector has been asked to call:
  // an ORMap of LWW registers obtained from the creation context, wrapped in an LWWRegisterMap.
  private val endpoints: ORMap[String, LWWRegister[RestRequest]] = ctx.newORMap()
  private val sharedEndpoints: LWWRegisterMap[String, RestRequest] = new LWWRegisterMap(endpoints)

  // Local cache of Finagle clients, keyed by host + uri.
  // NOTE(review): the key does not include the port, while clients are created per host:port —
  // confirm the same host + uri is never requested with two different ports.
  private var services: Map[String, Service[Request, Response]] = Map()

  // Retry budget shared by every Finagle client created by this entity.
  private val budget: RetryBudget = RetryBudget(
    ttl = 10.seconds,
    minRetriesPerSec = 5,
    percentCanRetry = 0.1
  )

  // Maximum time makeRequest blocks waiting for the remote response.
  private val requestTimeout: Duration = Duration(10, TimeUnit.SECONDS)

  /**
   * Executes the HTTP call described by the command and returns the response body.
   *
   * The Finagle client for the endpoint (host + uri) is reused when one is cached; otherwise the
   * endpoint is stored in the shared CRDT map and a new client is created and cached.
   *
   * Non-fatal failures (unsupported method, timeout, connection errors, ...) are not propagated:
   * they are logged and their description is returned as the response payload.
   *
   * @param requestCommand host, port, uri and method of the request to perform
   * @return a [[RestResponse]] holding either the response body or the error description
   */
  @CommandHandler
  def makeRequest(requestCommand: RestRequest): RestResponse = {
    val outcome = Try {
      val endpoint = requestCommand.getHost + requestCommand.getUri
      logger.info(s"Request to Connector:$endpoint")
      val request = http.Request(getMethod(requestCommand), requestCommand.getUri)
      request.host = requestCommand.getHost
      // getOrElse takes its default by name, so registration only happens on a cache miss.
      val service = services.getOrElse(endpoint, registerService(endpoint, requestCommand))
      getBodyResponse(service(request))
    }
    outcome match {
      case Success(responseBody) =>
        RestResponse.newBuilder.setResponse(responseBody).build()
      case Failure(exception) =>
        logger.warning(s"Error in Rest connector. Caused by:$exception")
        // getMessage may be null and the protobuf builder rejects null values,
        // so fall back to the exception's string form.
        val message = Option(exception.getMessage).getOrElse(exception.toString)
        RestResponse.newBuilder.setResponse(message).build()
    }
  }

  /**
   * Stores the endpoint in the shared CRDT map, then creates a Finagle client for it and
   * caches that client locally.
   */
  private def registerService(
      endpoint: String,
      requestCommand: RestRequest
  ): Service[Request, Response] = {
    logger.info(s"Adding new service in CRDT for uri:$endpoint")
    sharedEndpoints.put(endpoint, requestCommand)
    val service = createFinagleService(requestCommand)
    services += endpoint -> service
    service
  }

  /**
   * Creates a Finagle HTTP client for the command's host:port, configured with the shared
   * retry budget and a constant 10 second retry backoff.
   */
  private def createFinagleService(requestCommand: RestRequest): Service[Request, Response] =
    Http.client
      .withRetryBudget(budget)
      .withRetryBackoff(Backoff.const(10.seconds))
      .newService(s"${requestCommand.getHost}:${requestCommand.getPort}")

  /**
   * Blocks until the response arrives (at most `requestTimeout`) and returns its body as a
   * string. Await.result throws if the future fails or the timeout elapses.
   */
  private def getBodyResponse(responseFuture: Future[Response]): String =
    Await.result(responseFuture, requestTimeout).getContentString()

  /**
   * Maps the connector's method enum onto the Finagle HTTP method.
   * Values other than GET/POST/PUT/DELETE are rejected with an IllegalArgumentException
   * instead of surfacing as an opaque MatchError.
   */
  private def getMethod(requestCommand: RestRequest): Method =
    requestCommand.getMethod match {
      case Connector.Method.GET    => http.Method.Get
      case Connector.Method.POST   => http.Method.Post
      case Connector.Method.PUT    => http.Method.Put
      case Connector.Method.DELETE => http.Method.Delete
      case unsupported =>
        throw new IllegalArgumentException(s"Unsupported HTTP method: $unsupported")
    }
}