# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# Copyright 2005 Dan Williams and Red Hat, Inc.

import time
import xmlrpclib
import socket
import os
import urllib
import threading
import shutil
from plague import Commands
from plague import XMLRPCServerProxy
from plague import FileDownloader
from plague import FileUploader
from plague import FileTransfer
import OpenSSL
import EmailUtils
from plague import DebugUtils

# Reasons a builder can be suspended (see _handle_builder_suspend)
SUSPEND_NONE = 'none'
SUSPEND_TIMEOUT = 'timeout'
SUSPEND_HARD_ERROR = 'hard-error'

# Builder connection models: passive builders are polled by the server,
# active builders contact the server themselves
TYPE_PASSIVE = 1
TYPE_ACTIVE = 2

class Builder(threading.Thread):
    """ Tracks all jobs on a builder instance """

    def __init__(self, manager, cfg, address, weight, btype):
        # manager: the object that hands out archjobs (claim_arch_job etc.)
        # cfg: server configuration object (Email/SSL/Builders sections)
        # address: builder's URL, e.g. "https://host:port/"
        # weight: scheduling weight for this builder
        # btype: TYPE_PASSIVE or TYPE_ACTIVE
        self._manager = manager
        self._jobs = {}                     # archjob_id -> archjob currently on this builder
        self._free_slots = 0
        self._num_slots = 0
        self._address = address
        self._available = False             # is the builder responding?
        self._suspend_reason = SUSPEND_NONE
        self._stop = False                  # set by stop() to end the thread loop
        self._prepping_jobs = False
        self._unavail_count = 0             # consecutive missed contacts
        self._target_list = []              # target dicts the builder reports it supports
        self._when_died = 0                 # timestamp of last suspension
        self._server_cfg = cfg
        self._weight = weight
        self._type = btype
        self._seq_gen = Commands.SequenceGenerator()
        self._lock = threading.Lock()       # guards _cmd_queue / _ack_pending_list / counters
        self._cmd_queue = []                # commands waiting to be sent to the builder
        self._ack_pending_list = []         # sent commands still awaiting an ack
        self._suspend_listeners = []
        self._status_listeners = []         # NOTE(review): never consulted in this file
        self._ip = None
        self._last_contact = 0

        # Extract the bare hostname from the builder URL so we can
        # resolve its IP address
        uri, rest = urllib.splittype(address)
        host, ignore = urllib.splithost(rest)
        self._host, port = urllib.splitport(host)
        self._get_ip()

        threading.Thread.__init__(self)
        self.setName("Builder: %s" % address)

    def _get_ip(self):
        """Resolve and cache the builder's IP; return True on success."""
        try:
            self._ip = socket.gethostbyname(self._host)
            return True
        except Exception:
            # DNS failure is not fatal; callers retry later
            pass
        return False

    def _match_target_dict(self, td1, td2):
        """Return True if two target dicts agree on distro, target and repo."""
        if td1['distro'] == td2['distro']:
            if td1['target'] == td2['target']:
                if td1['repo'] == td2['repo']:
                    return True
        return False

    def arches(self, target_dict):
        """Return the de-duplicated list of arches this builder supports
           for the given target, or None if the target is unknown."""
        for tdict in self._target_list:
            if self._match_target_dict(tdict, target_dict):
                arches = []
                for arch in tdict['supported_arches']:
                    if not arch in arches:
                        arches.append(arch)
                return arches
        return None

    def can_build_arch_job(self, archjob):
        """Can this builder build the given archjob (target + arch)?"""
        target_dict = archjob.target_dict()
        for td in self._target_list:
            if self._match_target_dict(td, target_dict):
                if archjob.arch() in td['supported_arches']:
                    return True
        return False

    def address(self):
        return self._address

    def ip(self):
        return self._ip

    def available(self):
        """ Is the builder responding to requests? """
        return self._available

    def free_slots(self):
        return self._free_slots

    def weight(self):
        return self._weight

    def type(self):
        return self._type

    def stop(self):
        # Ask the thread's run() loop to exit
        self._stop = True

    def get_archjob(self, archjob_id):
        """Return the archjob with this id, or None if not on this builder."""
        try:
            return self._jobs[archjob_id]
        except KeyError:
            pass
        return None

    def add_suspend_listener(self, listener):
        listeners = self._suspend_listeners[:]
        if listener not in listeners:
            self._suspend_listeners.append(listener)

    def remove_suspend_listener(self, listener):
        if listener in self._suspend_listeners:
            self._suspend_listeners.remove(listener)

    def _notify_suspend_listeners(self, reason, msg):
        # Must copy the list since it can be modified from
        # the listener during our iteration of it
        listeners = self._suspend_listeners[:]
        for listener in listeners:
            listener.builder_suspend_cb(self, reason, msg)
        del listeners

    def _handle_builder_suspend(self, reason, msg):
        """Mark the builder unusable, drop all queued state, notify
           listeners and email the admins."""
        self._available = False
        self._suspend_reason = reason
        self._unavail_count = 0
        self._prepping_jobs = False
        self._when_died = time.time()
        # Forget all per-builder job and command state; jobs get
        # re-claimed elsewhere via the suspend listeners
        self._jobs = {}
        self._cmd_queue = []
        self._ack_pending_list = []
        self._free_slots = 0
        self._ip = None
        self._notify_suspend_listeners(reason, msg)

        # Notify admins
        print "Suspending builder '%s'. Reason: %s - %s." % (self._address, reason, msg)
        subject = "Builder Suspended: %s" % self._address
        msg = "The builder '%s' was suspended. Reason: %s - %s." % (self._address, reason, msg)
        sender = self._server_cfg.get_str("Email", "email_from")
        for addr in self._server_cfg.get_list("Email", "admin_emails"):
            EmailUtils.email_result(sender, addr, msg, subject)

    def _handle_builder_reactivate(self, mail=False):
        """Mark the builder available again, optionally emailing admins."""
        self._available = True
        self._suspend_reason = SUSPEND_NONE
        print "Re-activating builder '%s'." % self._address
        if mail:
            subject = "Builder Re-activated: %s" % self._address
            msg = """The builder '%s' was re-activated.

Suspended at: %s
Re-Enabled at: %s
""" % (self._address, time.ctime(self._when_died), time.ctime(time.time()))
            sender = self._server_cfg.get_str("Email", "email_from")
            for addr in self._server_cfg.get_list("Email", "admin_emails"):
                EmailUtils.email_result(sender, addr, msg, subject)
        self._when_died = 0

    def any_prepping_jobs(self):
        return self._prepping_jobs

    def to_dict(self):
        """Return a plain-dict summary of this builder for status display."""
        builder_dict = {}
        addr = self._address
        # for some reason, splithost doesn't like the protocol
        # method, you have to give it a string starting with "//"
        if addr.startswith("http"):
            idx = addr.find('//')
            addr = addr[idx:]
        host_port, path = urllib.splithost(addr)
        host, port = urllib.splitport(host_port)
        builder_dict['address'] = host
        # Union of all arches across all supported targets
        arches = []
        for tdict in self._target_list:
            for arch in tdict['supported_arches']:
                if not arch in arches:
                    arches.append(arch)
        builder_dict['arches'] = arches
        builder_dict['available'] = self._available
        builder_dict['num_slots'] = self._num_slots
        builder_dict['free_slots'] = self._free_slots
        return builder_dict

    def _handle_building_jobs(self, cmd):
        """Process a BuildingJobs command from the builder; returns a list
           of follow-up JobStatus commands to queue."""
        building_jobs = cmd.jobs()
        reported_uniqids = []
        new_cmds = []
        for item in building_jobs:
            (uniqid, status) = cmd.get_job(item)
            try:
                job = self._jobs[uniqid]
                job.set_builder_status(self, status)
                reported_uniqids.append(uniqid)
            except KeyError:
                # Builder reported a job we don't know about; ignore it
                pass

        # We have to check jobs that weren't reported
        # as 'building' by the builder, since the job
        # may have finished on the builder and was
        # removed from the building job list before we
        # were able to know that it was done.  HACK
        self._prepping_jobs = False
        for jobid, job in self._jobs.items():
            # If the builder didn't report this job as building,
            # and its not done, explicitly get its status
            if jobid not in reported_uniqids and job.status() != 'done':
                new_cmds.append(Commands.PlgCommandJobStatus(jobid, self._seq_gen.next()))
            # Check for prepping jobs
            if job.prepping():
                self._prepping_jobs = True
        del reported_uniqids
        return new_cmds

    def _find_and_remove_cmd_for_ack(self, ack, old_cmd_type):
        """Find a command in the sent command queue by sequence number
           and command type.  Remove the command from the command queue
           and return it."""
        old_cmd = None
        self._lock.acquire()
        for cmd in self._ack_pending_list[:]:
            if cmd.seq() == ack.acked_seq() and isinstance(cmd, old_cmd_type):
                old_cmd = cmd
                self._ack_pending_list.remove(cmd)
                break
        self._lock.release()
        return old_cmd

    def _handle_new_job_ack(self, ack):
        """Handle a NewJobAck command by notifying the parent job that
           this archjob is now in progress."""
        old_cmd = self._find_and_remove_cmd_for_ack(ack, Commands.PlgCommandNewJobReq)
        if old_cmd:
            archjob = old_cmd.archjob()
            archjob_id = archjob.archjob_id()
            ack_archjob_id = ack.archjob_id()
            if archjob_id != ack_archjob_id:
                # Builder acked the wrong job; release the archjob so the
                # manager can hand it to another builder
                print "Builder Error (%s): returned archjob_id (%s) " \
                    "doesn't match expected (%s)." % (self._address, ack_archjob_id, archjob_id)
                archjob.unclaim(self)
            else:
                self._jobs[archjob_id] = archjob

    def _handle_job_status_ack(self, ack):
        """Handle a job status ack by setting telling the job object
           what the builder said its status was."""
        old_cmd = self._find_and_remove_cmd_for_ack(ack, Commands.PlgCommandJobStatus)
        if old_cmd:
            archjob_id = ack.archjob_id()
            status = ack.status()
            job = self._jobs[archjob_id]
            job.set_builder_status(self, status)

    def _decompose_job_files_ack(self, ack):
        """Handle a job files ack by finding the archjob it's for, then
           notifying that archjob that its files are available."""
        old_cmd = self._find_and_remove_cmd_for_ack(ack, Commands.PlgCommandJobFiles)
        if old_cmd:
            archjob_id = ack.archjob_id()
            files = ack.files()
            job = self._jobs[archjob_id]
            return (job, files)
        return (None, None)

    def _dispatch_common_command(self, cmd):
        """Handle commands that are common to all builder types.
           Returns True if the command was recognized and handled."""
        handled = True
        if isinstance(cmd, Commands.PlgCommandSlots):
            self._lock.acquire()
            self._free_slots = cmd.free_slots()
            self._num_slots = cmd.max_slots()
            self._lock.release()
        elif isinstance(cmd, Commands.PlgCommandBuildingJobs):
            status_reqs = self._handle_building_jobs(cmd)
            # Add any additional status requests onto our pending command queue
            if len(status_reqs) > 0:
                self._lock.acquire()
                self._cmd_queue = self._cmd_queue + status_reqs
                self._lock.release()
        elif isinstance(cmd, Commands.PlgCommandJobStatusAck):
            self._handle_job_status_ack(cmd)
        elif isinstance(cmd, Commands.PlgCommandNewJobAck):
            self._handle_new_job_ack(cmd)
        else:
            handled = False
        return handled

    def request_job_files(self, archjob_id):
        """Construct and send a request for a job's files."""
        cmd = Commands.PlgCommandJobFiles(archjob_id, self._seq_gen.next())
        self._lock.acquire()
        self._cmd_queue.append(cmd)
        self._lock.release()

    def request_kill_for_job(self, uniqid):
        """Queue a KillJob command for the given archjob id."""
        cmd = Commands.PlgCommandKillJob(uniqid, self._seq_gen.next())
        self._lock.acquire()
        self._cmd_queue.append(cmd)
        self._lock.release()
    def unlock_repo_for_job(self, uniqid):
        """Called by an archjob to request the sending of a RepoUnlocked
           command to the builder for a particular archjob."""
        self._lock.acquire()
        # Only ever queue one UnlockRepo command per archjob
        found = False
        for cmd in self._cmd_queue:
            if isinstance(cmd, Commands.PlgCommandUnlockRepo):
                if cmd.archjob_id() == uniqid:
                    found = True
                    break
        if not found:
            cmd = Commands.PlgCommandUnlockRepo(uniqid, self._seq_gen.next())
            self._cmd_queue.append(cmd)
        self._lock.release()


# HACK: This class is a hack to work around SSL hanging issues,
# which cause the whole server to grind to a halt
class PassiveBuilderRequest(threading.Thread):
    # Runs a single XML-RPC request() to a passive builder in its own
    # thread so that a hung SSL connection cannot block the caller.
    # Callers poll .done and read .failed / .response afterwards.

    def __init__(self, address, certs, cmds):
        # address: builder's XML-RPC URL
        # certs: SSL cert/key path dict, or None for non-SSL
        # cmds: list of command objects to serialize and send
        self._address = address
        self._certs = certs
        self._cmds = cmds
        self.done = False
        # 'failed' stays True unless the request completes successfully
        self.failed = True
        self.response = None
        self._server = XMLRPCServerProxy.PlgXMLRPCServerProxy(self._address, self._certs, timeout=20)
        threading.Thread.__init__(self)

    def run(self):
        self.setName("PassiveBuilderRequest: %s" % self._address)
        try:
            cmd_stream = Commands.serialize_to_command_stream(self._cmds)
            self.response = self._server.request(cmd_stream)
            self.failed = False
        except (socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError, xmlrpclib.Fault), exc:
            print "PassiveBuilder(%s) Error in request(): '%s'" % (self._address, exc)
        except socket.error, exc:
            # errno 111 == connection refused; expected when the builder
            # is simply down, so don't spam the log for it
            if exc[0] != 111:
                print "PassiveBuilder(%s) Error in request(): '%s'" % (self._address, exc)
        self.done = True

    def cancel(self):
        # Force-close the connection to abort a hung request
        try:
            self._server.close()
        except:
            pass


class PassiveBuilder(Builder):
    """ Passive builders are ones that do not initiate connections.  They
        wait for the server to contact them, and therefore cannot be
        behind a firewall without having holes punched through it.
    """

    # How often we try to contact unavailable builders
    _BUILDER_UNAVAIL_PING_INTERVAL = 300  # 5 minutes (in seconds)
    # How often we try to contact available builders
    _BUILDER_AVAIL_PING_INTERVAL = 20

    def __init__(self, manager, cfg, address, weight):
        Builder.__init__(self, manager, cfg, address, weight, TYPE_PASSIVE)
        # Builder will get pinged immediately since self._last_contact == 0
        self._ping_interval = self._BUILDER_UNAVAIL_PING_INTERVAL
        self._upload_url = None
        # Gather SSL certificate paths when builder communication uses SSL
        self._certs = None
        if self._server_cfg.get_bool("Builders", "use_ssl"):
            self._certs = {}
            self._certs['key_and_cert'] = self._server_cfg.get_str("SSL", "server_key_and_cert")
            self._certs['ca_cert'] = self._server_cfg.get_str("SSL", "ca_cert")
            self._certs['peer_ca_cert'] = self._server_cfg.get_str("SSL", "ca_cert")

    def _upload_cb(self, result, cb_data, msg):
        """Call the archjob's upload callback with the upload result."""
        (archjob_id, srpm_path, upload_cb, user_data) = cb_data

        # Notify the builder that the SRPM is uploaded
        if result == FileTransfer.FT_RESULT_SUCCESS:
            url = "file:///%s" % os.path.basename(srpm_path)
            cmd = Commands.PlgCommandJobSRPM(archjob_id, url, self._seq_gen.next())
            self._lock.acquire()
            self._cmd_queue.append(cmd)
            self._lock.release()

        # Call the archjob's upload callback
        upload_cb(self, result, msg, user_data)

    def request_srpm_upload(self, archjob, upload_cb, user_data, srpm_path):
        """Called by the archjob to request an upload of the SRPM
           to the builder.  Returns the FileUploader thread object."""
        archjob_id = archjob.archjob_id()

        # Start uploading the job to the builder
        data = [("archjob_id", archjob_id)]
        uploader = FileUploader.FileUploader(self._upload_url, [srpm_path], 'filedata', data, self._certs)
        uploader.set_callback(self._upload_cb, (archjob_id, srpm_path, upload_cb, user_data))
        uploader.start()
        return uploader

    def _download_cb(self, result, (archjob, urls), msg):
        """Notify archjob of the result of its download request."""
        if result == FileTransfer.FT_RESULT_FAILED:
            print "Builder Error (%s): result files download failed for %s (%s). " \
                " '%s'" % (self._address, archjob.archjob_id(), archjob.arch(), msg)

        # Build a per-file success/failure map for the archjob
        files = {}
        result_files_dir = archjob.get_result_files_dir()
        for url in urls:
            try:
                fname = FileDownloader.get_base_filename_from_url(url, ['.rpm', '.log'])
            except FileDownloader.FileNameException, exc:
                print "Error in JobFilesAck for %s: %s" % (url, exc)
                continue
            if result == FileTransfer.FT_RESULT_SUCCESS:
                # Even on overall success, each file must actually exist
                fpath = os.path.join(result_files_dir, fname)
                if os.path.exists(fpath):
                    files[fname] = FileTransfer.FT_RESULT_SUCCESS
                else:
                    files[fname] = FileTransfer.FT_RESULT_FAILED
            else:
                files[fname] = FileTransfer.FT_RESULT_FAILED
        archjob.download_cb(files)

    def _handle_job_files_ack(self, cmd):
        """Start downloading a finished job's result files from the builder."""
        (archjob, urls) = self._decompose_job_files_ack(cmd)
        if not archjob:
            return

        if not urls or not len(urls):
            # No files to fetch; report an empty result set
            archjob.download_cb({})
            return

        # Basic sanity checks; whether the files exist, etc
        result_files_dir = archjob.get_result_files_dir()
        downloader = FileDownloader.FileDownloader(urls, result_files_dir, ['.rpm', '.log'], self._certs)
        downloader.set_callback(self._download_cb, (archjob, urls))
        downloader.start()

    def _send_commands(self):
        """Send queued commands to the builder, and then get it's list
           of reply commands.  Returns the raw response stream, or None
           if the request failed."""
        # Grab some work for the builder if any is available
        new_cmds = []
        if self._free_slots > 0:
            archjob = self._manager.claim_arch_job(self)
            if archjob:
                next_seq = self._seq_gen.next()
                cmd = Commands.PlgCommandNewJobReq(archjob, seq=next_seq)
                new_cmds.append(cmd)

        # Copy command queue
        self._lock.acquire()
        self._cmd_queue = self._cmd_queue + new_cmds
        cmd_list = self._cmd_queue[:]
        for cmd in self._cmd_queue:
            if cmd.need_ack():
                self._ack_pending_list.append(cmd)
        self._cmd_queue = []
        self._lock.release()

        # The actual XML-RPC request runs in a different thread because SSL
        # calls sometimes hang
        req = PassiveBuilderRequest(self._address, self._certs, cmd_list)
        curtime = time.time()
        req.start()

        # Give the request 10s, otherwise forget about it
        while time.time() < curtime + 10:
            if req.done:
                break
            time.sleep(0.5)

        # If the request isn't done yet, force-close it to
        # minimize chances of the commands getting through
        # to the builder
        if not req.done:
            req.cancel()

        response = None
        if req.failed:
            # Put all the commands back at the front of the queue
            self._lock.acquire()
            self._cmd_queue = cmd_list + self._cmd_queue
            self._lock.release()
        else:
            response = req.response
        return response

    def _dispatch_command(self, cmd, first_contact):
        """Dispatch one command.  We let the superclass dispatch the
           common commands, and handle only those that need action
           specific to the Passive builder type."""
        # The first time we contact the builder, we need to tell it to kill
        # all jobs that are building on it.  So don't let the superclass
        # handle the first BuildingJobs command we get
        handled = False
        if first_contact and isinstance(cmd, Commands.PlgCommandBuildingJobs):
            # Tell the builder to kill all jobs it might be building
            # right now.  Think server restart here.
            for item in cmd.jobs():
                (uniqid, status) = cmd.get_job(item)
                new_cmd = Commands.PlgCommandKillJob(uniqid, self._seq_gen.next())
                self._cmd_queue.append(new_cmd)
            handled = True

        # Let the superclass handle what's left
        if not handled:
            handled = self._dispatch_common_command(cmd)

        # Then we handle what the superclass didn't
        if not handled:
            if isinstance(cmd, Commands.PlgCommandJobFilesAck):
                self._handle_job_files_ack(cmd)
                handled = True
            elif isinstance(cmd, Commands.PlgCommandTargets):
                # Only accept the target list once
                if not len(self._target_list):
                    self._target_list = cmd.targets()
                    self._upload_url = cmd.upload_url()
                handled = True

        if not handled:
            print "Builder Error (%s): unhandled command '%s'" % (self._address, cmd.name())

    _SLEEP_INTERVAL = 5

    def run(self):
        """Main loop: periodically ping the builder, exchange command
           streams with it, and suspend it after repeated timeouts."""
        DebugUtils.registerThreadName(self)
        while not self._stop:
            if self._last_contact < time.time() - self._ping_interval:
                # Ensure the builder's IP is up-to-date
                self._get_ip()
                # Try to talk to the builder
                cmd_list = self._send_commands()
                if cmd_list:
                    # Builder is alive
                    first_contact = False
                    if not self._available:
                        self._handle_builder_reactivate()
                        first_contact = True
                    self._unavail_count = 0
                    # process builder's response
                    cmds = Commands.deserialize_command_stream(cmd_list)
                    for cmd in cmds:
                        self._dispatch_command(cmd, first_contact)
                else:
                    # Builder didn't respond; suspend after 3 misses
                    self._lock.acquire()
                    self._unavail_count = self._unavail_count + 1
                    if self._available and self._unavail_count > 2:
                        self._handle_builder_suspend(SUSPEND_TIMEOUT, "the builder timed out")
                    self._lock.release()
                self._last_contact = time.time()
            time.sleep(self._SLEEP_INTERVAL)

    def _handle_builder_suspend(self, reason, msg):
        """Suspend, then slow the ping rate and drop the target list."""
        Builder._handle_builder_suspend(self, reason, msg)
        self._ping_interval = self._BUILDER_UNAVAIL_PING_INTERVAL
        self._target_list = []

    def _handle_builder_reactivate(self):
        mail = True
        if self._suspend_reason == SUSPEND_NONE:
            # Don't send mail saying the builder has been reactivated if
            # this is the first time the builder has contacted us
            mail = False
        self._ping_interval = self._BUILDER_AVAIL_PING_INTERVAL
        self._lock.acquire()
        Builder._handle_builder_reactivate(self, mail=mail)
        self._lock.release()


class ActiveBuilder(Builder):
    """ Active builders are ones which attempt to contact the build server
        by themselves.  Therefore, they can be behind a firewall without
        punching holes through it.
""" _REQUIRED_CONTACT_INTERVAL = 25 def __init__(self, manager, cfg, address, weight): Builder.__init__(self, manager, cfg, address, weight, TYPE_ACTIVE) def request_srpm_upload(self, archjob, upload_cb, user_data, srpm_path): """Called by the archjob to request that an SRPM be made available to the builder.""" # Copy the SRPM to the correct upload location http_dir = self._manager.http_dir() srpm_dest_dir = archjob.parent().make_stage_dir(http_dir, delete=False) srpm_dest = os.path.join(srpm_dest_dir, os.path.basename(srpm_path)) err_msg = "Failed" result = FileTransfer.FT_RESULT_FAILED if srpm_dest != srpm_path: # Copy file if it's not already in the HTTP server's download dir try: shutil.copyfile(srpm_path, srpm_dest) err_msg = "Success" result = FileTransfer.FT_RESULT_SUCCESS except IOError, exc: err_msg = str(exc) else: # Make sure it's where it's supposed to be if os.path.exists(srpm_dest): err_msg = "Success" result = FileTransfer.FT_RESULT_SUCCESS else: err_msg = "Candidate SRPM file %s didn't exist." 
% srpm_dest if result == FileTransfer.FT_RESULT_SUCCESS: # Construct the download URL archjob_id = archjob.archjob_id() url = self._manager.get_http_url_base() + srpm_dest[len(http_dir):] cmd = Commands.PlgCommandJobSRPM(archjob_id, url, self._seq_gen.next()) self._lock.acquire() self._cmd_queue.append(cmd) self._lock.release() # Call the archjob's upload callback upload_cb(self, result, err_msg, user_data) return None def _handle_job_files_ack(self, cmd): (archjob, urls) = self._decompose_job_files_ack(cmd) if not archjob: return if not urls or not len(urls): archjob.download_cb({}) return # Basic sanity checks; whether the files exist, etc result_files_dir = archjob.get_result_files_dir() files = {} for url in urls: try: fname = FileDownloader.get_base_filename_from_url(url, ['.rpm', '.log']) except FileDownloader.FileNameException, exc: print "Error in JobFilesAck for %s: %s" % (url, exc) continue fpath = os.path.join(result_files_dir, fname) if os.path.exists(fpath): files[fname] = FileTransfer.FT_RESULT_SUCCESS else: files[fname] = FileTransfer.FT_RESULT_FAILED archjob.download_cb(files) def _dispatch_command(self, cmd): """Dispatch one command. We let the superclass dispatch the common commands, and handle only those that need action specific to the Active builder type.""" handled = self._dispatch_common_command(cmd) if not handled: if isinstance(cmd, Commands.PlgCommandJobFilesAck): self._handle_job_files_ack(cmd) handled = True elif isinstance(cmd, Commands.PlgCommandTargets): if not len(self._target_list): self._lock.acquire() self._target_list = cmd.targets() self._lock.release() handled = True if not handled: print "Builder Error (%s): unhandled command '%s'" % (self._address, cmd.name()) def request(self, cmd_list): """Process and respond to an active builder's request. 
Called from the BuildMaster's XML-RPC server.""" # Reset unavailability counters and reactivate builder if needed self._last_contact = time.time() self._unavail_count = 0 if not self._available: self._handle_builder_reactivate(cmd_list) # Process the commands the builder sent us for cmd in cmd_list: self._dispatch_command(cmd) # Grab some work for the builder if any is available new_cmds = [] if self._free_slots > 0: archjob = self._manager.claim_arch_job(self) if archjob: next_seq = self._seq_gen.next() cmd = Commands.PlgCommandNewJobReq(archjob, seq=next_seq) new_cmds.append(cmd) self._lock.acquire() # Copy command queue self._cmd_queue = self._cmd_queue + new_cmds cmd_list = self._cmd_queue[:] # Keep around commands that need an ack for cmd in self._cmd_queue: if cmd.need_ack(): self._ack_pending_list.append(cmd) self._cmd_queue = [] self._lock.release() return cmd_list def ip(self): if not self._ip: self._get_ip() return Builder.ip(self) _SLEEP_INTERVAL = 10 def run(self): """Main builder loop. Since the builder contacts us, we don't have to do much here except handle builders going away.""" DebugUtils.registerThreadName(self) while not self._stop: if self._available: self._lock.acquire() if self._unavail_count > 2: self._handle_builder_suspend(SUSPEND_TIMEOUT, "the builder timed out") elif self._last_contact + self._REQUIRED_CONTACT_INTERVAL < time.time(): self._unavail_count = self._unavail_count + 1 self._lock.release() time.sleep(self._SLEEP_INTERVAL) def _handle_builder_suspend(self, reason, msg): Builder._handle_builder_suspend(self, reason, msg) self._last_contact = 0 def _handle_builder_reactivate(self, cmd_list): # Grab an updated target list from the command stream when # the builder contacts us target_list = [] for cmd in cmd_list: if isinstance(cmd, Commands.PlgCommandTargets): target_list = cmd.targets() elif isinstance(cmd, Commands.PlgCommandBuildingJobs): # Tell the builder to kill all jobs it might be building # right now. 
Think server restart here. for item in cmd.jobs(): (uniqid, status) = cmd.get_job(item) cmd = Commands.PlgCommandKillJob(uniqid, self._seq_gen.next()) self._cmd_queue.append(cmd) if not len(target_list): target_list = self._target_list mail = True if self._suspend_reason == SUSPEND_NONE: # Don't send mail saying the builder has been reactivated if # this is the first time the builder has contacted us mail = False self._lock.acquire() Builder._handle_builder_reactivate(self, mail=mail) self._target_list = target_list self._lock.release()