# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# Copyright 2005 Dan Williams and Red Hat, Inc.

import time
import os
import threading
import urllib
import hashlib

from plague import FileTransfer


def _generate_uniqid(parent_jobid, start_time, target_dict, srpm):
    """Return a 40-char hex SHA-1 digest uniquely identifying one arch build.

    The digest is derived from the parent job id, the job start time and the
    target (distro/target/arch/repo) plus the SRPM path, so two archjobs for
    the same package on different targets get distinct ids.
    """
    distro = target_dict['distro']
    repo = target_dict['repo']
    target = target_dict['target']
    arch = target_dict['arch']
    hash_string = "%d%d%s%s%s%s%s" % (parent_jobid, start_time, distro,
            target, arch, repo, srpm)
    # hashlib.sha1 replaces the deprecated 'sha' module; the hexdigest is
    # byte-for-byte identical for the same input.
    return hashlib.sha1(hash_string.encode('utf-8')).hexdigest()


AJ_STATUS_QUEUED = 'queued'
AJ_STATUS_WAITING = 'waiting'
AJ_STATUS_UPLOADING = 'uploading'
AJ_STATUS_REPO_WAIT = 'repo_wait'
AJ_STATUS_REPO_UNLOCK = 'repo_unlock'
AJ_STATUS_RUNNING = 'running'
AJ_STATUS_DOWNLOADING = 'downloading'
AJ_STATUS_DONE = 'done'

AJ_FAILURE_NONE = ''
AJ_FAILURE_DOWNLOAD = 'download'
AJ_FAILURE_BUILDER = 'builder'
AJ_FAILURE_INTERNAL = 'internal'


class ArchJob:
    """ Tracks a single build instance for a single arch on a builder """

    def __init__(self, parent, target_dict, srpm_path):
        self._parent = parent
        self._builder = None            # Builder that has claimed us, or None
        self._repo = parent.repo()
        self._starttime = time.time()
        self._endtime = 0
        self._id = _generate_uniqid(parent.uid, self._starttime, target_dict,
                srpm_path)
        self._status = AJ_STATUS_QUEUED
        self._uploader = None           # in-flight SRPM FileTransfer, or None
        self._builder_status = ''       # remote builder's job state string
        self._failure_noticed = False
        self._failure_type = AJ_FAILURE_NONE
        self._target_dict = target_dict
        self._srpm_path = srpm_path
        self._result_files = {}         # filename -> FileTransfer result code
        # Death flags are set from other threads (die()) and consumed by
        # process(), so they are protected by a lock.
        self._die = False
        self._die_user_requested = False
        self._die_lock = threading.Lock()
        self._prepping = False
        self._orphaned = False

    def builder_suspend_cb(self, builder, reason, msg):
        """Called when our builder suspends; requeue unless already done."""
        self.unclaim(builder)
        if not self._is_done_status():
            self._parent.log(self.arch(), "Builder disappeared. Requeuing arch...")

    def set_builder_status(self, builder, builder_status):
        """Record the remote builder's job state; ignore stale builders."""
        if builder != self._builder:
            return

        # The job has just started on the builder
        if self._builder_status == '':
            addr = builder.address()
            self._parent.log(self.arch(), "%s - UID is %s" % (addr, self._id))
            # Notify our parent PackageJob that we've started
            self._parent.archjob_started_cb(self)

        oldstatus = self._builder_status
        self._builder_status = builder_status
        if oldstatus != self._builder_status:
            attrdict = self._to_dict()
            self._parent.bm.queue_archjob_status_update(self._id, attrdict)
            del attrdict

        if builder_status in ['killed', 'failed']:
            self._failure_type = AJ_FAILURE_BUILDER

    def failure_noticed(self):
        return self._failure_noticed

    def set_failure_noticed(self):
        self._failure_noticed = True

    def _builder_finished(self):
        """True when the remote builder has reached a terminal state."""
        return self._builder_status in ['done', 'killed', 'failed', 'orphaned']

    def failure_type(self):
        return self._failure_type

    def prepping(self):
        return self._prepping

    def arch(self):
        return self._target_dict['arch']

    def target_dict(self):
        return self._target_dict

    def srpm_path(self):
        return self._srpm_path

    def archjob_id(self):
        return self._id

    def builder(self):
        return self._builder

    def parent(self):
        return self._parent

    def orphaned(self):
        return self._orphaned

    def claim(self, builder):
        """Called by the Builder via the BuilderManager when the builder
        agrees to build this archjob."""
        if self._status != AJ_STATUS_QUEUED:
            return
        self._set_status(AJ_STATUS_WAITING)
        self._builder = builder
        builder.add_suspend_listener(self)

    def unclaim(self, builder):
        """Detach from 'builder' and, if not yet done, requeue ourselves."""
        builder.remove_suspend_listener(self)

        if builder != self._builder:
            return

        if self._uploader:
            self._uploader.cancel()
            self._uploader = None

        self._builder = None
        if not self._is_done_status():
            self._orphaned = True
            self._builder_status = ''
            # Mark ourselves as available for building again
            self._set_status(AJ_STATUS_QUEUED)

    def _to_dict(self):
        """Serialize our state for BuilderManager status-update queueing."""
        attrdict = {}
        attrdict['jobid'] = self._id
        attrdict['parent_uid'] = self._parent.uid
        attrdict['arch'] = self._target_dict['arch']
        if self._builder:
            name = self._builder.address()
            # for some reason, splithost doesn't like the protocol
            # method, you have to give it a string starting with "//"
            # NOTE(review): urllib.splithost moved to urllib.parse in
            # Python 3 — revisit on any py3 migration.
            if name.startswith("http"):
                idx = name.find('//')
                name = name[idx:]
            host_port, path = urllib.splithost(name)
            host, port = urllib.splitport(host_port)
            attrdict['builder_addr'] = host
        else:
            attrdict['builder_addr'] = ""
        attrdict['status'] = self._status
        attrdict['builder_status'] = self._builder_status
        attrdict['starttime'] = self._starttime
        attrdict['endtime'] = self._endtime
        return attrdict

    def _set_status(self, status):
        """Update our status, pushing a status-update record on change."""
        oldstatus = self._status
        self._status = status
        if oldstatus != self._status:
            attrdict = self._to_dict()
            self._parent.bm.queue_archjob_status_update(self._id, attrdict)
            del attrdict

    def _is_done_status(self):
        return self._status == AJ_STATUS_DONE

    def _set_done(self):
        # Guard: _handle_death() (and the unknown-status path in process())
        # can reach here before any builder has claimed the job, in which
        # case self._builder is still None.
        if self._builder:
            self._builder.remove_suspend_listener(self)
        self._set_status(AJ_STATUS_DONE)

    def _handle_builder_finished(self):
        self._uploader = None
        self._set_status(AJ_STATUS_DOWNLOADING)
        self._builder.request_job_files(self._id)

    def _status_queued(self):
        pass

    def _upload_srpm_cb(self, builder, result, msg, user_data=None):
        """Callback from our controlling Builer object when it has
        uploaded the SRPM to the remote builder."""
        if builder != self._builder:
            return
        self._uploader = None
        if result == FileTransfer.FT_RESULT_FAILED:
            srpm_path = user_data
            print("%s (%s/%s): %s - SRPM upload failed for %s." % (self._parent.uid,
                    self._parent.package, self._target_dict['arch'], self._id,
                    srpm_path))
            # Unclaim the job and try for another builder
            self.unclaim(self._builder)

    def _status_waiting(self):
        if self._builder_status == 'init':
            # Ask our Builder object to send the SRPM to the remote builder
            # (if Passive) or to notify the builder of the SRPM URL (if Active).
            self._set_status(AJ_STATUS_UPLOADING)
            self._uploader = self._builder.request_srpm_upload(self,
                    self._upload_srpm_cb, self._srpm_path, self._srpm_path)

        if self._builder_finished():
            self._handle_builder_finished()

    def _status_uploading(self):
        # Builders pause before they enter the 'prep' state (which accesses
        # the repo for this target), and wait for the server to allow them
        # to proceed when the repo is unlocked.
        if self._builder_status == 'downloaded':
            self._set_status(AJ_STATUS_REPO_WAIT)
            self._repo.request_unlock(self)

        if self._builder_finished():
            self._handle_builder_finished()

    def _status_repo_wait(self):
        pass

    def repo_unlocked_callback(self):
        if self._status == AJ_STATUS_REPO_WAIT:
            self._set_status(AJ_STATUS_REPO_UNLOCK)

    def _status_repo_unlock(self):
        # Builder will be in 'downloaded' state until
        # it notices that the repo has been unlocked
        self._prepping = True
        self._builder.unlock_repo_for_job(self._id)
        if self._builder_status != 'downloaded':
            self._set_status(AJ_STATUS_RUNNING)

        if self._builder_finished():
            self._handle_builder_finished()

    def _status_running(self):
        if self._builder_status != 'prepping':
            self._prepping = False

        if self._builder_finished():
            self._handle_builder_finished()

    def get_result_files_dir(self):
        """Return (creating if needed) the per-arch result file directory."""
        result_dir = os.path.join(self._parent.get_stage_dir(),
                self._target_dict['arch'])
        if not os.path.exists(result_dir):
            os.makedirs(result_dir)
        return result_dir

    def _print_downloaded_files(self, files):
        # Join with str.join rather than positional keys() indexing; this is
        # O(n) and produces the same ", "-separated listing.
        parts = []
        for fname in files.keys():
            string = "'" + fname + "'"
            if files[fname] == FileTransfer.FT_RESULT_FAILED:
                string = string + " (failed)"
            parts.append(string)
        file_string = ", ".join(parts)
        print("%s (%s/%s): Build result files - [ %s ]" % (self._parent.uid,
                self._parent.package, self._target_dict['arch'], file_string))

    def download_cb(self, files):
        """Called by the Builder to notify us that our job's files are
        available.  The files argument is a dict mapping _filenames_ (not
        paths) to their FileTransfer result codes.  All files are assumed
        to be in the directory returned by get_result_files_dir."""
        if self._is_done_status():
            return
        if len(files.keys()) == 0:
            # Nothing came back at all: treat as a download failure
            self._failure_type = AJ_FAILURE_DOWNLOAD
        else:
            for fname in files.keys():
                if files[fname] == FileTransfer.FT_RESULT_FAILED:
                    self._failure_type = AJ_FAILURE_DOWNLOAD
        self._result_files = files
        self._print_downloaded_files(self._result_files)
        self._endtime = time.time()
        self._set_done()

    def _status_downloading(self):
        # Wait to be notified that our files are downloaded
        pass

    def _handle_death(self, user_requested):
        """Tear down any builder/upload/repo state and mark the job done."""
        if self._builder:
            self._builder.request_kill_for_job(self._id)
        if self._uploader:
            self._uploader.cancel()
            self._uploader = None
        if self._status == AJ_STATUS_REPO_WAIT:
            self._repo.cancel_unlock_request(self)
        self._set_done()
        if user_requested:
            print("%s (%s/%s): %s - killed." % (self._parent.uid,
                    self._parent.package, self._target_dict['arch'], self._id))

    def process(self):
        """Main state-machine step; dispatches to _status_<state>()."""
        if self._is_done_status():
            return

        # If we're supposed to die, tell the builder and clean up
        self._die_lock.acquire()
        should_die = self._die
        user_requested = self._die_user_requested
        self._die_lock.release()
        if should_die:
            self._handle_death(user_requested)
            return

        status_func = None
        try:
            status_func = getattr(self, "_status_%s" % self._status)
        except AttributeError:
            print("%s (%s/%s): %s - internal archjob inconsistency. Unknown status '%s'." % (self._parent.uid,
                    self._parent.package, self._target_dict['arch'], self._id,
                    self._status))
            self._set_done()
            self._failure_type = AJ_FAILURE_INTERNAL
            return

        # Do the actual work for this status
        status_func()

    def status(self):
        return self._status

    def get_files(self):
        """Return a list result files that we got from the builder"""
        files = []
        for fname in self._result_files.keys():
            if self._result_files[fname] == FileTransfer.FT_RESULT_SUCCESS:
                files.append(fname)
        return files

    def die(self, user_requested=False):
        # Can be called from other threads
        if self._is_done_status():
            return
        self._die_lock.acquire()
        self._die = True
        self._die_user_requested = user_requested
        self._die_lock.release()
        if user_requested:
            print("%s (%s/%s): %s - kill requested by parent job" % (self._parent.uid,
                    self._parent.package, self._target_dict['arch'], self._id))