11"""Container and object replication handlers using aiohttp."""
22
3+ import asyncio
34import logging
45import os
56import ssl
2627REPL_TIMEOUT = 16384
2728
2829
30+ def _is_cancelled (app , job_id : str ) -> bool :
31+ job = app ["replication_jobs" ].get (job_id ) if app else None
32+ return (job is None ) or job .get ("cancel" , False )
33+
34+
2935class ObjectReplicationProxy :
3036 """A class for replicating objects."""
3137
@@ -90,8 +96,11 @@ async def a_ensure_container(self, segmented: bool = False) -> None:
9096
9197 LOGGER .info (f"Created container '{ container } '." )
9298
93- async def a_sync_object_segments (self , manifest : str ) -> str :
99+ async def a_sync_object_segments (self , manifest : str , job_id : str , app ) -> str :
94100 """Get object segments."""
101+ if _is_cancelled (app , job_id ):
102+ raise asyncio .CancelledError ()
103+
95104 async with self .client .get (
96105 common .generate_download_url (
97106 self .source_host , container = manifest .split ("/" )[0 ]
@@ -123,6 +132,8 @@ def filter_with_prefix(segment: str) -> bool:
123132 LOGGER .debug (f"Got following segments: { segments } " )
124133
125134 for segment in segments :
135+ if _is_cancelled (app , job_id ):
136+ raise asyncio .CancelledError ()
126137 from_url = common .generate_download_url (
127138 self .source_host , container = manifest .split ("/" )[0 ], object_name = segment
128139 )
@@ -152,6 +163,8 @@ def filter_with_prefix(segment: str) -> bool:
152163 reason = "ETag missing, maybe segments file empty"
153164 )
154165
166+ if _is_cancelled (app , job_id ):
167+ raise asyncio .CancelledError ()
155168 to_url = common .generate_download_url (
156169 self .host , container = f"{ self .container } _segments" , object_name = segment
157170 )
@@ -175,8 +188,10 @@ def filter_with_prefix(segment: str) -> bool:
175188 )
176189 return new_manifest
177190
178- async def a_copy_object (self , object_name : str ) -> None :
191+ async def a_copy_object (self , object_name : str , job_id : str , app ) -> None :
179192 """Copy an object from a location."""
193+ if _is_cancelled (app , job_id ):
194+ raise asyncio .CancelledError ()
180195 # Get the object stream handle
181196 async with self .client .get (
182197 common .generate_download_url (
@@ -216,6 +231,8 @@ async def a_copy_object(self, object_name: str) -> None:
216231 raise aiohttp .web .HTTPUnprocessableEntity (
217232 reason = "ETag missing, maybe segments file empty"
218233 )
234+ if _is_cancelled (app , job_id ):
235+ raise asyncio .CancelledError ()
219236 async with self .client .put (
220237 common .generate_download_url (self .host , self .container , object_name ),
221238 data = resp_g .content .iter_chunked (65564 ),
@@ -235,13 +252,20 @@ async def a_copy_object(self, object_name: str) -> None:
235252 # segmented upload
236253 LOGGER .debug (f"Copying object { object_name } in segments." )
237254
255+ if _is_cancelled (app , job_id ):
256+ raise asyncio .CancelledError ()
238257 manifest = await self .a_sync_object_segments (
239- resp_g .headers ["X-Object-Manifest" ]
258+ resp_g .headers ["X-Object-Manifest" ],
259+ job_id = job_id ,
260+ app = app ,
240261 )
241262
242263 LOGGER .debug ("Uploading manifest" )
243264 # Add manifest headers
244265 headers ["X-Object-Manifest" ] = manifest
266+
267+ if _is_cancelled (app , job_id ):
268+ raise asyncio .CancelledError ()
245269 # Create manifest file
246270 async with self .client .put (
247271 common .generate_download_url (
@@ -262,44 +286,80 @@ async def a_copy_single_object(self, object_name: str) -> None:
262286 """Only copy a single object."""
263287 await self .a_copy_object (object_name )
264288
265- async def a_get_container_page (self , marker : str = "" ) -> list [str ]:
266- """Get a single page of objects from a container."""
289+ async def a_get_container_count (self ) -> int :
290+ """Get total object count in a container."""
291+ async with self .client .head (
292+ common .generate_download_url (
293+ self .source_host , container = self .source_container
294+ ),
295+ headers = {"X-Auth-Token" : self .token },
296+ timeout = ClientTimeout (total = REPL_TIMEOUT ),
297+ ssl = ssl_context ,
298+ ) as resp :
299+ if resp .status >= 400 :
300+ raise aiohttp .web .HTTPBadRequest (
301+ reason = "Could not HEAD source container."
302+ )
303+
304+ try :
305+ return int (resp .headers .get ("X-Container-Object-Count" , "0" ))
306+ except ValueError :
307+ return 0
308+
309+ async def a_get_container_page (
310+ self , marker : str = "" , limit : int = 10000
311+ ) -> list [str ]:
312+ """Get one page of object names from a container."""
313+ params = {"limit" : str (limit )}
314+ if marker :
315+ params ["marker" ] = marker
316+
267317 async with self .client .get (
268318 common .generate_download_url (
269- self .source_host ,
270- container = self .source_container ,
319+ self .source_host , container = self .source_container
271320 ),
272321 headers = {"X-Auth-Token" : self .token },
273- params = { "marker" : marker } if marker else None ,
322+ params = params ,
274323 timeout = ClientTimeout (total = REPL_TIMEOUT ),
275324 ssl = ssl_context ,
276325 ) as resp :
277326 if resp .status >= 400 :
278- LOGGER .debug (f"Container fetch failed with status { resp .status } " )
279327 raise aiohttp .web .HTTPBadRequest (
280328 reason = "Could not fetch source container."
281329 )
282330
283331 if resp .status == 200 :
284- ret = await resp .text ()
285- return ret . rstrip ().lstrip (). split ("\n " )
332+ text = await resp .text ()
333+ return [ x for x in text . strip ().split ("\n " ) if x ]
286334
287335 return []
288336
289- async def a_copy_from_container (self ) -> None :
290- """Copy objects from a source container."""
337+ async def a_copy_from_container (self , job_id : str , app ) -> None :
338+ """Copy objects from a source container with live progress + cancel ."""
291339 LOGGER .debug (f"Fetching objects from container { self .source_container } " )
292- container_url = common .generate_download_url (
293- self .source_host , container = self .source_container
294- )
295- LOGGER .debug (f"Container url: { container_url } " )
296340
297- # Page through all the objects in a container
298- to_copy : list [str ] = []
299- page = await self .a_get_container_page ()
300- while page :
301- to_copy = to_copy + page
302- page = await self .a_get_container_page (to_copy [- 1 ])
341+ job = app ["replication_jobs" ].get (job_id )
342+ if job is not None :
343+ job ["total" ] = await self .a_get_container_count ()
344+ job ["done" ] = 0
345+
346+ marker = ""
347+
348+ while True :
349+ job = app ["replication_jobs" ].get (job_id )
350+ if job is None or job .get ("cancel" ):
351+ raise asyncio .CancelledError ()
352+
353+ page = await self .a_get_container_page (marker = marker )
354+ if not page :
355+ break
356+
357+ for obj in page :
358+ job = app ["replication_jobs" ].get (job_id )
359+ if job is None or job .get ("cancel" ):
360+ raise asyncio .CancelledError ()
361+
362+ await self .a_copy_object (obj , job_id = job_id , app = app )
303363
304- for obj in to_copy :
305- await self . a_copy_object ( obj )
364+ job [ "done" ] += 1
365+ marker = page [ - 1 ]
0 commit comments