
Commit 2048618

jrcastro2 authored and slint committed
names: add orcid public data sync
* closes #353
1 parent bf9ee3c commit 2048618

File tree

8 files changed (+225 -29 lines)


invenio_vocabularies/cli.py

+22 -2

@@ -26,6 +26,8 @@ def vocabularies():
 
 def _process_vocab(config, num_samples=None):
     """Import a vocabulary."""
+    import time
+    start_time = time.time()
     ds = DataStreamFactory.create(
         readers_config=config["readers"],
         transformers_config=config.get("transformers"),
@@ -34,7 +36,8 @@ def _process_vocab(config, num_samples=None):
 
     success, errored, filtered = 0, 0, 0
     left = num_samples or -1
-    for result in ds.process():
+    for result in ds.process(batch_size=config.get("batch_size", 100),
+                             write_many=config.get("write_many", False)):
         left = left - 1
         if result.filtered:
             filtered += 1
@@ -47,6 +50,20 @@
         if left == 0:
             click.secho(f"Number of samples reached {num_samples}", fg="green")
             break
+
+    end_time = time.time()
+
+    elapsed_time = end_time - start_time
+    friendly_time = time.strftime("%H:%M:%S", time.gmtime(elapsed_time))
+    friendly_time_per_record = 0
+    if success:
+        elapsed_time_per_record = elapsed_time / success * 1000
+        friendly_time_per_record = time.strftime("%H:%M:%S", time.gmtime(elapsed_time_per_record))
+
+    print(f"CLI elapsed time: {friendly_time} for {success} entries. An average of {friendly_time_per_record} per 1000 entries.\n")
+    with open("/tmp/elapsed_time.txt", "a") as file:
+        file.write(f"CLI elapsed time: {friendly_time} for {success} entries. An average of {friendly_time_per_record} per 1000 entries.\n")
+
     return success, errored, filtered
 
 
@@ -101,7 +118,10 @@ def update(vocabulary, filepath=None, origin=None):
     config = vc.get_config(filepath, origin)
 
     for w_conf in config["writers"]:
-        w_conf["args"]["update"] = True
+        if w_conf["type"] == "async":
+            w_conf["args"]["writer"]["args"]["update"] = True
+        else:
+            w_conf["args"]["update"] = True
 
     success, errored, filtered = _process_vocab(config)
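To see what the new `update` branch does, here is a minimal sketch run against a stand-in config (the dict below is illustrative; its nested "async" shape mirrors the ORCiD DATASTREAM_CONFIG further down):

config = {
    "writers": [
        {"type": "async", "args": {"writer": {"type": "names-service", "args": {}}}},
        {"type": "yaml", "args": {}},
    ]
}

for w_conf in config["writers"]:
    if w_conf["type"] == "async":
        # the async writer wraps another writer config, so unwrap one level
        w_conf["args"]["writer"]["args"]["update"] = True
    else:
        w_conf["args"]["update"] = True

print(config["writers"][0]["args"]["writer"]["args"])  # {'update': True}
print(config["writers"][1]["args"])                    # {'update': True}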

invenio_vocabularies/config.py

+8 -1

@@ -24,7 +24,7 @@
     ZipReader,
 )
 from .datastreams.transformers import XMLTransformer
-from .datastreams.writers import ServiceWriter, YamlWriter
+from .datastreams.writers import AsyncWriter, ServiceWriter, YamlWriter
 from .resources import VocabulariesResourceConfig
 from .services.config import VocabulariesServiceConfig
 
@@ -134,6 +134,7 @@
 VOCABULARIES_DATASTREAM_WRITERS = {
     "service": ServiceWriter,
     "yaml": YamlWriter,
+    "async": AsyncWriter,
 }
 """Data Streams writers."""
 
@@ -154,3 +155,9 @@
     "sort": ["name", "count"],
 }
 """Vocabulary type search configuration."""
+
+VOCABULARIES_ORCID_ACCESS_KEY = "CHANGE_ME"
+VOCABULARIES_ORCID_SECRET_KEY = "CHANGE_ME"
+VOCABULARIES_ORCID_FOLDER = "/tmp/ORCID_public_data_files/"
+VOCABULARIES_ORCID_SUMMARIES_BUCKET = "v3.0-summaries"
+VOCABULARIES_DATASTREAM_BATCH_SIZE = 100
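The CHANGE_ME defaults above are meant to be overridden per instance, for example in an invenio.cfg; a sketch with placeholder values (folder and batch size are arbitrary):

# invenio.cfg
VOCABULARIES_ORCID_ACCESS_KEY = "<aws-access-key-id>"
VOCABULARIES_ORCID_SECRET_KEY = "<aws-secret-access-key>"
VOCABULARIES_ORCID_FOLDER = "/var/instance/ORCID_public_data_files/"
VOCABULARIES_ORCID_SUMMARIES_BUCKET = "v3.0-summaries"
VOCABULARIES_DATASTREAM_BATCH_SIZE = 500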

invenio_vocabularies/contrib/names/datastreams.py

+103 -7

@@ -12,9 +12,94 @@
 from invenio_records.dictutils import dict_lookup
 
 from ...datastreams.errors import TransformerError
-from ...datastreams.readers import SimpleHTTPReader
+from ...datastreams.readers import SimpleHTTPReader, BaseReader
 from ...datastreams.transformers import BaseTransformer
 from ...datastreams.writers import ServiceWriter
+import boto3
+from flask import current_app
+from datetime import datetime
+from datetime import timedelta
+import tarfile
+import io
+from concurrent.futures import ThreadPoolExecutor
+
+class OrcidDataSyncReader(BaseReader):
+    """ORCiD Data Sync Reader."""
+
+    def _iter(self, fp, *args, **kwargs):
+        """Not used; this reader does not iterate over a file pointer."""
+        raise NotImplementedError(
+            "OrcidDataSyncReader downloads one file and therefore does not iterate through items"
+        )
+
+    def read(self, item=None, *args, **kwargs):
+        """Download the ORCiD lambda file and yield each updated record as an in-memory binary stream."""
+        path = current_app.config["VOCABULARIES_ORCID_FOLDER"]
+        date_format = "%Y-%m-%d %H:%M:%S.%f"
+        date_format_no_millis = "%Y-%m-%d %H:%M:%S"
+
+        s3client = boto3.client(
+            "s3",
+            aws_access_key_id=current_app.config["VOCABULARIES_ORCID_ACCESS_KEY"],
+            aws_secret_access_key=current_app.config["VOCABULARIES_ORCID_SECRET_KEY"],
+        )
+        response = s3client.get_object(Bucket="orcid-lambda-file", Key="last_modified.csv.tar")
+        tar_content = response["Body"].read()
+
+        days_to_sync = 60 * 9
+        last_sync = datetime.now() - timedelta(minutes=days_to_sync)
+        # TODO: Do we want to use last_run to keep track of the last time the sync was run?
+        # Might not be ideal as it seems the file is updated at midnight
+
+        # last_ran_path = os.path.join(path, "last_ran.config")
+        # if os.path.isfile(last_ran_path):
+        #     with open(last_ran_path, "r") as f:
+        #         date_string = f.readline()
+        #         last_sync = datetime.strptime(date_string, date_format)
+
+        # with open(last_ran_path, "w") as f:
+        #     f.write(datetime.now().strftime(date_format))
+
+        def process_file(fileobj):
+            file_content = fileobj.read().decode("utf-8")
+            orcids = []
+            for line in file_content.splitlines()[1:]:  # Skip the header line
+                elements = line.split(",")
+                orcid = elements[0]
+
+                last_modified_str = elements[3]
+                try:
+                    last_modified_date = datetime.strptime(last_modified_str, date_format)
+                except ValueError:
+                    last_modified_date = datetime.strptime(last_modified_str, date_format_no_millis)
+
+                if last_modified_date >= last_sync:
+                    orcids.append(orcid)
+                else:
+                    break
+            return orcids
+
+        orcids_to_sync = []
+        with tarfile.open(fileobj=io.BytesIO(tar_content)) as tar:
+            for member in tar.getmembers():
+                f = tar.extractfile(member)
+                if f:
+                    orcids_to_sync.extend(process_file(f))
+
+        def fetch_orcid_data(orcid_to_sync, bucket):
+            suffix = orcid_to_sync[-3:]
+            key = f"{suffix}/{orcid_to_sync}.xml"
+            try:
+                file_response = s3client.get_object(Bucket=bucket, Key=key)
+                return file_response["Body"].read()
+            except Exception:
+                # TODO: log
+                return None
+
+        # TODO: allow configuring max_workers / test using asyncio
+        with ThreadPoolExecutor(max_workers=40) as executor:
+            futures = [
+                executor.submit(
+                    fetch_orcid_data,
+                    orcid,
+                    current_app.config["VOCABULARIES_ORCID_SUMMARIES_BUCKET"],
+                )
+                for orcid in orcids_to_sync
+            ]
+            for future in futures:
+                result = future.result()
+                if result is not None:
+                    yield result
 
 
 class OrcidHTTPReader(SimpleHTTPReader):
@@ -89,6 +174,7 @@ def _entry_id(self, entry):
 
 VOCABULARIES_DATASTREAM_READERS = {
     "orcid-http": OrcidHTTPReader,
+    "orcid-data-sync": OrcidDataSyncReader,
 }
 
 
@@ -107,22 +193,32 @@ def _entry_id(self, entry):
 DATASTREAM_CONFIG = {
     "readers": [
         {
-            "type": "tar",
-            "args": {
-                "regex": "\\.xml$",
-            },
+            "type": "orcid-data-sync",
         },
         {"type": "xml"},
     ],
     "transformers": [{"type": "orcid"}],
+    # "writers": [
+    #     {
+    #         "type": "names-service",
+    #         "args": {
+    #             "identity": system_identity,
+    #         },
+    #     }
+    # ],
     "writers": [
         {
-            "type": "names-service",
+            "type": "async",
             "args": {
-                "identity": system_identity,
+                "writer": {
+                    "type": "names-service",
+                    "args": {},
+                }
            },
         }
     ],
+    "batch_size": 1000,  # TODO: current_app.config["VOCABULARIES_DATASTREAM_BATCH_SIZE"]
+    "write_many": True,
 }
 """ORCiD Data Stream configuration.
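For reference, a minimal sketch of driving this configuration programmatically, mirroring what the CLI's _process_vocab does. It assumes DataStreamFactory is importable from invenio_vocabularies.datastreams and that create() takes a writers_config argument alongside the two visible in the cli.py hunk above:

from invenio_vocabularies.contrib.names.datastreams import DATASTREAM_CONFIG
from invenio_vocabularies.datastreams import DataStreamFactory

ds = DataStreamFactory.create(
    readers_config=DATASTREAM_CONFIG["readers"],
    transformers_config=DATASTREAM_CONFIG.get("transformers"),
    writers_config=DATASTREAM_CONFIG["writers"],
)
for result in ds.process(
    batch_size=DATASTREAM_CONFIG.get("batch_size", 100),
    write_many=DATASTREAM_CONFIG.get("write_many", False),
):
    if result.errors:
        print(result.errors)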

invenio_vocabularies/datastreams/datastreams.py

+45 -12

@@ -10,7 +10,6 @@
 
 from .errors import ReaderError, TransformerError, WriterError
 
-
 class StreamEntry:
     """Object to encapsulate streams processing."""
 
@@ -38,16 +37,10 @@ def __init__(self, readers, writers, transformers=None, *args, **kwargs):
     def filter(self, stream_entry, *args, **kwargs):
         """Checks if a stream_entry should be filtered out (skipped)."""
         return False
-
-    def process(self, *args, **kwargs):
-        """Iterates over the entries.
-
-        Uses the reader to get the raw entries and transforms them.
-        It will iterate over the `StreamEntry` objects returned by
-        the reader, apply the transformations and yield the result of
-        writing it.
-        """
-        for stream_entry in self.read():
+
+    def process_batch(self, batch, write_many=False):
+        """Transform and filter a batch of entries, then write them."""
+        transformed_entries = []
+        for stream_entry in batch:
             if stream_entry.errors:
                 yield stream_entry  # reading errors
             else:
@@ -58,7 +51,33 @@
                 transformed_entry.filtered = True
                 yield transformed_entry
             else:
-                yield self.write(transformed_entry)
+                transformed_entries.append(transformed_entry)
+        if transformed_entries:
+            if write_many:
+                print(f"write_many {len(transformed_entries)} entries.")
+                yield from self.batch_write(transformed_entries)
+            else:
+                print(f"write {len(transformed_entries)} entries.")
+                yield from (self.write(entry) for entry in transformed_entries)
+
+    def process(self, batch_size=100, write_many=False, *args, **kwargs):
+        """Iterates over the entries.
+
+        Uses the reader to get the raw entries and transforms them.
+        It will iterate over the `StreamEntry` objects returned by
+        the reader, apply the transformations and yield the result of
+        writing it.
+        """
+        batch = []
+        for stream_entry in self.read():
+            batch.append(stream_entry)
+            if len(batch) >= batch_size:
+                yield from self.process_batch(batch, write_many=write_many)
+                batch = []
+
+        # Process any remaining entries in the last batch
+        if batch:
+            yield from self.process_batch(batch, write_many=write_many)
 
     def read(self):
         """Recursively read the entries."""
@@ -106,6 +125,20 @@ def write(self, stream_entry, *args, **kwargs):
             stream_entry.errors.append(f"{writer.__class__.__name__}: {str(err)}")
 
         return stream_entry
+
+    def batch_write(self, stream_entries, *args, **kwargs):
+        """Write the stream entries using all configured writers at once."""
+        for writer in self._writers:
+            try:
+                success, errors = writer.write_many(stream_entries)
+                for record in success:
+                    yield StreamEntry(entry=record)
+                for error in errors:
+                    yield StreamEntry(entry=error["record"], errors=error["errors"])
+            except WriterError as err:
+                for stream_entry in stream_entries:
+                    stream_entry.errors.append(f"{writer.__class__.__name__}: {str(err)}")
+                    yield stream_entry
 
     def total(self, *args, **kwargs):
         """The total of entries obtained from the origin."""

invenio_vocabularies/datastreams/readers.py

+4 -3

@@ -21,11 +21,12 @@
 import requests
 import yaml
 from lxml import etree
-from lxml.html import parse as html_parse
+from lxml.html import fromstring
 
 from .errors import ReaderError
 from .xml import etree_to_dict
 
+
 try:
     import oaipmh_scythe
 except ImportError:
@@ -226,8 +227,8 @@ class XMLReader(BaseReader):
     def _iter(self, fp, *args, **kwargs):
         """Read and parse an XML file to dict."""
         # NOTE: We parse HTML, to skip XML validation and strip XML namespaces
-        xml_tree = html_parse(fp).getroot()
-        record = etree_to_dict(xml_tree)["html"]["body"].get("record")
+        xml_tree = fromstring(fp)
+        record = etree_to_dict(xml_tree).get("record")
 
         if not record:
             raise ReaderError("Record not found in XML entry.")
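Unlike the parse() call it replaces, fromstring() takes the document content itself (a string or bytes) rather than a file pointer, which matches readers such as OrcidDataSyncReader above that now yield raw bytes. A toy check (payload is illustrative):

from lxml.html import fromstring

content = b"<record><name>Jane Doe</name></record>"
tree = fromstring(content)    # lenient HTML parsing, no XML validation
print(tree.tag)               # 'record'
print(tree.findtext("name"))  # 'Jane Doe'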

invenio_vocabularies/datastreams/tasks.py

+13 -2

@@ -15,11 +15,22 @@
 
 
 @shared_task(ignore_result=True)
-def write_entry(writer, entry):
+def write_entry(writer_config, entry):
     """Write an entry.
 
     :param writer_config: writer configuration as accepted by the WriterFactory.
     :param entry: dictionary, StreamEntry is not serializable.
     """
-    writer = WriterFactory.create(config=writer)
+    writer = WriterFactory.create(config=writer_config)
     writer.write(StreamEntry(entry))
+
+@shared_task(ignore_result=True)
+def write_many_entry(writer_config, entries):
+    """Write many entries.
+
+    :param writer_config: writer configuration as accepted by the WriterFactory.
+    :param entries: list of dictionaries, StreamEntry is not serializable.
+    """
+    writer = WriterFactory.create(config=writer_config)
+    stream_entries = [StreamEntry(entry) for entry in entries]
+    writer.write_many(stream_entries)
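The writers.py half of this change is not shown in this diff. A plausible sketch of how the new AsyncWriter could delegate to these tasks; the class body is hypothetical, only the task signatures and the (success, errors) tuple that batch_write() unpacks are taken from the hunks above:

from invenio_vocabularies.datastreams.tasks import write_entry, write_many_entry

class AsyncWriter:
    """Hypothetical sketch: defer writing to Celery via the tasks above."""

    def __init__(self, writer):
        self._writer_config = writer  # e.g. {"type": "names-service", "args": {...}}

    def write(self, stream_entry, *args, **kwargs):
        # tasks receive plain dicts; StreamEntry is not serializable
        write_entry.delay(self._writer_config, stream_entry.entry)
        return stream_entry

    def write_many(self, stream_entries, *args, **kwargs):
        entries = [stream_entry.entry for stream_entry in stream_entries]
        write_many_entry.delay(self._writer_config, entries)
        # batch_write() unpacks (success, errors); everything was queued,
        # so report all entries as written and none as errored
        return entries, []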
