# This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Library General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # # Copyright 2005 Dan Williams and Red Hat, Inc. import time import socket import os import threading import Builder from plague import DebugUtils from plague import AuthedXMLRPCServer from plague import HTTPServer from plague import Commands import ArchJob class AddrCache(object): _ENTRY_EXPIRE_TIME = 3600 def __init__(self): self._cache = {} def get(self, name): # Expire cache entry if one exists and is old ip = None try: (itime, ip) = self._cache[name] if itime < time.time() - self._ENTRY_EXPIRE_TIME: del self._cache[name] itime = ip = None except KeyError: pass # Do a lookup and cache it if not ip: try: ip = socket.gethostbyname(name) self._cache[name] = (time.time(), ip) except: pass return ip class AuthedSSLBuilderServer(AuthedXMLRPCServer.AuthedSSLXMLRPCServer): """ SSL XMLRPC server that authenticates builders based on their certificate. """ def __init__(self, address, certs, builder_manager): AuthedXMLRPCServer.AuthedSSLXMLRPCServer.__init__(self, address, self.auth_cb, certs) self.authenticator = builder_manager self._addr_cache = AddrCache() def auth_cb(self, request, con_addr_pair): peer_cert = request.get_peer_certificate() cert_address = peer_cert.get_subject().commonName try: (con_address, con_port) = con_addr_pair cert_ip = self._addr_cache.get(cert_address) con_ip = self._addr_cache.get(con_address) builder = self.authenticator.get_builder(cert_ip, con_ip) if builder.type() is not Builder.TYPE_ACTIVE: builder = None except Exception: builder = None return builder class AuthedBuilderServer(AuthedXMLRPCServer.AuthedXMLRPCServer): """ Authenticates builders based on their IP address. """ def __init__(self, address, builder_manager): AuthedXMLRPCServer.AuthedXMLRPCServer.__init__(self, address, self.auth_cb) self.authenticator = builder_manager self._addr_cache = AddrCache() def auth_cb(self, request, con_addr_pair): try: (con_address, con_port) = con_addr_pair ip = self._addr_cache.get(con_address) builder = self.authenticator.get_builder(ip, ip) if builder.type() is not Builder.TYPE_ACTIVE: builder = None except Exception: builder = None return builder class BuilderDispatcher(object): def request(self, cmd_list): # Authorize the builder, then pass the request # to the correct builder object builder = AuthedXMLRPCServer.get_authinfo() if not builder: cmd = Commands.PlgCommandError("Builder is not authorized") return [cmd.serialize()] cmds = Commands.deserialize_command_stream(cmd_list) cmds_for_server = builder.request(cmds) cmd_stream = Commands.serialize_to_command_stream(cmds_for_server) return cmd_stream class BuilderServerThread(threading.Thread): """ Object to serve active builder requests in a separate thread. Can't block the main BuilderManager object by sitting in serve_forever(). """ def __init__(self, bm, cfg, use_ssl, certs): self._cfg = cfg self._bm = bm self._stopped = False threading.Thread.__init__(self) self.setName("BuilderServerThread") hostname = cfg.get_str("General", "hostname") port = cfg.get_int("Active Builders", "xmlrpc_server_port") try: if use_ssl: self._server = AuthedSSLBuilderServer((hostname, port), certs, self._bm) else: self._server = AuthedBuilderServer((hostname, port), self._bm) except socket.gaierror, exc: raise socket.gaierror(exc[0], "Couldn't bind to address %s:%d (%s)" % (hostname, port, exc[1])) self._dispatcher = BuilderDispatcher() self._server.register_instance(self._dispatcher) def run(self): DebugUtils.registerThreadName(self) self._server.serve_forever() self._stopped = True def stop(self): self._server.stop() tm = time.time() while not self._stopped: try: if time.time() > tm + 2: break except KeyboardInterrupt: pass class BuilderManager: """ Tracks individual builder instances. """ def __init__(self, cfg): self._cfg = cfg self._builders_lock = threading.Lock() self._builders = [] any_active = self._load_builders() self._queue_lock = threading.Lock() self._queue = [] self._certs = None self._use_ssl = cfg.get_bool("Builders", "use_ssl") if self._use_ssl: self._certs = {} self._certs['key_and_cert'] = cfg.get_str("SSL", "server_key_and_cert") self._certs['ca_cert'] = cfg.get_str("SSL", "ca_cert") self._certs['peer_ca_cert'] = cfg.get_str("SSL", "ca_cert") self._xmlrpc_server = None if any_active: # Builder XMLRPC server # Only start it when there are active-type builders listed # in the config file self._xmlrpc_server = BuilderServerThread(self, cfg, self._use_ssl, self._certs) self._xmlrpc_server.start() # Builder HTTP fileserver self._hostname = cfg.get_str("General", "hostname") self._http_port = cfg.get_int("Active Builders", "file_server_port") self._http_dir = os.path.join(cfg.get_str("Directories", "server_work_dir"), "srpm_http_dir") self._fileserver = HTTPServer.PlgHTTPServerManager((self._hostname, self._http_port), self._http_dir, self._certs) if any_active: self._fileserver.set_POST_handler('/upload', self.upload_callback) self._fileserver.start() self._print_builders() def get_http_url_base(self): """Return the base HTTP server URL, taking port and SSL into account.""" method = "http" if self._use_ssl: method = "https" return "%s://%s:%d/" % (method, self._hostname, self._http_port) def http_dir(self): return self._http_dir def upload_callback(self, request_handler, fs): # Ensure we know this builder ip = request_handler.client_address[0] builder = self.get_builder(ip, ip) if not builder: request_handler.send_error(403, "Unauthorized") return # Search for filename fslist = [fs] if not fs.name and not fs.filename and fs.value: fslist = fs.value jobid = filename = tmpfile = None for item in fslist: if item.name == 'archjob_id': try: jobid = str(item.value) except ValueError: pass elif item.name == 'filedata': filename = item.filename tmpfile = item.file if jobid and filename and tmpfile: archjob = builder.get_archjob(jobid) if archjob: upload_dir = archjob.get_result_files_dir() import shutil, urllib destpath = os.path.join(upload_dir, urllib.unquote(filename)) dest = file(destpath, "w+b") shutil.copyfileobj(tmpfile, dest) dest.close() request_handler.send_response(200, "Success") request_handler.send_header("Content-type", "text/html") request_handler.end_headers() request_handler.wfile.write("Success!") else: request_handler.send_error(400, "Invalid request for archjob_id %s" % jobid) else: request_handler.send_error(400, "Invalid request for %s" % request_handler.path) def _print_builders(self): # Print out builder list when starting up print "\nAuthorized Builders:" print "-" * 90 for builder in self._builders: addr = builder.address() string = " " + addr string = string + " " * (40 - len(addr)) builder_dict = builder.to_dict() for arch in builder_dict['arches']: string = string + arch + " " string = string + " " * (80 - len(string)) status = "unavailable" if builder_dict['available']: status = "available" string = string + status del builder_dict print string print "" def stop(self): for builder in self._builders: builder.stop() if self._xmlrpc_server: self._xmlrpc_server.stop() self._fileserver.stop() def _load_builders(self): self._builders_lock.acquire() any_active = False builders = self._cfg.builders() for address in builders: (weight, btype) = builders[address] if btype == Builder.TYPE_ACTIVE: any_active = True # If the address is already in our _builders list, skip it skip = False for builder in self._builders: addr = builder.address() if address == addr: skip = True break if skip == True: continue # Add the builder to our build list if btype == Builder.TYPE_ACTIVE: builder = Builder.ActiveBuilder(self, self._cfg, address, weight) else: builder = Builder.PassiveBuilder(self, self._cfg, address, weight) builder.start() self._builders.append(builder) self._builders_lock.release() return any_active def ping_suspended_builders(self): self._builders_lock.acquire() for builder in self._builders: passive = (builder.type() == Builder.TYPE_PASSIVE) if passive and not builder.alive(): builder.ping_asap() self._builders_lock.release() def list_builders(self): builder_list = [] for builder in self._builders: builder_list.append(builder.to_dict()) return builder_list def get_builder(self, cert_ip, con_ip): """Return the Builder object, if any, that matches the specified IP address. Performs basic checking on the address too.""" self._builders_lock.acquire() ret_builder = None # Ensure builder's certificate (if SSL) and # the remote address of its connection are the same if cert_ip == con_ip: # Find matching builder in our authorized builders list for builder in self._builders: ip = builder.ip() if ip and cert_ip == ip: ret_builder = builder break self._builders_lock.release() return ret_builder def _builder_cmp_func(self, builder1, builder2): # If both builders have at least one free slot, sort on # weight, not free slots b1_free = builder1.free_slots() b2_free = builder2.free_slots() if b1_free > 0 and b2_free > 0: b1_weight = builder1.weight() b2_weight = builder2.weight() # Equal weight case falls through to # sort on free slots if b1_weight > b2_weight: return 1 elif b1_weight < b2_weight: return -1 if b1_free > b2_free: return 1 elif b1_free == b2_free: return 0 elif b1_free < b2_free: return -1 return 1 def claim_arch_job(self, builder): """Called by a Builder instance to find a job for the builder to build.""" archjob = None self._queue_lock.acquire() # First pass: look for orphaned jobs for job in self._queue: if job.status() != ArchJob.AJ_STATUS_QUEUED: continue if job.orphaned() and builder.can_build_arch_job(job): job.claim(builder) archjob = job break # Second pass: just pick any job if not archjob: for job in self._queue: if job.status() != ArchJob.AJ_STATUS_QUEUED: continue if builder.can_build_arch_job(job): job.claim(builder) archjob = job break self._queue_lock.release() return archjob def request_arch_job(self, archjob): """Called by the PackageJob instance to queue new arch-specific build jobs.""" self._queue_lock.acquire() self._queue.append(archjob) self._queue_lock.release() def any_prepping_builders(self): # query each build builder for any jobs that are in the 'prepping' state for builder in self._builders: if builder.available() and builder.any_prepping_jobs(): return True return False