diff --git a/progress.py b/progress.py
index ae79748..de46f53 100644
--- a/progress.py
+++ b/progress.py
@@ -42,12 +42,12 @@ def duration_str(total):
 
 
 class Progress(object):
-  def __init__(self, title, total=0, units='', print_newline=False):
+  def __init__(self, title, total=0, units='', print_newline=False, delay=True):
     self._title = title
     self._total = total
     self._done = 0
     self._start = time()
-    self._show = False
+    self._show = not delay
     self._units = units
     self._print_newline = print_newline
     # Only show the active jobs section if we run more than one in parallel.
diff --git a/subcmds/sync.py b/subcmds/sync.py
index b14ad24..bf1369c 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -45,11 +45,6 @@ except ImportError:
   def _rlimit_nofile():
     return (256, 256)
 
-try:
-  import multiprocessing
-except ImportError:
-  multiprocessing = None
-
 import event_log
 from git_command import GIT, git_require
 from git_config import GetUrlCookieFile
@@ -69,10 +64,6 @@ from manifest_xml import GitcManifest
 
 _ONE_DAY_S = 24 * 60 * 60
 
-class _FetchError(Exception):
-  """Internal error thrown in _FetchHelper() when we don't want stack trace."""
-
-
 class Sync(Command, MirrorSafeCommand):
   jobs = 1
   common = True
@@ -315,148 +306,119 @@ later is required to fix a server side protocol bug.
     self._ReloadManifest(manifest_path)
     return manifest_path
 
-  def _FetchProjectList(self, opt, projects, sem, *args, **kwargs):
-    """Main function of the fetch threads.
+  def _FetchProjectList(self, opt, projects):
+    """Main function of the fetch worker.
+
+    The projects we're given share the same underlying git object store, so we
+    have to fetch them in serial.  Delegates most of the work to _FetchOne.
 
     Args:
       opt: Program options returned from optparse.  See _Options().
       projects: Projects to fetch.
-      sem: We'll release() this semaphore when we exit so that another thread
-          can be started up.
-      *args, **kwargs: Remaining arguments to pass to _FetchHelper.  See the
-          _FetchHelper docstring for details.
     """
-    try:
-      for project in projects:
-        success = self._FetchHelper(opt, project, *args, **kwargs)
-        if not success and opt.fail_fast:
-          break
-    finally:
-      sem.release()
+    return [self._FetchOne(opt, x) for x in projects]
 
-  def _FetchHelper(self, opt, project, lock, fetched, pm, err_event,
-                   clone_filter):
+  def _FetchOne(self, opt, project):
     """Fetch git objects for a single project.
 
     Args:
       opt: Program options returned from optparse.  See _Options().
       project: Project object for the project to fetch.
-      lock: Lock for accessing objects that are shared amongst multiple
-          _FetchHelper() threads.
-      fetched: set object that we will add project.gitdir to when we're done
-          (with our lock held).
-      pm: Instance of a Project object.  We will call pm.update() (with our
-          lock held).
-      err_event: We'll set this event in the case of an error (after printing
-          out info about the error).
-      clone_filter: Filter for use in a partial clone.
 
     Returns:
       Whether the fetch was successful.
     """
-    # We'll set to true once we've locked the lock.
-    did_lock = False
-
-    # Encapsulate everything in a try/except/finally so that:
-    # - We always set err_event in the case of an exception.
-    # - We always make sure we unlock the lock if we locked it.
     start = time.time()
     success = False
     buf = io.StringIO()
-    with lock:
-      pm.start(project.name)
     try:
-      try:
-        success = project.Sync_NetworkHalf(
-            quiet=opt.quiet,
-            verbose=opt.verbose,
-            output_redir=buf,
-            current_branch_only=self._GetCurrentBranchOnly(opt),
-            force_sync=opt.force_sync,
-            clone_bundle=opt.clone_bundle,
-            tags=opt.tags, archive=self.manifest.IsArchive,
-            optimized_fetch=opt.optimized_fetch,
-            retry_fetches=opt.retry_fetches,
-            prune=opt.prune,
-            clone_filter=clone_filter)
-        self._fetch_times.Set(project, time.time() - start)
+      success = project.Sync_NetworkHalf(
+          quiet=opt.quiet,
+          verbose=opt.verbose,
+          output_redir=buf,
+          current_branch_only=self._GetCurrentBranchOnly(opt),
+          force_sync=opt.force_sync,
+          clone_bundle=opt.clone_bundle,
+          tags=opt.tags, archive=self.manifest.IsArchive,
+          optimized_fetch=opt.optimized_fetch,
+          retry_fetches=opt.retry_fetches,
+          prune=opt.prune,
+          clone_filter=self.manifest.CloneFilter)
 
-        # Lock around all the rest of the code, since printing, updating a set
-        # and Progress.update() are not thread safe.
-        lock.acquire()
-        did_lock = True
+      output = buf.getvalue()
+      if opt.verbose and output:
+        print('\n' + output.rstrip())
 
-        output = buf.getvalue()
-        if opt.verbose and output:
-          pm.update(inc=0, msg=output.rstrip())
+      if not success:
+        print('error: Cannot fetch %s from %s'
+              % (project.name, project.remote.url),
+              file=sys.stderr)
+    except Exception as e:
+      print('error: Cannot fetch %s (%s: %s)'
+            % (project.name, type(e).__name__, str(e)), file=sys.stderr)
+      raise
 
-        if not success:
-          err_event.set()
-          print('error: Cannot fetch %s from %s'
-                % (project.name, project.remote.url),
-                file=sys.stderr)
-          if opt.fail_fast:
-            raise _FetchError()
-
-        fetched.add(project.gitdir)
-      except _FetchError:
-        pass
-      except Exception as e:
-        print('error: Cannot fetch %s (%s: %s)'
-              % (project.name, type(e).__name__, str(e)), file=sys.stderr)
-        err_event.set()
-        raise
-    finally:
-      if not did_lock:
-        lock.acquire()
-      pm.finish(project.name)
-      lock.release()
-      finish = time.time()
-      self.event_log.AddSync(project, event_log.TASK_SYNC_NETWORK,
-                             start, finish, success)
-
-    return success
+    finish = time.time()
+    return (success, project, start, finish)
 
   def _Fetch(self, projects, opt, err_event):
+    ret = True
+
     fetched = set()
-    lock = _threading.Lock()
-    pm = Progress('Fetching', len(projects))
+    pm = Progress('Fetching', len(projects), delay=False)
 
     objdir_project_map = dict()
     for project in projects:
       objdir_project_map.setdefault(project.objdir, []).append(project)
+    projects_list = list(objdir_project_map.values())
 
-    threads = set()
-    sem = _threading.Semaphore(self.jobs)
-    for project_list in objdir_project_map.values():
-      # Check for any errors before running any more tasks.
-      # ...we'll let existing threads finish, though.
-      if err_event.is_set() and opt.fail_fast:
-        break
+    def _ProcessResults(results_sets):
+      ret = True
+      for results in results_sets:
+        for (success, project, start, finish) in results:
+          self._fetch_times.Set(project, finish - start)
+          self.event_log.AddSync(project, event_log.TASK_SYNC_NETWORK,
+                                 start, finish, success)
+          # Check for any errors before running any more tasks.
+          # ...we'll let existing jobs finish, though.
+          if not success:
+            ret = False
+          else:
+            fetched.add(project.gitdir)
+          pm.update(msg=project.name)
+        if not ret and opt.fail_fast:
+          break
+      return ret
 
-      sem.acquire()
-      kwargs = dict(opt=opt,
-                    projects=project_list,
-                    sem=sem,
-                    lock=lock,
-                    fetched=fetched,
-                    pm=pm,
-                    err_event=err_event,
-                    clone_filter=self.manifest.CloneFilter)
-      if self.jobs > 1:
-        t = _threading.Thread(target=self._FetchProjectList,
-                              kwargs=kwargs)
-        # Ensure that Ctrl-C will not freeze the repo process.
-        t.daemon = True
-        threads.add(t)
-        t.start()
+    # NB: Multiprocessing is heavy, so don't spin it up for one job.
+    if len(projects_list) == 1 or opt.jobs == 1:
+      if not _ProcessResults(self._FetchProjectList(opt, x) for x in projects_list):
+        ret = False
+    else:
+      # Favor throughput over responsiveness when quiet.  It seems that imap()
+      # will yield results in batches relative to chunksize, so even as the
+      # children finish a sync, we won't see the result until one child finishes
+      # ~chunksize jobs.  When using a large --jobs with large chunksize, this
+      # can be jarring as there will be a large initial delay where repo looks
+      # like it isn't doing anything and sits at 0%, but then suddenly completes
+      # a lot of jobs all at once.  Since this code is more network bound, we
+      # can accept a bit more CPU overhead with a smaller chunksize so that the
+      # user sees more immediate & continuous feedback.
+      if opt.quiet:
+        chunksize = WORKER_BATCH_SIZE
       else:
-        self._FetchProjectList(**kwargs)
-
-    for t in threads:
-      t.join()
+        pm.update(inc=0, msg='warming up')
+        chunksize = 4
+      with multiprocessing.Pool(opt.jobs) as pool:
+        results = pool.imap_unordered(
+            functools.partial(self._FetchProjectList, opt),
+            projects_list,
+            chunksize=chunksize)
+        if not _ProcessResults(results):
+          ret = False
+          pool.close()
 
     pm.end()
     self._fetch_times.Save()
@@ -464,7 +426,7 @@ later is required to fix a server side protocol bug.
     if not self.manifest.IsArchive:
       self._GCProjects(projects, opt, err_event)
 
-    return fetched
+    return (ret, fetched)
 
   def _CheckoutOne(self, opt, project):
     """Checkout work tree for one project
@@ -514,7 +476,7 @@ later is required to fix a server side protocol bug.
     self.event_log.AddSync(project, event_log.TASK_SYNC_LOCAL,
                            start, finish, success)
     # Check for any errors before running any more tasks.
-    # ...we'll let existing threads finish, though.
+    # ...we'll let existing jobs finish, though.
     if not success:
       err_results.append(project.relpath)
     if opt.fail_fast:
@@ -894,7 +856,9 @@ later is required to fix a server side protocol bug.
       to_fetch.extend(all_projects)
       to_fetch.sort(key=self._fetch_times.Get, reverse=True)
 
-      fetched = self._Fetch(to_fetch, opt, err_event)
+      success, fetched = self._Fetch(to_fetch, opt, err_event)
+      if not success:
+        err_event.set()
 
       _PostRepoFetch(rp, opt.repo_verify)
       if opt.network_only:
@@ -923,7 +887,10 @@ later is required to fix a server side protocol bug.
         if previously_missing_set == missing_set:
           break
         previously_missing_set = missing_set
-        fetched.update(self._Fetch(missing, opt, err_event))
+        success, new_fetched = self._Fetch(missing, opt, err_event)
+        if not success:
+          err_event.set()
+        fetched.update(new_fetched)
 
     # If we saw an error, exit with code 1 so that other scripts can check.
     if err_event.is_set():
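
Reviewer note (not part of the patch): the new _Fetch() flow boils down to
"group projects by shared object store, fan the groups out to worker
processes, consume results as they stream back".  Below is a minimal,
self-contained sketch of that pattern, using multiprocessing.Pool's
imap_unordered() with functools.partial and an explicit chunksize to mirror
the responsiveness-vs-throughput tradeoff described in the comment above.
All names here (fetch_one, fetch_group, groups) are hypothetical stand-ins,
not repo APIs:

  import functools
  import multiprocessing
  import time

  def fetch_one(opt, name):
    # Stand-in for one project fetch.  Like _FetchOne(), it returns
    # (success, item, start, finish) so the parent process can do the
    # bookkeeping (progress bar, event log, timing db) itself.
    start = time.time()
    time.sleep(0.01)  # pretend to do network I/O
    return (True, name, start, time.time())

  def fetch_group(opt, names):
    # Items sharing one object store must be fetched serially in a single
    # worker, which is why whole groups (not single projects) are submitted.
    return [fetch_one(opt, n) for n in names]

  def main():
    opt = None  # stand-in for parsed options
    groups = [['a', 'b'], ['c'], ['d', 'e', 'f']]
    with multiprocessing.Pool(4) as pool:
      # A small chunksize favors responsiveness: results stream back roughly
      # per group rather than in ~chunksize batches.
      for results in pool.imap_unordered(
          functools.partial(fetch_group, opt), groups, chunksize=1):
        for success, name, start, finish in results:
          print('%s: ok=%s in %.3fs' % (name, success, finish - start))

  if __name__ == '__main__':
    main()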
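Reviewer note (not part of the patch): the progress.py change defaults to
delay=True, which preserves the old behavior (bar starts hidden) for every
existing caller, while the new delay=False that _Fetch() passes makes the
'Fetching' bar visible immediately.  A small usage sketch, assuming repo's
progress module is importable:

  from progress import Progress

  pm = Progress('Fetching', total=3, delay=False)  # shown right away
  for name in ('a', 'b', 'c'):
    pm.update(msg=name)  # update() increments the count and sets the label
  pm.end()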