Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented #221, called tronstore #258

Merged
merged 53 commits into from
Aug 13, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
39fd2d4
tronstore first steps and some unit tests, incomplete
Jul 8, 2013
79cb5b5
added unit tests
Jul 9, 2013
d0e4dc7
unit tests done
Jul 9, 2013
17f0a21
docstrings. docstrings everywhere
Jul 9, 2013
53b3b46
made transport_method an optional variable
Jul 10, 2013
ddc37af
first steps for state migration
Jul 10, 2013
f1dde38
more migration
Jul 10, 2013
ccf1c36
conversion script progress, bugfixing
Jul 11, 2013
9c1f0ed
large refactor to not use twisted for tronstore
Jul 17, 2013
74ff784
added unit tests for tronstore.py
Jul 18, 2013
c94d8c1
slight bugfix to solve race conditions in the migration script
Jul 18, 2013
d031635
Merge remote-tracking branch 'canon/release_0.6.2' into jrm_parallel_…
Jul 18, 2013
ada38df
merge of release_0.6.2, fix to default state persistence
Jul 18, 2013
a513345
updated migrate_state.py, slight cleanup, enabled parallelstore in tron
Jul 19, 2013
8806cc4
tons of bugfixes, test fixes, new tests, etc
Jul 22, 2013
b353fd4
comment cleanup
Jul 22, 2013
9af7ef8
a bit more comment cleanup
Jul 22, 2013
b135429
fixed up tronstore reconfiguration, new unit tests to reflect this
Jul 22, 2013
57cb0f3
some cleanup
Jul 24, 2013
c2a85cc
changed tronstore to use a nullstore object by default, updated class…
Jul 25, 2013
23659f0
renamed rebuild function in request/response factories to from_msg
Jul 25, 2013
5061dba
refactor of tronstore.py into classes
Jul 29, 2013
8fefdb1
redid all broken unit tests
Jul 31, 2013
19d2ff4
deleted chunking.py
Jul 31, 2013
2a43c28
minor cleanup
Jul 31, 2013
a14ed26
Merge branch 'release_0.6.2' of https://github.com/Yelp/Tron into jrm…
Jul 31, 2013
b162f26
Merge branch 'release_0.6.2' of https://github.com/Yelp/Tron into jrm…
Aug 1, 2013
933e277
Merge branch 'release_0.6.2' of https://github.com/Yelp/Tron into jrm…
Aug 1, 2013
37015b4
updated conversion script
Aug 5, 2013
0f2131d
another update to conversion script, fix for list stored versions
Aug 5, 2013
837b4ed
comment cleanup
Aug 5, 2013
e9b94cb
updated version info
Aug 5, 2013
1c185d9
slight improvements to conversion, investigating serialization method…
Aug 5, 2013
9e657de
fix to sqlstore to use unicode in db
Aug 5, 2013
73e9177
verification fixes to conversion script
Aug 6, 2013
362bab7
fix to reconfig bug
Aug 6, 2013
2ed832b
docstring update
Aug 7, 2013
ade6218
Merge remote-tracking branch 'canon/release_0.6.2' into jrm_parallel_…
Aug 7, 2013
42ebf72
fixed broken test, code cleanup
Aug 7, 2013
82b0eeb
cleaned up commented code
Aug 7, 2013
449097f
fixed JobState.status to return correctly if a run is starting
Aug 8, 2013
d2f3ab5
fixed serializers, migrate_state
Aug 8, 2013
a5583c6
all the docstrings
Aug 8, 2013
87e6890
reduced default POOL_SIZE
Aug 8, 2013
7143081
improved sqlstore, fixed restore_state on queued runs
Aug 9, 2013
7c39f08
updated docs
Aug 9, 2013
98f7a23
made job scheduler properly watch runs on restore
Aug 12, 2013
d96de3f
enforced new watch condition in unit test
Aug 12, 2013
c0d2d6f
style cleanup for queued run callback
Aug 12, 2013
746f620
it's a docstring, captain
Aug 12, 2013
befac57
improved sqlstore to not use eval/repr
Aug 12, 2013
668d271
style cleanup in jobscheduler
Aug 12, 2013
a905d0b
fixed inspect_serialized_state
Aug 13, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,18 @@ State Persistence
The number of save calls to buffer before writing the state. Defaults to 1,
which is no buffering.

**db_store_method**
The method to use for saving state information to a SQL database. Only used if store_type is sql.

Valid options are:
**json** - uses the `simplejson` module.

**msgpack** - uses the `msgpack` module, from the msgpack-python package (tested with version 0.3.0).

**pickle** - uses the `cPickle` module. be careful with this one, as pickle is Turing complete.

**yaml** - uses the `yaml` module, from the PyYaml package (tested with version 3.10).


Example::

Expand All @@ -223,6 +235,7 @@ Example::
name: local_sqlite
connection_details: "sqlite:///dest_state.db"
buffer_size: 1 # No buffer
db_store_method: json


.. _action_runners:
Expand Down
3 changes: 3 additions & 0 deletions docs/man_tronview.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ Options
``-s, --save``
Save server and color options to client config file (~/.tron)

``--namespace``
Only show jobs and services from the specified namespace


States
----------
Expand Down
37 changes: 36 additions & 1 deletion tests/core/job_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,17 @@ def test_restore_state(self):
job_runs = [mock.Mock(), mock.Mock()]
state_data = ({'enabled': False, 'run_ids': [1, 2]}, run_data)

with mock.patch.object(self.job.job_runs, 'restore_state', return_value=job_runs):
with contextlib.nested(
mock.patch.object(self.job.job_runs, 'restore_state', return_value=job_runs),
mock.patch.object(self.job.job_runs, 'get_run_numbers', return_value=state_data[0]['run_ids'])
):
self.job.restore_state(state_data)

assert not self.job.enabled
calls = [mock.call(job_runs[i]) for i in xrange(len(job_runs))]
self.job.watcher.watch.assert_has_calls(calls)
calls = [mock.call(job_runs[i], jobrun.JobRun.NOTIFY_DONE) for i in xrange(len(job_runs))]
self.job_scheduler.watch.assert_has_calls(calls)
assert_equal(self.job.job_state.state_data, state_data[0])
self.job.job_runs.restore_state.assert_called_once_with(
sorted(run_data, key=lambda data: data['run_num'], reverse=True),
Expand All @@ -117,6 +122,7 @@ def test_restore_state(self):
self.job.context,
self.job.node_pool
)
self.job.job_runs.get_run_numbers.assert_called_once_with()
self.job.job_scheduler.restore_state.assert_called_once_with()
self.job.event.ok.assert_called_with('restored')

Expand Down Expand Up @@ -500,6 +506,35 @@ def mock_eventloop(self):
def teardown_job(self):
event.EventManager.reset()

def test_restore_state_scheduled(self):
mock_scheduled = [mock.Mock(), mock.Mock()]
with contextlib.nested(
mock.patch.object(self.job_scheduler.job_runs, 'get_scheduled',
return_value=iter(mock_scheduled)),
mock.patch.object(self.job_scheduler, 'schedule'),
mock.patch.object(self.job_scheduler, '_set_callback')
) as (get_patch, sched_patch, back_patch):
self.job_scheduler.restore_state()
get_patch.assert_called_once_with()
calls = [mock.call(m) for m in mock_scheduled]
back_patch.assert_has_calls(calls)
sched_patch.assert_called_once_with()

def test_restore_state_queued(self):
queued = mock.Mock()
with contextlib.nested(
mock.patch.object(self.job_scheduler.job_runs, 'get_scheduled',
return_value=iter([])),
mock.patch.object(self.job_scheduler.job_runs, 'get_first_queued',
return_value=queued),
mock.patch.object(self.job_scheduler, 'schedule'),
mock.patch.object(job.eventloop, 'call_later')
) as (get_patch, queue_patch, sched_patch, later_patch):
self.job_scheduler.restore_state()
get_patch.assert_called_once_with()
later_patch.assert_called_once_with(0, self.job_scheduler.run_job, queued, run_queued=True)
sched_patch.assert_called_once_with()

def test_schedule(self):
with mock.patch.object(self.job_scheduler.job_state, 'is_enabled',
new=True):
Expand Down
1 change: 1 addition & 0 deletions tests/mcp_reconfigure_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def teardown_mcp(self):
event.EventManager.reset()
filehandler.OutputPath(self.test_dir).delete()
filehandler.FileHandleManager.reset()
self.mcp.state_watcher.shutdown()

def reconfigure(self):
config = {schema.MASTER_NAMESPACE: self._get_config(1, self.test_dir)}
Expand Down
16 changes: 8 additions & 8 deletions tests/mcp_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ class MasterControlProgramTestCase(TestCase):
def setup_mcp(self):
self.working_dir = tempfile.mkdtemp()
self.config_path = tempfile.mkdtemp()
self.mcp = mcp.MasterControlProgram(
with mock.patch('tron.serialize.runstate.statemanager.StateChangeWatcher', autospec=True):
self.mcp = mcp.MasterControlProgram(
self.working_dir, self.config_path)
self.mcp.state_watcher = mock.create_autospec(
statemanager.StateChangeWatcher)

@teardown
def teardown_mcp(self):
Expand Down Expand Up @@ -134,11 +133,12 @@ class MasterControlProgramRestoreStateTestCase(TestCase):
def setup_mcp(self):
self.working_dir = tempfile.mkdtemp()
self.config_path = tempfile.mkdtemp()
self.mcp = mcp.MasterControlProgram(
self.working_dir, self.config_path)
self.mcp.jobs = mock.create_autospec(job.JobCollection)
self.mcp.services = mock.create_autospec(service.ServiceCollection)
self.mcp.state_watcher = mock.create_autospec(statemanager.StateChangeWatcher)
with mock.patch('tron.serialize.runstate.statemanager.StateChangeWatcher', autospec=True):
self.mcp = mcp.MasterControlProgram(
self.working_dir, self.config_path)
self.mcp.jobs = mock.create_autospec(job.JobCollection)
self.mcp.services = mock.create_autospec(service.ServiceCollection)
self.mcp.state_watcher = mock.create_autospec(statemanager.StateChangeWatcher)

@teardown
def teardown_mcp(self):
Expand Down
110 changes: 75 additions & 35 deletions tests/serialize/runstate/statemanager_test.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,15 @@
import os
import mock
import contextlib
from testify import TestCase, assert_equal, setup, run

from tests.assertions import assert_raises
from tests.testingutils import autospec_method
from tron.config import schema
from tron.serialize import runstate
from tron.serialize.runstate.shelvestore import ShelveStateStore
from tron.serialize.runstate.statemanager import PersistentStateManager, StateChangeWatcher
from tron.serialize.runstate.statemanager import StateSaveBuffer
from tron.serialize.runstate.statemanager import StateMetadata
from tron.serialize.runstate.statemanager import PersistenceStoreError
from tron.serialize.runstate.statemanager import VersionMismatchError
from tron.serialize.runstate.statemanager import PersistenceManagerFactory


class PersistenceManagerFactoryTestCase(TestCase):

def test_from_config_shelve(self):
thefilename = 'thefilename'
config = schema.ConfigState(
store_type='shelve', name=thefilename, buffer_size=0,
connection_details=None)
manager = PersistenceManagerFactory.from_config(config)
store = manager._impl
assert_equal(store.filename, config.name)
assert isinstance(store, ShelveStateStore)
os.unlink(thefilename)


class StateMetadataTestCase(TestCase):
Expand Down Expand Up @@ -73,13 +56,19 @@ class PersistentStateManagerTestCase(TestCase):

@setup
def setup_manager(self):
self.store = mock.Mock()
self.store.build_key.side_effect = lambda t, i: '%s%s' % (t, i)
self.buffer = StateSaveBuffer(1)
self.manager = PersistentStateManager(self.store, self.buffer)
with mock.patch('tron.serialize.runstate.statemanager.ParallelStore', autospec=True) \
as self.store_patch:
self.store = self.store_patch.return_value
self.build_patch = mock.Mock(side_effect=lambda t, i: '%s%s' % (t, i))
self.store_patch.return_value.configure_mock(build_key=self.build_patch)
self.buffer = StateSaveBuffer(1)
self.manager = PersistentStateManager()
self.manager._buffer = self.buffer

def test__init__(self):
assert_equal(self.manager._impl, self.store)
self.store_patch.assert_called_once_with()
self.build_patch.assert_called_once_with(runstate.MCP_STATE, StateMetadata.name)
assert_equal(self.manager.metadata_key, self.manager._impl.build_key(runstate.MCP_STATE, StateMetadata.name))

def test_keys_for_items(self):
names = ['namea', 'nameb']
Expand Down Expand Up @@ -137,14 +126,43 @@ def test_disabled_nested(self):
pass
assert not self.manager.enabled

def test_update_config_success(self):
new_config = mock.Mock(buffer_size=5)
self.store.load_config.configure_mock(return_value=True)
with contextlib.nested(
mock.patch.object(self.manager, '_save_from_buffer'),
mock.patch('tron.serialize.runstate.statemanager.StateSaveBuffer', autospec=True)
) as (save_patch, buffer_patch):
assert_equal(self.manager.update_from_config(new_config), True)
save_patch.assert_called_once_with()
self.store.load_config.assert_called_once_with(new_config)
buffer_patch.assert_called_once_with(new_config.buffer_size)

def test_update_config_failure(self):
new_config = mock.Mock(buffer_size=5)
self.store.load_config.configure_mock(return_value=False)
with contextlib.nested(
mock.patch.object(self.manager, '_save_from_buffer'),
mock.patch('tron.serialize.runstate.statemanager.StateSaveBuffer', autospec=True)
) as (save_patch, buffer_patch):
assert_equal(self.manager.update_from_config(new_config), False)
save_patch.assert_called_once_with()
self.store.load_config.assert_called_once_with(new_config)
assert not buffer_patch.called


class StateChangeWatcherTestCase(TestCase):

@setup
def setup_watcher(self):
self.watcher = StateChangeWatcher()
self.state_manager = mock.create_autospec(PersistentStateManager)
self.watcher.state_manager = self.state_manager
with mock.patch('tron.serialize.runstate.statemanager.PersistentStateManager', autospec=True) \
as self.persistence_patch:
self.watcher = StateChangeWatcher()
self.state_manager = mock.create_autospec(PersistentStateManager)
self.watcher.state_manager = self.state_manager

def test__init__(self):
self.persistence_patch.assert_called_once_with()

def test_update_from_config_no_change(self):
self.watcher.config = state_config = mock.Mock()
Expand All @@ -153,17 +171,39 @@ def test_update_from_config_no_change(self):
assert_equal(self.watcher.state_manager, self.state_manager)
assert not self.watcher.shutdown.mock_calls

@mock.patch('tron.serialize.runstate.statemanager.PersistenceManagerFactory',
autospec=True)
def test_update_from_config_changed(self, mock_factory):
state_config = mock.Mock()
autospec_method(self.watcher.shutdown)
def test_update_from_config_success(self):
state_config = mock.Mock(store_type="shelve")
assert self.watcher.update_from_config(state_config)
assert_equal(self.watcher.config, state_config)
self.watcher.shutdown.assert_called_with()
assert_equal(self.watcher.state_manager,
mock_factory.from_config.return_value)
mock_factory.from_config.assert_called_with(state_config)
self.state_manager.update_from_config.assert_called_once_with(state_config)

def test_update_from_config_failure_same_config(self):
state_config = self.watcher.config
assert not self.watcher.update_from_config(state_config)
assert_equal(self.watcher.config, state_config)
assert not self.state_manager.update_from_config.called

def test_update_from_config_failure_from_state_manager(self):
self.state_manager.update_from_config.configure_mock(return_value=False)
state_config = self.watcher.config
fake_config = mock.Mock(store_type="shelve")
assert not self.watcher.update_from_config(fake_config)
assert_equal(self.watcher.config, state_config)
self.state_manager.update_from_config.assert_called_once_with(fake_config)

def test_update_from_config_failure_bad_store_type(self):
state_config = self.watcher.config
fake_config = mock.Mock(store_type="hue_hue_hue")
assert_raises(PersistenceStoreError, self.watcher.update_from_config, fake_config)
assert_equal(self.watcher.config, state_config)
assert not self.state_manager.update_from_config.called

def test_update_from_config_failure_bad_db_type(self):
state_config = self.watcher.config
fake_config = mock.Mock(store_type="sql", store_method="make_it_rain")
assert_raises(PersistenceStoreError, self.watcher.update_from_config, fake_config)
assert_equal(self.watcher.config, state_config)
assert not self.state_manager.update_from_config.called

def test_save_job(self):
mock_job = mock.Mock()
Expand Down
Empty file.
Loading