# This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Library General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # # Copyright 2005 Dan Williams and Red Hat, Inc. import EmailUtils import time import sys import os import copy import exceptions import BuildMaster import PackageJob import Config import DBManager from plague import AuthedXMLRPCServer # API version #, just increment each time an incompatible API change is made XMLRPC_API_VERSION = 100 def validate_email(email): safe_list = ['@', '_', '-', '.', '+'] for c in email: # For now, legal characters are '@_-.+' plus alphanumeric if not (c in safe_list) and not c.isalnum(): return False return True def validate_uid(uid_in): try: uid = int(uid_in) except ValueError: return None except TypeError: return None if uid < 0: return None return uid def validate_package_name(name): safe_list = ['_', '-', '+', '.'] for c in name: if not (c in safe_list) and not c.isalnum(): return False return True def validate_cvs_tag(tag): safe_list = ['-', '_', '.', ':', '~', '[', ']', '+'] for c in tag: if not (c in safe_list) and not c.isalnum(): return False return True def validate_orderby(order): safe_list = ['username', 'status', 'result', 'uid', 'target', 'starttime', 'endtime', 'package'] if order in safe_list: return True return False def resolve_target(cfg, alias): target_cfg = None try: target_cfg = cfg.resolve_target_user_alias(alias) except Config.InvalidUserAliasException: pass # If it wasn't an alias, try actual target names if not target_cfg: target_cfg = cfg.resolve_target_string(alias) return target_cfg class UserInterface: """ Base UserInterface class. NO AUTHENTICATION. Subclass this to provide some. """ def __init__(self, builder_manager, build_master, db_manager, cfg): self._builder_manager = builder_manager self._bm = build_master self._db_manager = db_manager self._cfg = cfg def email_result(self, to, source, resultstring): """ send 'resultstring' to an email address """ subject = 'Enqueue Result: %s' % source sender = self._cfg.get_str("Email", "email_from") EmailUtils.email_result(sender, to, resultstring, subject) def _wait_for_uid(self, req): """ Wait a bit to see if the UID comes back to us """ starttime = time.time() while req['uid_avail'] == False: time.sleep(0.2) # Wait at most 3 seconds for the uid to come back if time.time() > starttime + 3: break uid = int(req['uid']) return uid def enqueue(self, email, package, source, target_alias, buildreq=None): """ Accept a job to build and stuff it into the job database """ is_cvs_tag = True if source.endswith(".src.rpm") or source.find("/") != -1: is_cvs_tag = False # Do source-type specifc validation if is_cvs_tag: if self._cfg.get_bool("CVS", "use_cvs") == False: self.email_result(email, source, "Error setting up build for %s on "\ "%s: this server builds SRPMs, not CVS checkouts." % (source, target_alias)) return (-1, "This build server is set up for building SRPMS only. ", -1) if not validate_cvs_tag(source): self.email_result(email, source, "Error setting up build for %s on "\ "%s: The CVS tag '%s' contained an illegal character. "\ "Submit a bug report?" % (package, target_alias, cvs_tag)) return (-1, "The CVS tag contained an illegal character.", -1) else: if self._cfg.get_bool("CVS", "use_cvs") == True: self.email_result(email, source, "Error setting up build for %s on "\ "%s: this server builds CVS checkouts, not SRPMs." % (source, target_alias)) return (-1, "This build server is set up for building CVS checkouts only. ", -1) # We limit the database field to 255 chars if len(source) > 255: self.email_result(email, source, "Error setting up build for %s on "\ "%s: try using a shorter path to the SRPM (< 255 chars)." % (source, target_alias)) return (-1, "Pathname to SRPM is limited to 255 characters.", -1) srpm_file = os.path.abspath(source) if not srpm_file or not os.access(srpm_file, os.R_OK): self.email_result(email, source, "Error setting up build for %s on "\ "%s: The SRPM does not exist, or is not accessible. Remember to use absolute paths." % (source, target_alias)) return (-1, "SRPM does not exist or is not accessible, remember to use absolute paths.", -1) if not validate_package_name(package): self.email_result(email, source, "Error setting up build for %s on "\ "%s: Package name '%s' contained an illegal character. "\ "Submit a bug report?" % (source, target_alias, package)) return (-1, "The package name contained an illegal character.", -1) target_cfg = resolve_target(self._cfg, target_alias) if not target_cfg: self.email_result(email, source, "Error setting up build for %s on "\ "%s: target does not exist." % (source, target_alias)) return (-1, "This build server does not support the target %s." % target_alias, -1) print "Request to enqueue '%s' tag '%s' for target '%s' (user '%s')" % (package, source, target_alias, email) req = self._bm.create_job_request(email, package, source, target_cfg.target_dict(), buildreq, time.time()) self._bm.enqueue(req) uid = self._wait_for_uid(req) return (0, "Success: package has been queued.", uid) def requeue(self, uid): uid = validate_uid(uid) if not uid: return (-1, "Error: Invalid job UID.") sql = "SELECT uid, username, status, result FROM jobs WHERE uid=%d" % uid # Run the query for the job try: dbcx = self._db_manager.dbcx() curs = dbcx.cursor() except StandardError, e: del curs, dbcx return (-1, "Unable to access job database: '%s'" % e) curs.execute(sql) job = dbcx.fetchone(curs) result = None if not job: result = (-1, "Error: Invalid job UID.") elif job['result'] != 'failed' and job['result'] != 'killed': result = (-1, "Error: Job %d must be either 'failed' or 'killed' to requeue." % uid) else: self._bm.requeue_job(uid) result = (0, "Success: Job %d has been requeued." % uid) del curs, dbcx return result def _kill_job(self, email, job, jobid): if not job: return (-1, "Job %s does not exist." % jobid) if job.is_done(): return (-1, "Job %s is already finished." % job.get_uid()) job.die(email) return (0, "Success: job %s killed." % job.get_uid()) def list_jobs(self, args_dict): """ Query job information and return it to the user """ sql = "SELECT uid, username, package, source, target_distro, target_target, " \ "target_repo, starttime, endtime, status, result FROM jobs WHERE " sql_args = [] if args_dict.has_key('email') and args_dict['email']: if validate_email(args_dict['email']): sql_args.append("username LIKE '%%%s%%'" % args_dict['email']) else: return (-1, "Error: Invalid email address.", []) if args_dict.has_key('status') and args_dict['status']: status_list = args_dict['status'] if not len(status_list): return (-1, "Error: Invalid job status.", []) status_sql = '' for status in status_list: if not PackageJob.is_package_job_stage_valid(status): return (-1, "Error: Invalid job status.", []) if len(status_sql) > 0: status_sql = status_sql + " OR status='%s'" % status else: status_sql = "status='%s'" % status status_sql = '(' + status_sql + ')' sql_args.append(status_sql) if args_dict.has_key('result') and args_dict['result']: result = args_dict['result'] if PackageJob.is_package_job_result_valid(result): sql_args.append("result='%s'" % result) else: return (-1, "Error: Invalid job result.", []) if args_dict.has_key('uid') and args_dict['uid']: uid = validate_uid(args_dict['uid']) if uid == None: return (-1, "Error: Invalid job UID.", []) sql_args.append('uid=%d' % uid) if args_dict.has_key('uid_gt') and args_dict['uid_gt']: uid = validate_uid(args_dict['uid_gt']) if uid == None: return (-1, "Error: Invalid job UID.", []) sql_args.append('uid>%d' % uid) if args_dict.has_key('uid_lt') and args_dict['uid_lt']: uid = validate_uid(args_dict['uid_lt']) if uid == None: return (-1, "Error: Invalid job UID.", []) sql_args.append('uid<%d' % uid) if args_dict.has_key('target') and args_dict['target']: target_cfg = resolve_target(self._cfg, args_dict['target']) if not target_cfg: return (-1, "Error: Invalid target.", []) sql_args.append("target_distro='%s'" % target_cfg.distro()) sql_args.append("target_target='%s'" % target_cfg.target()) sql_args.append("target_repo='%s'" % target_cfg.repo()) if not len(sql_args): return (-1, "Error: Invalid query.", []) # Assemble the final SQL statement i = 1 for arg in sql_args: sql = sql + arg if i < len(sql_args): sql = sql + " AND " i = i + 1 # Result list order if args_dict.has_key('orderby') and args_dict['orderby']: orderby_list = args_dict['orderby'] if not len(orderby_list): return (-1, "Error: Invalid result order.", []) orderby_sql = '' for order in orderby_list: direction = '' try: field, direction = order.split() direction = direction.upper() if direction != 'ASC' and direction != "DESC": return (-1, "Error: Invalid result order '%s'." % order, []) except ValueError: field = order if not validate_orderby(field): return (-1, "Error: Invalid result order field '%s'." % field, []) if len(orderby_sql) > 0: orderby_sql = orderby_sql + ', %s' % field else: orderby_sql = field if len(direction) > 0: orderby_sql = orderby_sql + ' %s' % direction sql = sql + " ORDER BY %s" % orderby_sql # Deal with max # records # MUST BE LAST ON SQL if args_dict.has_key('maxrows') and args_dict['maxrows']: if not validate_uid(args_dict['maxrows']): return (-1, "Error: Invalid max rows number.", []) sql = sql + " LIMIT %d" % args_dict['maxrows'] # Run the query for the job try: dbcx = self._db_manager.dbcx() curs = dbcx.cursor() except StandardError, e: del curs, dbcx return (-1, "Unable to access job database: '%s'" % e, []) curs.execute(sql) data = dbcx.fetchall(curs) jobs = [] for row in data: jobrec = {} jobrec['uid'] = row['uid'] jobrec['username'] = row['username'] jobrec['package'] = row['package'] jobrec['source'] = row['source'] jobrec['target_distro'] = row['target_distro'] jobrec['target_target'] = row['target_target'] jobrec['target_repo'] = row['target_repo'] jobrec['starttime'] = row['starttime'] jobrec['endtime'] = row['endtime'] jobrec['status'] = row['status'] jobrec['result'] = row['result'] jobrec['archjobs'] = [] jobs.append(copy.deepcopy(jobrec)) # Mash all returned job UIDs into an SQL query to get all their archjobs uids = '' for job in jobs: if len(uids) == 0: uids = uids + "parent_uid=%d" % job['uid'] else: uids = uids + " OR parent_uid=%d" % job['uid'] # Get all archjobs for this job if len(uids) > 0: sql = "SELECT jobid, parent_uid, starttime, endtime, arch, builder_addr, " \ "status, builder_status FROM archjobs WHERE " + uids curs.execute(sql) data = dbcx.fetchall(curs) for row in data: ajrec = {} ajrec['jobid'] = row['jobid'] ajrec['parent_uid'] = row['parent_uid'] ajrec['starttime'] = row['starttime'] ajrec['endtime'] = row['endtime'] ajrec['arch'] = row['arch'] ajrec['builder_addr'] = row['builder_addr'] ajrec['status'] = row['status'] ajrec['builder_status'] = row['builder_status'] for job in jobs: if job['uid'] == ajrec['parent_uid']: job['archjobs'].append(copy.deepcopy(ajrec)) del curs, dbcx return (0, "Success.", jobs) def detail_job(self, uid): """ Query job information and return it to the user """ uid = validate_uid(uid) if not uid: return (-1, "Error: Invalid job UID.", {}) sql = 'SELECT uid, username, package, source, target_distro, target_target, ' \ 'target_repo, starttime, endtime, status, rpm_epoch, rpm_version, rpm_release, ' \ 'archlist, result_msg FROM jobs WHERE uid=%d' % uid # Run the query for the job try: dbcx = self._db_manager.dbcx() curs = dbcx.cursor() except StandardError, e: del curs, dbcx return (-1, "Unable to access job database: '%s'" % e, {}) curs.execute(sql) job = dbcx.fetchone(curs) if not job: del curs, dbcx return (-1, "Error: Invalid job UID.", {}) jobrec = {} jobrec['uid'] = job['uid'] jobrec['username'] = job['username'] jobrec['package'] = job['package'] jobrec['source'] = job['source'] jobrec['target_distro'] = job['target_distro'] jobrec['target_target'] = job['target_target'] jobrec['target_repo'] = job['target_repo'] jobrec['starttime'] = job['starttime'] jobrec['endtime'] = job['endtime'] jobrec['status'] = job['status'] if job['rpm_epoch'] and job['rpm_version'] and job['rpm_release']: jobrec['epoch'] = job['rpm_epoch'] jobrec['version'] = job['rpm_version'] jobrec['release'] = job['rpm_release'] base_url = self._cfg.get_str("UI", "log_url") target_str = "%s-%s-%s" % (jobrec['target_distro'], jobrec['target_target'], jobrec['target_repo']) log_url = PackageJob.make_job_log_url(base_url, target_str, str(uid), jobrec['package'], jobrec['version'], jobrec['release']) if log_url and len(log_url): jobrec['log_url'] = log_url if job['result_msg']: jobrec['result_msg'] = job['result_msg'] jobrec['archjobs'] = [] # Get all archjobs for this job sql = "SELECT jobid, parent_uid, starttime, endtime, arch, builder_addr, " \ "status, builder_status FROM archjobs WHERE parent_uid=%d " % uid curs.execute(sql) data = dbcx.fetchall(curs) for row in data: ajrec = {} ajrec['jobid'] = row['jobid'] ajrec['parent_uid'] = row['parent_uid'] ajrec['starttime'] = row['starttime'] ajrec['endtime'] = row['endtime'] ajrec['arch'] = row['arch'] ajrec['builder_addr'] = row['builder_addr'] ajrec['status'] = row['status'] ajrec['builder_status'] = row['builder_status'] jobrec['archjobs'].append(ajrec) ret_job = copy.deepcopy(jobrec) del curs, dbcx return (0, "Success.", ret_job) def update_builders(self): self._builder_manager.add_new_builders() self._builder_manager.ping_suspended_builders() time.sleep(2) builder_list = self._builder_manager.list_builders() return (0, "Success.", builder_list) def list_builders(self): builder_list = self._builder_manager.list_builders() return (0, "Success.", builder_list) def targets(self): targets = [] for target_cfg in self._cfg.targets(): targets.append(target_cfg.target_string()) return targets def pause(self, paused): s = "paused" if not paused: s = "unpaused" if paused == self._bm.is_paused(): return (-1, "Error: system is already %s." % s) else: self._bm.pause(paused) return (0, "Success, system %s." % s) def is_paused(self): return self._bm.is_paused() def finish(self, uid_list): try: dbcx = self._db_manager.dbcx() curs = dbcx.cursor() except StandardError, e: del curs, dbcx return (-1, "Unable to access job database: '%s'" % e) uids = '' for uid in uid_list: uid = validate_uid(uid) if not uid: continue if len(uids) == 0: uids = uids + "uid=%d" % uid else: uids = uids + " OR uid=%d" % uid if len(uids): sql = "UPDATE jobs SET status='finished' WHERE %s" % uids curs.execute(sql) dbcx.commit() del curs, dbcx return (0, "Success.") def srpm_upload_dir(self, target_alias): target_cfg = resolve_target(self._cfg, target_alias) if not target_cfg: return (-1, "") target_str = target_cfg.target_string() work_dir = self._cfg.get_str("Directories", "server_work_dir") return (0, os.path.join(work_dir, "srpm_upload_dir", target_str)) def api_version(self): return XMLRPC_API_VERSION class UserInterfaceSSLAuth(UserInterface): """ Allow/Deny operations based on user privileges """ def enqueue(self, package, source, target_alias, buildreq=None): user = AuthedXMLRPCServer.get_authinfo() if not user or not user.own_jobs: return (-1, "Insufficient privileges.", -1) return UserInterface.enqueue(self, user.email, package, source, target_alias, buildreq) def requeue(self, uid): user = AuthedXMLRPCServer.get_authinfo() if not user or not user.own_jobs: return (-1, "Insufficient privileges.") uid = validate_uid(uid) if not uid: return (-1, "Error: Invalid job UID.") sql = 'SELECT uid, username, status FROM jobs WHERE uid=%d' % uid # Run the query for the job try: dbcx = self._db_manager.dbcx() curs = dbcx.cursor() except StandardError, e: del curs, dbcx return (-1, "Unable to access job database: '%s'" % e) curs.execute(sql) job = dbcx.fetchone(curs) result = None if not job: result = (-1, "Error: Invalid job UID.") elif job['username'] != user.email and not user.job_admin: result = (-1, "Error: You are not the original submitter for Job %d." % uid) else: result = UserInterface.requeue(self, uid) del curs, dbcx return result def kill_job(self, email, jobid): user = AuthedXMLRPCServer.get_authinfo() if not user or not user.own_jobs: return (-1, "Insufficient privileges.") if user.email != email: return (-1, "Insufficient privileges.") jobid = int(jobid) job = self._bm.get_job(jobid) if job: # Only the job owner and job_admins can kill jobs if user.email != job.username and not user.job_admin: return (-1, "Insufficient privileges.") else: return (-1, "Job %s is not currently building and therefore cannot be killed." % jobid) return self._kill_job(user.email, job, jobid) def list_jobs(self, args_dict): user = AuthedXMLRPCServer.get_authinfo() return UserInterface.list_jobs(self, args_dict) def detail_job(self, jobid): user = AuthedXMLRPCServer.get_authinfo() return UserInterface.detail_job(self, jobid) def update_builders(self): user = AuthedXMLRPCServer.get_authinfo() if not user or not user.server_admin: return (-1, "Insufficient privileges.",[]) return UserInterface.update_builders(self) def list_builders(self): user = AuthedXMLRPCServer.get_authinfo() return UserInterface.list_builders(self) def targets(self): user = AuthedXMLRPCServer.get_authinfo() return UserInterface.targets(self) def pause(self, paused): user = AuthedXMLRPCServer.get_authinfo() if not user or not user.server_admin: return (-1, "Insufficient privileges.") return UserInterface.pause(self, paused) def finish(self, uid_list): user = AuthedXMLRPCServer.get_authinfo() if not user or not user.own_jobs: return (-1, "Insufficient privileges.") uids = '' for uid in uid_list: uid = validate_uid(uid) if not uid: return (-1, "Error: Invalid job UID.") if len(uids) == 0: uids = uids + "uid=%d" % uid else: uids = uids + " OR uid=%d" % uid sql = 'SELECT uid, username, status FROM jobs WHERE %s' % uids try: dbcx = self._db_manager.dbcx() curs = dbcx.cursor() except StandardError, e: del curs, dbcx return (-1, "Unable to access job database: '%s'" % e) curs.execute(sql) data = dbcx.fetchall(curs) # Ensure that the user can actually finish the jobs they requested final_uid_list = [] error = None for row in data: uid = row['uid'] username = row['username'] status = row['status'] if status != 'needsign' and status != 'failed': error = (-1, "Error: Job %d must be either 'needsign' or 'failed' to finish it." % uid) break # Marking 'needsign' jobs as finished requires admin privs if status == 'needsign' and not user.job_admin: error = (-1, "Insufficient privileges to finish job %d." % uid) break # 'failed' jobs can only be finished by the job owner or a job_admin user if status == 'failed' and user != user.email and not user.job_admin: error = (-1, "Insufficient privileges to finish job %d." % uid) break final_uid_list.append(uid) del curs, dbcx if error: return error return UserInterface.finish(self, final_uid_list) def srpm_upload_dir(self, target_alias): user = AuthedXMLRPCServer.get_authinfo() if not user or not user.own_jobs: return (-1, "Insufficient privileges.") return UserInterface.srpm_upload_dir(self, target_alias) class UserInterfaceNoAuth(UserInterface): """ Allow all operations, NULL authentication """ def kill_job(self, email, jobid): jobid = int(jobid) job = self._bm.get_job(jobid) return self._kill_job(email, job, jobid)