Skip to content

Commit 18706c7

Browse files
committed
Allowing app to run in different modes
1 parent 2312416 commit 18706c7

File tree

15 files changed

+690
-129
lines changed

15 files changed

+690
-129
lines changed

Modes.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# App-modes
2+
Anshar now supports being started in different _modes_ to allow separate instances for each datatype - both for reducing the necessary data-synchronization between instances, and making it possible to scale parts of the application separately.
3+
4+
Possible modes are PROXY, DATA_ET, DATA_VM, DATA_SX (from enum `AppMode`)
5+
6+
Recommended (i.e. tested) usage is either all modes in one application, or separate instances for each mode.
7+
8+
## PROXY
9+
- Subscription-management.
10+
- Starts/stops subscriptions
11+
- Monitors heartbeats/deliveries from external data-producers, restarts subscriptions when necessary
12+
- Fetches data from Polling/Fetched delivery-subscriptions
13+
- Receives all incoming SIRI-data
14+
- Applies mapping-rules (including lists of unmapped ids)
15+
- Manages profile-validator
16+
- Publishes processed data to pubsub-topic
17+
- Forwards client-requests to correct service based on the data-type requested.
18+
- Forwards SubscriptionRequests from external clients to correct service
19+
- Does not keep/hold any actual data
20+
- Requires config of baseUrls to data-handlers
21+
- `anshar.data.handler.baseurl.et`, `-.vm`, `-.sx`
22+
23+
## DATA_ET, DATA_VM, DATA_SX
24+
- Each mode handles its own datatype
25+
- Reads processed data from related pubsub-topic (as published from PROXY)
26+
- Adds updated objects to internal maps (holds all data in memory)
27+
- Publishes data to
28+
- External subscribers
29+
- Pubsub
30+
- Kafka

src/main/java/no/rutebanken/anshar/config/AnsharConfiguration.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ public class AnsharConfiguration {
114114
@Value("${anshar.admin.blocked.clients:}")
115115
private List<String> blockedEtClientNames;
116116

117+
@Value("${anshar.application.mode:}")
118+
private List<AppMode> appModes;
119+
117120
public String getHazelcastManagementUrl() {
118121
return hazelcastManagementUrl;
119122
}
@@ -229,4 +232,23 @@ public String getMessageQueueCamelRoutePrefix() {
229232
public List<String> getBlockedEtClientNames() {
230233
return blockedEtClientNames;
231234
}
235+
236+
public List<AppMode> getAppModes() {
237+
return appModes;
238+
}
239+
public boolean processET() {
240+
return (appModes.isEmpty() || appModes.contains(AppMode.DATA_ET));
241+
}
242+
public boolean processVM() {
243+
return (appModes.isEmpty() || appModes.contains(AppMode.DATA_VM));
244+
}
245+
public boolean processSX() {
246+
return (appModes.isEmpty() || appModes.contains(AppMode.DATA_SX));
247+
}
248+
public boolean processAdmin() {
249+
return (appModes.isEmpty() || appModes.contains(AppMode.PROXY));
250+
}
251+
public boolean processData() {
252+
return (appModes.isEmpty() || ((appModes.contains(AppMode.DATA_ET) | appModes.contains(AppMode.DATA_VM) | appModes.contains(AppMode.DATA_SX))));
253+
}
232254
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package no.rutebanken.anshar.config;
2+
3+
public enum AppMode {
4+
PROXY, DATA_ET, DATA_VM, DATA_SX
5+
}

src/main/java/no/rutebanken/anshar/routes/RestRouteBuilder.java

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
import org.apache.camel.InvalidPayloadException;
2121
import org.apache.camel.LoggingLevel;
2222
import org.apache.camel.builder.RouteBuilder;
23+
import org.apache.camel.support.builder.Namespaces;
2324
import org.apache.http.HttpHeaders;
2425
import org.entur.protobuf.mapper.SiriMapper;
2526
import org.rutebanken.siri20.util.SiriJson;
2627
import org.rutebanken.siri20.util.SiriXml;
2728
import org.slf4j.Logger;
2829
import org.slf4j.LoggerFactory;
2930
import org.springframework.beans.factory.annotation.Autowired;
31+
import org.springframework.beans.factory.annotation.Value;
3032
import uk.org.siri.siri20.Siri;
3133

3234
import javax.servlet.http.HttpServletResponse;
@@ -44,9 +46,24 @@ public class RestRouteBuilder extends RouteBuilder {
4446

4547
private final Logger logger = LoggerFactory.getLogger(this.getClass());
4648

49+
protected Namespaces ns = new Namespaces("siri", "http://www.siri.org.uk/siri")
50+
.add("xsd", "http://www.w3.org/2001/XMLSchema");
51+
52+
53+
@Value("${anshar.data.handler.baseurl.vm:}")
54+
protected String vmHandlerBaseUrl;
55+
56+
@Value("${anshar.data.handler.baseurl.et:}")
57+
protected String etHandlerBaseUrl;
58+
59+
@Value("${anshar.data.handler.baseurl.sx:}")
60+
protected String sxHandlerBaseUrl;
61+
4762
@Autowired
4863
private AnsharConfiguration configuration;
4964

65+
private static boolean isDataHandlersInitialized = false;
66+
5067
@Override
5168
public void configure() throws Exception {
5269

@@ -75,6 +92,11 @@ public void configure() throws Exception {
7592
.loggingLevel(LoggingLevel.INFO)
7693
);
7794

95+
if (!isDataHandlersInitialized) {
96+
isDataHandlersInitialized=true;
97+
createClientRequestRoutes();
98+
}
99+
78100
from("direct:anshar.invalid.tracking.header.response")
79101
.removeHeaders("*")
80102
.setHeader(Exchange.HTTP_RESPONSE_CODE, constant("400")) //400 Bad request
@@ -90,6 +112,144 @@ public void configure() throws Exception {
90112
;
91113

92114
}
115+
116+
/*
117+
* Creates routes to handle routing of incoming requests based on the mode the instance is started with
118+
*
119+
* PROXY redirects requests to et/vm/sx-instances
120+
*/
121+
protected void createClientRequestRoutes() {
122+
123+
if (configuration.processET()) {
124+
from("direct:process.et.subscription.request")
125+
.to("direct:internal.handle.subscription")
126+
;
127+
from("direct:process.et.service.request")
128+
.to("direct:internal.process.service.request")
129+
;
130+
from("direct:process.et.service.request.cache")
131+
.to("direct:internal.process.service.request.cache")
132+
;
133+
//REST
134+
from("direct:anshar.rest.et")
135+
.to("direct:internal.anshar.rest.et")
136+
;
137+
from("direct:anshar.rest.et.cached")
138+
.to("direct:internal.anshar.rest.et.cached")
139+
;
140+
from("direct:anshar.rest.et.monitored")
141+
.to("direct:internal.anshar.rest.et.monitored")
142+
;
143+
from("direct:anshar.rest.et.monitored.cached")
144+
.to("direct:internal.anshar.rest.et.monitored.cached")
145+
;
146+
} else {
147+
from("direct:process.et.subscription.request")
148+
.to("direct:redirect.request.et")
149+
;
150+
from("direct:process.et.service.request")
151+
.to("direct:redirect.request.et")
152+
;
153+
from("direct:process.et.service.request.cache")
154+
.to("direct:redirect.request.et")
155+
;
156+
//REST
157+
from("direct:anshar.rest.et")
158+
.to("direct:redirect.request.et")
159+
;
160+
from("direct:anshar.rest.et.cached")
161+
.to("direct:redirect.request.et")
162+
;
163+
from("direct:anshar.rest.et.monitored")
164+
.to("direct:internal.anshar.rest.et.monitored")
165+
;
166+
from("direct:anshar.rest.et.monitored.cached")
167+
.to("direct:redirect.request.et")
168+
;
169+
from("direct:redirect.request.et")
170+
.to(etHandlerBaseUrl + "${header.CamelHttpUri}?bridgeEndpoint=true")
171+
;
172+
}
173+
174+
if (configuration.processVM()) {
175+
from("direct:process.vm.subscription.request")
176+
.to("direct:internal.handle.subscription")
177+
;
178+
from("direct:process.vm.service.request")
179+
.to("direct:internal.process.service.request")
180+
;
181+
from("direct:process.vm.service.request.cache")
182+
.to("direct:internal.process.service.request.cache")
183+
;
184+
//REST
185+
from("direct:anshar.rest.vm")
186+
.to("direct:internal.anshar.rest.vm")
187+
;
188+
from("direct:anshar.rest.vm.cached")
189+
.to("direct:internal.anshar.rest.vm.cached")
190+
;
191+
192+
} else {
193+
from("direct:process.vm.subscription.request")
194+
.to("direct:redirect.request.vm")
195+
;
196+
from("direct:process.vm.service.request")
197+
.to("direct:redirect.request.vm")
198+
;
199+
from("direct:process.vm.service.request.cache")
200+
.to("direct:redirect.request.vm")
201+
;
202+
from("direct:anshar.rest.vm")
203+
.to("direct:redirect.request.vm")
204+
;
205+
from("direct:anshar.rest.vm.cached")
206+
.to("direct:redirect.request.vm")
207+
;
208+
from("direct:redirect.request.vm")
209+
.to(vmHandlerBaseUrl + "${header.CamelHttpUri}?bridgeEndpoint=true")
210+
;
211+
}
212+
213+
if (configuration.processSX()) {
214+
from("direct:process.sx.subscription.request")
215+
.to("direct:internal.handle.subscription")
216+
;
217+
from("direct:process.sx.service.request")
218+
.to("direct:internal.process.service.request")
219+
;
220+
from("direct:process.sx.service.request.cache")
221+
.to("direct:internal.process.service.request.cache")
222+
;
223+
224+
//REST
225+
from("direct:anshar.rest.sx")
226+
.to("direct:internal.anshar.rest.sx")
227+
;
228+
from("direct:anshar.rest.sx.cached")
229+
.to("direct:internal.anshar.rest.sx.cached")
230+
;
231+
} else {
232+
from("direct:process.sx.subscription.request")
233+
.to("direct:redirect.request.sx")
234+
;
235+
from("direct:process.sx.service.request")
236+
.to("direct:redirect.request.sx")
237+
;
238+
from("direct:process.sx.service.request.cache")
239+
.to("direct:redirect.request.sx")
240+
;
241+
from("direct:anshar.rest.sx")
242+
.to("direct:redirect.request.sx")
243+
;
244+
from("direct:anshar.rest.sx.cached")
245+
.to("direct:redirect.request.sx")
246+
;
247+
from("direct:redirect.request.sx")
248+
.to(sxHandlerBaseUrl + "${header.CamelHttpUri}?bridgeEndpoint=true")
249+
;
250+
}
251+
}
252+
93253
protected boolean isTrackingHeaderAcceptable(Exchange e) {
94254
String camelHttpMethod = (String) e.getIn().getHeader("CamelHttpMethod");
95255

0 commit comments

Comments
 (0)