diff --git a/mygpo/administration/tests.py b/mygpo/administration/tests.py index 09411b248..e4ffcd5a7 100644 --- a/mygpo/administration/tests.py +++ b/mygpo/administration/tests.py @@ -16,7 +16,7 @@ from mygpo.podcasts.models import Podcast, Episode from mygpo.users.models import Client, EpisodeAction from mygpo.maintenance.merge import PodcastMerger -from mygpo.subscriptions import subscribe, unsubscribe +from mygpo.subscriptions.tasks import subscribe, unsubscribe from mygpo.db.couchdb.episode_state import episode_state_for_user_episode, \ add_episode_actions from mygpo.utils import get_timestamp @@ -56,9 +56,7 @@ def test_merge(self): device2 = Client.objects.create(user=user, uid='dev2', id=uuid.uuid1()) subscribe(p1, user, device1) - time.sleep(1) unsubscribe(p1, user, device1) - time.sleep(1) subscribe(p1, user, device1) subscribe(p2, user, device2) diff --git a/mygpo/api/legacy.py b/mygpo/api/legacy.py index 486360734..188be3035 100644 --- a/mygpo/api/legacy.py +++ b/mygpo/api/legacy.py @@ -28,7 +28,7 @@ from mygpo.users.models import SubscriptionException from mygpo.api.backend import get_device from mygpo.utils import normalize_feed_url -from mygpo.subscriptions import subscribe, unsubscribe +from mygpo.subscriptions.tasks import subscribe, unsubscribe import logging logger = logging.getLogger(__name__) @@ -73,11 +73,11 @@ def upload(request): for n in new: p = Podcast.objects.get_or_create_for_url(n) - subscribe(p, user, dev) + subscribe.delay(p, user, dev) for r in rem: p = Podcast.objects.get_or_create_for_url(r) - unsubscribe(p, user, dev) + unsubscribe.delay(p, user, dev) return HttpResponse('@SUCCESS', content_type='text/plain') diff --git a/mygpo/api/simple.py b/mygpo/api/simple.py index e22205c3e..ee2ff2ff7 100644 --- a/mygpo/api/simple.py +++ b/mygpo/api/simple.py @@ -37,7 +37,8 @@ from mygpo.api.httpresponse import JsonResponse from mygpo.directory.models import ExamplePodcast from mygpo.api.advanced.directory import podcast_data -from mygpo.subscriptions import get_subscribed_podcasts, subscribe, unsubscribe +from mygpo.subscriptions import get_subscribed_podcasts +from mygpo.subscriptions.tasks import subscribe, unsubscribe from mygpo.directory.search import search_podcasts from mygpo.decorators import allowed_methods, cors_origin from mygpo.utils import parse_range, normalize_feed_url @@ -213,11 +214,11 @@ def set_subscriptions(urls, user, device_uid, user_agent): remove_podcasts = Podcast.objects.filter(urls__url__in=rem) for podcast in remove_podcasts: - unsubscribe(podcast, user, device) + unsubscribe.delay(podcast, user, device) for url in new: podcast = Podcast.objects.get_or_create_for_url(url) - subscribe(podcast, user, device, url) + subscribe.delay(podcast, user, device, url) # Only an empty response is a successful response return HttpResponse('', content_type='text/plain') diff --git a/mygpo/api/subscriptions.py b/mygpo/api/subscriptions.py index 94d02ccd1..2068c5b55 100644 --- a/mygpo/api/subscriptions.py +++ b/mygpo/api/subscriptions.py @@ -32,8 +32,8 @@ from mygpo.decorators import cors_origin from mygpo.users.models import Client from mygpo.core.json import JSONDecodeError -from mygpo.subscriptions import (subscribe, unsubscribe, - get_subscription_history, subscription_diff) +from mygpo.subscriptions.tasks import subscribe, unsubscribe +from mygpo.subscriptions import get_subscription_history, subscription_diff from mygpo.api.basic_auth import require_valid_user, check_username @@ -164,11 +164,11 @@ def update_subscriptions(self, user, device, add, remove): for add_url in add_s: podcast = Podcast.objects.get_or_create_for_url(add_url) - subscribe(podcast, user, device, add_url) + subscribe.delay(podcast, user, device, add_url) remove_podcasts = Podcast.objects.filter(urls__url__in=rem_s) for podcast in remove_podcasts: - unsubscribe(podcast, user, device) + unsubscribe.delay(podcast, user, device) return updated_urls diff --git a/mygpo/subscriptions/__init__.py b/mygpo/subscriptions/__init__.py index 59a6807da..028a8a092 100644 --- a/mygpo/subscriptions/__init__.py +++ b/mygpo/subscriptions/__init__.py @@ -1,14 +1,9 @@ -from datetime import datetime import collections -from django.db import transaction - from mygpo.users.models import Client from mygpo.subscriptions.models import (Subscription, SubscribedPodcast, PodcastConfig, ) -from mygpo.subscriptions.signals import subscription_changed from mygpo.history.models import HistoryEntry -from mygpo.utils import to_maxlength import logging logger = logging.getLogger(__name__) @@ -20,122 +15,6 @@ ) -@transaction.atomic -def subscribe(podcast, user, client, ref_url=None): - """ subscribes user to the current podcast on one client - - Takes syned devices into account. """ - ref_url = ref_url or podcast.url - now = datetime.utcnow() - clients = _affected_clients(client) - - # fully execute subscriptions, before firing events - changed = list(_perform_subscribe(podcast, user, clients, now, ref_url)) - _fire_events(podcast, user, changed, True) - - -@transaction.atomic -def unsubscribe(podcast, user, client): - """ unsubscribes user from the current podcast on one client - - Takes syned devices into account. """ - now = datetime.utcnow() - clients = _affected_clients(client) - - # fully execute unsubscriptions, before firing events - # otherwise the first fired event might revert the unsubscribe - changed = list(_perform_unsubscribe(podcast, user, clients, now)) - _fire_events(podcast, user, changed, False) - - -@transaction.atomic -def subscribe_all(podcast, user, ref_url=None): - """ subscribes user to the current podcast on all clients """ - ref_url = ref_url or podcast.url - now = datetime.utcnow() - clients = user.client_set.all() - - # fully execute subscriptions, before firing events - changed = list(_perform_subscribe(podcast, user, clients, now, ref_url)) - _fire_events(podcast, user, changed, True) - - -@transaction.atomic -def unsubscribe_all(podcast, user): - """ unsubscribes user from the current podcast on all clients """ - now = datetime.utcnow() - clients = user.client_set.filter(subscription__podcast=podcast) - - # fully execute subscriptions, before firing events - changed = list(_perform_unsubscribe(podcast, user, clients, now)) - _fire_events(podcast, user, changed, False) - - -def _perform_subscribe(podcast, user, clients, timestamp, ref_url): - """ Subscribes to a podcast on multiple clients - - Yields the clients on which a subscription was added, ie not those where - the subscription already existed. """ - - for client in clients: - subscription, created = Subscription.objects.get_or_create( - user=user, client=client, podcast=podcast, defaults={ - 'ref_url': to_maxlength(Subscription, 'ref_url', ref_url), - 'created': timestamp, - 'modified': timestamp, - } - ) - - if not created: - continue - - logger.info('{user} subscribed to {podcast} on {client}'.format( - user=user, podcast=podcast, client=client)) - - HistoryEntry.objects.create( - timestamp=timestamp, - podcast=podcast, - user=user, - client=client, - action=HistoryEntry.SUBSCRIBE, - ) - - yield client - - -def _perform_unsubscribe(podcast, user, clients, timestamp): - """ Unsubscribes from a podcast on multiple clients - - Yields the clients on which a subscription was removed, ie not those where - the podcast was not subscribed. """ - - for client in clients: - - try: - subscription = Subscription.objects.get( - user=user, - client=client, - podcast=podcast, - ) - except Subscription.DoesNotExist: - continue - - subscription.delete() - - logger.info('{user} unsubscribed from {podcast} on {client}'.format( - user=user, podcast=podcast, client=client)) - - HistoryEntry.objects.create( - timestamp=timestamp, - podcast=podcast, - user=user, - client=client, - action=HistoryEntry.UNSUBSCRIBE, - ) - - yield client - - def get_subscribe_targets(podcast, user): """ Clients / SyncGroup on which the podcast can be subscribed @@ -264,22 +143,3 @@ def subscription_diff(history): subscriptions.items() if value < 0] return subscribe, unsubscribe - - -def _affected_clients(client): - """ the clients that are affected if the given one is to be changed """ - if client.sync_group: - # if the client is synced, all are affected - return client.sync_group.client_set.all() - - else: - # if its not synced, only the client is affected - return [client] - - -def _fire_events(podcast, user, clients, subscribed): - """ Fire the events for subscription / unsubscription """ - for client in clients: - subscription_changed.send(sender=podcast.__class__, instance=podcast, - user=user, client=client, - subscribed=subscribed) diff --git a/mygpo/subscriptions/tasks.py b/mygpo/subscriptions/tasks.py new file mode 100644 index 000000000..70f5d420d --- /dev/null +++ b/mygpo/subscriptions/tasks.py @@ -0,0 +1,155 @@ +from datetime import datetime + +from django.db import transaction + +from mygpo.subscriptions.models import Subscription +from mygpo.subscriptions.signals import subscription_changed +from mygpo.history.models import HistoryEntry +from mygpo.utils import to_maxlength +from mygpo.celery import celery + +import logging +logger = logging.getLogger(__name__) + + +SUBSCRIPTION_ACTIONS = ( + HistoryEntry.SUBSCRIBE, + HistoryEntry.UNSUBSCRIBE, +) + + +@celery.task(max_retries=5, default_retry_delay=60) +def subscribe(podcast, user, client, ref_url=None): + """ subscribes user to the current podcast on one client + + Takes syned devices into account. """ + ref_url = ref_url or podcast.url + now = datetime.utcnow() + clients = _affected_clients(client) + + # fully execute subscriptions, before firing events + changed = list(_perform_subscribe(podcast, user, clients, now, ref_url)) + _fire_events(podcast, user, changed, True) + + +@celery.task(max_retries=5, default_retry_delay=60) +def unsubscribe(podcast, user, client): + """ unsubscribes user from the current podcast on one client + + Takes syned devices into account. """ + now = datetime.utcnow() + clients = _affected_clients(client) + + # fully execute unsubscriptions, before firing events + # otherwise the first fired event might revert the unsubscribe + changed = list(_perform_unsubscribe(podcast, user, clients, now)) + _fire_events(podcast, user, changed, False) + + +@celery.task(max_retries=5, default_retry_delay=60) +def subscribe_all(podcast, user, ref_url=None): + """ subscribes user to the current podcast on all clients """ + ref_url = ref_url or podcast.url + now = datetime.utcnow() + clients = user.client_set.all() + + # fully execute subscriptions, before firing events + changed = list(_perform_subscribe(podcast, user, clients, now, ref_url)) + _fire_events(podcast, user, changed, True) + + +@celery.task(max_retries=5, default_retry_delay=60) +def unsubscribe_all(podcast, user): + """ unsubscribes user from the current podcast on all clients """ + now = datetime.utcnow() + clients = user.client_set.filter(subscription__podcast=podcast) + + # fully execute subscriptions, before firing events + changed = list(_perform_unsubscribe(podcast, user, clients, now)) + _fire_events(podcast, user, changed, False) + + +@transaction.atomic +def _perform_subscribe(podcast, user, clients, timestamp, ref_url): + """ Subscribes to a podcast on multiple clients + + Yields the clients on which a subscription was added, ie not those where + the subscription already existed. """ + + for client in clients: + subscription, created = Subscription.objects.get_or_create( + user=user, client=client, podcast=podcast, defaults={ + 'ref_url': to_maxlength(Subscription, 'ref_url', ref_url), + 'created': timestamp, + 'modified': timestamp, + } + ) + + if not created: + continue + + logger.info('{user} subscribed to {podcast} on {client}'.format( + user=user, podcast=podcast, client=client)) + + HistoryEntry.objects.create( + timestamp=timestamp, + podcast=podcast, + user=user, + client=client, + action=HistoryEntry.SUBSCRIBE, + ) + + yield client + + +@transaction.atomic +def _perform_unsubscribe(podcast, user, clients, timestamp): + """ Unsubscribes from a podcast on multiple clients + + Yields the clients on which a subscription was removed, ie not those where + the podcast was not subscribed. """ + + for client in clients: + + try: + subscription = Subscription.objects.get( + user=user, + client=client, + podcast=podcast, + ) + except Subscription.DoesNotExist: + continue + + subscription.delete() + + logger.info('{user} unsubscribed from {podcast} on {client}'.format( + user=user, podcast=podcast, client=client)) + + HistoryEntry.objects.create( + timestamp=timestamp, + podcast=podcast, + user=user, + client=client, + action=HistoryEntry.UNSUBSCRIBE, + ) + + yield client + + +def _affected_clients(client): + """ the clients that are affected if the given one is to be changed """ + if client.sync_group: + # if the client is synced, all are affected + return client.sync_group.client_set.all() + + else: + # if its not synced, only the client is affected + return [client] + + +def _fire_events(podcast, user, clients, subscribed): + """ Fire the events for subscription / unsubscription """ + for client in clients: + subscription_changed.send(sender=podcast.__class__, instance=podcast, + user=user, client=client, + subscribed=subscribed) diff --git a/mygpo/users/models.py b/mygpo/users/models.py index 33ba63e05..0e7e34416 100644 --- a/mygpo/users/models.py +++ b/mygpo/users/models.py @@ -318,7 +318,7 @@ class SyncGroup(models.Model): def sync(self): """ Sync the group, ie bring all members up-to-date """ - from mygpo.subscriptions import subscribe + from mygpo.subscriptions.tasks import subscribe # get all subscribed podcasts podcasts = set(self.get_subscribed_podcasts()) @@ -327,7 +327,7 @@ def sync(self): for client in self.client_set.all(): missing_podcasts = self.get_missing_podcasts(client, podcasts) for podcast in missing_podcasts: - subscribe(podcast, self.user, client) + subscribe.delay(podcast, self.user, client) def get_subscribed_podcasts(self): return Podcast.objects.filter(subscription__client__sync_group=self) diff --git a/mygpo/users/tests.py b/mygpo/users/tests.py index b3f64a205..ce41643ee 100644 --- a/mygpo/users/tests.py +++ b/mygpo/users/tests.py @@ -29,7 +29,7 @@ from mygpo.maintenance.merge import PodcastMerger from mygpo.api.backend import get_device from mygpo.users.models import Client, SyncGroup, UserProxy -from mygpo.subscriptions import subscribe, unsubscribe +from mygpo.subscriptions.tasks import subscribe, unsubscribe class DeviceSyncTests(unittest.TestCase): @@ -102,12 +102,6 @@ def test_merge_podcasts(self): pm = PodcastMerger([self.podcast1, self.podcast2], Counter(), []) pm.merge() - # seems that setting delayed_commit = false in the CouchDB config, as - # well as a delay here fix the intermittent failures. - # TODO: further investiation needed - import time - time.sleep(2) - # get podcast for URL of podcast2 and unsubscribe from it p = Podcast.objects.get(urls__url=self.P2_URL) unsubscribe(p, self.user, self.device) diff --git a/mygpo/web/views/podcast.py b/mygpo/web/views/podcast.py index 4ee370703..cba2db961 100644 --- a/mygpo/web/views/podcast.py +++ b/mygpo/web/views/podcast.py @@ -16,12 +16,12 @@ from mygpo.podcasts.models import Podcast, PodcastGroup, Episode, Tag from mygpo.users.models import SubscriptionException from mygpo.subscriptions.models import Subscription -from mygpo.subscriptions import ( +from mygpo.subscriptions import get_subscribe_targets +from mygpo.subscriptions.tasks import ( subscribe as subscribe_podcast, unsubscribe as unsubscribe_podcast, subscribe_all as subscribe_podcast_all, unsubscribe_all as unsubscribe_podcast_all, - get_subscribe_targets ) from mygpo.history.models import HistoryEntry from mygpo.core.proxy import proxy_object @@ -282,7 +282,7 @@ def subscribe(request, podcast): for uid in device_uids: try: device = request.user.client_set.get(uid=uid) - subscribe_podcast(podcast, request.user, device) + subscribe_podcast.delay(podcast, request.user, device) except Client.DoesNotExist as e: messages.error(request, str(e)) @@ -303,7 +303,7 @@ def subscribe(request, podcast): def subscribe_all(request, podcast): """ subscribe all of the user's devices to the podcast """ user = request.user - subscribe_podcast_all(podcast, user) + subscribe_podcast_all.delay(podcast, user) return HttpResponseRedirect(get_podcast_link_target(podcast)) @@ -325,7 +325,7 @@ def unsubscribe(request, podcast, device_uid): return HttpResponseRedirect(return_to) try: - unsubscribe_podcast(podcast, user, device) + unsubscribe_podcast.delay(podcast, user, device) except SubscriptionException as e: logger.exception('Web: %(username)s: could not unsubscribe from podcast %(podcast_url)s on device %(device_id)s' % {'username': request.user.username, 'podcast_url': podcast.url, 'device_id': device.id}) @@ -339,7 +339,7 @@ def unsubscribe(request, podcast, device_uid): def unsubscribe_all(request, podcast): """ unsubscribe all of the user's devices from the podcast """ user = request.user - unsubscribe_podcast_all(podcast, user) + unsubscribe_podcast_all.delay(podcast, user) return HttpResponseRedirect(get_podcast_link_target(podcast))