Elevate Your Data Processing: Real-Time Stream Analytics with Apache Flink’s Event Time

Teepika R M
4 min read · Jun 1, 2023

Introduction to Event Time Processing:

Event Time processing is a key concept in real-time data analysis, where data is processed based on the actual time an event occurred. In this article, we’ll demystify Event Time processing using Apache Flink in Scala, providing a beginner-friendly explanation of the concepts involved.

The Importance of Event Time in Stream Processing:

When processing real-time data, we have three timestamps to consider: Event Time, Processing Time, and Ingestion Time. Event Time, being the actual time an event occurred, is crucial for accurate analysis. In contrast, Processing Time and Ingestion Time may introduce inaccuracies due to delays or out-of-order arrival of data.

What is Event Time and why is it important in stream processing?

There are three timestamps that can be considered while processing data:

Event Time, Processing Time and Ingestion Time

When we decide to use Event Time for processing data, we should think about how we are going to deal with late data.

Why does the late data problem arise only when dealing with Event Time?

When using Processing Time or Ingestion Time, the timestamp is assigned as the data flows through the system, so the stream can never carry unordered timestamps. With Event Time processing, however, data can arrive late due to network issues, ingestion delays, etc., which results in a stream whose timestamps are out of order.

(Figure: sample unordered data stream)

In this case, we need to define how the late data (Event 3 in this sample) should be handled.
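For a concrete picture, assume a minimal event type like the one below (the case class and its field names are hypothetical; the later snippets only rely on it carrying a timestamp). Event 3 occurs before Event 4 but reaches the system after it:

import java.time.Instant

// Hypothetical event type: an id, a value and the time the event actually occurred.
case class SampleEvents(id: String, value: Double, timestamp: Instant)

// Arrival order at the system vs. the time each event occurred:
val arrivals = Seq(
  SampleEvents("event-1", 10.0, Instant.parse("2023-06-01T10:00:00.000Z")),
  SampleEvents("event-2", 12.5, Instant.parse("2023-06-01T10:00:00.200Z")),
  SampleEvents("event-4", 9.0,  Instant.parse("2023-06-01T10:00:00.900Z")),
  // Event 3 occurred before Event 4 but arrives after it, e.g. due to a network delay.
  SampleEvents("event-3", 11.0, Instant.parse("2023-06-01T10:00:00.400Z"))
)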

Example use cases where source time is important for processing:

Predicting equipment failures, tracking trends, tracking delivery status, predicting bottlenecks in the supply chain, online advertising and marketing, recommendation systems, etc.

Challenges in processing data using Event Time:

  • How to handle out-of-order events & late-arriving events?
  • How to extract time & assign timestamp for events?
  • Whether to consider late data & when to drop the late data?
  • How to define watermarks to control the progress of events?

Let us see how these challenges can be addressed.

How to handle out-of-order events & late-arriving events?

Short answer: watermarks.

Out-of-order events: events that arrive in a different order than their actual event timestamps.

Late-arriving events: events that arrive at the system after their expected processing time, i.e., later than anticipated.

One of the challenges in Event Time processing is dealing with out-of-order and late-arriving events. This is where watermarks come into play.

Watermarks: Watermarks define how out-of-order and late-arriving events are handled. They establish a threshold for how late an event may be before it is considered too late to process; any event whose lateness exceeds this threshold is treated as late and may be discarded or handled differently. In effect, a watermark marks how far event time has progressed, based on the timestamps observed so far.
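As a rough mental model (a simplified sketch in plain Scala, not Flink's internal implementation), a bounded-out-of-orderness watermark trails the highest event timestamp seen so far by the configured bound, and any event that is not newer than the current watermark counts as late:

// Simplified model of a bounded-out-of-orderness watermark with a 500 ms bound.
// Flink's built-in strategy behaves essentially the same way (it additionally subtracts 1 ms).
val bound = 500L
var maxSeen = Long.MinValue

def observe(eventTimeMillis: Long): Unit = {
  // an event is late if it is not newer than the current watermark
  val late = maxSeen != Long.MinValue && eventTimeMillis <= maxSeen - bound
  maxSeen = math.max(maxSeen, eventTimeMillis)
  println(s"event=$eventTimeMillis watermark=${maxSeen - bound} late=$late")
}

Seq(1000L, 1200L, 1900L, 1500L, 2600L, 1800L).foreach(observe)
// 1500 arrives out of order but still within the 500 ms bound;
// 1800 arrives after the watermark has advanced past it and is treated as late.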

How to extract time & assign timestamp for events?

In the data stream, we select the field that contains the timestamp value and use it as the event timestamp. In the code snippet below, we use the withTimestampAssigner method, which accepts a SerializableTimestampAssigner. SerializableTimestampAssigner is an interface that declares an extractTimestamp method; by passing an implementation of it (here an anonymous class), we tell Flink which field to read the event timestamp from.

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// SampleEvents here refers to the input DataStream[SampleEvents], built elsewhere from a source.
val groupedEventsByWindow = SampleEvents
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      // tolerate out-of-order events that arrive at most 500 ms (0.5 s) late
      .forBoundedOutOfOrderness[SampleEvents](java.time.Duration.ofMillis(500))
      // extract the event timestamp (in epoch milliseconds) from each element
      .withTimestampAssigner(new SerializableTimestampAssigner[SampleEvents] {
        override def extractTimestamp(element: SampleEvents, recordTimestamp: Long): Long =
          element.timestamp.toEpochMilli
      }))

How to define watermarks to control the progress of events?

Watermarks are defined using a WatermarkStrategy, which lets you configure the maximum lateness that can be tolerated. In addition, you specify which field should be used to determine the event timestamp, as shown in the snippet above.

Whether to consider late data & when to drop the late data?

It depends on the maximum lateness you accept. If an event arrives later than the maximum accepted lateness, it is dropped by default.
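The window operator also gives you some control over this. The snippet below is a minimal sketch (it assumes the hypothetical SampleEvents type from earlier, and the window size, lateness value and key field are illustrative): allowedLateness keeps a window updatable for a little longer after the watermark passes it, and sideOutputLateData routes anything later than that to a side output instead of silently dropping it.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Tag for events that arrive after even the allowed lateness has expired.
val lateTag = OutputTag[SampleEvents]("late-events")

val windowedSums = groupedEventsByWindow                 // the stream with timestamps/watermarks assigned above
  .keyBy(_.id)                                           // key by the hypothetical id field
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))  // 5-second event-time windows
  .allowedLateness(Time.seconds(1))                      // keep windows updatable for 1 extra second
  .sideOutputLateData(lateTag)                           // anything later goes to the side output
  .reduce((a, b) => a.copy(value = a.value + b.value))   // sum the values per key and window

// Late events can be inspected, logged or reprocessed separately.
val lateEvents = windowedSums.getSideOutput(lateTag)

Whether you drop late events or capture them this way is a per-use-case decision; for aggregations that must be exact you usually want to keep them, while for approximate dashboards dropping may be acceptable.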

I will conclude this topic with an example of how watermarks are generated for a sample stream; it helps illustrate which events are considered and which are dropped.
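Here is one illustrative trace (the timestamps are invented for the example), using the 500 ms bound from the snippet above and 1-second tumbling event-time windows [0 ms, 1000 ms), [1000 ms, 2000 ms), …, ignoring Flink's internal 1 ms adjustment:

  • Event A, event time 100 ms: max timestamp seen 100, watermark −400; nothing fires.
  • Event B, event time 800 ms: max timestamp 800, watermark 300.
  • Event C, event time 1600 ms: max timestamp 1600, watermark 1100; the watermark has passed 1000 ms, so the [0, 1000) window fires with A and B.
  • Event D, event time 900 ms, arriving next: its timestamp is behind the watermark (1100), so it is late; with the default configuration it is dropped, but with allowedLateness or a side output (as sketched above) it could still be captured.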

Let’s embark on a journey of knowledge and exploration. Join me on Medium and let’s learn and grow together. Thank you!!

