1
1
import time
2
2
import socket
3
+ from concurrent .futures import ThreadPoolExecutor
3
4
from urllib .parse import urlparse , urlunparse
4
5
5
6
from textwrap import dedent
6
7
from tornado import gen
8
+ from tornado .concurrent import run_on_executor
7
9
from traitlets import Any , Integer , List , Unicode , default
8
10
9
11
from marathon import MarathonClient
10
12
from marathon .models .app import MarathonApp , MarathonHealthCheck
11
13
from marathon .models .container import MarathonContainerPortMapping , \
12
14
MarathonContainer , MarathonContainerVolume , MarathonDockerContainer
13
15
from marathon .models .constraint import MarathonConstraint
16
+ from marathon .exceptions import NotFoundError
14
17
from jupyterhub .spawner import Spawner
15
18
16
19
from .volumenaming import default_format_volume_name
@@ -88,18 +91,24 @@ class MarathonSpawner(Spawner):
88
91
def _get_default_format_volume_name (self ):
89
92
return default_format_volume_name
90
93
94
+ _executor = None
95
+ @property
96
+ def executor (self ):
97
+ cls = self .__class__
98
+ if cls ._executor is None :
99
+ cls ._executor = ThreadPoolExecutor (1 )
100
+ return cls ._executor
101
+
91
102
def __init__ (self , * args , ** kwargs ):
92
- super ().__init__ (* args , ** kwargs )
93
- if self .mem_limit is None :
94
- self .mem_limit = '1G'
103
+ super (MarathonSpawner , self ).__init__ (* args , ** kwargs )
95
104
self .marathon = MarathonClient (self .marathon_host )
96
105
97
106
@property
98
107
def container_name (self ):
99
108
return '/%s/%s' % (self .app_prefix , self .user .name )
100
109
101
110
def get_state (self ):
102
- state = super ().get_state ()
111
+ state = super (MarathonSpawner , self ).get_state ()
103
112
state ['container_name' ] = self .container_name
104
113
return state
105
114
@@ -116,8 +125,7 @@ def get_health_checks(self):
116
125
interval_seconds = 60 ,
117
126
timeout_seconds = 20 ,
118
127
max_consecutive_failures = 0
119
- )
120
- )
128
+ ))
121
129
return health_checks
122
130
123
131
def get_volumes (self ):
@@ -148,12 +156,36 @@ def get_constraints(self):
148
156
for c in self .marathon_constraints :
149
157
constraints .append (MarathonConstraint .from_json (c ))
150
158
151
- def get_ip_and_port (self ):
152
- app = self .marathon .get_app (self .container_name , embed_tasks = True )
153
- assert len (app .tasks ) == 1
159
+ @run_on_executor
160
+ def get_deployment (self , deployment_id ):
161
+ deployments = self .marathon .list_deployments ()
162
+ for d in deployments :
163
+ if d .id == deployment_id :
164
+ return d
165
+ return None
166
+
167
+ @run_on_executor
168
+ def get_deployment_for_app (self , app_name ):
169
+ deployments = self .marathon .list_deployments ()
170
+ for d in deployments :
171
+ if app_name in d .affected_apps :
172
+ return d
173
+ return None
154
174
155
- ip = socket .gethostbyname (app .tasks [0 ].host )
156
- return (ip , app .tasks [0 ].ports [0 ])
175
+ def get_ip_and_port (self , app_info ):
176
+ assert len (app_info .tasks ) == 1
177
+ ip = socket .gethostbyname (app_info .tasks [0 ].host )
178
+ return (ip , app_info .tasks [0 ].ports [0 ])
179
+
180
+ @run_on_executor
181
+ def get_app_info (self , app_name ):
182
+ try :
183
+ app = self .marathon .get_app (app_name , embed_tasks = True )
184
+ except NotFoundError :
185
+ self .log .info ("The %s application has not been started yet" , app_name )
186
+ return None
187
+ else :
188
+ return app
157
189
158
190
def _public_hub_api_url (self ):
159
191
uri = urlparse (self .hub .api_url )
@@ -166,8 +198,7 @@ def _public_hub_api_url(self):
166
198
uri .params ,
167
199
uri .query ,
168
200
uri .fragment
169
- )
170
- )
201
+ ))
171
202
172
203
def get_env (self ):
173
204
env = super (MarathonSpawner , self ).get_env ()
@@ -202,47 +233,61 @@ def start(self):
202
233
volumes = self .get_volumes ())
203
234
204
235
# the memory request in marathon is in MiB
205
- mem_request = self .mem_limit / 1024.0 / 1024.0
236
+ if hasattr (self , 'mem_limit' ) and self .mem_limit is not None :
237
+ mem_request = self .mem_limit / 1024.0 / 1024.0
238
+ else :
239
+ mem_request = 1024.0
240
+
206
241
app_request = MarathonApp (
207
- id = self .container_name ,
208
- env = self .get_env (),
209
- cpus = self .cpu_limit ,
210
- mem = mem_request ,
211
- container = app_container ,
212
- constraints = self .get_constraints (),
213
- health_checks = self .get_health_checks (),
214
- instances = 1
242
+ id = self .container_name ,
243
+ env = self .get_env (),
244
+ cpus = self .cpu_limit ,
245
+ mem = mem_request ,
246
+ container = app_container ,
247
+ constraints = self .get_constraints (),
248
+ health_checks = self .get_health_checks (),
249
+ instances = 1
215
250
)
216
251
217
- try :
218
- app = self .marathon .create_app (self .container_name , app_request )
219
- if app is False :
220
- return None
221
- except :
252
+ app = self .marathon .create_app (self .container_name , app_request )
253
+ if app is False or app .deployments is None :
254
+ self .log .error ("Failed to create application for %s" , self .container_name )
222
255
return None
223
256
224
- for i in range (self .start_timeout ):
225
- running = yield self .poll ()
226
- if running is None :
227
- ip , port = self .get_ip_and_port ()
228
- self .user .server .ip = ip
229
- self .user .server .port = port
230
- return (ip , port )
231
- time .sleep (1 )
232
- return None
257
+ while True :
258
+ app_info = yield self .get_app_info (self .container_name )
259
+ if app_info and app_info .tasks_healthy == 1 :
260
+ ip , port = self .get_ip_and_port (app_info )
261
+ break
262
+ yield gen .sleep (1 )
263
+ return (ip , port )
233
264
234
265
@gen .coroutine
235
266
def stop (self , now = False ):
236
- self .marathon .delete_app (self .container_name )
237
- return
267
+ try :
268
+ status = self .marathon .delete_app (self .container_name )
269
+ except :
270
+ self .log .error ("Could not delete application %s" , self .container_name )
271
+ raise
272
+ else :
273
+ if not now :
274
+ while True :
275
+ deployment = yield self .get_deployment (status ['deploymentId' ])
276
+ if deployment is None :
277
+ break
278
+ yield gen .sleep (1 )
238
279
239
280
@gen .coroutine
240
281
def poll (self ):
241
- try :
242
- app = self .marathon .get_app (self .container_name )
243
- except Exception as e :
244
- return ""
245
- else :
246
- if app .tasks_healthy == 1 :
247
- return None
248
- return ""
282
+ deployment = yield self .get_deployment_for_app (self .container_name )
283
+ if deployment :
284
+ for current_action in deployment .current_actions :
285
+ if current_action .action == 'StopApplication' :
286
+ self .log .error ("Application %s is shutting down" , self .container_name )
287
+ return 1
288
+ return None
289
+
290
+ app_info = yield self .get_app_info (self .container_name )
291
+ if app_info and app_info .tasks_healthy == 1 :
292
+ return None
293
+ return 0
0 commit comments