Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 62 additions & 53 deletions prsync
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ set -e
# Inspired by https://gist.github.com/akorn/644855ddaa8065f564be

# Usage:
# rsync_parallel.sh [--parallel=N] [rsync args...]
#
# prsync [--parallel=N] [rsync args...]
#
# Options:
# --parallel=N Use N parallel processes for transfer. Default is to use all available processors (`nproc`) or fall back to 10.
# --parallel=N Use N parallel processes for transfer. Default is to use all available processors (`nproc`) or fall back to 10.
#
# Notes:
# * Requires GNU Parallel
# * Use with ssh-keys. Lots of password prompts will get very annoying.
# * Does an itemize-changes first, then chunks the resulting file list and launches N parallel
# rsyncs to transfer a chunk each.
# * be a little careful with the options you pass through to rsync. Normal ones will work, you
# * be a little careful with the options you pass through to rsync. Normal ones will work, you
# might want to test weird options upfront.
#

Expand All @@ -29,6 +29,12 @@ then
exit
fi

# Hard requirement: the chunk distribution below is implemented in awk.
# Fail fast (and with a non-zero status, so callers and `set -e` notice)
# when awk is not installed; the diagnostic goes to stderr.
if ! command -v awk &> /dev/null
then
    echo -e "${RED}awk could not be found${NC}" >&2
    exit 1
fi

if [[ "$1" == --parallel=* ]]; then
PARALLEL_RSYNC="${1##*=}"
shift
Expand Down Expand Up @@ -56,57 +62,60 @@ if [ "${TOTAL_FILES}" -eq "0" ]; then
exit 0
fi

# Find the index of the smallest numeric value among the arguments and
# publish it in the global MIN_I (first occurrence wins on ties).
# An empty argument list leaves MIN_I at 0.
function array_min {
    local -a vals=("$@")
    local best=0 idx

    for idx in "${!vals[@]}"; do
        if (( vals[idx] < vals[best] )); then
            best=${idx}
        fi
    done

    MIN_I="${best}"
}

echo -e "${GREEN}INFO: Distributing files among chunks ...${NC}"
# Seed one zero-sized byte accumulator per parallel worker slot.
I=0
while (( I < PARALLEL_RSYNC )); do
    CHUNKS[I]=0
    I=$(( I + 1 ))
done
echo -e "${GREEN}INFO: Distributing and reversing chunks (via awk)...${NC}"
# NOTE(review): the message above mentions awk, but the loop below performs the
# distribution in pure bash via array_min — confirm which variant is intended.

# add each file to the emptiest chunk, so they're as balanced by size as possible
PROGRESS=0
# SECONDS is bash's built-in timer; resetting it times this distribution pass.
SECONDS=0
# Each files.all line is "<size> <path>"; read -r splits size from path on the
# first whitespace run. Assumes paths have no leading whitespace — TODO confirm
# the producer's output format.
while read -r FSIZE FPATH; do
PROGRESS=$((PROGRESS+1))

# array_min publishes the index of the currently smallest chunk in MIN_I.
array_min "${CHUNKS[@]}"
#MIN_I=$(array_min ${CHUNKS[@]})

# Grow the chosen chunk's byte total and queue this path into its list file.
CHUNKS[MIN_I]=$(( CHUNKS[MIN_I] + FSIZE ))
echo "${FPATH}" >> "${TMPDIR}/chunk.${MIN_I}"

# Progress heartbeat every 25000 files, written to stderr.
if ! ((PROGRESS % 25000)); then
>&2 echo -e "${GREEN}INFO: ${PROGRESS} of ${TOTAL_FILES} (${SECONDS}s)${NC}"
fi
done < "${TMPDIR}/files.all"
# Flip the file order of every odd-numbered chunk: half the transfer queues
# then start with the largest files while the other half start with the
# smallest, which keeps the parallel rsyncs busy at similar rates.
for ((idx = 1 ; idx < PARALLEL_RSYNC ; idx += 2 )); do
    chunk="${TMPDIR}/chunk.${idx}"

    # With a small transfer set, some chunk files were never created; skip them.
    if [[ ! -f "${chunk}" ]]; then
        continue
    fi

    tac "${chunk}" > "${chunk}.r" && mv "${chunk}.r" "${chunk}"
done
# Bin-pack and reverse in a single awk pass over the "<size> <path>" list.
# Even-numbered chunks stream straight to disk (large -> small input order);
# odd-numbered chunks are buffered in memory and flushed reversed at END
# (small -> large), so the parallel queues drain at similar rates.
awk -v n="${PARALLEL_RSYNC}" -v tmp="${TMPDIR}" '
BEGIN {
    # One byte-load accumulator per chunk.
    for (b = 0; b < n; b++) load[b] = 0
}
{
    # First field is the size; the path is everything after the first space.
    bytes = $1
    file  = substr($0, index($0, " ") + 1)

    # Greedy bin-packing: pick the currently lightest chunk
    # (first occurrence wins on ties).
    best = 0
    for (b = 1; b < n; b++) {
        if (load[b] < load[best]) {
            best = b
        }
    }

    load[best] += bytes

    if (best % 2 == 0) {
        # Even chunk: append directly, preserving input order.
        print file > (tmp "/chunk." best)
    } else {
        # Odd chunk: hold in memory so END can emit it in reverse.
        held[best, ++lines[best]] = file
    }
}
END {
    # Flush the buffered odd chunks, last line first.
    for (b = 1; b < n; b += 2) {
        out = (tmp "/chunk." b)
        for (k = lines[b]; k >= 1; k--) {
            print held[b, k] > out
        }
        # Release the descriptor before moving to the next chunk file.
        close(out)
    }
}
' "${TMPDIR}/files.all"

echo -e "${GREEN}DONE (${SECONDS}s)${NC}"

Expand Down