maintainers/scripts/update.nix: avoid deadlock

Use `Process.communicate()` instead of `Process.wait()` so that the
`stdout` and `stderr` pipes are drained while the child process runs.
Otherwise an OS pipe buffer can fill up, the child blocks writing to it,
and `wait()` deadlocks waiting for a process that can never exit.
This commit is contained in:
Michael Hoang 2022-10-08 01:42:18 +11:00 committed by David McFarland
parent 343895ba37
commit e81b7d4f57

View File

@ -12,6 +12,7 @@ import tempfile
class CalledProcessError(Exception): class CalledProcessError(Exception):
process: asyncio.subprocess.Process process: asyncio.subprocess.Process
stderr: Optional[bytes]
class UpdateFailedException(Exception): class UpdateFailedException(Exception):
pass pass
@ -19,20 +20,23 @@ class UpdateFailedException(Exception):
def eprint(*args, **kwargs): def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs) print(*args, file=sys.stderr, **kwargs)
async def check_subprocess(*args, **kwargs): async def check_subprocess_output(*args, **kwargs):
""" """
Emulate check argument of subprocess.run function. Emulate check and capture_output arguments of subprocess.run function.
""" """
process = await asyncio.create_subprocess_exec(*args, **kwargs) process = await asyncio.create_subprocess_exec(*args, **kwargs)
returncode = await process.wait() # We need to use communicate() instead of wait(), as the OS pipe buffers
# can fill up and cause a deadlock.
stdout, stderr = await process.communicate()
if returncode != 0: if process.returncode != 0:
error = CalledProcessError() error = CalledProcessError()
error.process = process error.process = process
error.stderr = stderr
raise error raise error
return process return stdout
async def run_update_script(nixpkgs_root: str, merge_lock: asyncio.Lock, temp_dir: Optional[Tuple[str, str]], package: Dict, keep_going: bool): async def run_update_script(nixpkgs_root: str, merge_lock: asyncio.Lock, temp_dir: Optional[Tuple[str, str]], package: Dict, keep_going: bool):
worktree: Optional[str] = None worktree: Optional[str] = None
@ -43,7 +47,7 @@ async def run_update_script(nixpkgs_root: str, merge_lock: asyncio.Lock, temp_di
worktree, _branch = temp_dir worktree, _branch = temp_dir
# Ensure the worktree is clean before update. # Ensure the worktree is clean before update.
await check_subprocess('git', 'reset', '--hard', '--quiet', 'HEAD', cwd=worktree) await check_subprocess_output('git', 'reset', '--hard', '--quiet', 'HEAD', cwd=worktree)
# Update scripts can use $(dirname $0) to get their location but we want to run # Update scripts can use $(dirname $0) to get their location but we want to run
# their clones in the git worktree, not in the main nixpkgs repo. # their clones in the git worktree, not in the main nixpkgs repo.
@ -52,7 +56,7 @@ async def run_update_script(nixpkgs_root: str, merge_lock: asyncio.Lock, temp_di
eprint(f" - {package['name']}: UPDATING ...") eprint(f" - {package['name']}: UPDATING ...")
try: try:
update_process = await check_subprocess( update_info = await check_subprocess_output(
'env', 'env',
f"UPDATE_NIX_NAME={package['name']}", f"UPDATE_NIX_NAME={package['name']}",
f"UPDATE_NIX_PNAME={package['pname']}", f"UPDATE_NIX_PNAME={package['pname']}",
@ -63,8 +67,6 @@ async def run_update_script(nixpkgs_root: str, merge_lock: asyncio.Lock, temp_di
stderr=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
cwd=worktree, cwd=worktree,
) )
update_info = await update_process.stdout.read()
await merge_changes(merge_lock, package, update_info, temp_dir) await merge_changes(merge_lock, package, update_info, temp_dir)
except KeyboardInterrupt as e: except KeyboardInterrupt as e:
eprint('Cancelling…') eprint('Cancelling…')
@ -74,10 +76,9 @@ async def run_update_script(nixpkgs_root: str, merge_lock: asyncio.Lock, temp_di
eprint() eprint()
eprint(f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------") eprint(f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------")
eprint() eprint()
stderr = await e.process.stderr.read() eprint(e.stderr.decode('utf-8'))
eprint(stderr.decode('utf-8'))
with open(f"{package['pname']}.log", 'wb') as logfile: with open(f"{package['pname']}.log", 'wb') as logfile:
logfile.write(stderr) logfile.write(e.stderr)
eprint() eprint()
eprint(f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------") eprint(f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------")
@ -101,14 +102,14 @@ async def commit_changes(name: str, merge_lock: asyncio.Lock, worktree: str, bra
for change in changes: for change in changes:
# Git can only handle a single index operation at a time # Git can only handle a single index operation at a time
async with merge_lock: async with merge_lock:
await check_subprocess('git', 'add', *change['files'], cwd=worktree) await check_subprocess_output('git', 'add', *change['files'], cwd=worktree)
commit_message = '{attrPath}: {oldVersion} -> {newVersion}'.format(**change) commit_message = '{attrPath}: {oldVersion} -> {newVersion}'.format(**change)
if 'commitMessage' in change: if 'commitMessage' in change:
commit_message = change['commitMessage'] commit_message = change['commitMessage']
elif 'commitBody' in change: elif 'commitBody' in change:
commit_message = commit_message + '\n\n' + change['commitBody'] commit_message = commit_message + '\n\n' + change['commitBody']
await check_subprocess('git', 'commit', '--quiet', '-m', commit_message, cwd=worktree) await check_subprocess_output('git', 'commit', '--quiet', '-m', commit_message, cwd=worktree)
await check_subprocess('git', 'cherry-pick', branch) await check_subprocess_output('git', 'cherry-pick', branch)
async def check_changes(package: Dict, worktree: str, update_info: str): async def check_changes(package: Dict, worktree: str, update_info: str):
if 'commit' in package['supportedFeatures']: if 'commit' in package['supportedFeatures']:
@ -129,12 +130,12 @@ async def check_changes(package: Dict, worktree: str, update_info: str):
if 'newVersion' not in changes[0]: if 'newVersion' not in changes[0]:
attr_path = changes[0]['attrPath'] attr_path = changes[0]['attrPath']
obtain_new_version_process = await check_subprocess('nix-instantiate', '--expr', f'with import ./. {{}}; lib.getVersion {attr_path}', '--eval', '--strict', '--json', stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=worktree) obtain_new_version_output = await check_subprocess_output('nix-instantiate', '--expr', f'with import ./. {{}}; lib.getVersion {attr_path}', '--eval', '--strict', '--json', stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=worktree)
changes[0]['newVersion'] = json.loads((await obtain_new_version_process.stdout.read()).decode('utf-8')) changes[0]['newVersion'] = json.loads(obtain_new_version_output.decode('utf-8'))
if 'files' not in changes[0]: if 'files' not in changes[0]:
changed_files_process = await check_subprocess('git', 'diff', '--name-only', 'HEAD', stdout=asyncio.subprocess.PIPE, cwd=worktree) changed_files_output = await check_subprocess_output('git', 'diff', '--name-only', 'HEAD', stdout=asyncio.subprocess.PIPE, cwd=worktree)
changed_files = (await changed_files_process.stdout.read()).splitlines() changed_files = changed_files_output.splitlines()
changes[0]['files'] = changed_files changes[0]['files'] = changed_files
if len(changed_files) == 0: if len(changed_files) == 0:
@ -176,8 +177,8 @@ async def start_updates(max_workers: int, keep_going: bool, commit: bool, packag
# Do not create more workers than there are packages. # Do not create more workers than there are packages.
num_workers = min(max_workers, len(packages)) num_workers = min(max_workers, len(packages))
nixpkgs_root_process = await check_subprocess('git', 'rev-parse', '--show-toplevel', stdout=asyncio.subprocess.PIPE) nixpkgs_root_output = await check_subprocess_output('git', 'rev-parse', '--show-toplevel', stdout=asyncio.subprocess.PIPE)
nixpkgs_root = (await nixpkgs_root_process.stdout.read()).decode('utf-8').strip() nixpkgs_root = nixpkgs_root_output.decode('utf-8').strip()
# Set up temporary directories when using auto-commit. # Set up temporary directories when using auto-commit.
for i in range(num_workers): for i in range(num_workers):