Reads accelerometer data to detect earthquakes in real time and enable historical analysis.
I developed this project during my Data Engineering fellowship with Insight Data Science. I made a video presenting this work; the video and accompanying slides can be found below:
This readme can be considered a complement to the links above. There is some overlap, but the readme goes into more detail in some areas.
Earthquakes put thousands of people at risk each year. While we know where they are likely to happen, we do not know when they will strike.
This map shows areas of the world at high risk for earthquakes.
Many countries in these high-risk zones have built early warning systems. These systems attempt to detect earthquakes as they are starting.
Such systems have limitations. They are expensive and often slow to develop. There is duplicated effort, since each country builds its own detection system independently. Finally, the systems themselves often have a high false positive rate, which limits their effectiveness.
To address these shortcomings, an organization called OpenEEW (EEW="early earthquake warning") has made an open source solution. It detects earthquakes using cheap $50 IoT accelerometers. As of 2020, they have deployed several dozen of these across Latin America, with plans to expand across the globe.
They have made the accelerometer data publicly available on AWS: there is already 1TB of it on S3, representing three years of sensor data, from 2017 to date.
I reviewed the documentation, code, and data available from OpenEEW. I determined that one area to improve is in building a data platform on top of their data. This would address two use cases:
- Scale to enable more advanced real-time detection. As OpenEEW grows, I would expect them to add more sensors, which will send even more real-time data. Further, they may wish to deploy more advanced earthquake detection models than they currently run. The increased load from additional data and more advanced models would strain their current infrastructure.
- Study historical earthquake events. OpenEEW is collecting interesting and valuable data. However, at present there is no method for tagging and studying past earthquakes as detected by the system. Studying past earthquakes is likely of interest to analysts and researchers.
A data platform would help enable both of these use cases.
This demo shows how some historical earthquakes are captured by my solution. I am visualizing the data in Tableau.
Larger circles indicate earthquakes of higher intensity. Darker circles indicate earthquakes with longer durations. The circles are at sensor locations in Mexico.
I deployed a Pulsar cluster using the "from bare metal" guide, available from Pulsar docs here. I hosted the cluster on several AWS EC2 instances.
The cluster contained the Pulsar brokers, bookies, and zookeeper. Two separate machines were used as the Pulsar producer and consumer.
The producer pulls down data from the OpenEEW S3 bucket and sends it to the Pulsar cluster. A simple threshold is used to detect high acceleration values. When this threshold is triggered (i.e., a real-time detection), data is sent to Postgres for historical analysis. Finally, that historical data can be viewed in Tableau after applying some transformations in Postgres.
Here is a picture of the whole tech stack:
My AWS instances:
This shows how the Pulsar cluster interacts with clients (further diagrams are available in the Pulsar docs):
This section walks through the code available in this repo.
These scripts are run on a producer EC2 instance that sends data to Pulsar.
producer/import_data.py
link
This pulls down data from OpenEEW's S3 bucket using the boto library and stores it locally on the producer instance.
As shown in the code, I am only downloading a subset of the data, limiting it to specific dates and locations. I did not need the full history for the purposes of this project.
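The subsetting logic can be sketched roughly as below. The bucket name, key layout, and `key_prefixes` helper are illustrative assumptions based on OpenEEW's date/device partitioning, not the exact structure used in `import_data.py`:

```python
from datetime import date, timedelta

# Assumed bucket and key layout; check the actual bucket structure before using.
BUCKET = "grillo-openeew"

def key_prefixes(country, devices, start, end):
    """Build S3 key prefixes covering a subset of devices and dates."""
    prefixes = []
    day = start
    while day <= end:
        for device in devices:
            prefixes.append(
                f"records/country_code={country}/device_id={device}/"
                f"year={day.year}/month={day.month:02d}/day={day.day:02d}/"
            )
        day += timedelta(days=1)
    return prefixes

# The download itself would then use boto, roughly:
#   import boto3
#   s3 = boto3.client("s3")
#   for prefix in key_prefixes("mx", ["001"], date(2020, 9, 1), date(2020, 9, 10)):
#       for obj in s3.list_objects_v2(Bucket=BUCKET, Prefix=prefix).get("Contents", []):
#           s3.download_file(BUCKET, obj["Key"], obj["Key"].replace("/", "_"))
```

Limiting by prefix this way keeps the download to a few devices and days instead of the full 1TB.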
OpenEEW has a library for downloading their data, but it is not functioning properly right now. I opened a ticket to report this bug and had to implement the data pulling on my own; import_data.py would have been much shorter had the OpenEEW library been working.
download_data() is the longest function in the file.
producer/send_data.py
link
This sends the data to the Pulsar cluster located at broker1_url. The logic is similar to import_data.py, with some differences: it loops through the desired time ranges to locate and open the relevant data files, then steps through each file line by line and sends each line to Pulsar.
send_data() is the longest function in the file. This function also contains some logic to time how quickly messages are being sent to the cluster.
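The core of that loop can be sketched as follows. This is a simplified stand-in, not the actual send_data(): the `send` callable is injected so the timing logic is visible on its own, whereas the real producer would pass something wrapping pulsar-client's send call.

```python
import time

def send_lines(path, send, report_every=1000):
    """Stream a data file line by line through `send`, timing throughput.

    `send` is any callable taking bytes; in the real producer it would wrap
    the Pulsar producer's send. The reporting interval is illustrative.
    """
    sent = 0
    start = time.time()
    with open(path) as f:
        for line in f:
            send(line.strip().encode("utf-8"))
            sent += 1
            if sent % report_every == 0:
                elapsed = time.time() - start
                print(f"{sent} messages in {elapsed:.1f}s ({sent / elapsed:.0f} msg/s)")
    return sent
```

Injecting the sender also makes the loop easy to test with a plain list before pointing it at a live cluster.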
This code is run on a consumer EC2 instance that receives data from Pulsar. I also installed Postgres on the same instance; it could be put on another machine instead, if an alternative architecture is desired.
consumer/consumer.py
link
This listens for messages being sent to the topic. As they are received, it checks if the values should be stored for later analysis. If so, it loads them to the local postgres database.
Let's recall the use case here. In this project, we are playing back history from 9/1/2020 - 9/10/2020 to simulate data coming across in real time. Most of the data being sent will be noise: slight shakings of the accelerometer from things like a nearby stream or a truck driving by. Actual earthquakes are rarer and cause much higher accelerometer readings. So the interesting data to store is what arrives while an earthquake is happening, not the noise.
I applied a simple threshold to determine whether an accelerometer reading is "interesting": if the reading is above some level, save it in Postgres. (I didn't build an advanced earthquake detection algorithm, since this is not a data science project.)
Returning to the code: there are helper functions to make and insert lists. make_insertion_list is called on every line (= 1 sec) of data. Each line contains ~35 readings each for the x, y, and z axes. While we could store every one of those readings, for simplicity we aggregate them up to the second level. We take the max and mean of the readings, and then check whether the mean exceeds the threshold. If it does, the reading is "interesting" and we want to store it.
We create a new row for Postgres with the same number of columns as the table we will insert it into. The function returns a list of these rows.
Finally, back in the listen function, we insert the data into postgres.
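The aggregation step can be sketched like this. It is a simplified stand-in for the real make_insertion_list: the threshold value, record layout, and row columns are illustrative assumptions.

```python
THRESHOLD = 0.5  # assumed cutoff; the real value would be tuned to the sensors

def make_insertion_list(record, threshold=THRESHOLD):
    """Collapse one second of samples (~35 per axis) to max and mean,
    keeping a row for Postgres only when the mean looks 'interesting'.

    The record layout and row columns here are illustrative assumptions.
    """
    rows = []
    for axis in ("x", "y", "z"):
        samples = record[axis]
        peak = max(samples)
        mean = sum(samples) / len(samples)
        if abs(mean) > threshold:
            rows.append((record["device_id"], record["device_t"], axis, peak, mean))
    return rows
```

Aggregating to the second keeps the table roughly 35x smaller than storing raw samples, at the cost of losing the within-second waveform.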
consumer/process_accel_data.py
link
This script applies only to the postgres database. It does not connect to the Pulsar cluster. Whereas consumer.py would be running all the time, process_accel_data.py could be run on a daily or weekly basis.
This is an ETL job. It reads the table the consumer has written to ('accel_data'), transforms it, and loads the result into the 'sensor_events' table. (This table is what is used to create the Tableau visualization.)
The business purpose here is to group acceleration readings into what I am calling sensor events. A run of consecutive high readings constitutes an event. For each event we identify, we can make a key for it and store when it started and how long it lasted. We can also calculate and store average and peak values.
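The grouping logic can be sketched as below. The tuple schema, gap tolerance, and event-key format are illustrative assumptions, not the exact columns of 'accel_data' or 'sensor_events':

```python
def build_sensor_events(readings, max_gap=1):
    """Group per-second readings into sensor events.

    `readings` is a list of (device_id, unix_second, value) tuples sorted
    by device and time; a run of consecutive seconds from one device
    becomes one event. The schema and event-key format are illustrative.
    """
    events, current = [], None
    for device, t, value in readings:
        if current and device == current["device"] and t - current["end"] <= max_gap:
            current["end"] = t
            current["values"].append(value)
        else:
            if current:
                events.append(current)
            current = {"device": device, "start": t, "end": t, "values": [value]}
    if current:
        events.append(current)
    # Derive the stored columns: key, start, duration, average, and peak.
    return [
        {
            "event_key": f"{e['device']}-{e['start']}",
            "start": e["start"],
            "duration_s": e["end"] - e["start"] + 1,
            "avg": sum(e["values"]) / len(e["values"]),
            "peak": max(e["values"]),
        }
        for e in events
    ]
```

In the actual job this runs against Postgres rather than in-memory tuples, but the event definition (consecutive seconds, keyed by device and start time) is the same idea.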
As mentioned, I deployed a Pulsar cluster on AWS. I do not include the configuration files I used for that cluster. The code in this repo would have to be modified with the connection details of the particular Pulsar cluster you are connecting to.
Here are some interesting areas for further work on this project:
Build a more sophisticated topic architecture. Right now, all data is sent to the 'sensors' topic. However, a DAG could be designed to direct messages from this topic to others. For example, we may want subtopics by country, or by intensity of detected acceleration. That is, you might have 'all-sensors' --> 'mexico-sensors' --> 'mexico-quakes'.
I spent a while trying to deploy functions in Pulsar and was able to figure most of it out. However, I have left the detection function in the consumer for now, as it is simpler. A follow-up to this project might be to deploy that detection function within the cluster itself.
One interesting feature that originally attracted me to Pulsar is its support for tiered storage. This lets you store past messages in cheap storage such as S3, which you can then query with Pulsar SQL. I didn't have time to implement these features of Pulsar, but they would be interesting to explore.
I did some testing around message throughput and tried a few optimizations. However, I was not able to increase it significantly. Further work could be done here.
I tried topic partitioning, which enables all the machines in the Pulsar cluster to read from the topic, and not just one. I also tried bulk acknowledgement, which acks many messages at once instead of one by one.
For now, I am doing an async_send, which sends messages as fast as possible without waiting for an acknowledgement.
In my consumer I am also writing to Postgres. I confirmed this is not responsible for slowing down the receipt of Pulsar messages.
You could connect to OpenEEW's real time messaging service and process those messages in Pulsar.
In this project I instead played back some of history to simulate the data coming across in real time.
Optimizing the detection is more of a data science undertaking. However, OpenEEW already has some pretty good detection functions in their repo. This project could use those detection functions.
Many national governments store data about earthquakes. You could pull down this data, and land it in postgres. From there you could compare government-reported quakes with OpenEEW-detected quakes to see what the differences are.