Commit 369f430

Integrates fetcher with zipsByProduct
Signed-off-by: nagesh bansal <[email protected]>
1 parent 85fabd8 commit 369f430

5 files changed: +69 -63 lines changed

neonwranglerpy/fetcher/fetcher.py (+38 -17)

```diff
@@ -3,7 +3,10 @@
 import os
 from concurrent.futures import ThreadPoolExecutor
 from os.path import join as pjoin
+import requests
 from itertools import repeat
+
+
 if 'NEONWRANGLER_HOME' in os.environ:
     fury_home = os.environ['NEONWRANGLER_HOME']
 else:
@@ -32,7 +35,7 @@ async def _request(session, url):
         return await response.json()
 
 
-async def _download(session, url, filename, sem, size=None):
+async def _download(session, url, filename, sem, month, size=None):
     """An asynchronous function to download file from url.
 
     Parameters
@@ -46,8 +49,8 @@ async def _download(session, url, filename, sem, size=None):
     size : int, optional
         Length of the content in bytes
     """
+    # print(month)
     if not os.path.exists(filename):
-        print(f'Downloading: {filename}')
         async with sem:
             async with session.get(url) as response:
                 size = response.content_length if not size else size
@@ -61,37 +64,55 @@ async def _download(session, url, filename, sem, size=None):
                 # update_progressbar(progress, size)
 
 
-async def _fetcher(batch, rate_limit, headers):
+async def _fetcher(data, rate_limit, headers, files_to_stack_path="filesToStack"):
     """Fetcher for downloading files."""
     sem = asyncio.Semaphore(rate_limit)
+    data = data['data']
     dir_name = '.'.join([
-        'NEON', batch['productCode'], batch['siteCode'], batch['month'], batch['release']
+        'NEON', data['productCode'], data['siteCode'], data['month'], data['release']
     ])
-    d_urls = [file['url'] for file in batch["files"]]
-    sizes = [file['size'] for file in batch["files"]]
-    f_names = [file['name'] for file in batch["files"]]
-    f_paths = [pjoin(dir_name, name) for name in f_names]
+    print(f"{data['siteCode']}" + "-" + f"{data['month']}")
+    zip_dir_path = os.path.join(files_to_stack_path, f'{dir_name}')
+    os.mkdir(zip_dir_path)
+
+    d_urls = [f['url'] for f in data["files"]]
+    sizes = [f['size'] for f in data["files"]]
+    f_names = [f['name'] for f in data["files"]]
+    f_paths = [pjoin(zip_dir_path, name) for name in f_names]
+    month = [data['month']]
     zip_url = zip(d_urls, f_paths, sizes)
     async with aiohttp.ClientSession() as session:
         tasks = []
         for url, name, sz in zip_url:
-            task = asyncio.create_task(_download(session, url, name, sem, sz))
+            task = asyncio.create_task(_download(session, url, name, sem, month, sz))
             tasks.append(task)
 
         await asyncio.gather(*tasks)
 
 
-def fetcher(batch, rate_limit, headers):
+async def vst_fetcher(item, rate_limit, headers, files_to_stack_path="filesToStack"):
+    data = requests.get(item).json()
+    await _fetcher(data, rate_limit, headers, files_to_stack_path)
+
+
+def fetcher(batch, data_type, rate_limit, headers, files_to_stack_path):
     try:
-        asyncio.run(_fetcher(batch, rate_limit, headers))
+        if data_type == 'vst':
+            asyncio.run(vst_fetcher(batch, rate_limit, headers, files_to_stack_path))
+        elif data_type == 'aop':
+            asyncio.run(_fetcher(batch, rate_limit, headers, files_to_stack_path))
+
     except Exception as e:
         print(f"Error processing URLs: {e}")
 
 
-def run_threaded_batches(batches, batch_size, rate_limit, headers=None):
-    max_thread = 2
-    num_threads = (len(batches) + batch_size - 1) // batch_size
-    with ThreadPoolExecutor(max_workers=max_thread) as executor:
+def run_threaded_batches(batches, data_type, rate_limit, headers=None, savepath='/filesToStack'):
+    num_cores = os.cpu_count()  # Get the number of CPU cores
+    num_threads = min(num_cores, len(batches))  # Limit threads to CPU cores or the number of batches, whichever is smaller
+
+    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        for i in range(num_threads):
-            batch = batches[i * batch_size:min((i + 1) * batch_size, len(batches))]
-            executor.map(fetcher, batch, repeat(rate_limit), repeat(headers))
+            # Distribute the batches evenly among threads
+            batch = batches[i::int(num_threads)]
+            # executor.submit(fetcher, batch, rate_limit, headers)
+            executor.map(fetcher, batch, repeat(data_type), repeat(rate_limit), repeat(headers), repeat(savepath))
```
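The reworked `run_threaded_batches` replaces fixed-size chunking with a stride slice, dealing the month URLs round-robin across at most `os.cpu_count()` threads; each `fetcher` call then runs its own asyncio event loop with `Semaphore(rate_limit)` capping concurrent downloads. A minimal sketch of the slicing, with a stand-in `fetch` in place of the real downloader and dummy URLs:

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat

def fetch(url, rate_limit):
    # Stand-in for fetcher(): the real one dispatches on data_type and
    # runs asyncio.run(...) with a Semaphore(rate_limit) per call.
    print(f"rate={rate_limit} -> {url}")

month_urls = [f"https://example/api/month/{i}" for i in range(5)]  # dummy URLs
num_threads = min(2, len(month_urls))  # stands in for min(os.cpu_count(), len(batches))

with ThreadPoolExecutor(max_workers=num_threads) as executor:
    for i in range(num_threads):
        # The stride slice deals URLs round-robin: i=0 -> 0, 2, 4; i=1 -> 1, 3.
        executor.map(fetch, month_urls[i::num_threads], repeat(2))
```

Note that `executor.map(fetcher, batch, ...)` maps over the individual URLs in each slice, so one `fetcher` call (and one event loop) is made per month URL.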

neonwranglerpy/lib/retrieve_coords_itc.py (+8 -7)

```diff
@@ -32,17 +32,18 @@ def retrieve_coords_itc(dat):
     plots_df = plots.loc[vst_rows]
 
     convert_dict = {
-        'pointID': str,
+        'pointID': 'string',
     }
     # converting the pointID dtype from string to float64
-    plots_df = plots_df.astype({'pointID': 'float64'})
+    plots_df = plots_df.astype({'pointID': 'Int64'}).astype(convert_dict)
+    data = dat.astype({'pointID': 'Int64'}).astype(convert_dict)
 
-    vst_df = dat.merge(plots_df, how='inner', on=['plotID', 'pointID', 'siteID'])
-    vst_df = vst_df.astype(convert_dict)
-    na_values = vst_df['stemAzimuth'].isnull().values.any()
-    if na_values:
+    vst_df = data.merge(plots_df, how='inner', on=['plotID', 'pointID', 'siteID'])
+    na_values = vst_df['stemAzimuth'].isnull().values.sum()
+
+    if na_values > 0:
         print(
-            f"{len(na_values)} entries could not be georeferenced and will be discarded.")
+            f"{na_values} entries could not be georeferenced and will be discarded.")
     vst_df.dropna(subset=['stemAzimuth'], axis=0, inplace=True)
     vst_df.reset_index(drop=True, inplace=True)
     # if retrieve_dist_to_utm doesn't work add p[0] as an extra argument to
```
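The dtype dance matters for the merge key: a float64 `pointID` stringifies as `'41.0'` while the other table may hold `'41'`, so both sides now go through pandas' nullable `Int64` before becoming `string`. A toy illustration (the frames and values below are made up):

```python
import pandas as pd

# Stand-ins for dat and plots_df; one key is float64, the other string.
left = pd.DataFrame({"plotID": ["A", "B"], "pointID": [41.0, 43.0]})
right = pd.DataFrame({"plotID": ["A", "B"], "pointID": ["41", "43"]})

# Merging directly on mismatched key dtypes either errors or matches nothing.
# Going through nullable Int64 first turns 41.0 into 41 (not '41.0'),
# so the final string keys line up on both sides.
left["pointID"] = left["pointID"].astype("Int64").astype("string")
right["pointID"] = right["pointID"].astype("string")

print(left.merge(right, how="inner", on=["plotID", "pointID"]))  # both rows match
```

Switching from `.any()` to `.sum()` also fixes the old message, which called `len()` on a boolean; `na_values` is now the actual count of unreferenced entries.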

neonwranglerpy/lib/retrieve_vst_data.py (+11 -10)

```diff
@@ -8,10 +8,11 @@
 
 def retrieve_vst_data(dpId="DP1.10098.001",
                       site="all",
-                      start_date="",
-                      end_date="",
+                      start_date=None,
+                      end_date=None,
                       method="shp",
                       savepath="",
+                      attributes=None,
                       save_files=False,
                       stacked_df=True):
     """Retrieve Vegetation Structure Data From NEON and Add Individual ID coordinates.
@@ -61,14 +62,14 @@ def retrieve_vst_data(dpId="DP1.10098.001",
     # Adds the UTM coordinates of vst entries based on azimuth and distance
     vst["vst_mappingandtagging"] = retrieve_coords_itc(vst_mappingandtagging)
 
-    attributes = vst_apparentindividual[[
-        'uid', 'individualID', 'eventID', 'tagStatus', 'growthForm', 'plantStatus',
-        'stemDiameter', 'measurementHeight', 'height', 'baseCrownHeight', 'breakHeight',
-        'breakDiameter', 'maxCrownDiameter', 'ninetyCrownDiameter', 'canopyPosition',
-        'shape', 'basalStemDiameter', 'basalStemDiameterMsrmntHeight',
-        'maxBaseCrownDiameter', 'ninetyBaseCrownDiameter'
-    ]]
-
+    if attributes is None:
+        attributes = vst_apparentindividual[[
+            'uid', 'individualID', 'eventID', 'tagStatus', 'growthForm', 'plantStatus',
+            'stemDiameter', 'measurementHeight', 'height', 'baseCrownHeight', 'breakHeight',
+            'breakDiameter', 'maxCrownDiameter', 'ninetyCrownDiameter', 'canopyPosition',
+            'shape', 'basalStemDiameter', 'basalStemDiameterMsrmntHeight',
+            'maxBaseCrownDiameter', 'ninetyBaseCrownDiameter'
+        ]]
     vst['vst_mappingandtagging'].rename(columns={'eventID': 'tagEventID'}, inplace=True)
     csv_vst = pd.merge(attributes,
                        vst["vst_mappingandtagging"],
```

neonwranglerpy/utilities/loadByProduct.py (+2 -2)

```diff
@@ -72,12 +72,12 @@ def load_by_product(dpID,
         f'directly to R with this function.Use the byFileAOP() or ' \
         f'byTileAOP() function to download locally." '
 
-    if len(start_date):
+    if start_date is not None:
         if not match(DATE_PATTERN, start_date):
             return 'startdate and enddate must be either NA or valid dates in' \
                 ' the form YYYY-MM'
 
-    if len(end_date):
+    if end_date is not None:
         if not match(DATE_PATTERN, end_date):
             return 'startdate and enddate must be either NA or valid dates in' \
                 ' the form YYYY-MM'
```
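The guard change is needed because the date defaults elsewhere in this commit moved from `''` to `None`, and `len(None)` raises instead of evaluating falsy. A quick demonstration:

```python
start_date = None

try:
    if len(start_date):      # old guard: blows up once the default is None
        pass
except TypeError as e:
    print(e)                 # object of type 'NoneType' has no len()

if start_date is not None:   # new guard: skips validation cleanly for None
    print("validate against DATE_PATTERN")
```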

neonwranglerpy/utilities/zipsByProduct.py (+10 -27)

```diff
@@ -6,14 +6,15 @@
 from neonwranglerpy.utilities.tools import get_api, get_month_year_urls
 from neonwranglerpy.utilities.defaults import NEON_API_BASE_URL
 from neonwranglerpy.utilities.getzipurls import get_zip_urls
+import neonwranglerpy.fetcher.fetcher as fetcher
 
 DATE_PATTERN = re.compile('20[0-9]{2}-[0-9]{2}')
 
 
 def zips_by_product(dpID,
                     site='all',
-                    start_date='',
-                    end_date='',
+                    start_date=None,
+                    end_date=None,
                     package="basic",
                     release="current",
                     savepath='',
@@ -65,12 +66,12 @@ def zips_by_product(dpID,
         return f"{dpID} is not a properly formatted data product ID. The correct format" \
             f" is DP#.#####.00#, where the first placeholder must be between 1 and 4."
 
-    if len(start_date):
+    if start_date is not None:
         if not re.match(DATE_PATTERN, start_date):
             return 'startdate and enddate must be either NA or valid dates in the form '\
                 'YYYY-MM'
 
-    if len(end_date):
+    if end_date is not None:
         if not re.match(DATE_PATTERN, end_date):
             return 'startdate and enddate must be either NA or valid dates in the form ' \
                 'YYYY-MM'
@@ -109,21 +110,18 @@ def zips_by_product(dpID,
         print(f"There is no data for site {site}")
 
     # extracting urls for specified start and end dates
-    if len(start_date):
+    if start_date is not None:
         month_urls = get_month_year_urls(start_date, month_urls, 'start')
 
     if not len(month_urls):
         print("There is no data for selected dates")
 
-    if len(end_date):
+    if end_date is not None:
         month_urls = get_month_year_urls(end_date, month_urls, 'end')
 
     if not len(month_urls):
         print("There is no data for selected dates")
 
-    # list of all the urls of the files
-    temp = get_zip_urls(month_urls, package, dpID, release, token)
-
     # TODO: calculate download size
     # TODO: user input for downloading or not
     if not savepath:
@@ -135,25 +133,10 @@ def zips_by_product(dpID,
             os.makedirs(savepath)
 
     files_to_stack_path = os.path.join(savepath, "filesToStack")
-    os.mkdir(files_to_stack_path)
-
-    # TODO: add progress bar
+    if not os.path.isdir(files_to_stack_path):
+        os.mkdir(files_to_stack_path)
 
     if files_to_stack_path:
-        for zips in temp:
-            dirname = '.'.join([
-                'NEON', zips['productCode'], zips['siteCode'], zips['month'],
-                zips['release']
-            ])
-            zip_dir_path = os.path.join(files_to_stack_path, f'{dirname}')
-            os.mkdir(zip_dir_path)
-            for file in zips['files']:
-                try:
-                    save_path = os.path.join(zip_dir_path, f"{file['name']}")
-                    file_path, _ = urlretrieve(file['url'], save_path)
-
-                except HTTPError as e:
-                    print("HTTPError :", e)
-                    return None
+        fetcher.run_threaded_batches(month_urls, 'vst', rate_limit=2, headers=None, savepath=files_to_stack_path)
     # returns the path to /filestostack directory
     return files_to_stack_path
```
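With the serial `urlretrieve` loop gone, `zips_by_product` now hands the month URLs straight to the threaded fetcher. A hedged usage sketch based on the signature shown above; the site code, dates, and savepath below are illustrative, not from the commit:

```python
# Illustrative call only: running this downloads real NEON data.
from neonwranglerpy.utilities.zipsByProduct import zips_by_product

stack_path = zips_by_product(
    dpID="DP1.10098.001",   # vegetation structure product used in the defaults above
    site="DELA",            # hypothetical site code
    start_date="2022-01",   # must match DATE_PATTERN, i.e. YYYY-MM
    end_date="2022-12",
    savepath="./neon_data",
)
print(stack_path)           # ./neon_data/filesToStack
```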
