Update zsh-async to 1.5.0 (#271)

This commit is contained in:
Mathias Fredriksson
2017-01-17 12:46:32 +02:00
committed by Sindre Sorhus
parent ee0c6a445a
commit 6ec0029ac5

323
async.zsh
View File

@@ -3,76 +3,91 @@
# #
# zsh-async # zsh-async
# #
# version: 1.3.1 # version: 1.5.0
# author: Mathias Fredriksson # author: Mathias Fredriksson
# url: https://github.com/mafredri/zsh-async # url: https://github.com/mafredri/zsh-async
# #
# Produce debug output from zsh-async when set to 1.
ASYNC_DEBUG=${ASYNC_DEBUG:-0}
# Wrapper for jobs executed by the async worker, gives output in parseable format with execution time # Wrapper for jobs executed by the async worker, gives output in parseable format with execution time
_async_job() { _async_job() {
# Disable xtrace as it would mangle the output.
setopt localoptions noxtrace
# Store start time as double precision (+E disables scientific notation) # Store start time as double precision (+E disables scientific notation)
float -F duration=$EPOCHREALTIME float -F duration=$EPOCHREALTIME
# Run the command # Run the command and capture both stdout (`eval`) and stderr (`cat`) in
# # separate subshells. When the command is complete, we grab write lock
# What is happening here is that we are assigning stdout, stderr and ret to # (mutex token) and output everything except stderr inside the command
# variables, and then we are printing out the variable assignment through # block, after the command block has completed, the stdin for `cat` is
# typeset -p. This way when we run eval we get something along the lines of: # closed, causing stderr to be appended with a $'\0' at the end to mark the
# eval " # end of output from this job.
# typeset stdout=' M async.test.sh\n M async.zsh' local stdout stderr ret tok
# typeset ret=0 {
# typeset stderr='' stdout=$(eval "$@")
# " ret=$?
unset stdout stderr ret duration=$(( EPOCHREALTIME - duration )) # Calculate duration.
eval "$(
{
stdout=$(eval '$@')
ret=$?
typeset -p stdout ret
} 2> >(stderr=$(cat); typeset -p stderr)
)"
# Calculate duration # Grab mutex lock, stalls until token is available.
duration=$(( EPOCHREALTIME - duration )) read -r -k 1 -p tok || exit 1
# stip all null-characters from stdout and stderr # Return output (<job_name> <return_code> <stdout> <duration> <stderr>).
stdout=${stdout//$'\0'/} print -r -n - ${(q)1} $ret ${(q)stdout} $duration
stderr=${stderr//$'\0'/} } 2> >(stderr=$(cat) && print -r -n - " "${(q)stderr}$'\0')
# if ret is missing for some unknown reason, set it to -1 to indicate we # Unlock mutex by inserting a token.
# have run into a bug print -n -p $tok
ret=${ret:--1}
# Grab mutex lock, stalls until token is available
read -ep >/dev/null
# return output (<job_name> <return_code> <stdout> <duration> <stderr>)
print -r -N -n -- "$1" "$ret" "$stdout" "$duration" "$stderr"$'\0'
# Unlock mutex by inserting a token
print -p "t"
} }
# The background worker manages all tasks and runs them without interfering with other processes # The background worker manages all tasks and runs them without interfering with other processes
_async_worker() { _async_worker() {
# Reset all options to defaults inside async worker.
emulate -R zsh
# Make sure monitor is unset to avoid printing the
# pids of child processes.
unsetopt monitor
# Redirect stderr to `/dev/null` in case unforseen errors produced by the
# worker. For example: `fork failed: resource temporarily unavailable`.
# Some older versions of zsh might also print malloc errors (know to happen
# on at least zsh 5.0.2 and 5.0.8) likely due to kill signals.
exec 2>/dev/null
# When a zpty is deleted (using -d) all the zpty instances created before
# the one being deleted receive a SIGHUP, unless we catch it, the async
# worker would simply exit (stop working) even though visible in the list
# of zpty's (zpty -L).
TRAPHUP() {
return 0 # Return 0, indicating signal was handled.
}
local -A storage local -A storage
local unique=0 local unique=0
local notify_parent=0 local notify_parent=0
local parent_pid=0 local parent_pid=0
local coproc_pid=0 local coproc_pid=0
local processing=0
# Deactivate all zsh hooks inside the worker. local -a zsh_hooks zsh_hook_functions
zsh_hooks=(chpwd periodic precmd preexec zshexit zshaddhistory) zsh_hooks=(chpwd periodic precmd preexec zshexit zshaddhistory)
unfunction $zsh_hooks &>/dev/null zsh_hook_functions=(${^zsh_hooks}_functions)
# And hooks with registered functions. unfunction $zsh_hooks &>/dev/null # Deactivate all zsh hooks inside the worker.
zsh_hook_functions=( ${^zsh_hooks}_functions ) unset $zsh_hook_functions # And hooks with registered functions.
unset $zsh_hook_functions unset zsh_hooks zsh_hook_functions # Cleanup.
child_exit() { child_exit() {
local -a pids
pids=(${${(v)jobstates##*:*:}%\=*})
# If coproc (cat) is the only child running, we close it to avoid # If coproc (cat) is the only child running, we close it to avoid
# leaving it running indefinitely and cluttering the process tree. # leaving it running indefinitely and cluttering the process tree.
if [[ ${#jobstates} = 1 ]] && [[ $coproc_pid = ${${(v)jobstates##*:*:}%\=*} ]]; then if (( ! processing )) && [[ $#pids = 1 ]] && [[ $coproc_pid = $pids[1] ]]; then
coproc : coproc :
coproc_pid=0
fi fi
# On older version of zsh (pre 5.2) we notify the parent through a # On older version of zsh (pre 5.2) we notify the parent through a
@@ -98,38 +113,50 @@ _async_worker() {
esac esac
done done
local -a buffer killjobs() {
# Command arguments are separated with a null character. local tok
while read -r -d $'\0' line; do local -a pids
if [[ $line != ___ZSH_ASNYC_EOC___ ]]; then pids=(${${(v)jobstates##*:*:}%\=*})
# Read command arguments until we receive magic end-of-command string.
buffer+=($line)
continue
fi
# Copy command buffer # No need to send SIGHUP if no jobs are running.
cmd=("${(@)=buffer}") (( $#pids == 0 )) && continue
(( $#pids == 1 )) && [[ $coproc_pid = $pids[1] ]] && continue
# Reset command buffer # Grab lock to prevent half-written output in case a child
buffer=() # process is in the middle of writing to stdin during kill.
(( coproc_pid )) && read -r -k 1 -p tok
local job=$cmd[1] kill -HUP -$$ # Send to entire process group.
coproc : # Quit coproc.
coproc_pid=0 # Reset pid.
}
local request
local -a cmd
while :; do
# Wait for jobs sent by async_job.
read -r -d $'\0' request || {
# Since we handle SIGHUP above (and thus do not know when `zpty -d`)
# occurs, a failure to read probably indicates that stdin has
# closed. This is why we propagate the signal to all children and
# exit manually.
kill -HUP -$$ # Send SIGHUP to all jobs.
exit 0
}
# Check for non-job commands sent to worker # Check for non-job commands sent to worker
case $job in case $request in
_unset_trap) _unset_trap) notify_parent=0; continue;;
notify_parent=0; continue;; _killjobs) killjobs; continue;;
_killjobs)
# Do nothing in the worker when receiving the TERM signal
trap '' TERM
# Send TERM to the entire process group (PID and all children)
kill -TERM -$$ &>/dev/null
# Reset trap
trap - TERM
continue
;;
esac esac
# Parse the request using shell parsing (z) to allow commands
# to be parsed from single strings and multi-args alike.
cmd=("${(z)request}")
# Name of the job (first argument).
local job=$cmd[1]
# If worker should perform unique jobs # If worker should perform unique jobs
if (( unique )); then if (( unique )); then
# Check if a previous job is still running, if yes, let it finnish # Check if a previous job is still running, if yes, let it finnish
@@ -140,20 +167,25 @@ _async_worker() {
done done
fi fi
# Guard against closing coproc from trap before command has started.
processing=1
# Because we close the coproc after the last job has completed, we must # Because we close the coproc after the last job has completed, we must
# recreate it when there are no other jobs running. # recreate it when there are no other jobs running.
if (( !${#jobstates} )); then if (( ! coproc_pid )); then
# Use coproc as a mutex for synchronized output between children. # Use coproc as a mutex for synchronized output between children.
coproc cat coproc cat
coproc_pid=$! coproc_pid="$!"
# Insert token into coproc # Insert token into coproc
print -p "t" print -n -p "t"
fi fi
# Run task in background # Run job in background, completed jobs are printed to stdout.
_async_job $cmd & _async_job $cmd &
# Store pid because zsh job manager is extremely unflexible (show jobname as non-unique '$job')... # Store pid because zsh job manager is extremely unflexible (show jobname as non-unique '$job')...
storage[$job]=$! storage[$job]="$!"
processing=0 # Disable guard.
done done
} }
@@ -174,46 +206,54 @@ _async_worker() {
async_process_results() { async_process_results() {
setopt localoptions noshwordsplit setopt localoptions noshwordsplit
integer count=0
local worker=$1 local worker=$1
local callback=$2 local callback=$2
local caller=$3 local caller=$3
local -a items local -a items
local line local null=$'\0' data
integer -l len pos num_processed
typeset -gA ASYNC_PROCESS_BUFFER typeset -gA ASYNC_PROCESS_BUFFER
# Read output from zpty and parse it if available
while zpty -rt $worker line 2>/dev/null; do
# Remove unwanted \r from output
ASYNC_PROCESS_BUFFER[$worker]+=${line//$'\r'$'\n'/$'\n'}
# Split buffer on null characters, preserve empty elements
# (an anonymous function is used to avoid leaking modified IFS into the callback)
() {
local IFS=$'\0'
items=("${(@)=ASYNC_PROCESS_BUFFER[$worker]}")
}
# Remove last element since it's an artifact
# of the return string separator structure
items=("${(@)items[1,${#items}-1]}")
# Continue until we receive all information # Read output from zpty and parse it if available.
(( ${#items} % 5 )) && continue while zpty -r -t $worker data 2>/dev/null; do
ASYNC_PROCESS_BUFFER[$worker]+=$data
len=${#ASYNC_PROCESS_BUFFER[$worker]}
pos=${ASYNC_PROCESS_BUFFER[$worker][(i)$null]} # Get index of NULL-character (delimiter).
# Work through all results # Keep going until we find a NULL-character.
while (( ${#items} > 0 )); do if (( ! len )) || (( pos > len )); then
$callback "${(@)items[1,5]}" continue
shift 5 items fi
count+=1
while (( pos <= len )); do
# Take the content from the beginning, until the NULL-character and
# perform shell parsing (z) and unquoting (Q) as an array (@).
items=("${(@Q)${(z)ASYNC_PROCESS_BUFFER[$worker][1,$pos-1]}}")
# Remove the extracted items from the buffer.
ASYNC_PROCESS_BUFFER[$worker]=${ASYNC_PROCESS_BUFFER[$worker][$pos+1,$len]}
if (( $#items == 5 )); then
$callback "${(@)items}" # Send all parsed items to the callback.
else
# In case of corrupt data, invoke callback with *async* as job
# name, non-zero exit status and an error message on stderr.
$callback "async" 1 "" 0 "$0:$LINENO: error: bad format, got ${#items} items (${(@q)items})"
fi
(( num_processed++ ))
len=${#ASYNC_PROCESS_BUFFER[$worker]}
if (( len > 1 )); then
pos=${ASYNC_PROCESS_BUFFER[$worker][(i)$null]} # Get index of NULL-character (delimiter).
fi
done done
# Empty the buffer
unset "ASYNC_PROCESS_BUFFER[$worker]"
done done
# If we processed any results, return success (( num_processed )) && return 0
(( count )) && return 0
# Avoid printing exit value from setopt printexitvalue # Avoid printing exit value when `setopt printexitvalue` is active.`
[[ $caller = trap || $caller = watcher ]] && return 0 [[ $caller = trap || $caller = watcher ]] && return 0
# No results were processed # No results were processed
@@ -243,13 +283,13 @@ async_job() {
local worker=$1; shift local worker=$1; shift
local cmd p local -a cmd
for p in "$@"; do cmd=("$@")
cmd+="$p"$'\0' if (( $#cmd > 1 )); then
done cmd=(${(q)cmd}) # Quote special characters in multi argument commands.
cmd+=___ZSH_ASNYC_EOC___$'\0' fi
zpty -w $worker $cmd zpty -w $worker $cmd$'\0'
} }
# This function traps notification signals and calls all registered callbacks # This function traps notification signals and calls all registered callbacks
@@ -276,7 +316,9 @@ async_register_callback() {
ASYNC_CALLBACKS[$worker]="$*" ASYNC_CALLBACKS[$worker]="$*"
if (( ! ASYNC_USE_ZLE_HANDLER )); then # Enable trap when the ZLE watcher is unavailable, allows
# workers to notify (via -n) when a job is done.
if [[ ! -o interactive ]] || [[ ! -o zle ]]; then
trap '_async_notify_trap' WINCH trap '_async_notify_trap' WINCH
fi fi
} }
@@ -311,10 +353,17 @@ async_flush_jobs() {
# Send kill command to worker # Send kill command to worker
async_job $worker "_killjobs" async_job $worker "_killjobs"
# Clear all output buffers # Clear the zpty buffer.
while zpty -r $worker line; do true; done local junk
if zpty -r -t $worker junk '*'; then
(( ASYNC_DEBUG )) && print -n "async_flush_jobs $worker: ${(V)junk}"
while zpty -r -t $worker junk '*'; do
(( ASYNC_DEBUG )) && print -n "${(V)junk}"
done
(( ASYNC_DEBUG )) && print
fi
# Clear any partial buffers # Finally, clear the process buffer in case of partially parsed responses.
typeset -gA ASYNC_PROCESS_BUFFER typeset -gA ASYNC_PROCESS_BUFFER
unset "ASYNC_PROCESS_BUFFER[$worker]" unset "ASYNC_PROCESS_BUFFER[$worker]"
} }
@@ -339,16 +388,47 @@ async_start_worker() {
typeset -gA ASYNC_PTYS typeset -gA ASYNC_PTYS
typeset -h REPLY typeset -h REPLY
typeset has_xtrace=0
# Make sure async worker is started without xtrace
# (the trace output interferes with the worker).
[[ -o xtrace ]] && {
has_xtrace=1
unsetopt xtrace
}
if (( ! ASYNC_ZPTY_RETURNS_FD )) && [[ -o interactive ]] && [[ -o zle ]]; then
# When zpty doesn't return a file descriptor (on older versions of zsh)
# we try to guess it anyway.
integer -l zptyfd
exec {zptyfd}>&1 # Open a new file descriptor (above 10).
exec {zptyfd}>&- # Close it so it's free to be used by zpty.
fi
zpty -b $worker _async_worker -p $$ $@ || { zpty -b $worker _async_worker -p $$ $@ || {
async_stop_worker $worker async_stop_worker $worker
return 1 return 1
} }
if (( ASYNC_USE_ZLE_HANDLER )); then # Re-enable it if it was enabled, for debugging.
ASYNC_PTYS[$REPLY]=$worker (( has_xtrace )) && setopt xtrace
zle -F $REPLY _async_zle_watcher
# If worker was called with -n, disable trap in favor of zle handler if [[ $ZSH_VERSION < 5.0.8 ]]; then
# For ZSH versions older than 5.0.8 we delay a bit to give
# time for the worker to start before issuing commands,
# otherwise it will not be ready to receive them.
sleep 0.001
fi
if [[ -o interactive ]] && [[ -o zle ]]; then
if (( ! ASYNC_ZPTY_RETURNS_FD )); then
REPLY=$zptyfd # Use the guessed value for the file desciptor.
fi
ASYNC_PTYS[$REPLY]=$worker # Map the file desciptor to the worker.
zle -F $REPLY _async_zle_watcher # Register the ZLE handler.
# Disable trap in favor of ZLE handler when notify is enabled (-n).
async_job $worker _unset_trap async_job $worker _unset_trap
fi fi
} }
@@ -373,6 +453,10 @@ async_stop_worker() {
done done
async_unregister_callback $worker async_unregister_callback $worker
zpty -d $worker 2>/dev/null || ret=$? zpty -d $worker 2>/dev/null || ret=$?
# Clear any partial buffers.
typeset -gA ASYNC_PROCESS_BUFFER
unset "ASYNC_PROCESS_BUFFER[$worker]"
done done
return $ret return $ret
@@ -391,12 +475,13 @@ async_init() {
zmodload zsh/zpty zmodload zsh/zpty
zmodload zsh/datetime zmodload zsh/datetime
# Check if zsh/zpty returns a file descriptor or not, shell must also be interactive # Check if zsh/zpty returns a file descriptor or not,
ASYNC_USE_ZLE_HANDLER=0 # shell must also be interactive with zle enabled.
[[ -o interactive ]] && { ASYNC_ZPTY_RETURNS_FD=0
[[ -o interactive ]] && [[ -o zle ]] && {
typeset -h REPLY typeset -h REPLY
zpty _async_test cat zpty _async_test :
(( REPLY )) && ASYNC_USE_ZLE_HANDLER=1 (( REPLY )) && ASYNC_ZPTY_RETURNS_FD=1
zpty -d _async_test zpty -d _async_test
} }
} }