Finalize minimal setup for S3-Avro Kafka Connect

Benedith Mulongo 2024-11-20 18:59:55 +01:00
parent f895c94cb8
commit 419f6fa266
Signed by: benedith
GPG key ID: 62D68B584B4B3EB3
9 changed files with 157 additions and 20 deletions

.gitignore

@@ -4,5 +4,7 @@
**/*.config
**/*.example*
*.md
docs
./docs
**/*.md*
!README.md


@@ -1,7 +1,7 @@
 apiVersion: kafka.strimzi.io/v1beta2
 kind: KafkaTopic
 metadata:
-  name: connect-configs
+  name: kafka-connect-cluster-configs
   namespace: kafka
   labels:
     strimzi.io/cluster: kafka-cluster


@@ -1,7 +1,7 @@
 apiVersion: kafka.strimzi.io/v1beta2
 kind: KafkaTopic
 metadata:
-  name: connect-offsets
+  name: kafka-connect-cluster-offsets
   namespace: kafka
   labels:
     strimzi.io/cluster: kafka-cluster


@@ -1,7 +1,7 @@
 apiVersion: kafka.strimzi.io/v1beta2
 kind: KafkaTopic
 metadata:
-  name: connect-status
+  name: kafka-connect-cluster-status
   namespace: kafka
   labels:
     strimzi.io/cluster: kafka-cluster
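The three KafkaTopic renames above align the internal Connect storage topics with the kafka-connect-cluster prefix configured in the KafkaConnect resources further down. As a quick sanity check that Strimzi has reconciled them, a sketch assuming kubectl access to the kafka namespace:

kubectl get kafkatopics -n kafka | grep kafka-connect-cluster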

docker/Dockerfile (new file)

@@ -0,0 +1,17 @@
# FROM confluentinc/cp-kafka-connect:7.0.9 as cp
FROM confluentinc/cp-kafka-connect:7.7.1 as cp
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.5.17
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:7.7.1
FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY --from=cp /usr/share/confluent-hub-components/ /opt/kafka/plugins/
# Rename files
COPY rename_files.sh /opt/kafka/rename_files.sh
RUN chmod +x /opt/kafka/rename_files.sh && \
    # Run the script during the build process
    /opt/kafka/rename_files.sh && \
    # Remove the script after execution
    rm /opt/kafka/rename_files.sh
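The Dockerfile above pulls the S3 sink and Avro converter out of the Confluent Connect image and drops them into a Strimzi Kafka image. If this image is built by hand rather than through the KafkaConnect build section further down, a minimal sketch, assuming the docker/ directory as build context and the same registry path as build.output.image:

docker build -t platform.sunet.se/benedith/strimzi-kafka-connectors:latest docker/
docker push platform.sunet.se/benedith/strimzi-kafka-connectors:latest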

docker/rename_files.sh (new file)

@@ -0,0 +1,14 @@
#!/bin/bash
PLUGIN_DIR="/opt/kafka/plugins"
for folder in "$PLUGIN_DIR"/*; do
  if [ -d "$folder" ]; then
    # Strip the "$PLUGIN_DIR/confluentinc-kafka-connect-" prefix, leaving e.g. "s3"
    new_name="${folder#"$PLUGIN_DIR"/confluentinc-kafka-connect-}"
    # Perform the move and handle errors
    if mv "$folder" "$PLUGIN_DIR/$new_name"; then
      echo "Successfully renamed: $folder to $PLUGIN_DIR/$new_name"
    else
      echo "Error renaming: $folder"
    fi
  fi
done
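For illustration, the assumed effect of the script during the image build, given the two confluent-hub installs in the Dockerfile above:

# Before rename_files.sh runs:
#   /opt/kafka/plugins/confluentinc-kafka-connect-avro-converter
#   /opt/kafka/plugins/confluentinc-kafka-connect-s3
# After:
#   /opt/kafka/plugins/avro-converter
#   /opt/kafka/plugins/s3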


@@ -0,0 +1,42 @@
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect-cluster
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: kafka-cluster-kafka-plainext-bootstrap:9092
  config:
    group.id: kafka-connect-cluster
    offset.storage.topic: kafka-connect-cluster-offsets
    config.storage.topic: kafka-connect-cluster-configs
    status.storage.topic: kafka-connect-cluster-status
    key.converter: io.confluent.connect.avro.AvroConverter
    value.converter: io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url: http://89.47.191.210:30081
    value.converter.schema.registry.url: http://89.47.191.210:30081
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
  build:
    output:
      type: docker
      image: platform.sunet.se/benedith/strimzi-kafka-connectors:latest
      pushSecret: docker-platform-creds
    plugins:
      - name: kafka-connect-s3
        artifacts:
          - type: jar
            url: https://platform.sunet.se/benedith/strimzi-kafka-connectors/raw/branch/main/jars/kafka-connect-s3-10.4.10.jar
            sha512sum: 56927fe694c2eec13318f52179d6c255357dd8a391a2f67858b5db15d7c13ef14a7939c47ac4ef2664004e871d512127b2abd3ffe600ecd18177cb39c43e1d45
      - name: confluent-avro-connector
        artifacts:
          - type: jar
            url: https://platform.sunet.se/benedith/strimzi-kafka-connectors/raw/branch/main/jars/kafka-connect-avro-converter-7.6.3.jar
            sha512sum: ff53b82746475eae7b4b7e85ec05eeaa16c458225979b05d2018ca594fc9fdd10996bad0c17dbc5bd3ae128c0fd646cfd3d2ea2e4058964c44584f0174f83b0b
  template:
    pod:
      imagePullSecrets:
        - name: docker-platform-creds
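A sketch of applying the manifest and confirming the Connect pods come up; the file name kafka-connect.yaml is a placeholder, and the strimzi.io/cluster label is the one Strimzi puts on Connect pods:

kubectl apply -f kafka-connect.yaml
kubectl get pods -n kafka -l strimzi.io/cluster=kafka-connect-cluster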


@@ -8,6 +8,10 @@ metadata:
spec:
replicas: 1
bootstrapServers: kafka-cluster-kafka-plainext-bootstrap:9092
# tls:
# trustedCertificates:
# - secretName: my-cluster-cluster-ca-cert
# certificate: ca.crt
config:
group.id: kafka-connect-cluster
offset.storage.topic: kafka-connect-cluster-offsets
@@ -20,32 +24,34 @@ spec:
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1
externalConfiguration:
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: s3-minio-creds
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: s3-minio-creds
key: AWS_SECRET_ACCESS_KEY
build:
output:
type: docker
image: platform.sunet.se/benedith/das-kafka-connect-cluster:latest
pushSecret: docker-platform-creds
plugins:
- name: connect-api
artifacts:
- type: jar
url: https://platform.sunet.se/benedith/strimzi-kafka-connectors/raw/branch/main/jars/connect-api-3.9.0.jar
sha512sum: bf3287ae1552d1bef97865a96ebfd9c482d2f477c41b51234261d1dbb33b487ac225224a3a860a37536e970e99a3d00ebc6ed17cb67450becbad554d9f39f35f
- name: kafka-connect-storage-common
artifacts:
- type: jar
url: https://platform.sunet.se/benedith/strimzi-kafka-connectors/raw/branch/main/jars/kafka-connect-storage-common-11.1.10.jar
sha512sum: 1788e0168edbdd21b550d6ab412d3e26be0f04d4730e88b6a0d746fe6d77b32720b6aec487144ed9c72001e5ddfd1098e9c753bb12efa29cdd31e67d76d201db
- name: kafka-connect-s3
artifacts:
- type: jar
url: https://platform.sunet.se/benedith/strimzi-kafka-connectors/raw/branch/main/jars/kafka-connect-s3-10.4.10.jar
sha512sum: 56927fe694c2eec13318f52179d6c255357dd8a391a2f67858b5db15d7c13ef14a7939c47ac4ef2664004e871d512127b2abd3ffe600ecd18177cb39c43e1d45
- name: confluent-avro-connector
- type: zip
url: https://platform.sunet.se/benedith/strimzi-kafka-connectors/raw/branch/main/jars/confluentinc-kafka-connect-s3-10.5.17.zip
sha512sum: 51dc4eb5e618a7743b3d29c7c5586f5bda00a254a9f105ee816cad7c8e9509a7c1a1ea43351e77dcf97847900c21895962716ed6a1bfb2de4a2b4695233d8804
- name: avro-connector
artifacts:
- type: jar
url: https://platform.sunet.se/benedith/strimzi-kafka-connectors/raw/branch/main/jars/kafka-connect-avro-converter-7.6.3.jar
sha512sum: ff53b82746475eae7b4b7e85ec05eeaa16c458225979b05d2018ca594fc9fdd10996bad0c17dbc5bd3ae128c0fd646cfd3d2ea2e4058964c44584f0174f83b0b
- type: zip
url: https://platform.sunet.se/benedith/strimzi-kafka-connectors/raw/branch/main/jars/confluentinc-kafka-connect-avro-converter-7.7.1.zip
sha512sum: cebc6fece5c5551d3cff5f1cc8f4660e83da6292a9d695c1f8851af880661b2882e59ef0eeb3df395c3fc314e483cc26961d6a6df271237aab7ef2d8732af3f4
template:
pod:
imagePullSecrets:
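Once a Connect pod is running, the loaded plugins (the S3 sink and the Avro converter) can be listed through the Connect REST API. A sketch, with <connect-pod> as a placeholder for the actual pod name and assuming curl is available inside the pod:

kubectl exec -n kafka <connect-pod> -- curl -s http://localhost:8083/connector-plugins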

s3-kafka-connector.yaml (new file)

@@ -0,0 +1,56 @@
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: kafka-connect-s3-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  tasksMax: 2
  config:
    # Core Connector Configuration
    connector.class: io.confluent.connect.s3.S3SinkConnector
    topics: test-topic
    # S3 Configuration
    s3.region: us-east-1
    s3.bucket.name: openlake-tmp
    s3.part.size: '1073741824' # Part size for upload (1 GiB)
    # Performance tuning
    flush.size: 1000
    # MinIO (or S3) store URL (use environment variable for security)
    store.url: https://play.min.io:50000
    # Storage and Format Configuration
    storage.class: io.confluent.connect.s3.storage.S3Storage
    format.class: io.confluent.connect.s3.format.parquet.ParquetFormat
    partitioner.class: io.confluent.connect.storage.partitioner.TimeBasedPartitioner
    path.format: "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH" # Explicit path format
    partition.duration.ms: 3600000 # Ensures hourly partitions for manageability
    # Behavior settings
    behavior.on.null.values: ignore
    # Serialization
    # * Key
    key.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: false # Avro schemas usually not needed for keys
    key.converter.schema.registry.url: http://schema-registry-release-cp-schema-registry:8081
    # * Value
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://schema-registry-release-cp-schema-registry:8081
    value.converter.schemas.enable: true
    schema.compatibility: BACKWARD # Allow schema evolution
    # Rotation and Batch Handling
    rotate.interval.ms: 600000 # Reduce overhead in high-throughput scenarios
    key.converter.use.latest.version: true
    value.converter.use.latest.version: true
    # Optional:
    s3.compression.type: gzip
    store.kafka.keys: true
    logging.level: debug
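One caveat worth flagging: the Confluent S3 sink's TimeBasedPartitioner normally also expects locale and timezone (for example en-US and UTC) next to path.format and partition.duration.ms, and tends to fail config validation without them. After the connector is applied, its state can be checked through the KafkaConnector status or the Connect REST API; a sketch, with <connect-pod> as a placeholder:

kubectl get kafkaconnector kafka-connect-s3-connector -n kafka -o yaml
kubectl exec -n kafka <connect-pod> -- curl -s http://localhost:8083/connectors/kafka-connect-s3-connector/status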