Skip to content

Commit

Permalink
[Subscriptions] convert subscribe-methods to Celery tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
stefankoegl committed Sep 13, 2014
1 parent 5b58950 commit 006677c
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 168 deletions.
4 changes: 1 addition & 3 deletions mygpo/administration/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from mygpo.podcasts.models import Podcast, Episode
from mygpo.users.models import Client, EpisodeAction
from mygpo.maintenance.merge import PodcastMerger
from mygpo.subscriptions import subscribe, unsubscribe
from mygpo.subscriptions.tasks import subscribe, unsubscribe
from mygpo.db.couchdb.episode_state import episode_state_for_user_episode, \
add_episode_actions
from mygpo.utils import get_timestamp
Expand Down Expand Up @@ -56,9 +56,7 @@ def test_merge(self):
device2 = Client.objects.create(user=user, uid='dev2', id=uuid.uuid1())

subscribe(p1, user, device1)
time.sleep(1)
unsubscribe(p1, user, device1)
time.sleep(1)
subscribe(p1, user, device1)
subscribe(p2, user, device2)

Expand Down
6 changes: 3 additions & 3 deletions mygpo/api/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from mygpo.users.models import SubscriptionException
from mygpo.api.backend import get_device
from mygpo.utils import normalize_feed_url
from mygpo.subscriptions import subscribe, unsubscribe
from mygpo.subscriptions.tasks import subscribe, unsubscribe

import logging
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -73,11 +73,11 @@ def upload(request):

for n in new:
p = Podcast.objects.get_or_create_for_url(n)
subscribe(p, user, dev)
subscribe.delay(p, user, dev)

for r in rem:
p = Podcast.objects.get_or_create_for_url(r)
unsubscribe(p, user, dev)
unsubscribe.delay(p, user, dev)

return HttpResponse('@SUCCESS', content_type='text/plain')

Expand Down
7 changes: 4 additions & 3 deletions mygpo/api/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
from mygpo.api.httpresponse import JsonResponse
from mygpo.directory.models import ExamplePodcast
from mygpo.api.advanced.directory import podcast_data
from mygpo.subscriptions import get_subscribed_podcasts, subscribe, unsubscribe
from mygpo.subscriptions import get_subscribed_podcasts
from mygpo.subscriptions.tasks import subscribe, unsubscribe
from mygpo.directory.search import search_podcasts
from mygpo.decorators import allowed_methods, cors_origin
from mygpo.utils import parse_range, normalize_feed_url
Expand Down Expand Up @@ -213,11 +214,11 @@ def set_subscriptions(urls, user, device_uid, user_agent):

remove_podcasts = Podcast.objects.filter(urls__url__in=rem)
for podcast in remove_podcasts:
unsubscribe(podcast, user, device)
unsubscribe.delay(podcast, user, device)

for url in new:
podcast = Podcast.objects.get_or_create_for_url(url)
subscribe(podcast, user, device, url)
subscribe.delay(podcast, user, device, url)

# Only an empty response is a successful response
return HttpResponse('', content_type='text/plain')
Expand Down
8 changes: 4 additions & 4 deletions mygpo/api/subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
from mygpo.decorators import cors_origin
from mygpo.users.models import Client
from mygpo.core.json import JSONDecodeError
from mygpo.subscriptions import (subscribe, unsubscribe,
get_subscription_history, subscription_diff)
from mygpo.subscriptions.tasks import subscribe, unsubscribe
from mygpo.subscriptions import get_subscription_history, subscription_diff
from mygpo.api.basic_auth import require_valid_user, check_username


Expand Down Expand Up @@ -164,11 +164,11 @@ def update_subscriptions(self, user, device, add, remove):

for add_url in add_s:
podcast = Podcast.objects.get_or_create_for_url(add_url)
subscribe(podcast, user, device, add_url)
subscribe.delay(podcast, user, device, add_url)

remove_podcasts = Podcast.objects.filter(urls__url__in=rem_s)
for podcast in remove_podcasts:
unsubscribe(podcast, user, device)
unsubscribe.delay(podcast, user, device)

return updated_urls

Expand Down
140 changes: 0 additions & 140 deletions mygpo/subscriptions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
from datetime import datetime
import collections

from django.db import transaction

from mygpo.users.models import Client
from mygpo.subscriptions.models import (Subscription, SubscribedPodcast,
PodcastConfig, )
from mygpo.subscriptions.signals import subscription_changed
from mygpo.history.models import HistoryEntry
from mygpo.utils import to_maxlength

import logging
logger = logging.getLogger(__name__)
Expand All @@ -20,122 +15,6 @@
)


@transaction.atomic
def subscribe(podcast, user, client, ref_url=None):
""" subscribes user to the current podcast on one client
Takes syned devices into account. """
ref_url = ref_url or podcast.url
now = datetime.utcnow()
clients = _affected_clients(client)

# fully execute subscriptions, before firing events
changed = list(_perform_subscribe(podcast, user, clients, now, ref_url))
_fire_events(podcast, user, changed, True)


@transaction.atomic
def unsubscribe(podcast, user, client):
""" unsubscribes user from the current podcast on one client
Takes syned devices into account. """
now = datetime.utcnow()
clients = _affected_clients(client)

# fully execute unsubscriptions, before firing events
# otherwise the first fired event might revert the unsubscribe
changed = list(_perform_unsubscribe(podcast, user, clients, now))
_fire_events(podcast, user, changed, False)


@transaction.atomic
def subscribe_all(podcast, user, ref_url=None):
""" subscribes user to the current podcast on all clients """
ref_url = ref_url or podcast.url
now = datetime.utcnow()
clients = user.client_set.all()

# fully execute subscriptions, before firing events
changed = list(_perform_subscribe(podcast, user, clients, now, ref_url))
_fire_events(podcast, user, changed, True)


@transaction.atomic
def unsubscribe_all(podcast, user):
""" unsubscribes user from the current podcast on all clients """
now = datetime.utcnow()
clients = user.client_set.filter(subscription__podcast=podcast)

# fully execute subscriptions, before firing events
changed = list(_perform_unsubscribe(podcast, user, clients, now))
_fire_events(podcast, user, changed, False)


def _perform_subscribe(podcast, user, clients, timestamp, ref_url):
""" Subscribes to a podcast on multiple clients
Yields the clients on which a subscription was added, ie not those where
the subscription already existed. """

for client in clients:
subscription, created = Subscription.objects.get_or_create(
user=user, client=client, podcast=podcast, defaults={
'ref_url': to_maxlength(Subscription, 'ref_url', ref_url),
'created': timestamp,
'modified': timestamp,
}
)

if not created:
continue

logger.info('{user} subscribed to {podcast} on {client}'.format(
user=user, podcast=podcast, client=client))

HistoryEntry.objects.create(
timestamp=timestamp,
podcast=podcast,
user=user,
client=client,
action=HistoryEntry.SUBSCRIBE,
)

yield client


def _perform_unsubscribe(podcast, user, clients, timestamp):
""" Unsubscribes from a podcast on multiple clients
Yields the clients on which a subscription was removed, ie not those where
the podcast was not subscribed. """

for client in clients:

try:
subscription = Subscription.objects.get(
user=user,
client=client,
podcast=podcast,
)
except Subscription.DoesNotExist:
continue

subscription.delete()

logger.info('{user} unsubscribed from {podcast} on {client}'.format(
user=user, podcast=podcast, client=client))

HistoryEntry.objects.create(
timestamp=timestamp,
podcast=podcast,
user=user,
client=client,
action=HistoryEntry.UNSUBSCRIBE,
)

yield client


def get_subscribe_targets(podcast, user):
""" Clients / SyncGroup on which the podcast can be subscribed
Expand Down Expand Up @@ -264,22 +143,3 @@ def subscription_diff(history):
subscriptions.items() if value < 0]

return subscribe, unsubscribe


def _affected_clients(client):
""" the clients that are affected if the given one is to be changed """
if client.sync_group:
# if the client is synced, all are affected
return client.sync_group.client_set.all()

else:
# if its not synced, only the client is affected
return [client]


def _fire_events(podcast, user, clients, subscribed):
""" Fire the events for subscription / unsubscription """
for client in clients:
subscription_changed.send(sender=podcast.__class__, instance=podcast,
user=user, client=client,
subscribed=subscribed)
Loading

0 comments on commit 006677c

Please sign in to comment.