Skip to content

Fix an issue where ActiveMQ failed to properly support STOMP spec#1

Open
ehossack-aws wants to merge 3 commits intomainfrom
stompfix
Open

Fix an issue where ActiveMQ failed to properly support STOMP spec#1
ehossack-aws wants to merge 3 commits intomainfrom
stompfix

Conversation

@ehossack-aws
Copy link
Owner

The STOMP spec lists several allowed escaped characters in headers, with the rest noted:

Undefined escape sequences such as \t (octet 92 and 116) MUST be treated as a fatal protocol error.

An attempt was previously made to fix this by Gary Tully (https://issues.apache.org/jira/browse/AMQ-3501), but the validated escape sequences were not correct. (Example of correct escaping: jasonrbriggs/stomp.py@c2c0824#diff-f0aa9543d5e7800d4ac1074798a70e058ea1d7859f106a3a4de95e54b6bf98faR122)

This was fixed in Artemis: apache/artemis@a99617a
and this is an implementation for ActiveMQ.

This is a breaking change, but I believe it's the best for consumers as to not cause confusion.

Here's some sample code that would previously have worked but no longer works:

import logging
from typing import Optional
from contextlib import contextmanager
from typing import Generator
import os
import time
from datetime import datetime, timedelta
from logging import getLogger
from os import urandom
from typing import Any, Callable
import random
import string
import stomp

logger = logging.getLogger("brokers")
loggingHandler = logging.StreamHandler()

@contextmanager
def brokerConnection(name: str) -> Generator[stomp.Connection, str, None]:
    if not name:
        raise Exception("Need connection name!")
    host = os.getenv("BROKER_HOST", "localhost")
    port = os.getenv("BROKER_PORT", "61613")
    username = os.getenv("BROKER_USER", "broker")
    password = os.getenv("BROKER_PASSWORD", "password")

    connection = stomp.Connection11(host_and_ports=[(host, port)])
    connection.set_ssl(for_hosts=[(host, port)])
    logger.debug(f"{name} connecting to stomp at {host}:{port} using {username} and {password}")
    connection.connect(username, password, wait=True)
    logger.debug(f"{name} connected")
    yield connection
    connection.disconnect()
    logger.debug(f"{name} disconnected")


def byteLength(s: str) -> int:
    return len(s.encode("utf-8"))

def randomChars(sizeInBytes: int, base="") -> str:
    chars = base
    while byteLength(chars) < sizeInBytes:
        chars += random.choice(string.ascii_lowercase)
    return chars

def produce(
    message: Optional[str] = None,
    queue: str = "",
    times: int = 1,
    sizeInBytes: int = 1024
) -> None:
    if not queue:
        raise Exception("Need queue")
    if message and sizeInBytes:
        raise Exception("Need message or size not both")

    with brokerConnection("producer") as c:
        for i in range(0, times):
            body = message or randomChars(sizeInBytes, base=f"{i}-")
            c.send(destination=queue, body=body, headers={"persistent": "true"})
            if len(body) < 25:
                logger.debug(f"Sent message {body}")

        logger.info(f"Sent {times} messages")



logger = getLogger("brokers")

class QueueListener(stomp.ConnectionListener):
    def __init__(self, consumer: Callable[[str], Any]):
        self.consumer = consumer
        self.consumed_count = 0

    def on_error(self, frame):
        logger.error("Consumer encountered an error")
        logger.error(frame)

    def on_message(self, frame):
        logger.debug(f"Consumed message: byte length [{byteLength(frame.body)}], headers: {frame.headers}")
        self.consumer(frame.body)
        self.consumed_count += 1


def consume(queue: str = "", consumer=lambda m: print(m), seconds: float = float('inf'), times: float = float('inf')) -> None:
    if not queue:
        raise Exception("Need queue")

    hasFiniteMessages = times != float('inf')
    runForever = seconds == float('inf')
    listener = QueueListener(consumer)

    with brokerConnection("consumer") as c:
        c.set_listener("consume-consumer", listener)
        start = datetime.now()
        if hasFiniteMessages:
            logger.info(f"consuming {times} messages")
        if not runForever:
            logger.info(f"consuming for {seconds} seconds")

        c.subscribe(destination=queue, id=urandom(5), ack="auto")

        shouldContinueListening = True
        elapsedSeconds = 0
        lastPrinted = start
        while shouldContinueListening:
            time.sleep(0.1)
            elapsedSeconds = (datetime.now() - start).total_seconds()
            shouldContinueListening = elapsedSeconds < seconds and listener.consumed_count < times

            if (datetime.now() - lastPrinted).total_seconds() > 3:
                logger.debug(f"{round(elapsedSeconds, 1)} seconds elapsed...")
                lastPrinted = datetime.now()

        if runForever:
            logger.info(f"Consumed for {elapsedSeconds} total seconds")

    if hasFiniteMessages:
        if listener.consumed_count < times:
            logger.error(f"Did not consume all messages, {times - listener.consumed_count} remain")
        else:
            logger.info(f"All {times} messages consumed")
    logger.debug("Stopping consumption")

def run():
    produce(queue="Q", sizeInBytes=1024, times=20)
    consume(
        queue="Q",
        consumer=lambda messageBody: logger.info(f"Recieved a message with string length {len(messageBody)}"),
        times=20,
        seconds=30
    )


def _setLogLevel(verbose: bool) -> None:
    if verbose:
        loggingHandler.setFormatter(logging.Formatter("[%(levelname)s]\t%(message)s"))
        logger.setLevel(logging.DEBUG)
    else:
        loggingHandler.setFormatter(logging.Formatter("%(message)s"))
        logger.setLevel(logging.INFO)

if __name__ == "__main__":
    logging.basicConfig(handlers=[loggingHandler])
    _setLogLevel(True)
    run()

@ehossack-aws ehossack-aws changed the base branch from master to main June 24, 2021 23:45
} else if (b == Stomp.ESCAPE) {
isEscaping = true;
}
outputStream.write(b);
Copy link

@ygubernatorov ygubernatorov Jun 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about trailing ESCAPE? is it valid?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's a trailing escape? In this context, these are headers, so there must be a body that follows

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering what if line 205 is executed and the next byte is '\n'? Can a legit header value end with an unescaped backslash?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it should be just a backslash then. There's nothing in the spec to prevent that.

baos.write(b);
if (isEscaping) {
if (!isValidEscapedCharacter(b)) {
throw new ProtocolException("Undefined escape sequence [\\"+((char) (b & 0xFF))+"] found in header!", true);
Copy link

@ygubernatorov ygubernatorov Jun 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not check escape sequences' validity for the CONNECT frame's headers.
(Moreover, we should not decode them as well, but that's clearly out of scope of this PR.)

Reason: stomp v1.2 provides backward compatibility with v1.0 and thats why CONNECT's headers should not be escaped: https://stomp.github.io/stomp-specification-1.2.html#Connecting

One of the possible headers for the CONNECT frame is password. And password might contain just anything, including an unescaped backslash followed by an alphanumeric symbol.

Here is a couple of examples of popular clients that do not escape CONNECT's headers:

  1. stomp.py makes an explicit exception:
    https://github.com/jasonrbriggs/stomp.py/blob/fbab0c82d4e91ed364c17fd4977fdd6fe2b58b27/stomp/protocol.py#L241

  2. stomp-js only enables escaping upon receiving CONNECTED (i.e. after the CONNECT frame is sent)
    https://github.com/stomp-js/stompjs/blob/develop/src/stomp-handler.ts#L197


@Test(timeout = 60000)
public void testStompHeaderWithUndefinedEscapeSequenceInAllowedConnectHeader() throws Exception {
String connectFrame = "CONN\\xECT\n" +

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not clear to me why \\x is just ignored here

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh hmm.. I misread your interpretation of the spec as us not reading escaped characters in the CONNECT frame.

What do you think the previous patch was missing? The code I have touched deals with CONNECT/CONNECTED decoding whereas the code you referenced deals with encoding on the response.

Copy link

@ygubernatorov ygubernatorov Sep 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm. I referenced CONNECT frame's headers.

This is what I would expect to work without any attempt to decode the password:

CONNECT
login:ololo
passcode:bla\h\blah\na\\h
...

And this is something weird, that should not result in CONNECTED %) (but not because of escaping;))

CONN\\xECT
...

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, I'm less confused now! I think I woke up too early today 😅

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants