Commit 00535e0

datastreams: add logging

1 parent 7f5d38e

File tree: 4 files changed (+91 −22 lines)


invenio_vocabularies/cli.py

+19 −2

@@ -13,6 +13,7 @@
 import click
 from flask.cli import with_appcontext
 from invenio_access.permissions import system_identity
+from invenio_logging.structlog import LoggerFactory
 from invenio_pidstore.errors import PIDDeletedError, PIDDoesNotExistError
 
 from .datastreams import DataStreamFactory
@@ -31,22 +32,38 @@ def _process_vocab(config, num_samples=None):
         transformers_config=config.get("transformers"),
         writers_config=config["writers"],
     )
-
+    cli_logger = LoggerFactory.get_logger("cli")
+    cli_logger.info("Starting processing")
     success, errored, filtered = 0, 0, 0
     left = num_samples or -1
-    for result in ds.process():
+    batch_size = config.get("batch_size", 1000)
+    write_many = config.get("write_many", False)
+
+    for result in ds.process(batch_size=batch_size, write_many=write_many):
         left = left - 1
         if result.filtered:
             filtered += 1
+            cli_logger.info("Filtered", entry=result.entry, operation=result.op_type)
         if result.errors:
             for err in result.errors:
                 click.secho(err, fg="red")
+            cli_logger.error(
+                "Error",
+                entry=result.entry,
+                operation=result.op_type,
+                errors=result.errors,
+            )
             errored += 1
         else:
             success += 1
+            cli_logger.info("Success", entry=result.entry, operation=result.op_type)
         if left == 0:
             click.secho(f"Number of samples reached {num_samples}", fg="green")
             break
+    cli_logger.info(
+        "Finished processing", success=success, errored=errored, filtered=filtered
+    )
+
     return success, errored, filtered
 
 
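Note: the cli_logger calls above use structlog-style event logging, where the first argument names the event and context is passed as keyword arguments. A minimal sketch of that call style, assuming LoggerFactory.get_logger returns a structlog bound logger; the entry dicts and error message below are made up for illustration:

    import structlog

    # structlog loggers bind keyword context to each event
    logger = structlog.get_logger("cli")
    logger.info("Success", entry={"id": "eng"}, operation="create")
    logger.error("Error", entry={"id": "deu"}, operation="update",
                 errors=["PID does not exist"])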

invenio_vocabularies/datastreams/datastreams.py

+8 −1

@@ -8,6 +8,8 @@
 
 """Base data stream."""
 
+from invenio_logging.structlog import LoggerFactory
+
 from .errors import ReaderError, TransformerError, WriterError
 
 
@@ -65,16 +67,21 @@ def process_batch(self, batch, write_many=False):
         else:
             yield from (self.write(entry) for entry in transformed_entries)
 
-    def process(self, batch_size=100, write_many=False, *args, **kwargs):
+    def process(self, batch_size=100, write_many=False, logger=None, *args, **kwargs):
         """Iterates over the entries.
 
         Uses the reader to get the raw entries and transforms them.
         It will iterate over the `StreamEntry` objects returned by
         the reader, apply the transformations and yield the result of
         writing it.
         """
+        if not logger:
+            logger = LoggerFactory.get_logger("datastreams")
 
         batch = []
+        logger.info(
+            f"Start reading datastream with batch_size={batch_size} and write_many={write_many}"
+        )
         for stream_entry in self.read():
            batch.append(stream_entry)
            if len(batch) >= batch_size:
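The new logger parameter lets callers (such as the Celery task below) inject their own logger, while standalone use falls back to a module-level one. A minimal, self-contained sketch of this optional-logger pattern, with stdlib logging standing in for LoggerFactory; the process function and entries argument here are illustrative, not this module's API:

    import logging


    def process(entries, batch_size=100, logger=None):
        # Fall back to a default logger when the caller does not inject one.
        if not logger:
            logger = logging.getLogger("datastreams")
        logger.info("Start reading with batch_size=%s", batch_size)
        batch = []
        for entry in entries:
            batch.append(entry)
            if len(batch) >= batch_size:
                yield batch  # flush a full batch
                batch = []
        if batch:
            yield batch  # flush the remainder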

invenio_vocabularies/datastreams/tasks.py

+19 −5

@@ -9,29 +9,43 @@
 """Data Streams Celery tasks."""
 
 from celery import shared_task
+from invenio_logging.structlog import LoggerFactory
 
 from ..datastreams import StreamEntry
 from ..datastreams.factories import WriterFactory
 
 
 @shared_task(ignore_result=True)
-def write_entry(writer_config, entry):
+def write_entry(writer_config, entry, logger=None):
     """Write an entry.
 
     :param writer: writer configuration as accepted by the WriterFactory.
     :param entry: dictionary, StreamEntry is not serializable.
     """
+    if not logger:
+        logger = LoggerFactory.get_logger("write_entry")
     writer = WriterFactory.create(config=writer_config)
-    writer.write(StreamEntry(entry))
-
+    stream_entry_processed = writer.write(StreamEntry(entry))
+    if stream_entry_processed.errors:
+        logger.error(
+            "Error writing entry", entry=entry, errors=stream_entry_processed.errors
+        )
+    else:
+        logger.info("Entry written", entry=entry)
 
 @shared_task(ignore_result=True)
-def write_many_entry(writer_config, entries):
+def write_many_entry(writer_config, entries, logger=None):
     """Write many entries.
 
     :param writer: writer configuration as accepted by the WriterFactory.
     :param entries: list of dictionaries, StreamEntry is not serializable.
     """
+    if not logger:
+        logger = LoggerFactory.get_logger("write_many_entry")
     writer = WriterFactory.create(config=writer_config)
     stream_entries = [StreamEntry(entry) for entry in entries]
-    writer.write_many(stream_entries)
+    stream_entries_processed = writer.write_many(stream_entries)
+    errored = [entry for entry in stream_entries_processed if entry.errors]
+    succeeded = len(stream_entries_processed) - len(errored)
+    logger.info("Entries written", succeeded=succeeded)
+    if errored:
+        for entry in errored:
+            logger.error("Error writing entry", entry=entry.entry, errors=entry.errors)
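write_many_entry now inspects the writer's return value instead of discarding it, splitting processed entries into errored and succeeded. A small sketch of that partitioning logic, using a stand-in dataclass rather than the real StreamEntry (which, per this diff, exposes `entry` and `errors` attributes):

    from dataclasses import dataclass, field


    @dataclass
    class FakeStreamEntry:  # stand-in for StreamEntry, for illustration only
        entry: dict
        errors: list = field(default_factory=list)


    processed = [FakeStreamEntry({"id": 1}), FakeStreamEntry({"id": 2}, errors=["boom"])]
    errored = [e for e in processed if e.errors]
    succeeded = len(processed) - len(errored)
    assert succeeded == 1 and len(errored) == 1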

invenio_vocabularies/services/tasks.py

+45 −14

@@ -9,6 +9,7 @@
 
 from celery import shared_task
 from flask import current_app
+from invenio_logging.structlog import LoggerFactory
 
 from ..datastreams.factories import DataStreamFactory
 from ..factories import get_vocabulary_config
@@ -17,22 +18,52 @@
 @shared_task(ignore_result=True)
 def process_datastream(stream):
     """Process a datastream from config."""
-    vc_config = get_vocabulary_config(stream)
-    config = vc_config.get_config()
+    stream_logger = LoggerFactory.get_logger("datastreams-" + stream)
+    try:
+        stream_logger.info("Starting processing")
+        vc_config = get_vocabulary_config(stream)
+        config = vc_config.get_config()
 
-    if not config:
-        raise ValueError("Invalid stream configuration")
+        if not config:
+            stream_logger.error("Invalid stream configuration")
+            raise ValueError("Invalid stream configuration")
 
-    ds = DataStreamFactory.create(
-        readers_config=config["readers"],
-        transformers_config=config.get("transformers"),
-        writers_config=config["writers"],
-    )
-
-    for result in ds.process():
-        if result.errors:
-            for err in result.errors:
-                current_app.logger.error(err)
+        ds = DataStreamFactory.create(
+            readers_config=config["readers"],
+            transformers_config=config.get("transformers"),
+            writers_config=config["writers"],
+        )
+        stream_logger.info("Datastream created")
+        stream_logger.info("Processing Datastream")
+        success, errored, filtered = 0, 0, 0
+        for result in ds.process(
+            batch_size=config.get("batch_size", 100),
+            write_many=config.get("write_many", False),
+            logger=stream_logger,
+        ):
+            if result.filtered:
+                filtered += 1
+                stream_logger.info(
+                    "Filtered", entry=result.entry, operation=result.op_type
+                )
+            if result.errors:
+                errored += 1
+                stream_logger.error(
+                    "Error",
+                    entry=result.entry,
+                    operation=result.op_type,
+                    errors=result.errors,
+                )
+            else:
+                success += 1
+                stream_logger.info(
+                    "Success", entry=result.entry, operation=result.op_type
+                )
+        stream_logger.info(
+            "Finished processing", success=success, errored=errored, filtered=filtered
+        )
+    except Exception as e:
+        stream_logger.exception("Error processing stream", error=e)
 
 
 @shared_task()
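process_datastream now wraps the whole run in try/except so a failing stream is logged with a traceback rather than only surfacing as a Celery failure (note the logger is created before the try, so it is always available in the except branch). A minimal sketch of that catch-all pattern, with stdlib logging standing in for the structlog logger; logger.exception records the active traceback automatically:

    import logging

    logger = logging.getLogger("datastreams-names")

    try:
        raise ValueError("Invalid stream configuration")
    except Exception:
        logger.exception("Error processing stream")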
