From fbab6065d44072d33b2fbe61f604f24397ea2de4 Mon Sep 17 00:00:00 2001 From: Mike Frysinger Date: Tue, 16 Feb 2021 13:51:44 -0500 Subject: [PATCH] forall: rewrite parallel logic This fixes intermingling of parallel jobs and simplifies the code by switching to subprocess.run. This also provides stable output in the order of projects by returning the output as a string that the main loop outputs. This drops support for interactive commands, but it's unclear if anyone was relying on that, and the default behavior (-j2) made that unreliable. If it turns out someone still wants this, we can look at readding it. Change-Id: I7555b4e7a15aad336667292614f730fb7a90bd26 Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/297482 Reviewed-by: Chris Mcdonald Tested-by: Mike Frysinger --- subcmds/forall.py | 104 +++++++++++++++++----------------------------- 1 file changed, 39 insertions(+), 65 deletions(-) diff --git a/subcmds/forall.py b/subcmds/forall.py index b874b6d..4ea7db6 100644 --- a/subcmds/forall.py +++ b/subcmds/forall.py @@ -13,6 +13,7 @@ # limitations under the License. import errno +import io import multiprocessing import re import os @@ -22,7 +23,6 @@ import subprocess from color import Coloring from command import DEFAULT_LOCAL_JOBS, Command, MirrorSafeCommand, WORKER_BATCH_SIZE -import platform_utils _CAN_COLOR = [ 'branch', @@ -241,7 +241,18 @@ without iterating through the remaining projects. DoWorkWrapper, self.ProjectArgs(projects, mirror, opt, cmd, shell, config), chunksize=WORKER_BATCH_SIZE) - for r in results_it: + first = True + for (r, output) in results_it: + if output: + if first: + first = False + elif opt.project_header: + print() + # To simplify the DoWorkWrapper, take care of automatic newlines. + end = '\n' + if output[-1] == '\n': + end = '' + print(output, end=end) rc = rc or r if r != 0 and opt.abort_on_errors: raise Exception('Aborting due to previous error') @@ -326,73 +337,36 @@ def DoWork(project, mirror, opt, cmd, shell, cnt, config): # Allow the user to silently ignore missing checkouts so they can run on # partial checkouts (good for infra recovery tools). if opt.ignore_missing: - return 0 + return (0, '') + + output = '' if ((opt.project_header and opt.verbose) or not opt.project_header): - print('skipping %s/' % project['relpath'], file=sys.stderr) - return 1 + output = 'skipping %s/' % project['relpath'] + return (1, output) - if opt.project_header: - stdin = subprocess.PIPE - stdout = subprocess.PIPE - stderr = subprocess.PIPE + if opt.verbose: + stderr = subprocess.STDOUT else: - stdin = None - stdout = None - stderr = None + stderr = subprocess.DEVNULL - p = subprocess.Popen(cmd, - cwd=cwd, - shell=shell, - env=env, - stdin=stdin, - stdout=stdout, - stderr=stderr) + result = subprocess.run( + cmd, cwd=cwd, shell=shell, env=env, check=False, + encoding='utf-8', errors='replace', + stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=stderr) + output = result.stdout if opt.project_header: - out = ForallColoring(config) - out.redirect(sys.stdout) - empty = True - errbuf = '' - - p.stdin.close() - s_in = platform_utils.FileDescriptorStreams.create() - s_in.add(p.stdout, sys.stdout, 'stdout') - s_in.add(p.stderr, sys.stderr, 'stderr') - - while not s_in.is_done: - in_ready = s_in.select() - for s in in_ready: - buf = s.read().decode() - if not buf: - s_in.remove(s) - s.close() - continue - - if not opt.verbose: - if s.std_name == 'stderr': - errbuf += buf - continue - - if empty and out: - if not cnt == 0: - out.nl() - - if mirror: - project_header_path = project['name'] - else: - project_header_path = project['relpath'] - out.project('project %s/', project_header_path) - out.nl() - out.flush() - if errbuf: - sys.stderr.write(errbuf) - sys.stderr.flush() - errbuf = '' - empty = False - - s.dest.write(buf) - s.dest.flush() - - r = p.wait() - return r + if output: + buf = io.StringIO() + out = ForallColoring(config) + out.redirect(buf) + if mirror: + project_header_path = project['name'] + else: + project_header_path = project['relpath'] + out.project('project %s/' % project_header_path) + out.nl() + buf.write(output) + output = buf.getvalue() + return (result.returncode, output)