Distributed Multi-Source Continuous Data Pipeline

In this series I will detail a solution to a common problem in distributed data aggregation. We want to build a web application that displays current price and location data for EV charging stations on a map. The data is scraped from websites, provided by governments, or obtained from similar data sources. The solution described here has been in production for years, so it is proven to solve the problem above.

This problem breaks down into the following sub-problems:

  • Avoiding duplicates
  • Dealing with multiple sources for the same data
  • Handling a mix of static and dynamic data (e.g. location and price)
  • Merging the data from multiple sources into a single record

Requirements, Assumptions and Constraints

  • We want to scrape the data on a regular basis
  • We want to only update changed datasets
  • We want to handle deletion of the data at the source (e.g. an EV charger is out of order)
  • We want to handle explicitly bad data records (some sources might have stale data that we need to be able to act upon, e.g. identified by customer feedback)
  • We want to accept pushed data, for example if we provide an API for EV charging providers to update their data
  • We want to deduplicate the data
  • We want to be able to change our merging code at any time without mixing up the data
  • We want to generate multiple output formats (e.g. map tiles, JSON)
  • The update should happen with minimal latency
  • We have to respect that we are dealing with geolocation data
  • We want to weight data from different sources independently

Evolutionary approach

Since we have to cover a lot of ground, we will tackle the whole solution post by post, solving one problem at a time. We start off with the fundamental idea of the system design and then work through the list of requirements, assumptions and constraints.

Fundamental idea

Every customer-facing record has a UUID as its primary identifier. The UUID can be computed and made conflict-free. From now on the UUID will simply be called ID to ease reading.

We separate the input and output data into two separate databases. The input database may contain wrong, stale, or inaccurate data, and it holds multiple source records for a single final EV charging record. The output database contains only the single final record per ID.

Example record from source A:

{
   "_id": "d39fed96-58ef-446d-9039-d1c56e7be592/A",
   "_source": "A",
   "id": "optimile-a82s12",
   "name": "Optimile",
   "street": "Munthofstraat",
   "city": "1060 Saint-Gilles"
}

Example record from source B:

{
   "_id": "d39fed96-58ef-446d-9039-d1c56e7be592/B",
   "_source": "B",
   "id": "price-727s",
   "street": "Munthofstraat 74",
   "price_per_kwh": "0.24"
}

Bringing the two records together is easy: a range select across all records for d39fed96-58ef-446d-9039-d1c56e7be592 returns both source records, and a tuned merge function could produce the following final record:

{
   "_id": "d39fed96-58ef-446d-9039-d1c56e7be592",
   "_sources": ["A", "B"],
   "name": "Optimile",
   "street": "Munthofstraat 74",
   "city": "1060 Saint-Gilles",
   "price_per_kwh": "0.24"
}
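
The following sketch shows what such a range select plus merge could look like. It is a minimal illustration only: the in-memory inputDb, the rangeSelect helper and the fixed source priority list are assumptions made for this example, not the production implementation.

// A minimal sketch of the range select and field-by-field merge.
// inputDb, rangeSelect and SOURCE_PRIORITY are illustration-only assumptions.

type SourceRecord = {
  _id: string;        // "<uuid>/<source>"
  _source: string;
  id: string;         // the source's own identifier
  [field: string]: unknown;
};

type FinalRecord = {
  _id: string;        // "<uuid>"
  _sources: string[];
  [field: string]: unknown;
};

// Stand-in for the input database, holding the two example records.
const inputDb: SourceRecord[] = [
  {
    _id: "d39fed96-58ef-446d-9039-d1c56e7be592/A",
    _source: "A",
    id: "optimile-a82s12",
    name: "Optimile",
    street: "Munthofstraat",
    city: "1060 Saint-Gilles",
  },
  {
    _id: "d39fed96-58ef-446d-9039-d1c56e7be592/B",
    _source: "B",
    id: "price-727s",
    street: "Munthofstraat 74",
    price_per_kwh: "0.24",
  },
];

// All source records for one ID share the prefix "<uuid>/".
function rangeSelect(id: string): SourceRecord[] {
  return inputDb.filter((doc) => doc._id.startsWith(id + "/"));
}

// Sources listed from lowest to highest weight; later ones win conflicting fields.
const SOURCE_PRIORITY = ["A", "B"];

function mergeRecord(id: string): FinalRecord {
  const records = rangeSelect(id).sort(
    (a, b) =>
      SOURCE_PRIORITY.indexOf(a._source) - SOURCE_PRIORITY.indexOf(b._source)
  );

  const merged: FinalRecord = { _id: id, _sources: [] };
  for (const { _id, _source, id: sourceId, ...fields } of records) {
    Object.assign(merged, fields); // later (higher-weighted) sources overwrite
    merged._sources.push(_source);
  }
  return merged;
}

console.log(mergeRecord("d39fed96-58ef-446d-9039-d1c56e7be592"));

Because the raw source records stay untouched in the input database, a merge like this can be re-run at any time, which is what allows us to change the merging code without losing or mixing data.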

Find the right ID

The source will need to look up the _id for the record, because it is important for the source to avoid adding duplicate data. The source can use the known id (the unique ID of the source record) and make a simple lookup like findDoc(where: { "_source": "A", "id": "optimile-a82s12" }). The record found in the input database also tells us whether the record has to be updated and gives us the _id.
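
As a sketch, the lookup-and-update flow for one freshly scraped record could look like this. The findDoc and saveDoc helpers, the in-memory map standing in for the input database, and the placeholder ID generation are assumptions for illustration only; how the real conflict-free ID is computed is deferred to the next post.

// Hypothetical lookup-and-update flow for one scraped record.
// findDoc, saveDoc and the in-memory inputDb are stand-ins for illustration.

import { randomUUID } from "node:crypto";

type SourceRecord = {
  _id: string;       // "<uuid>/<source>"
  _source: string;
  id: string;        // the source's own identifier
  [field: string]: unknown;
};

// Stand-in for the input database, keyed by _id.
const inputDb = new Map<string, SourceRecord>();

function findDoc(where: { _source: string; id: string }): SourceRecord | undefined {
  for (const doc of inputDb.values()) {
    if (doc._source === where._source && doc.id === where.id) return doc;
  }
  return undefined;
}

function saveDoc(doc: SourceRecord): void {
  inputDb.set(doc._id, doc);
}

function upsertSourceRecord(
  source: string,
  scraped: { id: string; [field: string]: unknown }
): void {
  // Look up by the source's own identifier to avoid inserting a duplicate.
  const existing = findDoc({ _source: source, id: scraped.id });

  if (existing) {
    // Already known: only write when a field actually changed.
    const changed = Object.keys(scraped).some((k) => existing[k] !== scraped[k]);
    if (changed) saveDoc({ ...existing, ...scraped });
    return;
  }

  // Unknown record: it needs a conflict-free _id. A random UUID is only a
  // placeholder here; how the real ID is computed is the topic of the next post.
  const uuid = randomUUID();
  saveDoc({ ...scraped, _id: `${uuid}/${source}`, _source: source });
}

upsertSourceRecord("A", {
  id: "optimile-a82s12",
  name: "Optimile",
  street: "Munthofstraat",
  city: "1060 Saint-Gilles",
});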

The only question left open is: how do we create the _id in the first place? That is the topic of the next post.