OnCompletion

Camel has the concept of a Unit of Work that encompasses the Exchange. Among other things, the unit of work supports synchronization callbacks that are invoked when the Exchange is complete. The callback API is defined in org.apache.camel.spi.Synchronization. From Camel 2.14 onwards there is an extended synchronization, org.apache.camel.spi.SynchronizationRouteAware, that has callbacks for route events.
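
To make the callback idea concrete, here is a minimal plain-Java model of a unit of work that notifies registered callbacks once processing has finished. It deliberately uses no Camel classes: UnitOfWorkSketch, CompletionCallback and Step are hypothetical names that only mirror the onComplete/onFailure shape of org.apache.camel.spi.Synchronization.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of completion callbacks; this is NOT the Camel API.
// All type names here are hypothetical and exist only for illustration.
class UnitOfWorkSketch {

    /** Mirrors the two-method shape of a synchronization callback. */
    interface CompletionCallback {
        void onComplete(String body);
        void onFailure(String body, Exception cause);
    }

    /** One processing step that may fail. */
    interface Step {
        String apply(String body) throws Exception;
    }

    private final List<CompletionCallback> callbacks = new ArrayList<>();

    void addCallback(CompletionCallback callback) {
        callbacks.add(callback);
    }

    /** Runs the step first, then notifies every callback exactly once. */
    void run(String body, Step step) {
        String result = null;
        Exception failure = null;
        try {
            result = step.apply(body);
        } catch (Exception e) {
            failure = e;
        }
        for (CompletionCallback callback : callbacks) {
            if (failure == null) {
                callback.onComplete(result);
            } else {
                callback.onFailure(body, failure);
            }
        }
    }

    /** Demo helper: registers one callback and records which method fired. */
    static List<String> demo(Step step) {
        List<String> events = new ArrayList<>();
        UnitOfWorkSketch unitOfWork = new UnitOfWorkSketch();
        unitOfWork.addCallback(new CompletionCallback() {
            @Override
            public void onComplete(String body) {
                events.add("complete:" + body);
            }

            @Override
            public void onFailure(String body, Exception cause) {
                events.add("failure:" + cause.getMessage());
            }
        });
        unitOfWork.run("hello", step);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo(String::toUpperCase));
        System.out.println(demo(body -> { throw new IllegalStateException("boom"); }));
    }
}
```

In this model the first call records complete:HELLO and the second records failure:boom, which is the same success/failure split that a Synchronization callback sees.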

Getting the UnitOfWork

You can get hold of the org.apache.camel.spi.UnitOfWork from org.apache.camel.Exchange with the method getUnitOfWork().

Camel 2.0 added a DSL for these callbacks under the name onCompletion.

onCompletion supports the following features:

  • scope: global and/or per route (route scope overrides all global scope)

  • multiple global scope

  • triggered either always, only if completed with success, or only if failed

  • onWhen predicate to only trigger in certain situations

  • Camel 2.14: mode: to define whether to run before or after the route consumer writes the response back to the callee (if it is InOut)

  • Camel 2.14: whether to run async or sync (use a thread pool or not)

From Camel 2.14 onwards, onCompletion supports running the completion task in either synchronous or asynchronous mode (using a thread pool), and either before or after the route consumer is done. This gives more flexibility. For example, you can run synchronously and before the route consumer is done, which allows you to modify the exchange before the consumer writes any response back to the callee. You can use this to, for example, add custom headers or send the response message to a log.

Changes from Camel 2.14 onwards

The onCompletion has changed defaults and behavior from Camel 2.14 onwards. It now:

  • Runs synchronously without any thread pool

In Camel 2.13 the defaults were:

  • Runs asynchronously using a thread pool

Camel 2.13 or older - On completion runs in separate thread

The onCompletion runs in a separate thread, in parallel with the original route. It is therefore not intended to influence the outcome of the original route. The idea is to spin off a new thread to, for example, send logs to a central log database, send an email, send alerts to a monitoring system, or store a copy of the result message.
Therefore, if you want to do work that influences the original route, do not use onCompletion for it. Note: if you use the UnitOfWork API mentioned at the top of this page, you can register a Synchronization callback on the Exchange, which is executed in the original route. This lets you run custom code when the route is completed; it is how custom components enlist the on-completion services they need. For example, the File component does this for the work that moves or deletes the original file.

onCompletion with route scope

The onCompletion DSL allows you to add custom routes/processors that run when the original Exchange is complete. Camel spins off a copy of the Exchange and routes it in a separate thread, similar to a Wire Tap. This allows the original thread to continue while the onCompletion route runs concurrently. This model was chosen so that the onCompletion route does not interfere with the original route.

Only 1 onCompletion supported by route scope

You can have only one onCompletion in a route; only at context scope can you have multiple. Note that when you use a route-scoped onCompletion, any context-scoped ones are disabled for that route.

from("direct:start")
    .onCompletion()
        // this route is only invoked when the original route is complete as a kind
        // of completion callback
        .to("log:sync")
        .to("mock:sync")
    // must use end to denote the end of the onCompletion route
    .end()
    // here the original route continues
    .process(new MyProcessor())
    .to("mock:result");
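
The copy-and-spin-off model described above (the behavior in Camel 2.13 or older) can be pictured with a small plain-Java analogy: the completion work receives its own copy of the message on another thread, so whatever it does cannot change what the original flow returns. This is only a sketch under that assumption; SpinOffSketch and run are hypothetical names, and this is not how Camel itself is implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java analogy of "spin off a copy and route it in another thread".
// Hypothetical names; no Camel classes are involved.
class SpinOffSketch {

    /** Runs the demo and returns the message as seen by each side. */
    static Map<String, Map<String, String>> run() {
        Map<String, String> original = new HashMap<>();
        original.put("body", "order-42");

        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // The completion task works on its own copy of the message.
            Map<String, String> copy = new HashMap<>(original);
            Future<Map<String, String>> completion = pool.submit(() -> {
                copy.put("body", "logged:" + copy.get("body"));
                return copy;
            });

            // The original flow continues and never sees the task's changes.
            original.put("status", "done");

            Map<String, Map<String, String>> seen = new HashMap<>();
            seen.put("original", original);
            seen.put("completion", completion.get());
            return seen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The original map keeps body=order-42 while the copy carries body=logged:order-42, which is why this style of completion is suited to logging or alerting and not to changing the route's result.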

By default the onCompletion is triggered when the Exchange is complete, regardless of whether it completed successfully or with a failure (such as a thrown Exception). You can limit the trigger to success only with onCompleteOnly, or to failure only with onFailureOnly, as shown below:

from("direct:start")
    // here we qualify onCompletion to only invoke when the exchange failed (exception or FAULT body)
    .onCompletion().onFailureOnly()
        .to("log:sync")
        .to("mock:sync")
    // must use end to denote the end of the onCompletion route
    .end()
    // here the original route continues
    .process(new MyProcessor())
    .to("mock:result");

You can identify whether an Exchange is an onCompletion Exchange, because Camel adds the property Exchange.ON_COMPLETION with a boolean value of true when it spins off the onCompletion Exchange.

Using onCompletion from Spring DSL

The onCompletion is defined like this with Spring DSL:

<route>
    <from uri="direct:start"/>
    <!-- this onCompletion block will only be executed when the exchange is done being routed -->
    <!-- this callback is always triggered even if the exchange failed -->
    <onCompletion>
        <!-- so this is kind of like an after-completion callback -->
        <to uri="log:sync"/>
        <to uri="mock:sync"/>
    </onCompletion>
    <process ref="myProcessor"/>
    <to uri="mock:result"/>
</route>

The onCompleteOnly and onFailureOnly options are defined as boolean attributes on the <onCompletion> tag, so the failure example would be:

<route>
    <from uri="direct:start"/>
    <!-- this onCompletion block will only be executed when the exchange is done being routed -->
    <!-- this callback is only triggered when the exchange failed, as we have onFailureOnly=true -->
    <onCompletion onFailureOnly="true">
        <to uri="log:sync"/>
        <to uri="mock:sync"/>
    </onCompletion>
    <process ref="myProcessor"/>
    <to uri="mock:result"/>
</route>

onCompletion with global scope

This works just like the route scope, except that the onCompletion is defined globally. An example:

// define a global on completion that is invoked when the exchange is complete
onCompletion().to("log:global").to("mock:sync");

from("direct:start")
    .process(new MyProcessor())
    .to("mock:result");

Using onCompletion from Spring DSL

This works just like the route scope, except that the onCompletion is defined globally. An example:

<!-- this is a global onCompletion route that is invoked when any exchange is complete
     as a kind of after callback -->
<onCompletion>
    <to uri="log:global"/>
    <to uri="mock:sync"/>
</onCompletion>

<route>
    <from uri="direct:start"/>
    <process ref="myProcessor"/>
    <to uri="mock:result"/>
</route>

Route scope overrides global scope

If an onCompletion is defined in a route, it overrides all globally scoped ones, so only the route-scoped onCompletion is used. The globally scoped ones are never used.

Using onCompletion with onWhen predicate

As with other DSLs in Camel, you can attach a Predicate to the onCompletion so that it only triggers when the predicate matches. For example, to only trigger if the message body contains the word Hello:

from("direct:start")
    .onCompletion().onWhen(body().contains("Hello"))
        // this route is only invoked when the original route is complete as a kind
        // of completion callback. And also only if the onWhen predicate is true
        .to("log:sync")
        .to("mock:sync")
    // must use end to denote the end of the onCompletion route
    .end()
    // here the original route continues
    .to("log:original")
    .to("mock:result");
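
Taken together, the trigger options (always, onCompleteOnly, onFailureOnly) and the onWhen predicate amount to a single yes/no decision per Exchange. The plain-Java sketch below spells that decision out, assuming the success/failure filter is applied first and the predicate second. TriggerSketch, Trigger and shouldTrigger are hypothetical names, not part of Camel, and the message is reduced to a String body.

```java
import java.util.function.Predicate;

// Plain-Java model of the onCompletion trigger decision; not Camel code.
class TriggerSketch {

    /** The three trigger settings described in this page. */
    enum Trigger { ALWAYS, ON_COMPLETE_ONLY, ON_FAILURE_ONLY }

    /**
     * Decides whether the completion route should run.
     * A null onWhen models "no predicate configured".
     */
    static boolean shouldTrigger(Trigger trigger, boolean failed,
                                 Predicate<String> onWhen, String body) {
        if (trigger == Trigger.ON_COMPLETE_ONLY && failed) {
            return false;
        }
        if (trigger == Trigger.ON_FAILURE_ONLY && !failed) {
            return false;
        }
        return onWhen == null || onWhen.test(body);
    }

    public static void main(String[] args) {
        Predicate<String> containsHello = body -> body.contains("Hello");

        // Always-trigger with a matching predicate: runs.
        System.out.println(shouldTrigger(Trigger.ALWAYS, false, containsHello, "Hello World")); // true
        // Always-trigger, but the predicate does not match: skipped.
        System.out.println(shouldTrigger(Trigger.ALWAYS, false, containsHello, "Bye World"));   // false
        // Failure-only, but the exchange succeeded: skipped.
        System.out.println(shouldTrigger(Trigger.ON_FAILURE_ONLY, false, null, "Hello World")); // false
    }
}
```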

Using onCompletion with or without thread pool

Since Camel 2.14

From Camel 2.14 onwards, onCompletion does not use a thread pool by default. To use a thread pool, either set an executorService or set parallelProcessing to true.

For example in Java DSL do

                onCompletion().parallelProcessing()
                    .to("mock:before")
                    .delay(1000)
                    .setBody(simple("OnComplete:${body}"));

And in XML DSL

      <onCompletion parallelProcessing="true">
        <to uri="mock:before"/>
        <delay><constant>1000</constant></delay>
        <setBody><simple>OnComplete:${body}</simple></setBody>
      </onCompletion>

You can also refer to a specific thread pool to use, with the executorServiceRef option:

      <onCompletion executorServiceRef="myThreadPool">
        <to uri="mock:before"/>
        <delay><constant>1000</constant></delay>
        <setBody><simple>OnComplete:${body}</simple></setBody>
      </onCompletion>
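
The difference between the default and the thread-pool options can be pictured as "run the completion task on the calling thread" versus "hand it to a pool". The following is a rough plain-Java model of that choice, not Camel code: CompletionRunner and runCompletion are hypothetical names, and a null executor stands in for the synchronous default.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java model of "with or without a thread pool"; not the Camel API.
class CompletionRunner {

    /**
     * Runs the task and returns the name of the thread that executed it.
     * A null executor models the synchronous default.
     */
    static String runCompletion(Runnable task, ExecutorService executor) {
        if (executor == null) {
            task.run();
            return Thread.currentThread().getName();
        }
        try {
            // The demo waits for the result only so it can report the thread name.
            return executor.submit(() -> {
                task.run();
                return Thread.currentThread().getName();
            }).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        Runnable task = () -> System.out.println("completion work");
        String caller = Thread.currentThread().getName();

        // Synchronous: the caller's own thread does the work.
        System.out.println(caller.equals(runCompletion(task, null)));

        // With a pool: a pool thread does the work instead.
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            System.out.println(caller.equals(runCompletion(task, pool)));
        } finally {
            pool.shutdown();
        }
    }
}
```

With no executor the reported thread is the caller's own; with a pool it is one of the pool's threads, so slow completion work such as the delay in the examples above no longer occupies the calling thread.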

Using onCompletion to run before route consumer sends back response to callee

Since Camel 2.14

OnCompletion supports two modes

  • AfterConsumer - Default mode which runs after the consumer is done

  • BeforeConsumer - Runs before the consumer is done, and before the consumer writes back response to the callee

The AfterConsumer mode is the default mode which is the same behavior as in older Camel releases.

The new BeforeConsumer mode is used to run onCompletion before the consumer writes its response back to the callee (if in InOut mode). This allows the onCompletion to modify the Exchange, such as adding special headers, or to log the Exchange as a response logger etc.

For example to always add a "created by" header you use modeBeforeConsumer() as shown below:

    .onCompletion().modeBeforeConsumer()
        .setHeader("createdBy", constant("Someone"))
    .end()

And in XML DSL you set the mode attribute to BeforeConsumer:

      <onCompletion mode="BeforeConsumer">
        <setHeader name="createdBy">
          <constant>Someone</constant>
        </setHeader>
      </onCompletion>
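
The practical difference between the two modes is whether changes made by onCompletion can still reach the callee. The plain-Java sketch below models only that ordering. ModeSketch, Mode and handle are hypothetical names, and the "response" is simply a snapshot of the headers taken at the moment the consumer replies; real Camel consumers are more involved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java model of AfterConsumer vs BeforeConsumer ordering; not Camel code.
class ModeSketch {

    enum Mode { AFTER_CONSUMER, BEFORE_CONSUMER }

    /** Returns the headers the callee would see in its response. */
    static Map<String, String> handle(Mode mode, Consumer<Map<String, String>> onCompletion) {
        Map<String, String> headers = new HashMap<>();
        headers.put("route", "done");

        if (mode == Mode.BEFORE_CONSUMER) {
            // Runs before the reply is written, so it can still change it.
            onCompletion.accept(headers);
        }

        // The consumer writes the response: snapshot what the callee receives.
        Map<String, String> response = new HashMap<>(headers);

        if (mode == Mode.AFTER_CONSUMER) {
            // Runs after the reply is written, so the callee never sees this.
            onCompletion.accept(headers);
        }
        return response;
    }

    public static void main(String[] args) {
        Consumer<Map<String, String>> addCreatedBy = h -> h.put("createdBy", "Someone");
        System.out.println(handle(Mode.BEFORE_CONSUMER, addCreatedBy).containsKey("createdBy")); // true
        System.out.println(handle(Mode.AFTER_CONSUMER, addCreatedBy).containsKey("createdBy"));  // false
    }
}
```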