What we learn on paper is always shallow; true understanding only comes from doing.

I previously set up a big-data environment on k3s, but it turned out to be inconvenient and needed repeated changes. For the sake of easier experimentation I rebuilt the development environment with Docker; all the materials are at https://github.com/henrywangx/dev-cluster

Setting up the dev cluster

1. Installation

Prerequisite: docker and docker-compose are already installed.

1. Bring up the containers:
make up

2. Download the mc client from the MinIO China download page.

3. Add the dev cluster to mc:
mc config host add dev http://localhost:9000 minio minio123 --api s3v4
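
To confirm mc can reach the cluster, a quick check like the following should work (my own addition, assuming the minio/minio123 root credentials used above):

# verify the "dev" alias works and list its buckets (empty on a fresh install)
mc admin info dev
mc ls dev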

4. Create a MinIO access key/secret and save it locally.
Screenshot: creating a MinIO access key

5. Update the .env file with the MinIO access key information:
# AWS_REGION is used by Spark
AWS_REGION=us-east-1
# This must match if using minio
MINIO_REGION=us-east-1
# Used by pyIceberg
AWS_DEFAULT_REGION=us-east-1
# AWS Credentials (this can use minio credential, to be filled in later)
AWS_ACCESS_KEY_ID=qUgyOn1f3rbQkXAgCYLa
AWS_SECRET_ACCESS_KEY=MJA9lmnlESWEJZgmJ5Itdee94DUF16wSMfyhsIzT
# If using Minio, this should be the API address of Minio Server
AWS_S3_ENDPOINT=http://minio:9000
# Location where files will be written when creating new tables
WAREHOUSE=s3a://openlake/
# URI of Nessie Catalog
NESSIE_URI=http://nessie:19120/api/v1
GRANT_SUDO=yes

Bring the containers up again:
make up

2. Data preparation

1. Create the input and output buckets:

# input bucket
mc mb dev/openlake/spark/sample-data/
# output bucket
mc mb dev/openlake-tmp/spark/nyc/taxis_small

2. Download the taxi data and copy it to MinIO:

wget https://data.cityofnewyork.us/api/views/t29m-gskq/rows.csv -O rows.csv
mc cp rows.csv dev/openlake/spark/sample-data/
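
A quick sanity check (my own addition) that the file landed in the input bucket:

mc ls dev/openlake/spark/sample-data/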

3. Run the Spark job

1. Open Jupyter in the browser at localhost:8888.

2. Run the spark-minio.py script:
python3 spark-minio.py

3. Check the Spark master UI at localhost:8080 and the Jupyter job UI at localhost:4040; they show the running application and the job details, respectively.

Application info:
Screenshot: Spark application

Job details:
Screenshot: Spark job details

4. Wait for the Python job to finish and check the result: 898 taxi rides had more than 6 passengers.
jovyan@jupyter-lab:~$ python3 spark-minio.py 
Setting default log level to "WARN".
...
2024-01-28 07:55:20,121 - MinIOSparkJob - INFO - Total Rows for NYC Taxi Data: 91704300
2024-01-28 07:55:20,121 - MinIOSparkJob - INFO - Total Rows for Passenger Count > 6: 898

4. Manage tables with PySpark and Iceberg

1. Create the warehouse bucket:

mc mb dev/warehouse

2. Run spark-iceberg-minio.py:
python3 spark-iceberg-minio.py

5. Configure Dremio

1. Log in to the Dremio UI at localhost:9047 and create an S3 source.
S3 source settings:
Screenshot: dremio-s3-general
S3 advanced settings:
The following connection properties were added:

  1. fs.s3a.path.style.access: true
  2. fs.s3a.endpoint: http://minio:9000
  3. dremio.s3.compat: true
  4. Check "Enable compatibility mode", since the backend is MinIO

dremio-s3-advanced

2. Format the table as Iceberg: browse to the nyc.taxis_large directory, click the Format Table button, and save it as Iceberg.

dremio-s3-format

3. Once the folder is formatted as Iceberg, a table appears. Select it and run SQL — we can now query the Iceberg table with plain SQL:
SELECT * FROM taxis_large limit 10

dremio-s3-sql

References

https://www.cnblogs.com/rongfengliang/p/17970071
https://github.com/minio/openlake/tree/main
https://www.linkedin.com/pulse/creating-local-data-lakehouse-using-alex-merced/
https://medium.com/@ongxuanhong/dataops-02-spawn-up-apache-spark-infrastructure-by-using-docker-fec518698993
https://medium.com/@ongxuanhong/are-you-looking-for-a-powerful-way-to-streamline-your-data-analytics-pipeline-and-produce-cc13ea326790

1. Installing Spark and MinIO with Docker

Edit docker-compose.yaml:

version: "3"

services:
spark-iceberg:
image: tabulario/spark-iceberg
container_name: spark-iceberg
build: spark/
networks:
iceberg_net:
depends_on:
- rest
- minio
volumes:
- ./warehouse:/home/iceberg/warehouse
- ./notebooks:/home/iceberg/notebooks/notebooks
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
ports:
- 8888:8888
- 8080:8080
- 10000:10000
- 10001:10001
rest:
image: tabulario/iceberg-rest
container_name: iceberg-rest
networks:
iceberg_net:
ports:
- 8181:8181
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
- CATALOG_WAREHOUSE=s3://warehouse/
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio:9000
minio:
image: minio/minio
container_name: minio
environment:
- MINIO_ROOT_USER=admin
- MINIO_ROOT_PASSWORD=password
- MINIO_DOMAIN=minio
networks:
iceberg_net:
aliases:
- warehouse.minio
ports:
- 9001:9001
- 9000:9000
command: ["server", "/data", "--console-address", ":9001"]
mc:
depends_on:
- minio
image: minio/mc
container_name: mc
networks:
iceberg_net:
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
entrypoint: >
/bin/sh -c "
until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
/usr/bin/mc rm -r --force minio/warehouse;
/usr/bin/mc mb minio/warehouse;
/usr/bin/mc policy set public minio/warehouse;
tail -f /dev/null
"
networks:
iceberg_net:

Big data and cloud native are two hot areas of infrastructure, and more and more big-data infrastructure is moving in the cloud-native direction — for example the cloud-native message queue Pulsar, or Snowflake's cloud-native data warehouse. I wanted to explore combining big data with cloud native and found a very interesting project, Openlake. It lives under the MinIO organization and builds a data lake on Kubernetes with MinIO, Spark, Kafka, Dremio, and Iceberg — great for learning. This post records the setup process and my takeaways.

0. Prepare a Kubernetes environment

If you already have a cluster you can skip this section. I wanted to run experiments quickly, so I set up k3s on Linux with docker-compose.

Install docker-compose (see the official guide) by running:

sudo curl -L "https://github.com/docker/compose/releases/download/1.25.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose

Create local directories to hold the k3s config and data:
mkdir -p /home/xiowang/k3s
mkdir -p /home/xiowang/k3s/data

Create a docker-compose.yaml to start k3s:
#vim /home/xiowang/k3s/docker-compose.yaml
version: '3'
services:
server:
image: rancher/k3s:v1.20.2-k3s1
container_name: k3s_server
hostname: xiowang.dev
command: server --tls-san=xiowang.dev
tmpfs:
- /run
- /var/run
privileged: true
restart: always
ports:
- 6443:6443
- 443:30443
- 80:30080
- 50050:30050
- 50051:30051
- 32000:32000
- 32001:32001
environment:
- K3S_TOKEN=16963276443662
- K3S_KUBECONFIG_OUTPUT=/root/.kube/config
- K3S_KUBECONFIG_MODE=600
volumes:
- /var/lib/rancher/k3s:/var/lib/rancher/k3s
- /etc/rancher:/etc/rancher
- /home/xiowang/k3s/.kube:/root/.kube
- /home/xiowang/k3s/data:/data:shared,rw

With the configuration above we mainly:

  1. Expose the Kubernetes apiserver on the host's port 6443 so kubectl can manage the cluster;
  2. Map the host's ports 443 and 80 to the cluster NodePorts 30443 and 30080;
  3. Save the kubeconfig to /home/xiowang/k3s/.kube;
  4. Mount the cluster's /data directory at /home/xiowang/k3s/data;
  5. Reserve the host's ports 32000 and 32001 to expose MinIO later.

Start k3s:
cd /home/xiowang/k3s
docker-compose up -d

Since the host's ports 80 and 443 correspond to the cluster NodePorts 30080 and 30443, edit the traefik Service so that its port 80 and 443 NodePorts point to 30080 and 30443 respectively:
#kubectl -n kube-system edit svc traefik
apiVersion: v1
kind: Service
metadata:
annotations:
meta.helm.sh/release-name: traefik
meta.helm.sh/release-namespace: kube-system
creationTimestamp: "2023-05-17T09:29:42Z"
labels:
app: traefik
app.kubernetes.io/managed-by: Helm
chart: traefik-1.81.0
heritage: Helm
release: traefik
name: traefik
namespace: kube-system
resourceVersion: "368985"
uid: 7bbb1758-ca01-4e84-b166-dae950613adf
spec:
clusterIP: 10.43.115.234
clusterIPs:
- 10.43.115.234
externalTrafficPolicy: Cluster
ports:
- name: http
nodePort: 30080
port: 80
protocol: TCP
targetPort: http
- name: https
nodePort: 30443
port: 443
protocol: TCP
targetPort: https
selector:
app: traefik
release: traefik
sessionAffinity: None
type: LoadBalancer

Copy /home/xiowang/k3s/.kube/config to ~/.kube/config on your machine (kubecm is recommended for managing kubeconfigs):
cp /home/xiowang/k3s/.kube/config ~/.kube/config
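
As a quick sanity check (my own addition), make sure kubectl can reach the k3s apiserver:

# the single k3s server node should show up as Ready
kubectl get nodes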

Now we can start the Openlake journey.

1. Install and configure MinIO

There are two recommended ways to install MinIO on Kubernetes:

  1. Install MinIO with the operator (see the guide);
  2. Install MinIO with Helm (see the guide).

I have not had time to learn the operator's CRDs yet, so I installed MinIO with Helm:

helm repo add minio https://charts.min.io/
# set the password and disk size; TLS is off by default (adjust as needed — I use 20Gi for testing)
helm upgrade --install --namespace minio --set rootUser=rootuser,rootPassword=rootpass123,persistence.size=20Gi,resources.requests.memory=100Mi,resources.limits.memory=2Gi,replicas=3,service.type=NodePort,consoleService.type=NodePort one --create-namespace minio/minio

Output of a successful deployment:
NAME: one
LAST DEPLOYED: Mon May 22 12:22:31 2023
NAMESPACE: minio
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
MinIO can be accessed via port 9000 on the following DNS name from within your cluster:
one-minio.minio.svc.cluster.local

To access MinIO from localhost, run the below commands:

1. export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}")

2. kubectl port-forward $POD_NAME 9000 --namespace minio

Read more about port forwarding here: http://kubernetes.io/docs/user-guide/kubectl/kubectl_port-forward/

You can now access MinIO server on http://localhost:9000. Follow the below steps to connect to MinIO server with mc client:

1. Download the MinIO mc client - https://min.io/docs/minio/linux/reference/minio-mc.html#quickstart

2. export MC_HOST_one-minio-local=http://$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootUser}" | base64 --decode):$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootPassword}" | base64 --decode)@localhost:9000

3. mc ls one-minio-local

Note that the MC_HOST_one-minio-local environment variable name in the log above appears to be invalid (hyphens are not allowed in shell variable names), so I changed it to MC_HOST_one_minio_local:
2. export MC_HOST_one_minio_local=http://$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootUser}" | base64 --decode):$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootPassword}" | base64 --decode)@localhost:9000

3. mc ls one_minio_local

To access the MinIO console, forward port 9001 and log in at localhost:9001 with the root account:
export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}")

kubectl port-forward $POD_NAME 9001 --namespace minio

Bucket (S3 API) access uses port 9000:
export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}")
kubectl port-forward $POD_NAME 9000 --namespace minio

2. Set up Spark on k8s

spark-on-k8s-operator is an open-source project started by Google (not an official Google product). It uses an operator to schedule Spark workloads as Kubernetes resources, making it easy to run Spark on Kubernetes.

Install the Spark operator with Helm:
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm install my-release spark-operator/spark-operator \
--namespace spark-operator \
--set webhook.enable=true \
--set image.repository=openlake/spark-operator \
--set image.tag=3.3.2 \
--create-namespace

Verify the deployment:

kubectl get pods -n spark-operator

You should see the my-release pod:
NAME                                           READY   STATUS      RESTARTS   AGE
my-release-spark-operator-6547984586-xzw4p 1/1 Running 2 4d20h

Following the official example, deploy a Spark application that computes pi and save it as spark-pi.yaml. Note that the earlier Helm install created RBAC for serviceAccount: my-release-spark in the spark-operator namespace, so the Spark apps here all run in the spark-operator namespace with serviceAccount: my-release-spark (you can verify this as shown below):

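Before applying it, you can confirm that the ServiceAccount created by the Helm release exists (a quick check of mine, assuming the release name my-release used above):

kubectl -n spark-operator get serviceaccount my-release-spark
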
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: pyspark-pi
namespace: spark-operator
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: "openlake/spark-py:3.3.2"
imagePullPolicy: Always
mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
sparkVersion: "3.3.2"
restartPolicy:
type: OnFailure
onFailureRetries: 3
onFailureRetryInterval: 10
onSubmissionFailureRetries: 5
onSubmissionFailureRetryInterval: 20
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 3.3.2
serviceAccount: my-release-spark
executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 3.3.2
kubectl apply -f spark-pi.yaml

Check the SparkApplication and pods:
#kubectl -n spark-operator get sparkapp,pod
NAME STATUS ATTEMPTS START FINISH AGE
sparkapplication.sparkoperator.k8s.io/pyspark-pi COMPLETED 1 2023-05-22T06:43:24Z 2023-05-22T06:44:37Z 4m50s

NAME READY STATUS RESTARTS AGE
pod/my-release-spark-operator-webhook-init-xzx9c 0/1 Completed 0 4d20h
pod/my-release-spark-operator-6547984586-xzw4p 1/1 Running 2 4d20h
pod/pyspark-pi-driver 0/1 Completed 0 4m46s

Look at the tail of the driver log; you can see the DAGScheduler finishing the job:
#kubectl logs pyspark-pi-driver -n spark-operator --tail 10
23/05/22 06:44:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
23/05/22 06:44:35 INFO DAGScheduler: ResultStage 0 (reduce at /opt/spark/examples/src/main/python/pi.py:42) finished in 1.571 s
23/05/22 06:44:35 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
23/05/22 06:44:35 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
23/05/22 06:44:35 INFO DAGScheduler: Job 0 finished: reduce at /opt/spark/examples/src/main/python/pi.py:42, took 1.615592 s
Pi is roughly 3.145160
23/05/22 06:44:35 INFO SparkUI: Stopped Spark web UI at http://pyspark-pi-60e9d1884232c31f-driver-svc.spark-operator.svc:4040
23/05/22 06:44:35 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
23/05/22 06:44:35 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
23/05/22 06:44:35 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
23/05/22 06:44:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/05/22 06:44:36 INFO MemoryStore: MemoryStore cleared
23/05/22 06:44:36 INFO BlockManager: BlockManager stopped
23/05/22 06:44:36 INFO BlockManagerMaster: BlockManagerMaster stopped
23/05/22 06:44:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/05/22 06:44:36 INFO SparkContext: Successfully stopped SparkContext
23/05/22 06:44:36 INFO ShutdownHookManager: Shutdown hook called
23/05/22 06:44:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-4e8e1507-e941-4437-b0b5-18818fc8865f/spark-4f604216-aeab-4679-83ce-f2527613ec66
23/05/22 06:44:36 INFO ShutdownHookManager: Deleting directory /tmp/spark-00ddbc21-98da-4ad8-9905-fcf0e8d64129
23/05/22 06:44:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-4e8e1507-e941-4437-b0b5-18818fc8865f/spark-4f604216-aeab-4679-83ce-f2527613ec66/pyspark-10201045-5307-40dc-b6b2-d7e5447763c4

3. Analyze data on MinIO with Spark

Prepare a Dockerfile and the build commands, since we will frequently edit the .py files and push new images. I use the image from the original Openlake article as the base image because it already has the Spark dependencies installed:
FROM openlake/sparkjob-demo:3.3.2

WORKDIR /app

COPY *.py .

Build and push the Docker image as follows (Docker Hub is often unreliable for me, so I created a registry on Huawei Cloud — adjust to your own setup):
docker build -t xiowang/spark-minio:3.3.2 .
docker tag xiowang/spark-minio:3.3.2 swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2
docker push swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2

Download the NYC taxi data (~112M rows, ~10GB):

wget https://data.cityofnewyork.us/api/views/t29m-gskq/rows.csv -O rows.csv

Create a bucket in MinIO:

mc mb one_minio_local/openlake/spark/sample-data

Copy the taxi data to MinIO:

mc cp rows.csv one_minio_local/openlake/spark/sample-data/

Go into Jupyter and install PySpark:

pip3 install pyspark

In MinIO, create a user with read/write permission on one_minio_local/openlake and request an access key and secret (see the sketch below), for example:

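I created the user and key pair in the MinIO console; a rough mc equivalent would look like the sketch below (the policy file, user name, and password are placeholders, and depending on your mc version the policy subcommands may be policy add / policy set instead of policy create / policy attach):

# create a user and give it a policy that allows read/write on the openlake bucket
mc admin user add one_minio_local openlakeuser openlakepassword
mc admin policy create one_minio_local openlake-rw ./openlake-rw-policy.json
mc admin policy attach one_minio_local openlake-rw --user openlakeuser

With the access key and secret in hand, export them for the job:
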
export AWS_ACCESS_KEY_ID=3436ZpuHMvI5EEoR
export AWS_SECRET_ACCESS_KEY=6US0FDsSFdlg5DzbWPPJtS1UeL75Rb0G
export ENDPOINT=one-minio.minio:9000
export OUTPUT_PATH=s3a://openlake/spark/result/taxi
export INPUT_PATH=s3a://openlake/spark/sample-data/rows.csv

Create a Kubernetes Secret to hold the information above:
kubectl create secret generic minio-secret \
--from-literal=AWS_ACCESS_KEY_ID=3436ZpuHMvI5EEoR \
--from-literal=AWS_SECRET_ACCESS_KEY=6US0FDsSFdlg5DzbWPPJtS1UeL75Rb0G \
--from-literal=ENDPOINT=http://one-minio.minio:9000 \
--from-literal=AWS_REGION=us-east-1 \
--namespace spark-operator

Deploy the SparkApplication:
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-minio
namespace: spark-operator
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: "swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2"
imagePullPolicy: Always
mainApplicationFile: local:///app/main.py
sparkVersion: "3.3.2"
restartPolicy:
type: OnFailure
onFailureRetries: 3
onFailureRetryInterval: 10
onSubmissionFailureRetries: 5
onSubmissionFailureRetryInterval: 20
driver:
cores: 1
memory: "1024m"
labels:
version: 3.3.2
serviceAccount: my-release-spark
env:
- name: INPUT_PATH
value: "s3a://openlake/spark/sample-data/rows.csv"
- name: OUTPUT_PATH
value: "s3a://openlake/spark/result/taxi"
- name: SSL_ENABLED
value: "true"
- name: AWS_REGION
valueFrom:
secretKeyRef:
name: minio-secret
key: AWS_REGION
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: minio-secret
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: minio-secret
key: AWS_SECRET_ACCESS_KEY
- name: ENDPOINT
valueFrom:
secretKeyRef:
name: minio-secret
key: ENDPOINT
executor:
cores: 1
instances: 3
memory: "1024m"
labels:
version: 3.3.2
env:
- name: INPUT_PATH
value: "s3a://openlake/spark/sample-data/rows.csv"
- name: OUTPUT_PATH
value: "s3a://openlake/spark/result/taxi"
- name: SSL_ENABLED
value: "true"
- name: AWS_REGION
valueFrom:
secretKeyRef:
name: minio-secret
key: AWS_REGION
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: minio-secret
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: minio-secret
key: AWS_SECRET_ACCESS_KEY
- name: ENDPOINT
valueFrom:
secretKeyRef:
name: minio-secret
key: ENDPOINT

The Python script in this SparkApplication is shown below; it essentially counts the rides with more than 6 passengers:
import logging
import os

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MinIOSparkJob")

spark = SparkSession.builder.getOrCreate()


def load_config(spark_context: SparkContext):
spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "openlakeuser"))
spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
os.getenv("AWS_SECRET_ACCESS_KEY", "openlakeuser"))
spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
# spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", os.getenv("SSL_ENABLED", "true"))
spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
# spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")


load_config(spark.sparkContext)


# Define schema for NYC Taxi Data
schema = StructType([
StructField('VendorID', LongType(), True),
StructField('tpep_pickup_datetime', StringType(), True),
StructField('tpep_dropoff_datetime', StringType(), True),
StructField('passenger_count', DoubleType(), True),
StructField('trip_distance', DoubleType(), True),
StructField('RatecodeID', DoubleType(), True),
StructField('store_and_fwd_flag', StringType(), True),
StructField('PULocationID', LongType(), True),
StructField('DOLocationID', LongType(), True),
StructField('payment_type', LongType(), True),
StructField('fare_amount', DoubleType(), True),
StructField('extra', DoubleType(), True),
StructField('mta_tax', DoubleType(), True),
StructField('tip_amount', DoubleType(), True),
StructField('tolls_amount', DoubleType(), True),
StructField('improvement_surcharge', DoubleType(), True),
StructField('total_amount', DoubleType(), True)])

# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(
os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))
# Filter dataframe based on passenger_count greater than 6
large_passengers_df = df.filter(df.passenger_count > 6)

total_rows_count = df.count()
filtered_rows_count = large_passengers_df.count()
# File Output Committer is used to write the output to the destination (Not recommended for Production)
large_passengers_df.write.format("csv").option("header", "true").save(
os.getenv("OUTPUT_PATH", "s3a://openlake-tmp/spark/nyc/taxis_small"))

logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")
logger.info(f"Total Rows for Passenger Count > 6: {filtered_rows_count}")

If the code above misbehaves, you can create a debug pod like the following and exec into it for debugging (see the example after the manifest):
apiVersion: v1
kind: Pod
metadata:
name: debug-pod
namespace: spark-operator
spec:
containers:
- name: spark-minio
image: swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2
command: ["sleep"]
args: ["infinity"]
env:
- name: INPUT_PATH
value: "s3a://openlake/spark/sample-data/rows.csv"
- name: OUTPUT_PATH
value: "s3a://openlake/spark/result/taxi"
- name: SSL_ENABLED
value: "true"
- name: AWS_REGION
valueFrom:
secretKeyRef:
name: minio-secret
key: AWS_REGION
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: minio-secret
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: minio-secret
key: AWS_SECRET_ACCESS_KEY
- name: ENDPOINT
valueFrom:
secretKeyRef:
name: minio-secret
key: ENDPOINT

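With the pod running you can exec in and poke around (my own workflow; whether the job script can be run directly depends on what the image ships):

# open a shell in the debug pod; the MinIO credentials from minio-secret are already in the environment
kubectl -n spark-operator exec -it debug-pod -- /bin/bash
# inside the pod you can, for example, try re-running the job script by hand:
#   python3 /app/main.py
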
Finally, check the driver log for the printed output:
#kubectl -n spark-operator logs spark-minio-driver
2023-05-25 01:13:49,104 - MinIOSparkJob - INFO - Total Rows for NYC Taxi Data: 112234626
2023-05-25 01:13:49,104 - MinIOSparkJob - INFO - Total Rows for Passenger Count > 6: 1066

4. Analyze data on MinIO with Iceberg in Spark

Iceberg is an open-source table format originally from Netflix. Put simply, it lets data engineers work with files such as CSV and Parquet through SQL, with snapshot support; see the official site for details.
Some addresses in the original article are hard-coded, so I adjusted them and saved the script as main-iceberg.py:

import logging
import os
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MinIOSparkJob")


# adding iceberg configs
conf = (
SparkConf()
.set("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") # Use Iceberg with Spark
.set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
.set("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
.set("spark.sql.catalog.demo.warehouse", os.getenv("WAREHOUSE", "s3a://openlake/warehouse/"))
.set("spark.sql.catalog.demo.s3.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
.set("spark.sql.defaultCatalog", "demo") # Name of the Iceberg catalog
.set("spark.sql.catalogImplementation", "in-memory")
.set("spark.sql.catalog.demo.type", "hadoop") # Iceberg catalog type
.set("spark.executor.heartbeatInterval", "300000")
.set("spark.network.timeout", "400000")
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Disable below line to see INFO logs
spark.sparkContext.setLogLevel("ERROR")


def load_config(spark_context: SparkContext):
spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "openlakeuser"))
spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
os.getenv("AWS_SECRET_ACCESS_KEY", "openlakeuser"))
spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "true")
spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")


load_config(spark.sparkContext)

# Define schema for NYC Taxi Data
schema = StructType([
StructField('VendorID', LongType(), True),
StructField('tpep_pickup_datetime', StringType(), True),
StructField('tpep_dropoff_datetime', StringType(), True),
StructField('passenger_count', DoubleType(), True),
StructField('trip_distance', DoubleType(), True),
StructField('RatecodeID', DoubleType(), True),
StructField('store_and_fwd_flag', StringType(), True),
StructField('PULocationID', LongType(), True),
StructField('DOLocationID', LongType(), True),
StructField('payment_type', LongType(), True),
StructField('fare_amount', DoubleType(), True),
StructField('extra', DoubleType(), True),
StructField('mta_tax', DoubleType(), True),
StructField('tip_amount', DoubleType(), True),
StructField('tolls_amount', DoubleType(), True),
StructField('improvement_surcharge', DoubleType(), True),
StructField('total_amount', DoubleType(), True)])

# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(
os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))

# Create Iceberg table "nyc.taxis_large" from RDD
df.write.mode("overwrite").saveAsTable("nyc.taxis_large")

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")

# Rename column "fare_amount" in nyc.taxis_large to "fare"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN fare_amount TO fare")

# Rename column "trip_distance" in nyc.taxis_large to "distance"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN trip_distance TO distance")

# Add description to the new column "distance"
spark.sql(
"ALTER TABLE nyc.taxis_large ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'")

# Move "distance" next to "fare" column
spark.sql("ALTER TABLE nyc.taxis_large ALTER COLUMN distance AFTER fare")

# Add new column "fare_per_distance" of type float
spark.sql("ALTER TABLE nyc.taxis_large ADD COLUMN fare_per_distance FLOAT AFTER distance")

# Check the snapshots available
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show() # prints all the available snapshots (1 till now)

# Populate the new column "fare_per_distance"
logger.info("Populating fare_per_distance column...")
spark.sql("UPDATE nyc.taxis_large SET fare_per_distance = fare/distance")

# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show() # prints all the available snapshots (2 now) since previous operation will create a new snapshot

# Qurey the table to see the results
res_df = spark.sql("""SELECT VendorID
,tpep_pickup_datetime
,tpep_dropoff_datetime
,fare
,distance
,fare_per_distance
FROM nyc.taxis_large LIMIT 15""")
res_df.show()

# Delete rows from "fare_per_distance" based on criteria
logger.info("Deleting rows from fare_per_distance column...")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance > 4.0 OR distance > 2.0")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance IS NULL")

# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show() # prints all the available snapshots (4 now) since previous operations will create 2 new snapshots

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after delete operations: {total_rows_count}")

# Partition table based on "VendorID" column
logger.info("Partitioning table based on VendorID column...")
spark.sql("ALTER TABLE nyc.taxis_large ADD PARTITION FIELD VendorID")

# Query Metadata tables like snapshot, files, history
logger.info("Querying Snapshot table...")
snapshots_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots ORDER BY committed_at")
snapshots_df.show() # shows all the snapshots in ascending order of committed_at column

logger.info("Querying Files table...")
files_count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large.files")
total_files_count = files_count_df.first().cnt
logger.info(f"Total Data Files for NYC Taxi Data: {total_files_count}")

spark.sql("""SELECT file_path,
file_format,
record_count,
null_value_counts,
lower_bounds,
upper_bounds
FROM nyc.taxis_large.files LIMIT 1""").show()

# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()

# Time travel to initial snapshot
logger.info("Time Travel to initial snapshot...")
snap_df = spark.sql("SELECT snapshot_id FROM nyc.taxis_large.history LIMIT 1")
spark.sql(f"CALL demo.system.rollback_to_snapshot('nyc.taxis_large', {snap_df.first().snapshot_id})")

# Qurey the table to see the results
res_df = spark.sql("""SELECT VendorID
,tpep_pickup_datetime
,tpep_dropoff_datetime
,fare
,distance
,fare_per_distance
FROM nyc.taxis_large LIMIT 15""")
res_df.show()

# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show() # 1 new row

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after time travel: {total_rows_count}")

Create the SparkApplication and save it as spark-iceberg-minio.yaml (apply it as shown after the manifest):
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-iceberg-minio
namespace: spark-operator
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: "swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2"
imagePullPolicy: Always
mainApplicationFile: local:///app/main-iceberg.py
sparkVersion: "3.3.2"
restartPolicy:
type: OnFailure
onFailureRetries: 3
onFailureRetryInterval: 10
onSubmissionFailureRetries: 5
onSubmissionFailureRetryInterval: 20
driver:
cores: 1
memory: "1024m"
labels:
version: 3.3.2
serviceAccount: my-release-spark
env:
- name: INPUT_PATH
value: "s3a://openlake/spark/sample-data/rows.csv"
- name: OUTPUT_PATH
value: "s3a://openlake/spark/result/taxi"
- name: SSL_ENABLED
value: "true"
- name: WAREHOUSE
value: "s3a://openlake/warehouse"
- name: AWS_REGION
valueFrom:
secretKeyRef:
name: minio-secret
key: AWS_REGION
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: minio-secret
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: minio-secret
key: AWS_SECRET_ACCESS_KEY
- name: ENDPOINT
valueFrom:
secretKeyRef:
name: minio-secret
key: ENDPOINT
executor:
cores: 1
instances: 3
memory: "1024m"
labels:
version: 3.3.2
env:
- name: INPUT_PATH
value: "s3a://openlake/spark/sample-data/rows.csv"
- name: OUTPUT_PATH
value: "s3a://openlake/spark/result/taxi"
- name: SSL_ENABLED
value: "true"
- name: WAREHOUSE
value: "s3a://openlake/warehouse"
- name: AWS_REGION
valueFrom:
secretKeyRef:
name: minio-secret
key: AWS_REGION
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: minio-secret
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: minio-secret
key: AWS_SECRET_ACCESS_KEY
- name: ENDPOINT
valueFrom:
secretKeyRef:
name: minio-secret
key: ENDPOINT

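Apply it the same way as the earlier job and follow the driver log (commands of my own, following the pattern used for spark-minio):

kubectl apply -f spark-iceberg-minio.yaml
kubectl -n spark-operator get sparkapp spark-iceberg-minio
kubectl -n spark-operator logs -f spark-iceberg-minio-driver
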
Summary
This post mainly followed the Openlake guide to see how Spark processes data stored in MinIO on Kubernetes. The Spark ecosystem feels quite mature and is very convenient for structured and semi-structured data. There are also things I am not used to: middleware such as Iceberg is pulled in as JAR packages rather than running as a separate service, which means upgrading it requires rebuilding the container image.

The original Openlake article also covers Dremio as the query layer and Spark Streaming with Kafka; try them if you are interested.

Introduction: Kubernetes has become the de facto standard for container orchestration. Cloud providers offer managed clusters, and many companies also run self-hosted ones. How to deploy applications to different Kubernetes clusters and manage applications across multiple clusters or hybrid clouds remains an open problem. This post analyzes Karmada, the multi-cluster solution open-sourced by Huawei Cloud.

Background

Kubernetes officially claims to support up to 150,000 pods and 5,000 nodes, but real production workloads often exceed that scale — think of large e-commerce platforms such as Taobao and Pinduoduo, or AI and big-data workflows. For business and partnership reasons a company may also deploy across several cloud providers, or combine a self-hosted data center with public cloud. Multi-cluster Kubernetes is therefore a hot topic in the cloud-native space.

The main multi-cluster problems today are:

  1. Cluster operations: joining and removing clusters and managing the machines in them. The community has the Cluster API project to abstract away cloud providers and manage clusters uniformly; Ant Group has also managed node system components with a k8s-on-k8s approach.
  2. Multi-cluster networking: connectivity between clusters, e.g. multicluster-ingress for cross-cluster ingress, and the multi-cluster mesh implemented by networking components such as Istio and Cilium.
  3. Multi-cluster object distribution: distributing Kubernetes objects, especially workloads, across clusters — e.g. KubeFed v2 led by Red Hat, and Karmada, which this post analyzes.

Core concepts

Karmada is derived from KubeFed v2, so many of its concepts come from KubeFed v2.

Relationship diagram

  1. Resource Template: the same objects we normally use in Kubernetes, such as Deployment, Secret, and ConfigMap, but created and modified in the global (host) cluster;
  2. Propagation Policy: defines which clusters a Resource Template should be scheduled to; the concept is the same as in KubeFed v2;
  3. Resource Binding: the result of scheduling a Resource Template according to the Propagation Policy, stored in a ResourceBinding;
  4. Override Policy: since different clusters may need different versions or replica counts, an Override Policy modifies the result stored in the Resource Binding;
  5. Work: after the Override Policy is rendered, Karmada produces Work objects. Each Work lives in a namespace that corresponds to a target cluster and contains the final Spec and Status of the object. The Execution Controller and the Agent keep reconciling Work objects: they create and update the workloads in the member clusters and update the Work status in the global cluster.

Next we walk through the official demo.

Demo

We mainly care about how objects are distributed across member clusters, which in Karmada is determined by PropagationPolicy and OverridePolicy. Both concepts are inherited from KubeFed v2:

  1. PropagationPolicy defines the distribution strategy for an object;
  2. OverridePolicy modifies the object's spec per cluster as needed.

Here we use the official samples to introduce Karmada's PropagationPolicy and OverridePolicy.

PropagationPolicy

First deploy an nginx Deployment in the global cluster:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx
  labels:
    app: nginx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - image: nginx
        name: nginx

We want to distribute this Deployment to the two member clusters cluster1 and cluster2, so we create a PropagationPolicy that declares the distribution rule:
apiVersion: policy.karmada.io/v1alpha1
kind: PropagationPolicy
metadata:
  name: nginx-propagation
spec:
  resourceSelectors:
    - apiVersion: apps/v1
      kind: Deployment
      name: nginx
  placement:
    clusterAffinity:
      clusterNames:
        - cluster1
        - cluster2

This PropagationPolicy uses resourceSelectors in its spec to target the nginx Deployment, and placement states that it should be distributed to cluster1 and cluster2. The scheduling result is shown below: the nginx Deployment is created in both member clusters (a quick way to inspect the intermediate objects is shown after the diagram):

PropagationPolicy
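
On the Karmada control plane you can inspect the intermediate objects this produces (a quick check of mine, run against the karmada-apiserver kubeconfig context):

# the policy, the scheduling result, and the per-cluster Work objects
kubectl get propagationpolicy,resourcebinding -n default
kubectl get work --all-namespaces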

OverridePolicy

Without an OverridePolicy, the Deployment in a member cluster has the same spec as the one in the global cluster, but we may want to change it per cluster or per business requirement.

For example, to change the image of the Deployment in cluster1 to nginx:test for a canary release, we can create the following OverridePolicy:

apiVersion: policy.karmada.io/v1alpha1
kind: OverridePolicy
metadata:
  name: nginx-propagation
spec:
  resourceSelectors:
    - apiVersion: apps/v1
      kind: Deployment
      name: nginx
  targetCluster:
    clusterNames:
      - cluster1
  overriders:
    plaintext:
      - path: "/spec/template/spec/containers/0/image"
        operator: replace
        value: "nginx:test"

The new deployment result is shown below:

OverridePolicy

Processing flow analysis

After reading the Karmada source code, I summarized the whole object-processing flow in the diagram below.
Diagram: processing flow

  1. The user creates an object in the global cluster;
  2. The Resource Detector lists all resources in the cluster every 30s and puts them into a queue;
  3. For each object in the queue, the Resource Detector finds the matching PropagationPolicy and creates a Binding;
  4. At this point the Binding has no target clusters; the Karmada Scheduler computes the placement according to the ClusterPropagationPolicy and fills in the Binding's targetClusters;
  5. The BindingController watches OverridePolicy and Binding objects and creates Work objects (one namespace per cluster);
  6. The ExecutionController watches Work objects, extracts the object defined in spec.workload, and uses the member cluster's client to create or update it;
  7. The member cluster's Agent also watches Work objects and syncs the workload status back to the Work.

Karmada pros and cons

Pros

From Karmada's documentation and source code, its biggest advantage over KubeFed v2 is full compatibility with the native Kubernetes API.

When an application grows from a single cluster to multiple clusters, the original manifests do not need to change; you only add the multi-cluster manifests, i.e. the PropagationPolicy and OverridePolicy, which can be added by the operations team. The migration and learning cost for users is therefore small.

Cons

Karmada also has some drawbacks:

  1. Maintenance cost of keeping up: since Karmada modifies Kubernetes control-plane components, keeping pace with new Kubernetes objects and features is a challenge;
  2. Support for all workloads: when I looked at Karmada it only supported Deployment, and the worker controller contains small bits of special-case logic for different workloads. The more workloads and objects Karmada supports, the more such logic there will be; without reading the whole codebase it is easy to introduce bugs, and maintenance becomes painful.

Summary

Karmada's goal is to let users go from a single cluster to multiple clusters while changing the original manifests as little as possible, but this post also pointed out some of its drawbacks (thanks to the comment section for the corrections). There is also a competing project, https://open-cluster-management.io, whose development is not moving as fast as Karmada; I will compare them some other time.

At work we often need task-orchestration tools, for example for machine-learning training, big-data processing, and pipeline development. argo-workflow is an open-source workflow engine built on Kubernetes CRDs.

A progressive Deployment — Argo Rollouts

Introduction: those familiar with Kubernetes know that Deployment currently supports only the RollingUpdate and Recreate strategies, while in real production, operations teams are more likely to want canary releases and blue-green deployments. I was about to build an enhanced version myself when I found Argo Rollouts, which matches my idea exactly, so there is no need to reinvent the wheel — this post is a first look at Argo Rollouts.

Overview

Argo Rollouts is a Kubernetes controller plus a set of CRDs that provide more powerful Deployment capabilities, including canary releases, blue-green deployments, experimentation, and progressive delivery.

Supported features:

  • Blue-green deployment
  • Canary release
  • Fine-grained, weighted traffic shifting
  • Automatic rollback and promotion
  • Manual judgement
  • Customizable metric queries and KPI analysis
  • Ingress controller integration: NGINX, ALB
  • Service mesh integration: Istio, Linkerd, SMI
  • Metric provider integration: Prometheus, Wavefront, Kayenta, Web, Kubernetes Jobs

How it works:

The principle is similar to Deployment, with stronger rollout strategies and traffic control. When spec.template changes, Argo Rollouts performs a rollout according to spec.strategy — typically it creates a new ReplicaSet and gradually scales down the pods of the previous ReplicaSet.

Installation

Official installation docs

1. Install the argo-rollouts controller and CRDs
kubectl create namespace argo-rollouts
kubectl apply -n argo-rollouts -f https://raw.githubusercontent.com/argoproj/argo-rollouts/stable/manifests/install.yaml

2. Install the argo-rollouts kubectl plugin
curl -LO https://github.com/argoproj/argo-rollouts/releases/latest/download/kubectl-argo-rollouts-linux-amd64
chmod +x ./kubectl-argo-rollouts-linux-amd64
mv ./kubectl-argo-rollouts-linux-amd64 /usr/local/bin/kubectl-argo-rollouts

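A quick check that the plugin is installed correctly:

# should print the version of the kubectl-argo-rollouts plugin
kubectl argo rollouts version
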
Usage

A canary release consists of two processes: replica shifting and traffic shifting.

Replica Shifting

Let's use the official example to try replica shifting.

1. Deploy a demo application

First create a Rollout CR and a Service that points to it:
kubectl apply -f https://raw.githubusercontent.com/argoproj/argo-rollouts/master/docs/getting-started/basic/rollout.yaml
kubectl apply -f https://raw.githubusercontent.com/argoproj/argo-rollouts/master/docs/getting-started/basic/service.yaml

The Rollout CR: apart from apiVersion, kind, and strategy it looks just like a Deployment — in fact the source code largely reuses Deployment's data structures:
#cat rollout.yaml
apiVersion: argoproj.io/v1alpha1
kind: Rollout
metadata:
name: rollouts-demo
spec:
replicas: 5
strategy:
canary:
steps:
- setWeight: 20
- pause: {}
- setWeight: 40
- pause: {duration: 10}
- setWeight: 60
- pause: {duration: 10}
- setWeight: 80
- pause: {duration: 10}
revisionHistoryLimit: 2
selector:
matchLabels:
app: rollouts-demo
template:
metadata:
labels:
app: rollouts-demo
spec:
containers:
- name: rollouts-demo
image: argoproj/rollouts-demo:blue
ports:
- name: http
containerPort: 8080
protocol: TCP
resources:
requests:
memory: 32Mi
cpu: 5m

The Service that exposes it:
apiVersion: v1
kind: Service
metadata:
  name: rollouts-demo
spec:
  ports:
  - port: 80
    targetPort: http
    protocol: TCP
    name: http
  selector:
    app: rollouts-demo

You can use the plugin provided by Argo Rollouts to inspect its status, which is quite handy:
[root@xiowang-dev ~/yaml]# kubectl argo rollouts get rollout rollouts-demo

Screenshot: Rollout deployed

2. Update the spec to trigger a rollout

Then change the image in the spec to trigger a rollout:
kubectl argo rollouts set image rollouts-demo rollouts-demo=argoproj/rollouts-demo:yellow

The Rollout is expected to create a new ReplicaSet and gradually scale it up while scaling the old one down; check with the plugin:
# kubectl argo rollouts get rollout rollouts-demo --watch

Screenshot: canary status

You can see that the Rollout created a new ReplicaSet, rollouts-demo-789746c88d, and moved pods from the old ReplicaSet to the new one, with a new:old pod ratio of 1:4. The status is Paused and it does not keep upgrading pods — why?

The reason is spec.strategy: the strategy defines steps for the upgrade, and since they are a list they are executed in order. The first step, setWeight: 20, means 20% of the pods should be updated to the new version; the second step, pause: {}, means pause indefinitely until a human resumes it through the plugin:
strategy:
  canary:
    steps:
    - setWeight: 20
    - pause: {}
    - setWeight: 40
    - pause: {duration: 10} # pause for 10s
    - setWeight: 60
    - pause: {duration: 10}
    - setWeight: 80
    - pause: {duration: 10}

Use the promote command to move on to the next step:
# kubectl argo rollouts promote rollouts-demo

Check the result again: all pods now belong to the new ReplicaSet:

Finished

Traffic Shifting

The example above showed how Argo Rollouts controls replica shifting, but a full canary process should include both replica shifting and traffic shifting.

Argo Rollouts currently integrates two traffic-control methods, Ingress and service mesh. My test environment only has the NGINX ingress controller deployed, so I will demo with Ingress.

1. Deploy the manifests

First delete the previous example:
kubectl delete -f https://raw.githubusercontent.com/argoproj/argo-rollouts/master/docs/getting-started/basic/rollout.yaml
kubectl delete -f https://raw.githubusercontent.com/argoproj/argo-rollouts/master/docs/getting-started/basic/service.yaml

Then deploy the official example:
kubectl apply -f https://raw.githubusercontent.com/argoproj/argo-rollouts/master/docs/getting-started/nginx/rollout.yaml
kubectl apply -f https://raw.githubusercontent.com/argoproj/argo-rollouts/master/docs/getting-started/nginx/services.yaml
kubectl apply -f https://raw.githubusercontent.com/argoproj/argo-rollouts/master/docs/getting-started/nginx/ingress.yaml

These files deploy one Rollout, two Services, and one Ingress.

In the Rollout, canaryService and stableService define the canary Service name (rollouts-demo-canary) and the stable Service name (rollouts-demo-stable) respectively:
apiVersion: argoproj.io/v1alpha1
kind: Rollout
metadata:
name: rollouts-demo
spec:
replicas: 1
strategy:
canary:
canaryService: rollouts-demo-canary
stableService: rollouts-demo-stable
trafficRouting:
nginx:
stableIngress: rollouts-demo-stable
steps:
- setWeight: 5
- pause: {}
...

The Services rollouts-demo-canary and rollouts-demo-stable have identical content. The selector does not contain a pod-template-hash yet; the Argo Rollouts controller fills it in with the actual ReplicaSet hash:
apiVersion: v1
kind: Service
metadata:
name: rollouts-demo-canary
spec:
ports:
- port: 80
targetPort: http
protocol: TCP
name: http
selector:
app: rollouts-demo
# This selector will be updated with the pod-template-hash of the canary ReplicaSet. e.g.:
# rollouts-pod-template-hash: 7bf84f9696

---
apiVersion: v1
kind: Service
metadata:
name: rollouts-demo-stable
spec:
ports:
- port: 80
targetPort: http
protocol: TCP
name: http
selector:
app: rollouts-demo
# This selector will be updated with the pod-template-hash of the stable ReplicaSet. e.g.:
# rollouts-pod-template-hash: 789746c88d

The Ingress defines the rule: NGINX forwards requests for the rollouts-demo.local host to the stable Service (rollouts-demo-stable):
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  name: rollouts-demo-stable
  annotations:
    kubernetes.io/ingress.class: nginx
spec:
  rules:
  - host: rollouts-demo.local
    http:
      paths:
      - path: /
        backend:
          # Reference to a Service name, also specified in the Rollout spec.strategy.canary.stableService field
          serviceName: rollouts-demo-stable
          servicePort: 80

Based on the rollouts-demo-stable Ingress, the Rollout controller automatically creates an additional Ingress for the canary traffic, named after the stable Ingress with a -canary suffix. That is why an extra Ingress, rollouts-demo-rollouts-demo-stable-canary, appears, routing traffic to the canary Service (rollouts-demo-canary):
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
generation: 1
name: rollouts-demo-rollouts-demo-stable-canary
namespace: default
ownerReferences:
- apiVersion: argoproj.io/v1alpha1
blockOwnerDeletion: true
controller: true
kind: Rollout
name: rollouts-demo
uid: 2d5b728b-2f71-4bf2-8283-323acf8ef573
spec:
rules:
- host: rollouts-demo.local
http:
paths:
- backend:
serviceName: rollouts-demo-canary
servicePort: 80
path: /

2. Trigger an update
kubectl argo rollouts set image rollouts-demo rollouts-demo=argoproj/rollouts-demo:yellow
kubectl argo rollouts get rollout rollouts-demo

You can see that SetWeight is now 5 in the Rollout status.

Screenshot: traffic

The Ingress now carries two extra annotations, nginx.ingress.kubernetes.io/canary and nginx.ingress.kubernetes.io/canary-weight:
# the stable Ingress (unchanged)

# the canary Ingress
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
    kubernetes.io/ingress.class: nginx
    nginx.ingress.kubernetes.io/canary: "true"
    nginx.ingress.kubernetes.io/canary-weight: "5"
  creationTimestamp: "2020-07-05T06:31:54Z"
  generation: 1
  name: rollouts-demo-rollouts-demo-stable-canary

A careful reader will notice a small issue in the output above: ActualWeight shows 50 where it should be 5 or 95, so I filed an issue with the community.

Summary

Argo Rollouts provides a more powerful Deployment, with canary and blue-green release features well suited to operations. This post was a quick look at its canary release feature.

Features not covered here include:

  1. Experiments, which can be added to the steps to verify that each step meets expectations;
  2. Analysis, which collects metrics during a rollout, such as the time spent in each step.

There is also one requirement Argo Rollouts does not support yet:

For traffic shifting, a canary should ideally be able to route a fixed set of users or URLs to the new version, which Argo Rollouts currently does not support.

Of course, this can be worked around by adding an Experiment that modifies the Ingress or SMI resources — for instance by setting header-based canary annotations on the NGINX Ingress, as sketched below.

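For example, with the NGINX ingress controller the canary Ingress can route by header instead of weight. Something like the following (a sketch using standard ingress-nginx canary annotations — Argo Rollouts itself does not manage these here) would send only requests carrying X-Canary: always to the new version:

kubectl annotate ingress rollouts-demo-rollouts-demo-stable-canary \
  nginx.ingress.kubernetes.io/canary-by-header=X-Canary \
  nginx.ingress.kubernetes.io/canary-by-header-value=always --overwrite
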
Beyond the features themselves, Argo Rollouts is also a good project to learn from at the source level: the structure is clear and it is a good reference for writing controllers and plugins.

A thought: the code is not that complex, so why doesn't Kubernetes implement this itself? My guess is that upstream wants to encourage people to write more CRDs. :)

Introduction: I am now fully focused on building a cloud-native PaaS platform. To understand Kubernetes better and make debugging easier, I did an initial read-through of the kubelet source code. There are plenty of kubelet analyses online already, but without going through the code yourself it stays rote knowledge. Here I briefly walk through the kubelet startup process and the syncLoop; the individual kubelet components can be analyzed separately later.

Startup process

Like the other components, kubelet's entry point is in cmd/kubelet/kubelet.go:

func main() {
rand.Seed(time.Now().UnixNano())

command := app.NewKubeletCommand()
logs.InitLogs()
defer logs.FlushLogs()

if err := command.Execute(); err != nil {
os.Exit(1)
}
}

It calls NewKubeletCommand to create a cobra command object. The command's Run method does three main things: build the kubelet configuration from the command-line flags and config file, initialize kubeletDeps (the components kubelet depends on), and then call Run to create and start the kubelet:
// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command {
...
Run: func(cmd *cobra.Command, args []string) {
...
// construct a KubeletServer from kubeletFlags and kubeletConfig
kubeletServer := &options.KubeletServer{
KubeletFlags: *kubeletFlags,
KubeletConfiguration: *kubeletConfig,
}
...
// use kubeletServer to construct the default KubeletDeps
kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
if err != nil {
klog.Fatal(err)
}
...
// run the kubelet
klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil {
klog.Fatal(err)
}
},
}
}

Run merely calls run, so look at run directly; it mainly does the following:

  1. Initialize basic configuration, including kubeClient, eventClient, heartbeatClient, ContainerManager, ...;
  2. Initialize the CRI via PreInitRuntimeService; the streaming server used for container creation and kubectl exec is also initialized here;
  3. Initialize all of kubelet's dependent components via RunKubelet, obtain the Docker configuration path, and run the kubelet.
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) (err error) {
...
err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
kubeDeps, &s.ContainerRuntimeOptions,
s.ContainerRuntime,
s.RuntimeCgroups,
s.RemoteRuntimeEndpoint,
s.RemoteImageEndpoint,
s.NonMasqueradeCIDR)
...
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
return err
}
...
}

Now the key part, RunKubelet: it instantiates a kubelet via createAndInitKubelet (registering the various managers and modules), and then startKubelet calls Kubelet.Run to start the kubelet together with all of its registered managers and modules:
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
...
k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
kubeDeps,
&kubeServer.ContainerRuntimeOptions,
kubeServer.ContainerRuntime,
kubeServer.HostnameOverride,
kubeServer.NodeIP,
kubeServer.ProviderID,
kubeServer.CloudProvider,
kubeServer.CertDirectory,
kubeServer.RootDirectory,
kubeServer.RegisterNode,
kubeServer.RegisterWithTaints,
kubeServer.AllowedUnsafeSysctls,
kubeServer.ExperimentalMounterPath,
kubeServer.ExperimentalKernelMemcgNotification,
kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
kubeServer.MinimumGCAge,
kubeServer.MaxPerPodContainerCount,
kubeServer.MaxContainerCount,
kubeServer.MasterServiceNamespace,
kubeServer.RegisterSchedulable,
kubeServer.KeepTerminatedPodVolumes,
kubeServer.NodeLabels,
kubeServer.SeccompProfileRoot,
kubeServer.BootstrapCheckpointPath,
kubeServer.NodeStatusMaxImages)
...
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
...
}

func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
// start the kubelet
go wait.Until(func() {
k.Run(podCfg.Updates())
}, 0, wait.NeverStop)

// start the kubelet server
if enableServer {
go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)

}
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
go k.ListenAndServePodResources()
}
}

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
klog.Warning("No api server defined - no node status update will be sent.")
}

// Start the cloud provider sync manager
if kl.cloudResourceSyncManager != nil {
go kl.cloudResourceSyncManager.Run(wait.NeverStop)
}

if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.Fatal(err)
}

// Start volume manager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

if kl.kubeClient != nil {
// Start syncing node status immediately, this may set up things the runtime needs to run.
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
go kl.fastStatusUpdateOnce()

// start syncing lease
go kl.nodeLeaseController.Run(wait.NeverStop)
}
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

// Set up iptables util rules
if kl.makeIPTablesUtilChains {
kl.initNetworkUtil()
}

// Start a goroutine responsible for killing pods (that are not properly
// handled by pod workers).
go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

// Start component sync loops.
kl.statusManager.Start()
kl.probeManager.Start()

// Start syncing RuntimeClasses if enabled.
if kl.runtimeClassManager != nil {
kl.runtimeClassManager.Start(wait.NeverStop)
}

// Start the pod lifecycle event generator.
kl.pleg.Start()
kl.syncLoop(updates, kl)
}

The syncLoop

The last line of Kubelet.Run, kl.syncLoop(updates, kl), takes us into the kubelet's core syncLoop — the loop that processes pod update messages. I will explain it with a diagram.

  1. Pod changes from HTTP, static files, and the API server are passed to syncLoop through the PodConfig channel;
  2. syncLoop / syncLoopIteration pops updates from PodConfig; it updates the pod state in podManager, and dispatches the update through dispatchWork to a PodWorker, which updates the pod via the syncPod interface (implemented by Kubelet.syncPod);
  3. syncPod pushes status updates through the podStatusChannel to the statusManager, which patches the status to the API server;
  4. syncPod updates non-runtime information (e.g. QoS, cgroups) through the containerManager, and updates the pod state through the CRI (study dockershim or another shim for the detailed pod operations);
  5. Meanwhile, the PLEG periodically (1s by default) relists all pod states from the CRI, compares them with the previous state, and sends pod events to syncLoop.

Kubelet SyncLoop

Summary

This post walked through the kubelet startup process and briefly introduced the syncLoop. Now that the code structure is roughly clear, CRI, CSI, and device plugins can be studied next.

Introduction: the previous post briefly introduced how to use kubebuilder; this one analyzes kubebuilder's code logic.

1. The foundation of kubebuilder: the controller

First, the logic of a Kubernetes controller. The diagram below is mainly based on the workqueue example from client-go.
Diagram: Controller
I split the code into a common part and a special part: the former is the basic client-go flow, the latter is the controller's own logic. The flow has eight steps:

1. The Reflector watches the specified objects via ListAndWatch;
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.period, stopCh)
}

2. The Reflector pushes the events it observes — Add, Update, and Delete operations on objects — into the DeltaFIFO queue;

3. The Informer first parses the action and object out of each event;

4. The Informer updates the parsed object in the local store, i.e. updates the local cache;

5. The Informer then executes the ResourceEventHandler callbacks that the controller registered when initializing the informer (these callbacks can be customized);

6. The registered callbacks put the key of the changed object into the controller's workqueue;
7. Finally the controller loops over reconcile: it keeps popping keys from the workqueue, fetches the corresponding object from the local store, processes it, and in most cases updates the object again through the client.

This flow has been analyzed many times by others, but in my experience you still need to read the code once to feel confident — you need to know the important structs and APIs when something breaks or when doing further development.

2. What kubebuilder wraps

kubebuilder is essentially a library that wraps client-go to make it easier to develop Kubernetes operators.

The workqueue example above already implements the controller logic; on top of that, kubebuilder does the following extra work:

  1. kubebuilder introduces the concept of a manager; one manager can manage multiple controllers, which share the manager's client;
  2. If the manager dies or stops, all of its controllers stop too;
  3. kubebuilder manages these controllers with a map[GroupVersionKind]informer, so each controller still has its own workqueue and DeltaFIFO, and kubebuilder has already implemented this code for us;
  4. The main development left to us is writing the logic inside Reconcile.

1. The manager starts all controllers through the map[GroupVersionKind]informer:
func (ip *specificInformersMap) Start(stop <-chan struct{}) {
...
for _, informer := range ip.informersByGVK {
go informer.Informer.Run(stop)
}
...
<-stop
}

2. The controller's event-handling logic all lives in https://github.com/kubernetes-sigs/controller-runtime/blob/master/pkg/internal/controller/controller.go, which essentially implements most of the workqueue example; I recommend understanding that example first and then reading this file.

3. Summary
I originally wanted this post to cover kubebuilder's code flow, but to understand kubebuilder you must first understand client-go; once you are familiar with client-go, analyzing kubebuilder is easy. I spent a lot of time digging into client-go and the holiday is almost over, so there is not much here about kubebuilder itself — I will add more when I have time. There is an existing kubebuilder analysis online worth referencing, but for further development you should really read and analyze the code yourself.

Introduction: anyone working in cloud native has heard of CRDs (Custom Resource Definitions) and operators, but honestly developing them is tedious and not beginner-friendly. Fortunately the community has usable scaffolding: kubebuilder and operator-sdk. My feeling is that most people will side with the former, since it is Kubernetes' own, so I am learning to develop operators with it. To keep this post readable I only cover the most basic usage; for finalizer and webhook just read the official docs, and I will write up the code logic separately.

1. Installation

Installation follows the official guide; since GitHub downloads were too slow I forked the source on Gitee and built it:
git clone https://gitee.com/henrywangx/kubebuilder.git
cd kubebuilder
make build
cp bin/kubebuilder $GOPATH/bin

2. Usage

kubebuilder relies on Go modules, so enable them with export GO111MODULE=on; because of the proxy/firewall situation, also set a module proxy with export GOPROXY=https://goproxy.io first. In short:

export GO111MODULE=on
export GOPROXY=https://goproxy.io

2.1 Create a project
mkdir $GOPATH/src/demo
cd $GOPATH/src/demo
kubebuilder init --domain demo.com --license apache2 --owner "xiong"

Then check the current directory:
tree .
.
├── Dockerfile
├── Makefile
├── PROJECT
├── bin
│   └── manager
├── config
│   ├── certmanager
│   │   ├── certificate.yaml
│   │   ├── kustomization.yaml
│   │   └── kustomizeconfig.yaml
│   ├── default
│   │   ├── kustomization.yaml
│   │   ├── manager_auth_proxy_patch.yaml
│   │   ├── manager_webhook_patch.yaml
│   │   └── webhookcainjection_patch.yaml
│   ├── manager
│   │   ├── kustomization.yaml
│   │   └── manager.yaml
│   ├── prometheus
│   │   ├── kustomization.yaml
│   │   └── monitor.yaml
│   ├── rbac
│   │   ├── auth_proxy_role.yaml
│   │   ├── auth_proxy_role_binding.yaml
│   │   ├── auth_proxy_service.yaml
│   │   ├── kustomization.yaml
│   │   ├── leader_election_role.yaml
│   │   ├── leader_election_role_binding.yaml
│   │   └── role_binding.yaml
│   └── webhook
│   ├── kustomization.yaml
│   ├── kustomizeconfig.yaml
│   └── service.yaml
├── go.mod
├── go.sum
├── hack
│   └── boilerplate.go.txt
└── main.go

kubebuilder generated a set of template directories for us, including files for the CRD's RBAC, certificates, and webhooks. Ignore them for now. At this point make sure your terminal can reach a test Kubernetes cluster — simply run kubectl cluster-info and check for errors. If it works, you can run main.go:
kubectl cluster-info
go run main.go

You should see log output like:
2019-12-28T00:22:09.789+0800	INFO	controller-runtime.metrics	metrics server is starting to listen	{"addr": ":8080"}
2019-12-28T00:22:09.789+0800 INFO setup starting manager
2019-12-28T00:22:09.790+0800 INFO controller-runtime.manager starting metrics server {"path": "/metrics"}

From main.go you can see that kubebuilder generated the code for a manager that manages controllers, but no controller has been added yet (a controller here means the controller for a CRD):
func main() {
...
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
LeaderElection: enableLeaderElection,
Port: 9443,
})
...
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
...
}
}

OK, now we can have kubebuilder create the CRD we want; let's call this CRD Object:

kubebuilder create api --group infra --version v1 --kind Object

A quick note: the combination of group, version, and kind identifies a Kubernetes CRD. Also, kind must start with an uppercase letter and must not contain special characters.

After running the command above, kubebuilder creates two files, api/v1/object_types.go and controllers/object_controller.go. The former defines the CRD's fields; the latter holds the reconcile logic for the CRD (i.e. handling creation, update, and deletion). We will come back to both later. Finally, in main.go, the controller for our Object is registered with the manager generated earlier:
func main() {
	...
	// register the Object controller with the manager
	if err = (&controllers.ObjectReconciler{
		Client: mgr.GetClient(),
		Log:    ctrl.Log.WithName("controllers").WithName("Object"),
		Scheme: mgr.GetScheme(),
	}).SetupWithManager(mgr); err != nil {
		...
	}
	...

As you can guess, running kubebuilder create api repeatedly creates and registers different controllers with the manager.

Back to api/v1/object_types.go: here I add a Detail field to the spec and a Created field to the status:

// ObjectSpec defines the desired state of Object
type ObjectSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "make" to regenerate code after modifying this file

	// Foo is an example field of Object. Edit Object_types.go to remove/update
	Foo    string `json:"foo,omitempty"`
	Detail string `json:"detail,omitempty"`
}

// ObjectStatus defines the observed state of Object
type ObjectStatus struct {
	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
	// Important: Run "make" to regenerate code after modifying this file
	Created bool `json:"created,omitempty"`
}

kubebuilder simply generates the templates for Object's spec and status. As the comments indicate, spec is the desired state of the CRD and status is the observed state; see how Kubernetes defines an object for details. With the default definition, kubebuilder also generates a sample manifest at config/samples/infra_v1_object.yaml:
---
apiVersion: infra.demo.com/v1
kind: Object
metadata:
  name: object-sample
spec:
  # Add fields here
  foo: bar
  detail: "detail for demo"

In controllers/object_controller.go, the Reconcile code kubebuilder generated for us, I add logging of Detail and set Created to true:
// this method lives in controllers/object_controller.go
func (r *ObjectReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	ctx := context.Background()
	_ = r.Log.WithValues("object", req.NamespacedName)
	// your logic here

	// 1. Print Spec.Detail and Status.Created in the log
	obj := &infrav1.Object{}
	if err := r.Get(ctx, req.NamespacedName, obj); err != nil {
		fmt.Errorf("couldn't find object:%s", req.String())
	} else {
		// print Detail and Created
		r.Log.V(1).Info("Successfully get detail", "Detail", obj.Spec.Detail)
		r.Log.V(1).Info("", "Created", obj.Status.Created)
	}
	// 2. Change Created
	if !obj.Status.Created {
		obj.Status.Created = true
		r.Update(ctx, obj)
	}

	return ctrl.Result{}, nil
}

kubebuilder uses kustomize, so make sure it is installed first; on macOS:

brew install kustomize

Then install the generated CRD and run the manager with the modified code:
make install
go run main.go

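Before creating any objects you can verify that the CRD was installed (a sanity check of my own; the CRD name follows kubebuilder's default pluralization):

kubectl get crd objects.infra.demo.com
kubectl api-resources --api-group=infra.demo.com
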
Now we can create an Object from config/samples/infra_v1_object.yaml:
kubectl create -f config/samples/infra_v1_object.yaml

In the terminal running the manager you can see the log lines printed by the code we just added:
2019-12-28T23:39:34.963+0800	DEBUG	controllers.Object	Successfully get detail	{"Detail": "detail for demo"}
2019-12-28T23:39:34.963+0800 DEBUG controllers.Object {"Created": false}
2019-12-28T23:39:35.019+0800 DEBUG controller-runtime.controller Successfully Reconciled {"controller": "object", "request": "default/object-sample"}
2019-12-28T23:39:35.019+0800 DEBUG controllers.Object Successfully get detail {"Detail": "detail for demo"}
2019-12-28T23:39:35.019+0800 DEBUG controllers.Object {"Created": true}

And check the Object's state as seen from Kubernetes:
kubectl get object object-sample -o yaml
apiVersion: infra.demo.com/v1
kind: Object
metadata:
  creationTimestamp: "2019-12-28T15:39:34Z"
  generation: 2
  name: object-sample
  namespace: default
  resourceVersion: "433782"
  selfLink: /apis/infra.demo.com/v1/namespaces/default/objects/object-sample
  uid: 3f4b368d-2988-11ea-a544-080027d6a71e
spec:
  detail: detail for demo
  foo: bar
status:
  Created: true # the status we changed to true

You can see that object-sample's status.Created has been changed to true.

3. Other topics

For the topics not covered here — cert, webhook, and finalizer — the official manual has fairly detailed usage docs. I will just briefly share my understanding; corrections welcome:

  1. Webhook: when adding or modifying an Object we need to validate it, so the webhook framework is used for validation, and kubebuilder can generate the corresponding webhook code;
  2. cert: solves the certificate problem when Kubernetes calls the webhook; the docs recommend using cert-manager for certificates;
  3. Finalizer: when deleting an Object, it may have created other resources such as pods, or we may need to do some cleanup before deletion; the finalizer is the framework code for that cleanup.

kubebuilder also supports deploying the manager as a Deployment, but that is more cumbersome to debug, so I only demonstrated go run here (a sketch of the deploy flow follows).

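For reference, the standard kubebuilder workflow to run the manager in-cluster looks roughly like this (a sketch based on the generated Makefile targets; the image name is a placeholder):

# build and push the manager image, then deploy it with kustomize as a Deployment
make docker-build docker-push IMG=example.com/demo-operator:v0.1
make deploy IMG=example.com/demo-operator:v0.1
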
Summary

This post briefly walked through the basic flow of developing a CRD with kubebuilder, without going deep into the code; there may be mistakes, so please point them out. Like me when I first learned kubebuilder, you may only be able to type the commands from the official tutorial while the generated code feels like a black box — the next goal is to sort out the flow and structure of the code kubebuilder generates.

Preface

I am a fairly junior engineer who spent a bit over two years at a traditional foreign company. A month ago my whole department was cut. The severance was decent, so most people's first reaction was fairly calm. But I had just bought an apartment and still owed a pile of down-payment debt 😢, and I also believe that resting for more than a month makes a person tired and sloppy, so I was determined to find a job within a month. Things went as planned: within a month I got offers from several big companies and am about to start. This post is just a plain account of how it felt, not clickbait — otherwise I could have titled it "How I got offers from xx, xx, and xx in one month".

The layoff

About the layoff: I happened to be at an American company. Partly because of US-China trade tensions and partly because our industry is seen as a sunset industry, colleagues had sensed a layoff coming for a long time, and since the severance was generous, many were actually looking forward to it. Yet when it really happened, quite a few people could not sleep during the first week. It is easy to analyze — there were basically two mindsets:

  1. Younger people early in their careers, myself included, were excited about receiving a large sum of money, but also somewhat unwilling to accept that their first job ended so abruptly;
  2. Older colleagues used to the company: some had long been waiting for the package, while others felt nervous and uneasy about unemployment or where to land next.

So the overall reactions were a mix of happiness, excitement, and anxiety.

Job hunting

Again split between the younger and the older colleagues:

  1. For younger people like me, our department had been doing cloud-native development before the layoff. Cloud native happens to be a very hot area, and it is not something you can master purely through self-study or a bootcamp. At year-end few candidates are willing to test the market, so overall demand exceeded supply.
  2. For the older colleagues, their skills and experience lie more in legacy or non-internet domains where market demand is weaker; and with longer tenure there are relatively fewer positions matching their seniority, so they looked less sought-after than the younger folks.

How to avoid a difficult job search

First of all, I am not really qualified to answer this — there are plenty of far more accomplished people online. I only have some thoughts, drawn from my observations, about how to avoid being laid off mid-career myself:

  1. Learn to summarize and reflect. Many colleagues say they do, yet in interviews they still cannot explain anything clearly or present themselves. My take is that their thinking is not thorough enough — they only understand the problem or the design superficially. How do you understand deeply? Through sharing; sharing forces you to master 90% of the knowledge. At work I often had to write documents for review, and although I am lazy I tried to write up the problems I ran into as blog posts or docs. Every time you write, you ask yourself: will others understand this? How can a reader grasp it most easily? What questions will they ask, and why was it designed this way? Writing and sharing makes you revisit and correct the things you had not thought through. I did no preparation before job hunting, not even LeetCode, yet I passed the interviews — partly luck, since cloud-native people are in short supply, and partly because every interview was essentially me sharing my documents or giving a presentation, which let the interviewer easily see what I had done and that I had thought it through.
  2. Know your positioning. Be clear about your strengths and weaknesses. Mine is having development experience plus hands-on knowledge of the cloud-native ecosystem, so I had to look for jobs in a cloud-native context rather than as a generic backend CRUD engineer — I cannot out-CRUD others, but they do not have real cloud-native experience.
  3. Love the work. To avoid a mid-career crisis while staying in this industry, you need to keep the passion for it (which I find hard); what works for me is loving the act of solving problems, and through that, loving the industry.
  4. Engineering ability. The senior people at our company are excellent, especially one very experienced engineer I worked with. I could feel his craftsmanship: thorough and rigorous thinking, an engineering mindset, and the ability to find the pain point and solve the main problem. He would do well in any industry and would have no trouble finding a good job. He reminded me that the specific skills you hold may become outdated, but you must keep the engineering ability to solve problems. He is someone I really want to learn from, so that I never need to fear unemployment.

Summary

Really this is just me chatting before starting the new job. Some colleagues also asked me how I interviewed, so I ended up walking them through the whole project architecture again — though I wish they could explain it themselves rather than have me tell them. I am about to start; I hope I can grow further and become a bit more mature 😝, and I hope everyone finds a role that suits them.