# This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Library General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # # Copyright 2006 Dan Williams and Red Hat, Inc. """ Common functions and constants for command parsing, receiving, and sending """ import threading import os CMD_NAME_ERROR = "Error" CMD_NAME_NOP = "NOP" CMD_NAME_TARGETS = "Targets" CMD_NAME_SLOTS = "Slots" CMD_NAME_NEW_JOB_REQ = "NewJob" CMD_NAME_NEW_JOB_ACK = "NewJobAck" CMD_NAME_JOB_SRPM = "JobSRPM" CMD_NAME_UNLOCK_REPO = "UnlockRepo" CMD_NAME_BUILDING_JOBS = "BuildingJobs" CMD_NAME_JOB_STATUS_REQ = "JobStatus" CMD_NAME_JOB_STATUS_ACK = "JobStatusAck" CMD_NAME_JOB_FILES_REQ = "JobFiles" CMD_NAME_JOB_FILES_ACK = "JobFilesAck" CMD_NAME_KILL_JOB = "KillJob" class SequenceGenerator(object): """An class that atomically increments a number.""" def __init__(self): self._lock = threading.Lock() self._seq = 0 def __del__(self): del self._lock del self._seq def next(self): """Returns the next number in the object's sequence.""" self._lock.acquire() num = self._seq = self._seq + 1 self._lock.release() return num def serialize_to_command_stream(cmds): stream = [] for cmd in cmds: stream.append(cmd.serialize()) return stream def deserialize_command_stream(cmd_stream): """ Converts a command stream into a list of command objects. Argument is a python list of serialized command objects, each represented by a python dict. Return value is a python list of command objects. """ cmds = [] for item in cmd_stream: cmd = PlgCommand.deserialize(item) if cmd: cmds.append(cmd) return cmds class PlgCommand(object): """Abstract base class for all builder<->server command objects.""" _need_ack = False def __init__(self, name, seq=0): """Initialize the command object. Not used directly, but called by subclasses. The NAME argument is the name (type) of the command. The SEQ argument is the command's sequence number, which should be unique among all commands the program sends. """ self._name = name self._seq = seq def deserialize(cmd_stream): """Class method that deserializes a command stream into a particular subclass of command object. The CMD_STREAM argument is a python dict representing the serialized state of the command. Return value is the command object. """ cmd = name = args = seq = None try: # Pull out required fields name = cmd_stream['name'] args = cmd_stream['args'] seq = cmd_stream['sequence'] except (KeyError, TypeError), exc: print "PlgCommand deserialize error: %s" % exc return None # Create the specific command object from the command's name if name == CMD_NAME_ERROR: cmd = PlgCommandError._deserialize(args) elif name == CMD_NAME_TARGETS: cmd = PlgCommandTargets._deserialize(args) elif name == CMD_NAME_SLOTS: cmd = PlgCommandSlots._deserialize(args) elif name == CMD_NAME_NEW_JOB_REQ: cmd = PlgCommandNewJobReq._deserialize(args) elif name == CMD_NAME_NEW_JOB_ACK: cmd = PlgCommandNewJobAck._deserialize(args) elif name == CMD_NAME_JOB_SRPM: cmd = PlgCommandJobSRPM._deserialize(args) elif name == CMD_NAME_BUILDING_JOBS: cmd = PlgCommandBuildingJobs._deserialize(args) elif name == CMD_NAME_JOB_STATUS_REQ: cmd = PlgCommandJobStatus._deserialize(args) elif name == CMD_NAME_JOB_STATUS_ACK: cmd = PlgCommandJobStatusAck._deserialize(args) elif name == CMD_NAME_JOB_FILES_REQ: cmd = PlgCommandJobFiles._deserialize(args) elif name == CMD_NAME_JOB_FILES_ACK: cmd = PlgCommandJobFilesAck._deserialize(args) elif name == CMD_NAME_UNLOCK_REPO: cmd = PlgCommandUnlockRepo._deserialize(args) elif name == CMD_NAME_KILL_JOB: cmd = PlgCommandKillJob._deserialize(args) # If command creation was successful, set the sequence # number from the command stream on the command. We don't # want to expose the sequence number to subclasses because # they should never need it. if cmd: cmd._seq = seq return cmd deserialize = staticmethod(deserialize) def name(self): """Returns the command's name.""" return self._name def seq(self): """Returns the command's sequece number.""" return self._seq def need_ack(self): """Returns a boolean stating whether this command requires an acknowledgement command or not.""" return self._need_ack def _serialize(self, args): """Serializes the basic command information into the command stream, returning a python dict. Should only be called by subclasses, after determining the arguments the subclass wishes to serialize into the command. Argument ARGS is a python dict containing the subclass-specific command's arguments. """ if not args: raise ValueError("args cannot be None") cmd = {} cmd['name'] = self._name cmd['sequence'] = self._seq cmd['args'] = args return cmd def __str__(self): return "%s(seq: %d)" % (self._name, self._seq) __repr__ = __str__ class PlgCommandAck(PlgCommand): """Abstract base class for acknowledgement commands.""" def __init__(self, name, acked_seq, seq=0): PlgCommand.__init__(self, name, seq) self._acked_seq = acked_seq def _deserialize_acked_seq(args): try: acked_seq = args['acked_seq'] except (KeyError, TypeError): raise ValueError("No 'acked_seq' argument found.") return acked_seq _deserialize_acked_seq = staticmethod(_deserialize_acked_seq) def _serialize(self, args): args['acked_seq'] = self._acked_seq return PlgCommand._serialize(self, args) def acked_seq(self): return self._acked_seq class PlgCommandNop(PlgCommand): def __init__(self, seq=0): PlgCommand.__init__(self, CMD_NAME_NOP, seq) def _deserialize(args): return PlgCommandNop() _deserialize = staticmethod(_deserialize) def serialize(self): return PlgCommand._serialize(self, {}) class PlgCommandError(PlgCommand): def __init__(self, reason, seq=0): PlgCommand.__init__(self, CMD_NAME_ERROR, seq) self._reason = reason def _deserialize(args): try: reason = args['reason'] except (KeyError, TypeError): raise ValueError("No 'reason' argument found.") return PlgCommandError(reason) _deserialize = staticmethod(_deserialize) def serialize(self): args = {} args['reason'] = self._reason return PlgCommand._serialize(self, args) def reason(self): return self._reason def __str__(self): return "%s(seq: %d, reason: %s)" % (self._name, self._seq, self._reason) class PlgCommandSlots(PlgCommand): def __init__(self, free, maximum, seq=0): PlgCommand.__init__(self, CMD_NAME_SLOTS, seq) self._free_slots = free self._max_slots = maximum def _deserialize(args): try: free = args['free'] except (ValueError, TypeError): raise ValueError("No 'free' argument found.") try: maximum = args['max'] except (KeyError, TypeError): raise ValueError("No 'max' argument found.") return PlgCommandSlots(free, maximum) _deserialize = staticmethod(_deserialize) def serialize(self): args = {} args['free'] = self._free_slots args['max'] = self._max_slots return PlgCommand._serialize(self, args) def free_slots(self): return self._free_slots def max_slots(self): return self._max_slots def __str__(self): return "%s(seq: %d, slots: (%d of %d))" % (self._name, self._seq, self._free_slots, self._max_slots) class PlgCommandTargets(PlgCommand): def __init__(self, targets, upload_url, seq=0): PlgCommand.__init__(self, CMD_NAME_TARGETS, seq) self._targets = targets self._upload_url = upload_url def _deserialize(args): try: targets = args['targets'] upload_url = args['upload_url'] except (KeyError, TypeError), exc: raise ValueError("No '%s' argument found." % exc) # Sanity checking on targets argument if type(targets) != type([]): raise ValueError("'targets' argument was not a list.") for target in targets: try: distro = target['distro'] tg = target['target'] basearch = target['arch'] repo = target['repo'] suparch = target['supported_arches'] except KeyError, exc: raise ValueError("Required item '%s' not found in a target." % exc) return PlgCommandTargets(targets, upload_url) _deserialize = staticmethod(_deserialize) def serialize(self): args = {} args['targets'] = self._targets args['upload_url'] = self._upload_url return PlgCommand._serialize(self, args) def targets(self): return self._targets def upload_url(self): return self._upload_url def __str__(self): return "%s(seq: %d, targets: %s, upload_url: %s)" % (self._name, self._seq, self._targets, self._upload_url) class PlgCommandNewJobReq(PlgCommand): _need_ack = True _SRPM_EXT = ".src.rpm" def __init__(self, archjob, target_dict=None, jobname=None, archjob_id=None, seq=0): PlgCommand.__init__(self, CMD_NAME_NEW_JOB_REQ, seq) self._archjob = archjob # doesn't get serialized if archjob: self._target_dict = archjob.target_dict() self._jobname = os.path.basename(archjob.srpm_path()) if self._jobname.endswith(self._SRPM_EXT): self._jobname = self._jobname[:len(self._jobname) - len(self._SRPM_EXT)] self._archjob_id = archjob.archjob_id() else: if not target_dict or not jobname or not archjob_id: raise Exception("Need a target_dict, a jobname, and an archjob_id.") self._target_dict = target_dict self._jobname = jobname self._archjob_id = archjob_id def _deserialize(args): try: target_dict = args['target_dict'] jobname = args['jobname'] archjob_id = args['archjob_id'] except (KeyError, TypeError), exc: raise ValueError("No '%s' argument found." % exc) return PlgCommandNewJobReq(None, target_dict, jobname, archjob_id) _deserialize = staticmethod(_deserialize) def serialize(self): args = {} args['target_dict'] = self._target_dict args['jobname'] = self._jobname args['archjob_id'] = self._archjob_id return PlgCommand._serialize(self, args) def archjob(self): return self._archjob def target_dict(self): return self._target_dict def job_name(self): return self._jobname def archjob_id(self): return self._archjob_id def __str__(self): return "%s(seq: %d, target_dict: %s, jobname: %s, archjob_id: %s)" % (self._name, self._seq, self._target_dict, self._jobname, self._archjob_id) class PlgCommandNewJobAck(PlgCommandAck): def __init__(self, archjob_id, msg, req_seq, seq=0): PlgCommandAck.__init__(self, CMD_NAME_NEW_JOB_ACK, req_seq, seq) self._archjob_id = archjob_id self._msg = msg def _deserialize(args): try: archjob_id = args['archjob_id'] msg = args['msg'] except (KeyError, TypeError): raise ValueError("No 'archjob_id' argument found.") req_seq = PlgCommandAck._deserialize_acked_seq(args) return PlgCommandNewJobAck(archjob_id, msg, req_seq) _deserialize = staticmethod(_deserialize) def serialize(self): args = {} args['archjob_id'] = self._archjob_id args['msg'] = self._msg return PlgCommandAck._serialize(self, args) def archjob_id(self): return self._archjob_id def message(self): return self._msg class PlgCommandJobSRPM(PlgCommand): def __init__(self, archjob_id, srpm_url, seq=0): PlgCommand.__init__(self, CMD_NAME_JOB_SRPM, seq) self._archjob_id = archjob_id self._srpm_url = srpm_url def _deserialize(args): try: archjob_id = args['archjob_id'] except (ValueError, TypeError): raise ValueError("No 'archjob_id' argument found.") try: srpm_url = args['srpm_url'] except (ValueError, TypeError): raise ValueError("No 'srpm_url' argument found.") return PlgCommandJobSRPM(archjob_id, srpm_url) _deserialize = staticmethod(_deserialize) def serialize(self): args = {} args['archjob_id'] = self._archjob_id args['srpm_url'] = self._srpm_url return PlgCommand._serialize(self, args) def archjob_id(self): return self._archjob_id def srpm_url(self): return self._srpm_url def __str__(self): return "%s(seq: %d, archjob_id: %s, srpm_url: %s)" % (self._name, self._seq, self._archjob_id, self._srpm_url) class PlgCommandUnlockRepo(PlgCommand): def __init__(self, archjob_id, seq=0): PlgCommand.__init__(self, CMD_NAME_UNLOCK_REPO, seq) self._archjob_id = archjob_id def _deserialize(args): try: archjob_id = args['archjob_id'] except (ValueError, TypeError): raise ValueError("No 'archjob_id' argument found.") return PlgCommandUnlockRepo(archjob_id) _deserialize = staticmethod(_deserialize) def serialize(self): args = {} args['archjob_id'] = self._archjob_id return PlgCommand._serialize(self, args) def archjob_id(self): return self._archjob_id def __str__(self): return "%s(seq: %d, archjob_id: %s)" % (self._name, self._seq, self._archjob_id) class PlgCommandBuildingJobs(PlgCommand): def __init__(self, seq=0): PlgCommand.__init__(self, CMD_NAME_BUILDING_JOBS, seq) self._jobs = [] def _deserialize(args): try: jobs = args['jobs'] except (ValueError, TypeError): raise ValueError("No 'jobs' argument found.") # Basic sanity if type(jobs) != type([]): print "Jobs is type %s" % type(jobs) raise ValueError("jobs wasn't a list.") try: for job in jobs: archjob_id = job['archjob_id'] status = job['status'] except KeyError: raise ValueError("Required item '%s' wasn't found in jobs item.") cmd = PlgCommandBuildingJobs() for job in jobs: cmd.add_job(job['archjob_id'], job['status']) return cmd _deserialize = staticmethod(_deserialize) def serialize(self): args = {} args['jobs'] = self._jobs return PlgCommand._serialize(self, args) def jobs(self): return self._jobs def add_job(self, archjob_id, status): self._jobs.append({'archjob_id': archjob_id, 'status': status}) def get_job(self, job): return (job['archjob_id'], job['status']) def __str__(self): return "%s(seq: %d, jobs: %s)" % (self._name, self._seq, self._jobs) class PlgCommandJobStatus(PlgCommand): _need_ack = True def __init__(self, archjob_id, seq=0): PlgCommand.__init__(self, CMD_NAME_JOB_STATUS_REQ, seq) self._archjob_id = archjob_id def _deserialize(args): try: archjob_id = args['archjob_id'] except (KeyError, TypeError): raise ValueError("No 'archjob_id' argument found.") return PlgCommandJobStatus(archjob_id) _deserialize = staticmethod(_deserialize) def serialize(self): args = {} args['archjob_id'] = self._archjob_id return PlgCommand._serialize(self, args) def archjob_id(self): return self._archjob_id def __str__(self): return "%s(seq: %d, archjob_id: %s)" % (self._name, self._seq, self._archjob_id) class PlgCommandJobStatusAck(PlgCommandAck): def __init__(self, archjob_id, status, req_seq, seq=0): PlgCommandAck.__init__(self, CMD_NAME_JOB_STATUS_ACK, req_seq, seq) self._archjob_id = archjob_id self._status = status def _deserialize(args): try: archjob_id = args['archjob_id'] except (KeyError, TypeError): raise ValueError("No 'archjob_id' argument found.") try: status = args['status'] except (KeyError, TypeError): raise ValueError("No 'status' argument found.") req_seq = PlgCommandAck._deserialize_acked_seq(args) return PlgCommandJobStatusAck(archjob_id, status, req_seq) _deserialize = staticmethod(_deserialize) def serialize(self): args = {} args['archjob_id'] = self._archjob_id args['status'] = self._status return PlgCommandAck._serialize(self, args) def archjob_id(self): return self._archjob_id def status(self): return self._status class PlgCommandJobFiles(PlgCommand): _need_ack = True def __init__(self, archjob_id, seq=0): PlgCommand.__init__(self, CMD_NAME_JOB_FILES_REQ, seq) self._archjob_id = archjob_id def _deserialize(args): try: archjob_id = args['archjob_id'] except (KeyError, TypeError): raise ValueError("No 'archjob_id' argument found.") return PlgCommandJobFiles(archjob_id) _deserialize = staticmethod(_deserialize) def serialize(self): args = {} args['archjob_id'] = self._archjob_id return PlgCommand._serialize(self, args) def archjob_id(self): return self._archjob_id def __str__(self): return "%s(seq: %d, archjob_id: %s)" % (self._name, self._seq, self._archjob_id) class PlgCommandJobFilesAck(PlgCommandAck): def __init__(self, archjob_id, files, req_seq, seq=0): """The files argument is a python list of output files from the job""" PlgCommandAck.__init__(self, CMD_NAME_JOB_FILES_ACK, req_seq, seq) self._archjob_id = archjob_id if type(files) is not type([]): raise ValueError("files argument must be a list of URLs.") self._files = files def _deserialize(args): try: archjob_id = args['archjob_id'] except (KeyError, TypeError): raise ValueError("No 'archjob_id' argument found.") try: files = args['files'] except (KeyError, TypeError): raise ValueError("No 'files' argument found.") if type(files) != type([]): raise ValueError("The 'files' argument was of the wrong type.") req_seq = PlgCommandAck._deserialize_acked_seq(args) return PlgCommandJobFilesAck(archjob_id, files, req_seq) _deserialize = staticmethod(_deserialize) def serialize(self): args = {} args['archjob_id'] = self._archjob_id args['files'] = self._files return PlgCommandAck._serialize(self, args) def archjob_id(self): return self._archjob_id def files(self): return self._files class PlgCommandKillJob(PlgCommand): def __init__(self, archjob_id, seq=0): PlgCommand.__init__(self, CMD_NAME_KILL_JOB, seq) self._archjob_id = archjob_id def _deserialize(args): try: archjob_id = args['archjob_id'] except (KeyError, TypeError): raise ValueError("No 'archjob_id' argument found.") return PlgCommandKillJob(archjob_id) _deserialize = staticmethod(_deserialize) def serialize(self): args = {} args['archjob_id'] = self._archjob_id return PlgCommand._serialize(self, args) def archjob_id(self): return self._archjob_id def __str__(self): return "%s(seq: %d, archjob_id: %s)" % (self._name, self._seq, self._archjob_id)