Poll Enrich

Camel offers two flavors of content enricher in the DSL: enrich and pollEnrich. This page covers pollEnrich.

pollEnrich uses a Polling Consumer to obtain the additional data. It is usually used for Event Message messaging, for instance to read a file or download a file over FTP.

It is good practice to set a timeout value. By default Camel uses the receive method, which may block until a message is available. It is therefore recommended to always provide a timeout value, to make it explicit that the poll waits for a message only until the timeout is reached.

If there is no data then the newExchange in the aggregation strategy is null.
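The null case above is easy to overlook, so here is a minimal sketch of the guard an aggregation strategy needs. It is a plain-Java model rather than Camel code: the class NullSafeMergeSketch and its merge method are hypothetical names, and plain strings stand in for the message bodies of the original and the polled exchange. In a real AggregationStrategy the equivalent check would be made on newExchange before reading from it.

```java
// Illustrative model only: plain strings stand in for the message bodies
// of the original exchange and the polled exchange. Names are hypothetical.
class NullSafeMergeSketch {

    // polledBody is null when the poll returned no data (for example, it timed out).
    static String merge(String originalBody, String polledBody) {
        if (polledBody == null) {
            // Nothing to enrich with: keep the original message unchanged.
            return originalBody;
        }
        // Data was polled: append it to the original body.
        return originalBody + "\n" + polledBody;
    }
}
```

Returning the original body when nothing was polled is only one possible policy; a strategy could equally set a default body or flag the message as not enriched.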

You can pass in a timeout value that determines which method to use:

  • if timeout is -1 or other negative number then receive is selected (Important: the receive method may block if there is no message)

  • if timeout is 0 then receiveNoWait is selected

  • otherwise receive(timeout) is selected

The timeout value is in milliseconds.
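The selection rules above can be sketched in plain Java. This is a model and not Camel's implementation: a java.util.concurrent.BlockingQueue stands in for the polling consumer, and the class name TimeoutModeSketch and its poll method are hypothetical. The three branches correspond to receive, receiveNoWait and receive(timeout).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative model only: a BlockingQueue stands in for a polling consumer.
class TimeoutModeSketch {

    // Returns the next message, or null if none arrived under the selected mode.
    static String poll(BlockingQueue<String> source, long timeoutMillis) {
        try {
            if (timeoutMillis < 0) {
                // Like receive(): blocks until a message is available.
                return source.take();
            } else if (timeoutMillis == 0) {
                // Like receiveNoWait(): returns immediately, null if empty.
                return source.poll();
            } else {
                // Like receive(timeout): waits up to the timeout, null if it expires.
                return source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report "no message".
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

Calling poll with a negative timeout on an empty queue never returns unless another thread adds a message, which is the blocking behavior the recommendation to set a timeout is meant to avoid.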

With a fixed endpoint URI, pollEnrich does not access any data from the current Exchange, which means that when polling it cannot use any of the existing headers you may have set on the Exchange. For example, you cannot set a filename in the Exchange.FILE_NAME header and use pollEnrich to consume only that file; instead you must set the filename in the endpoint URI. However, both enrich and pollEnrich support dynamic endpoints that use an Expression to compute the URI, which allows you to use data from the current Exchange. With a dynamic endpoint, the limitation described above no longer applies.

See the cacheSize option for more details on how much cache to use depending on how many or few unique endpoints are used.

Content enrichment using pollEnrich

pollEnrich works just like enrich; however, because it uses a Polling Consumer, there are three methods available when polling:

  • receive

  • receiveNoWait

  • receive(timeout)

PollEnrich Options

The Poll Enrich EIP supports 7 options which are listed below:

Name Description Default Type

timeout

Timeout in milliseconds when polling from the external service. The timeout influences the poll enrich behavior, which operates in three different modes: (1) negative value: waits until a message is available and then returns it. Warning: this mode could block indefinitely if no messages are available. (2) 0: attempts to receive a message exchange immediately without waiting, and returns null if a message exchange is not available yet. (3) positive value: attempts to receive a message exchange, waiting up to the given timeout if a message is not yet available, and returns null if timed out. The default value is -1, so the poll could block indefinitely; it is therefore recommended to set a timeout value.

-1

String

strategyRef

Refers to an AggregationStrategy to be used to merge the reply from the external service into a single outgoing message. By default Camel will use the reply from the external service as the outgoing message.

String

strategyMethodName

This option can be used to explicitly declare the method name to use when using POJOs as the AggregationStrategy.

String

strategyMethodAllowNull

If this option is false, then the aggregate method is not used if there was no data to enrich. If this option is true, then null values are used as the oldExchange (when there is no data to enrich) when using POJOs as the AggregationStrategy.

false

Boolean

aggregateOnException

If this option is false, then the aggregate method is not used if an exception was thrown while trying to retrieve the data to enrich from the resource. Setting this option to true allows end users to handle the exception in the aggregate method, for example to suppress the exception or set a custom message body.

false

Boolean

cacheSize

Sets the maximum size used by the org.apache.camel.spi.ConsumerCache, which is used to cache and reuse consumers when URIs are reused. Beware that using dynamic endpoints affects how well the cache can be utilized. If each dynamic endpoint is unique, then it is best to turn off caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped and discarded after use. This reduces memory usage, as otherwise producers and endpoints are stored in memory in the caches. However, if a high proportion of the dynamic endpoints have been used before, then it can be beneficial to use the cache to reuse both producers and endpoints, and the cache size can be set accordingly or left at the default size (1000). If there is a mix of unique and previously used dynamic endpoints, then setting a reasonable cache size can help reduce memory usage by avoiding storing too many infrequently used producers.

Integer

ignoreInvalidEndpoint

Ignore the invalid endpoint exception when trying to create a producer with that endpoint.

Integer
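To make the cacheSize trade-off more concrete, the following is a minimal sketch of a size-bounded, least-recently-used cache keyed by endpoint URI. It is an assumption-laden illustration built on java.util.LinkedHashMap, not the actual org.apache.camel.spi.ConsumerCache implementation; the class name BoundedUriCacheSketch is hypothetical, and the special value -1 (caching disabled) is not modeled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative model only: a bounded LRU cache keyed by endpoint URI.
// This is not how Camel implements its ConsumerCache.
class BoundedUriCacheSketch<V> extends LinkedHashMap<String, V> {
    private final int maxSize;

    BoundedUriCacheSketch(int maxSize) {
        // accessOrder = true: iteration order runs from least to most recently used.
        super(16, 0.75f, true);
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
        // Evict the least recently used entry once the bound is exceeded.
        return size() > maxSize;
    }
}
```

In this model, a stream of unique URIs evicts an entry on every insert once the cache is full, so cached entries are rarely reused. That is the situation in which turning caching off is suggested above.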

PollEnrich Example

In this example we enrich the message by loading the content from the file named inbox/data.txt.

from("direct:start")
  .pollEnrich("file:inbox?fileName=data.txt")
  .to("direct:result");

And in XML DSL you do:

<route>
  <from uri="direct:start"/>
  <pollEnrich>
    <constant>file:inbox?fileName=data.txt</constant>
  </pollEnrich>
  <to uri="direct:result"/>
</route>

If there is no file, then the message is empty. You can use a timeout either to wait (potentially forever) until a file exists, or to wait for a certain period.

For example to wait up to 5 seconds you can do:

<route>
  <from uri="direct:start"/>
  <pollEnrich timeout="5000">
    <constant>file:inbox?fileName=data.txt</constant>
  </pollEnrich>
  <to uri="direct:result"/>
</route>

The following example uses pollEnrich as a consumer in the middle of a REST GET route, downloading a file from AWS S3 and returning it as the response of an API call:

<rest path="/report">
    <description>Report REST API</description>
    <get uri="/{id}/payload">
        <route id="report-payload-download">
            <pollEnrich id="pollEnrich">
                <simple>aws-s3:xavier-dev?amazonS3Client=#s3client&amp;deleteAfterRead=false&amp;fileName=report-file.pdf</simple>
            </pollEnrich>
        </route>
    </get>
</rest>

Using dynamic uris

Both enrich and pollEnrich support dynamic URIs that are computed from information in the current Exchange. For example, to pollEnrich from an endpoint that uses a header to indicate a SEDA queue name:

from("direct:start")
  .pollEnrich().simple("seda:${header.name}")
  .to("direct:result");

And in XML DSL:

<route>
  <from uri="direct:start"/>
  <pollEnrich>
    <simple>seda:${header.name}</simple>
  </pollEnrich>
  <to uri="direct:result"/>
</route>
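As a rough illustration of what computing the URI from the current Exchange means, the sketch below substitutes ${header.xxx} placeholders from a map of headers. It is a toy model and not Camel's Simple language: the class DynamicUriSketch, its resolve method and the regular expression are all hypothetical, and only the header placeholder form used in the example above is handled.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative model only: resolves ${header.name} placeholders from a header map.
// Camel's Simple language supports far more than this.
class DynamicUriSketch {
    private static final Pattern HEADER_REF =
            Pattern.compile("\\$\\{header\\.([A-Za-z0-9_]+)\\}");

    static String resolve(String template, Map<String, Object> headers) {
        Matcher m = HEADER_REF.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            Object value = headers.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("Missing header: " + m.group(1));
            }
            // quoteReplacement keeps '$' and '\' in header values literal.
            m.appendReplacement(out, Matcher.quoteReplacement(value.toString()));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

With a header name set to orders, the template seda:${header.name} resolves to seda:orders, so each message can poll a different queue.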