Skip to content

Commit

Permalink
Merge pull request #470 from hotosm/fix-processing
Browse files Browse the repository at this point in the history
Fix minor issues in the drone image processing
  • Loading branch information
nrjadkry authored Feb 11, 2025
2 parents 2995e64 + 3e6f18b commit 85543ee
Showing 1 changed file with 31 additions and 40 deletions.
71 changes: 31 additions & 40 deletions src/backend/app/projects/image_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def download_images_from_s3(
self,
bucket_name: str,
local_dir: str,
task_id: Optional[uuid.UUID] = None,
task_id: uuid.UUID,
batch_size: int = 20,
):
"""
Expand All @@ -110,11 +110,7 @@ async def download_images_from_s3(
:param task_id: Specific task ID (required)
:param batch_size: Number of images to download concurrently
"""
prefix = (
f"dtm-data/projects/{self.project_id}/{task_id}"
if task_id
else f"dtm-data/projects/{self.project_id}"
)
prefix = f"dtm-data/projects/{self.project_id}/{task_id}"
objects = list_objects_from_bucket(bucket_name, prefix)

if not objects:
Expand All @@ -133,6 +129,8 @@ async def download_images_from_s3(
]

total_files = len(object_urls)
log.info(f"{total_files} images found in S3 for task {task_id}")

async with aiohttp.ClientSession() as session:
for i in range(0, total_files, batch_size):
batch = object_urls[i : i + batch_size]
Expand All @@ -145,7 +143,7 @@ async def download_images_from_s3(
session,
url,
os.path.join(
local_dir, f"{uuid.uuid4()}_file_{i + j + 1}.jpg"
local_dir, f"{task_id}_file_{i + j + 1}.jpg"
), # unique image names are maintained with the task_id prefix and index
)
for j, url in enumerate(batch)
Expand Down Expand Up @@ -303,40 +301,33 @@ async def process_images_from_s3(
bucket_name, name=name, options=options, webhook=webhook
)

# If webhook is passed, webhook does this job.
if not webhook:
# If webhook is passed, webhook does this job.
if not webhook:
# Monitor task progress
self.monitor_task(task)

# Optionally, download results
output_file_path = f"/tmp/{self.project_id}"
path_to_download = self.download_results(
task, output_path=output_file_path
)

# Upload the results into s3
s3_path = (
f"dtm-data/projects/{self.project_id}/{self.task_id}/assets.zip"
)
add_file_to_bucket(bucket_name, path_to_download, s3_path)
# now update the task as completed in Db.
# Call the async function using asyncio

# Update background task status to COMPLETED
update_task_status_sync = async_to_sync(task_logic.update_task_state)
update_task_status_sync(
self.db,
self.project_id,
self.task_id,
self.user_id,
"Task completed.",
State.IMAGE_UPLOADED,
State.IMAGE_PROCESSING_FINISHED,
timestamp(),
)
return task

# Monitor task progress
self.monitor_task(task)

# Optionally, download results
output_file_path = f"/tmp/{self.project_id}"
path_to_download = self.download_results(task, output_path=output_file_path)

# Upload the results into s3
s3_path = f"dtm-data/projects/{self.project_id}/{self.task_id}/assets.zip"
add_file_to_bucket(bucket_name, path_to_download, s3_path)
# now update the task as completed in Db.
# Call the async function using asyncio

# Update background task status to COMPLETED
update_task_status_sync = async_to_sync(task_logic.update_task_state)
update_task_status_sync(
self.db,
self.project_id,
self.task_id,
self.user_id,
"Task completed.",
State.IMAGE_UPLOADED,
State.IMAGE_PROCESSING_FINISHED,
timestamp(),
)
return task

async def process_images_for_all_tasks(
Expand Down

0 comments on commit 85543ee

Please sign in to comment.