Elastichsearch Rest

Since Camel 2.21

The ElasticSearch component allows you to interface with an ElasticSearch 6.x API using the REST Client library.

Maven users will need to add the following dependency to their pom.xml for this component:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-elasticsearch-rest</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

URI format

elasticsearch-rest://clusterName[?options]

Endpoint Options

The Elastichsearch Rest component supports 12 options, which are listed below.

Name Description Default Type

client (advanced)

To use an existing configured Elasticsearch client, instead of creating a client per endpoint. This allow to customize the client with specific settings.

RestClient

hostAddresses (advanced)

Comma separated list with ip:port formatted remote transport addresses to use. The ip and port options must be left blank for hostAddresses to be considered instead.

String

socketTimeout (advanced)

The timeout in ms to wait before the socket will timeout.

30000

int

connectionTimeout (advanced)

The time in ms to wait before connection will timeout.

30000

int

user (security)

Basic authenticate user

String

password (security)

Password for authenticate

String

enableSSL (security)

Enable SSL

false

Boolean

maxRetryTimeout (advanced)

The time in ms before retry

30000

int

enableSniffer (advanced)

Enable automatically discover nodes from a running Elasticsearch cluster

false

Boolean

snifferInterval (advanced)

The interval between consecutive ordinary sniff executions in milliseconds. Will be honoured when sniffOnFailure is disabled or when there are no failures between consecutive sniff executions

300000

int

sniffAfterFailureDelay (advanced)

The delay of a sniff execution scheduled after a failure (in milliseconds)

60000

int

resolveProperty Placeholders (advanced)

Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders.

true

boolean

The Elastichsearch Rest endpoint is configured using URI syntax:

elasticsearch-rest:clusterName

with the following path and query parameters:

Path Parameters (1 parameters):

Name Description Default Type

clusterName

Required Name of the cluster

String

Query Parameters (13 parameters):

Name Description Default Type

connectionTimeout (producer)

The time in ms to wait before connection will timeout.

30000

int

disconnect (producer)

Disconnect after it finish calling the producer

false

boolean

enableSSL (producer)

Enable SSL

false

boolean

hostAddresses (producer)

Required Comma separated list with ip:port formatted remote transport addresses to use.

String

indexName (producer)

The name of the index to act against

String

indexType (producer)

The type of the index to act against

String

maxRetryTimeout (producer)

The time in ms before retry

30000

int

operation (producer)

What operation to perform

ElasticsearchOperation

scrollKeepAliveMs (producer)

Time in ms during which elasticsearch will keep search context alive

60000

int

socketTimeout (producer)

The timeout in ms to wait before the socket will timeout.

30000

int

useScroll (producer)

Enable scroll usage

false

boolean

waitForActiveShards (producer)

Index creation waits for the write consistency number of shards to be available

1

int

synchronous (advanced)

Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported).

false

boolean

Spring Boot Auto-Configuration

When using Spring Boot make sure to use the following Maven dependency to have support for auto configuration:

<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-elasticsearch-rest-starter</artifactId>
  <version>x.x.x</version>
  <!-- use the same version as your Camel core version -->
</dependency>

The component supports 13 options, which are listed below.

Name Description Default Type

camel.component.elasticsearch-rest.client

To use an existing configured Elasticsearch client, instead of creating a client per endpoint. This allow to customize the client with specific settings. The option is a org.elasticsearch.client.RestClient type.

String

camel.component.elasticsearch-rest.connection-timeout

The time in ms to wait before connection will timeout.

30000

Integer

camel.component.elasticsearch-rest.enable-s-s-l

Enable SSL

false

Boolean

camel.component.elasticsearch-rest.enable-sniffer

Enable automatically discover nodes from a running Elasticsearch cluster

false

Boolean

camel.component.elasticsearch-rest.enabled

Whether to enable auto configuration of the elasticsearch-rest component. This is enabled by default.

Boolean

camel.component.elasticsearch-rest.host-addresses

Comma separated list with ip:port formatted remote transport addresses to use. The ip and port options must be left blank for hostAddresses to be considered instead.

String

camel.component.elasticsearch-rest.max-retry-timeout

The time in ms before retry

30000

Integer

camel.component.elasticsearch-rest.password

Password for authenticate

String

camel.component.elasticsearch-rest.resolve-property-placeholders

Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders.

true

Boolean

camel.component.elasticsearch-rest.sniff-after-failure-delay

The delay of a sniff execution scheduled after a failure (in milliseconds)

60000

Integer

camel.component.elasticsearch-rest.sniffer-interval

The interval between consecutive ordinary sniff executions in milliseconds. Will be honoured when sniffOnFailure is disabled or when there are no failures between consecutive sniff executions

300000

Integer

camel.component.elasticsearch-rest.socket-timeout

The timeout in ms to wait before the socket will timeout.

30000

Integer

camel.component.elasticsearch-rest.user

Basic authenticate user

String

Message Operations

The following ElasticSearch operations are currently supported. Simply set an endpoint URI option or exchange header with a key of "operation" and a value set to one of the following. Some operations also require other parameters or the message body to be set.

operation message body description

Index

Map, String, byte[], XContentBuilder or IndexRequest content to index

Adds content to an index and returns the content’s indexId in the body. You can set the indexId by setting the message header with the key "indexId".

GetById

String or GetRequest index id of content to retrieve

Retrieves the specified index and returns a GetResult object in the body

Delete

String or DeleteRequest index name and type of content to delete

Deletes the specified indexName and indexType and returns a DeleteResponse object in the body

DeleteIndex

String or DeleteRequest index name of the index to delete

Deletes the specified indexName and returns a status code the body

BulkIndex

a List, BulkRequest, or Collection of any type that is already accepted (XContentBuilder, Map, byte[], String)

Adds content to an index and return a List of the id of the successfully indexed documents in the body

Bulk

a List, BulkRequest, or Collection of any type that is already accepted (XContentBuilder, Map, byte[], String)

Adds content to an index and returns the BulkItemResponse[] object in the body

Search

Map, String or SearchRequest

Search the content with the map of query string

MultiSearch

MultiSearchRequest

Multiple search in one

Exists

Index name(indexName) as header

Checks the index exists or not and returns a Boolean flag in the body

Update

Map, UpdateRequest, String, byte[] or XContentBuilder content to update

Updates content to an index and returns the content’s indexId in the body.

Ping

None

Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise

Info

None

Get the info of the Elasticsearch cluster and returns them as MainResponse class instance

Configure the component and enable basic authentication

To use the Elasticsearch component is has to be configured with a minimum configuration.

ElasticsearchComponent elasticsearchComponent = new ElasticsearchComponent();
elasticsearchComponent.setHostAddresses("myelkhost:9200");
camelContext.addComponent("elasticsearch-rest", elasticsearchComponent);

For basic authentication with elasticsearch or using reverse http proxy in front of the elasticsearch cluster, simply setup basic authentication and SSL on the component like the example below

ElasticsearchComponent elasticsearchComponent = new ElasticsearchComponent();
elasticsearchComponent.setHostAddresses("myelkhost:9200");
elasticsearchComponent.setUser("elkuser");
elasticsearchComponent.setPassword("secure!!");
elasticsearchComponent.setEnableSSL(true);

camelContext.addComponent("elasticsearch-rest", elasticsearchComponent);

Index Example

Below is a simple INDEX example

from("direct:index")
  .to("elasticsearch-rest://elasticsearch?operation=Index&indexName=twitter&indexType=tweet");
<route>
    <from uri="direct:index" />
    <to uri="elasticsearch-rest://elasticsearch?operation=Index&indexName=twitter&indexType=tweet"/>
</route>

For this operation you’ll need to specify a indexId header.

A client would simply need to pass a body message containing a Map to the route. The result body contains the indexId created.

Map<String, String> map = new HashMap<String, String>();
map.put("content", "test");
String indexId = template.requestBody("direct:index", map, String.class);

Search Example

Searching on specific field(s) and value use the Operation ´Search´. Pass in the query JSON String or the Map

from("direct:search")
  .to("elasticsearch-rest://elasticsearch?operation=Search&indexName=twitter&indexType=tweet");
<route>
    <from uri="direct:search" />
    <to uri="elasticsearch-rest://elasticsearch?operation=Search&indexName=twitter&indexType=tweet"/>
</route>
String query = "{\"query\":{\"match\":{\"content\":\"new release of ApacheCamel\"}}}";
SearchHits response = template.requestBody("direct:search", query, SearchHits.class);

Search on specific field(s) using Map.

Map<String, Object> actualQuery = new HashMap<>();
actualQuery.put("content", "new release of ApacheCamel");

Map<String, Object> match = new HashMap<>();
match.put("match", actualQuery);

Map<String, Object> query = new HashMap<>();
query.put("query", match);
SearchHits response = template.requestBody("direct:search", query, SearchHits.class);

Search using Elasticsearch scroll api in order to fetch all results.

from("direct:search")
  .to("elasticsearch-rest://elasticsearch?operation=Search&indexName=twitter&indexType=tweet&useScroll=true&scrollKeepAliveMs=30000");
<route>
    <from uri="direct:search" />
    <to uri="elasticsearch-rest://elasticsearch?operation=Search&indexName=twitter&indexType=tweet&useScroll=true&scrollKeepAliveMs=30000"/>
</route>
String query = "{\"query\":{\"match\":{\"content\":\"new release of ApacheCamel\"}}}";
try (ElasticsearchScrollRequestIterator response = template.requestBody("direct:search", query, ElasticsearchScrollRequestIterator.class)) {
    // do something smart with results
}

Split EIP can also be used.

from("direct:search")
  .to("elasticsearch-rest://elasticsearch?operation=Search&indexName=twitter&indexType=tweet&useScroll=true&scrollKeepAliveMs=30000")
  .split()
  .body()
  .streaming()
  .to("mock:output")
  .end();

MultiSearch Example

MultiSearching on specific field(s) and value use the Operation ´MultiSearch´. Pass in the MultiSearchRequest instance

from("direct:multiSearch")
  .to("elasticsearch-rest://elasticsearch?operation=MultiSearch");
<route>
    <from uri="direct:multiSearch" />
    <to uri="elasticsearch-rest://elasticsearch?operation=MultiSearch"/>
</route>

MultiSearch on specific field(s)

SearchRequest req = new SearchRequest();
req.indices("twitter");
req.types("tweet");
SearchRequest req1 = new SearchRequest();
req.indices("twitter");
req.types("tweets");
MultiSearchRequest request = new MultiSearchRequest().add(req1).add(req);
Item[] response = template.requestBody("direct:search", request, Item[].class);