使用k8s搭建一个三节点的Kafka集群
创建kafka-pv
首先通过nfs创建三个共享目录
1
# Create one NFS-shared data directory per Kafka broker.
# Paths are spelled out instead of using {a,b,c} brace expansion, which is a
# bash/zsh feature and is taken literally by plain POSIX sh (e.g. dash).
mkdir -p /data/kafka/kafka01 /data/kafka/kafka02 /data/kafka/kafka03
|
这三个目录分别对应三节点Kafka集群中三个pod的持久化目录。创建好目录之后,编写kafka-pv.yaml来创建PV:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
# kafka-pv.yaml
# Three statically provisioned NFS PersistentVolumes, one per Kafka broker.
# PersistentVolumes are cluster-scoped, so metadata.namespace is not set.
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-kafka01
  labels:
    app: kafka
spec:
  # Must equal the storage class requested by the Kafka volume claims
  # ("kafka"); a mismatch leaves the claims Pending.
  storageClassName: kafka
  capacity:
    storage: 50Gi
  accessModes:
    # - ReadWriteOnce
    - ReadWriteMany
  nfs:
    server: 10.0.15.1
    # Directory created on the NFS server in the mkdir step.
    path: /data/kafka/kafka01
  # Retain: keep the data after the claim is released.
  # Recycle: scrub the volume so it can be claimed again.
  persistentVolumeReclaimPolicy: Retain
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-kafka02
  labels:
    app: kafka
spec:
  storageClassName: kafka
  capacity:
    storage: 50Gi
  accessModes:
    # - ReadWriteOnce
    - ReadWriteMany
  nfs:
    server: 10.0.15.1
    path: /data/kafka/kafka02
  persistentVolumeReclaimPolicy: Retain
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-kafka03
  labels:
    app: kafka
spec:
  storageClassName: kafka
  capacity:
    storage: 50Gi
  accessModes:
    # - ReadWriteOnce
    - ReadWriteMany
  nfs:
    server: 10.0.15.1
    path: /data/kafka/kafka03
  persistentVolumeReclaimPolicy: Retain
|
使用如下命令创建kafka-pv
1
# Create the three PersistentVolumes declared in kafka-pv.yaml.
kubectl create --filename=kafka-pv.yaml
|
这时我们可以通过如下命令去查看创建成功的pv
1
# List the PersistentVolumes. PVs are cluster-scoped, so no namespace flag is
# passed (a -n/--namespace option has no effect on this query).
kubectl get pv -o wide
|
创建Kafka集群
我们选择使用StatefulSet去部署Kafka集群的三个节点,并且使用刚刚创建的pv作为存储设备。编写kafka.yaml:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
# kafka.yaml
# Three-broker Kafka cluster in the "middleware" namespace:
#   - kafka-hs  : headless Service giving each broker a stable DNS name
#   - kafka-cs  : NodePort Service for client access
#   - kafka-pdb : keeps at least two brokers up during voluntary disruptions
#   - kafka     : StatefulSet whose pods each claim one 50Gi "kafka" volume
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-hs
  namespace: middleware
  labels:
    app: kafka
spec:
  # Headless: no cluster IP, DNS resolves directly to the broker pods.
  clusterIP: None
  selector:
    app: kafka
  ports:
    - name: server
      port: 9092
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-cs
  namespace: middleware
  labels:
    app: kafka
spec:
  type: NodePort
  selector:
    app: kafka
  ports:
    - name: client
      port: 9092
      # Must lie inside the API server's --service-node-port-range
      # (30000-32767 by default); the previous value 19092 is rejected on a
      # default cluster.
      # NOTE(review): clients outside the cluster normally also need
      # advertised.listeners set to an externally reachable address; that is
      # not configured below - confirm before relying on this Service.
      nodePort: 30092
---
# policy/v1 replaces policy/v1beta1, which was removed in Kubernetes 1.25.
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: kafka-pdb
  namespace: middleware
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: kafka
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: middleware
spec:
  serviceName: kafka-hs
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      affinity:
        # Never place two brokers on the same node.
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - kafka
              topologyKey: kubernetes.io/hostname
        # Prefer (but do not require) nodes that already run a zk pod.
        podAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 1
              podAffinityTerm:
                labelSelector:
                  matchExpressions:
                    - key: app
                      operator: In
                      values:
                        - zk
                topologyKey: kubernetes.io/hostname
      terminationGracePeriodSeconds: 300
      # Pod-level security context: fsGroup is only valid at this level.
      securityContext:
        runAsUser: 1000
        fsGroup: 1000
      containers:
        - name: kafka
          image: registry.cn-hangzhou.aliyuncs.com/jaxzhai/k8skafka:v1
          imagePullPolicy: IfNotPresent
          resources:
            requests:
              memory: 1Gi
              cpu: 500m
          ports:
            - name: server
              containerPort: 9092
          command:
            - sh
            - -c
            # broker.id is the pod's ordinal: the text after the last "-" of
            # the hostname (kafka-0 -> 0, kafka-1 -> 1, ...).
            - |
              exec kafka-server-start.sh /opt/kafka/config/server.properties \
              --override broker.id=${HOSTNAME##*-} \
              --override listeners=PLAINTEXT://:9092 \
              --override zookeeper.connect=zk-0.zk-hs.middleware.svc.cluster.local:2181,zk-1.zk-hs.middleware.svc.cluster.local:2181,zk-2.zk-hs.middleware.svc.cluster.local:2181 \
              --override log.dir=/var/lib/kafka \
              --override auto.create.topics.enable=true \
              --override auto.leader.rebalance.enable=true \
              --override background.threads=10 \
              --override compression.type=producer \
              --override delete.topic.enable=true \
              --override leader.imbalance.check.interval.seconds=300 \
              --override leader.imbalance.per.broker.percentage=10 \
              --override log.flush.interval.messages=9223372036854775807 \
              --override log.flush.offset.checkpoint.interval.ms=60000 \
              --override log.flush.scheduler.interval.ms=9223372036854775807 \
              --override log.retention.bytes=-1 \
              --override log.retention.hours=168 \
              --override log.roll.hours=168 \
              --override log.roll.jitter.hours=0 \
              --override log.segment.bytes=1073741824 \
              --override log.segment.delete.delay.ms=60000 \
              --override message.max.bytes=1000012 \
              --override min.insync.replicas=1 \
              --override num.io.threads=8 \
              --override num.network.threads=3 \
              --override num.recovery.threads.per.data.dir=1 \
              --override num.replica.fetchers=1 \
              --override offset.metadata.max.bytes=4096 \
              --override offsets.commit.required.acks=-1 \
              --override offsets.commit.timeout.ms=5000 \
              --override offsets.load.buffer.size=5242880 \
              --override offsets.retention.check.interval.ms=600000 \
              --override offsets.retention.minutes=1440 \
              --override offsets.topic.compression.codec=0 \
              --override offsets.topic.num.partitions=50 \
              --override offsets.topic.replication.factor=3 \
              --override offsets.topic.segment.bytes=104857600 \
              --override queued.max.requests=500 \
              --override quota.consumer.default=9223372036854775807 \
              --override quota.producer.default=9223372036854775807 \
              --override replica.fetch.min.bytes=1 \
              --override replica.fetch.wait.max.ms=500 \
              --override replica.high.watermark.checkpoint.interval.ms=5000 \
              --override replica.lag.time.max.ms=10000 \
              --override replica.socket.receive.buffer.bytes=65536 \
              --override replica.socket.timeout.ms=30000 \
              --override request.timeout.ms=30000 \
              --override socket.receive.buffer.bytes=102400 \
              --override socket.request.max.bytes=104857600 \
              --override socket.send.buffer.bytes=102400 \
              --override unclean.leader.election.enable=true \
              --override zookeeper.session.timeout.ms=6000 \
              --override zookeeper.set.acl=false \
              --override broker.id.generation.enable=true \
              --override connections.max.idle.ms=600000 \
              --override controlled.shutdown.enable=true \
              --override controlled.shutdown.max.retries=3 \
              --override controlled.shutdown.retry.backoff.ms=5000 \
              --override controller.socket.timeout.ms=30000 \
              --override default.replication.factor=1 \
              --override fetch.purgatory.purge.interval.requests=1000 \
              --override group.max.session.timeout.ms=300000 \
              --override group.min.session.timeout.ms=6000 \
              --override inter.broker.protocol.version=0.10.2-IV0 \
              --override log.cleaner.backoff.ms=15000 \
              --override log.cleaner.dedupe.buffer.size=134217728 \
              --override log.cleaner.delete.retention.ms=86400000 \
              --override log.cleaner.enable=true \
              --override log.cleaner.io.buffer.load.factor=0.9 \
              --override log.cleaner.io.buffer.size=524288 \
              --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
              --override log.cleaner.min.cleanable.ratio=0.5 \
              --override log.cleaner.min.compaction.lag.ms=0 \
              --override log.cleaner.threads=1 \
              --override log.cleanup.policy=delete \
              --override log.index.interval.bytes=4096 \
              --override log.index.size.max.bytes=10485760 \
              --override log.message.timestamp.difference.max.ms=9223372036854775807 \
              --override log.message.timestamp.type=CreateTime \
              --override log.preallocate=false \
              --override log.retention.check.interval.ms=300000 \
              --override max.connections.per.ip=2147483647 \
              --override num.partitions=1 \
              --override producer.purgatory.purge.interval.requests=1000 \
              --override replica.fetch.backoff.ms=1000 \
              --override replica.fetch.max.bytes=1048576 \
              --override replica.fetch.response.max.bytes=10485760 \
              --override reserved.broker.max.id=1000
          env:
            - name: KAFKA_HEAP_OPTS
              value: "-Xmx1G -Xms1G"
            - name: KAFKA_OPTS
              value: "-Dlogging.level=INFO"
          volumeMounts:
            # Same path as the log.dir override above.
            - name: dfs
              mountPath: /var/lib/kafka
          readinessProbe:
            # Ready once the local broker answers an API-versions request.
            exec:
              command:
                - sh
                - -c
                - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9092"
  volumeClaimTemplates:
    - metadata:
        name: dfs
      spec:
        # Binds to the pre-created PersistentVolumes of class "kafka".
        storageClassName: kafka
        accessModes:
          # - ReadWriteOnce
          - ReadWriteMany
        resources:
          requests:
            storage: 50Gi
|
使用如下命令创建statefulset
1
# Create (or update) the Services, PodDisruptionBudget and StatefulSet
# declared in kafka.yaml.
kubectl apply --filename=kafka.yaml
|
使用如下命令查看容器
1
# Show the pods and Services in the middleware namespace.
kubectl get pods,svc --namespace=middleware
|
使用如下命令进入容器
1 2 3 4
# Open an interactive shell in the kafka-1 broker pod. The "--" separates
# kubectl's own flags from the command executed inside the container; the
# form without it is deprecated.
kubectl exec -it kafka-1 -n middleware -- /bin/bash

# Inside the container: create a test topic. The ZooKeeper address is one of
# the servers listed in the zookeeper.connect setting of kafka.yaml.
kafka-topics.sh --create --topic mytest \
  --zookeeper zk-0.zk-hs.middleware.svc.cluster.local:2181 \
  --partitions 1 --replication-factor 1
|