-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmove_queue_messages.sh
More file actions
108 lines (89 loc) Β· 3.17 KB
/
move_queue_messages.sh
File metadata and controls
108 lines (89 loc) Β· 3.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#!/bin/bash
# === CONFIGURATION ===
URL="<url>"
VHOST="de_queue"
SOURCE_QUEUE="optimize.capacity.error"
TARGET_QUEUE="event.retry.archive"
COUNT=5
DELAY=2
AUTH="<user>:<password>"
# === URL encode vhost ===
ENCODED_VHOST=$(python3 -c "import urllib.parse; print(urllib.parse.quote('''$VHOST'''))")
# === CHECK AND CREATE TARGET QUEUE IF MISSING ===
check_and_create_queue() {
echo "π Checking if target queue '$TARGET_QUEUE' exists..."
STATUS_CODE=$(curl -k -s -o /dev/null -w "%{http_code}" -u $AUTH \
"$URL/api/queues/$ENCODED_VHOST/$TARGET_QUEUE")
if [ "$STATUS_CODE" -eq 404 ]; then
echo "π¦ Target queue does not exist. Creating '$TARGET_QUEUE'..."
curl -k -s -u $AUTH -H "Content-Type: application/json" \
-X PUT "$URL/api/queues/$ENCODED_VHOST/$TARGET_QUEUE" \
-d '{"durable": true}'
echo "β
Target queue created."
else
echo "β
Target queue exists."
fi
}
# === BIND TARGET QUEUE TO amq.direct EXCHANGE ===
ensure_queue_binding() {
echo "π Ensuring binding to 'amq.direct' exchange..."
# Check existing bindings to see if this one already exists
BINDINGS=$(curl -k -s -u $AUTH "$URL/api/queues/$ENCODED_VHOST/$TARGET_QUEUE/bindings")
IS_BOUND=$(echo "$BINDINGS" | jq -e \
--arg key "$TARGET_QUEUE" \
'.[] | select(.source=="amq.direct" and .routing_key==$key)' > /dev/null && echo "yes" || echo "no")
if [ "$IS_BOUND" == "no" ]; then
echo "π§ Binding queue '$TARGET_QUEUE' to 'amq.direct' with routing key '$TARGET_QUEUE'..."
curl -k -s -u $AUTH -H "Content-Type: application/json" \
-X POST "$URL/api/bindings/$ENCODED_VHOST/e/amq.direct/q/$TARGET_QUEUE" \
-d "{\"routing_key\": \"$TARGET_QUEUE\"}"
echo "β
Binding created."
else
echo "β
Binding already exists."
fi
}
# === FUNCTION TO FETCH AND MOVE MESSAGES ===
move_messages() {
echo "Fetching up to $COUNT messages from '$SOURCE_QUEUE'..."
RESPONSE=$(curl -k -s -u $AUTH -H "Content-Type: application/json" \
-X POST "$URL/api/queues/$ENCODED_VHOST/$SOURCE_QUEUE/get" \
-d "{
\"count\": $COUNT,
\"ackmode\": \"ack_requeue_true\",
\"encoding\": \"auto\",
\"truncate\": 50000
}")
MESSAGE_COUNT=$(echo "$RESPONSE" | jq length)
if [ "$MESSAGE_COUNT" -eq 0 ]; then
echo "No more messages to move. Exiting."
exit 0
fi
echo "Moving $MESSAGE_COUNT messages to '$TARGET_QUEUE'..."
echo "$RESPONSE" | jq -c '.[]' | while read -r row; do
# Extract payload and encode in base64 safely
RAW_PAYLOAD=$(echo "$row" | jq -r '.payload')
BASE64_PAYLOAD=$(printf "%s" "$RAW_PAYLOAD" | base64)
# Build safe publish JSON
PUBLISH_JSON=$(jq -n \
--arg rk "$TARGET_QUEUE" \
--arg pl "$BASE64_PAYLOAD" \
'{
properties: {},
routing_key: $rk,
payload: $pl,
payload_encoding: "base64"
}')
# POST to RabbitMQ
curl -k -s -u $AUTH -H "Content-Type: application/json" \
-X POST "$URL/api/exchanges/$ENCODED_VHOST/amq.direct/publish" \
-d "$PUBLISH_JSON"
done
echo "Batch moved. Waiting $DELAY seconds..."
sleep $DELAY
}
# === MAIN EXECUTION ===
check_and_create_queue
ensure_queue_binding
while true; do
move_messages
done