Try it out on Kubernetes
You can use CamelKafkaConnector on plain Kubernetes with the Strimzi project,
which provides a set of operators and container images to easily run Kafka on Kubernetes. This
procedure assumes that you have cluster-admin
access to a Kubernetes cluster (e.g. Minikube)
with Internet access and an external registry for pushing images (e.g. quay.io).
Deploy Kafka and Kafka Connect
First, we create a new namespace and deploy a single-node Kafka cluster:
NAMESPACE="demo"
STRIMZI_VER="0.18.0"
# create a new namespace
kubectl create namespace $NAMESPACE && kubectl config set-context --current --namespace=$NAMESPACE
# deploy Strimzi operator
curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/$STRIMZI_VER/strimzi-cluster-operator-$STRIMZI_VER.yaml \
| sed "s/namespace: .*/namespace: $NAMESPACE/g" | kubectl apply -f -
# deploy Kafka cluster
kubectl apply -f https://raw.githubusercontent.com/strimzi/strimzi-kafka-operator/release-${STRIMZI_VER/%.0/.x}/examples/kafka/kafka-persistent-single.yaml
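The examples URL above relies on a Bash pattern substitution to turn the release version into a branch name. The following is a minimal sketch of that expansion in isolation; release_branch is a hypothetical helper name introduced here for illustration, not something Strimzi provides.

```shell
# release_branch is a hypothetical helper (assumption, not part of Strimzi).
# It maps a version such as 0.18.0 to the branch name used in the examples URL.
release_branch() {
  local ver="$1"
  # ${ver/%.0/.x} replaces a trailing ".0" with ".x";
  # a version that does not end in ".0" is left unchanged.
  echo "release-${ver/%.0/.x}"
}

release_branch "0.18.0"
```

Note that a patch version such as 0.18.1 passes through unchanged, so in that case the branch part of the URL may need to be set by hand.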
Next, we build a custom KafkaConnect image to include all needed connectors (use your own registry here):
CKC_VERSION="0.3.0"
STRIMZI_IMG="strimzi/kafka:latest-kafka-2.5.0"
REGISTRY_URL="quay.io"
REGISTRY_USR="fvaleri"
TMP="/tmp/my-connect"
BASEURL="https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector"
PLUGINS=(
"$BASEURL/camel-file-kafka-connector/$CKC_VERSION/camel-file-kafka-connector-$CKC_VERSION-package.zip"
"$BASEURL/camel-sjms2-kafka-connector/$CKC_VERSION/camel-sjms2-kafka-connector-$CKC_VERSION-package.zip"
)
# download connect plugins
rm -rf $TMP && mkdir -p $TMP/plugins
for url in "${PLUGINS[@]}"; do
curl -sL $url -o $TMP/plugins/file.zip && unzip -qq $TMP/plugins/file.zip -d $TMP/plugins
rm -f $TMP/plugins/file.zip
done
# build and push the custom image
echo -e "FROM $STRIMZI_IMG\nCOPY ./plugins/ /opt/kafka/plugins/\nUSER 1001" > $TMP/Dockerfile
sudo podman build --layers=false -t $REGISTRY_USR/my-connect:1.0.0 -f $TMP/Dockerfile $TMP
sudo podman login -u $REGISTRY_USR $REGISTRY_URL
sudo podman push localhost/$REGISTRY_USR/my-connect:1.0.0 $REGISTRY_URL/$REGISTRY_USR/my-connect:1.0.0
sudo podman push localhost/$REGISTRY_USR/my-connect:1.0.0 $REGISTRY_URL/$REGISTRY_USR/my-connect:latest
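Because the download loop runs curl in silent mode, a failed download can go unnoticed until the connector class is missing at runtime. As a rough sanity check before building the image, we can count the JAR files that were unpacked. This is only a sketch: count_plugin_jars is a hypothetical helper name, and it assumes the plugin archives contain .jar files somewhere below the target directory.

```shell
# count_plugin_jars is a hypothetical helper (assumption).
# It prints the number of .jar files found below the given directory.
count_plugin_jars() {
  find "$1" -type f -name '*.jar' | wc -l | tr -d ' '
}

# usage, after the download loop:
#   count_plugin_jars "$TMP/plugins"
```

A result of 0 suggests that the archives were not downloaded or unpacked as expected.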
Finally, we deploy the KafkaConnect cluster using our custom image:
cat <<'EOF' > $TMP/my-connect.yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect
  annotations:
    # enable connect operator
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  version: 2.5.0
  image: my/custom/image
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  resources:
    requests:
      memory: "1Gi"
    limits:
      memory: "1Gi"
  jvmOptions:
    gcLoggingEnabled: false
  config:
    group.id: my-connect
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    offset.storage.topic: my-connect-offsets
    config.storage.topic: my-connect-configs
    status.storage.topic: my-connect-status
    # single node setup
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
EOF
sed "s/image: .*/image: $REGISTRY_URL\/$REGISTRY_USR\/my-connect/g" $TMP/my-connect.yaml \
| kubectl apply -f -
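The operator needs some time to roll out each resource, so it can help to block until a resource reports that it is ready before moving on. The sketch below wraps kubectl wait; wait_ready is a hypothetical helper name, the 300s default timeout is an arbitrary choice, and it assumes that the Strimzi custom resources expose a Ready condition in their status.

```shell
# wait_ready is a hypothetical helper (assumption).
# $1 = resource, e.g. kafkaconnect/my-connect
# $2 = timeout, defaults to 300s (arbitrary value chosen for this sketch)
wait_ready() {
  kubectl wait "$1" --for=condition=Ready --timeout="${2:-300s}"
}

# usage against a live cluster:
#   wait_ready kafka/my-cluster
#   wait_ready kafkaconnect/my-connect
```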
Create connector instance
As soon as the infrastructure is running, we can create an instance of a connector plugin:
kubectl apply -f - <<'EOF'
kind: KafkaConnector
apiVersion: kafka.strimzi.io/v1alpha1
metadata:
  name: file-sink
  labels:
    # must match connect cluster name
    strimzi.io/cluster: my-connect
spec:
  tasksMax: 1
  class: org.apache.camel.kafkaconnector.file.CamelFileSinkConnector
  config:
    topics: my-topic
    camel.sink.url: file:/tmp/?fileName=test.txt&fileExist=Append
EOF
You can check the status of the connector instance using:
kubectl describe kafkaconnector file-sink
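For scripting, the full describe output is more than we need. The sketch below reads a single field with a JSONPath query instead; connector_ready is a hypothetical helper name, and it assumes the KafkaConnector status carries a condition of type Ready, which may differ between Strimzi versions.

```shell
# connector_ready is a hypothetical helper (assumption).
# It prints the status of the Ready condition of the given connector,
# or nothing if no such condition is present.
connector_ready() {
  kubectl get kafkaconnector "$1" \
    -o jsonpath='{.status.conditions[?(@.type=="Ready")].status}'
}

# usage against a live cluster:
#   connector_ready file-sink
```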
Check received messages
To test the connector instance, we can send a message to the topic and check that it is written to the file:
# send a message to Kafka
echo "Hello CamelKafkaConnector" | kubectl exec -i my-cluster-kafka-0 -c kafka -- \
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
# read the message from file
POD_NAME=$(kubectl get pods | grep my-connect | grep Running | cut -d " " -f1) && \
kubectl exec -i $POD_NAME -- cat /tmp/test.txt
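The pod lookup above filters the kubectl get pods output by name and status, then keeps the first column. The sketch below isolates that pipeline so it can be tried on captured output; running_pod is a hypothetical helper name, and the head -n 1 guard against multiple matches is an addition that is not in the original command.

```shell
# running_pod is a hypothetical helper (assumption).
# It reads `kubectl get pods` output on stdin and prints the name of the
# first pod whose line contains the given text and the word "Running".
running_pod() {
  grep "$1" | grep Running | cut -d " " -f1 | head -n 1
}

# usage against a live cluster:
#   POD_NAME=$(kubectl get pods | running_pod my-connect)
```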