Try it out on Kubernetes

You can use CamelKafkaConnector on plain Kubernetes with the Strimzi project, which provides a set of operators and container images to easily run Kafka on Kubernetes. This procedure assumes that you have cluster-admin access to a Kubernetes cluster (i.e. Minikube) with Internet access and an external registry for pushing images (i.e. quay.io).

Deploy Kafka and Kafka Connect

First, we create a new namespace and deploy a single-node Kafka cluster:

NAMESPACE="demo"
STRIMZI_VER="0.18.0"

# create a new namespace
kubectl create namespace $NAMESPACE && kubectl config set-context --current --namespace=$NAMESPACE

# deploy Strimzi operator
curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/$STRIMZI_VER/strimzi-cluster-operator-$STRIMZI_VER.yaml \
    | sed "s/namespace: .*/namespace: $NAMESPACE/g" | kubectl apply -f -

# deploy Kafka cluster
oc apply -f https://raw.githubusercontent.com/strimzi/strimzi-kafka-operator/release-${STRIMZI_VER/%.0/.x}/examples/kafka/kafka-persistent-single.yaml

Next, we build a custom KafkaConnect image to include all needed connectors (use your own registry here):

CKC_VERSION="0.3.0"
STRIMZI_IMG="strimzi/kafka:latest-kafka-2.5.0"
REGISTRY_URL="quay.io"
REGISTRY_USR="fvaleri"

TMP="/tmp/my-connect"
BASEURL="https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector"
PLUGINS=(
    "$BASEURL/camel-file-kafka-connector/$CKC_VERSION/camel-file-kafka-connector-$CKC_VERSION-package.zip"
    "$BASEURL/camel-sjms2-kafka-connector/$CKC_VERSION/camel-sjms2-kafka-connector-$CKC_VERSION-package.zip"
)

# dowload connect plugins
rm -rf $TMP && mkdir -p $TMP/plugins
for url in "${PLUGINS[@]}"; do
    curl -sL $url -o $TMP/plugins/file.zip && unzip -qq $TMP/plugins/file.zip -d $TMP/plugins
    rm -f $TMP/plugins/file.zip
done

# build and push the custom image
echo -e "FROM $STRIMZI_IMG\nCOPY ./plugins/ /opt/kafka/plugins/\nUSER 1001" > $TMP/Dockerfile
sudo podman build --layers=false -t $REGISTRY_USR/my-connect:1.0.0 -f $TMP/Dockerfile
sudo podman login -u $REGISTRY_USR $REGISTRY_URL
sudo podman push localhost/$REGISTRY_USR/my-connect:1.0.0 $REGISTRY_URL/$REGISTRY_USR/my-connect:1.0.0
sudo podman push localhost/$REGISTRY_USR/my-connect:1.0.0 $REGISTRY_URL/$REGISTRY_USR/my-connect:latest

Finally, we deploy the KafkaConnect cluster using our custom image:

cat <<'EOF' > $TMP/my-connect.yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect
  annotations:
    # enable connect operator
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  version: 2.5.0
  image: my/custom/image
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  resources:
    requests:
      memory: "1Gi"
    limits:
      memory: "1Gi"
  jvmOptions:
    gcLoggingEnabled: false
  config:
    group.id: my-connect
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    offset.storage.topic: my-connect-offsets
    config.storage.topic: my-connect-configs
    status.storage.topic: my-connect-status
    # single node setup
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
EOF

sed "s/image: .*/image: $REGISTRY_URL\/$REGISTRY_USR\/my-connect/g" $TMP/my-connect.yaml \
    | kubectl apply -f -

Create connector instance

A soon as the infrastructure is running, we can create an instance of a connector plugin:

kubectl apply -f - <<'EOF'
kind: KafkaConnector
apiVersion: kafka.strimzi.io/v1alpha1
metadata:
  name: file-sink
  labels:
    # must match connect cluster name
    strimzi.io/cluster: my-connect
spec:
  tasksMax: 1
  class: org.apache.camel.kafkaconnector.file.CamelFileSinkConnector
  config:
    topics: my-topic
    camel.sink.url: file:/tmp/?fileName=test.txt&fileExist=Append
EOF

You can check the status of the connector instance using:

kubectl describe kafkaconnector file-sink

Check received messages

To test the connector instance, we can send a message to the topic and see if it is written to file:

# send a message to Kafka
echo "Hello CamelKafkaConnector" | kubectl exec -i my-cluster-kafka-0 -c kafka -- \
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

# read the message from file
POD_NAME=$(kubectl get pods | grep my-connect | grep Running | cut -d " " -f1) && \
    kubectl exec -i $POD_NAME -- cat /tmp/test.txt