Merge pull request #3632 from xoriole/fix_torrent_checker
Fixed issue with closing Torrent checker
qstokkink authored May 18, 2018
2 parents 2809e1f + f319be0 commit 21faebb
Showing 6 changed files with 158 additions and 36 deletions.
50 changes: 34 additions & 16 deletions Tribler/Core/TorrentChecker/session.py
@@ -36,7 +36,7 @@
MAX_TRACKER_MULTI_SCRAPE = 74


def create_tracker_session(tracker_url, timeout, socket_manager):
def create_tracker_session(tracker_url, timeout, socket_manager, connection_pool=None):
"""
Creates a tracker session with the given tracker URL.
:param tracker_url: The given tracker URL.
@@ -47,8 +47,7 @@ def create_tracker_session(tracker_url, timeout, socket_manager):

if tracker_type == u'udp':
return UdpTrackerSession(tracker_url, tracker_address, announce_page, timeout, socket_manager)
else:
return HttpTrackerSession(tracker_url, tracker_address, announce_page, timeout)
return HttpTrackerSession(tracker_url, tracker_address, announce_page, timeout, connection_pool=connection_pool)


class TrackerSession(TaskManager):
@@ -70,6 +69,7 @@ def __init__(self, tracker_type, tracker_url, tracker_address, announce_page, ti

self._retries = 0
self.timeout = timeout
self.timeout_call = None

self._last_contact = None

@@ -124,6 +124,22 @@ def connect_to_tracker(self):
"""Does some work when a connection has been established."""
pass

def start_timeout(self):
self.timeout_call = self.register_task("timeout", reactor.callLater(self.timeout, self.on_timeout)) \
if self.timeout != 0 else None

def on_timeout(self):
"""
This method is executed if the session fails to return a response within the expected time.
"""
self._is_failed = True
self._is_timed_out = True
if self.result_deferred and not self.result_deferred.called:
timeout_msg = "%s tracker timeout for url %s" % (self._tracker_type, self._tracker_url)
self.result_deferred.errback(ValueError(timeout_msg))
self.result_deferred = None
self.timeout_call = None

@abstractproperty
def max_retries(self):
"""Number of retries before a session is marked as failed."""
@@ -175,7 +191,7 @@ def is_timed_out(self):


class HttpTrackerSession(TrackerSession):
def __init__(self, tracker_url, tracker_address, announce_page, timeout):
def __init__(self, tracker_url, tracker_address, announce_page, timeout, connection_pool=None):
super(HttpTrackerSession, self).__init__(u'http', tracker_url, tracker_address, announce_page, timeout)
self._header_buffer = None
self._message_buffer = None
@@ -184,7 +200,7 @@ def __init__(self, tracker_url, tracker_address, announce_page, timeout):
self._received_length = None
self.result_deferred = None
self.request = None
self._connection_pool = HTTPConnectionPool(reactor, False)
self._connection_pool = connection_pool if connection_pool else HTTPConnectionPool(reactor, False)

def max_retries(self):
"""
@@ -221,9 +237,10 @@ def connect_to_tracker(self):
self.request = self.register_task("request", agent.request('GET', bytes(url)))
self.request.addCallback(self.on_response)
self.request.addErrback(self.on_error)

self._logger.debug(u"%s HTTP SCRAPE message sent: %s", self, url)

self.start_timeout()

# Return deferred that will evaluate when the whole chain is done.
self.result_deferred = self.register_task("result", Deferred(canceller=self._on_cancel))

@@ -251,24 +268,23 @@ def on_response(self, response):
# All ok, parse the body
self.register_task("parse_body", readBody(response).addCallbacks(self._process_scrape_response, self.on_error))

def _on_cancel(self, a):
def _on_cancel(self, _):
"""
:param _: The deferred which we ignore.
This function handles the scenario of the session prematurely being cleaned up,
most likely due to a shutdown.
This function should only be called by the result_deferred.
"""
self._logger.info(
"The result deferred of this HTTP tracker session is being cancelled due to a session cleanup. HTTP url: %s",
self.tracker_url)
self._logger.info("The result deferred of this HTTP tracker session is being cancelled "
"due to a session cleanup. HTTP url: %s", self.tracker_url)

def failed(self, msg=None):
"""
This method handles everything that needs to be done when one step
in the session has failed and thus no data can be obtained.
"""
self._is_failed = True
if self.result_deferred:
if self.result_deferred and not self.result_deferred.called:
result_msg = "HTTP tracker failed for url %s" % self._tracker_url
if msg:
result_msg += " (error: %s)" % unicode(msg, errors='replace')
@@ -319,7 +335,8 @@ def _process_scrape_response(self, body):
response_list.append({'infohash': infohash.encode('hex'), 'seeders': 0, 'leechers': 0})

self._is_finished = True
self.result_deferred.callback({self.tracker_url: response_list})
if self.result_deferred and not self.result_deferred.called:
self.result_deferred.callback({self.tracker_url: response_list})

@inlineCallbacks
def cleanup(self):
@@ -382,8 +399,6 @@ def __init__(self, tracker_url, tracker_address, announce_page, timeout, socket_
self.action = TRACKER_ACTION_CONNECT
self.generate_transaction_id()

self.timeout_call = self.reactor.callLater(self.timeout, self.failed) if self.timeout != 0 else None

def on_error(self, failure):
"""
Handles the case when resolving an ip address fails.
@@ -419,7 +434,7 @@ def failed(self, msg=None):
This method handles everything that needs to be done when one step
in the session has failed and thus no data can be obtained.
"""
if self.result_deferred and not self._is_failed:
if self.result_deferred and not self.result_deferred.called and not self._is_failed:
result_msg = "UDP tracker failed for url %s" % self._tracker_url
if msg:
result_msg += " (error: %s)" % unicode(msg, errors='replace')
@@ -488,6 +503,8 @@ def connect_to_tracker(self):
# no more requests can be appended to this session
self._is_initiated = True

self.start_timeout()

# clean old deferreds if present
self.cancel_pending_task("result")
self.cancel_pending_task("resolve")
@@ -610,7 +627,8 @@ def handle_scrape_response(self, response):
UdpTrackerSession.remove_transaction_id(self)
self._is_finished = True

self.result_deferred.callback({self.tracker_url: response_list})
if self.result_deferred and not self.result_deferred.called:
self.result_deferred.callback({self.tracker_url: response_list})


class FakeDHTSession(TrackerSession):
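
The heart of the session change is the new start_timeout/on_timeout pair: the timeout is registered as a named task so it gets cancelled on cleanup, and the result deferred is only errbacked when it has not already fired. A minimal standalone sketch of that pattern follows (plain Twisted, without Tribler's TaskManager; the class and method names here are illustrative, not the actual Tribler API):

from twisted.internet import reactor
from twisted.internet.defer import Deferred


class TimeoutSessionSketch(object):
    """Illustrative sketch of the timeout pattern used by TrackerSession above."""

    def __init__(self, tracker_url, timeout):
        self.tracker_url = tracker_url
        self.timeout = timeout
        self.timeout_call = None
        self.result_deferred = Deferred()

    def start_timeout(self):
        # A timeout of 0 means "never time out", mirroring the diff.
        if self.timeout != 0:
            self.timeout_call = reactor.callLater(self.timeout, self.on_timeout)

    def on_timeout(self):
        self.timeout_call = None
        # Only errback if the deferred has not fired yet; firing a deferred
        # twice would raise AlreadyCalledError.
        if self.result_deferred and not self.result_deferred.called:
            self.result_deferred.errback(
                ValueError("tracker timeout for url %s" % self.tracker_url))

    def on_response(self, result):
        # Cancel the pending timeout before delivering the result.
        if self.timeout_call and self.timeout_call.active():
            self.timeout_call.cancel()
        if not self.result_deferred.called:
            self.result_deferred.callback(result)
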
42 changes: 33 additions & 9 deletions Tribler/Core/TorrentChecker/torrent_checker.py
@@ -1,12 +1,14 @@
import logging
import socket
import time
from Tribler.Core.Utilities.utilities import is_valid_url
from binascii import hexlify

from twisted.internet import reactor
from twisted.internet.defer import DeferredList, CancelledError, fail, succeed, maybeDeferred
from twisted.internet.error import ConnectingCancelledError
from twisted.python.failure import Failure
from twisted.web.client import HTTPConnectionPool

from Tribler.Core.TorrentChecker.session import create_tracker_session, FakeDHTSession, UdpSocketManager
from Tribler.Core.Utilities.tracker_utils import MalformedTrackerURLException
@@ -44,11 +46,13 @@ def __init__(self, session):
self.session_stop_defer_list = []

self.socket_mgr = self.udp_port = None
self.connection_pool = None

@blocking_call_on_reactor_thread
def initialize(self):
self._torrent_db = self.tribler_session.open_dbhandler(NTFY_TORRENTS)
self._reschedule_tracker_select()
self.connection_pool = HTTPConnectionPool(reactor, False)
self.socket_mgr = UdpSocketManager()
self.create_socket_or_schedule()

@@ -79,6 +83,9 @@ def shutdown(self):
self.session_stop_defer_list.append(maybeDeferred(self.udp_port.stopListening))
self.udp_port = None

if self.connection_pool:
self.session_stop_defer_list.append(self.connection_pool.closeCachedConnections())

self.shutdown_task_manager()

# kill all the tracker sessions.
@@ -113,7 +120,7 @@ def _task_select_tracker(self):
self._reschedule_tracker_select()

# start selecting torrents
tracker_url = self.tribler_session.lm.tracker_manager.get_next_tracker_for_auto_check()
tracker_url = self.get_valid_next_tracker_for_auto_check()
if tracker_url is None:
self._logger.warn(u"No tracker to select from, skip")
return succeed(None)
@@ -126,14 +133,14 @@
if len(infohashes) == 0:
# We have no torrents to recheck for this tracker. Still update the last_check for this tracker.
self._logger.info("No torrent to check for tracker %s", tracker_url)
self.tribler_session.lm.tracker_manager.update_tracker_info(tracker_url, True)
self.update_tracker_info(tracker_url, True)
return succeed(None)
elif tracker_url != u'DHT' and tracker_url != u'no-DHT':
try:
session = self._create_session_for_request(tracker_url, timeout=30)
except MalformedTrackerURLException as e:
# Remove the tracker from the database
self.tribler_session.lm.tracker_manager.remove_tracker(tracker_url)
self.remove_tracker(tracker_url)
self._logger.error(e)
return succeed(None)

@@ -149,6 +156,27 @@ def get_callbacks_for_session(self, session):
error_lambda = lambda failure: self.on_session_error(session, failure)
return success_lambda, error_lambda

def get_valid_next_tracker_for_auto_check(self):
tracker_url = self.get_next_tracker_for_auto_check()
while tracker_url and not is_valid_url(tracker_url):
self.remove_tracker(tracker_url)
tracker_url = self.get_next_tracker_for_auto_check()
return tracker_url

def get_next_tracker_for_auto_check(self):
return self.tribler_session.lm.tracker_manager.get_next_tracker_for_auto_check()

def remove_tracker(self, tracker_url):
self.tribler_session.lm.tracker_manager.remove_tracker(tracker_url)

def update_tracker_info(self, tracker_url, value):
self.tribler_session.lm.tracker_manager.update_tracker_info(tracker_url, value)

def get_valid_trackers_of_torrent(self, torrent_id):
""" Get a set of valid trackers for torrent. Also remove any invalid torrent."""
db_tracker_list = self._torrent_db.getTrackerListByTorrentID(torrent_id)
return set([tracker for tracker in db_tracker_list if is_valid_url(tracker) or tracker == u'DHT'])

def on_gui_request_completed(self, infohash, result):
final_response = {}

@@ -195,11 +223,7 @@ def add_gui_request(self, infohash, timeout=20, scrape_now=False):
"leechers": result[u'num_leechers'], "infohash": infohash.encode('hex')}})

# get torrent's tracker list from DB
tracker_set = set()
db_tracker_list = self._torrent_db.getTrackerListByTorrentID(torrent_id)
for tracker in db_tracker_list:
tracker_set.add(tracker)

tracker_set = self.get_valid_trackers_of_torrent(torrent_id)
if not tracker_set:
self._logger.warn(u"no trackers, skip GUI request. infohash: %s", hexlify(infohash))
# TODO: add code to handle torrents with no tracker
@@ -244,7 +268,7 @@ def on_session_error(self, session, failure):
return failure

def _create_session_for_request(self, tracker_url, timeout=20):
session = create_tracker_session(tracker_url, timeout, self.socket_mgr)
session = create_tracker_session(tracker_url, timeout, self.socket_mgr, connection_pool=self.connection_pool)

if tracker_url not in self._session_list:
self._session_list[tracker_url] = []
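
Two ideas in the checker change stand out: a single HTTPConnectionPool is created in initialize(), handed to every HTTP tracker session, and drained on shutdown via closeCachedConnections(); and tracker URLs are validated (with invalid ones removed) before a session is created. A rough, self-contained sketch of the pool lifecycle follows; the class and method names are illustrative only:

from twisted.internet import reactor
from twisted.internet.defer import DeferredList
from twisted.web.client import HTTPConnectionPool


class PoolLifecycleSketch(object):
    """Illustrative lifecycle of the shared, non-persistent connection pool."""

    def __init__(self):
        self.connection_pool = None
        self.session_stop_defer_list = []

    def initialize(self):
        # One pool shared by every HTTP tracker session instead of one per session.
        self.connection_pool = HTTPConnectionPool(reactor, False)

    def shutdown(self):
        if self.connection_pool:
            # closeCachedConnections() returns a Deferred; collecting it lets the
            # caller wait until all cached sockets are actually closed.
            self.session_stop_defer_list.append(
                self.connection_pool.closeCachedConnections())
        return DeferredList(self.session_stop_defer_list)
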
2 changes: 2 additions & 0 deletions Tribler/Core/Utilities/utilities.py
@@ -331,6 +331,8 @@ def is_valid_url(url):
:param url: an object representing the URL
:return: Boolean specifying whether the URL is valid
"""
if ' ' in url.strip():
return
if url.lower().startswith('udp'):
url = url.lower().replace('udp', 'http', 1)
split_url = urlparse.urlsplit(url)
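
The guard added to is_valid_url rejects any URL that still contains a space after stripping (note that it returns None rather than False, which is still falsy to callers). A self-contained approximation of the updated check, for illustration only (the real function lives in Tribler/Core/Utilities/utilities.py and does more than this):

import urlparse  # Python 2 module, matching the codebase


def is_valid_url_sketch(url):
    """Rough approximation of the updated is_valid_url check."""
    if ' ' in url.strip():
        return None  # falsy, mirrors the early return in the diff
    if url.lower().startswith('udp'):
        url = url.lower().replace('udp', 'http', 1)
    split_url = urlparse.urlsplit(url)
    return bool(split_url.scheme and split_url.netloc)


print(bool(is_valid_url_sketch("http://anno nce.torrentsmd.com:8080/announce")))  # False
print(bool(is_valid_url_sketch("udp://tracker.example.org:6969/announce")))       # True
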
18 changes: 18 additions & 0 deletions Tribler/Test/Core/TorrentChecker/test_torrentchecker.py
@@ -168,6 +168,24 @@ def test_tracker_no_infohashes(self):
self.session.lm.tracker_manager.add_tracker('http://trackertest.com:80/announce')
return self.torrent_checker._task_select_tracker()

def test_get_valid_next_tracker_for_auto_check(self):
""" Test if only valid tracker url is used for auto check """
test_tracker_list = ["http://anno nce.torrentsmd.com:8080/announce",
"http://announce.torrentsmd.com:8080/announce"]

def get_next_tracker_for_auto_check():
return test_tracker_list[0] if test_tracker_list else None

def remove_tracker(tracker_url):
test_tracker_list.remove(tracker_url)

self.torrent_checker.get_next_tracker_for_auto_check = get_next_tracker_for_auto_check
self.torrent_checker.remove_tracker = remove_tracker

next_tracker_url = self.torrent_checker.get_valid_next_tracker_for_auto_check()
self.assertEqual(len(test_tracker_list), 1)
self.assertEqual(next_tracker_url, "http://announce.torrentsmd.com:8080/announce")

@inlineCallbacks
@blocking_call_on_reactor_thread
def tearDown(self, annotate=True):
66 changes: 56 additions & 10 deletions Tribler/Test/Core/TorrentChecker/test_torrentchecker_session.py
@@ -12,10 +12,6 @@
from Tribler.Test.twisted_thread import deferred


class ClockedUdpTrackerSession(UdpTrackerSession):
reactor = Clock()


class FakeUdpSocketManager(object):
transport = 1

@@ -82,6 +78,62 @@ def on_error(failure):
session.connect_to_tracker().addErrback(on_error)
return test_deferred

@deferred(timeout=5)
def test_httpsession_timeout(self):
test_deferred = Deferred()

def on_fake_connect_to_tracker():
session.start_timeout()
session.result_deferred = Deferred()
return session.result_deferred

def on_fake_timeout():
session.timeout_called = True
timeout_func()

def on_error(failure):
failure.trap(ValueError)
self.assertTrue(session.timeout_called)
test_deferred.callback(None)

session = HttpTrackerSession("localhost", ("localhost", 80), "/announce", 1)
timeout_func = session.on_timeout
session.timeout_called = False

session.on_timeout = on_fake_timeout
session.connect_to_tracker = on_fake_connect_to_tracker

session.connect_to_tracker().addErrback(on_error)
return test_deferred

@deferred(timeout=5)
def test_udpsession_timeout(self):
test_deferred = Deferred()

def on_fake_connect_to_tracker():
session.start_timeout()
session.result_deferred = Deferred()
return session.result_deferred

def on_fake_timeout():
session.timeout_called = True
timeout_func()

def on_error(failure):
failure.trap(ValueError)
self.assertTrue(session.timeout_called)
test_deferred.callback(None)

session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 1, self.socket_mgr)
timeout_func = session.on_timeout
session.timeout_called = False

session.on_timeout = on_fake_timeout
session.connect_to_tracker = on_fake_connect_to_tracker

session.connect_to_tracker().addErrback(on_error)
return test_deferred

@deferred(timeout=5)
def test_httpsession_cancel_operation(self):
test_deferred = Deferred()
@@ -97,12 +149,6 @@ def test_udpsession_cancel_operation(self):
d.addErrback(lambda _: None)
session.result_deferred = d

def test_udpsession_udp_tracker_timeout(self):
session = ClockedUdpTrackerSession("localhost", ("localhost", 4782), "/announce", 15, self.socket_mgr)
# Advance 16 seconds so the timeout triggered
session.reactor.advance(session.timeout + 1)
self.assertTrue(session.is_failed)

def test_udpsession_handle_response_wrong_len(self):
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, self.socket_mgr)
session.on_ip_address_resolved("127.0.0.1")