# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# Copyright 2006 Dan Williams and Red Hat, Inc.

import os
import sys
import socket
import time
import threading
import urllib
import string
import xmlrpclib
import OpenSSL
import shutil
from plague import Commands
from plague.AuthedXMLRPCServer import AuthedSSLXMLRPCServer
from plague.AuthedXMLRPCServer import AuthedXMLRPCServer
from plague import HTTPServer
from plague import XMLRPCServerProxy
from plague import FileDownloader
from plague import FileUploader
from plague import FileTransfer
import Config
import BuilderMock


class BuilderInitException(Exception):
    # Raised when a builder cannot start up (e.g. its listen address is
    # already in use).
    pass


def get_hostname(cfg, bind_all=False):
    """Get the builder's hostname, optionally returning a hostname
    suitable for binding to all active interfaces."""
    cfg_hostname = cfg.get_str("General", "hostname")
    if cfg_hostname and len(cfg_hostname):
        # Explicit configured hostname always wins
        return cfg_hostname
    elif bind_all:
        # Empty string means "bind to all interfaces" for socket servers
        return ''
    return socket.gethostname()


def _determine_max_jobs():
    """ Simple max job calculator based on number of CPUs """
    import commands
    max_jobs = 1
    cmd = "/usr/bin/getconf _NPROCESSORS_ONLN"
    (status, output) = commands.getstatusoutput(cmd)
    if status == 0:
        try:
            max_jobs = int(output)
        except ValueError:
            # getconf printed something non-numeric; keep the default of 1
            pass
    return max_jobs


class Builder(object):
    """ Abstract builder base object """

    def __init__(self, cfg):
        self._cfg = cfg
        self._max_slots = _determine_max_jobs()
        # Check for user-configured override of max jobs
        if cfg.has_option("General", "max_jobs"):
            max_slots = cfg.get_int("General", "max_jobs")
            if max_slots < 1 or max_slots > 100:
                self._log("Invalid configured max job count; using " \
                        "autoconfigured count %d instead." % self._max_slots)
            else:
                self._max_slots = max_slots

        self._seq_gen = Commands.SequenceGenerator()
        self._queued_cmds = []          # commands waiting to go to the server
        self._stop = False              # set to ask the work loop to exit
        self._stopped = False           # set once the work loop has exited
        self._building_jobs_lock = threading.Lock()
        self._building_jobs = []        # jobs currently building (lock-protected)
        self._all_jobs = {}             # archjob_id -> archjob, for every job ever started

        # Decompose hostname to just get the hostname
        name = cfg.get_str("General", "server")
        slash_idx = name.find("//")
        if slash_idx > 0:
            name = name[slash_idx + 2:]
        slash_idx = name.find("/")
        if slash_idx > 0:
            name = name[:slash_idx]
        self._server_hostname = name
        self._server_ip = None
        self._get_server_ip()

        self._upload_url = ""

        self._certs = None
        self._use_ssl = cfg.get_bool("SSL", "use_ssl")
        if self._use_ssl:
            self._certs = {}
            hostname = get_hostname(self._cfg)
            builder_cert = cfg.get_str("SSL", "builder_key_and_cert_dir")
            self._certs['key_and_cert'] = os.path.join(builder_cert, "%s.pem" % hostname)
            self._certs['ca_cert'] = cfg.get_str("SSL", "ca_cert")
            self._certs['peer_ca_cert'] = cfg.get_str("SSL", "ca_cert")

        build_arches = []
        for target in self._cfg.targets():
            for arch in target.arches():
                if not arch in build_arches:
                    build_arches.append(arch)
        self._log("Available architectures: [%s]" % string.join(build_arches, ", "))

    def _get_server_ip(self):
        # Resolve and cache the build server's IP; returns True on success.
        try:
            self._server_ip = socket.gethostbyname(self._server_hostname)
            return True
        except Exception:
            pass
        return False

    def _log(self, msg, newline=True):
        # Debug-only logging to stdout
        if self._cfg.get_bool("General", "debug"):
            if newline:
                msg = msg + "\n"
            sys.stdout.write(msg)
            sys.stdout.flush()

    def _prefix_url(self, url):
        """Convenience function to add correct URL method for the
        security method we're using."""
        if self._use_ssl:
            return "https://" + url
        return "http://" + url

    def new_builder(cfg, btype):
        """Create and return a new builder object of the requested type."""
        if btype == 'passive':
            return PassiveBuilder(cfg)
        elif btype == 'active':
            return ActiveBuilder(cfg)
        else:
            return None
    new_builder = staticmethod(new_builder)

    def work(self):
        """Builder work loop, starts the builder object's thread
        and sleeps until it's done."""
        self.start()
        try:
            while not self._stop:
                time.sleep(60)
        except KeyboardInterrupt:
            pass

    def cleanup(self):
        """Clean up the builder by killing all running jobs
        and waiting for them to complete."""
        self._building_jobs_lock.acquire()
        for job in self._building_jobs:
            job.die()
        self._building_jobs_lock.release()

        # wait for the jobs to clean up before quitting
        self._log("Waiting for running jobs to stop...", newline=False)
        while True:
            # Jobs remove themselves via notify_job_done() as they die
            if len(self._building_jobs) == 0:
                break
            try:
                self._log(".", newline=False)
                time.sleep(0.5)
            except KeyboardInterrupt:
                break

    def slots(self):
        # Return (free_slots, max_slots) for this builder
        self._building_jobs_lock.acquire()
        free_slots = self._max_slots - len(self._building_jobs)
        self._building_jobs_lock.release()
        return (free_slots, self._max_slots)

    def supported_targets(self):
        # Build a list of target dicts, each annotated with the arches it supports
        targets = []
        for target in self._cfg.targets():
            target_dict = target.target_dict()
            target_dict['supported_arches'] = target.arches()
            targets.append(target_dict)
        return targets

    def notify_job_done(self, archjob):
        # Called by an archjob when it finishes; frees up its build slot
        self._building_jobs_lock.acquire()
        if archjob in self._building_jobs:
            self._building_jobs.remove(archjob)
        self._building_jobs_lock.release()

    def _get_target_cfg(self, target_dict):
        # Find a target config matching target_dict, first exactly, then by
        # "can build this arch" as a fallback.  Returns None if no match.
        target_cfg = None

        # First try to find a target for buildarch specifically
        try:
            target_cfg = self._cfg.get_target(target_dict)
        except Config.InvalidTargetException:
            pass

        if not target_cfg:
            # If that doesn't work, just get a target that can build the arch
            try:
                target_cfg = self._cfg.get_target(target_dict, True)
            except Config.InvalidTargetException:
                pass

        return target_cfg

    def _get_default_commands(self):
        """Return a python list of serialized commands that the builder
        sends to the server every time it contacts the server."""
        defcmds = []

        # always send a target list
        next_seq = self._seq_gen.next()
        cmd = Commands.PlgCommandTargets(self.supported_targets(), self._upload_url, next_seq)
        defcmds.append(cmd)

        # always send free & max slots
        next_seq = self._seq_gen.next()
        (free_slots, max_slots) = self.slots()
        cmd = Commands.PlgCommandSlots(free_slots, max_slots, next_seq)
        defcmds.append(cmd)

        # always report which jobs are currently building
        defcmds.append(self._get_building_jobs_cmd())

        return defcmds

    def _dispatch_server_command(self, cmd):
        """Process a single command from the server."""
        if isinstance(cmd, Commands.PlgCommandKillJob):
            self._handle_kill_job_command(cmd)
        elif isinstance(cmd, Commands.PlgCommandNewJobReq):
            (uniqid, msg) = self._start_new_job(cmd)
            ack = Commands.PlgCommandNewJobAck(uniqid, msg, cmd.seq(), self._seq_gen.next())
            self._queued_cmds.append(ack)
        elif isinstance(cmd, Commands.PlgCommandUnlockRepo):
            self._handle_unlock_repo_request(cmd)
        elif isinstance(cmd, Commands.PlgCommandJobStatus):
            reply = self._handle_job_status_request(cmd)
            if reply:
                self._queued_cmds.append(reply)
        elif isinstance(cmd, Commands.PlgCommandJobFiles):
            reply = self._handle_job_files_request(cmd)
            if reply:
                self._queued_cmds.append(reply)
        elif isinstance(cmd, Commands.PlgCommandJobSRPM):
            self._handle_job_srpm_command(cmd)

    def _process_server_commands(self, cmd_list):
        """Process the server's command stream."""
        if not cmd_list:
            # Something went wrong...
            return

        cmds = Commands.deserialize_command_stream(cmd_list)
        for cmd in cmds:
            self._dispatch_server_command(cmd)

    def _new_job_for_arch(self, target_cfg, buildarch, srpm_url, uniqid):
        """Creates a new mock build job given a particular build architecture."""
        if buildarch != 'noarch' and not BuilderMock.BuilderClassDict.has_key(buildarch):
            # we know nothing about the architecture 'buildarch'
            return None

        builder_class = None
        if buildarch == 'noarch':
            # Just grab the first available architecture from the ones we support
            builder_class = BuilderMock.BuilderClassDict[target_cfg.arches()[0]]
        elif buildarch in target_cfg.arches():
            builder_class = BuilderMock.BuilderClassDict[buildarch]

        # We'll throw a TypeError here if there's no available builder_class for this arch
        return builder_class(self, target_cfg, buildarch, srpm_url, uniqid)

    def _start_new_job(self, cmd):
        # Handle a PlgCommandNewJobReq: validate slots and target, create the
        # archjob and start it.  Returns (archjob_id, message); archjob_id is
        # -1 on refusal.
        target_dict = cmd.target_dict()
        jobname = cmd.job_name()
        uniqid = cmd.archjob_id()
        target_str = Config.make_target_string(target_dict['distro'], target_dict['target'],
                target_dict['arch'], target_dict['repo'])
        msg = "Success"

        (free_slots, max_slots) = self.slots()
        if free_slots <= 0:
            msg = "Error: Tried to build '%s' on target %s when already building" \
                    " maximum (%d) jobs" % (jobname, target_str, max_slots)
            self._log(msg)
            return (-1, msg)

        target_cfg = self._get_target_cfg(target_dict)
        if not target_cfg:
            msg = "Error: Tried to build '%s' on target %s which isn't supported" % (jobname, target_str)
            self._log(msg)
            return (-1, msg)

        try:
            archjob = self._new_job_for_arch(target_cfg, target_dict['arch'], jobname, uniqid)
            self._all_jobs[uniqid] = archjob
            self._building_jobs_lock.acquire()
            self._building_jobs.append(archjob)
            self._building_jobs_lock.release()
            msg = "%s: started %s on %s arch %s at time %d" % (uniqid, jobname,
                    target_str, target_dict['arch'], archjob.starttime())
            archjob.start()
        except (OSError, TypeError), exc:
            # _new_job_for_arch returns None for unknown arches, which then
            # raises TypeError when called
            msg = "Failed request for %s on %s: '%s'" % (jobname, target_str, exc)

        self._log(msg)
        return (uniqid, msg)

    def _get_building_jobs_cmd(self):
        # Snapshot the currently-building jobs into a PlgCommandBuildingJobs
        cmd = Commands.PlgCommandBuildingJobs(self._seq_gen.next())
        self._building_jobs_lock.acquire()
        for job in self._building_jobs:
            cmd.add_job(job.uniqid(), job.status())
        self._building_jobs_lock.release()
        return cmd

    def _handle_unlock_repo_request(self, cmd):
        # Tell the named job that the server's repo lock has been released
        uniqid = cmd.archjob_id()
        self._building_jobs_lock.acquire()
        for job in self._building_jobs:
            if job.uniqid() == uniqid:
                job.unlock_repo()
        self._building_jobs_lock.release()

    def _handle_job_status_request(self, cmd):
        # Reply with the status of the requested job, or None if unknown
        reply = None
        try:
            uniqid = cmd.archjob_id()
            job = self._all_jobs[uniqid]
            reply = Commands.PlgCommandJobStatusAck(uniqid, job.status(), cmd.seq(), self._seq_gen.next())
        except KeyError:
            pass
        return reply

    def _handle_kill_job_command(self, cmd):
        # Kill the requested job and remove it from the building list
        try:
            uniqid = cmd.archjob_id()
            archjob = self._all_jobs[uniqid]
            archjob.die()
            self._building_jobs_lock.acquire()
            if archjob in self._building_jobs:
                self._building_jobs.remove(archjob)
            self._building_jobs_lock.release()
        except KeyError:
            pass

    def _handle_job_srpm_command(self, cmd):
        # Hand the job the URL of the SRPM it should build
        try:
            uniqid = cmd.archjob_id()
            archjob = self._all_jobs[uniqid]
            archjob.notify_srpm_url(cmd.srpm_url())
        except KeyError:
            pass


class PassiveBuilder(Builder, threading.Thread):
    """
    Passive builders initiate no communication of their own.  They wait
    for the build server to contact them, and therefore may not be used
    behind a firewall without holes being punched through it.

    The class structure here is somewhat convoluted, since PassiveBuilder
    is actually an XML-RPC request handler as well as a subclass of
    Builder.  It's still it's own thread for housekeeping purposes, but
    most of it's functions get called from a thread spawned by the
    XML-RPC server to handle requests.
    """

    def __init__(self, cfg):
        Builder.__init__(self, cfg)
        self._http_server = None
        self._xmlrpc_server = None
        self._work_dir = os.path.abspath(cfg.get_str("Directories", "builder_work_dir"))
        self._fileserver_port = self._cfg.get_int("Passive", "fileserver_port")
        threading.Thread.__init__(self)
        self._init_servers()

    def _init_servers(self):
        """Startup HTTP and XML-RPC servers which the build server
        uses to talk to us."""
        hostname = get_hostname(self._cfg, bind_all=True)
        external_hostname = get_hostname(self._cfg)
        try:
            self._http_server = HTTPServer.PlgHTTPServerManager((hostname, self._fileserver_port),
                    self._work_dir, self._certs)
        except socket.error, exc:
            raise socket.error(exc[0], "Couldn't create server for %s:%s: '%s'" % (hostname,
                    self._fileserver_port, exc[1]))

        # Set up upload handlers and addresses
        upload_loc = "/upload"
        host = self._prefix_url(external_hostname)
        self._http_server.set_POST_handler(upload_loc, self.upload_callback)
        self._upload_url = "%s:%d%s" % (host, self._fileserver_port, upload_loc)

        xmlrpc_port = self._cfg.get_int("Passive", "xmlrpc_port")
        self._log("Binding to address '%s:%d'\n" % (external_hostname, xmlrpc_port))
        try:
            if self._use_ssl:
                self._xmlrpc_server = AuthedSSLXMLRPCServer((hostname, xmlrpc_port), None, self._certs)
            else:
                self._xmlrpc_server = AuthedXMLRPCServer((hostname, xmlrpc_port), None)
        except socket.error, exc:
            # errno 98 == EADDRINUSE
            if exc[0] == 98:
                raise BuilderInitException("Error: couldn't bind to address '%s:%s'. " \
                        "Is the builder already running?\n" % (hostname, xmlrpc_port))
        self._xmlrpc_server.register_instance(self)

    def _get_workdir_for_job(self, archjob_id):
        # Each archjob gets its own subdirectory of the builder work dir
        return os.path.join(self._work_dir, archjob_id)

    def upload_callback(self, request_handler, fs):
        """Handle SRPM uploads from the server."""
        # Ensure we know the server
        if not self._server_ip:
            self._get_server_ip()

        # Only the build server itself may upload to us
        ip = request_handler.client_address[0]
        if not self._server_ip or self._server_ip != ip:
            request_handler.send_error(403, "Unauthorized")
            return

        # Search for filename
        fslist = [fs]
        if not fs.name and not fs.filename and fs.value:
            fslist = fs.value

        jobid = filename = tmpfile = None
        for item in fslist:
            if item.name == 'archjob_id':
                try:
                    jobid = urllib.unquote(str(item.value))
                    # ensure we know about this job already
                    if not jobid in self._all_jobs.keys():
                        jobid = None
                except ValueError:
                    pass
            elif item.name == 'filedata':
                filename = item.filename
                tmpfile = item.file

        if jobid and filename and tmpfile:
            # Copy the upload into the job's "source" directory
            upload_dir = os.path.join(self._get_workdir_for_job(jobid), "source")
            destpath = os.path.join(upload_dir, urllib.unquote(filename))
            dest = file(destpath, "w+b")
            shutil.copyfileobj(tmpfile, dest)
            dest.close()
            request_handler.send_response(200, "Success")
            request_handler.send_header("Content-type", "text/html")
            request_handler.end_headers()
            request_handler.wfile.write("Success!")
        else:
            request_handler.send_error(400, "Invalid request for %s" % request_handler.path)

    def run(self):
        """Main builder loop.  Sit around and serve requests."""
        self._http_server.start()
        # We sit in serve_forever() until stopped
        # FIXME: how do we stop this?  server_close() doesn't seem to
        self._xmlrpc_server.serve_forever()
        self._stopped = True

    def stop(self):
        """Tear down HTTP and XML-RPC servers and cleanup their resources."""
        self._http_server.stop()
        self._xmlrpc_server.stop()
        self._xmlrpc_server.server_close()
        # FIXME: server_close() doesn't seem to stop the serve_forever()
        return
        # NOTE: unreachable until the FIXME above is resolved
        while not self._stopped:
            try:
                time.sleep(0.2)
            except KeyboardInterrupt:
                pass

    ###################################################################
    # Code below called by XML-RPC request handler from request thread
    ###################################################################

    def request(self, cmd_list):
        """Main XML-RPC handler, called by the build server.  Dispatches
        the build server's requests and returns our response."""
        self._process_server_commands(cmd_list)
        cmds_for_server = self._get_default_commands()
        cmds_for_server = cmds_for_server + self._queued_cmds
        self._queued_cmds = []
        cmd_stream = Commands.serialize_to_command_stream(cmds_for_server)
        return cmd_stream

    def download_srpm(self, archjob_id, url, target_dir, dl_callback, cb_data=None):
        """For passive builders, the server uploads the RPM to the builder.
        Therefore, we already have it.  Move it from the HTTP server's upload
        directory to the requested target_dir, if the SRPM exists."""
        filename = os.path.basename(urllib.unquote(url))
        source_dir = os.path.join(self._get_workdir_for_job(archjob_id), "source")
        target_file = os.path.join(target_dir, filename)

        result = FileTransfer.FT_RESULT_FAILED
        msg = "Failed"
        # Usually the upload dir will be the same as the archjob's target dir,
        # but if it's not, copy the file over
        if source_dir != target_dir:
            try:
                shutil.move(os.path.join(source_dir, filename), target_file)
                result = FileTransfer.FT_RESULT_SUCCESS
            except IOError, exc:
                msg = str(exc)
        elif os.access(target_file, os.R_OK):
            # Otherwise make sure the files where the archjob wants it
            result = FileTransfer.FT_RESULT_SUCCESS

        if result == FileTransfer.FT_RESULT_SUCCESS:
            msg = "Success"
        dl_callback(result, cb_data, msg)
        return None

    def upload_files(self, archjob_id, files, ul_callback, cb_data=None):
        """For passive builders, the build server retrieves the result files
        from the builder.  So we pretty much do nothing here, since the
        work_dir is already the HTTP download dir."""
        work_dir = self._get_workdir_for_job(archjob_id)
        result = FileTransfer.FT_RESULT_FAILED
        msg = "Failed"
        for fpath in files:
            if fpath.startswith(work_dir):
                # Already where the HTTP server can serve it
                result = FileTransfer.FT_RESULT_SUCCESS
                continue
            # Move files living outside the work dir into it
            last_part = fpath[len(work_dir):]
            new_path = os.path.join(work_dir, last_part)
            try:
                shutil.move(fpath, new_path)
                result = FileTransfer.FT_RESULT_SUCCESS
            except IOError, exc:
                msg = "Failed moving %s to %s: '%s'" % (fpath, new_path, str(exc))
                break

        if result == FileTransfer.FT_RESULT_SUCCESS:
            msg = "Success"
        ul_callback(result, cb_data, msg)
        return None

    def _handle_job_files_request(self, cmd):
        """Return a list of urls of the result files of this job."""
        archjob_id = cmd.archjob_id()
        try:
            job = self._all_jobs[archjob_id]
        except KeyError:
            return None

        # url-ify the file list
        urls = []
        host = self._prefix_url(get_hostname(self._cfg))
        for fpath in job.files():
            if not fpath.startswith(self._work_dir):
                # Refuse to advertise files outside our work dir
                return None
            file_part = urllib.quote(fpath[len(self._work_dir) + 1:])
            full_url = "%s:%d/%s" % (host, self._fileserver_port, file_part)
            urls.append(full_url)
        return Commands.PlgCommandJobFilesAck(archjob_id, urls, cmd.seq(), self._seq_gen.next())


# HACK: This class is a hack to work around SSL hanging issues,
# which cause the whole server to grind to a halt
class ActiveBuilderRequest(threading.Thread):
    # One-shot thread that performs a single XML-RPC request() call to the
    # build server; the caller polls .done and abandons the thread if the
    # request takes too long.

    def __init__(self, server, address, cmds):
        self._server = server
        self._address = address
        self._cmds = cmds
        self.done = False
        self.failed = False
        self.response = None
        self.err_msg = ''
        threading.Thread.__init__(self)

    def run(self):
        self.setName("ActiveBuilderRequest: %s" % self._address)
        try:
            cmd_stream = Commands.serialize_to_command_stream(self._cmds)
            self.response = self._server.request(cmd_stream)
        except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError,
                OpenSSL.SSL.Error, xmlrpclib.ProtocolError), exc:
            self.err_msg = "Builder Error (%s) in request(): network problem '%s'" % (self._address, exc)
            self.failed = True
        except xmlrpclib.Fault, exc:
            self.err_msg = "Builder Error (%s) in request(): server replied '%s'" % (self._address, exc)
            self.failed = True
        self.done = True


class ActiveBuilder(Builder, threading.Thread):
    """
    Active builders initiate all communication to the builder server,
    and therefore may be used from behind a firewall.
    """

    # Seconds between contacts to the build server
    _SERVER_CONTACT_INTERVAL = 20

    def __init__(self, cfg):
        Builder.__init__(self, cfg)
        # Force an immediate first contact with the server
        self._last_comm = time.time() - self._SERVER_CONTACT_INTERVAL - 1
        self._xmlrpc_address = self._get_server_address(cfg.get_str("Active", "xmlrpc_port"))
        self._server = XMLRPCServerProxy.PlgXMLRPCServerProxy(self._xmlrpc_address,
                self._certs, timeout=20)
        threading.Thread.__init__(self)

    def _get_server_address(self, port):
        # Build "http(s)://<server>:<port>" from hostname and port string
        addr = self._prefix_url(self._server_hostname)
        return addr + ":" + port

    def download_srpm(self, archjob_id, url, target_dir, dl_callback, cb_data=None):
        """Download an SRPM from the build server.  Only used by
        BuilderMock objects."""
        self._log("%s: Starting download of %s.\n" % (archjob_id, url))
        downloader = FileDownloader.FileDownloader(url, target_dir, ['.src.rpm'],
                self._certs)
        downloader.set_callback(dl_callback, url)
        downloader.start()
        return downloader

    def upload_files(self, archjob_id, files, ul_callback, cb_data=None):
        # Push result files to the server's upload handler
        port = self._cfg.get_int("Active", "fileserver_port")
        url = "%s:%d/upload" % (self._server_hostname, port)
        url = self._prefix_url(url)
        data = [("archjob_id", archjob_id)]
        uploader = FileUploader.FileUploader(url, files, 'filedata', data,
                self._certs)
        uploader.set_callback(ul_callback, cb_data)
        uploader.start()
        return uploader

    def _handle_job_files_request(self, cmd):
        """Return a list of urls of the result files of this job."""
        archjob_id = cmd.archjob_id()
        try:
            job = self._all_jobs[archjob_id]
        except KeyError:
            return None

        # url-ify the file list
        urls = []
        for fpath in job.files():
            # Active builders upload their results, so only the basename matters
            urls.append("file:///%s" % os.path.basename(fpath))
        return Commands.PlgCommandJobFilesAck(archjob_id, urls, cmd.seq(), self._seq_gen.next())

    def _send_commands(self):
        """Send default commands, and any commands that we've queued up
        since the last time we sent commands to the server."""
        cmds = self._get_default_commands()
        cmds = cmds + self._queued_cmds

        # The actual XML-RPC request runs in a different thread because SSL
        # calls sometimes hang
        req = ActiveBuilderRequest(self._server, self._xmlrpc_address, cmds)
        curtime = time.time()
        req.start()

        # Give the request 10s, otherwise forget about it
        while time.time() - curtime < 10:
            if req.done:
                break
            time.sleep(0.5)

        if req.done:
            if not req.failed:
                # Only clear the queue once the server has the commands
                self._queued_cmds = []
                return req.response
            else:
                self._log(req.err_msg)
        return None

    def run(self):
        """Main builder loop, send commands to and receive commands from the
        server every so often."""
        while not self._stop:
            if self._last_comm < time.time() - self._SERVER_CONTACT_INTERVAL:
                self._last_comm = time.time()
                resp = self._send_commands()
                self._process_server_commands(resp)
            time.sleep(1)
        self._stopped = True

    def stop(self):
        # Ask the run() loop to exit and wait for it to acknowledge
        self._stop = True
        while not self._stopped:
            try:
                time.sleep(0.2)
            except KeyboardInterrupt:
                pass