
Implemented #221, called tronstore #258

Merged
dnephin merged 53 commits into Yelp:release_0.6.2 on Aug 13, 2013

Conversation

Codeacious

This is, hopefully, a really close-to-final version of tronstore: a new process run as a child alongside trond that parallelizes state serialization for #221. Trond will now spawn this process as part of the PersistenceManagerFactory by default, rather than independently creating each store type class.

Here's a basic overview of what's happened structurally:

  • There's now a new folder in tron/serialize/runstate, called tronstore. This, obviously, holds all the new files related to tronstore.
  • PersistenceManagerFactory now spawns only ParallelStore objects, a new top-level class that implements the same front-facing methods as the old *Store objects (ShelveStore, YamlStore, etc.), as well as one new front-facing method, update_from_config.
  • The PersistentStateManager now has an update_from_config method that calls ParallelStore's update_from_config method, and StateChangeWatcher.update_from_config has been updated to use this.
  • The ParallelStore object creates a StoreProcessProtocol object (located in tronstore/process.py) that handles all low-level process management and communication. The two are coupled: the protocol assumes that everything sent to tronstore is either a StoreRequest or a StoreResponse (more on that later), and it relies on the ParallelStore object to update its StoreRequestFactory whenever the configuration is updated.
  • The StoreProcessProtocol object works by using the Python multiprocessing module. It spawns tronstore.main() from tronstore/tronstore.py as a daemonized process, passing in a Pipe object for message communication. Pipes, by default, use pickle (not cPickle) for serialization, so instead there are factories for request and response objects, which are serialized based on the Tron configuration given at runtime; the raw, serialized text is what is actually sent over the Pipe object. Pipes work somewhat like a two-way Queue, so there's no need to deal with chunking (despite there being some chunking-related code left over in tronstore/chunking.py). A rough sketch of the process/Pipe setup appears after this list.
  • The two factory objects, StoreRequestFactory and StoreResponseFactory, live in tronstore/messages.py. They deal with constructing and reconstructing StoreRequests and StoreResponses to and from tronstore, respectively. There's not too much here: just an id for matching requests and responses together (which is actually done by the StoreProcessProtocol), and a serialization class mapping (the actual methods for serialization/deserialization live in tronstore/transport.py; the mapping references these classes).
  • There are four types of requests, which have enumerators in tronstore/msg_enums.py: SAVE, RESTORE, CONFIG, and SHUTDOWN. Which type is sent is controlled by ParallelStore, which constructs requests according to whatever method was called. SAVE is a non-blocking request, while RESTORE, CONFIG, and SHUTDOWN are blocking. Blocking requests time out, after which a log entry is written and a failure is returned (see the blocking-request sketch after this list).
  • tronstore itself works by polling for requests on the Pipe. When a request is received, it usually delegates a thread to handle and serve the request. It puts the thread in a queue, which is then actually started by another thread that acts as a thread pool (it simply makes sure that no more than a certain number of threads are running at once). If the request was a SHUTDOWN or CONFIG request, tronstore will wait until all other requests have finished and then take the appropriate action. Responses from all requests are sent back over the Pipe to trond.
  • There are signal handlers in tronstore for SIGINT, SIGTERM, and SIGHUP, the three signals also registered to custom handlers in trond. The handlers do nothing, so that trond can take the correct, complete action when one of these signals is received. This is necessary, as signals are sent to all processes in a process group, and tronstore is spawned in the same group as trond. To allow for multiple processes with signal handling, a flag, installSignalHandlers, had to be switched off on the Twisted reactor. A sketch of the no-op handlers follows this list.
  • There are a couple of new configuration options, both living in the state_persistence section of the config, called transport_method and db_store_method. Both must be set to one of json, msgpack, pickle, or yaml, but db_store_method isn't actually enforced unless store_type is set to sql or mongo. store_type is still the same as before, but is now used by tronstore to set up its own internal store object (the new store objects are located in tronstore/store.py). transport_method determines what serialization method tronstore uses to send requests and responses, while db_store_method determines what serialization method tronstore uses to store data within a database (only for SQLAlchemy and MongoDB configurations, obviously). Everything else in the state_persistence section is effectively the same as before.
  • There is a script in tools/ that will migrate the stored state of Tron from 0.6.1 to 0.6.2. It's got a really long docstring describing how to use it. The old migrate_state.py script has been updated to migrate state for tronstore state objects.
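
A minimal sketch of the Pipe setup described above: spawning a daemonized child with a multiprocessing Pipe and exchanging raw bytes over it. The names spawn_store_process and store_main are illustrative, not the actual tronstore API.

```python
import multiprocessing


def store_main(config, pipe):
    # Illustrative stand-in for tronstore.main(): poll the pipe for
    # serialized requests and send serialized responses back.
    while True:
        if pipe.poll(1):  # wait up to a second for a request
            raw = pipe.recv_bytes()
            # ... deserialize, handle the request, build a response ...
            pipe.send_bytes(raw)  # placeholder response


def spawn_store_process(config):
    # trond side: create both ends of the pipe and start the child as a
    # daemon so it exits along with the parent process.
    trond_end, store_end = multiprocessing.Pipe()
    process = multiprocessing.Process(target=store_main, args=(config, store_end))
    process.daemon = True
    process.start()
    return process, trond_end
```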
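
A rough sketch of a blocking request with a timeout on the trond side, assuming the enumerators are plain module-level constants (the real ones live in tronstore/msg_enums.py; the helper and the timeout value here are hypothetical):

```python
# Hypothetical mirror of the enumerators in tronstore/msg_enums.py.
SAVE, RESTORE, CONFIG, SHUTDOWN = range(4)

BLOCKING_TIMEOUT = 10.0  # seconds; an assumed value, not the real one


def send_blocking_request(pipe, serialized_request, timeout=BLOCKING_TIMEOUT):
    """Send a serialized request and wait for the serialized response.

    Returns the raw response bytes, or None if the request timed out, in
    which case the caller writes a log entry and reports a failure.
    """
    pipe.send_bytes(serialized_request)
    if pipe.poll(timeout):
        return pipe.recv_bytes()
    return None
```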
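
And a minimal sketch of the no-op signal handlers, so that only trond acts on signals delivered to the shared process group (the registration function in tronstore is _register_null_handlers; the body here is just the shape of it):

```python
import signal


def _register_null_handlers():
    # Swallow these signals in tronstore; trond, which shares the process
    # group, installs the handlers that actually do the work.
    def _null_handler(signum, frame):
        pass

    for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP):
        signal.signal(sig, _null_handler)
```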

@@ -426,7 +438,7 @@ def validate_jobs_and_services(config, config_context):
     config_utils.unique_names(fmt_string, config['jobs'], config['services'])


-DEFAULT_STATE_PERSISTENCE = ConfigState('tron_state', 'shelve', None, 1)
+DEFAULT_STATE_PERSISTENCE = ConfigState('tron_state', 'shelve', 'pickle', 1, 'json', 'pickle')
Contributor

This first pickle should still be None. It's ignored so we don't want it set to anything.

@Codeacious
Author

Okay, so the post at the top is still pretty accurate, but here's what's changed:

  • We use cPickle as the default method for serializing messages across the Pipe object, instead of letting it be configurable. Normally, Pipe objects use pickle, but cPickle's faster and we already had a simple, clean class for serializing request and response objects. As such, the transport_method configuration flag is now gone. (A small sketch of this message serialization follows this list.)
  • There was some renaming of the message transport/serialization objects. They're now in serialize.py, and have been renamed to use the word Serializer instead of Transport where appropriate.
  • The update_from_config method in the StateChangeWatcher has been updated to not rely on an object already being created. This was coupled with a change where we can spawn StateChangeWatcher objects at any time, and they will be created with a null configuration; update_from_config must then be called on the object before it is actually usable. This is in line with other objects in Tron, which initialize with null objects until configured. In order to allow for this, null objects had to be created all the way down into tronstore.py.
  • Because of the above change, the PersistenceManagerFactory was now completely useless, as we aren't configuring or creating anything other than null objects when creating the PersistentStateManager. It has been removed.
  • chunking.py was deleted. It wasn't used anywhere.
  • The SQLAlchemy store mechanism now encodes all data via unicode(repr(serialize(data))), where serialize is the serialization function assigned to a SQLStore instance. It then calls deserialize(eval(str(data))) when loading from a SQL table. This is to ensure that all strings passed through SQL databases are Unicode, which SQLAlchemy warned about in testing. <-- THIS WAS FIXED, IGNORE. In addition, the serialization method used to encode data in any table entry is now saved along with the encoded data; SQLStore now ensures that it uses the right serialization method. This way, the serialization method can be safely changed via reconfiguration with SQL state storing configurations. Right now, Tronstore silently handles this, but there is a TODO about emitting a log entry (to be done once Tronstore can log). A sketch of the method-alongside-data idea follows this list.
  • There were two fixes in job.py that weren't really related to this branch, but are in here. The first is a simple addition to JobState.status that returns STATUS_RUNNING if there's an ActionRun in the STATE_STARTING state. Previously, the job would show up in an unknown state in tronview if it was polled in the brief moment between a run being started and a run actually running.
  • The second job.py fix is to the restore_state functions of JobScheduler and JobContainer. The JobContainer function now has its job_scheduler object watch all of the JobRun objects that were restored, which happened in the old (< 3 months old) version of the code, but which I missed in #253 (First pass at refactoring the Job class structure). In addition, there is some extra logic in JobScheduler.restore_state that starts the first queued run for a Job if there were no scheduled ones. This change means that Jobs that had a queued run when Tronstore saved/exited will actually continue to run properly once Tron starts up again.
  • The JSON and MsgPack serializers have changed to use custom encoders and decoders; this is to deal with datetime objects, which can't be encoded by default. JSON also preserves tuples and lists properly, but MsgPack spits everything out as an immutable tuple. (See the JSON datetime sketch after this list.)
  • There's an extra case in version checking that handles version info having been saved as a tuple rather than a list.
  • tronstore.py was refactored. See below.
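
A tiny sketch of the kind of cPickle-based message serialization described above (the function names are illustrative; the real serializer classes live in serialize.py):

```python
try:
    import cPickle as pickle  # Python 2: the faster C implementation
except ImportError:
    import pickle


def serialize_message(message):
    """Turn a request/response into raw bytes for Connection.send_bytes."""
    return pickle.dumps(message, pickle.HIGHEST_PROTOCOL)


def deserialize_message(raw_bytes):
    """Rebuild a request/response from bytes read with Connection.recv_bytes."""
    return pickle.loads(raw_bytes)
```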
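
A sketch of the "store the serialization method next to the blob" idea, so a reconfigured SQL store can still decode rows written under the old method. The layout and names here are hypothetical, not the actual SQLStore schema:

```python
import json

try:
    import cPickle as pickle
except ImportError:
    import pickle

# Hypothetical name -> (dump, load) mapping; the real code maps onto the
# serializer classes in serialize.py.
SERIALIZERS = {
    'json': (json.dumps, json.loads),
    'pickle': (pickle.dumps, pickle.loads),
}


def encode_row(data, configured_method):
    """Return (method, blob); the row remembers how it was encoded."""
    dump, _ = SERIALIZERS[configured_method]
    return configured_method, dump(data)


def decode_row(stored_method, blob, configured_method):
    """Decode with whatever method the row was written with, even if the
    configuration has since changed."""
    if stored_method != configured_method:
        pass  # TODO in the real code: emit a log entry once tronstore can log
    _, load = SERIALIZERS[stored_method]
    return load(blob)
```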
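
And a rough illustration of custom JSON encoding for datetime objects; the hook-based approach and the sentinel key below are assumptions, not the actual encoder/decoder classes in serialize.py:

```python
import datetime
import json

DATE_FORMAT = '%Y-%m-%d %H:%M:%S.%f'  # assumed format, for illustration only


def _encode_default(obj):
    # json.dumps calls this for anything it can't encode natively.
    if isinstance(obj, datetime.datetime):
        return {'__datetime__': obj.strftime(DATE_FORMAT)}
    raise TypeError('not JSON serializable: %r' % (obj,))


def _decode_hook(dct):
    # json.loads calls this for every decoded object.
    if '__datetime__' in dct:
        return datetime.datetime.strptime(dct['__datetime__'], DATE_FORMAT)
    return dct


def json_dumps(data):
    return json.dumps(data, default=_encode_default)


def json_loads(raw):
    return json.loads(raw, object_hook=_decode_hook)
```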

@Codeacious
Author

tronstore.py was refactored into a much more class-oriented structure. Here's an overview of how it looks now:

  • We still start with tronstore.main() with the same arguments. However, this method is now only three lines, and just registers the null signal handlers (_register_null_handlers), spawns a TronstoreMain object, and then calls main_loop on that object.
  • TronstoreMain is the new main class for Tronstore. It keeps track of everything it needs to and supports a main_loop method for running the store process. It takes a configuration object (which should be the state_persistence configuration object from Tron) and a Pipe object (realistically, though, this could be any socket-related object that supports both the send_bytes and recv_bytes methods). TronstoreMain, on creation, makes one SyncPipe, SyncStore, and TronstorePool object.
  • The SyncPipe object is a simple class with one mutex that has send_bytes and recv_bytes functions. It simply calls the exact same method with the same passed args and kwargs, but locks the operation over said mutex to avoid contention (a sketch of this wrapper follows this list).
  • The SyncStore object is similar to the SyncPipe object, but also has some nice behavior where it creates the needed store.py object for the passed configuration. It locks over all save and restore requests. This probably could be better, but due to the modular nature of the store.py implementations, either we specifically implement synchronization in each of the store classes, or we simply don't allow multiple operations on them at the same time; I opted for the latter here for simplicity and practicality.
  • The TronstorePool object is a thread pool that spawns TronstorePool.POOL_SIZE worker threads, which attempt to consume work from a native Python Queue (which is synchronized by default). It has enqueue_work and has_work functions to support this. Note that the thread pool won't actually be running until start is called (and it can be stopped with stop). It uses a PoolBool class to signal all threads to stop; PoolBool is just a simple wrapper class that allows us to have a mutable boolean. Workers in the pool are still run on the tronstore.handle_request function, and continuously attempt to grab work from the queue and perform it. Workers should only get save and restore requests, as TronstoreMain handles config and shutdown requests. A sketch of the pool appears after this list.
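
A sketch of what SyncPipe looks like in spirit, assuming it only needs to forward send_bytes and recv_bytes under a single mutex (the wrapped object is anything Connection-like):

```python
import threading


class SyncPipe(object):
    """Wrap a Connection-like object so sends and receives never interleave."""

    def __init__(self, pipe):
        self.pipe = pipe
        self.lock = threading.Lock()

    def send_bytes(self, *args, **kwargs):
        with self.lock:
            return self.pipe.send_bytes(*args, **kwargs)

    def recv_bytes(self, *args, **kwargs):
        with self.lock:
            return self.pipe.recv_bytes(*args, **kwargs)
```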
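
And a sketch of the thread-pool idea with the PoolBool stop flag, assuming workers pull raw requests off a standard Queue and hand them to a handler function (the constructor argument and POOL_SIZE value here are assumptions; handle_request stands in for tronstore.handle_request):

```python
import threading
import Queue  # the Python 2 module name; 'queue' on Python 3


class PoolBool(object):
    """A mutable boolean the pool flips to tell every worker to stop."""

    def __init__(self, value=True):
        self.value = value


class TronstorePool(object):
    POOL_SIZE = 16  # an assumed size, not the real constant

    def __init__(self, handle_request):
        self.handle_request = handle_request
        self.work_queue = Queue.Queue()
        self.keep_running = PoolBool(False)
        self.threads = []

    def start(self):
        self.keep_running.value = True
        for _ in range(self.POOL_SIZE):
            thread = threading.Thread(target=self._worker)
            thread.daemon = True
            thread.start()
            self.threads.append(thread)

    def stop(self):
        self.keep_running.value = False

    def enqueue_work(self, request):
        self.work_queue.put(request)

    def has_work(self):
        return not self.work_queue.empty()

    def _worker(self):
        # Each worker repeatedly grabs a request and serves it; only save
        # and restore requests should ever land here.
        while self.keep_running.value:
            try:
                request = self.work_queue.get(timeout=0.5)
            except Queue.Empty:
                continue
            self.handle_request(request)
```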

-scheduled = self.job_runs.get_scheduled()
+scheduled = [run for run in self.job_runs.get_scheduled()]
 for job_run in scheduled:
     self._set_callback(job_run)
Contributor

If this is just to get a list,

scheduled = list(self.job_runs.get_scheduled())

is probably more appropriate.

dnephin added a commit that referenced this pull request Aug 13, 2013
@dnephin merged commit 804c332 into Yelp:release_0.6.2 on Aug 13, 2013