Spark Stream Deduplication

Rahul Sahoo
6 min read · Jan 19, 2025

Handling IoT data streams often comes with unique challenges, and one issue I encountered was the persistent generation of duplicate events. Our data source was inadvertently producing duplicates alongside the original data, flooding our Kafka topic with redundant information. Left unchecked, these duplicate events could overwhelm downstream services, leading to inefficiencies, increased processing costs, and even inaccurate results. To address this, it became clear that implementing a deduplication mechanism at the streaming layer was not just beneficial — it was essential for maintaining the integrity and efficiency of our data pipeline.

Background/Context

All of our producers generate time-series events, which are essential for tracking sensor data over time. To make it easier to follow along, here’s a sample schema of the data events being produced:

{
  "sensor_id": 1,                            // Integer: Unique identifier for the sensor
  "temperature": 10.01,                      // Float: Measured temperature
  "humidity": 2.23,                          // Float: Measured humidity
  "timestamp": "2025-01-12T13:29:00.406005"  // String: Event generation time
}

The producers send data at regular intervals, ensuring consistent updates. However, if no new data is available at a given interval, they continue to send the last known event. This results in duplicate events being streamed into the pipeline.

It’s important to note that duplicate events are identified based on the combination of sensor_id and timestamp. Even if the values for temperature and humidity remain the same, an event is considered unique if it has a different timestamp. Conversely, events with the same sensor_id and timestamp are considered duplicates, regardless of their data content.
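As a small illustration of that rule, the sketch below builds the deduplication key from sensor_id and timestamp only. The object name, the dedupKey helper, and the sample values are made up for this example; the case class simply mirrors the sample schema above.

```scala
import java.sql.Timestamp

object DuplicateKeyExample {
  // Mirrors the sample schema; field names follow the JSON keys.
  final case class SensorData(
    sensor_id: Int,
    temperature: Double,
    humidity: Double,
    timestamp: Timestamp
  )

  // Hypothetical helper: two events are duplicates when this key matches,
  // whatever their temperature and humidity readings are.
  def dedupKey(e: SensorData): (Int, Timestamp) = (e.sensor_id, e.timestamp)

  val t1: Timestamp = Timestamp.valueOf("2025-01-12 13:29:00.406005")
  val t2: Timestamp = Timestamp.valueOf("2025-01-12 13:29:05.406005")

  val original   = SensorData(1, 10.01, 2.23, t1)
  val resent     = SensorData(1, 10.01, 2.23, t1) // same key: duplicate
  val otherData  = SensorData(1, 11.50, 2.40, t1) // same key, different readings: still a duplicate
  val sameValues = SensorData(1, 10.01, 2.23, t2) // same readings, new timestamp: unique
}
```

Comparing dedupKey(original) with dedupKey(resent) or dedupKey(otherData) gives equal keys, while dedupKey(sameValues) differs because only the timestamp changed.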

Problem Statement

To ensure that consumers only process unique events, we need to deduplicate the incoming event stream. However, this is not as straightforward as it seems, because there’s an additional challenge to address: sensor malfunctions.

Occasionally, sensors may malfunction and retransmit an event that was already sent in the past. We treat these as outlier events rather than ordinary duplicates, and they must be discarded to maintain data integrity. To achieve this, we need to track the latest accepted timestamp for each sensor.

The deduplication logic will work as follows:

  • If the timestamp of an incoming event is less than or equal to the latest accepted timestamp for that sensor, the event is deemed a duplicate or an outlier and is discarded.
  • If the timestamp of the incoming event is greater than the latest accepted timestamp, the event is accepted and the latest accepted timestamp is updated to it.

By implementing this approach, we can effectively filter out both duplicate and outlier events, ensuring that the consumers process only valid, unique data.
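Before bringing Spark into the picture, the rule can be sketched in plain Scala. This is only an illustration under simplifying assumptions: events arrive as an in-memory sequence, and the Event class, the DedupRuleSketch object, and the accept function are names invented for this sketch.

```scala
import java.sql.Timestamp

object DedupRuleSketch {
  // Simplified event: only the two fields the rule looks at.
  final case class Event(sensorId: Int, timestamp: Timestamp)

  // Walks the events in arrival order, tracking the latest accepted
  // timestamp per sensor, and returns the accepted events.
  def accept(events: Seq[Event]): Seq[Event] = {
    val start = (Map.empty[Int, Timestamp], Vector.empty[Event])
    val (_, accepted) = events.foldLeft(start) {
      case ((latest, out), e) =>
        latest.get(e.sensorId) match {
          // timestamp <= latest accepted: duplicate or outlier, discard
          case Some(seen) if !e.timestamp.after(seen) => (latest, out)
          // first event for this sensor, or strictly newer: accept and advance
          case _ => (latest.updated(e.sensorId, e.timestamp), out :+ e)
        }
    }
    accepted
  }
}
```

For example, feeding sensor 1 the timestamps t1, t1, t2, t1 (with t1 earlier than t2) keeps only the first t1 and the t2: the second t1 is a duplicate, and the last t1 is an outlier that arrives after t2 was accepted.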

Step-by-Step Guide / Explanation

We’ll use Spark's Structured Streaming API for this task, since its stateful operators let us keep per-sensor state across micro-batches. The implementation is in Scala because flatMapGroupsWithState, the state API used below, is not available in PySpark.

The full code can be found here: GitHub Repo. Below is a breakdown of the approach:

  • Read the events.
val inputStream = ... // Read Kafka events from the topic.
  • Define the schema of the incoming events.
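import java.sql.Timestamp
import org.apache.spark.sql.types._

private val SENSOR_DATA_SCHEMA = StructType(Array(
  StructField("sensor_id", IntegerType, nullable = false),
  StructField("temperature", DoubleType, nullable = true),
  StructField("humidity", DoubleType, nullable = true), // Double, matching the SensorData case class
  StructField("timestamp", TimestampType, nullable = false)
))

// Initial state value: any real event timestamp is later than this.
private val DEFAULT_TIMESTAMP = Timestamp.valueOf("1970-01-01 00:00:00")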
  • Define the case class for the incoming events.
case class SensorData(
  sensor_id: Int,
  temperature: Double,
  humidity: Double,
  timestamp: Timestamp
)
  • Parse the events.
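import org.apache.spark.sql.functions.{col, from_json}
import spark.implicits._ // Encoder for SensorData; assumes the SparkSession is named `spark`

val parsedStream = inputStream.selectExpr("CAST(value AS STRING) as json")
  .select(from_json(col("json"), SENSOR_DATA_SCHEMA).as("data"))
  .select("data.*")
  .as[SensorData]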
  • Define the state class.
case class DeduplicationState(latestTimestamp: Timestamp)
  • Define the deduplication logic.
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}

val deduplicatedStream = parsedStream
  .dropDuplicates("sensor_id", "timestamp") // Drop exact (sensor_id, timestamp) repeats
  .groupByKey(record => record.sensor_id) // Group by sensor_id
  .flatMapGroupsWithState[DeduplicationState, SensorData](OutputMode.Append(), GroupStateTimeout.NoTimeout()) {
    case (sensor_id, records, state) =>
      // Get the latest timestamp from state, or the default if this sensor has no state yet
      val latestTimestamp = state.getOption.map(_.latestTimestamp).getOrElse(DEFAULT_TIMESTAMP)

      // Keep only records newer than the stored timestamp, sorted oldest first.
      // Timestamp.before is used so the sort does not rely on an implicit Ordering[Timestamp].
      val deduplicatedRecords = records
        .filter(record => record.timestamp.after(latestTimestamp))
        .toSeq
        .sortWith((a, b) => a.timestamp.before(b.timestamp))

      // The records are sorted, so the last one carries the new latest timestamp
      deduplicatedRecords.lastOption.foreach { newest =>
        state.update(DeduplicationState(newest.timestamp))
      }

      deduplicatedRecords.iterator
  }

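Remove Initial Duplicates: Use dropDuplicates("sensor_id", "timestamp") to remove events with identical sensor_id and timestamp. This also covers repeats that arrive within the same micro-batch, which the timestamp comparison alone would let through. Note that without a watermark, Spark has to remember every (sensor_id, timestamp) pair it has seen, so this state keeps growing for as long as the job runs.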

Group Records by Sensor ID: Group the events by sensor_id using groupByKey so we can process each sensor's data independently.

Manage State with flatMapGroupsWithState: Use this stateful operation to maintain the latest timestamp for each sensor. For each incoming record, compare its timestamp with the state’s latest timestamp:

  • If the event’s timestamp ≤ the latest timestamp: Discard the event as a duplicate or outlier.
  • If the event’s timestamp > the latest timestamp: Accept the event and update the state with the new latest timestamp.

Handle Missing State: When no state exists for a sensor, use the DEFAULT_TIMESTAMP as the initial value.
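One way to make this logic easy to unit test is to pull the per-sensor step out of the Spark callback into a pure function. The sketch below is an assumption about how that could look, not code from the repository: GroupLogicSketch and dedupBatch are hypothetical names, and Option[Timestamp] stands in for Spark's GroupState.

```scala
import java.sql.Timestamp

object GroupLogicSketch {
  final case class SensorData(
    sensor_id: Int,
    temperature: Double,
    humidity: Double,
    timestamp: Timestamp
  )

  val DefaultTimestamp: Timestamp = Timestamp.valueOf("1970-01-01 00:00:00")

  // Pure version of the per-sensor step. Takes the stored timestamp (None when
  // the sensor has no state yet) and one micro-batch of records; returns the
  // timestamp to store next and the records to emit, oldest first.
  def dedupBatch(
    stored: Option[Timestamp],
    records: Iterator[SensorData]
  ): (Option[Timestamp], Seq[SensorData]) = {
    val latest = stored.getOrElse(DefaultTimestamp)
    val accepted = records
      .filter(_.timestamp.after(latest))
      .toVector
      .sortWith((a, b) => a.timestamp.before(b.timestamp))
    // Advance the state only when something was accepted.
    val next = accepted.lastOption.map(_.timestamp).orElse(stored)
    (next, accepted)
  }
}
```

Inside flatMapGroupsWithState, the callback would then only translate between GroupState and Option: pass state.getOption.map(_.latestTimestamp) in, and call state.update when the returned timestamp changes. Like the original, this function does not remove repeats inside a single batch; that is left to the upstream dropDuplicates.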

Run the Spark Streaming Job Using Docker Compose

To run the Spark Streaming job, you can use the provided Docker Compose file. This file sets up a Kafka broker, a Zookeeper instance, a Python producer, and a Spark Streaming job.

  • Clone the repository and run:
docker-compose up -d
  • Visit http://localhost:8080 to monitor the job. The Kafka UI lists two topics:
      • iot_events for the incoming events.
      • iot_dedup_events for the deduplicated events.
Figure: Kafka UI topics list
Figure: Kafka UI iot_events topic
Figure: Kafka UI iot_dedup_events topic
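The sink side of the job is not reproduced in this post. As a rough sketch of how a deduplicated stream could be written to the iot_dedup_events topic, something like the following would work with Spark's Kafka sink. The writeToKafka helper, the choice of sensor_id as the message key, and both parameters are assumptions for illustration; the repository may do this differently.

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{col, struct, to_json}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

// Hypothetical helper: serializes each row to JSON and writes it to Kafka.
// bootstrapServers and checkpointDir are placeholders supplied by the caller.
def writeToKafka[T](
  stream: Dataset[T],
  bootstrapServers: String,
  checkpointDir: String
): StreamingQuery =
  stream
    .select(
      col("sensor_id").cast("string").as("key"), // Kafka key: assumed to be the sensor id
      to_json(struct(col("*"))).as("value")      // Kafka value: the whole row as JSON
    )
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("topic", "iot_dedup_events")
    .option("checkpointLocation", checkpointDir) // required so state and offsets survive restarts
    .outputMode(OutputMode.Append())
    .start()
```

The checkpoint location matters here: the per-sensor timestamps held by the stateful operator are persisted there, so a restarted job resumes with the same state instead of accepting old events again.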

Key Takeaways / Insights

  • Preventing Redundancy: Simple data deduplication is essential to prevent downstream services from being overwhelmed by redundant information. However, in this case, we needed to address not just duplicate events, but also outlier events caused by sensor malfunctions.
  • IoT Data Challenges: IoT data streams are particularly prone to duplicates from producers resending the last known event and to outliers from malfunctioning sensors retransmitting old events. Tracking the latest accepted timestamp for each sensor filters out both.
  • Power of Spark Streaming: Stateful operators such as flatMapGroupsWithState let the job keep a small piece of state per sensor (here, a single timestamp) across micro-batches, which is what makes this kind of per-sensor filtering possible in real time.
  • Scalable and Fault-Tolerant Pipelines: By combining Spark Streaming with Kafka, we can build a scalable, fault-tolerant data pipeline that efficiently processes, deduplicates, and filters incoming events in real-time, ensuring that only clean and meaningful data reaches downstream consumers.

Conclusion

Handling IoT data streams presents unique challenges, especially when it comes to filtering out duplicate and outlier events caused by sensor malfunctions or retransmissions.

By leveraging Spark Streaming and Kafka, we demonstrated how to implement a robust, stateful deduplication mechanism that ensures only clean and meaningful data flows through the pipeline.

The approach of maintaining the latest timestamp for each sensor not only eliminates duplicates but also discards outdated outlier events, enabling downstream services to process data more accurately and efficiently.

In a world increasingly driven by IoT and streaming data, investing in reliable deduplication mechanisms like this is essential to ensure the performance, reliability, and accuracy of your data pipelines.

By combining the right tools, such as Spark Streaming and Kafka, with thoughtful design, you can create systems that handle the challenges of real-time data processing with ease.
