Move datasets between 4CAT servers (#375)
* Export/import datasets between 4CAT servers

* Move datasource and add init; add a new setting to allow export; pass the 4CAT version for comparison; run some tests

I wasn't sure where you had left off, so I just tested it to find out. Made some fixes, and it now looks like you had worked up to the log and the actual data.

* Use version function for comparison

This uses the commit, which makes sense for now, but perhaps not in the long term.

* Close, but children fail; possibly just the zip files

* Use send_from_directory (successfully tested on 1.5 GB files); cleaned up some errors/logging and added notes

* Use custom exception instead of TypeError when dataset not found

* Use 401 HTTP status for login form

* Ignore hidden files in cleanup worker

* Ensure unique dataset key and allow changing keys

* Allow workers to set dataset key manually

* MANY CHANGES

* Don't print log to stdout twice when not running as daemon

* Remove stray debug print

* Show original timestamp for imported datasets

* Dataset form logic

* Don't show empty dataset status

* Forbid importing unfinished datasets

* Not using this file

* Fix copied comment

* Fix interrupt routine and clean up half-imported data

* Catch dataset not found error in expiration worker

* Hide anon/label options when importing

* Fix delete button on dataset creation page

* Fix interrupting imports

* Remove "filtered from" on imported datasets

* Add DESCRIPTION.md for import data source

* Better markdown

* Return error() instead of raising

* Use replace on result_file to use new_dataset.key (`with_stem` requires Python 3.9 or newer)

* Clarify some comments

* get_software_version() -> get_software_commit()

* Use version instead of commit to determine migration compatibility

* More commentary

* Use reserve_results_file() to ensure correct import data path

---------

Co-authored-by: Dale Wahl <[email protected]>
stijn-uva and dale-wahl authored Oct 25, 2023
1 parent 07904a6 commit 0de7baf
Showing 28 changed files with 738 additions and 84 deletions.
13 changes: 7 additions & 6 deletions backend/lib/processor.py
@@ -16,8 +9,9 @@
from backend.lib.worker import BasicWorker
from common.lib.dataset import DataSet
from common.lib.fourcat_module import FourcatModule
from common.lib.helpers import get_software_version, remove_nuls, send_email
from common.lib.exceptions import WorkerInterruptedException, ProcessorInterruptedException, ProcessorException, MapItemException
from common.lib.helpers import get_software_commit, remove_nuls, send_email
from common.lib.exceptions import (WorkerInterruptedException, ProcessorInterruptedException, ProcessorException,
DataSetException, MapItemException)
from common.config_manager import config, ConfigWrapper


@@ -98,7 +99,7 @@ def work(self):
# that actually queued the processor, so their config is relevant
self.dataset = DataSet(key=self.job.data["remote_id"], db=self.db)
self.owner = self.dataset.creator
except TypeError as e:
except DataSetException as e:
# query has been deleted in the meantime. finish without error,
# as deleting it will have been a conscious choice by a user
self.job.finish()
@@ -133,10 +134,10 @@ def work(self):
self.job.finish()
return

except TypeError:
except DataSetException:
# we need to know what the source_dataset dataset was to properly handle the
# analysis
self.log.warning("Processor %s queued for orphan query %s: cannot run, cancelling job" % (
self.log.warning("Processor %s queued for orphan dataset %s: cannot run, cancelling job" % (
self.type, self.dataset.key))
self.job.finish()
return
@@ -160,7 +161,7 @@

# start log file
self.dataset.update_status("Processing data")
self.dataset.update_version(get_software_version())
self.dataset.update_version(get_software_commit())

# get parameters
# if possible, fill defaults where parameters are not provided
4 changes: 2 additions & 2 deletions backend/workers/cancel_dataset.py
@@ -2,7 +2,7 @@
Delete and cancel a dataset
"""
from backend.lib.worker import BasicWorker
from common.lib.exceptions import JobNotFoundException
from common.lib.exceptions import JobNotFoundException, DataSetException
from common.lib.dataset import DataSet
from common.lib.job import Job

@@ -27,7 +27,7 @@ def work(self):
try:
dataset = DataSet(key=self.job.data["remote_id"], db=self.db)
jobtype = dataset.data["type"]
except TypeError:
except DataSetException:
# dataset already deleted, apparently
self.job.finish()
return
8 changes: 6 additions & 2 deletions backend/workers/cleanup_tempfiles.py
@@ -9,7 +9,7 @@
from common.config_manager import config
from backend.lib.worker import BasicWorker
from common.lib.dataset import DataSet
from common.lib.exceptions import WorkerInterruptedException
from common.lib.exceptions import WorkerInterruptedException, DataSetException


class TempFileCleaner(BasicWorker):
@@ -34,6 +34,10 @@ def work(self):

result_files = Path(config.get('PATH_DATA')).glob("*")
for file in result_files:
if file.stem.startswith("."):
# skip hidden files
continue

if self.interrupted:
raise WorkerInterruptedException("Interrupted while cleaning up orphaned result files")

@@ -50,7 +54,7 @@

try:
dataset = DataSet(key=key, db=self.db)
except TypeError:
except DataSetException:
# the dataset has been deleted since, but the result file still
# exists - should be safe to clean up
self.log.info("No matching dataset with key %s for file %s, deleting file" % (key, str(file)))
15 changes: 10 additions & 5 deletions backend/workers/expire_items.py
@@ -8,7 +8,7 @@

from backend.lib.worker import BasicWorker
from common.lib.dataset import DataSet
from common.config_manager import config
from common.lib.exceptions import DataSetNotFoundException

from common.lib.user import User

@@ -55,10 +55,15 @@ def expire_datasets(self):
""")

for dataset in datasets:
dataset = DataSet(key=dataset["key"], db=self.db)
if dataset.is_expired():
self.log.info(f"Deleting dataset {dataset.key} (expired)")
dataset.delete()
try:
dataset = DataSet(key=dataset["key"], db=self.db)
if dataset.is_expired():
self.log.info(f"Deleting dataset {dataset.key} (expired)")
dataset.delete()

except DataSetNotFoundException:
# dataset has already been deleted, apparently
pass

def expire_users(self):
"""
6 changes: 6 additions & 0 deletions common/lib/config_definition.py
@@ -105,6 +105,12 @@
"help": "Can use explorer",
"tooltip": "Controls whether users can use the Explorer feature to navigate datasets."
},
"privileges.can_export_datasets": {
"type": UserInput.OPTION_TOGGLE,
"default": True,
"help": "Can export datasets",
"tooltip": "Allows users to export datasets they own to other 4CAT instances."
},
"privileges.admin.can_manage_users": {
"type": UserInput.OPTION_TOGGLE,
"default": False,
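The new privileges.can_export_datasets toggle follows the same pattern as the other privilege settings above. A hypothetical guard for the export feature could read it like any other configuration value; the actual route that uses it is not shown in this part of the diff:

    from common.config_manager import config

    def may_export_datasets(user_config=config):
        # hypothetical check before offering a dataset for export to another 4CAT instance
        return bool(user_config.get("privileges.can_export_datasets"))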
78 changes: 60 additions & 18 deletions common/lib/dataset.py
@@ -3,6 +3,7 @@
import datetime
import hashlib
import fnmatch
import random
import shutil
import json
import time
@@ -14,9 +15,10 @@
import backend
from common.config_manager import config
from common.lib.job import Job, JobNotFoundException
from common.lib.helpers import get_software_version, NullAwareTextIOWrapper, convert_to_int
from common.lib.helpers import get_software_commit, NullAwareTextIOWrapper, convert_to_int
from common.lib.fourcat_module import FourcatModule
from common.lib.exceptions import ProcessorInterruptedException, DataSetException, MapItemException
from common.lib.exceptions import (ProcessorInterruptedException, DataSetException, DataSetNotFoundException,
MapItemException)


class DataSet(FourcatModule):
@@ -78,29 +80,29 @@ def __init__(self, parameters=None, key=None, job=None, data=None, db=None, pare
self.key = key
current = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (self.key,))
if not current:
raise TypeError("DataSet() requires a valid dataset key for its 'key' argument, \"%s\" given" % key)
raise DataSetNotFoundException("DataSet() requires a valid dataset key for its 'key' argument, \"%s\" given" % key)

query = current["query"]
elif job is not None:
current = self.db.fetchone("SELECT * FROM datasets WHERE parameters::json->>'job' = %s", (job,))
if not current:
raise TypeError("DataSet() requires a valid job ID for its 'job' argument")
raise DataSetNotFoundException("DataSet() requires a valid job ID for its 'job' argument")

query = current["query"]
self.key = current["key"]
elif data is not None:
current = data
if "query" not in data or "key" not in data or "parameters" not in data or "key_parent" not in data:
raise ValueError("DataSet() requires a complete dataset record for its 'data' argument")
raise DataSetException("DataSet() requires a complete dataset record for its 'data' argument")

query = current["query"]
self.key = current["key"]
else:
if parameters is None:
raise TypeError("DataSet() requires either 'key', or 'parameters' to be given")
raise DataSetException("DataSet() requires either 'key', or 'parameters' to be given")

if not type:
raise ValueError("Datasets must have their type set explicitly")
raise DataSetException("Datasets must have their type set explicitly")

query = self.get_label(parameters, default=type)
self.key = self.get_key(query, parameters, parent)
@@ -122,7 +124,7 @@ def __init__(self, parameters=None, key=None, job=None, data=None, db=None, pare
"timestamp": int(time.time()),
"is_finished": False,
"is_private": is_private,
"software_version": get_software_version(),
"software_version": get_software_commit(),
"software_file": "",
"num_rows": 0,
"progress": 0.0,
@@ -556,7 +558,7 @@ def delete(self, commit=True):
try:
child = DataSet(key=child["key"], db=self.db)
child.delete(commit=commit)
except TypeError:
except DataSetException:
# dataset already deleted - race condition?
pass

@@ -977,7 +979,7 @@ def reserve_result_file(self, parameters=None, extension="csv"):
self.data["result_file"] = file
return updated > 0

def get_key(self, query, parameters, parent=""):
def get_key(self, query, parameters, parent="", time_offset=0):
"""
Generate a unique key for this dataset that can be used to identify it
@@ -987,6 +989,9 @@ def get_key(self, query, parameters, parent=""):
:param str query: Query string
:param parameters: Dataset parameters
:param parent: Parent dataset's key (if applicable)
:param time_offset: Offset to add to the time component of the dataset
key. This can be used to ensure a unique key even if the parameters and
timing is otherwise identical to an existing dataset's
:return str: Dataset key
"""
@@ -999,16 +1004,53 @@
for key in sorted(parameters):
param_key[key] = parameters[key]

# this ensures a different key for the same query if not queried
# at the exact same second. Since the same query may return
# different results when done at different times, getting a
# duplicate key is not actually always desirable. The resolution
# of this salt could be experimented with...
param_key["_salt"] = int(time.time())
# we additionally use the current time as a salt - this should usually
# ensure a unique key for the dataset. if there is a hash collision
# nonetheless, the key is regenerated below with a small random offset
param_key["_salt"] = int(time.time()) + time_offset

parent_key = str(parent) if parent else ""
plain_key = repr(param_key) + str(query) + parent_key
return hashlib.md5(plain_key.encode("utf-8")).hexdigest()
hashed_key = hashlib.md5(plain_key.encode("utf-8")).hexdigest()

if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)):
# key exists, generate a new one
return self.get_key(query, parameters, parent, time_offset=random.randint(1,10))
else:
return hashed_key

def set_key(self, key):
"""
Change dataset key
In principle, keys should never be changed. But there are rare cases
where it is useful to do so, in particular when importing a dataset
from another 4CAT instance; in that case it makes sense to try and
ensure that the key is the same as it was before. This function sets
the dataset key and updates any dataset references to it.
:param str key: Key to set
:return str: Key that was set. If the desired key already exists, the
original key is kept.
"""
key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,))
if key_exists or not key:
return self.key

old_key = self.key
self.db.update("datasets", data={"key": key}, where={"key": old_key})

# update references
self.db.update("datasets", data={"key_parent": key}, where={"key_parent": old_key})
self.db.update("datasets_owners", data={"key": key}, where={"key": old_key})
self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key})
self.db.update("users_favourites", data={"key": key}, where={"key": old_key})

# for good measure
self.db.commit()
self.key = key

return self.key

def get_status(self):
"""
@@ -1186,7 +1228,7 @@ def get_genealogy(self, inclusive=False):
while key_parent:
try:
parent = DataSet(key=key_parent, db=self.db)
except TypeError:
except DataSetException:
break

genealogy.append(parent)
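To make the collision handling in the new get_key() easier to follow, here is a minimal standalone sketch of the same idea. The helper name is hypothetical, and a plain set stands in for the database lookup that the real method performs against the datasets table:

    import hashlib
    import random
    import time

    def make_key(query, parameters, parent="", existing_keys=None, time_offset=0):
        # simplified stand-in for DataSet.get_key()
        existing_keys = existing_keys if existing_keys is not None else set()

        # sort parameters so the same query always hashes the same way
        param_key = {key: parameters[key] for key in sorted(parameters)}

        # salt with the current time plus an optional offset, so identical queries
        # submitted at different moments still get distinct keys
        param_key["_salt"] = int(time.time()) + time_offset

        plain_key = repr(param_key) + str(query) + (str(parent) if parent else "")
        hashed_key = hashlib.md5(plain_key.encode("utf-8")).hexdigest()

        if hashed_key in existing_keys:
            # collision: nudge the salt with a small random offset and try again
            return make_key(query, parameters, parent, existing_keys, random.randint(1, 10))
        return hashed_key

    # two identical queries in the same second still end up with different keys
    taken = set()
    taken.add(make_key("test query", {"datasource": "example"}, existing_keys=taken))
    taken.add(make_key("test query", {"datasource": "example"}, existing_keys=taken))
    assert len(taken) == 2

The companion set_key() added in the same file covers the inverse case: when importing a dataset from another 4CAT instance it tries to keep the original key, provided it is not already taken, and updates every table that references the old key.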
6 changes: 6 additions & 0 deletions common/lib/exceptions.py
@@ -53,6 +53,12 @@ class DataSetException(FourcatException):
"""
pass

class DataSetNotFoundException(DataSetException):
"""
Raise if dataset does not exist
"""
pass


class JobClaimedException(QueueException):
"""
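Throughout the changed workers, `except TypeError` becomes `except DataSetException` or `DataSetNotFoundException`. The short sketch below, with a hypothetical helper name, illustrates why the dedicated exception is preferable; the real workers inline this pattern in their work() methods:

    from common.lib.dataset import DataSet
    from common.lib.exceptions import DataSetNotFoundException

    def load_dataset_or_none(key, db):
        # return the dataset for this key, or None if it no longer exists
        try:
            return DataSet(key=key, db=db)
        except DataSetNotFoundException:
            # a missing dataset is expected (e.g. a user deleted it in the meantime);
            # with the old TypeError, genuine bugs such as passing the wrong argument
            # type would have been swallowed by the same except clause
            return None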
28 changes: 24 additions & 4 deletions common/lib/helpers.py
@@ -18,7 +18,7 @@

from collections.abc import MutableMapping
from html.parser import HTMLParser
from urllib.parse import urlparse
from pathlib import Path
from calendar import monthrange

from common.lib.user_input import UserInput
@@ -99,18 +99,21 @@ def sniff_encoding(file):
return "utf-8-sig" if maybe_bom == b"\xef\xbb\xbf" else "utf-8"


def get_software_version():
def get_software_commit():
"""
Get current 4CAT version
Get current 4CAT commit hash
Reads a given version file and returns the first string found in there
(up until the first space). On failure, return an empty string.
Use `get_software_version()` instead if you need the release version
number rather than the precise commit hash.
If no version file is available, run `git show` to test if there is a git
repository in the 4CAT root folder, and if so, what commit is currently
checked out in it.
:return str: 4CAT version
:return str: 4CAT git commit hash
"""
versionpath = config.get('PATH_ROOT').joinpath(config.get('path.versionfile'))

@@ -139,6 +142,23 @@ def get_software_version():
except OSError:
return ""

def get_software_version():
"""
Get current 4CAT version
This is the actual software version, i.e. not the commit hash (see
`get_software_commit()` for that). The current version is stored in a file
with a canonical location: if the file doesn't exist, an empty string is
returned.
:return str: Software version, for example `1.37`.
"""
current_version_file = Path(config.get("PATH_ROOT"), "config/.current-version")
if not current_version_file.exists():
return ""

with current_version_file.open() as infile:
return infile.readline().strip()

def get_github_version(timeout=5):
"""
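The commit also switches migration-compatibility checks from the commit hash to the release version returned by get_software_version(). The exact comparison rule lives outside the part of the diff shown here; the snippet below, with hypothetical helper names, only illustrates one plausible way of comparing two release strings such as "1.37":

    def parse_version(version):
        # turn a release string such as "1.37" or "1.37.1" into a comparable tuple
        try:
            return tuple(int(part) for part in version.split("."))
        except ValueError:
            # empty or non-numeric version string; treat as unknown
            return ()

    def versions_compatible(local_version, remote_version):
        # hypothetical rule: importing is allowed when the major.minor releases match
        local, remote = parse_version(local_version), parse_version(remote_version)
        return bool(local) and bool(remote) and local[:2] == remote[:2]

    assert versions_compatible("1.37", "1.37.1")
    assert not versions_compatible("1.36", "1.37")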
6 changes: 2 additions & 4 deletions common/lib/logger.py
@@ -210,7 +210,8 @@ def __init__(self, output=False, filename='4cat.log'):
self.logger.addHandler(slack_handler)
except Exception:
# we *may* need the logger before the database is in working order
config.db.rollback()
if config.db is not None:
config.db.rollback()

def log(self, message, level=logging.INFO, frame=None):
"""
@@ -221,9 +222,6 @@ def log(self, message, level=logging.INFO, frame=None):
:param frame: Traceback frame. If no frame is given, it is
extrapolated
"""
if self.print_logs and level > logging.DEBUG:
print("LOG: %s" % message)

# logging can include the full stack trace in the log, but that's a
# bit excessive - instead, only include the location the log was called
if not frame: