From 419f6fa2664030e6a0b64c7c03f57ff0757858a3 Mon Sep 17 00:00:00 2001 From: Benedith Mulongo Date: Wed, 20 Nov 2024 18:59:55 +0100 Subject: [PATCH] Finalize minimal setup for s3-avro kafka connect --- .gitignore | 2 + .../connect-configs-topic.yaml | 2 +- .../connect-offsets-topic.yaml | 2 +- .../connect-status-topic.yaml | 2 +- docker/Dockerfile | 17 ++++++ docker/rename_files.sh | 14 +++++ docker/s3-kafka-connect-manual-build.yaml | 42 ++++++++++++++ s3-kafka-connect.yaml | 40 +++++++------ s3-kafka-connector.yaml | 56 +++++++++++++++++++ 9 files changed, 157 insertions(+), 20 deletions(-) create mode 100644 docker/Dockerfile create mode 100644 docker/rename_files.sh create mode 100644 docker/s3-kafka-connect-manual-build.yaml create mode 100644 s3-kafka-connector.yaml diff --git a/.gitignore b/.gitignore index 5901a65..9f1b47c 100644 --- a/.gitignore +++ b/.gitignore @@ -4,5 +4,7 @@ **/*.config **/*.example* *.md +docs +./docs **/*.md* !README.md diff --git a/connector-mgmt-topics/connect-configs-topic.yaml b/connector-mgmt-topics/connect-configs-topic.yaml index 91c3217..d999840 100644 --- a/connector-mgmt-topics/connect-configs-topic.yaml +++ b/connector-mgmt-topics/connect-configs-topic.yaml @@ -1,7 +1,7 @@ apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: - name: connect-configs + name: kafka-connect-cluster-configs namespace: kafka labels: strimzi.io/cluster: kafka-cluster diff --git a/connector-mgmt-topics/connect-offsets-topic.yaml b/connector-mgmt-topics/connect-offsets-topic.yaml index ae769ab..7f8086b 100644 --- a/connector-mgmt-topics/connect-offsets-topic.yaml +++ b/connector-mgmt-topics/connect-offsets-topic.yaml @@ -1,7 +1,7 @@ apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: - name: connect-offsets + name: kafka-connect-cluster-offsets namespace: kafka labels: strimzi.io/cluster: kafka-cluster diff --git a/connector-mgmt-topics/connect-status-topic.yaml b/connector-mgmt-topics/connect-status-topic.yaml index a6a338c..9d8f8cc 100644 --- a/connector-mgmt-topics/connect-status-topic.yaml +++ b/connector-mgmt-topics/connect-status-topic.yaml @@ -1,7 +1,7 @@ apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: - name: connect-status + name: kafka-connect-cluster-status namespace: kafka labels: strimzi.io/cluster: kafka-cluster diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 0000000..c106975 --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,17 @@ +# FROM confluentinc/cp-kafka-connect:7.0.9 as cp +FROM confluentinc/cp-kafka-connect:7.7.1 as cp +RUN confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.5.17 +RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:7.7.1 + + +FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0 +USER root:root +COPY --from=cp /usr/share/confluent-hub-components/ /opt/kafka/plugins/ + +# Rename files +COPY rename_files.sh /opt/kafka/rename_files.sh +RUN chmod +x /opt/kafka/rename_files.sh && \ + # Run the script during the build process + /opt/kafka/rename_files.sh && \ + # Remove the script after execution + rm /opt/kafka/rename_files.sh diff --git a/docker/rename_files.sh b/docker/rename_files.sh new file mode 100644 index 0000000..b4b0f87 --- /dev/null +++ b/docker/rename_files.sh @@ -0,0 +1,14 @@ +#!/bin/bash +PLUGIN_DIR="/opt/kafka/plugins" +for folder in "$PLUGIN_DIR"/*; do + if [ -d "$folder" ]; then + # Remove the specific prefix from the folder name #=remove left expr + new_name="${folder#"$PLUGIN_DIR"/confluentinc-kafka-connect-}" + # Perform the move and handle errors + if mv "$folder" "$PLUGIN_DIR/$new_name"; then + echo "Successfully renamed: $folder to $PLUGIN_DIR/$new_name" + else + echo "Error renaming: $folder" + fi + fi +done diff --git a/docker/s3-kafka-connect-manual-build.yaml b/docker/s3-kafka-connect-manual-build.yaml new file mode 100644 index 0000000..1aa575e --- /dev/null +++ b/docker/s3-kafka-connect-manual-build.yaml @@ -0,0 +1,42 @@ +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaConnect +metadata: + name: kafka-connect-cluster + namespace: kafka + annotations: + strimzi.io/use-connector-resources: "true" +spec: + replicas: 1 + bootstrapServers: kafka-cluster-kafka-plainext-bootstrap:9092 + config: + group.id: kafka-connect-cluster + offset.storage.topic: kafka-connect-cluster-offsets + config.storage.topic: kafka-connect-cluster-configs + status.storage.topic: kafka-connect-cluster-status + key.converter: io.confluent.connect.avro.AvroConverter + value.converter: io.confluent.connect.avro.AvroConverter + key.converter.schema.registry.url: http://89.47.191.210:30081 + value.converter.schema.registry.url: http://89.47.191.210:30081 + config.storage.replication.factor: 1 + offset.storage.replication.factor: 1 + status.storage.replication.factor: 1 + build: + output: + type: docker + image: platform.sunet.se/benedith/strimzi-kafka-connectors:latest + pushSecret: docker-platform-creds + plugins: + - name: kafka-connect-s3 + artifacts: + - type: jar + url: https://platform.sunet.se/benedith/strimzi-kafka-connectors/raw/branch/main/jars/kafka-connect-s3-10.4.10.jar + sha512sum: 56927fe694c2eec13318f52179d6c255357dd8a391a2f67858b5db15d7c13ef14a7939c47ac4ef2664004e871d512127b2abd3ffe600ecd18177cb39c43e1d45 + - name: confluent-avro-connector + artifacts: + - type: jar + url: https://platform.sunet.se/benedith/strimzi-kafka-connectors/raw/branch/main/jars/kafka-connect-avro-converter-7.6.3.jar + sha512sum: ff53b82746475eae7b4b7e85ec05eeaa16c458225979b05d2018ca594fc9fdd10996bad0c17dbc5bd3ae128c0fd646cfd3d2ea2e4058964c44584f0174f83b0b + template: + pod: + imagePullSecrets: + - name: docker-platform-creds diff --git a/s3-kafka-connect.yaml b/s3-kafka-connect.yaml index e524a2a..2e440a3 100644 --- a/s3-kafka-connect.yaml +++ b/s3-kafka-connect.yaml @@ -8,6 +8,10 @@ metadata: spec: replicas: 1 bootstrapServers: kafka-cluster-kafka-plainext-bootstrap:9092 + # tls: + # trustedCertificates: + # - secretName: my-cluster-cluster-ca-cert + # certificate: ca.crt config: group.id: kafka-connect-cluster offset.storage.topic: kafka-connect-cluster-offsets @@ -20,32 +24,34 @@ spec: config.storage.replication.factor: 1 offset.storage.replication.factor: 1 status.storage.replication.factor: 1 + externalConfiguration: + env: + - name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: s3-minio-creds + key: AWS_ACCESS_KEY_ID + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: s3-minio-creds + key: AWS_SECRET_ACCESS_KEY build: output: type: docker image: platform.sunet.se/benedith/das-kafka-connect-cluster:latest pushSecret: docker-platform-creds plugins: - - name: connect-api - artifacts: - - type: jar - url: https://platform.sunet.se/benedith/strimzi-kafka-connectors/raw/branch/main/jars/connect-api-3.9.0.jar - sha512sum: bf3287ae1552d1bef97865a96ebfd9c482d2f477c41b51234261d1dbb33b487ac225224a3a860a37536e970e99a3d00ebc6ed17cb67450becbad554d9f39f35f - - name: kafka-connect-storage-common - artifacts: - - type: jar - url: https://platform.sunet.se/benedith/strimzi-kafka-connectors/raw/branch/main/jars/kafka-connect-storage-common-11.1.10.jar - sha512sum: 1788e0168edbdd21b550d6ab412d3e26be0f04d4730e88b6a0d746fe6d77b32720b6aec487144ed9c72001e5ddfd1098e9c753bb12efa29cdd31e67d76d201db - name: kafka-connect-s3 artifacts: - - type: jar - url: https://platform.sunet.se/benedith/strimzi-kafka-connectors/raw/branch/main/jars/kafka-connect-s3-10.4.10.jar - sha512sum: 56927fe694c2eec13318f52179d6c255357dd8a391a2f67858b5db15d7c13ef14a7939c47ac4ef2664004e871d512127b2abd3ffe600ecd18177cb39c43e1d45 - - name: confluent-avro-connector + - type: zip + url: https://platform.sunet.se/benedith/strimzi-kafka-connectors/raw/branch/main/jars/confluentinc-kafka-connect-s3-10.5.17.zip + sha512sum: 51dc4eb5e618a7743b3d29c7c5586f5bda00a254a9f105ee816cad7c8e9509a7c1a1ea43351e77dcf97847900c21895962716ed6a1bfb2de4a2b4695233d8804 + - name: avro-connector artifacts: - - type: jar - url: https://platform.sunet.se/benedith/strimzi-kafka-connectors/raw/branch/main/jars/kafka-connect-avro-converter-7.6.3.jar - sha512sum: ff53b82746475eae7b4b7e85ec05eeaa16c458225979b05d2018ca594fc9fdd10996bad0c17dbc5bd3ae128c0fd646cfd3d2ea2e4058964c44584f0174f83b0b + - type: zip + url: https://platform.sunet.se/benedith/strimzi-kafka-connectors/raw/branch/main/jars/confluentinc-kafka-connect-avro-converter-7.7.1.zip + sha512sum: cebc6fece5c5551d3cff5f1cc8f4660e83da6292a9d695c1f8851af880661b2882e59ef0eeb3df395c3fc314e483cc26961d6a6df271237aab7ef2d8732af3f4 template: pod: imagePullSecrets: diff --git a/s3-kafka-connector.yaml b/s3-kafka-connector.yaml new file mode 100644 index 0000000..85700d1 --- /dev/null +++ b/s3-kafka-connector.yaml @@ -0,0 +1,56 @@ +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaConnector +metadata: + name: kafka-connect-s3-connector + namespace: kafka + labels: + strimzi.io/cluster: kafka-connect-cluster +spec: + class: io.confluent.connect.s3.S3SinkConnector + tasksMax: 2 + config: + # Core Connector Configuration + connector.class: io.confluent.connect.s3.S3SinkConnector + topics: test-topic + + # S3 Configuration + s3.region: us-east-1 + s3.bucket.name: openlake-tmp + s3.part.size: '1073741824' # Part size for upload (1 GB) + + # Performance tuning + flush.size: 1000 + + # MinIO (or S3) store URL (use environment variable for security) + store.url: https://play.min.io:50000 + + # Storage and Format Configuration + storage.class: io.confluent.connect.s3.storage.S3Storage + format.class: io.confluent.connect.s3.format.parquet.ParquetFormat + partitioner.class: io.confluent.connect.storage.partitioner.TimeBasedPartitioner + path.format: "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH" # Added explicit path format + partition.duration.ms: 3600000 # Ensures hourly partitions for manageability + + # Behavior settings + behavior.on.null.values: ignore + + # Serialization + # * Key + key.converter: org.apache.kafka.connect.storage.StringConverter + key.converter.schemas.enable: false # Avro schemas usually not needed for keys + key.converter.schema.registry.url: http://schema-registry-release-cp-schema-registry:8081 + # * Value + value.converter: io.confluent.connect.avro.AvroConverter + value.converter.schema.registry.url: http://schema-registry-release-cp-schema-registry:8081 + value.converter.schemas.enable: true + schema.compatibility: BACKWARD # Allow schema evolution + + # Rotation and Batch Handling + rotate.interval.ms: 600000 # reduce overhead in high-throughput scenarios + key.converter.use.latest.version: true + value.converter.use.latest.version: true + + # Optional: + s3.compression.type: gzip + store.kafka.keys: true + logging.level: debug