Apache Camel and MongoDB: a match made in heaven

Some time ago I submitted an Apache Camel component to interact with MongoDB databases. It was quickly committed to trunk, and I'm glad to announce that it will officially see the light of day with the Camel 2.10 release, which is just around the corner! So I thought now is a good time to advertise to the world the virtues and powers of this component.

Data explosion!

Data explosion: a term for the unstoppable, millisecond-by-millisecond growth of data in the digital world.

Whether it's published by humans or by objects (think the Internet of Things) doesn't matter: it's still data that can be turned into information to gain further intelligence and insight. According to IDC's Digital Universe study, published just one year ago:

In 2011 alone, 1.8 zettabytes (or 1.8 trillion gigabytes) of data will be created, the equivalent to every U.S. citizen writing 3 tweets per minute for 26,976 years. And over the next decade, the number of servers managing the world's data stores will grow by ten times.

Mobile devices, smartphones, tablets, etc. bear much of the responsibility for this data tornado. Before, we had to wait until we got home to read the online paper, a blog or our emails. Now we do the exact same thing from literally anywhere. We're immersed in a culture of "I want it, and I want it now". Thousands of new apps are launched every day, each of them producing hordes of data. It's intense.

To support these new orders of magnitude, technology is evolving rapidly under banners such as Big Data, Elastic Cloud, Virtualisation, Platform as a Service, and even Green Computing, to make this whole new level of infrastructure sustainable.

At the Apache Camel project, there's a lot of interest in, and uptake of, the Big Data and Cloud trends. Folks have committed an array of components to enable the very heart of your organisation, your Enterprise Service Bus, to interact directly with these technologies.

The MongoDB Camel component is one of them. So let's talk about what it offers YOU.

A MongoDB component for Camel

The technical name of the beast is camel-mongodb, and if you use Apache ServiceMix or Apache Karaf, you can simply install it as a feature, which will drag along the MongoDB Java driver (also ASL-licensed). It's designed from the ground up to be simple, lightweight and convenient.

It's capable of acting both as a producer and as a consumer endpoint. As a producer, it can invoke a number of operations on a MongoDB collection or database. As a consumer, it can "inhale" data from a collection into a Camel route, in a time-ordered fashion, without writing a single bean or custom processor!

Moreover, bundled with the component are several type converters that plug into the Camel routing engine to automatically convert payloads to MongoDB's DBObject type where necessary. So this little component does a lot of magic, for the sake of your sanity ;)
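
To make this concrete, here's a minimal sketch in the Java DSL. It's hedged: the database, collection and bean names are placeholders of my own, and a MongoDB instance running on localhost is assumed. The route inserts a plain Java Map, letting the bundled converters produce the DBObject behind the scenes:

    import java.util.HashMap;
    import java.util.Map;

    import com.mongodb.Mongo;

    import org.apache.camel.CamelContext;
    import org.apache.camel.ProducerTemplate;
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.impl.DefaultCamelContext;
    import org.apache.camel.impl.SimpleRegistry;

    public class MongoInsertExample {
        public static void main(String[] args) throws Exception {
            // bind a com.mongodb.Mongo instance under the name referenced in the URI
            SimpleRegistry registry = new SimpleRegistry();
            registry.put("myDb", new Mongo("localhost", 27017));

            CamelContext context = new DefaultCamelContext(registry);
            context.addRoutes(new RouteBuilder() {
                @Override
                public void configure() {
                    // the type converters turn the Map body into a DBObject
                    // before the insert operation is executed
                    from("direct:insert")
                        .to("mongodb:myDb?database=testDb&collection=users&operation=insert");
                }
            });
            context.start();

            ProducerTemplate template = context.createProducerTemplate();
            Map<String, Object> doc = new HashMap<String, Object>();
            doc.put("name", "Ada");
            doc.put("role", "engineer");
            template.sendBody("direct:insert", doc);

            context.stop();
        }
    }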

Additionally, its quality is guaranteed by more than 25 unit tests, which execute with the Maven build only if you point the relevant properties file at a running MongoDB instance, either local or remote.

The official camel-mongodb documentation is already quite clear and detailed, so I won't bore you with technical details in this post. Instead, we'll take an eagle-eye view of the functionality this component offers, both as a producer and as a consumer.

As a producer

The producer endpoint supports quite a few Mongo operations:

  • Query operations: findById, findOneByQuery, findAll, count
  • Write operations: insert, save, update
  • Delete operations: remove
  • Other operations: getDbStats, getColStats (to automate monitoring via Camel)

In total, 10 operations in its first version! All CRUD operations are covered, and even augmented with several variants when it comes to the query side of things. All these operations map to native MongoDB operations, so refer to the MongoDB manual if in doubt.
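
As a quick taste, here's a hedged sketch of a query producer in the Java DSL (endpoint and bean names are placeholders of mine): for findOneByQuery, the IN message body carries the query, and the OUT body holds the first matching document.

    import org.apache.camel.builder.RouteBuilder;

    public class FindOneByQueryRoute extends RouteBuilder {
        @Override
        public void configure() {
            // the IN body carries the query (a DBObject); the OUT body will
            // contain the first matching document, or null if nothing matched
            from("direct:findUser")
                .to("mongodb:myDb?database=testDb&collection=users&operation=findOneByQuery");
        }
    }

    // usage, e.g. from a ProducerTemplate:
    //   DBObject query = new BasicDBObject("name", "Ada");
    //   DBObject match = template.requestBody("direct:findUser", query, DBObject.class);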

So, how do you specify the operation a producer endpoint executes? You have two approaches:

  • statically, by specifying the operation name as an option on the endpoint URI
  • dynamically, by setting the CamelMongoDbOperation header on the IN message

So in essence, you can have a multi-functional endpoint, or an endpoint that primarily deletes documents, but can also "moonlight" as a document inserter under specific circumstances (e.g. determined by a Content-Based Router EIP, Filter EIP, etc.). Useful, huh?
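
Here's a hedged sketch of such a "moonlighting" endpoint in the Java DSL (names are placeholders; I'm using the MongoDbConstants.OPERATION constant, which resolves to the CamelMongoDbOperation header, so double-check it against your version): the endpoint inserts by default, but messages flagged for deletion override the operation via the header.

    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.component.mongodb.MongoDbConstants;

    public class MoonlightingEndpointRoute extends RouteBuilder {
        @Override
        public void configure() {
            from("direct:write")
                // dynamic: flagged messages override the default operation
                .filter(header("shouldDelete").isEqualTo(true))
                    .setHeader(MongoDbConstants.OPERATION, constant("remove"))
                .end()
                // static: the operation option on the URI acts as the default
                .to("mongodb:myDb?database=testDb&collection=users&operation=insert");
        }
    }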

By the way, support for group and mapReduce queries is slated for future versions.

As a consumer: tailable cursor consumer endpoint

This is the feature I enjoyed coding the most ;) It allows you to pump data from a MongoDB collection into a Camel route, in real-time, just as documents are being appended to the collection.

In short, a camel-mongodb consumer is able to bind to a capped collection so that the MongoDB server keeps pushing documents into the Camel route as they are inserted. For more information, refer to Tailable cursors.

Each record in the MongoDB collection gets pushed to the Camel route as an individual Exchange.

Persistent tail tracking is also a great feature of this component. It ensures that the consumer picks up exactly where it left off after it comes back to life from a shutdown. To use this feature, you just need to specify an increasing correlation key, which can be a timestamp or any other MongoDB data type that supports comparison (Strings, Dates, ObjectIds, etc.).

But alas, when working with tailable cursors, MongoDB reserves the right to kill the cursor if no data has been available for a while, to prevent it from wasting server resources. The camel-mongodb component is aware of this behaviour and regenerates the cursor automatically. You can configure the delay via the cursorRegenerationDelay option.
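
Putting the consumer side together, here's a hedged sketch (database, collection and field names are placeholders; the collection must be capped for a tailable cursor to work):

    import org.apache.camel.builder.RouteBuilder;

    public class TailableConsumerRoute extends RouteBuilder {
        @Override
        public void configure() {
            // consumes from a capped collection; each appended document
            // enters the route as an individual Exchange
            from("mongodb:myDb?database=flights&collection=cancellations"
                + "&tailTrackIncreasingField=departureTime" // the increasing correlation key
                + "&persistentTailTracking=true"            // resume where we left off after a restart
                + "&persistentId=cancellationsTracker"      // identifies our tail-tracking record
                + "&cursorRegenerationDelay=1000")          // ms to wait before regenerating a killed cursor
                .to("log:tailed.cancellations?level=INFO");
        }
    }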

Other remarkable features

MANY, many other features exist. Here are just a few:

  • Paging support via skip() and limit(); values are specified in message headers (see the sketch after this list).
  • Supports upserts (atomic insert/update) and multiUpdates in update operations.
  • Query operations support field filtering (to only fetch specific fields from matching documents) and sorting.
  • Simple and extensible endpoint configuration, revolving around a com.mongodb.Mongo instance that you create in your Registry.
  • The database and collection to bind to are configurable as endpoint options, but they can also be determined dynamically for each Exchange processed (via message headers). To maximise throughput, this behaviour is disabled by default; explicitly set dynamicity=true on the endpoint to advise the component to compute the DB/collection for each incoming Exchange.
  • Can reuse the same Mongo instance for as many endpoints as you wish.
  • WriteConcern can be set at the endpoint level or at the Exchange level, using a standard one (see constant fields in MongoDB's WriteConcern Javadoc) or creating a custom one in your Registry.
  • Quickly instruct producer endpoints to call getLastError() after each write operation, without setting a custom WriteConcern, by using the invokeGetLastError=true option.
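
To illustrate the paging support mentioned above, here's a hedged sketch (the header constants are from MongoDbConstants as I recall them, and the route name is a placeholder of mine): it fetches the third page of 25 documents from a findAll producer endpoint.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.camel.CamelContext;
    import org.apache.camel.ProducerTemplate;
    import org.apache.camel.component.mongodb.MongoDbConstants;

    public class PagingExample {
        // fetches the third page of 25 documents; assumes a route exists
        // from "direct:findAll" to a mongodb endpoint with operation=findAll
        public static Object fetchThirdPage(CamelContext context) {
            ProducerTemplate template = context.createProducerTemplate();
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put(MongoDbConstants.NUM_TO_SKIP, 50); // skip the first two pages
            headers.put(MongoDbConstants.LIMIT, 25);       // page size
            return template.requestBodyAndHeaders("direct:findAll", null, headers);
        }
    }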

How do I go about using camel-mongodb in my Camel routes?

As I mentioned earlier, the official camel-mongodb documentation is very clear and thorough, and detailed enough to be a great starting point.

Additionally, you can also check out the unit tests. There are more than 25, and they illustrate most usage aspects of the component, both as a producer and as a consumer.

If you'd like me to write a post with concrete examples on how to use this component, please provide feedback in the comments and share this post on your social networks ;)

Responses

  1. What about using camel-mongodb with the Spring Framework? Which parameters must be passed?

  2. @Crazy LionHeart,

    It's simple. All you need to do is define an instance of "com.mongodb.Mongo" (the MongoDB API class that encapsulates a MongoDB connection) with the appropriate connection details. See docs here: http://api.mongodb.org/java/current/com/mongodb/Mongo.html.

    Make sure you provide a bean name, as you'll need to use it in your camel-mongodb endpoint URI as per the docs here: http://camel.apache.org/mongodb.html.
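
    For instance, here's a minimal sketch using Spring's Java-based configuration (the bean name and connection details are placeholders; a classic Spring XML <bean> definition works just the same):

        import com.mongodb.Mongo;

        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;

        @Configuration
        public class MongoConfig {
            // the bean name ("myDb") is what you reference in the endpoint URI,
            // e.g. mongodb:myDb?database=testDb&collection=users&operation=insert
            @Bean(name = "myDb")
            public Mongo myDb() throws Exception {
                return new Mongo("localhost", 27017);
            }
        }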

    For an example of the beans definition, you can check out the Unit Tests for camel-mongodb:
    * Bean definition here: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/test/resources/org/apache/camel/component/mongodb/mongoComponentTest.xml?view=markup
    * Test classes here: http://svn.apache.org/viewvc/camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/.

    Hope that helps!
    Raúl Kripalani.

  3. Please give a full example using the Spring DSL.

    Thanks,
    Robert

  4. @Robert Liguori,

    I just committed a new unit test for camel-mongodb using the Spring DSL. There you go: https://git-wip-us.apache.org/repos/asf?p=camel.git;a=blob_plain;f=components/camel-mongodb/src/test/resources/org/apache/camel/component/mongodb/mongoBasicOperationsTest.xml;hb=8e2fb690.

    Hope this helps,
    Raúl.

  5. Hi,

    Very nice article :)
    I'm using Camel and MongoDB and I'm wondering how to do batch inserts in Mongo with Camel.
    What I've done with Spring DSL:

    <route>
      <from uri="direct:data_mongodb" />
      <transform>
        <method ref="DBObjectTransformer" method="transformDataIntoDBObject" />
      </transform>
      <aggregate strategyRef="batchDBDataAggregatorStrategy"
                 completionInterval="3000"
                 completionSize="1000">
        <correlationExpression>
          <constant>true</constant>
        </correlationExpression>
        <to uri="mongodb:dataSourceMongoDB?database=myDB&amp;collection=myCollection&amp;operation=insert&amp;writeConcern=normal" />
      </aggregate>
    </route>

    The "transformDataIntoDBObject" receives a message and transform it into a Map.
    The "batchDBDataAggregatorStrategy" aggregates all the Map received.

    There is no error, but it seems that the batching is not working as expected.
    Let me explain: I have 1000 messages in my route,
    so there should be just one insert, shouldn't there?

    Instead, I get 3 insertions, one every 3 seconds, so the 1000 messages are inserted into the collection in 3 batches.

    Do you have any idea about this, or can you show me the right way to do batch inserts?

    Thanks :)

    Tia
