# This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Library General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # # Copyright 2005 Dan Williams and Red Hat, Inc. import os import threading import shutil import time import commands import popen2 import fcntl import stat import EmailUtils from plague import DebugUtils # Lockfile used by external scripts to ensure mutual exclusion # from concurrent access to the repository's directory REPO_LOCKFILE_NAME = ".repo-update.lock" class Repo(threading.Thread): """ Represents an on-disk repository of RPMs and manages updates to the repo. """ MAX_DEPSOLVE_JOBS = 4 def __init__(self, target_cfg, repodir, builder_manager): self._builder_manager = builder_manager self._target_cfg = target_cfg self._target_dict = target_cfg.target_dict() if not os.path.exists(repodir): print "Error: Repository directory '%s' does not exist." % repodir os._exit(1) # Ensure this repo's work directories are created ### Base repo dir target_str = self._target_cfg.target_string() self._repodir = os.path.join(repodir, target_str) if not os.path.exists(self._repodir): os.makedirs(self._repodir) self._lockfile_path = os.path.join(self._repodir, REPO_LOCKFILE_NAME) ### createrepo "cache" dir self._repo_cache_dir = os.path.join(repodir, "cache", target_str) if not os.path.exists(self._repo_cache_dir): os.makedirs(self._repo_cache_dir) ### SRPM HTTP upload dir parent_cfg = self._target_cfg.parent_cfg() upload_dir = os.path.join(parent_cfg.get_str("Directories", "server_work_dir"), "srpm_upload_dir", target_str) if not os.path.exists(upload_dir): os.makedirs(upload_dir) os.chmod(upload_dir, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) self._lock = threading.Lock() self._repo_additions = [] self._lock_count = 0 self._stop = False # We want to execute a job's first depsolve right away, but # if that one fails subsequent depsolves should only happen # when the repo gets updated since that's when the deps might # have changed. # # The queues are dicts mapping PackageJob->boolean, where a boolean # value of True means the job's depsolve has started, and false # means it hasn't. self._depsolve_lock = threading.Lock() self._depsolve_immediate_queue = {} self._depsolve_again_queue = {} # Repo unlock queue self._repo_unlock_lock = threading.Lock() self._repo_unlock_queue = [] # Repo script stuff self._pobj = None self._repo_script_start = 0 self._repo_script = None script = self._target_cfg.get_str("General", "repo_script") if len(script): self._repo_script = script threading.Thread.__init__(self) self.setName("Repo: %s" % target_str) def target_cfg(self): return self._target_cfg def request_copy(self, package_job): """ Registers a PackageJob object that has files to copy to the repo """ self._lock.acquire() self._repo_additions.append(package_job) # We enter lock level 1 here, preventing builders from # starting their 'prep' state and jobs from depsolving if self._lock_count == 0: self._lock_count = 1 self._lock.release() def request_depsolve(self, package_job, first_try=False): """ Registers a PackageJob be notified to start depsolving when the repo is ready """ self._depsolve_lock.acquire() if first_try: self._depsolve_immediate_queue[package_job] = False else: self._depsolve_again_queue[package_job] = False self._depsolve_lock.release() def request_unlock(self, archjob): self._repo_unlock_lock.acquire() self._repo_unlock_queue.append(archjob) self._repo_unlock_lock.release() def cancel_unlock_request(self, archjob): self._repo_unlock_lock.acquire() if archjob in self._repo_unlock_queue: self._repo_unlock_queue.remove(archjob) self._repo_unlock_lock.release() def _process_unlock_requests(self): self._repo_unlock_lock.acquire() for archjob in self._repo_unlock_queue: archjob.repo_unlocked_callback() self._repo_unlock_queue = [] self._repo_unlock_lock.release() def _start_depsolves_for_queue(self, queue, max_jobs, first_try=False): num = 0 for job in queue: if queue[job]: num = num + 1 available = max(max_jobs - num, 0) if available > 0: for job in queue.keys(): if available <= 0: break if not queue[job]: queue[job] = True job.start_depsolve(first_try) available = available - 1 def _start_waiting_depsolves(self, repo_changed=False): """ Start waiting depsolves, but only a certain number to avoid nailing the build server too hard. """ self._depsolve_lock.acquire() max_immediate_depsolves = self.MAX_DEPSOLVE_JOBS max_again_depsolves = 0 if repo_changed: max_again_depsolves = self.MAX_DEPSOLVE_JOBS / 2 max_immediate_depsolves = self.MAX_DEPSOLVE_JOBS / 2 self._start_depsolves_for_queue(self._depsolve_immediate_queue, max_immediate_depsolves, first_try=True) # Only fire off non-first-try depsolves if the repo has changed if repo_changed: self._start_depsolves_for_queue(self._depsolve_again_queue, max_again_depsolves, first_try=False) self._depsolve_lock.release() def notify_depsolve_done(self, package_job): """ Notifies the repo that a job is done depsolving """ self._depsolve_lock.acquire() if package_job in self._depsolve_immediate_queue: del self._depsolve_immediate_queue[package_job] elif package_job in self._depsolve_again_queue: del self._depsolve_again_queue[package_job] self._depsolve_lock.release() def _any_depsolving_jobs(self): """ Determines if any jobs are currently depsolving """ any_depsolving = False self._depsolve_lock.acquire() for job in self._depsolve_immediate_queue.keys(): if self._depsolve_immediate_queue[job]: any_depsolving = True break if not any_depsolving: for job in self._depsolve_again_queue.keys(): if self._depsolve_again_queue[job]: any_depsolving = True break self._depsolve_lock.release() return any_depsolving def _update_repo(self): """ Copy new RPMS to the repo, and updates the repo at the end """ target_string = self._target_cfg.target_string() lockfile = None locked = False # Try to open the lockfile, creating it if it doesn't exist try: lockfile = open(self._lockfile_path, 'w') except IOError, (errno, strerr): print "Repo Error (%s): opening lockfile %s failed. Output: (errno %d) '%s'" % (target_string, self._lockfile_path, errno, strerr) if lockfile: try: rc = fcntl.flock(lockfile, fcntl.LOCK_EX) locked = True except IOError, (errno, strerr): print "Repo Error (%s): locking repodir with %s failed. Output: (errno %d) '%s'" % (target_string, self._lockfile_path, errno, strerr) for package_job in self._repo_additions: # Ensure all the files are accessible success = True bad_file = None for src in package_job.repofiles.keys(): if not os.path.exists(src) or not os.access(src, os.R_OK): success = False bad_file = src if success: for src in package_job.repofiles.keys(): dst = package_job.repofiles[src] if not os.path.exists(os.path.dirname(dst)): os.makedirs(os.path.dirname(dst)) file_in_dst = os.path.join(os.path.dirname(dst), os.path.basename(src)) if src.endswith(".src.rpm"): # Only copy SRPMs to the repo dir if there's not already one there if not os.path.exists(file_in_dst): shutil.copy(src, file_in_dst) else: shutil.copy(src, file_in_dst) # Notify the build job that we've copied its files to the repo package_job.repo_add_callback(success, bad_file) self._repo_additions = [] (status, output) = commands.getstatusoutput('/usr/bin/createrepo -q -c %s -x "*.src.rpm" -x "*-debuginfo-*" %s' % (self._repo_cache_dir, self._repodir)) if status != 0: print "Repo Error (%s): createrepo failed with exit status %d! Output: '%s'" % (target_string, status, output) # Unlock repo lockfile if lockfile and locked: fcntl.flock(lockfile, fcntl.LOCK_UN) if lockfile: lockfile.close() del target_string def _start_repo_script(self): target_str = self._target_cfg.target_string() cmd = "%s %s" % (self._repo_script, target_str) print "Repo Error (%s): executing repository script %s" % (target_str, self._repo_script) self._pobj = popen2.Popen4(cmd=cmd) fcntl.fcntl(self._pobj.fromchild.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) self._repo_script_start = time.time() def _email_repo_script_failure(self, subject): server_cfg = self._target_cfg.parent_cfg() admins = server_cfg.get_list("Email", "admin_emails") sender = server_cfg.get_str("Email", "email_from") msg = self._get_repo_script_output() for addr in admins: EmailUtils.email_result(sender, addr, msg, subject) def _get_repo_script_output(self): output = "" while True: try: string = os.read(self._pobj.fromchild.fileno(), 1024) if not len(string): break except OSError: break output = output + string return output def _monitor_repo_script(self): target_str = self._target_cfg.target_string() unlock = False exit_status = self._pobj.poll() if exit_status == 0: print "Repo '%s': repo script %s done." % (target_str, self._repo_script) unlock = True elif exit_status > 0: subj = "Repo '%s': repo script %s exited with error: %d." % (target_str, self._repo_script, exit_status) self._email_repo_script_failure(subj) print subj unlock = True else: # If the repo script has been going for more than an hour, kill it if time.time() > self._repo_script_start + (60 * 60): try: os.kill(self._pobj.pid, 9) except OSError: pass subj = "Repo '%s': repo script %s timed out and was killed." % (target_str, self._repo_script) self._email_repo_script_failure(subj) print subj unlock = True if unlock: self._repo_script_start = 0 self._lock_count = 0 self._pobj = None def run(self): DebugUtils.registerThreadName(self) repo_changed = False while self._stop == False: # We have 3 locked operations, signified by self._lock_count: # # 0 - repo unlocked # 1 - entered when jobs request packages to be copied to the repo; # builders blocked from entering the 'prep' state # 2 - entered when no builders are currently in the 'prep' state; # packages copied to repo and createrepo is run # 3 - entered when createrepo is done; repo script run self._lock.acquire() if self._lock_count == 0: # Notify archjobs that the repo is unlocked self._process_unlock_requests() # Kick off depsolves self._start_waiting_depsolves(repo_changed) if repo_changed: repo_changed = False elif self._lock_count == 1: # Enter lock level 2 if there are no builders in the # 'prep' state, no jobs are depsolving, and we are already at lock level 1 prepping_builders = self._builder_manager.any_prepping_builders() depsolving_jobs = self._any_depsolving_jobs() if not prepping_builders and not depsolving_jobs: self._lock_count = 2 elif self._lock_count == 2: # Level 2: update the repo target_str = self._target_cfg.target_string() print "Repo '%s': updating repository metadata..." % target_str self._update_repo() repo_changed = True print "Repo '%s': Done updating." % target_str # Run the repo script, if any self._pobj = None if self._repo_script: self._start_repo_script() self._lock_count = 3 elif self._lock_count == 3: # Level 3: monitor the repo script if self._pobj: self._monitor_repo_script() else: # If for some reason self._pobj is None, unlock the repo self._lock_count = 0 self._lock.release() time.sleep(5) # done print "Repo (%s): shut down." % self._target_cfg.target_string() def stop(self): self._stop = True