In this article we will learn how to get more flexibility around the starting point for consuming streams in Spark Structured Streaming. Previously, I wrote about how we can consume messages from EventHub using Spark on Databricks with interactive Notebooks.
Let’s explore the incoming stream even further.
Previously, we looked at the “body” field with the contents of each tweet and didn’t pay attention to the other fields of the incoming stream. EventHubs has a retention period for durability of messages, which can be configured separately. Without explicit settings, when the stream starts, we receive all events that were written to the EventHub from the very beginning, starting with the oldest event retained by the hub. So if we started broadcasting tweets to the EventHub 2 days ago and started a Spark stream to consume those events now, the first message we’d get would be from 2 days ago.
How can we have better control over the starting point of stream consumption? This is convenient when the consumer has stopped reading events and needs to continue from the exact point where it left off. It also helps when we only want events that were produced no earlier than a certain point in time.
To refresh the memory, let’s look at the regular stream without explicit time or offset configuration.
Instead of looking at the “body” column only, we can also look at the “offset” and “enqueuedTime” columns. They can be converted to their corresponding data types as well. Offset is a field of type Long that identifies each message within a partition. Enqueued time is the time when the message was received by the EventHub.
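The column conversion described above could look roughly like the following in PySpark. This is a minimal sketch, not the original notebook: the helper name `select_typed_columns` is made up for illustration, and `stream_df` is assumed to be the DataFrame returned by loading the "eventhubs" source.

```python
def select_typed_columns(stream_df):
    """Pick body, offset and enqueuedTime and cast them to convenient types.

    `stream_df` is assumed to come from something like
    spark.readStream.format("eventhubs").options(**eh_conf).load().
    """
    return stream_df.selectExpr(
        # The message payload; cast so the tweet text is readable.
        "CAST(`body` AS STRING) AS `body`",
        # Position of the message within its partition, as a 64-bit integer (Long).
        "CAST(`offset` AS BIGINT) AS `offset`",
        # When the EventHub received the message.
        "CAST(`enqueuedTime` AS TIMESTAMP) AS `enqueuedTime`",
    )
```

The column names are wrapped in backticks so that they are always treated as identifiers, even where a name such as `offset` is also a SQL keyword.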
Stream with “offset” specified
To only receive messages after a certain offset, we can use the “EventPosition.fromOffset” starting position. For example, if we set it to 3080 for the example with tweets, we should not see any messages with an offset less than 3080.
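For readers using PySpark rather than Scala, here is a minimal sketch of the same idea. It assumes the PySpark usage of the Event Hubs connector, where the starting position is passed as a JSON string under the "eventhubs.startingPosition" option instead of an `EventPosition` object. The helper name `build_conf_from_offset` and the connection string placeholder are hypothetical.

```python
import json


def build_conf_from_offset(connection_string, offset, inclusive=True):
    """Build connector options that start reading at a given offset."""
    starting_position = {
        "offset": str(offset),     # the offset is passed as a string
        "seqNo": -1,               # not used when starting from an offset
        "enqueuedTime": None,      # not used when starting from an offset
        "isInclusive": inclusive,  # whether the event at this exact offset is included
    }
    return {
        # Placeholder value; some connector versions expect it to be encrypted first.
        "eventhubs.connectionString": connection_string,
        "eventhubs.startingPosition": json.dumps(starting_position),
    }


# Example: start at offset 3080, as in the tweets example.
eh_conf = build_conf_from_offset("<EVENT_HUBS_CONNECTION_STRING>", 3080)

# In a Databricks notebook, where `spark` is available, the stream would then be
# created with: spark.readStream.format("eventhubs").options(**eh_conf).load()
```

Setting `inclusive=False` should skip the event at offset 3080 itself and start with the next one.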
Stream with “enqueuedTime” specified
To only receive messages that arrived after a certain timestamp, we can use "EventPosition.fromEnqueuedTime" as the starting position and set it to the desired timestamp. For example, if we set the timestamp to "X", we won’t see any events with an enqueued time earlier than "X".
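A PySpark sketch of starting from an enqueued time is shown below, under the same assumption that the connector accepts the position as JSON under "eventhubs.startingPosition". The helper name `build_conf_from_enqueued_time`, the example date, and the connection string placeholder are made up for illustration.

```python
import json
from datetime import datetime, timezone


def build_conf_from_enqueued_time(connection_string, start_time):
    """Build connector options that start reading at a given enqueued time."""
    if start_time.tzinfo is None:
        # Avoid silently interpreting a naive datetime as local time.
        raise ValueError("start_time must be timezone-aware")
    utc_time = start_time.astimezone(timezone.utc)
    starting_position = {
        "offset": None,  # not used when starting from a timestamp
        "seqNo": -1,     # not used when starting from a timestamp
        # UTC timestamp formatted as YYYY-MM-DDTHH:MM:SS.ffffffZ
        "enqueuedTime": utc_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "isInclusive": True,
    }
    return {
        # Placeholder value; some connector versions expect it to be encrypted first.
        "eventhubs.connectionString": connection_string,
        "eventhubs.startingPosition": json.dumps(starting_position),
    }


# Example: only read events enqueued at or after this (arbitrary) point in time.
start = datetime(2019, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
eh_conf = build_conf_from_enqueued_time("<EVENT_HUBS_CONNECTION_STRING>", start)

# In a Databricks notebook, where `spark` is available, the stream would then be
# created with: spark.readStream.format("eventhubs").options(**eh_conf).load()
```

Requiring a timezone-aware datetime is a deliberate choice here: enqueued times are compared in UTC, so an ambiguous local time could shift the starting point by several hours.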
Follow me on Twitter @lenadroid if you found this article interesting or helpful. My direct messages are open and I am always happy to connect, so feel free to reach out with any questions or ideas!