OpenLake: Building a Cloud-Native Data Lake

Big data and cloud native have long been two hot areas of infrastructure, and more and more big-data infrastructure is moving in the cloud-native direction: the cloud-native message queue Pulsar, for example, or Snowflake's cloud-native data warehouse. I wanted to explore the combination of big data and cloud native and came across a very interesting project, OpenLake. It lives under the MinIO organization and builds a data lake on Kubernetes using MinIO, Spark, Kafka, Dremio and Iceberg, which makes it a great learning resource. This post records the setup process and my takeaways.

0. Prepare the Kubernetes environment

If you already have a cluster you can skip this section. I wanted to get an experiment running quickly, so I used docker-compose to run k3s on Linux.

Install Docker Compose (see the official guide) by running the following commands:

sudo curl -L "https://github.com/docker/compose/releases/download/1.25.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose

Create a local directory to hold the k3s configuration and data:

mkdir -p /home/xiowang/k3s
mkdir -p /home/xiowang/k3s/data

Create a docker-compose.yaml to start k3s:

#vim /home/xiowang/k3s/docker-compose.yaml
version: '3'
services:
  server:
    image: rancher/k3s:v1.20.2-k3s1
    container_name: k3s_server
    hostname: xiowang.dev
    command: server --tls-san=xiowang.dev
    tmpfs:
    - /run
    - /var/run
    privileged: true
    restart: always
    ports:
    - 6443:6443
    - 443:30443
    - 80:30080
    - 50050:30050
    - 50051:30051
    - 32000:32000
    - 32001:32001
    environment:
    - K3S_TOKEN=16963276443662
    - K3S_KUBECONFIG_OUTPUT=/root/.kube/config
    - K3S_KUBECONFIG_MODE=600
    volumes:
    - /var/lib/rancher/k3s:/var/lib/rancher/k3s
    - /etc/rancher:/etc/rancher
    - /home/xiowang/k3s/.kube:/root/.kube
    - /home/xiowang/k3s/data:/data:shared,rw

With the configuration above we mainly do the following:

  1. Expose the Kubernetes API server on the host's port 6443 so kubectl can manage the cluster;
  2. Map the host's ports 443 and 80 to the cluster NodePorts 30443 and 30080 respectively;
  3. Save the kubeconfig to /home/xiowang/k3s/.kube;
  4. Mount the cluster's /data directory to /home/xiowang/k3s/data;
  5. Reserve the host's ports 32000 and 32001 for exposing MinIO later.

Start k3s:

cd /home/xiowang/k3s
docker-compose up -d
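
Once the container is up, you can point kubectl at the kubeconfig that k3s wrote out and check that the node is Ready (a quick sanity check; the path comes from the volume mounts above):

export KUBECONFIG=/home/xiowang/k3s/.kube/config
kubectl get nodes -o wide
kubectl -n kube-system get pods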

Because the host's ports 80 and 443 map to the cluster NodePorts 30080 and 30443, edit the traefik service so that its port 80 and 443 entries use NodePorts 30080 and 30443 respectively:

#kubectl -n kube-system edit svc traefik
apiVersion: v1
kind: Service
metadata:
  annotations:
    meta.helm.sh/release-name: traefik
    meta.helm.sh/release-namespace: kube-system
  creationTimestamp: "2023-05-17T09:29:42Z"
  labels:
    app: traefik
    app.kubernetes.io/managed-by: Helm
    chart: traefik-1.81.0
    heritage: Helm
    release: traefik
  name: traefik
  namespace: kube-system
  resourceVersion: "368985"
  uid: 7bbb1758-ca01-4e84-b166-dae950613adf
spec:
  clusterIP: 10.43.115.234
  clusterIPs:
  - 10.43.115.234
  externalTrafficPolicy: Cluster
  ports:
  - name: http
    nodePort: 30080
    port: 80
    protocol: TCP
    targetPort: http
  - name: https
    nodePort: 30443
    port: 443
    protocol: TCP
    targetPort: https
  selector:
    app: traefik
    release: traefik
  sessionAffinity: None
  type: LoadBalancer
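
If you prefer not to edit the service interactively, the same change can be applied with a one-off JSON patch; this is a sketch that assumes the http and https entries are the first and second items in spec.ports, as in the manifest above:

kubectl -n kube-system patch svc traefik --type='json' \
  -p='[{"op":"replace","path":"/spec/ports/0/nodePort","value":30080},
       {"op":"replace","path":"/spec/ports/1/nodePort","value":30443}]'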

Copy /home/xiowang/k3s/.kube/config to ~/.kube/config on your local machine (kubecm is recommended for managing multiple kubeconfigs):

cp /home/xiowang/k3s/.kube/config ~/.kube/config

Now we can start the OpenLake journey.

1. Install and configure MinIO

There are two recommended ways to install MinIO on Kubernetes:

  1. Install MinIO with the operator, see the guide;
  2. Install MinIO with Helm, see the guide.

I haven't had time to study the operator's CRDs yet, so I installed MinIO with Helm as follows:

helm repo add minio https://charts.min.io/
#set the root password and disk size; TLS is disabled by default (adjust to your needs, I use 20Gi for testing)
helm upgrade --install --namespace minio --set rootUser=rootuser,rootPassword=rootpass123,persistence.size=20Gi,resources.requests.memory=100Mi,resources.limits.memory=2Gi,replicas=3,service.type=NodePort,consoleService.type=NodePort one --create-namespace minio/minio

Logs from a successful deployment:

NAME: one
LAST DEPLOYED: Mon May 22 12:22:31 2023
NAMESPACE: minio
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
MinIO can be accessed via port 9000 on the following DNS name from within your cluster:
one-minio.minio.svc.cluster.local

To access MinIO from localhost, run the below commands:

1. export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}")

2. kubectl port-forward $POD_NAME 9000 --namespace minio

Read more about port forwarding here: http://kubernetes.io/docs/user-guide/kubectl/kubectl_port-forward/

You can now access MinIO server on http://localhost:9000. Follow the below steps to connect to MinIO server with mc client:

1. Download the MinIO mc client - https://min.io/docs/minio/linux/reference/minio-mc.html#quickstart

2. export MC_HOST_one-minio-local=http://$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootUser}" | base64 --decode):$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootPassword}" | base64 --decode)@localhost:9000

3. mc ls one-minio-local

Note that the environment variable name MC_HOST_one-minio-local in the log above appears to be invalid (shell variable names cannot contain dashes), so I changed it to MC_HOST_one_minio_local:

2. export MC_HOST_one_minio_local=http://$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootUser}" | base64 --decode):$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootPassword}" | base64 --decode)@localhost:9000

3. mc ls one_minio_local
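
Alternatively, instead of the MC_HOST_* environment variable you can register the deployment as a named mc alias, which is easier to reuse later; this sketch reuses the alias name one_minio_local and the rootUser/rootPassword values set during helm install:

mc alias set one_minio_local http://localhost:9000 rootuser rootpass123
mc ls one_minio_local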

To access the MinIO console, forward port 9001 and then log in to localhost:9001 with the root account:

export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}")

kubectl port-forward $POD_NAME 9001 --namespace minio

Bucket access, on the other hand, goes through port 9000:

export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}")
kubectl port-forward $POD_NAME 9000 --namespace minio

2. Set up Spark on k8s

spark-on-k8s-operator is an open-source project started by Google (not an official Google product). It uses an operator to schedule Spark workloads as Kubernetes resources, which makes it easy to run Spark on k8s.

Install the operator with Helm:

helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm install my-release spark-operator/spark-operator \
--namespace spark-operator \
--set webhook.enable=true \
--set image.repository=openlake/spark-operator \
--set image.tag=3.3.2 \
--create-namespace

Verify the deployment:

kubectl get pods -n spark-operator

You should see the my-release pod:

NAME                                           READY   STATUS      RESTARTS   AGE
my-release-spark-operator-6547984586-xzw4p 1/1 Running 2 4d20h
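
Besides the operator pod, you can also confirm that the SparkApplication CRDs were registered (a quick check; the exact CRD names here are assumed from the chart):

kubectl get crd | grep sparkoperator.k8s.io
kubectl api-resources | grep -i sparkapplication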

Following the examples, deploy a Spark application that computes Pi, saved as spark-pi.yaml (note that the Helm chart already created RBAC for serviceAccount: my-release-spark in the spark-operator namespace, so all Spark apps here run in the spark-operator namespace with serviceAccount: my-release-spark):

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: pyspark-pi
  namespace: spark-operator
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "openlake/spark-py:3.3.2"
  imagePullPolicy: Always
  mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
  sparkVersion: "3.3.2"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.3.2
    serviceAccount: my-release-spark
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.3.2
kubectl apply -f spark-pi.yaml

Check the SparkApplication and the pods:

#kubectl -n spark-operator get sparkapp,pod
NAME STATUS ATTEMPTS START FINISH AGE
sparkapplication.sparkoperator.k8s.io/pyspark-pi COMPLETED 1 2023-05-22T06:43:24Z 2023-05-22T06:44:37Z 4m50s

NAME READY STATUS RESTARTS AGE
pod/my-release-spark-operator-webhook-init-xzx9c 0/1 Completed 0 4d20h
pod/my-release-spark-operator-6547984586-xzw4p 1/1 Running 2 4d20h
pod/pyspark-pi-driver 0/1 Completed 0 4m46s

Looking at the tail of the log, you can see the DAGScheduler finish the job:

#kubectl logs pyspark-pi-driver -n spark-operator --tail 10
23/05/22 06:44:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
23/05/22 06:44:35 INFO DAGScheduler: ResultStage 0 (reduce at /opt/spark/examples/src/main/python/pi.py:42) finished in 1.571 s
23/05/22 06:44:35 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
23/05/22 06:44:35 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
23/05/22 06:44:35 INFO DAGScheduler: Job 0 finished: reduce at /opt/spark/examples/src/main/python/pi.py:42, took 1.615592 s
Pi is roughly 3.145160
23/05/22 06:44:35 INFO SparkUI: Stopped Spark web UI at http://pyspark-pi-60e9d1884232c31f-driver-svc.spark-operator.svc:4040
23/05/22 06:44:35 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
23/05/22 06:44:35 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
23/05/22 06:44:35 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
23/05/22 06:44:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/05/22 06:44:36 INFO MemoryStore: MemoryStore cleared
23/05/22 06:44:36 INFO BlockManager: BlockManager stopped
23/05/22 06:44:36 INFO BlockManagerMaster: BlockManagerMaster stopped
23/05/22 06:44:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/05/22 06:44:36 INFO SparkContext: Successfully stopped SparkContext
23/05/22 06:44:36 INFO ShutdownHookManager: Shutdown hook called
23/05/22 06:44:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-4e8e1507-e941-4437-b0b5-18818fc8865f/spark-4f604216-aeab-4679-83ce-f2527613ec66
23/05/22 06:44:36 INFO ShutdownHookManager: Deleting directory /tmp/spark-00ddbc21-98da-4ad8-9905-fcf0e8d64129
23/05/22 06:44:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-4e8e1507-e941-4437-b0b5-18818fc8865f/spark-4f604216-aeab-4679-83ce-f2527613ec66/pyspark-10201045-5307-40dc-b6b2-d7e5447763c4

3. Use Spark to analyze data in MinIO

Prepare a Dockerfile and the build commands first, since we will be editing the Python files and pushing new images frequently.
I use the image from the OpenLake guide as the base image, because it already has Spark's dependencies installed:

FROM openlake/sparkjob-demo:3.3.2

WORKDIR /app

COPY *.py .

Build and push the image as follows (Docker Hub is often unreachable for me, so I created a repository on Huawei Cloud; adjust this for your own registry):

docker build -t xiowang/spark-minio:3.3.2 .
docker tag xiowang/spark-minio:3.3.2 swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2
docker push swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2

Download the NYC taxi trip data (~112M rows, ~10GB in size):

wget https://data.cityofnewyork.us/api/views/t29m-gskq/rows.csv
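
Before uploading, it is worth a quick sanity check that the download is complete (counting lines on a ~10GB file takes a while):

ls -lh rows.csv
head -n 1 rows.csv   # column header
wc -l rows.csv       # roughly 112M data rows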

Create the bucket in MinIO:

mc mb one_minio_local/openlake/spark/sample-data

Copy the taxi data to MinIO:

mc cp rows.csv one_minio_local/openlake/spark/sample-data/

In the Jupyter environment, install pyspark:

pip3 install pyspark

In MinIO, create a user with read/write access to one_minio_local/openlake and generate an access key and secret for it (one way to do this from the command line is sketched after the following block):

export AWS_ACCESS_KEY_ID=3436ZpuHMvI5EEoR
export AWS_SECRET_ACCESS_KEY=6US0FDsSFdlg5DzbWPPJtS1UeL75Rb0G
export ENDPOINT=one-minio.minio:9000
export OUTPUT_PATH=s3a://openlake/spark/result/taxi
export INPUT_PATH=s3a://openlake/spark/sample-data/rows.csv
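
A minimal sketch of creating such a user with mc admin against the alias defined earlier; the policy name openlake-rw, the policy file name, and the key/secret values are just examples, and older mc releases use `mc admin policy add` / `mc admin policy set` instead of create/attach:

cat > openlake-rw.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:*"],
      "Resource": ["arn:aws:s3:::openlake", "arn:aws:s3:::openlake/*"]
    }
  ]
}
EOF
mc admin user add one_minio_local 3436ZpuHMvI5EEoR 6US0FDsSFdlg5DzbWPPJtS1UeL75Rb0G
mc admin policy create one_minio_local openlake-rw openlake-rw.json
mc admin policy attach one_minio_local openlake-rw --user 3436ZpuHMvI5EEoR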

Create a Kubernetes Secret holding this information:

kubectl create secret generic minio-secret \
--from-literal=AWS_ACCESS_KEY_ID=3436ZpuHMvI5EEoR \
--from-literal=AWS_SECRET_ACCESS_KEY=6US0FDsSFdlg5DzbWPPJtS1UeL75Rb0G \
--from-literal=ENDPOINT=http://one-minio.minio:9000 \
--from-literal=AWS_REGION=us-east-1 \
--namespace spark-operator

Deploy the SparkApplication:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-minio
  namespace: spark-operator
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2"
  imagePullPolicy: Always
  mainApplicationFile: local:///app/main.py
  sparkVersion: "3.3.2"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    memory: "1024m"
    labels:
      version: 3.3.2
    serviceAccount: my-release-spark
    env:
      - name: INPUT_PATH
        value: "s3a://openlake/spark/sample-data/rows.csv"
      - name: OUTPUT_PATH
        value: "s3a://openlake/spark/result/taxi"
      - name: SSL_ENABLED
        value: "true"
      - name: AWS_REGION
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_REGION
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_ACCESS_KEY_ID
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_SECRET_ACCESS_KEY
      - name: ENDPOINT
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: ENDPOINT
  executor:
    cores: 1
    instances: 3
    memory: "1024m"
    labels:
      version: 3.3.2
    env:
      - name: INPUT_PATH
        value: "s3a://openlake/spark/sample-data/rows.csv"
      - name: OUTPUT_PATH
        value: "s3a://openlake/spark/result/taxi"
      - name: SSL_ENABLED
        value: "true"
      - name: AWS_REGION
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_REGION
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_ACCESS_KEY_ID
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_SECRET_ACCESS_KEY
      - name: ENDPOINT
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: ENDPOINT
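
Assuming the manifest above is saved as spark-minio.yaml, submit it and watch the application status; the operator adds an app-name label to the pods it creates, which makes them easy to find (if the label differs in your operator version, a plain `kubectl get pods` also works):

kubectl apply -f spark-minio.yaml
kubectl -n spark-operator get sparkapp spark-minio -w
kubectl -n spark-operator get pods -l sparkoperator.k8s.io/app-name=spark-minio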

The Python script in this SparkApplication is listed below; it simply counts the trips that carried more than 6 passengers:

import logging
import os

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MinIOSparkJob")

spark = SparkSession.builder.getOrCreate()


def load_config(spark_context: SparkContext):
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
                                                 os.getenv("AWS_SECRET_ACCESS_KEY", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
    # spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", os.getenv("SSL_ENABLED", "true"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
    # spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")


load_config(spark.sparkContext)


# Define schema for NYC Taxi Data
schema = StructType([
    StructField('VendorID', LongType(), True),
    StructField('tpep_pickup_datetime', StringType(), True),
    StructField('tpep_dropoff_datetime', StringType(), True),
    StructField('passenger_count', DoubleType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', DoubleType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', LongType(), True),
    StructField('DOLocationID', LongType(), True),
    StructField('payment_type', LongType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True)])

# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(
    os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))
# Filter dataframe based on passenger_count greater than 6
large_passengers_df = df.filter(df.passenger_count > 6)

total_rows_count = df.count()
filtered_rows_count = large_passengers_df.count()
# File Output Committer is used to write the output to the destination (Not recommended for Production)
large_passengers_df.write.format("csv").option("header", "true").save(
    os.getenv("OUTPUT_PATH", "s3a://openlake-tmp/spark/nyc/taxis_small"))

logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")
logger.info(f"Total Rows for Passenger Count > 6: {filtered_rows_count}")
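
After the job reaches COMPLETED, the filtered result should show up under the OUTPUT_PATH prefix; you can list it with the mc alias from earlier (the exact part file names will differ):

mc ls --recursive one_minio_local/openlake/spark/result/taxi/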

If the code above runs into problems, you can create the following debug pod and exec into it to troubleshoot:

apiVersion: v1
kind: Pod
metadata:
  name: debug-pod
  namespace: spark-operator
spec:
  containers:
  - name: spark-minio
    image: swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2
    command: ["sleep"]
    args: ["infinity"]
    env:
    - name: INPUT_PATH
      value: "s3a://openlake/spark/sample-data/rows.csv"
    - name: OUTPUT_PATH
      value: "s3a://openlake/spark/result/taxi"
    - name: SSL_ENABLED
      value: "true"
    - name: AWS_REGION
      valueFrom:
        secretKeyRef:
          name: minio-secret
          key: AWS_REGION
    - name: AWS_ACCESS_KEY_ID
      valueFrom:
        secretKeyRef:
          name: minio-secret
          key: AWS_ACCESS_KEY_ID
    - name: AWS_SECRET_ACCESS_KEY
      valueFrom:
        secretKeyRef:
          name: minio-secret
          key: AWS_SECRET_ACCESS_KEY
    - name: ENDPOINT
      valueFrom:
        secretKeyRef:
          name: minio-secret
          key: ENDPOINT
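
Inside the debug pod the same image, environment variables, and secret are available, so you can re-run the script by hand and see the full stack trace; this sketch assumes the manifest above is saved as debug-pod.yaml and that python3 is on the image's PATH:

kubectl apply -f debug-pod.yaml
kubectl -n spark-operator exec -it debug-pod -- /bin/bash
# inside the pod:
env | grep -E 'AWS|ENDPOINT'
python3 /app/main.py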

Finally, check the driver log to see the printed results:

#kubectl -n spark-operator logs spark-minio-driver
2023-05-25 01:13:49,104 - MinIOSparkJob - INFO - Total Rows for NYC Taxi Data: 112234626
2023-05-25 01:13:49,104 - MinIOSparkJob - INFO - Total Rows for Passenger Count > 6: 1066

4. Analyze data in MinIO with Iceberg in Spark

Iceberg is a table format open-sourced by Netflix. In short, it lets data engineers operate on files such as CSV and Parquet through SQL, and it supports snapshots; see the official site for details.
The original guide hard-codes a few addresses, so I adjusted them and saved the script as main-iceberg.py:

import logging
import os
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MinIOSparkJob")


# adding iceberg configs
conf = (
    SparkConf()
    .set("spark.sql.extensions",
         "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")  # Use Iceberg with Spark
    .set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.catalog.demo.warehouse", os.getenv("WAREHOUSE", "s3a://openlake/warehouse/"))
    .set("spark.sql.catalog.demo.s3.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
    .set("spark.sql.defaultCatalog", "demo")  # Name of the Iceberg catalog
    .set("spark.sql.catalogImplementation", "in-memory")
    .set("spark.sql.catalog.demo.type", "hadoop")  # Iceberg catalog type
    .set("spark.executor.heartbeatInterval", "300000")
    .set("spark.network.timeout", "400000")
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Disable below line to see INFO logs
spark.sparkContext.setLogLevel("ERROR")


def load_config(spark_context: SparkContext):
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
                                                 os.getenv("AWS_SECRET_ACCESS_KEY", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")


load_config(spark.sparkContext)

# Define schema for NYC Taxi Data
schema = StructType([
    StructField('VendorID', LongType(), True),
    StructField('tpep_pickup_datetime', StringType(), True),
    StructField('tpep_dropoff_datetime', StringType(), True),
    StructField('passenger_count', DoubleType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', DoubleType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', LongType(), True),
    StructField('DOLocationID', LongType(), True),
    StructField('payment_type', LongType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True)])

# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(
    os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))

# Create Iceberg table "nyc.taxis_large" from RDD
df.write.mode("overwrite").saveAsTable("nyc.taxis_large")

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")

# Rename column "fare_amount" in nyc.taxis_large to "fare"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN fare_amount TO fare")

# Rename column "trip_distance" in nyc.taxis_large to "distance"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN trip_distance TO distance")

# Add description to the new column "distance"
spark.sql(
    "ALTER TABLE nyc.taxis_large ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'")

# Move "distance" next to "fare" column
spark.sql("ALTER TABLE nyc.taxis_large ALTER COLUMN distance AFTER fare")

# Add new column "fare_per_distance" of type float
spark.sql("ALTER TABLE nyc.taxis_large ADD COLUMN fare_per_distance FLOAT AFTER distance")

# Check the snapshots available
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (1 till now)

# Populate the new column "fare_per_distance"
logger.info("Populating fare_per_distance column...")
spark.sql("UPDATE nyc.taxis_large SET fare_per_distance = fare/distance")

# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (2 now) since previous operation will create a new snapshot

# Query the table to see the results
res_df = spark.sql("""SELECT VendorID
                            ,tpep_pickup_datetime
                            ,tpep_dropoff_datetime
                            ,fare
                            ,distance
                            ,fare_per_distance
                      FROM nyc.taxis_large LIMIT 15""")
res_df.show()

# Delete rows from "fare_per_distance" based on criteria
logger.info("Deleting rows from fare_per_distance column...")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance > 4.0 OR distance > 2.0")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance IS NULL")

# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (4 now) since previous operations will create 2 new snapshots

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after delete operations: {total_rows_count}")

# Partition table based on "VendorID" column
logger.info("Partitioning table based on VendorID column...")
spark.sql("ALTER TABLE nyc.taxis_large ADD PARTITION FIELD VendorID")

# Query Metadata tables like snapshot, files, history
logger.info("Querying Snapshot table...")
snapshots_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots ORDER BY committed_at")
snapshots_df.show()  # shows all the snapshots in ascending order of committed_at column

logger.info("Querying Files table...")
files_count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large.files")
total_files_count = files_count_df.first().cnt
logger.info(f"Total Data Files for NYC Taxi Data: {total_files_count}")

spark.sql("""SELECT file_path,
                    file_format,
                    record_count,
                    null_value_counts,
                    lower_bounds,
                    upper_bounds
             FROM nyc.taxis_large.files LIMIT 1""").show()

# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()

# Time travel to initial snapshot
logger.info("Time Travel to initial snapshot...")
snap_df = spark.sql("SELECT snapshot_id FROM nyc.taxis_large.history LIMIT 1")
spark.sql(f"CALL demo.system.rollback_to_snapshot('nyc.taxis_large', {snap_df.first().snapshot_id})")

# Query the table to see the results
res_df = spark.sql("""SELECT VendorID
                            ,tpep_pickup_datetime
                            ,tpep_dropoff_datetime
                            ,fare
                            ,distance
                            ,fare_per_distance
                      FROM nyc.taxis_large LIMIT 15""")
res_df.show()

# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()  # 1 new row

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after time travel: {total_rows_count}")
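
When this job runs, the table shows up in MinIO under the warehouse prefix; the path sketched below assumes Iceberg's hadoop-catalog convention of a data/ and metadata/ directory per table:

mc ls --recursive one_minio_local/openlake/warehouse/nyc/taxis_large/ | head -n 20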

Create the SparkApplication, saved as spark-iceberg-minio.yaml:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-iceberg-minio
  namespace: spark-operator
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2"
  imagePullPolicy: Always
  mainApplicationFile: local:///app/main-iceberg.py
  sparkVersion: "3.3.2"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    memory: "1024m"
    labels:
      version: 3.3.2
    serviceAccount: my-release-spark
    env:
      - name: INPUT_PATH
        value: "s3a://openlake/spark/sample-data/rows.csv"
      - name: OUTPUT_PATH
        value: "s3a://openlake/spark/result/taxi"
      - name: SSL_ENABLED
        value: "true"
      - name: WAREHOUSE
        value: "s3a://openlake/warehouse"
      - name: AWS_REGION
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_REGION
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_ACCESS_KEY_ID
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_SECRET_ACCESS_KEY
      - name: ENDPOINT
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: ENDPOINT
  executor:
    cores: 1
    instances: 3
    memory: "1024m"
    labels:
      version: 3.3.2
    env:
      - name: INPUT_PATH
        value: "s3a://openlake/spark/sample-data/rows.csv"
      - name: OUTPUT_PATH
        value: "s3a://openlake/spark/result/taxi"
      - name: SSL_ENABLED
        value: "true"
      - name: WAREHOUSE
        value: "s3a://openlake/warehouse"
      - name: AWS_REGION
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_REGION
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_ACCESS_KEY_ID
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_SECRET_ACCESS_KEY
      - name: ENDPOINT
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: ENDPOINT
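
Submit it the same way as before and follow the driver log; the driver pod is named after the application (spark-iceberg-minio-driver), matching the pattern seen with the earlier jobs:

kubectl apply -f spark-iceberg-minio.yaml
kubectl -n spark-operator get sparkapp spark-iceberg-minio -w
kubectl -n spark-operator logs -f spark-iceberg-minio-driver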

Summary

This post mainly followed the OpenLake guide to try out how Spark processes data stored in MinIO on Kubernetes. The Spark ecosystem feels quite mature, and it is very convenient for handling structured and semi-structured data. There are also things that take getting used to: middleware such as Iceberg is pulled in as jar packages rather than running as a separate service, which means that upgrading the middleware requires rebuilding the container image.

The original OpenLake guide also covers using Dremio as a query layer and doing Spark Streaming with Kafka; if you are interested, give those a try.