This repository was archived by the owner on Jul 20, 2019. It is now read-only.

After old messages are removed from the server, consuming from offset 0 fails #13

@colindickson

To reproduce:

  1. Set log.retention.hours=1 in config/server.properties on a test Kafka server.
  2. Use the Producer to add a batch of messages to a new topic.
  3. Consume the batch of messages from step 2, and print out the value of consumer.offset.
  4. Wait an hour for those messages to be deleted. Make some tea and/or browse reddit while you wait.
  5. Add another batch of messages to the same topic and try to consume starting at offset 0: the consumer.consume() loop will not return anything. You can retrieve the new messages only if you instantiate the consumer with the offset you recorded at the end of step 3.

But what if you don't know that offset? Keeping track of it is probably not easy in a production environment where older messages are deleted every hour.

Otherwise, there's this ugly O(N) hack for finding the offset where the messages begin when we don't know where on disk they start:

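import kafka.consumer

# `topic` is assumed to be defined already (the topic from the steps above).
offset = -1
found = False
while not found:
    offset += 1
    # Open a fresh consumer at each candidate offset.
    consumer = kafka.consumer.Consumer(topic, offset=offset)
    for message in consumer.consume():
        if len(str(message)) > 0:
            print(offset)  # first offset that yields a non-empty message
            found = True
        break  # only the first message at this offset is inspected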

(Again, this code is awful and I don't recommend using it; it's just something I wrote to get an answer for myself. It would be a disaster to run on an empty topic, or even on a topic where the offset is a huge number.)
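
A slightly safer variant of the same scan, as a minimal sketch: it caps the search at a maximum offset, so an empty topic cannot make it loop forever. The names `find_first_readable_offset`, `probe`, and `max_offset` are hypothetical and not part of the library; `probe(offset)` stands in for "open a consumer at this offset and see whether a message comes back". It is still O(N), so a huge starting offset remains slow.

```python
def find_first_readable_offset(probe, max_offset, start=0):
    """Return the first offset in [start, max_offset] that probe() accepts.

    probe is a caller-supplied function (hypothetical here) that returns
    True when a consumer opened at that offset yields a message.
    Returns None if no offset in the range is readable.
    """
    for offset in range(start, max_offset + 1):
        if probe(offset):
            return offset
    return None


# Stand-in probe for illustration: pretend everything before 1500 was deleted.
def fake_probe(offset):
    return offset >= 1500


print(find_first_readable_offset(fake_probe, max_offset=10000))
```

With a real consumer, the probe would wrap the loop body from the hack above, and a None result would mean the topic is empty or the cap was too low.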

Can you think of a prettier solution to put into consume.py that would let us consume all the messages in a topic, even after some have been deleted by the server?
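
One possible shape for such a fix, sketched under assumptions: if the earliest and latest retained offsets can be obtained somehow (nothing in this issue establishes that the client can ask the server for them), an out-of-range starting offset could be moved to the earliest one instead of silently returning nothing. `resolve_start_offset`, `earliest`, and `latest` are hypothetical names used only for illustration.

```python
def resolve_start_offset(requested, earliest, latest):
    """Pick the offset to start consuming from.

    earliest and latest are assumed to be the first and last offsets
    still retained for the topic; how to obtain them is outside this sketch.
    """
    if requested < earliest or requested > latest:
        # The requested position was deleted (or never existed):
        # fall back to the oldest data that is still available.
        return earliest
    return requested


# Illustrative numbers only: messages before offset 1500 have been deleted.
print(resolve_start_offset(0, earliest=1500, latest=4000))     # 1500
print(resolve_start_offset(2000, earliest=1500, latest=4000))  # 2000
```

Falling back to the earliest offset matches the goal of consuming everything still in the topic; a caller who only wants new messages would fall back to `latest` instead.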
