Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQLAlchemy v2 support #553

Merged
merged 3 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions ckanext/harvest/harvesters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import re
import uuid

from sqlalchemy import exists, and_
import sqlalchemy as sa
from sqlalchemy.orm import contains_eager

from ckantoolkit import config
Expand Down Expand Up @@ -344,7 +344,9 @@ def _create_or_update_package(self, package_dict, harvest_object,
# plugin)
harvest_object.add()

model.Session.execute('SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED')
model.Session.execute(
sa.text('SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED')
)
model.Session.flush()

new_package = p.toolkit.get_action(
Expand Down Expand Up @@ -400,12 +402,12 @@ def last_error_free_job(cls, harvest_job):
.filter(HarvestJob.status == 'Finished')
.filter(HarvestJob.id != harvest_job.id)
.filter(
~exists().where(
~sa.exists().where(
HarvestGatherError.harvest_job_id == HarvestJob.id))
.outerjoin(HarvestObject,
and_(HarvestObject.harvest_job_id == HarvestJob.id,
HarvestObject.current == False, # noqa: E712
HarvestObject.report_status != 'not modified'))
sa.and_(HarvestObject.harvest_job_id == HarvestJob.id,
HarvestObject.current == False, # noqa: E712
HarvestObject.report_status != 'not modified'))
.options(contains_eager(HarvestJob.objects))
.order_by(HarvestJob.gather_started.desc()))
# now check them until we find one with no fetch/import errors
Expand Down
19 changes: 11 additions & 8 deletions ckanext/harvest/logic/action/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import logging
import datetime

import sqlalchemy as sa
from ckantoolkit import config
from sqlalchemy import and_, or_
from urllib.parse import urljoin

from ckan.lib.search.index import PackageSearchIndex
Expand Down Expand Up @@ -194,7 +194,8 @@ def harvest_source_clear(context, data_dict):
sql += """
COMMIT;
"""
model.Session.execute(sql)

model.Session.execute(sa.text(sql))

# Refresh the index for this source to update the status object
get_action("harvest_source_reindex")(context, {"id": harvest_source_id})
Expand Down Expand Up @@ -376,7 +377,7 @@ def harvest_source_job_history_clear(context, data_dict):
COMMIT;
'''.format(harvest_source_id=harvest_source_id)

model.Session.execute(sql)
model.Session.execute(sa.text(sql))

# Refresh the index for this source to update the status object
get_action('harvest_source_reindex')(context, {'id': harvest_source_id})
Expand Down Expand Up @@ -497,8 +498,8 @@ def harvest_objects_import(context, data_dict):
.join(Package)
.filter(HarvestObject.current == True) # noqa: E712
.filter(Package.state == u'active')
.filter(or_(Package.id == package_id_or_name,
Package.name == package_id_or_name)))
.filter(sa.or_(Package.id == package_id_or_name,
Package.name == package_id_or_name)))
join_datasets = False
else:
last_objects_ids = \
Expand Down Expand Up @@ -639,8 +640,8 @@ def harvest_jobs_run(context, data_dict):
num_objects_in_progress = \
session.query(HarvestObject.id) \
.filter(HarvestObject.harvest_job_id == job['id']) \
.filter(and_((HarvestObject.state != u'COMPLETE'),
(HarvestObject.state != u'ERROR'))) \
.filter(sa.and_((HarvestObject.state != u'COMPLETE'),
(HarvestObject.state != u'ERROR'))) \
.count()

if num_objects_in_progress == 0:
Expand Down Expand Up @@ -947,7 +948,9 @@ def harvest_source_reindex(context, data_dict):
'validate': False,
})
package_dict = logic.get_action('harvest_source_show')(
context, {'id': harvest_source_id})
dict(context, validate=False, use_cache=False),
{'id': harvest_source_id},
)
log.debug('Updating search index for harvest source: %s',
package_dict.get('name') or harvest_source_id)

Expand Down
1 change: 0 additions & 1 deletion ckanext/harvest/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,6 @@ def harvest_object_before_insert_listener(mapper, connection, target):
if not target.harvest_source_id or not target.source:
if not target.job:
raise Exception("You must define a Harvest Job for each Harvest Object")
target.source = target.job.source
target.harvest_source_id = target.job.source.id


Expand Down