Integrates fetcher with zipsByProduct #84

Merged
merged 1 commit on Sep 9, 2023
55 changes: 38 additions & 17 deletions neonwranglerpy/fetcher/fetcher.py
@@ -3,7 +3,10 @@
import os
from concurrent.futures import ThreadPoolExecutor
from os.path import join as pjoin
import requests
from itertools import repeat


if 'NEONWRANGLER_HOME' in os.environ:
fury_home = os.environ['NEONWRANGLER_HOME']
else:
@@ -32,7 +35,7 @@ async def _request(session, url):
return await response.json()


async def _download(session, url, filename, sem, size=None):
async def _download(session, url, filename, sem, month, size=None):
"""An asynchronous function to download file from url.

Parameters
@@ -46,8 +49,8 @@ async def _download(session, url, filename, sem, size=None):
size : int, optional
Length of the content in bytes
"""
# print(month)
if not os.path.exists(filename):
print(f'Downloading: {filename}')
async with sem:
async with session.get(url) as response:
size = response.content_length if not size else size
@@ -61,37 +64,55 @@ async def _download(session, url, filename, sem, size=None):
# update_progressbar(progress, size)


async def _fetcher(batch, rate_limit, headers):
async def _fetcher(data, rate_limit, headers, files_to_stack_path="filesToStack"):
"""Fetcher for downloading files."""
sem = asyncio.Semaphore(rate_limit)
data = data['data']
dir_name = '.'.join([
'NEON', batch['productCode'], batch['siteCode'], batch['month'], batch['release']
'NEON', data['productCode'], data['siteCode'], data['month'], data['release']
])
d_urls = [file['url'] for file in batch["files"]]
sizes = [file['size'] for file in batch["files"]]
f_names = [file['name'] for file in batch["files"]]
f_paths = [pjoin(dir_name, name) for name in f_names]
print(f"{data['siteCode']}" + "-" + f"{data['month']}" )
zip_dir_path = os.path.join(files_to_stack_path, f'{dir_name}')
os.mkdir(zip_dir_path)

d_urls = [f['url'] for f in data["files"]]
sizes = [f['size'] for f in data["files"]]
f_names = [f['name'] for f in data["files"]]
f_paths = [pjoin(zip_dir_path, name) for name in f_names]
month = [data['month']]
zip_url = zip(d_urls, f_paths, sizes)
async with aiohttp.ClientSession() as session:
tasks = []
for url, name, sz in zip_url:
task = asyncio.create_task(_download(session, url, name, sem, sz))
task = asyncio.create_task(_download(session, url, name, sem, month, sz))
tasks.append(task)

await asyncio.gather(*tasks)


def fetcher(batch, rate_limit, headers):
async def vst_fetcher(item, rate_limit, headers, files_to_stack_path="filesToStack"):
data = requests.get(item).json()
await _fetcher(data, rate_limit, headers, files_to_stack_path)


def fetcher(batch, data_type, rate_limit, headers, files_to_stack_path):
try:
asyncio.run(_fetcher(batch, rate_limit, headers))
if data_type == 'vst':
asyncio.run(vst_fetcher(batch, rate_limit, headers, files_to_stack_path))
elif data_type == 'aop':
asyncio.run(_fetcher(batch, rate_limit, headers, files_to_stack_path))

except Exception as e:
print(f"Error processing URLs: {e}")


def run_threaded_batches(batches, batch_size, rate_limit, headers=None):
max_thread = 2
num_threads = (len(batches) + batch_size - 1) // batch_size
with ThreadPoolExecutor(max_workers=max_thread) as executor:
def run_threaded_batches(batches, data_type, rate_limit, headers=None, savepath='/filesToStack'):
num_cores = os.cpu_count() # Get the number of CPU cores
num_threads = min(num_cores, len(batches)) # Limit threads to CPU cores or the number of batches, whichever is smaller

with ThreadPoolExecutor(max_workers=num_threads) as executor:
for i in range(num_threads):
batch = batches[i * batch_size:min((i + 1) * batch_size, len(batches))]
executor.map(fetcher, batch, repeat(rate_limit), repeat(headers))
# Distribute the batches evenly among threads
batch = batches[i::int(num_threads)]
# executor.submit(fetcher, batch, rate_limit, headers)
executor.map(fetcher, batch, repeat(data_type), repeat(rate_limit), repeat(headers), repeat(savepath))
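A minimal usage sketch of the reworked downloader (not part of this diff; the month URLs and directory name below are placeholders): `run_threaded_batches` now takes the data type and a target directory, strides the URL list across threads, and each thread drives its own asyncio event loop.

```python
import os
import neonwranglerpy.fetcher.fetcher as fetcher

# The target directory must already exist; _fetcher creates one
# NEON.<productCode>.<siteCode>.<month>.<release> subdirectory per URL inside it.
os.makedirs("filesToStack", exist_ok=True)

# Illustrative NEON data-endpoint URLs; a real run builds these from the
# product/site/month query in zips_by_product.
month_urls = [
    "https://data.neonscience.org/api/v0/data/DP1.10098.001/DELA/2019-05",
    "https://data.neonscience.org/api/v0/data/DP1.10098.001/DELA/2019-06",
]

# data_type='vst' routes each URL through vst_fetcher, which fetches the
# month's JSON listing before handing it to the async downloader;
# rate_limit caps concurrent downloads per month via a semaphore.
fetcher.run_threaded_batches(month_urls, 'vst', rate_limit=2, headers=None,
                             savepath="filesToStack")
```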
15 changes: 8 additions & 7 deletions neonwranglerpy/lib/retrieve_coords_itc.py
@@ -32,17 +32,18 @@ def retrieve_coords_itc(dat):
plots_df = plots.loc[vst_rows]

convert_dict = {
'pointID': str,
'pointID': 'string',
}
# converting the pointID dtype to nullable Int64, then to string
plots_df = plots_df.astype({'pointID': 'float64'})
plots_df = plots_df.astype({'pointID': 'Int64'}).astype(convert_dict)
data = dat.astype({'pointID': 'Int64'}).astype(convert_dict)

vst_df = dat.merge(plots_df, how='inner', on=['plotID', 'pointID', 'siteID'])
vst_df = vst_df.astype(convert_dict)
na_values = vst_df['stemAzimuth'].isnull().values.any()
if na_values:
vst_df = data.merge(plots_df, how='inner', on=['plotID', 'pointID', 'siteID'])
na_values = vst_df['stemAzimuth'].isnull().values.sum()

if na_values > 0:
print(
f"{len(na_values)} entries could not be georeferenced and will be discarded.")
f"{na_values} entries could not be georeferenced and will be discarded.")
vst_df.dropna(subset=['stemAzimuth'], axis=0, inplace=True)
vst_df.reset_index(drop=True, inplace=True)
# if retrieve_dist_to_utm doesn't work add p[0] as an extra argument to
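A standalone pandas sketch (toy frames, not from the diff) of the pointID dtype handling introduced above: casting through the nullable Int64 dtype keeps missing point IDs as `<NA>` instead of coercing them to float strings before both frames are converted to string merge keys.

```python
import numpy as np
import pandas as pd

plots_df = pd.DataFrame({"plotID": ["DELA_001"], "pointID": [31.0], "siteID": ["DELA"]})
dat = pd.DataFrame({"plotID": ["DELA_001"], "pointID": [np.nan], "siteID": ["DELA"]})

convert_dict = {"pointID": "string"}

# float64 -> nullable Int64 -> string; NaN survives as <NA> rather than "nan"
plots_df = plots_df.astype({"pointID": "Int64"}).astype(convert_dict)
dat = dat.astype({"pointID": "Int64"}).astype(convert_dict)

print(plots_df["pointID"].tolist(), dat["pointID"].tolist())  # ['31'] [<NA>]
```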
21 changes: 11 additions & 10 deletions neonwranglerpy/lib/retrieve_vst_data.py
@@ -8,10 +8,11 @@

def retrieve_vst_data(dpId="DP1.10098.001",
site="all",
start_date="",
end_date="",
start_date=None,
end_date=None,
method="shp",
savepath="",
attributes=None,
save_files=False,
stacked_df=True):
"""Retrieve Vegetation Structure Data From NEON and Add Individual ID coordinates.
@@ -61,14 +62,14 @@ def retrieve_vst_data(dpId="DP1.10098.001",
# Adds the UTM coordinates of vst entries based on azimuth and distance
vst["vst_mappingandtagging"] = retrieve_coords_itc(vst_mappingandtagging)

attributes = vst_apparentindividual[[
'uid', 'individualID', 'eventID', 'tagStatus', 'growthForm', 'plantStatus',
'stemDiameter', 'measurementHeight', 'height', 'baseCrownHeight', 'breakHeight',
'breakDiameter', 'maxCrownDiameter', 'ninetyCrownDiameter', 'canopyPosition',
'shape', 'basalStemDiameter', 'basalStemDiameterMsrmntHeight',
'maxBaseCrownDiameter', 'ninetyBaseCrownDiameter'
]]

if attributes is None:
attributes = vst_apparentindividual[[
'uid', 'individualID', 'eventID', 'tagStatus', 'growthForm', 'plantStatus',
'stemDiameter', 'measurementHeight', 'height', 'baseCrownHeight', 'breakHeight',
'breakDiameter', 'maxCrownDiameter', 'ninetyCrownDiameter', 'canopyPosition',
'shape', 'basalStemDiameter', 'basalStemDiameterMsrmntHeight',
'maxBaseCrownDiameter', 'ninetyBaseCrownDiameter'
]]
vst['vst_mappingandtagging'].rename(columns={'eventID': 'tagEventID'}, inplace=True)
csv_vst = pd.merge(attributes,
vst["vst_mappingandtagging"],
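An illustrative call of the updated signature (site and dates are placeholders, not from the diff): leaving the new `attributes` argument as None keeps the default column list above, while a caller can pass their own subset of vst_apparentindividual columns instead.

```python
from neonwranglerpy.lib.retrieve_vst_data import retrieve_vst_data

vst = retrieve_vst_data(
    dpId="DP1.10098.001",   # vegetation structure product
    site="DELA",
    start_date="2018-01",
    end_date="2022-01",
    attributes=None,        # fall back to the default column list above
    save_files=False,
    stacked_df=True,
)
```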
4 changes: 2 additions & 2 deletions neonwranglerpy/utilities/loadByProduct.py
@@ -72,12 +72,12 @@ def load_by_product(dpID,
f'directly to R with this function. Use the byFileAOP() or ' \
f'byTileAOP() function to download locally." '

if len(start_date):
if start_date is not None:
if not match(DATE_PATTERN, start_date):
return 'startdate and enddate must be either NA or valid dates in' \
' the form YYYY-MM'

if len(end_date):
if end_date is not None:
if not match(DATE_PATTERN, end_date):
return 'startdate and enddate must be either NA or valid dates in' \
' the form YYYY-MM'
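A standalone sketch of the None-safe date check now used in both load_by_product and zips_by_product (the helper name is hypothetical; only the pattern and the guard mirror the diff): a missing date simply skips validation instead of raising a TypeError from `len(None)`.

```python
import re

DATE_PATTERN = re.compile('20[0-9]{2}-[0-9]{2}')

def validate_dates(start_date=None, end_date=None):
    """Return an error message if a supplied date is malformed, else None."""
    for label, value in (("startdate", start_date), ("enddate", end_date)):
        if value is not None and not re.match(DATE_PATTERN, value):
            return f"{label} must be either NA or a valid date in the form YYYY-MM"
    return None

print(validate_dates("2021-05", None))        # None: both dates pass
print(validate_dates("05-2021", "2021-07"))   # error message for startdate
```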
37 changes: 10 additions & 27 deletions neonwranglerpy/utilities/zipsByProduct.py
@@ -6,14 +6,15 @@
from neonwranglerpy.utilities.tools import get_api, get_month_year_urls
from neonwranglerpy.utilities.defaults import NEON_API_BASE_URL
from neonwranglerpy.utilities.getzipurls import get_zip_urls
import neonwranglerpy.fetcher.fetcher as fetcher

DATE_PATTERN = re.compile('20[0-9]{2}-[0-9]{2}')


def zips_by_product(dpID,
site='all',
start_date='',
end_date='',
start_date=None,
end_date=None,
package="basic",
release="current",
savepath='',
@@ -65,12 +66,12 @@ def zips_by_product(dpID,
return f"{dpID} is not a properly formatted data product ID. The correct format" \
f" is DP#.#####.00#, where the first placeholder must be between 1 and 4."

if len(start_date):
if start_date is not None:
if not re.match(DATE_PATTERN, start_date):
return 'startdate and enddate must be either NA or valid dates in the form '\
'YYYY-MM'

if len(end_date):
if end_date is not None:
if not re.match(DATE_PATTERN, end_date):
return 'startdate and enddate must be either NA or valid dates in the form ' \
'YYYY-MM'
@@ -109,21 +110,18 @@
print(f"There is no data for site {site}")

# extracting urls for specified start and end dates
if len(start_date):
if start_date is not None:
month_urls = get_month_year_urls(start_date, month_urls, 'start')

if not len(month_urls):
print("There is no data for selected dates")

if len(end_date):
if end_date is not None:
month_urls = get_month_year_urls(end_date, month_urls, 'end')

if not len(month_urls):
print("There is no data for selected dates")

# list of all the urls of the files
temp = get_zip_urls(month_urls, package, dpID, release, token)

# TODO: calculate download size
# TODO: user input for downloading or not
if not savepath:
@@ -135,25 +133,10 @@
os.makedirs(savepath)

files_to_stack_path = os.path.join(savepath, "filesToStack")
os.mkdir(files_to_stack_path)

# TODO: add progress bar
if not os.path.isdir(files_to_stack_path):
os.mkdir(files_to_stack_path)

if files_to_stack_path:
for zips in temp:
dirname = '.'.join([
'NEON', zips['productCode'], zips['siteCode'], zips['month'],
zips['release']
])
zip_dir_path = os.path.join(files_to_stack_path, f'{dirname}')
os.mkdir(zip_dir_path)
for file in zips['files']:
try:
save_path = os.path.join(zip_dir_path, f"{file['name']}")
file_path, _ = urlretrieve(file['url'], save_path)

except HTTPError as e:
print("HTTPError :", e)
return None
fetcher.run_threaded_batches(month_urls, 'vst', rate_limit=2, headers=None, savepath=files_to_stack_path)
# returns the path to the filesToStack directory
return files_to_stack_path
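An end-to-end usage sketch of the integration (placeholder product, site, and dates): after this change zips_by_product no longer loops over files with urlretrieve; it hands the month URLs to fetcher.run_threaded_batches and returns the filesToStack directory it populated.

```python
from neonwranglerpy.utilities.zipsByProduct import zips_by_product

stack_dir = zips_by_product(
    dpID="DP1.10098.001",
    site="DELA",
    start_date="2021-01",
    end_date="2021-12",
    savepath="./neon_data",   # filesToStack is created inside this directory
)
print(stack_dir)              # e.g. ./neon_data/filesToStack
```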