# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# copyright 2005 Duke University
#
# Copyright 2005 Dan Williams and Red Hat, Inc.

import time
import PackageJob
import threading
import Repo
import copy
import Config
from plague import DebugUtils


class BuildMaster(threading.Thread):

    MAX_CHECKOUT_JOBS = 5

    def __init__(self, builder_manager, db_manager, cfg):
        self.builder_manager = builder_manager
        self._db_manager = db_manager
        self.hostname = cfg.get_str("General", "hostname")
        self.should_stop = False
        self._paused = False
        self._cfg = cfg

        self._repos = {}
        repodir = self._cfg.get_str("Directories", "repo_dir")
        for target_cfg in self._cfg.targets():
            repo = Repo.Repo(target_cfg, repodir, builder_manager)
            target_str = target_cfg.target_string()
            self._repos[target_str] = repo
            repo.start()

        self._done_queue = []
        self._done_queue_lock = threading.Lock()

        self._new_queue = []
        self._new_queue_lock = threading.Lock()

        self._restart_queue = []
        self._restart_queue_lock = threading.Lock()

        self._checkout_wait_queue = []
        self._checkout_num = 0
        self._checkout_wait_queue_lock = threading.Lock()

        self._status_updates = {}
        self._status_updates_lock = threading.Lock()

        self._archjob_status_updates = {}
        self._archjob_status_updates_lock = threading.Lock()

        self._building_jobs = {}
        self._building_jobs_lock = threading.Lock()

        threading.Thread.__init__(self)
        self.setName("BuildMaster")

    def _cleanup(self):
        del self._cursor, self._dbcx

    def _requeue_interrupted_jobs(self):
        """ Restart interrupted jobs from our db. """
        self._cursor.execute("SELECT uid FROM jobs WHERE (status!='needsign' " \
                "AND status!='failed' AND status!='finished') ORDER BY uid")
        uids = self._dbcx.fetchall(self._cursor)
        if len(uids) == 0:
            return
        for item in uids:
            self.requeue_job(item['uid'])

    def requeue_job(self, uid):
        self._restart_queue_lock.acquire()
        self._restart_queue.append(int(uid))
        self._restart_queue_lock.release()

    def _start_requeued_jobs(self):
        uids = ''
        self._restart_queue_lock.acquire()
        for uid in self._restart_queue:
            if len(uids) == 0:
                uids = uids + "uid=%d" % uid
            else:
                uids = uids + " OR uid=%d" % uid
        self._restart_queue = []
        self._restart_queue_lock.release()

        if len(uids) == 0:
            return

        self._cursor.execute('SELECT * FROM jobs WHERE %s ORDER BY uid' % uids)
        jobs = self._dbcx.fetchall(self._cursor)
        if len(jobs) == 0:
            return

        for row in jobs:
            uid = row['uid']

            # Kill any archjobs that are left around
            self._cursor.execute('DELETE FROM archjobs WHERE parent_uid=%d' % uid)
            self._dbcx.commit()

            # Clear out old time/result information
            self._cursor.execute("UPDATE jobs SET starttime=%d, endtime=0, " \
                    "result_msg='' WHERE uid=%d" % (time.time(), uid))
            self._dbcx.commit()

            # Now requeue the job
            target_str = Config.make_target_string(row['target_distro'], \
                    row['target_target'], row['target_repo'])
            try:
                repo = self._repos[target_str]
            except KeyError:
                print "%s (%s): Target '%s' not found." % (uid, \
                        row['package'], target_str)
            else:
                job = PackageJob.PackageJob(uid, row['username'], row['package'], \
                        row['source'], repo, self)
                print "%s (%s): Restarting '%s' on target '%s'" % (uid, \
                        row['package'], row['source'], target_str)
                self._building_jobs_lock.acquire()
                self._building_jobs[uid] = job
                self._building_jobs_lock.release()

    def stop(self):
        self.should_stop = True
        for repo in self._repos.values():
            repo.stop()

    def create_job_request(self, email, package, source, target_dict, buildreq, ctime):
        req = {}
        req['email'] = email
        req['package'] = package
        req['target_distro'] = target_dict['distro']
        req['target_target'] = target_dict['target']
        req['target_repo'] = target_dict['repo']
        req['buildreq'] = buildreq
        req['time'] = ctime
        req['source'] = source
        req['uid_avail'] = False
        req['uid'] = -1
        return req

    def enqueue(self, req):
        self._new_queue_lock.acquire()
        self._new_queue.append(req)
        self._new_queue_lock.release()

    def queue_job_status_update(self, uid, attrdict):
        self._status_updates_lock.acquire()
        lcl_uid = copy.copy(uid)
        lcl_attrdict = copy.deepcopy(attrdict)
        self._status_updates[lcl_uid] = lcl_attrdict
        self._status_updates_lock.release()

    def queue_archjob_status_update(self, uid, attrdict):
        self._archjob_status_updates_lock.acquire()
        lcl_uid = copy.copy(uid)
        lcl_attrdict = copy.deepcopy(attrdict)
        self._archjob_status_updates[lcl_uid] = lcl_attrdict
        self._archjob_status_updates_lock.release()

    def queue_checkout_wait(self, job):
        self._checkout_wait_queue_lock.acquire()
        self._checkout_wait_queue.append(job)
        self._checkout_wait_queue_lock.release()

    def notify_checkout_done(self, job):
        self._checkout_wait_queue_lock.acquire()
        self._checkout_num = self._checkout_num - 1
        self._checkout_wait_queue_lock.release()

    def _process_checkout_wait_jobs(self):
        self._checkout_wait_queue_lock.acquire()
        # We allow only 5 jobs at a time in checkout stage
        allowed_jobs = min(self.MAX_CHECKOUT_JOBS - self._checkout_num, \
                len(self._checkout_wait_queue))
        for i in range(allowed_jobs):
            job = self._checkout_wait_queue[i]
            self._checkout_num = self._checkout_num + 1
            job.checkout_wait_done_callback()
        self._checkout_wait_queue = self._checkout_wait_queue[allowed_jobs:]
        self._checkout_wait_queue_lock.release()

    def notify_job_done(self, job):
        self._done_queue_lock.acquire()
        self._done_queue.append(job)
        self._done_queue_lock.release()

    def _process_finished_jobs(self):
        self._done_queue_lock.acquire()
        for job in self._done_queue:
            uid = job.get_uid()

            # Write any existing status update first
            self._status_updates_lock.acquire()
            if self._status_updates.has_key(uid):
                self._write_job_status_to_db(uid, self._status_updates[uid])
                del self._status_updates[uid]
            self._status_updates_lock.release()

            attrdict = {}
            attrdict['status'] = job.cur_stage()
            attrdict['result'] = job.result()
            self._write_job_status_to_db(uid, attrdict)

            # Update job end time
            try:
                self._cursor.execute('UPDATE jobs SET endtime=%d WHERE uid=%d' \
                        % (job.endtime, uid))
            except StandardError, exc:
                print "DB Error: could not access jobs database. Reason: '%s'" % exc
            self._dbcx.commit()

            print "%s (%s): Job finished." % (uid, job.package)

            self._building_jobs_lock.acquire()
            self._building_jobs[uid] = None
            self._building_jobs_lock.release()
        self._done_queue = []
        self._done_queue_lock.release()
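
    # Illustrative sketch, with hypothetical values: a PackageJob reports a
    # stage change by queueing a dict shaped like the one consumed by
    # _write_job_status_to_db() below; everything except 'status' is optional.
    #
    #   master.queue_job_status_update(42, {'status': 'building',
    #           'epoch': '0', 'version': '1.0', 'release': '1'})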

    def _write_job_status_to_db(self, uid, attrdict):
        sql = "status='%s'" % attrdict['status']
        if attrdict.has_key('epoch') and attrdict.has_key('version') and attrdict.has_key('release'):
            sql = sql + ", rpm_epoch='%s', rpm_version='%s', rpm_release='%s'" \
                    % (attrdict['epoch'], attrdict['version'], attrdict['release'])
        if attrdict.has_key('result_msg'):
            import urllib
            sql = sql + ", result_msg='%s'" % (urllib.quote(attrdict['result_msg']))
        if attrdict.has_key('result'):
            sql = sql + ", result='%s'" % attrdict['result']

        sql = 'UPDATE jobs SET ' + sql + ' WHERE uid=%d' % uid
        try:
            self._cursor.execute(sql)
        except StandardError, exc:
            print "DB Error: could not access jobs database. Reason: '%s'" % exc
        self._dbcx.commit()

    def _write_archjob_status_to_db(self, uid, attrdict):
        self._cursor.execute("SELECT parent_uid FROM archjobs WHERE jobid='%s'" % uid)
        if len(self._dbcx.fetchall(self._cursor)) == 0:
            try:
                self._cursor.execute("INSERT INTO archjobs (jobid, parent_uid, starttime, " \
                        "endtime, arch, builder_addr, status, builder_status) " \
                        "VALUES ('%s', %d, %d, %d, '%s', '%s', '%s', '%s')" % (uid, \
                        attrdict['parent_uid'], attrdict['starttime'], attrdict['endtime'], \
                        attrdict['arch'], attrdict['builder_addr'], attrdict['status'], \
                        attrdict['builder_status']))
            except StandardError, exc:
                print "DB Error: could not access jobs database. Reason: '%s'" % exc
        else:
            try:
                self._cursor.execute("UPDATE archjobs SET status='%s', " \
                        "builder_addr='%s', builder_status='%s', endtime=%d " \
                        "WHERE jobid='%s' AND parent_uid=%d" % (attrdict['status'], \
                        attrdict['builder_addr'], attrdict['builder_status'], \
                        attrdict['endtime'], uid, attrdict['parent_uid']))
            except StandardError, exc:
                print "DB Error: could not access jobs database. Reason: '%s'" % exc
        self._dbcx.commit()
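
    # Illustrative sketch, with hypothetical values: per-arch updates queued
    # through queue_archjob_status_update() should carry every column that
    # _write_archjob_status_to_db() above writes on first insert.
    #
    #   master.queue_archjob_status_update('i386-42', {'parent_uid': 42,
    #           'starttime': 1130000000, 'endtime': 0, 'arch': 'i386',
    #           'builder_addr': 'http://builder1:8888', 'status': 'running',
    #           'builder_status': 'building'})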
Reason: '%s'" % exc self._dbcx.commit() def _save_job_status(self): # Write new job status to the database self._status_updates_lock.acquire() for uid in self._status_updates.keys(): self._write_job_status_to_db(uid, self._status_updates[uid]) self._status_updates = {} self._status_updates_lock.release() self._archjob_status_updates_lock.acquire() for uid in self._archjob_status_updates.keys(): self._write_archjob_status_to_db(uid, self._archjob_status_updates[uid]) self._archjob_status_updates = {} self._archjob_status_updates_lock.release() def _start_new_jobs(self): self._new_queue_lock.acquire() uid_field = self._dbcx.get_uid_field_for_insert() uid_value = self._dbcx.get_uid_value_for_insert() for item in self._new_queue: self._cursor.execute("INSERT INTO jobs (%s username, package," \ " source, target_distro, target_target, target_repo, buildreq," \ " starttime, endtime, status, result)" \ " VALUES (%s '%s', '%s', '%s', '%s', '%s', '%s', '%s', %d, 0, '%s', '')" \ % (uid_field, uid_value, item['email'], item['package'], item['source'], item['target_distro'], \ item['target_target'], item['target_repo'], item['buildreq'], \ item['time'], 'initialize')) self._dbcx.commit() # Find the UID self._cursor.execute("SELECT uid FROM jobs WHERE username='%s' AND" \ " package='%s' AND source='%s' AND target_distro='%s' AND" \ " target_target='%s' AND target_repo = '%s' AND" \ " buildreq='%s' AND starttime=%d AND status='initialize'" \ % (item['email'], item['package'], item['source'], \ item['target_distro'], item['target_target'], item['target_repo'], \ item['buildreq'], item['time'])) data = self._dbcx.fetchall(self._cursor) # If two of the same job are submitted close together, we need # to make sure we pick the last result to get the correct one row = data[len(data) - 1] target_str = Config.make_target_string(item['target_distro'], item['target_target'], item['target_repo']) repo = self._repos[target_str] job = PackageJob.PackageJob(row['uid'], item['email'], item['package'], item['source'], repo, self) print "%s (%s): Starting tag '%s' on target '%s'" % (row['uid'], \ item['package'], item['source'], target_str) item['uid'] = row['uid'] item['uid_avail'] = True self._building_jobs_lock.acquire() self._building_jobs[row['uid']] = job self._building_jobs_lock.release() self._new_queue = [] self._new_queue_lock.release() def _have_work(self): have_work = False if not self._paused: if not have_work: self._new_queue_lock.acquire() if len(self._new_queue) > 0: have_work = True self._new_queue_lock.release() if not have_work: self._restart_queue_lock.acquire() if len(self._restart_queue) > 0: have_work = True self._restart_queue_lock.release() if not have_work: self._checkout_wait_queue_lock.acquire() if (self._checkout_num < self.MAX_CHECKOUT_JOBS) and len(self._checkout_wait_queue) > 0: have_work = True self._checkout_wait_queue_lock.release() if not have_work: self._status_updates_lock.acquire() if len(self._status_updates) > 0: have_work = True self._status_updates_lock.release() if not have_work: self._archjob_status_updates_lock.acquire() if len(self._archjob_status_updates) > 0: have_work = True self._archjob_status_updates_lock.release() return have_work def get_job(self, uid): self._building_jobs_lock.acquire() try: ret_job = self._building_jobs[uid] except KeyError: ret_job = None self._building_jobs_lock.release() return ret_job def pause(self, paused): self._paused = paused def is_paused(self): return self._paused def run(self): DebugUtils.registerThreadName(self) self._dbcx = 

    def run(self):
        DebugUtils.registerThreadName(self)
        self._dbcx = self._db_manager.dbcx()
        self._cursor = self._dbcx.cursor()

        self._requeue_interrupted_jobs()

        while self.should_stop == False:
            # Write update status for jobs to the database
            self._save_job_status()

            # Clean up jobs that have finished
            self._process_finished_jobs()

            if not self._paused:
                # Let a few jobs through the checkout_wait gate if needed
                self._process_checkout_wait_jobs()

                # Start any new jobs
                self._start_new_jobs()
                self._start_requeued_jobs()

            last_time = time.time()
            while not self._have_work() and time.time() <= last_time + 5:
                time.sleep(0.25)

        self._cleanup()
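
# Illustrative lifecycle sketch, assuming already-initialized manager objects
# (the names below are hypothetical): the build server constructs a single
# BuildMaster, runs it as a thread, and tears it down on shutdown.
#
#   master = BuildMaster(builder_manager, db_manager, cfg)
#   master.start()      # executes run() in its own thread
#   ...
#   master.stop()       # also stops each target's Repo thread
#   master.join()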