ElasticSearch Change Feed: Case Study

I have been working in the database synchronization domain for more than a year now. I mostly perform migrations from one system to another, and for that we need to synchronize data from the existing system to the newly adopted system, mainly NoSQL databases such as MongoDB, Cassandra, and ElasticSearch.

One-time data migration is easy. All you need to do is write some code to pull the data in streams, with a queue between the data sources to help you scale the ingestion.
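
For a one-time migration, a minimal sketch of the "pull in streams" part might look like the following. It uses the official elasticsearch Node.js client and its scroll API to read an index in batches; pushToQueue is a hypothetical stand-in for whatever queue producer (Kafka, RabbitMQ, etc.) you put in between, and the index name is just an example.

var elasticsearch = require('elasticsearch');
var client = new elasticsearch.Client({ host: 'localhost:9200' });

// Hypothetical stand-in for a real queue producer (Kafka, RabbitMQ, ...).
function pushToQueue(doc) {
  console.log('queued ' + doc._id);
}

// Pull the whole index in batches of 100 using the scroll API.
function migrate(scrollId) {
  var handler = function(err, resp) {
    if (err) throw err;
    resp.hits.hits.forEach(pushToQueue);
    if (resp.hits.hits.length > 0) {
      migrate(resp._scroll_id); // keep going until a batch comes back empty
    }
  };
  if (scrollId) {
    client.scroll({ scrollId: scrollId, scroll: '1m' }, handler);
  } else {
    client.search({ index: 'my-index', scroll: '1m', size: 100 }, handler);
  }
}

migrate();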

But the real challenge is ongoing synchronization. Most of the time we require multiple database engines: MongoDB to store data, MySQL or another relational database to handle billing, ElasticSearch to power search, and so on. All these databases need to be synchronized in order to keep the system consistent.

This is also referred to as “Polyglot Persistence” in the software architecture domain.

In order to trigger a function that performs the data synchronization, you need an event from the master database. Databases such as RethinkDB, MongoDB, and CouchDB provide a change feed that you can listen to in order to monitor changes in the database.

This way, when there is any change in the master database, you listen for that change, push it into your queue, and let the queue consumer do the rest, which is indexing into a secondary database such as ElasticSearch or MySQL.
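
For example, on newer MongoDB versions (3.6+, running as a replica set) the Node.js driver exposes this change feed as a change stream. A minimal sketch, where pushToQueue is again a hypothetical stand-in for your queue producer and the database and collection names are made up:

var MongoClient = require('mongodb').MongoClient;

// Hypothetical stand-in for a real queue producer.
function pushToQueue(change) {
  console.log('queued ' + change.operationType);
}

MongoClient.connect('mongodb://localhost:27017', function(err, mongo) {
  if (err) throw err;

  // watch() requires MongoDB 3.6+ and a replica set.
  var changeStream = mongo.db('app').collection('documents').watch();

  changeStream.on('change', function(change) {
    // change.operationType is 'insert', 'update', 'delete', ...
    pushToQueue(change);
  });
});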

But recently I faced a situation where I had to make an API call to a third-party system whenever there was a change in an ElasticSearch index. To be clear, ElasticSearch has no such feature. There is an issue filed for it on GitHub, but no progress yet.

I wanted something like this.

[Figure: ElasticSearch Changefeed]

I already knew of Logstash, a data pipeline tool from the Elastic team used to move data between various systems. In Logstash you specify an input system, say ElasticSearch, and an output system, say RabbitMQ. Once you do, Logstash looks for changes (at least, that is what I thought) and pushes the data to the output system.

I was like, this is it. Problem solved!

Well, almost. Here is the configuration I wrote and passed to Logstash:

input {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "liferay-20116"
  }
}
output {
  kafka {
    topic_id => "elastic-topic-index"
    codec => json
  }
}

It worked, except for one issue.

It doesn't really look for changes. As far as I understand, it just re-runs the query against ElasticSearch on a time interval (you can change the query as you like) and pushes the results to the output. That meant hundreds of documents every single minute. Within a few hours my temp directory was full and Kafka had crashed!
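
If I understand the plugin correctly, this polling is driven by the schedule option of the elasticsearch input (standard cron syntax). A config like the one below re-runs the full query every minute and re-emits every matching document each time, which explains the flood:

input {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "liferay-20116"
    # cron syntax: re-run the same query every minute,
    # emitting ALL matching documents again on each run
    schedule => "* * * * *"
  }
}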

At that point I knew this wasn't going to work.

So I started looking for something more lightweight and with fewer dependencies. I found a plugin that you can install inside ElasticSearch to listen to a real-time change feed. I gave it a shot, and it worked!

In order to make it work, you need to install the plugin. Go to your ElasticSearch directory, then the bin folder, and run this command (note: on ElasticSearch 5.x the binary is called elasticsearch-plugin instead of plugin):

./plugin install https://github.com/jurgc11/es-change-feed-plugin/releases/download/{version}/es-changes-feed-plugin.zip

Replace {version} with the release matching your ElasticSearch version. The plugin supports the following versions:

1.4, 2.2, 2.3, 2.4, 5.3, 5.4

Select your version and prepend a v. So, for example, if your ElasticSearch version is 5.4, the command would be:

./elasticsearch-plugin install https://github.com/jurgc11/es-change-feed-plugin/releases/download/v5.4/es-changes-feed-plugin.zip

This will install the plugin, which pushes changes over a WebSocket on port 9400. You need to listen on this port with a WebSocket client. If you are using Node.js, here is sample code to listen on port 9400 for changes.

You need to install the ws module first. Run this command in your project directory:

npm i -S ws

Then create a new file and paste this code.

var WebSocket = require('ws');
// If ElasticSearch is on a different machine, replace localhost with its IP.
var ws = new WebSocket('ws://localhost:9400/ws/_changes');

// Optional: anything typed on stdin is sent down the socket.
process.stdin.resume();
process.stdin.setEncoding('utf8');

process.stdin.on('data', function(message) {
  message = message.trim();
  ws.send(message, console.log.bind(null, 'Sent: ', message));
});

ws.on('message', function(message) {
  // You receive the change events here, one JSON document per message.
  console.log('Received: ' + message);
});

ws.on('close', function(code) {
  console.log('Disconnected: ' + code);
});

ws.on('error', function(error) {
  console.log('Error: ' + error.message);
});

Run the code and change something in ElasticSearch; you should see a message on the console.
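
A quick way to trigger a change is to index a dummy document with curl (the index and type here match my setup; the body is just a test payload):

curl -XPOST 'http://localhost:9200/liferay-20116/LiferayDocumentType' -H 'Content-Type: application/json' -d '{"title": "change feed test"}'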

Depending on the data in your ElasticSearch server, you may see different output than mine, which is shown below.

Received: {"_index":"liferay-20116","_type":"LiferayDocumentType","_id":"com.liferay.document.library.kernel.model.DLFileEntry_PORTLET_38771","_timestamp":"2017-10-10T07:30:08.011Z","_version":1,"_operation":"DELETE"}

And it worked!

Now all I need to do is invoke some other function whenever a change event arrives, to carry out the rest of my task.
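
For example, the message handler can forward every change event to the third-party system. A sketch using Node's built-in https module and a hypothetical api.example.com/notify endpoint:

var https = require('https');

ws.on('message', function(message) {
  var change = JSON.parse(message);

  // Forward the change event to a (hypothetical) third-party endpoint.
  var req = https.request({
    hostname: 'api.example.com',
    path: '/notify',
    method: 'POST',
    headers: { 'Content-Type': 'application/json' }
  }, function(res) {
    console.log('Notified, status: ' + res.statusCode);
  });

  req.on('error', function(err) {
    console.log('Notify failed: ' + err.message);
  });

  req.end(JSON.stringify(change));
});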

There are some limitations with this approach as well:

  • It only sends live changes: if no listener is connected to the socket, the change event is gone!
  • It doesn't send the initial state of any documents.
  • You need to handle errors and failing nodes yourself (a minimal reconnect sketch follows below).
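
For the last point, here is a minimal reconnect sketch: wrap the connection in a function and dial again a few seconds after the socket closes (in my experience ws also emits close after a fatal error, so one handler covers both cases):

var WebSocket = require('ws');

function connect() {
  var ws = new WebSocket('ws://localhost:9400/ws/_changes');

  ws.on('message', function(message) {
    console.log('Received: ' + message);
  });

  ws.on('close', function() {
    // Re-dial after 5 seconds. Changes made while we were
    // disconnected are still lost (see the first limitation).
    setTimeout(connect, 5000);
  });

  ws.on('error', function(error) {
    console.log('Error: ' + error.message);
  });
}

connect();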

If there is a better way to solve this, please let me know in the comments 🙂
