大数据和云原生一直都是基建方面的两个热点,而现在越来越多的大数据基建逐渐往云原生方向发展,例如云原生的消息队列Pulsar,又例如Snowflake提供云原生的数仓。因此笔者想要探索大数据和云原生的结合,于是发现了一个非常有意思的项目Openlake,该项目挂在minio下,是在kubernetes环境下,利用minion,spark,kafka,dremio,iceberg搭建一套数据湖,非常适合学习,本文主要就是记录搭建过程和心得。
0.准备kubernetes环境
如果已经有集群可以跳过本节,我这边想快速做实验所以采用docker-compose的方式在linux上搭建k3s
安装docker compose,参考guide,执行如下命令:
1 | sudo curl -L "https://github.com/docker/compose/releases/download/1.25.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose |
本地创建一个目录用于保存k3s的配置和数据:
1 | mkdir -p /home/xiowang/k3s |
创建docker-compose.yaml用于k3s启动:
1 | #vim /home/xiowang/k3s/docker-compose.yaml |
上面的配置,我们主要做了:
- 通过本机的6443用于访问kubernetes的apiserver,方便kubectl进行管理;
- 通过本机的443和80分别映射集群的nodeport:30443和30080;
- 把kubeconfig保存到/home/xiowang/k3s/.kube;
- 挂载集群的/data目录到/home/xiowang/k3s/data
- 本地的32000和32001用于后续暴露minio的端口;
开始启动k3s:
1 | cd /home/xiowang/k3s |
因为本机80和443对应集群的nodeport:30443和30080,所以这里改一下trafik的service,将其80和443的nodeport分别指向30080和30443:
1 | #kubectl -n kube-system edit svc traefik |
拷贝/home/xiowang/k3s/.kube/config到本机的~/.kube/config(建议使用kubecm来管理)
1 | cp /home/xiowang/k3s/.kube/config ~/.kube/config |
接下来我们就可以开始openlake之旅了
1.安装和配置minio
在kubernetes上安装minio,有两种推荐方式:
这里暂时没时间学习operator的CRD,所以采用helm安装minio,步骤如下
1 | helm repo add minio https://charts.min.io/ |
成功部署日志:
1 | espace minio/minio |
这里注意上面日志中的MC_HOST_one-minio-local
环境变量名似乎是非法的,所以我换成了MC_HOST_one_minio_local
:
1 | 2. export MC_HOST_one_minio_local=http://$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootUser}" | base64 --decode):$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootPassword}" | base64 --decode)@localhost:9000 |
若想访问minio的console控制台,则forward 9001端口,再用root账号登陆localhost:9001:
1 | export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}") |
而访问bucket则是9000端口
1 | export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}") |
2.搭建spark on k8s
spark on k8s是google发起的一个开源项目(不是google的官方产品),使用operator对k8s的资源进行调度,方便spark对接k8s。
spark on k8s采用helm安装,安装命令如下:
1 | helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator |
验证部署结果:
1 | kubectl get pods -n spark-operator |
应该能看到my-release的pod:
1 | NAME READY STATUS RESTARTS AGE |
参考例子部署一个spark应用,保存为spark-pi.yaml用于计算pi(这里注意之前helm会在spark-operator的namespace中为serviceAccount: my-release-spark部署rbac,因此这里spark app都在spark-operator的namespace中,使用serviceAccount: my-release-spark运行):
1 | apiVersion: "sparkoperator.k8s.io/v1beta2" |
1 | kubectl apply -f spark-pi.yaml |
查看sparkapp和pods:
1 | #kubectl -n spark-operator get sparkapp,pod |
查看最后20行log, 可以看到DAGScheduler调度完成的日志:
1 | #kubectl logs pyspark-pi-driver -n spark-operator --tail 10 |
3.使用spark分析minio上的数据
准备好dockerfile,因为和编译命令,因为后续会经常编辑py文件和推镜像.
这里直接使用openlake原文的镜像作为baseimage,因为里面提前安装spark的各种依赖
1 | FROM openlake/sparkjob-demo:3.3.2 |
编译和push dockerimage命令如下(因为dockerhub经常挂,所以我在华为云上开了一个镜像仓库,可以根据自身情况修改一下):
1 | docker build -t xiowang/spark-minio:3.3.2 . |
下载纽约出租车司机数据(~112M rows and ~10GB in size):
1 | wget https://data.cityofnewyork.us/api/views/t29m-gskq/rows.csv ./ |
minio中创建bucket:
1 | mc mb one_minio_local/openlake/spark/sample-data |
拷贝出租车数据到minio
1 | mc cp rows.csv one_minio_local/openlake/spark/sample-data/ |
进到jupyter中安装pyspark
1 | pip3 install pyspark |
在minio中创建对one_minio_local/openlake
的读写权限的用户,并申请key和secret,如下
1 | export AWS_ACCESS_KEY_ID=3436ZpuHMvI5EEoR |
创建k8s secret,保存上述信息:
1 | kubectl create secret generic minio-secret \ |
部署sparkapp:
1 | apiVersion: "sparkoperator.k8s.io/v1beta2" |
这个sparkapp中的python脚本内容如下,实际上做的就是统计每天超过6位乘客的信息:
1 | import logging |
如果上面的代码跑的有问题可以创建如下一个debug-pod,并进入pod进行debug
1 | apiVersion: v1 |
最后查看日志,可以看到打印的日志
1 | #kubectl -n spark-operator logs spark-minio-driver |
4.spark中使用iceberg分析minio上的数据
iceberg是Netflix开源的一款软件,简单来说就是方便大数据工程师通过sql方式操作csv,parquet等文件,并且支持snapshot,具体可以见官网介绍。
原文中的一些地址写死了,这里我修改了一下,保存为main-iceberg.py
1 | import logging |
创建sparkapp,保存为spark-iceberg-minio.yaml
1 | apiVersion: "sparkoperator.k8s.io/v1beta2" |
#小结
本文主要参考openlake的guide体验了一下k8s环境下spark如何处理minio中的数据。能感受到spark对应的生态比较完善,对于结构化和半结构化的数据处理起来非常方便。当然也有一些不太习惯的地方,比如iceberg这些中间件都是通过jar包的方式被引入,而不是通过中间件服务,这就意味着更新中间件需要去更新容器镜像。
另外openlake原文中还有dremio做查询层,利用kakfa进行spark stream处理的内容。感兴趣的同学可以去试一下。