Kubernetes 上的大数据(三)
在本章中,你学习了如何在 Kubernetes 上部署和管理 Apache Spark、Apache Airflow 和 Apache Kafka 等关键大数据技术。将这些工具部署到 Kubernetes 上提供了多个好处,包括简化操作、更好的资源利用、扩展性、高可用性和统一的集群管理。你首先在 Kubernetes 上部署了 Spark 操作符,并运行了一个 Spark 应用程序来处理来自 Am
原文:
annas-archive.org/md5/c47e39c1bc577f11068c3d7425d9a76d
译者:飞龙
第八章:在 Kubernetes 上部署大数据堆栈
在本章中,我们将涵盖在 Kubernetes 上部署关键的大数据技术 —— Spark、Airflow 和 Kafka。随着容器编排和管理变得对于高效运行数据工作负载至关重要,Kubernetes 已成为事实上的标准。在本章结束时,你将能够成功在 Kubernetes 上部署和管理大数据堆栈,构建强大的数据管道和应用程序。
我们将首先使用 Spark 操作员在 Kubernetes 上部署 Apache Spark。你将学习如何配置和监控在 Kubernetes 集群上运行的 Spark 应用程序。能够在 Kubernetes 上运行 Spark 工作负载带来了重要的好处,如动态扩展、版本控制和统一的资源管理。
接下来,我们将在 Kubernetes 上部署 Apache Airflow。你将配置 Kubernetes 上的 Airflow,将其日志链接到 S3,以便于调试和监控,并设置它来编排使用 Spark 等工具构建的数据管道。在 Kubernetes 上运行 Airflow 可以提高可靠性、扩展性和资源利用率。
最后,我们将在 Kubernetes 上部署 Apache Kafka,这对流数据管道至关重要。在 Kubernetes 上运行 Kafka 简化了操作、扩展和集群管理。
在本章结束时,你将拥有在 Kubernetes 上部署和管理大数据堆栈的实践经验。这将使你能够利用 Kubernetes 作为容器编排平台,构建强大、可靠的数据应用程序。
在本章中,我们将涵盖以下主要内容:
-
在 Kubernetes 上部署 Spark
-
在 Kubernetes 上部署 Airflow
-
在 Kubernetes 上部署 Kafka
技术要求
在本章的活动中,你应该拥有一个 AWS 账户,并已安装 kubectl
、eksctl
和 helm
。关于如何设置 AWS 账户以及安装 kubectl
和 eksctl
的说明,请参考 第一章。有关 helm
安装的说明,请访问 helm.sh/docs/intro/install/
。
我们还将使用 Titanic 数据集进行练习。你可以在 github.com/neylsoncrepalde/titanic_data_with_semicolon
找到我们将使用的版本。
本章中的所有代码都可以在 GitHub 仓库 github.com/PacktPublishing/Bigdata-on-Kubernetes
的 Chapter08
文件夹中找到。
在 Kubernetes 上部署 Spark
为了帮助我们在 Kubernetes 上部署资源,我们将使用Helm。Helm 是 Kubernetes 的包管理器,帮助安装应用程序和服务。Helm 使用名为Charts的模板,将安装配置、默认设置、依赖关系等打包成一个易于部署的包。
另一方面,我们有操作符。操作符是自定义控制器,扩展了 Kubernetes API,用于管理应用程序及其组件。它们提供了一种声明性方式来创建、配置和管理 Kubernetes 上的复杂状态应用程序。
使用操作符的一些关键优势包括以下几点:
-
简化的应用程序部署和生命周期管理:操作符抽象了底层细节,为应用程序部署提供了高层次的抽象,而无需了解 Kubernetes 的复杂性。
-
与监控工具的集成:操作符暴露自定义指标和日志,支持与 Prometheus 和 Grafana 等监控堆栈的集成。
-
Kubernetes 原生:操作符利用 Kubernetes 的可扩展性,并专门为 Kubernetes 编写,使其能够保持云无关性。
操作符通过创建自定义资源定义(CRDs)和控制器来扩展 Kubernetes。CRD 允许你在 Kubernetes 中定义一个新的资源类型。例如,SparkOperator 定义了一个 SparkApplication 资源。
操作符接着创建一个控制器,监视这些自定义资源并根据资源的spec
执行操作。
例如,当创建 SparkApplication 资源时,SparkOperator 控制器会执行以下操作:
-
根据规范创建驱动程序和执行器 Pods
-
挂载存储卷
-
监控应用程序的状态
-
执行日志记录和监控
现在,开始吧:
-
首先,让我们使用
eksctl
创建一个 AWS EKS 集群:eksctl create cluster --managed --alb-ingress-access --node-private-networking --full-ecr-access --name=studycluster --instance-types=m6i.xlarge --region=us-east-1 --nodes-min=3 --nodes-max=4 --nodegroup-name=ng-studycluster
请记住,这行代码需要几分钟才能完成。现在,我们需要进行一些重要的配置,以便允许我们的 Kubernetes 集群代表我们创建卷。为此,我们需要安装 AWS EBS CSI 驱动程序。这对于部署 Spark 应用程序不是必须的,但对于 Airflow 部署来说非常重要。
-
首先,我们需要将 IAM OIDC 提供者与 EKS 集群关联,这样 IAM 角色和用户就能通过 Kubernetes API 进行身份验证。为此,请在终端中输入以下命令:
eksctl utils associate-iam-oidc-provider --region=us-east-1 --cluster=studycluster --approve
-
接下来,我们将在
kube-system
命名空间中创建一个名为ebs-csi-controller-sa
的 IAM 服务账户,并附加指定的 IAM 角色和策略。这个服务账户将由 EBS CSI 驱动程序使用:eksctl create iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster studycluster --role-name AmazonEKS_EBS_CSI_DriverRole --role-only --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy --approve
-
最后,我们将在集群中启用 EBS CSI 驱动程序,并将其链接到之前创建的服务账户和角色。记得将
<YOUR_ACCOUNT_NUMBER>
替换为真实值:eksctl create addon --name aws-ebs-csi-driver --cluster studycluster --service-account-role-arn arn:aws:iam::<YOUR_ACCOUNT_NUMBER>:role/AmazonEKS_EBS_CSI_DriverRole --force
-
现在,让我们开始实际的 Spark 操作符部署。接下来我们将创建一个命名空间来组织我们的资源:
kubectl create namespace spark-operator
-
接下来,我们将使用网上提供的 SparkOperator Helm chart 来部署操作符:
helm install spark-operator https://github.com/kubeflow/spark-operator/releases/download/spark-operator-chart-1.1.27/spark-operator-1.1.27.tgz --namespace spark-operator --set webhook.enable=true
-
检查操作符是否正确部署:
kubectl get pods -n spark-operator
你应该看到类似这样的输出:
NAME READY STATUS spark-operator-74db6fcf98-grhdw 1/1 Running spark-operator-webhook-init-mw8gf 0/1 Completed
-
接下来,我们需要将 AWS 凭证注册为 Kubernetes Secret,以使其可供 Spark 使用。这将允许我们的 Spark 应用程序访问 AWS 中的资源:
kubectl create secret generic aws-credentials --from-literal=aws_access_key_id=<YOUR_ACCESS_KEY_ID> --from-literal=aws_secret_access_key="<YOUR_SECRET_ACCESS_KEY>" -n spark-operator
-
现在,到了开发我们的 Spark 代码的时候了。到目前为止,你应该已经将泰坦尼克号数据集存储在 Amazon S3 上。在
github.com/PacktPublishing/Bigdata-on-Kubernetes/blob/main/Chapter08/spark/spark_job.py
上,你可以找到读取 S3 桶中的泰坦尼克号数据集并将其写入另一个桶的简单代码(这个第二个 S3 桶必须是事先创建的——你可以在 AWS 控制台中创建)。 -
将此文件保存为
spark_job.py
并上传到另一个 S3 桶中。这是 SparkOperator 将要查找代码以运行应用程序的地方。请注意,这段 PySpark 代码与我们之前在第五章中看到的有所不同。在这里,我们将 Spark 配置与 Spark 会话分开设置。我们将详细讨论这些配置:-
.set("spark.cores.max", "2")
:这限制了此 Spark 应用程序最多使用两个核心。这可以防止资源的过度分配。 -
.set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
和.set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
:这些配置启用了使用签名版本 4 认证对 S3 进行读写操作的支持,这种认证方式更为安全。 -
.set("spark.hadoop.fs.s3a.fast.upload", True)
:此属性启用了 S3A 连接器的快速上传功能,这可以提高将数据保存到 S3 时的性能。 -
.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
:此配置将 S3 文件系统的实现设置为使用更新的、优化的s3a
,而不是旧版的s3
连接器。 -
.set("spark.hadoop.fs.s3a.aws.crendentials.provider", "com.amazonaws.auth.EnvironmentVariablesCredentials")
:这将 Spark 配置为从环境变量中获取 AWS 凭证,而无需直接在代码中指定。 -
.set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.3")
:这会添加对 Hadoop AWS 模块的依赖,以便 Spark 能够访问 S3。
还需要注意的是,默认情况下,Spark 使用
INFO
日志级别。在此代码中,我们将其设置为WARN
,以减少日志记录并提高日志的可读性。记得将<YOUR_BUCKET>
替换为你自己的 S3 桶。 -
-
上传此代码到 S3 后,接下来就是创建一个包含 SparkApplication 定义的 YAML 文件。代码的内容可以在
github.com/PacktPublishing/Bigdata-on-Kubernetes/blob/main/Chapter08/spark/spark_job.yaml
找到。代码定义了一个新的 SparkApplication 资源。这只有在 SparkOperator 创建了 SparkApplication 自定义资源后才有可能。让我们仔细看看这个 YAML 定义在做什么。
-
YAML 文件的第一个块指定了 apiVersion 和资源类型为 Spark 应用程序。它还为应用程序设置了名称和命名空间。
-
第二个块定义了一个名为“ivy”的卷挂载,用于缓存依赖项,以避免每次作业运行时重新获取它们。它挂载到驱动程序和执行器的
/tmp
目录。 -
第三个块配置了 Spark 属性,启用了 Ivy 缓存目录,并设置了 Kubernetes 的资源分配批量大小。
-
第四个块配置了 Hadoop 属性,以使用 S3A 文件系统实现。
-
第五个块将此 Spark 应用程序设置为 Python 类型,指定要使用的 Python 版本,运行模式为集群模式,并设置要使用的 Docker 镜像——在此情况下是一个之前准备好的与 AWS 和 Kafka 集成的 Spark 镜像。它还定义了即使该镜像已经存在于集群中,也会始终从 Docker Hub 拉取镜像。
-
第六个块指定了 S3 中主 Python 应用程序文件的位置以及 Spark 版本——在这种情况下是 3.1.1。
-
第七个块将
restartPolicy
设置为Never
,这样应用程序只会运行一次。
其余的块设置了驱动程序和执行器 Pod 的配置。在这里,我们设置了用于访问 S3 的 AWS 访问密钥密文,要求为驱动程序和执行器分别分配 1 个核心和 1GB 内存,并为它们提供相同的资源,我们挂载了一个名为“ivy”的
emptyDir
卷用于缓存依赖项,并设置了 Spark 和驱动程序 Pod 标签用于跟踪。 -
-
一旦这个文件保存在你的计算机上,并且你已经有了存储在 S3 中的
.py
文件,就可以运行 Spark 应用程序了。在终端中,输入以下内容:kubectl apply -f spark_job.yaml -n spark-operator kubectl get sparkapplication -n spark-operator
我们可以通过以下方式获取应用程序的更多细节:
kubectl describe sparkapplication/test-spark-job -n spark-operator
要查看 Spark 应用程序的日志,请输入以下命令:
kubectl logs test-spark-job-driver -n spark-operator
就这样!你刚刚在 Kubernetes 上运行了你的第一个 Spark 应用程序!Kubernetes 不会让你用相同的名称部署另一个作业,因此,要重新测试,你应该删除该应用程序:
kubectl delete sparkapplication/test-spark-job -n spark-operator
现在,让我们看看如何使用官方 Helm 图表在 Kubernetes 上部署 Airflow。
在 Kubernetes 上部署 Airflow
在 Kubernetes 上部署 Airflow 非常简单。然而,在 Helm 图表配置中有一些重要的细节,我们需要注意。
首先,我们将下载最新的 Helm 图表到本地环境:
helm repo add apache-airflow https://airflow.apache.org
接下来,我们需要配置一个custom_values.yaml
文件,以更改图表的默认部署配置。此 YAML 文件的示例可以在github.com/PacktPublishing/Bigdata-on-Kubernetes/blob/main/Chapter08/airflow/custom_values.yaml
找到。我们将不逐行查看整个文件,而是只关注部署所需的最重要配置:
-
在
defaultAirflowTag
和airflowVersion
参数中,确保设置为2.8.3
。这是 1.13.1 Helm 图表版本可用的最新 Airflow 版本。 -
executor
参数应设置为KubernetesExecutor
。这样可以确保 Airflow 使用 Kubernetes 基础设施来动态启动任务。 -
在
env
部分,我们将配置“远程日志记录”以允许 Airflow 将日志保存到 S3 中。这是审计和节省 Kubernetes 存储资源的最佳实践。在这里,我们为 Airflow 配置了三个环境变量。第一个设置远程日志记录为"True"
;第二个定义 Airflow 将日志写入的 S3 桶和文件夹;最后一个定义 Airflow 将在 AWS 中进行身份验证时使用的“连接”。稍后我们将在 Airflow UI 中设置此项。这是这个块应该是什么样的例子:env: - name: "AIRFLOW__LOGGING__REMOTE_LOGGING" value: "True" - name: "AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER" value: "s3://airflow-logs-<YOUR_ACCOUNT_NUMBER>/airflow-logs/" - name: "AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID" value: "aws_conn"
-
在 webserver 块中,我们必须配置第一个用户凭据和服务类型。
service
参数应该设置为“LoadBalancer”,这样我们就可以通过浏览器访问 Airflow 的 UI 界面。defaultUser
块应该如下所示:defaultUser: enabled: true role: Admin username: <YOUR_USERNAME> email: admin@example.com firstName: NAME lastName: LASTNAME password: admin
在值文件中设置一个简单的密码,并在部署完成后尽快在 UI 中更改它,这一点非常重要。这样,你的凭据就不会以明文形式存储。这将避免发生重大安全事件。
-
redis.enabled
参数应设置为false
。因为我们使用的是 Kubernetes 执行器,所以 Airflow 不需要 Redis 来管理任务。如果我们没有将此参数设置为false
,Helm 仍然会部署一个 Redis Pod。 -
最后,在
dags
块中,我们将配置gitSync
。这是将我们的 DAG 文件发送到 Airflow 并保持它们在 GitHub(或你喜欢的任何其他 Git 仓库)中更新的最简单方法。首先,你应该创建一个名为dags
的 GitHub 仓库,用来存储 Python DAG 文件。然后,你应该按如下方式配置gitSync
块:gitSync: enabled: true repo: https://github.com/<USERNAME>/<REPO_NAME>.git branch: main rev: HEAD ref: main depth: 1 maxFailures: 0 subPath: "dags"
请注意,我们在原始文件中省略了几个注释以提高可读性。
custom_values.yaml
文件已准备好部署。现在我们可以在终端中执行以下命令:helm install airflow apache-airflow/airflow --namespace airflow --create-namespace -f custom_values.yaml
这个部署可能需要几分钟才能完成,因为 Airflow 在使 UI 可用之前会执行数据库迁移任务。
接下来,我们需要获取 UI 的 LoadBalancer 的 URL。在终端中,输入以下命令:
kubectl get svc -n airflow
在EXTERNAL-IP
列中,你会注意到airflow-webserver
服务有一个非空
值。复制这个 URL 并粘贴到浏览器中,添加:8080
以访问 Airflow 的正确端口。
登录到 UI 后,点击aws_conn
(正如我们在值文件中所述),选择Amazon Web Services作为连接类型,并输入你的访问密钥 ID 和密钥访问密钥。(此时,你应该已经将 AWS 凭据存储在本地——如果没有,在 AWS 控制台中,进入 IAM 并为你的用户生成新的凭据。你将无法在屏幕上看到旧的凭据。)
最后,我们将使用从 第五章 中改编的 DAG 代码,该代码将在 Kubernetes 上顺利运行。此 DAG 将自动从互联网下载 Titanic 数据集,执行简单的计算,并打印结果,这些结果将在“日志”页面上查看。代码内容可以通过以下链接访问:github.com/PacktPublishing/Bigdata-on-Kubernetes/blob/main/Chapter08/airflow/dags/titanic_dag.py
。
将此 Python 代码的副本上传到你的 GitHub 仓库,并在几秒钟内它将出现在 Airflow UI 中。现在,激活你的 DAG(点击 switch
按钮)并跟踪执行 (图 8.1)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_08.1.jpg
图 8.1 – DAG 成功执行
然后点击任何任务以选择它,点击 日志。你将看到 Airflow 日志直接从 S3 读取 (图 8.2)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_08_02.jpg
图 8.2 – S3 中的 Airflow 日志
恭喜!现在你已经在 Kubernetes 上成功启动并运行 Airflow。在 第十章 中,我们将把所有这些部分组合在一起,构建一个完全自动化的数据管道。
接下来,我们将在 Kubernetes 上使用 Strimzi 操作符部署一个 Kafka 集群。让我们开始吧。
在 Kubernetes 上部署 Kafka
Strimzi 是一个开源操作符,简化了在 Kubernetes 上部署和管理 Kafka 集群的过程,通过创建新的 CRD。它由 Strimzi 项目开发和维护,Strimzi 项目是 云原生计算基金会 (CNCF) 的一部分。Strimzi 操作符提供了一种声明式的方法来管理 Kubernetes 上的 Kafka 集群。用户不再需要手动创建和配置 Kafka 组件,而是通过 Kubernetes 自定义资源定义所需的 Kafka 集群状态。然后,操作符会根据指定的配置自动部署和管理 Kafka 组件:
-
要在 Kubernetes 中部署 Strimzi,首先,我们需要安装其 Helm 图表:
helm repo add strimzi https://strimzi.io/charts/
-
接下来,我们使用以下命令安装操作符:
helm install kafka strimzi/strimzi-kafka-operator --namespace kafka --create-namespace --version 0.40.0
-
我们可以通过以下方式检查部署是否成功:
helm status kafka -n kafka kubectl get pods -n kafka
-
现在,是时候配置我们 Kafka 集群的部署了。以下是一个 YAML 文件,用于使用新的 CRD 配置 Kafka 集群。为了更好地理解,我们将它分解成几个部分:
kafka_jbod.yaml
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: kafka-cluster spec: kafka: version: 3.7.0 replicas: 3
代码的第一部分指定了 API 版本和正在定义的资源类型。在本例中,这是一个由 Strimzi 操作符管理的 Kafka 资源。然后,我们为 Kafka 资源定义了元数据,特别是其名称,设置为
kafka-cluster
。接下来的代码块指定了 Kafka 经纪人的配置。我们将 Kafka 版本设置为 3.7.0,并指定希望集群中有三个副本(Kafka 经纪人实例):listeners: - name: plain port: 9092 type: internal tls: false - name: tls port: 9093 type: internal tls: true - name: external port: 9094 type: loadbalancer tls: false
接下来,我们定义 Kafka 经纪人的监听器。我们将配置三个监听器:
-
plain
: 一个没有 TLS 加密的内部监听器,端口为9092
-
tls
: 一个启用了 TLS 加密的内部监听器,端口为9093
-
external
: 一个暴露为 LoadBalancer 服务的外部监听器,端口为9094
,不使用 TLS 加密readinessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5
下一个块配置了 Kafka brokers 的就绪和存活探针。就绪探针检查 broker 是否准备好接受流量,而存活探针检查 broker 是否仍在运行。
initialDelaySeconds
参数指定了在执行第一次探针之前等待的秒数,而timeoutSeconds
参数指定了探针被认为失败后的秒数:config: default.replication.factor: 3 num.partitions: 9 offsets.topic.replication.factor: 3 transaction.state.log.replication.factor: 3 transaction.state.log.min.isr: 1 log.message.format.version: "3.7" inter.broker.protocol.version: "3.7" min.insync.replicas: 2 log.retention.hours: 2160
这个
kafka.config
块指定了 Kafka brokers 的各种配置选项,例如默认的复制因子、新主题的分区数量、偏移量和事务状态日志主题的复制因子、日志消息格式版本,以及日志保留期(以小时为单位)。Kafka 默认的日志保留期是 7 天(168 小时),但我们可以根据需要修改这个参数。需要记住的是,更长的保留期意味着更多的磁盘存储使用,因此要小心:storage: type: jbod volumes: - id: 0 type: persistent-claim size: 15Gi deleteClaim: false - id: 1 type: persistent-claim size: 15Gi deleteClaim: false
kafka.storage
块配置了 Kafka brokers 的存储。我们使用deleteClaim
设置为false
,以防止在删除 Kafka 集群时删除持久化存储卷声明:resources: requests: memory: 512Mi cpu: "500m" limits: memory: 1Gi cpu: "1000m"
接下来,
kafka.resources
块指定了 Kafka brokers 的资源请求和限制。我们请求 512 MiB 的内存和 500 毫 CPU,并将内存限制设置为 1 GiB,CPU 限制设置为 1cpu
:zookeeper: replicas: 3 storage: type: persistent-claim size: 10Gi deleteClaim: false resources: requests: memory: 512Mi cpu: "250m" limits: memory: 1Gi cpu: "500m"
最后,我们有一个
zookeeper
块,它配置了 Kafka 集群使用的 ZooKeeper 集群。我们为 ZooKeeper 指定了三个副本,使用 10 GiB 的持久化存储卷声明,并设置了类似于 Kafka brokers 的资源请求和限制。 -
-
配置文件准备好后,在你的机器上输入以下命令来将集群部署到 Kubernetes:
kubectl apply -f kafka_jbod.yaml -n kafka kubectl get kafka -n kafka
这将输出以下内容:
NAME DESIRED KAFKA REPLICAS DESIRED ZK REPLICAS kafka-class 3 3 kubectl describe kafka -n kafka
现在,检查 Pods:
kubectl get pods -n kafka
输出显示了三个 Kafka broker 和 ZooKeeper 实例:
NAME READY STATUS
kafka-class-kafka-0 1/1 Running
kafka-class-kafka-1 1/1 Running
kafka-class-kafka-2 1/1 Running
kafka-class-zookeeper-0 1/1 Running
kafka-class-zookeeper-1 1/1 Running
kafka-class-zookeeper-2 1/1 Running
恭喜!你已经在 Kubernetes 上成功运行了一个完全操作的 Kafka 集群。接下来的步骤是部署一个 Kafka Connect 集群,并为实时数据管道做好一切准备。由于云端成本效率问题,我们暂时不会进行此操作,但我们将在 第十章 中回到这个配置。
总结
在本章中,你学习了如何在 Kubernetes 上部署和管理 Apache Spark、Apache Airflow 和 Apache Kafka 等关键大数据技术。将这些工具部署到 Kubernetes 上提供了多个好处,包括简化操作、更好的资源利用、扩展性、高可用性和统一的集群管理。
你首先在 Kubernetes 上部署了 Spark 操作符,并运行了一个 Spark 应用程序来处理来自 Amazon S3 的数据。这使你能够利用 Kubernetes 以云原生的方式运行 Spark 作业,利用动态资源分配和扩展的优势。
接下来,你使用官方 Helm 图表在 Kubernetes 上部署了 Apache Airflow。你配置了 Airflow 使用 Kubernetes 执行器运行,从而使其能够动态地在 Kubernetes 上启动任务。你还设置了远程日志记录到 Amazon S3,以便于监控和调试。在 Kubernetes 上运行 Airflow 提高了可靠性、可扩展性和资源利用率,从而更好地协调数据管道。
最后,你使用 Strimzi 操作符在 Kubernetes 上部署了 Apache Kafka。你配置了一个包含代理、ZooKeeper 集群、持久化存储卷以及内部/外部监听器的 Kafka 集群。在 Kubernetes 上部署 Kafka 简化了操作、扩展、高可用性和集群管理,从而帮助构建流数据管道。
总体而言,你现在已经具备了在 Kubernetes 上部署和管理大数据栈关键组件的实战经验。这将使你能够构建可靠、可扩展的数据应用和管道,利用 Kubernetes 容器编排的强大功能。 本章中学习的技能对于在云原生环境中高效运行大数据工作负载至关重要。
在下一章,我们将看到如何在 Kubernetes 上构建数据消费层,并了解如何通过工具连接这些层来可视化和使用数据。
第九章:数据消费层
在今天的数据驱动世界中,组织正在处理日益增长的数据量,有效地消费和分析这些数据对于做出明智的业务决策至关重要。当我们深入探讨基于 Kubernetes 的大数据领域时,必须解决数据消费层的关键组成部分。这一层将作为数据存储库和需要提取有价值洞察并对业务产生影响的业务分析师之间的桥梁。
在本章中,我们将探讨两个强大的工具,它们将使您能够释放基于 Kubernetes 的数据架构的潜力:Trino 和 Elasticsearch。Trino,一个分布式 SQL 查询引擎,将使您能够直接查询您的数据湖,消除了传统数据仓库的需求。您将学习如何在 Kubernetes 上部署 Trino,监视其性能,并对存储在 Amazon S3 中的数据执行 SQL 查询。
此外,我们将介绍 Elasticsearch,这是一个广泛用于实时数据管道的高度可伸缩和高效的搜索引擎,以及其强大的数据可视化工具 Kibana。您将获得在 Kubernetes 上部署 Elasticsearch、对数据进行索引以进行优化存储和检索,并使用 Kibana 构建简单而富有洞察力的可视化的实际经验。这将使您能够分析实时数据流并发现有价值的模式和趋势。
到达本章结束时,您将掌握成功部署和利用 Trino 和 Elasticsearch 在 Kubernetes 上所需的技能。您将能够直接对数据湖执行 SQL 查询,监视查询执行和历史记录,并利用 Elasticsearch 和 Kibana 进行实时数据分析和可视化。
在本章中,我们将讨论以下主要主题:
-
开始使用 SQL 查询引擎
-
在 Kubernetes 中部署 Trino
-
在 Kubernetes 中部署 Elasticsearch
-
运行查询和连接其他工具
技术要求
对于本章,您应准备好一个用于部署的 AWS EKS 集群,并在本地安装了 DBeaver Community (dbeaver.io/
)。我们将继续在我们在 第八章 部署的集群上工作。本章的所有代码都可以在 github.com/PacktPublishing/Bigdata-on-Kubernetes
的 Chapter09
文件夹中找到。
开始使用 SQL 查询引擎
在大数据的世界中,我们存储和分析数据的方式发生了重大转变。曾经是数据分析首选方案的传统数据仓库,已经被更现代、更具可扩展性的方法所取代,例如 SQL 查询引擎。这些引擎,如Trino(前身为 Presto)、Dremio和Apache Spark SQL,提供了比传统数据仓库更高性能、更具性价比和更灵活的替代方案。
接下来,我们将看到数据仓库和 SQL 查询引擎之间的主要区别。
传统数据仓库的局限性
传统数据仓库是为存储和分析关系数据库中的结构化数据而设计的。然而,随着大数据的到来以及日志文件、传感器数据和社交媒体数据等多样化数据源的普及,数据仓库的局限性逐渐显现。这些局限性包括:
-
可扩展性:数据仓库通常难以实现水平扩展,处理大量数据时效率较低。
-
数据摄取:将数据提取、转换和加载(ETL)到数据仓库的过程可能复杂、耗时且资源密集。
-
成本:数据仓库的搭建和维护可能非常昂贵,尤其是在处理大量数据时。
-
灵活性:数据仓库通常优化用于结构化数据,可能无法高效处理半结构化或非结构化数据。
SQL 查询引擎的开发旨在解决这些局限性。让我们来看它们是如何工作的。
SQL 查询引擎的崛起
SQL 查询引擎,如 Trino,提供了一种分布式、可扩展且具性价比的解决方案,用于查询存储在各种数据源中的大规模数据集,包括对象存储(如 Amazon S3、Google Cloud Storage 和 Azure Blob Storage)、关系数据库和 NoSQL 数据库。我们将在下一部分深入探讨 SQL 查询引擎的架构。
以下是 SQL 查询引擎的一些关键优势:
-
高性能:SQL 查询引擎旨在利用分布式计算的优势,使它们能够在多个节点上并行处理大规模数据集。这种并行化使得即使在庞大的数据集上也能进行高性能查询。
-
性价比高:通过利用对象存储并将存储与计算分离,SQL 查询引擎相比传统数据仓库可以显著降低数据存储和处理的成本。
-
可扩展性:SQL 查询引擎可以通过向集群中添加更多节点来水平扩展,使其能够高效处理不断增长的数据量。
-
灵活性:SQL 查询引擎能够查询多种数据源,包括结构化、半结构化和非结构化数据,使其具有高度的灵活性和适应性,能够应对各种数据格式和存储系统。
-
开源:许多 SQL 查询引擎都是开源项目,允许组织利用社区贡献的力量并避免供应商锁定。
现在,让我们理解一下这种解决方案的底层架构。
SQL 查询引擎的架构
Trino 等 SQL 查询引擎设计为在分布式计算环境中工作,其中多个节点协同处理查询并返回结果。该架构通常包括以下组件:
-
协调节点,负责解析 SQL 查询,创建分布式执行计划,并协调查询在工作节点之间的执行。
-
一组工作节点,负责执行协调节点分配的任务。它们从底层数据源读取数据,执行计算,并根据需要与其他工作节点交换中间结果。
-
一个元数据存储,其中包含有关数据源、表定义及查询执行所需的其他元数据的信息。
当用户向 SQL 查询引擎提交 SQL 查询时,发生的过程如下:
-
首先,协调节点接收查询并解析它,以创建一个分布式执行计划。
-
执行计划被划分为更小的任务,这些任务被分配给可用的工作节点。
-
工作节点从底层数据源读取数据,执行计算,并根据需要交换中间结果。
-
协调节点收集并合并来自工作节点的结果,生成最终的查询结果,然后返回给客户端应用程序的用户。
这种分布式架构使 SQL 查询引擎能够利用多个节点的联合计算能力,使其能够高效处理大数据集并提供高性能的查询执行。
在 Trino 的情况下,它可以直接连接到对象存储系统,如 Amazon S3、Azure Blob Storage 或 Google Cloud Storage,其中数据通常以 Parquet、ORC 或 CSV 等格式存储。Trino 可以直接从对象存储读取并处理这些数据,无需中间的数据加载或转换步骤。这种能力消除了单独的数据摄取过程的需要,简化了复杂性并加速了洞察的获取时间。
Trino 的分布式架构使其能够将查询执行分配到多个工作节点,每个节点并行处理数据的一部分。这样的并行化使 Trino 能够利用集群的计算能力,从而在处理大规模数据集时实现高性能查询执行。
此外,Trino 的成本效益来源于它能够将存储与计算分离。通过利用对象存储进行数据存储,组织可以利用这些存储系统的低成本和可扩展性,同时根据需要动态分配计算资源(工作节点)以执行查询。这种关注点的分离使组织能够优化其基础设施成本,并根据特定需求独立地扩展资源。
现在,让我们进行一个实践练习,看看如何将 Trino 部署到 Kubernetes,并将其连接到 Amazon S3 作为数据源。
在 Kubernetes 中部署 Trino
使用官方 Helm 图表部署 Trino 非常简单。首先,我们使用以下命令安装该图表:
helm repo add trino https://trinodb.github.io/charts
接下来,我们将配置custom_values.yaml
文件。该文件的完整版本可以在github.com/PacktPublishing/Bigdata-on-Kubernetes/blob/main/Chapter09/trino/custom_values.yaml
找到。此部署仅需少量自定义配置。首先,server.workers
参数允许我们设置 Trino 集群的工作节点数。我们将其设置为2
,但如果要处理大数据查询,建议进行扩展:
server:
workers: 2
在参数块中,将image.tag
参数设置为432
,因为这是与我们使用的图表版本(0.19.0)兼容的最新 Trino 版本:
image:
registry: ""
repository: trinodb/trino
tag: 432
在additionalCatalogs
部分,我们必须配置 Trino 使用 AWS Glue 数据目录。该代码块应该如下所示:
additionalCatalogs:
hive: |
connector.name=hive
hive.metastore=glue
最后,我们将service.type
参数设置为LoadBalancer
,以便能够从 AWS 外部访问 Trino(仅用于测试,不适用于生产环境):
service:
type: LoadBalancer
port: 8080
就这样。我们已经准备好在 Kubernetes 上启动 Trino。
注意
我们没有使用任何认证方法(密码、OAuth、证书等)。在生产环境中,你应该设置合适的认证方法,并将流量限制在你的 VPC(私有网络)内,而不是像这里一样将负载均衡器暴露到互联网上。这个简单的配置仅用于培训和非关键数据。
保存custom_values.yaml
文件后,使用以下命令部署 Trino:
helm install trino trino/trino -f custom_values.yaml -n trino --create-namespace --version 0.19.0
现在,我们可以使用以下命令检查部署是否成功:
kubectl get pods,svc -n trino
这将产生如下输出:
NAME READY STATUS
pod/trino-coordinator-dbbbcf9d9-94wsn 1/1 Running
pod/trino-worker-6c58b678cc-6fgjs 1/1 Running
pod/trino-worker-6c58b678cc-hldrb 1/1 Running
NAME TYPE CLUSTER-IP EXTERNAL-IP
service/trino LoadBalancer 10.100.246.148 xxxx.us-east-1.elb.amazonaws.com
输出已简化以提高可视化效果。我们可以看到一个协调节点和两个工作节点,这正是我们设置的节点数。现在,复制输出中EXTERNAL-IP
列提供的 URL,并将其粘贴到浏览器中,在末尾加上:8080
。你应该能看到登录页面。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_01.jpg
图 9.1 – Trino 登录页面
默认用户是trino
。由于我们在部署时没有设置任何密码,因此不需要密码。点击登录后,你将看到 Trino 的监控页面。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_02.jpg
图 9.2 – Trino 的监控页面
接下来,我们将使用相同的LoadBalancer
URL,通过 DBeaver(一款开源 SQL 客户端)与 Trino 进行交互。
连接 DBeaver 和 Trino
要连接 Trino,首先,打开 DBeaver 并创建一个新的 Trino 连接。在配置部分(trino
作为用户名,密码留空)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_03.jpg
图 9.3 – DBeaver 连接配置
然后,点击测试连接 …。如果这是第一次配置 Trino 连接,DBeaver 会自动找到必要的驱动程序并弹出新窗口,提示您下载它。您可以点击确定并完成安装,然后完成配置。
在尝试访问数据之前,我们需要对一些数据进行目录化,并使其在 Glue 数据目录中可用,同时设置一个 IAM 权限,允许 Trino 访问目录和底层数据。让我们开始吧。
从github.com/neylsoncrepalde/titanic_data_with_semicolon
下载数据集,并将 CSV 文件存储在一个名为titanic
的文件夹中的 S3 存储桶里。Glue 只理解来自文件夹中的表,而不是孤立的文件。现在,我们将创建一个Glue 爬虫。该爬虫将扫描数据集,映射其列和列类型,并在目录中注册元数据:
- 在您的 AWS 账户中,输入
Glue
以进入 AWS Glue 服务,展开侧边菜单中的数据目录选项,点击爬虫(图 9.4)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_04.jpg
图 9.4 – AWS Glue 着陆页
-
接下来,点击
bdok-titanic-crawler
(您可以选择任何名称)。点击下一步。 -
在下一页,点击
s3://<YOUR_BUCKET_NAME>/titanic/
。您可以保持其他配置为默认。点击添加 S3 数据源,然后点击下一步。 -
在下一步中,点击
AWSGlueServiceRole-titanic
。点击下一步。 -
在下一页,点击
bdok-database
,点击创建数据库,然后关闭此窗口并返回到Glue 爬虫 配置标签页。 -
返回到爬虫页面,点击刷新按钮,选择您的新bdok-database数据库。保持其他选项为默认。点击下一步。
-
现在,在最后一节中,仔细检查所有信息并点击创建爬虫。
-
当准备好后,您将被带到 AWS 控制台的爬虫页面。点击运行爬虫以启动爬虫。它应该运行约 1 到 2 分钟(图 9.5)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_05.jpg
图 9.5 – bdok-titanic-crawler
- 爬虫完成后,您可以通过访问数据目录表菜单项验证表是否已正确目录化。titanic 表应与bdok-database数据库一起列出(图 9.6)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_06.jpg
图 9.6 – Glue 数据目录表
-
点击
titanic
表的名称,检查列是否正确映射。 -
现在,我们需要创建一个 IAM 策略,授权 Kubernetes 访问目录和存储在 S3 中的数据。为此,在控制台中,进入
studycluster
,你会看到为 Kubernetes 创建的两个角色,一个是服务角色,一个是节点实例角色。我们需要更改节点实例角色的权限(图 9.7)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_07.jpg
图 9.7 – IAM 角色页面
-
点击节点实例角色,然后点击添加权限按钮,选择创建 内联策略。
-
在指定权限页面,点击以 JSON 文档方式编辑,并粘贴此 GitHub 仓库中的 JSON 文件:
github.com/PacktPublishing/Bigdata-on-Kubernetes/blob/main/Chapter09/trino/iam/AthenaFullWithAllBucketsPolicy.json
(图 9.8)。此策略允许 Athena 和 Glue 权限,并获取任何桶中的所有 S3 数据。请记住,这是一个非常开放的策略,不应在生产环境中使用。最好的安全实践是仅允许访问所需的桶。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_08.jpg
图 9.8 – 指定权限
- 点击
AthenaFullWithAllBucketsPolicy
,以便稍后更方便地搜索此策略。然后,点击创建策略。我们就准备好了!
现在,让我们回到 DBeaver 并玩一些查询。首先,我们需要找到表存储的位置。在 DBeaver 中展开 Trino 连接,你会看到一个名为hive的数据库。这是 Glue 数据目录中的数据在 Trino 中的镜像。展开hive,你会看到bdok-database目录。如果展开表格,你会看到映射的titanic数据集。
要测试查询,右键点击hive数据库,选择SQL 编辑器,然后选择新建 SQL 脚本。现在,运行查询:
select * from hive."bdok-database".titanic
你应该能看到如下结果(图 9.9):
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_09.jpg
图 9.9 – Trino 的 DBeaver 结果
当然,Trino 可以执行我们喜欢的任何计算或聚合。让我们尝试一个简单的查询,按pclass
和sex
统计所有乘客的数量和平均年龄。我们将按sex
和pclass
的顺序显示结果。
select
pclass,
sex,
COUNT(1) as people_count,
AVG(age) as avg_age
from hive."bdok-database".titanic
group by pclass, sex
order by sex, pclass
这个查询返回了以下结果:
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_10.jpg
图 9.10 – Titanic 数据集上的简单查询
现在,让我们再次访问 Trino 的监控页面,查看我们刚才运行的查询。在查询详情下勾选已完成框,以查看所有查询;第一个显示的就是我们刚才运行的查询。点击它查看详细信息。
就是这样!你已经成功地在 Kubernetes 上部署了 Trino,并利用它查询了来自 Amazon S3 数据湖的数据。在接下来的部分,我们将开始使用 Elasticsearch。
在 Kubernetes 中部署 Elasticsearch
虽然 Trino 提供了一个强大的 SQL 接口,用于查询数据湖中的结构化数据,但许多现代应用程序也需要实时分析半结构化和非结构化数据,如日志、指标和文本。对于这些类型的用例,Elasticsearch(或者 ELK 堆栈,指代 Elasticsearch、Logstash 和 Kibana)提供了一个强大的解决方案。
Elasticsearch 是一个开源的分布式、RESTful 搜索与分析引擎,建立在 Apache Lucene 之上。它旨在快速并几乎实时地存储、搜索和分析大量数据。
Elasticsearch 的核心是一个 NoSQL 数据库,它使用 JSON 文档来表示数据。它会对每个字段中的所有数据进行索引,并使用高级数据结构和索引技术使得搜索速度极快。
Elasticsearch 如何存储、索引和管理数据
数据以单独的 JSON 文档存储在 Elasticsearch 中。这些文档会被分组为索引内的类型。你可以将索引视为一个具有定义映射或模式的数据库表。
向 Elasticsearch 添加数据时,你需要向相应的索引发送一个 HTTP 请求,请求体中包含 JSON 文档。Elasticsearch 会使用先进的数据结构(如 Apache Lucene 的倒排索引)自动对文档中的所有字段数据进行索引。
这个索引过程优化了数据,使其能够进行极快的查询和聚合。Elasticsearch 将数据分布到分片中,这些分片可以被分配到集群中的不同节点,以实现冗余和可扩展性。
当你想要从 Elasticsearch 查询或检索数据时,你可以使用 RESTful 搜索 API,通过简单的 JSON 请求体定义查询。查询结果同样以 JSON 格式返回。
Elasticsearch 从一开始就是作为一个分布式系统设计的。它可以扩展到数百台服务器,并处理 PB 级的数据。其核心元素如下:
-
节点,是 Elasticsearch 的运行实例,它们共同构成一个 集群
-
索引,是具有相似特征的文档集合
-
分片,是索引的低级分区,包含索引中所有文档的一部分
-
副本,是用于冗余和提高性能的分片副本
Elasticsearch 分布式架构的核心是分片系统。分片指的是将 Elasticsearch 索引水平拆分成多个部分,称为分片。这使得索引数据可以分布在集群中的多个节点上,从而提供多个关键好处:
-
横向扩展性:通过将分片分布到各个节点,Elasticsearch 可以有效地横向扩展,以处理更多数据和更高的查询/索引吞吐量。随着数据集的增长,您只需向集群中添加更多节点,Elasticsearch 会自动迁移分片以平衡负载。
-
高可用性:每个分片可以有一个或多个副本分片。副本是主分片的完整副本。副本提供冗余和高可用性——如果承载主分片的节点发生故障,Elasticsearch 会自动将副本提升为新的主分片以接管任务。
-
操作并行化:由于索引操作(如搜索和聚合)会在每个分片上并行执行,拥有更多的分片可以实现更大的并行化,从而提升性能。
当您在 Elasticsearch 中创建索引时,需要指定该索引应该拥有的主分片数量。例如,如果您为索引配置了三个主分片,Elasticsearch 将把索引数据横向分区为三个分片,并将它们分布到集群中的各个节点上。
每个主分片也可以配置零个或多个副本分片。一个常见的设置是拥有一个副本,这意味着每个分片都有两个副本——一个主分片和一个副本。副本分片也会分布到集群中的节点上,每个副本与其相应的主分片位于不同的节点上以确保冗余。
Elasticsearch 使用分片分配策略自动管理节点之间的分片分配。默认情况下,它会将分片尽可能多地分布到各个节点上以平衡负载。当节点被添加或从集群中移除时,Elasticsearch 会自动迁移分片以重新平衡集群。
查询会在每个分片上并行执行,结果会被合并以产生最终的结果集。写入(索引新文档)会发送到一个主分片,该主分片负责验证数据、使更改持久化,并将更改复制到相关的副本分片。
为索引配置的分片数量在索引创建时是固定的,之后不能更改,因此合理的分片规划非常重要。拥有更多的分片可以实现更大的并行化,但过多的分片也可能增加开销。
一个好的经验法则是从足够数量的分片(3 到 5 个分片)开始,以便索引数据可以分布到多个节点上。如果索引变得非常庞大且需要更多的并行化,可以增加分片数量。然而,一般不建议拥有成百上千个分片,因为这会增加集群管理的开销。
现在,让我们来看一下如何在 Kubernetes 上部署 Elasticsearch。
Elasticsearch 部署
在这里,我们将使用Elastic Cloud on Kubernetes(ECK),这是一个官方的 Elastic 操作员,允许你在 Kubernetes 集群上部署、管理和编排 Elastic Stack 应用程序。我们将使用官方的 Helm 图表来安装该操作员。在终端中,输入以下命令:
helm repo add elastic https://helm.elastic.co
helm install elastic-operator elastic/eck-operator -n elastic --create-namespace --version 2.12.1
这将会将 Helm 图表下载到本地,并在一个名为elastic
的新环境中部署 Elastic Stack 的默认定义。在这里,我们将使用版本2.12.1
的 Helm 图表。
现在,我们将为 Elasticsearch 集群配置部署。elastic_cluster.yaml
YAML 文件可以完成这项工作。
apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
name: elastic
spec:
version: 8.13.0
volumeClaimDeletePolicy: DeleteOnScaledownAndClusterDeletion
nodeSets:
- name: default
count: 2
podTemplate:
spec:
containers:
- name: elasticsearch
resources:
requests:
memory: 2Gi
cpu: 1
limits:
memory: 2Gi
initContainers:
- name: sysctl
securityContext:
privileged: true
runAsUser: 0
command: ['sh', '-c', 'sysctl -w vm.max_map_count=262144']
volumeClaimTemplates:
- metadata:
name: elasticsearch-data
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 5Gi
storageClassName: gp2
让我们仔细看看这段代码。第一个块指定了 API 版本和我们正在定义的 Kubernetes 资源类型。在这种情况下,它是来自elasticsearch.k8s.elastic.co/v1
API 组的Elasticsearch
资源,由 ECK 操作员提供。metadata
块指定了集群的名称,这里是elastic
。在spec
块中,我们设置了要使用的 Elasticsearch 版本(8.13.0
)以及一个策略,该策略决定了DeleteOnScaledownAndClusterDeletion
策略在 Elasticsearch 集群缩减或完全删除时删除 PVCs。
nodeSets
块定义了 Elasticsearch 节点的配置。在这种情况下,我们有一个名为default
的节点集,节点数为2
,意味着集群中将有两个 Elasticsearch 节点。podTemplate
块指定了运行 Elasticsearch 容器的 Pod 的配置。在这里,我们定义了 Elasticsearch 容器的资源请求和限制,将内存请求和限制设置为 2 GiB,CPU 请求为一个 vCPU。
initContainers
块是官方 Elastic 文档对生产环境的推荐。它定义了一个将在主 Elasticsearch 容器启动之前运行的容器。在这里,我们有一个名为sysctl
的initContainer
,它以特权安全上下文运行,并将vm.max_map_count
内核设置为262144
。这个设置推荐在 Linux 上运行 Elasticsearch 时使用,以允许在内存映射区域使用中设置更高的限制。
最后,volumeClaimTemplates
块定义了用于存储 Elasticsearch 数据的 PVC。在这种情况下,我们有一个名为elasticsearch-data
的 PVC,要求的存储大小为 5 GiB。accessModes
指定该卷应该是ReadWriteOnce
,意味着它可以被单个节点以读写方式挂载。storageClassName
设置为gp2
,这是 AWS EBS 存储类别的通用 SSD 卷。
在本地保存该文件后,运行以下命令以部署一个 Elasticsearch 集群:
kubectl apply -f elastic_cluster.yaml -n elastic
使用以下命令监控部署过程:
kubectl get pods -n elastic
或者,你也可以使用以下命令:
kubectl get elastic -n elastic
这将提供更多的信息。请注意,这个部署过程可能需要几分钟才能完成。你还可以使用以下命令获取集群的详细信息:
kubectl describe elastic -n elastic
在输出中,HEALTH
应为green
,PHASE
列应显示Ready
:
NAME HEALTH NODES VERSION PHASE
elastic green 2 8.13.0 Ready
现在,让我们转向 Kibana。我们将按照相同的流程进行。首先要做的是设置一个名为kibana.yaml
的 YAML 文件,并配置部署。
apiVersion: kibana.k8s.elastic.co/v1
kind: Kibana
metadata:
name: kibana
spec:
version: 8.13.0
count: 1
elasticsearchRef:
name: elastic
http:
service:
spec:
type: LoadBalancer
podTemplate:
spec:
containers:
- name: kibana
env:
- name: NODE_OPTIONS
value: "--max-old-space-size=2048"
resources:
requests:
memory: 1Gi
cpu: 0.5
limits:
memory: 2Gi
cpu: 2
此代码与以前非常相似,但更简单。主要区别在于spec
块。首先,elasticsearchRef
参数指定了 Kibana 应连接到的 Elasticsearch 集群的名称。在这种情况下,它引用了我们之前创建的名为elastic
的 Elasticsearch 集群。http 块配置了将公开 Kibana 部署的 Kubernetes 服务。具体来说,我们将服务的类型设置为LoadBalancer
,这意味着云提供商将提供一个负载均衡器来分发 Kibana 实例的流量。最后,在podTemplate
块中,我们有一个env
配置,设置了一个名为NODE_OPTIONS
的环境变量,其值为--max-old-space-size=2048
,这会增加 Kibana 的最大堆大小。
现在,我们已经准备好部署:
kubectl apply -f kibana.yaml -n elastic
我们使用与之前相同的命令来监视部署是否成功。现在,我们需要访问 Elastic 和 Kibana 的自动生成密码。我们可以通过以下命令完成:
kubectl get secret elastic-es-elastic-user -n elastic -o go-template='{{.data.elastic | base64decode}}'
此命令将在屏幕上打印生成的密码。复制并妥善保管。现在,运行以下命令:
kubectl get svc -n elastic
要获取服务列表,请复制负载均衡器的 URL 地址,并将其粘贴到浏览器中,在末尾添加:5601
并以 https://开头。Kibana 将不接受常规 HTTP 协议连接。您应该看到登录页面,如图 9.11所示。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_11.jpg
图 9.11 – Kibana 登录页面
插入用户名和密码后,您应该能够访问 Kibana 的第一个空白页面(图 9.12)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_12.jpg
图 9.12 – Kibana 的第一个空白页面
点击自行探索,现在您可以尽情使用 Elastic(虽然目前还没有任何数据)。为此,我们将尝试使用我们熟悉的 Titanic 数据集。在主页上,点击左上角的菜单,然后点击堆栈管理(最后一个选项)。在下一页中,左侧菜单中,点击数据视图,然后在中心点击上传文件按钮(图 9.13)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_13.jpg
图 9.13 – 在 Kibana 中上传文件选项
现在,选择你已经拥有的 Titanic 数据集 CSV 文件并将其上传到 Kibana。您将看到一个页面,显示来自文件的映射内容(图 9.14)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_14.jpg
图 9.14 – Titanic 数据集的映射内容
现在,点击titanic
,确保创建数据视图选项已选中。点击导入(图 9.15)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_15.jpg
图 9.15 – Kibana – 创建索引
几秒钟后,你应该会看到一个成功的屏幕。现在,让我们使用这些数据进行一些可视化操作。返回主页,在左侧菜单中点击仪表板。然后点击创建仪表板,再点击创建可视化。这将带你进入 Kibana 中的可视化构建页面(图 9.16)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_16.jpg
图 9.16 – Kibana 可视化创建
现在,让我们来创建一些快速的可视化。在页面的右侧,选择可视化的类型(我们选择垂直堆叠条形图)。对于横轴,拖动并放置Pclass字段。对于纵轴,拖动并放置Fare字段。由于这是一个数值字段,Kibana 会自动建议使用中位数作为聚合函数。点击它将其更改为平均值。对于细分,拖动并放置Sex字段。最后我们将得到一个漂亮的条形图,如图 9.17所示。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_17.jpg
图 9.17 – 按性别和 Pclass 分组的平均票价
点击保存并返回,在新仪表板上查看你新创建的图形。让我们再进行一次快速分析。再次点击创建可视化。这次,我们将制作一个散点图,展示年龄和票价之间是否存在关联。将年龄放入横轴,将票价放入纵轴。点击纵轴将聚合函数更改为平均值。现在,你将看到一个漂亮的散点图,展示这两个变量之间的互动。到目前为止,没有显著的相关性。接下来,我们将Pclass字段添加为细分,你将得到一个很酷的数据可视化图像(图 9.18)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_18.jpg
图 9.18 – Kibana 中的散点图
现在,点击Survivors
(图 9.19)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_19.jpg
图 9.19 – 生还者计数可视化
然后,点击保存并返回,并根据需要手动重新排列仪表板(图 9.20中展示了一个简单的示例)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_09_20.jpg
图 9.20 – 你的第一个 Kibana 仪表板
就是这样!你成功地在 Kubernetes 上部署了 Elasticsearch 和 Kibana,手动添加了数据,并创建了一个仪表板(具有巨大潜力)。随时可以尝试使用 Kibana,尝试其他数据集和可视化。
总结
在本章中,我们探讨了两个强大的工具,Trino 和 Elasticsearch,它们使得在基于 Kubernetes 的大数据架构中进行有效的数据消费和分析成为可能。我们学习了拥有一个强大的数据消费层的重要性,它可以在数据仓库和业务分析师之间架起桥梁,帮助他们提取有价值的洞察,并做出明智的决策。
我们学习了如何在 Kubernetes 上部署 Trino,一个分布式 SQL 查询引擎,并利用它直接查询存储在对象存储系统(如 Amazon S3)中的数据。这消除了传统数据仓库的需求,并提供了一种具有成本效益、可扩展且灵活的解决方案,用于查询大规模数据集。我们通过实际操作获得了在 Kubernetes 上部署 Trino、配置其使用 AWS Glue 数据目录,并执行针对数据湖的 SQL 查询的经验。
此外,我们还深入学习了 Elasticsearch,一个高度可扩展且高效的搜索引擎,以及 Kibana,它是一个强大的数据可视化工具。我们学习了如何使用 ECK 操作符在 Kubernetes 上部署 Elasticsearch,如何为优化存储和检索创建索引,以及如何使用 Kibana 构建简单但富有洞察力的可视化。这一组合使我们具备了分析实时数据流并发现有价值的模式和趋势的能力。
本章中学习的技能在今天这个数据驱动的世界中至关重要,组织需要有效地消耗和分析海量数据,以做出明智的业务决策。Trino 和 Elasticsearch 对于不熟悉编码的业务团队来说也非常有帮助,可以通过简单的 SQL 查询或可视化的方式探索数据,从而提升日常决策能力。
在下一章中,我们将把到目前为止所学的所有内容结合起来,构建一个完整的 Kubernetes 数据管道。
第十章:在 Kubernetes 上构建大数据管道
在前面的章节中,我们介绍了在 Kubernetes 上构建大数据管道所需的各个组件。我们探索了 Kafka、Spark、Airflow、Trino 等工具。然而,在现实世界中,这些工具并非孤立运行。它们需要集成并协调工作,形成完整的数据管道,以应对各种数据处理需求。
在本章中,我们将把你迄今为止所学的所有知识和技能结合起来,通过构建两个完整的数据管道来实践:一个批处理管道和一个实时管道。到本章结束时,你将能够(1)部署和协调构建大数据管道所需的所有工具;(2)使用 Python、SQL 和 API 编写数据处理、协调和查询的代码;(3)无缝集成不同的工具,创建复杂的数据管道;(4)理解并应用构建可扩展、高效且可维护的数据管道的最佳实践。
我们将从确保所有必要工具在你的 Kubernetes 集群中正确部署和运行开始。然后,我们将深入构建批处理管道,你将学习如何从各种来源摄取数据,使用 Spark 进行处理,并将结果存储在数据湖中以供查询和分析。
接下来,我们将讨论实时数据管道,它对于近实时地处理和分析数据流至关重要。你将学习如何使用 Kafka、Spark Streaming 和 Elasticsearch 来摄取和处理数据流,使你能够构建能够实时响应事件的应用程序。
到本章结束时,你将获得在 Kubernetes 上构建完整数据管道的实际操作经验,为应对现实世界的大数据挑战做好准备。让我们开始吧,解锁 Kubernetes 上的大数据力量!
在本章中,我们将涵盖以下主要主题:
-
检查已部署的工具
-
构建批处理数据管道
-
构建实时数据管道
技术要求
本章的活动要求你拥有一个正在运行的 Kubernetes 集群。有关 Kubernetes 部署和所有必要操作符的详细信息,请参阅第八章。你还需要一个亚马逊 Web 服务(AWS)账户来进行练习。我们还将使用 DBeaver 来检查数据。安装说明,请参阅第九章。
本章的所有代码都可以在github.com/PacktPublishing/Bigdata-on-Kubernetes
的Chapter10
文件夹中找到。
检查已部署的工具
在我们深入构建一个完整的数据管道之前,我们需要确保所有必要的操作符已经在 Kubernetes 上正确部署。我们将检查 Spark 操作符、Strimzi 操作符、Airflow 和 Trino。首先,我们通过以下命令检查 Spark 操作符:
kubectl get pods -n spark-operator
此输出显示 Spark 操作符正在成功运行:
NAME READY STATUS
spark-operator-74db6fcf98-f86vt 1/1 Running
spark-operator-webhook-init-5594s 0/1 Completed
现在,我们将检查 Trino。为此,输入以下命令:
kubectl get pods -n trino
检查所有 Pod 是否正确运行;在我们的案例中,包括一个协调器 Pod 和两个工作节点 Pod。同时,使用以下命令检查 Kafka 和 Elasticsearch:
kubectl get pods -n kafka
kubectl get pods -n elastic
最后,我们需要重新部署 Airflow。我们将需要使用 Airflow 的特定版本以及其某个提供程序库,以确保与 Spark 正常配合工作。我已经设置了一个 Airflow 2.8.1 的镜像,并配备了 apache-airflow-providers-cncf-kubernetes
库的 7.13.0 版本(这是 SparkKubernetesOperator
所需的)。如果你已经安装了 Airflow,可以通过以下命令将其删除:
helm delete airflow -n airflow
确保所有服务和持久化存储卷声明也已被删除,使用以下代码:
kubectl delete svc --all -n airflow
kubectl delete pvc --all -n airflow
然后,我们需要稍微修改我们已有的 custom_values.yaml
文件配置。我们需要将 defaultAirflowTag
和 airflowVersion
参数设置为 2.8.1
,并且将 images.airflow
参数更改为获取已准备好的公共镜像:
images:
airflow:
repository: "docker.io/neylsoncrepalde/apache-airflow"
tag: "2.8.1-cncf7.13.0"
digest: ~
pullPolicy: IfNotPresent
此外,如果你使用的是不同的 GitHub 仓库或文件夹,别忘了调整 dags.gitSync
参数。适配后的完整版本 custom_values.yaml
代码可在 github.com/PacktPublishing/Bigdata-on-Kubernetes/tree/main/Chapter10/airflow_deployment
查阅。用新配置重新部署 Airflow,操作如下:
helm install airflow apache-airflow/airflow --namespace airflow --create-namespace -f custom_values.yaml
最后所需的配置允许 Airflow 在集群上运行 SparkApplication
实例。我们将为 Airflow 命名空间设置一个服务账户和一个集群角色绑定:
kubectl create serviceaccount spark -n airflow
kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=airflow:spark --namespace=airflow
现在,我们将创建一个新的集群角色和集群角色绑定,以授予 Airflow 工作节点必要的权限。设置一个 YAML 文件:
rolebinding_for_airflow.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: spark-cluster-cr
labels:
rbac.authorization.kubeflow.org/aggregate-to-kubeflow-edit: "true"
rules:
- apiGroups:
- sparkoperator.k8s.io
resources:
- sparkapplications
verbs:
- "*"
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: airflow-spark-crb
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: spark-cluster-cr
subjects:
- kind: ServiceAccount
name: airflow-worker
namespace: airflow
现在,使用以下命令部署此配置:
kubectl apply -f rolebinding_for_airflow.yaml -n airflow
就这样!我们现在可以进入批处理数据管道的实现部分了。让我们开始吧。
构建批处理管道
对于批处理管道,我们将使用在 第五章 中使用的 IMBD 数据集。我们将自动化整个过程,从数据采集和将数据摄取到我们的数据湖(Amazon Simple Storage Service,Amazon S3),直到将消费就绪的表格交付给 Trino。在 图 10.1 中,你可以看到代表本节练习架构的示意图:
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_01.jpg
图 10.1 – 批处理管道的架构设计
现在,让我们进入代码部分。
构建 Airflow DAG
让我们像往常一样开始开发我们的 Airflow DAG。完整的代码可以在 github.com/PacktPublishing/Bigdata-on-Kubernetes/tree/main/Chapter10/batch/dags
文件夹中找到:
-
以下是 Airflow DAG 的第一部分代码:
imdb_dag.py
from airflow.decorators import task, dag from airflow.utils.task_group import TaskGroup from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator from airflow.models import Variable from datetime import datetime import requests import boto3 aws_access_key_id = Variable.get("aws_access_key_id") aws_secret_access_key = Variable.get("aws_secret_access_key") s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key ) default_args = { 'owner': 'Ney', 'start_date': datetime(2024, 5, 10) }
这段代码导入了必要的库,设置了两个用于在 AWS 上进行身份验证的环境变量,定义了一个 Amazon S3 客户端,并设置了一些默认配置。
-
在下一个代码块中,我们将开始 DAG 函数的编写:
@dag( default_args=default_args, schedule_interval="@once", description="IMDB Dag", catchup=False, tags=['IMDB'] ) def IMDB_batch():
该代码块集成了 DAG 的默认参数,并定义了一个调度间隔,使得 DAG 仅运行一次,还设置了一些元数据。
-
现在,我们将定义第一个任务,它会自动下载数据集并将其存储在 S3 中(第一行重复):
def IMDB_batch(): @task def data_acquisition(): urls_dict = { "names.tsv.gz": "https://datasets.imdbws.com/name.basics.tsv.gz", "basics.tsv.gz": "https://datasets.imdbws.com/title.basics.tsv.gz", "crew.tsv.gz": "https://datasets.imdbws.com/title.crew.tsv.gz", "principals.tsv.gz": "https://datasets.imdbws.com/title.principals.tsv.gz", "ratings.tsv.gz": "https://datasets.imdbws.com/title.ratings.tsv.gz" } for title, url in urls_dict.items(): response = requests.get(url, stream=True) with open(f"/tmp/{title}", mode="wb") as file: file.write(response.content) s3.upload_file(f"/tmp/{title}", "bdok-<YOUR_ACCOUNT_NUMBER>", f"landing/imdb/{title}") return True
这段代码来源于我们在 第五章 中开发的内容,最后做了一个小修改,用于将下载的文件上传到 S3。
-
接下来,我们将调用 Spark 处理作业来转换数据。第一步仅仅是读取数据的原始格式(TSV),并将其转换为 Parquet(这种格式在 Spark 中优化了存储和处理)。首先,我们定义一个
TaskGroup
实例,以便更好地组织任务:with TaskGroup("tsvs_to_parquet") as tsv_parquet: tsvs_to_parquet = SparkKubernetesOperator( task_id="tsvs_to_parquet", namespace="airflow", #application_file=open(f"{APP_FILES_PATH}/spark_imdb_tsv_parquet.yaml").read(), application_file="spark_imdb_tsv_parquet.yaml", kubernetes_conn_id="kubernetes_default", do_xcom_push=True ) tsvs_to_parquet_sensor = SparkKubernetesSensor( task_id="tsvs_to_parquet_sensor", namespace="airflow", application_name="{{ task_instance.xcom_pull(task_ids='tsvs_to_parquet.tsvs_to_parquet')['metadata']['name'] }}", kubernetes_conn_id="kubernetes_default" ) tsvs_to_parquet >> tsvs_to_parquet_sensor
在这个组内,有两个任务:
-
tsvs_to_parquet
:这是一个SparkKubernetesOperator
任务,在 Kubernetes 上运行一个 Spark 作业。该作业在spark_imdb_tsv_parquet.yaml
文件中定义,包含 Spark 应用程序的配置。我们使用do_xcom_push=True
参数,使得该任务与后续任务之间能够进行跨任务通信。 -
tsvs_to_parquet_sensor
:这是一个SparkKubernetesSensor
任务,它监视由tsvs_to_parquet
任务启动的 Spark 作业。它通过task_instance.xcom_pull
方法,从前一个任务推送的元数据中获取 Spark 应用程序名称。此传感器在 Spark 作业完成之前不会允许 DAG 继续执行后续任务。
tsvs_to_parquet >> tsvs_to_parquet_sensor
这一行设置了任务依赖关系,确保tsvs_to_parquet_sensor
任务在tsvs_to_parquet
任务成功完成后运行。 -
-
接下来,我们将进行另一次数据处理,这次使用 Spark。我们将连接所有表,构建一个合并的唯一表。这个合并后的表被称为
TaskGroup
实例,名为Transformations
,并按与之前代码块相同的方式进行处理:with TaskGroup('Transformations') as transformations: consolidated_table = SparkKubernetesOperator( task_id='consolidated_table', namespace="airflow", application_file="spark_imdb_consolidated_table.yaml", kubernetes_conn_id="kubernetes_default", do_xcom_push=True ) consolidated_table_sensor = SparkKubernetesSensor( task_id='consolidated_table_sensor', namespace="airflow", application_name="{{ task_instance.xcom_pull(task_ids='Transformations.consolidated_table')['metadata']['name'] }}", kubernetes_conn_id="kubernetes_default" ) consolidated_table >> consolidated_table_sensor
-
最后,在数据处理并写入 Amazon S3 后,我们将触发一个 Glue 爬虫,它会将该表的元数据写入 Glue 数据目录,使其可供 Trino 使用:
glue_crawler_consolidated = GlueCrawlerOperator( task_id='glue_crawler_consolidated', region_name='us-east-1', aws_conn_id='aws_conn', wait_for_completion=True, config = {'Name': 'imdb_consolidated_crawler'} )
记住,这些代码应该缩进以便位于
IMDB_Batch()
函数内。 -
现在,在这段代码的最后一个块中,我们将配置任务之间的依赖关系以及
TaskGroup
实例,并触发 DAG 函数的执行:da = data_acquisition() da >> tsv_parquet >> transformations transformations >> glue_crawler_consolidated execution = IMDB_batch()
现在,我们需要设置 Airflow 将调用的两个 SparkApplication
实例,以及 AWS 上的 Glue 爬虫。我们开始吧。
创建 SparkApplication 任务
我们将遵循 第八章 中使用的相同模式来配置 Spark 任务。我们需要存储在 S3 中的 PySpark 代码,以及定义任务的 YAML 文件,该文件必须与 Airflow DAG 代码一起位于 dags
文件夹中:
-
由于 YAML 文件与我们之前做的非常相似,我们不会在这里深入讨论。两个 YAML 文件的代码可以在
github.com/PacktPublishing/Bigdata-on-Kubernetes/tree/main/Chapter10/batch/dags
文件夹中找到。创建这些文件并将它们分别保存为spark_imdb_tsv_parquet.yaml
和spark_imdb_consolidated_table.yaml
到dags
文件夹中。 -
接下来,我们将查看 PySpark 代码。第一个任务相当简单,它从 Airflow 导入的 TSV 文件中读取数据,并将相同的数据转换后写回 Parquet 格式。首先,我们导入 Spark 模块,并定义一个带有必要配置的
SparkConf
对象,用于 Spark 应用程序:from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession conf = ( SparkConf() .set("spark.cores.max", "2") .set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true") .set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true") .set("spark.hadoop.fs.s3a.fast.upload", True) .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") .set("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.EnvironmentVariablesCredentials") .set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.3") ) sc = SparkContext(conf=conf).getOrCreate()
这些配置是针对与 Amazon S3 配合使用而定制的,启用了某些功能,例如 S3 V4 认证、快速上传和使用 S3A 文件系统实现。
spark.cores.max
属性限制应用程序使用的最大核心数为2
。最后一行创建了一个带有之前定义的配置的SparkContext
对象。 -
接下来,我们创建一个
SparkSession
实例并将日志级别设置为"WARN"
,这样日志中只会显示警告和错误信息。这有助于提高日志的可读性:if __name__ == "__main__": spark = SparkSession.builder.appName("SparkApplicationJob").getOrCreate() spark.sparkContext.setLogLevel("WARN")
-
接下来,我们将定义表格架构。在处理大数据集时,这一点非常重要,因为它可以提高 Spark 在处理基于文本的文件(如 TSV、CSV 等)时的性能。接下来,我们仅展示第一个表格的架构,以简化可读性。完整代码可以在
github.com/PacktPublishing/Bigdata-on-Kubernetes/tree/main/Chapter10/batch/spark_code
文件夹中找到:schema_names = "nconst string, primaryName string, birthYear int, deathYear int, primaryProfession string, knownForTitles string"
-
现在,我们将表格读取到 Spark DataFrame 中(这里只显示读取第一个表格):
names = ( spark .read .schema(schema_names) .options(header=True, delimiter="\t") .csv('s3a://bdok-<ACCOUNT_NUMBER>/landing/imdb/names.tsv.gz') )
-
接下来,我们将把表格写回 S3,以 Parquet 格式存储:
names.write.mode("overwrite").parquet("s3a://bdok-<ACCOUNT_NUMBER>/bronze/imdb/names")
-
最后,我们停止 Spark 会话并释放应用程序使用的任何资源:
spark.stop()
-
将此文件保存为
spark_imdb_tsv_parquet.py
并上传到你在 YAML 文件中定义的 S3 存储桶中(在本例中为s3a://bdok-<ACCOUNT_NUMBER>/spark-jobs/
)。 -
现在,我们将定义第二个
SparkApplication
实例,负责构建 OBT。对于第二段代码,我们将跳过 Spark 配置和SparkSession
代码块,因为它们与上一个任务几乎相同,唯一需要导入的是一个额外的模块:from pyspark.sql import functions as f
这将导入
functions
模块,允许使用 Spark 内部函数进行数据转换。 -
我们从这里开始读取数据集:
names = spark.read.parquet("s3a://bdok-<ACCOUNT_NUMBER>/bronze/imdb/names") basics = spark.read.parquet("s3a://bdok-<ACCOUNT_NUMBER>/bronze/imdb/basics") crew = spark.read.parquet("s3a://bdok-<ACCOUNT_NUMBER>/bronze/imdb/crew") principals = spark.read.parquet("s3a://bdok-<ACCOUNT_NUMBER>/bronze/imdb/principals") ratings = spark.read.parquet("s3a://bdok-<ACCOUNT_NUMBER>/bronze/imdb/ratings")
-
names
数据集中的knownForTitles
列和crew
数据集中的directors
列,在同一个单元格中有多个值,需要进行展开,才能得到每个导演和标题的一行数据:names = names.select( 'nconst', 'primaryName', 'birthYear', 'deathYear', f.explode(f.split('knownForTitles', ',')).alias('knownForTitles') ) crew = crew.select( 'tconst', f.explode(f.split('directors', ',')).alias('directors'), 'writers' )
-
现在,我们开始进行表连接:
basics_ratings = basics.join(ratings, on=['tconst'], how='inner') principals_names = ( principals.join(names, on=['nconst'], how='inner') .select('nconst', 'tconst','ordering', 'category', 'characters', 'primaryName', 'birthYear', 'deathYear') .dropDuplicates() ) directors = ( crew .join(names, on=crew.directors == names.nconst, how='inner') .selectExpr('tconst', 'directors', 'primaryName as directorPrimaryName', 'birthYear as directorBirthYear', 'deathYear as directorDeathYear') .dropDuplicates() )
在这里,我们执行三次连接操作:(a)通过在
basics
和ratings
DataFrame 中基于tconst
列(电影标识符)连接,创建basics_ratings
;(b)通过在principals
和names
DataFrame 中基于nconst
列(演员标识符)连接,创建principals_names
;我们选择一些特定的列并删除重复项;(c)通过在crew
和names
DataFrame 中基于crew
中的directors
列与names
中的nconst
列连接,创建directors
表。接着,我们选择特定的列,重命名一些列以便识别哪些数据与导演相关,并删除重复项。 -
接下来,我们将创建一个
basics_principals
表,通过连接crew
和principals_names
数据集,获取有关剧组和电影表演者的完整数据集。最后,我们创建一个basics_principals_directors
表,连接directors
表信息:basics_principals = basics_ratings.join(principals_names, on=['tconst'], how='inner').dropDuplicates() basics_principals_directors = basics_principals.join(directors, on=['tconst'], how='inner').dropDuplicates()
-
最后,我们将这个最终表作为 Parquet 文件写入 Amazon S3 并停止 Spark 作业:
basics_principals_directors.write.mode("overwrite").parquet("s3a://bdok-<ACCOUNT_NUMBER>/silver/imdb/consolidated") spark.stop()
最后一步是创建一个 Glue 爬虫,使 OBT 中的信息可供 Trino 使用。
创建一个 Glue 爬虫
我们将使用 AWS 控制台创建一个 Glue 爬虫。请按照以下步骤操作:
- 登录到 AWS 并进入AWS Glue页面。然后,点击侧边菜单中的爬虫,并点击创建爬虫(图 10.2):
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_02.jpg
图 10.2 – AWS Glue:爬虫页面
-
接下来,输入
imdb_consolidated_crawler
(与 Airflow 代码中引用的名称相同)作为爬虫名称,并根据需要填写描述。点击下一步。 -
确保在第一个配置项您的数据是否已映射到 Glue 表?中选中尚未。然后,点击添加数据源(图 10.3):
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_03.jpg
图 10.3 – 添加数据源
- 在
s3://bdok-<ACCOUNT_NUMBER>/silver/imdb/consolidated
中,如图 10.4所示。点击添加 S3 数据源,然后点击下一步:
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_04.jpg
图 10.4 – S3 路径配置
- 在下一页,点击创建新 IAM 角色,并为其填写您喜欢的名称。确保它不是已有的角色名称。点击下一步(图 10.5):
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_05.jpg
图 10.5 – IAM 角色配置
- 在下一页,您可以选择我们在第九章中创建的相同数据库来与 Trino(
bdok-database
)一起使用。为了方便定位此表,请使用imdb-
(图 10.6)。将Crawler 调度设置为按需。点击下一步:
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_06.jpg
图 10.6 – 目标数据库和表名前缀
- 在最后一步,检查所有提供的信息。如果一切正确,点击 创建爬虫。
就这样!设置完成。现在,我们返回到浏览器中的 Airflow UI,激活 DAG,看看“魔法”发生了 (图 10*.7*):
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_07.jpg
图 10.7 – 在 Airflow 上运行完整的 DAG
DAG 执行成功后,等待大约 2 分钟,直到爬虫停止,然后我们用 DBeaver 搜索我们的数据。我们来玩一下,搜索所有《约翰·威克》电影 (图 10*.8*):
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_08.jpg
图 10.8 – 使用 DBeaver 检查 Trino 中的 OBT
看,搞定了!你刚刚运行了完整的批量数据处理管道,连接了我们迄今为止学习的所有批量工具。恭喜!现在,我们将开始构建一个使用 Kafka、Spark Streaming 和 Elasticsearch 的数据流管道。
构建实时数据管道
对于实时管道,我们将使用我们在 第八章 中使用的相同数据模拟代码,结合一个增强的架构。在 图 10*.9* 中,你可以看到我们即将构建的管道架构设计:
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_09.jpg
图 10.9 – 实时数据管道架构
首先,我们需要在 AWS 上创建一个 虚拟私有云 (VPC) —— 一个私有网络 —— 并设置一个 关系数据库服务 (RDS) Postgres 数据库,作为我们的数据源:
-
进入 AWS 控制台,导航到 VPC 页面。在 VPC 页面上,点击 创建 VPC,你将进入配置页面。
-
确保
bdok
在10.20.0.0/16
无类域间路由 (CIDR) 块中。其余设置保持默认 (图 10*.10*):
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_10.jpg
图 10.10 – VPC 基本配置
- 现在,滚动页面。你可以保持 可用区 (AZs) 和子网配置不变(两个 AZ,两个公共子网和两个私有子网)。确保为 网络地址转换 (NAT) 网关勾选 在 1 个 AZ 中。保持 S3 网关 选项框选中 (图 10*.11*)。此外,最后两个 DNS 选项也保持勾选。点击 创建 VPC:
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_11.jpg
图 10.11 – NAT 网关配置
-
创建 VPC 可能需要几分钟。成功创建后,在 AWS 控制台中,导航到 RDS 页面,在侧边菜单中点击 数据库,然后点击 创建数据库。
-
在下一个页面中,选择标准创建,并选择Postgres作为数据库类型。保持默认的引擎版本。在模板部分,选择免费层,因为我们仅需要一个小型数据库用于本练习。
-
在
bdok-postgres
中,对于凭证,保持postgres
为主用户名,选择自管理作为凭证管理选项,并选择一个主密码(图 10.12):
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_12.jpg
图 10.12 – 数据库名称和凭证
-
将实例配置和存储部分保持为默认设置。
-
在
bdok-vpc
中,保持bdok-database-sg
作为安全组名称(图 10.13):
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_13.jpg
图 10.13 – RDS 子网和安全组配置
- 确保数据库认证部分标记为密码认证。其他设置可以保持默认。最后,AWS 会给出如果我们让这个数据库运行 30 天的费用估算(图 10.14)。最后,点击创建数据库,并等待几分钟以完成数据库创建:
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_14.jpg
图 10.14 – 数据库费用估算
- 最后,我们需要更改数据库安全组的配置,以允许来自 VPC 外部的连接,除了您自己的 IP 地址(默认配置)。进入
bdok-postgres
数据库。点击安全组名称以打开其页面(图 10.15):
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_15.jpg
图 10.15 – bdok-postgres 数据库查看页面
- 在安全组页面中,选择安全组后,向下滚动页面并点击入站规则选项卡。点击编辑入站规则(图 10.16):
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_16.jpg
图 10.16 – 安全组页面
- 在下一个页面中,您将看到一个已经配置好的入口规则,源为您的 IP 地址。将其更改为任何地方-IPv4,并点击保存规则(图 10.17):
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_17.jpg
图 10.17 – 安全规则配置
-
最后,为了向我们的数据库填充一些数据,我们将使用
simulatinos.py
代码生成一些虚假的客户数据并将其导入到数据库中。代码可以在github.com/PacktPublishing/Bigdata-on-Kubernetes/tree/main/Chapter10/streaming
文件夹中找到。要运行它,请从 AWS 页面复制数据库端点,然后在终端中输入以下命令:python simulations.py --host <YOUR-DATABASE-ENDPOINT> -p <YOUR-PASSWORD>
在代码在终端打印出一些数据后,使用Ctrl + C 停止该进程。现在,我们可以开始处理数据管道了。让我们从 Kafka Connect 配置开始。
部署 Kafka Connect 和 Elasticsearch
为了让 Kafka 能够访问 Elasticsearch,我们需要在与 Kafka 部署相同的命名空间中部署另一个 Elasticsearch 集群。为此,我们将使用两个 YAML 配置文件,elastic_cluster.yaml
和kibana.yaml
。这两个文件可以在github.com/PacktPublishing/Bigdata-on-Kubernetes/tree/main/Chapter10/streaming/elastic_deployment
文件夹中找到。请按照以下步骤操作:
-
首先,下载这两个文件并在终端中运行以下命令:
kubectl apply -f elastic_cluster.yaml -n kafka kubectl apply -f kibana.yaml -n kafka
-
接下来,我们将使用以下命令获取一个自动生成的 Elasticsearch 密码:
kubectl get secret elastic-es-elastic-user -n kafka -o go-template='{{.data.elastic | base64decode}}'
该命令将在终端中打印出密码。请保存以供后用。
-
Elasticsearch 仅在传输加密时工作。这意味着我们必须配置证书和密钥,允许 Kafka Connect 正确连接到 Elastic。为此,首先,我们将获取 Elastic 的证书和密钥,并使用以下命令将它们保存在本地:
kubectl get secret elastic-es-http-certs-public -n kafka --output=go-template='{{index .data "ca.crt" | base64decode}}' > ca.crt kubectl get secret elastic-es-http-certs-public -n kafka --output=go-template='{{index .data "tls.crt" | base64decode}}' > tls.crt kubectl get secret elastic-es-http-certs-internal -n kafka --output=go-template='{{index .data "tls.key" | base64decode}}' > tls.key
这将在本地创建三个文件,分别命名为
ca.crt
、tls.crt
和tls.key
。 -
现在,我们将使用这些文件创建一个
keystore.jks
文件,该文件将用于 Kafka Connect 集群。在终端中运行以下命令:openssl pkcs12 -export -in tls.crt -inkey tls.key -CAfile ca.crt -caname root -out keystore.p12 -password pass:BCoqZy82BhIhHv3C -name es-keystore keytool -importkeystore -srckeystore keystore.p12 -srcstoretype PKCS12 -srcstorepass BCoqZy82BhIhHv3C -deststorepass OfwxynZ8KATfZSZe -destkeypass OfwxynZ8KATfZSZe -destkeystore keystore.jks -alias es-keystore
请注意,我设置了一些随机密码。你可以根据自己的需要选择其他密码。现在,你已经有了我们需要配置传输加密的文件
keystore.jks
。 -
接下来,我们需要使用
keystore.jks
文件在 Kubernetes 中创建一个 secret。为此,在终端中输入以下命令:kubectl create secret generic es-keystore --from-file=keystore.jks -n kafka
-
现在,我们准备好部署 Kafka Connect 了。我们有一个已准备好的配置文件,名为
connect_cluster.yaml
,可以在github.com/PacktPublishing/Bigdata-on-Kubernetes/tree/main/Chapter10/streaming
文件夹中找到。然而,这段代码有两个部分值得注意。在第 13 行,我们有spec.bootstrapServers
参数。这个参数应该填写由 Helm chart 创建的 Kafka 启动服务。要获取该服务的名称,输入以下命令:kubectl get svc -n kafka
检查服务名称是否与代码中的名称匹配。如果不匹配,请相应调整。保持此服务的
9093
端口。 -
在第 15 行,你有
spec.tls.trustedCertificates
参数。secretName
的值应该与 Helm chart 创建的ca-cert
secret 的确切名称匹配。使用以下命令检查此 secret 的名称:kubectl get secret -n kafka
如果 secret 的名称不匹配,请相应调整。保持
certificate
参数中的ca.crt
值。 -
最后值得一提的是,我们将在 Kafka Connect 的 Pod 中将之前创建的
es-keystore
secret 作为卷进行挂载。以下代码块设置了此配置:externalConfiguration: volumes: - name: es-keystore-volume secret: secretName: es-keystore
此 secret 必须作为卷挂载,以便 Kafka Connect 可以导入连接到 Elasticsearch 所需的 secret。
-
要部署 Kafka Connect,在终端中输入以下命令:
kubectl apply -f connect_cluster.yaml -n kafka
Kafka Connect 集群将在几分钟内准备就绪。准备好后,接下来是配置Java 数据库连接(JDBC)源连接器,从 Postgres 数据库中拉取数据。
-
接下来,准备一个配置 JDBC 源连接器的 YAML 文件。接下来,您将找到此文件的代码:
jdbc_source.yaml
apiVersion: "kafka.strimzi.io/v1beta2" kind: "KafkaConnector" metadata: name: "jdbc-source" namespace: kafka labels: strimzi.io/cluster: kafka-connect-cluster spec: class: io.confluent.connect.jdbc.JdbcSourceConnector tasksMax: 1 config: key.converter: org.apache.kafka.connect.json.JsonConverter value.converter: org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable: true value.converter.schemas.enable: true connection.url: «jdbc:postgresql://<DATABASE_ENDPOINT>:5432/postgres» connection.user: postgres connection.password: "<YOUR_PASSWORD>" connection.attempts: "2" query: "SELECT * FROM public.customers" mode: "timestamp" timestamp.column.name: "dt_update" topic.prefix: "src-customers" valincrate.non.null: "false"
连接器的配置指定它应该使用来自 Confluent 的 JDBC 连接器库中的
io.confluent.connect.jdbc.JdbcSourceConnector
类。它将连接器的最大任务数(并行工作线程)设置为 1。连接器配置为使用 JSON 转换器处理键和值,并包括模式信息。它连接到运行在 Amazon RDS 实例上的 PostgreSQL 数据库,使用提供的连接 URL、用户名和密码。指定了SELECT * FROM public.customers
SQL 查询,这意味着连接器将持续监控customers
表,并将任何新行或更新行作为 JSON 对象流式传输到名为src-customers
的 Kafka 主题中。mode
值设置为timestamp
,这意味着连接器将使用时间戳列(dt_update
)来追踪哪些行已经处理过,从而避免重复。最后,validate.non.null
选项设置为false
,这意味着如果遇到数据库行中的null
值,连接器不会失败。 -
将 YAML 文件放在名为
connectors
的文件夹中,并使用以下命令部署 JDBC 连接器:kubectl apply -f connectors/jdbc_source.yaml -n kafka
您可以使用以下命令检查连接器是否已正确部署:
kubectl get kafkaconnector -n kafka kubectl describe kafkaconnector jdbc-source -n kafka
我们还将使用以下命令检查消息是否正确地传递到分配的 Kafka 主题:
kubectl exec kafka-cluster-kafka-0 -n kafka -c kafka -it -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic src-customers
您应该看到以 JSON 格式打印在屏幕上的消息。太棒了!我们已经与源数据库建立了实时连接。现在,是时候使用 Spark 设置实时处理层了。
实时处理与 Spark
为了正确连接 Spark 和 Kafka,我们需要在 Kafka 的命名空间中设置一些授权配置:
-
以下命令创建了一个 Spark 的服务账户,并设置了运行
SparkApplication
实例所需的权限:kubectl create serviceaccount spark -n kafka kubectl create clusterrolebinding spark-role-kafka --clusterrole=edit --serviceaccount=kafka:spark -n kafka
-
接下来,我们需要确保在命名空间中设置了一个包含 AWS 凭证的秘密。使用以下命令检查秘密是否已存在:
kubectl get secrets -n kafka
如果秘密尚不存在,请使用以下命令创建它:
kubectl create secret generic aws-credentials --from-literal=aws_access_key_id=<YOUR_ACCESS_KEY_ID> --from-literal=aws_secret_access_key="<YOUR_SECRET_ACCESS_KEY>" -n kafka
-
现在,我们需要构建一个 Spark Streaming 作业。为了实现这一点,正如之前所看到的,我们需要一个 YAML 配置文件和存储在 Amazon S3 中的 PySpark 代码。YAML 文件遵循与第八章中看到的相同模式。此配置的代码可在
github.com/PacktPublishing/Bigdata-on-Kubernetes/tree/main/Chapter10/streaming
文件夹中找到。将其保存在本地,因为它将用于部署SparkApplication
作业。 -
Spark 作业的 Python 代码也可以在 GitHub 仓库中的第十章
/streaming/processing
文件夹中找到,文件名为spark_streaming_job.py
。这段代码与我们在第七章中看到的非常相似,但有一些地方值得注意。在第 61 行,我们对数据进行实时转换。在这里,我们仅仅根据出生日期计算人的年龄,使用以下代码:query = ( newdf .withColumn("dt_birthdate", f.col("birthdate")) .withColumn("today", f.to_date(f.current_timestamp() ) ) .withColumn("age", f.round( f.datediff(f.col("today"), f.col("dt_birthdate"))/365.25, 0) ) .select("name", "gender", "birthdate", "profession", "age", "dt_update") )
-
为了使 Elasticsearch sink 连接器正确读取主题中的消息,消息必须采用标准的 Kafka JSON 格式,并包含两个键:
schema
和payload
。在代码中,我们将手动构建该 schema 并将其与最终版本的 JSON 数据连接起来。第 70 行定义了schema
键和payload
结构的开头(此行为了提高可读性,在此不会打印)。 -
在第 72 行,我们将 DataFrame 的值转换为单个 JSON 字符串,并将其设置为名为
value
的列:json_query = ( query .select( f.to_json(f.struct(f.col("*"))) ) .toDF("value") )
-
在第 80 行,我们将之前定义的
schema
键与数据的实际值连接成 JSON 字符串,并将其写入流式查询,返回 Kafka 中的customers-transformed
主题:( json_query .withColumn("value", f.concat(f.lit(write_schema), f.col("value"), f.lit('}'))) .selectExpr("CAST(value AS STRING)") .writeStream .format("kafka") .option("kafka.bootstrap.servers", "kafka-cluster-kafka-bootstrap:9092") .option("topic", "customers-transformed") .option("checkpointLocation", "s3a://bdok-<ACCOUNT-NUMBER>/spark-checkpoint/customers-processing/") .start() .awaitTermination() )
-
将此文件保存为
spark_streaming_job.py
并保存到我们在 YAML 文件中定义的 S3 存储桶中。现在,你已经准备好开始实时处理了。要启动流式查询,请在终端中输入以下命令:kubectl apply -f spark_streaming_job.yaml -n kafka
你还可以使用以下命令检查应用程序是否正确运行:
kubectl describe sparkapplication spark-streaming-job -n kafka kubectl get pods -n kafka
-
现在,检查消息是否正确写入新主题,使用以下命令:
kubectl exec kafka-cluster-kafka-0 -n kafka -c kafka -it -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic customers-transformed
就是这样!我们已经启动了实时处理层。现在,是时候部署 Elasticsearch sink 连接器并将最终数据导入 Elastic 了。让我们开始吧。
部署 Elasticsearch sink 连接器
在这里,我们将从 Elasticsearch sink 连接器的 YAML 配置文件开始。大部分“重头戏”已经在之前的秘密配置中完成了:
-
在
connectors
文件夹下创建一个名为es_sink.yaml
的文件。以下是代码:es_sink.yaml
apiVersion: "kafka.strimzi.io/v1beta2" kind: "KafkaConnector" metadata: name: "es-sink" namespace: kafka labels: strimzi.io/cluster: kafka-connect-cluster spec: class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector tasksMax: 1 config: topics: "customers-transformed" connection.url: "https://elastic-es-http.kafka:9200" connection.username: "elastic" connection.password: "w6MR9V0SNLD79b56arB9Q6b6" batch.size: 1 key.ignore: "true" elastic.security.protocol: "SSL" elastic.https.ssl.keystore.location: "/opt/kafka/external-configuration/es-keystore-volume/keystore.jks" elastic.https.ssl.keystore.password: "OfwxynZ8KATfZSZe" elastic.https.ssl.key.password: "OfwxynZ8KATfZSZe" elastic.https.ssl.keystore.type: "JKS" elastic.https.ssl.truststore.location: "/opt/kafka/external-configuration/es-keystore-volume/keystore.jks" elastic.https.ssl.truststore.password: "OfwxynZ8KATfZSZe" elastic.https.ssl.truststore.type: "JKS"
我认为这里值得注意的部分是从第 20 行开始。在这里,我们正在配置与 Elasticsearch 连接的 SSL/TLS 设置。
keystore.location
和truststore.location
属性分别指定keystore
和truststore
文件的路径(在本例中,它们是相同的)。keystore.password
、key.password
和truststore.password
属性提供访问这些文件的密码。keystore.type
和truststore.type
属性指定keystore
和truststore
文件的类型,在本例中是JKS
(Java KeyStore)。 -
现在,一切已准备好启动此连接器。在终端中,输入以下命令:
kubectl apply -f connectors/es_sink.yaml -n kafka
你还可以使用以下命令检查连接器是否正确部署:
kubectl describe kafkaconnector es-sink -n kafka
-
现在,获取负载均衡器的 URL 并访问 Elasticsearch UI。让我们看看数据是否已经正确导入:
kubectl get svc -n kafka
-
一旦登录到 Elasticsearch,选择
GET _cat/indices
命令。如果一切正常,新的customers-transformed
索引将显示在输出中(图 10.18):
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_18.jpg
图 10.18 – 在 Elasticsearch 中创建的新索引
-
现在,让我们使用这个索引创建一个新的数据视图。在侧边菜单中,选择 堆栈管理 并点击 数据视图。点击 创建数据视图 按钮。
-
将数据视图名称设置为
customers-transformed
,并再次将customers-transformed
设置为索引模式。选择dt_update
列作为时间戳字段。然后,点击 保存数据视图到 Kibana(图 10.19):
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_19.jpg
图 10.19 – 在 Kibana 上创建数据视图
- 现在,让我们检查数据。在侧边菜单中,选择
customers-transformed
数据视图。记得将日期过滤器设置为合理的值,例如一年前。如果你在进行较大的索引操作,可以尝试一些基于时间的数据子集。数据应该会在 UI 中显示(图 10.20):
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_10_20.jpg
图 10.20 – 在 Kibana 中显示的 customers-transformed 数据
现在,通过再次运行 simulations.py
代码来添加更多数据。试着动动手,构建一些酷炫的仪表盘来可视化你的数据。
就这样!你刚刚在 Kubernetes 中使用 Kafka、Spark 和 Elasticsearch 运行了一个完整的实时数据管道。干杯,我的朋友!
总结
在本章中,我们将全书中学到的所有知识和技能结合起来,在 Kubernetes 上构建了两个完整的数据管道:一个批处理管道和一个实时管道。我们首先确保了所有必要的工具,如 Spark 运维工具、Strimzi 运维工具、Airflow 和 Trino,都已正确部署并在我们的 Kubernetes 集群中运行。
对于批处理管道,我们协调了整个过程,从数据获取和导入到 Amazon S3 数据湖,到使用 Spark 进行数据处理,最后将消费就绪的表格交付到 Trino。我们学习了如何创建 Airflow DAG,配置 Spark 应用程序,并无缝集成不同的工具来构建一个复杂的端到端数据管道。
在实时管道中,我们解决了实时处理和分析数据流的挑战。我们将 Postgres 数据库设置为数据源,部署了 Kafka Connect 和 Elasticsearch,并构建了一个 Spark Streaming 作业来对数据进行实时转换。然后,我们使用 sink 连接器将转换后的数据导入 Elasticsearch,使我们能够构建可以对事件进行响应的应用程序。
在整个章节中,我们通过编写 Python 和 SQL 代码来处理数据、编排和查询,获得了实战经验。我们还学习了集成不同工具的最佳实践,管理 Kafka 主题,以及将数据高效索引到 Elasticsearch 中。
通过完成本章的练习,您将掌握在 Kubernetes 上部署和编排构建大数据管道所需的所有工具的技能,连接这些工具以成功运行批处理和实时数据处理管道,并理解并应用构建可扩展、高效和可维护数据管道的最佳实践。
在接下来的章节中,我们将讨论如何使用 Kubernetes 部署生成式 AI(GenAI)应用程序。
第十一章:Kubernetes 上的生成式 AI
生成式人工智能(GenAI)已成为一项变革性技术,彻底改变了我们与 AI 的互动方式以及如何利用 AI。本章将带你探索生成式 AI 的精彩世界,学习如何在 Kubernetes 上利用其强大功能。我们将深入了解生成式 AI 的基础知识,并理解它与传统 AI 的主要区别。
我们的重点将放在利用 Amazon Bedrock,这是一个旨在简化生成式 AI 应用开发和部署的综合服务套件。通过实际操作示例,你将获得在 Kubernetes 上构建生成式 AI 应用的实践经验,并使用 Streamlit,一个强大的 Python 库,用于创建互动数据应用。我们将覆盖整个过程,从开发到将应用部署到 Kubernetes 集群。
此外,我们将探索 检索增强生成(RAG)的概念,RAG 将生成式 AI 的力量与外部知识库结合起来。
最后,我们将介绍 Amazon Bedrock 的 Agents,这是一项强大的功能,允许你自动化任务并创建智能助手。你将学习如何构建代理,如何通过 OpenAPI 架构定义其能力,以及如何创建作为代理后端的 Lambda 函数。
到本章结束时,你将对生成式 AI 有一个扎实的理解,了解其应用以及构建和部署生成式 AI 应用到 Kubernetes 上所需的工具和技术。
本章将涵盖以下主要主题:
-
什么是生成式 AI,什么不是
-
使用 Amazon Bedrock 与基础模型进行工作
-
在 Kubernetes 上构建生成式 AI 应用
-
使用 Amazon Bedrock 构建 RAG 知识库
-
使用代理构建行动模型
技术要求
本章需要一个 AWS 账户和一个正在运行的 Kubernetes 集群。我们还将使用 LangChain 和 Streamlit 库。虽然部署应用到 Kubernetes 时不需要它们,但如果你希望在本地测试代码并修改以适应自己的实验,建议安装这些库。
此外,为了获取 RAG 练习(第四部分)的数据,必须安装 Beautiful Soup 库。
本章的所有代码可以在 github.com/PacktPublishing/Bigdata-on-Kubernetes
的 Chapter11
文件夹中找到。
什么是生成式 AI,什么不是
生成性 AI 的核心是指能够基于其所接触到的训练数据生成新的原创内容的 AI 系统,这些内容可以是文本、图像、音频或代码。生成性 AI 模型在大量现有内容的数据集上进行训练,并学习其中的模式和关系。在得到提示后,这些模型可以生成新的原创内容,这些内容类似于训练数据,但并不是任何特定示例的精确复制。
这与传统的机器学习模型形成对比,后者侧重于基于现有数据进行预测或分类。
传统的机器学习模型,例如用于图像识别、自然语言处理或预测分析的模型,旨在接收输入数据,并基于该数据进行预测或分类。机器学习模型擅长处理分类任务(例如,识别图像中的物体或文本中的主题)、回归任务(例如,根据面积和位置等特征预测房价)以及聚类任务(例如,根据相似行为模式将客户分组)。
例如,一个图像识别模型可能会基于大量带标签的图像数据集进行训练,学习识别和分类新图像中的物体。同样,一个自然语言处理模型可能会基于文本数据语料库进行训练,执行情感分析、命名实体识别或语言翻译等任务。
在信用风险评估场景中,一个机器学习模型会基于包含过去贷款申请者信息的数据集进行训练,例如他们的收入、信用历史和其他相关特征,以及是否违约的标签。该模型会学习这些特征与贷款违约结果之间的模式和关系。当遇到新的贷款申请时,经过训练的模型便可以预测该申请者违约的可能性。
在这些情况下,机器学习模型并不生成新内容;相反,它是在使用从训练数据中学习到的模式和关系,对新的、未见过的数据进行有根据的预测或决策。
相比之下,例如,经过大量文本语料库训练的生成性 AI 模型可以在任何给定的主题或所需的风格下生成类似人类的写作。同样,经过图像训练的模型可以根据文本描述或其他输入数据创建全新的、逼真的图像。
尽管生成性 AI 的最终结果是创造新的内容,但其基本机制仍然基于相同的机器学习原理:进行预测。然而,生成性 AI 模型不仅仅是预测单一输出(如分类或数值),它们被训练来预测序列中的下一个元素,无论这个序列是由单词、像素或任何其他类型的数据组成。
大型神经网络的力量
虽然预测序列中下一个元素的概念相对简单,但生成性人工智能模型生成连贯、高质量内容的能力在于所使用的神经网络的规模和复杂性。
生成性人工智能模型通常使用拥有数十亿甚至万亿个参数的大型深度神经网络。这些神经网络在大量数据上进行训练,通常涉及数百万或数十亿个示例,从而使模型能够捕捉数据中极其微妙的模式和关系。
例如,Anthropic 的模型,如 Claude,是在一个庞大的文本数据集上进行训练的,这些数据涵盖了广泛的主题和领域。这使得模型能够深入理解语言、上下文和领域特定的知识,从而生成不仅语法正确,而且在语义上连贯且与给定上下文相关的文本。
挑战与局限性
尽管生成性人工智能展现出了显著的能力,但它并非没有挑战和局限性。一个主要的担忧是,这些模型可能生成带有偏见、有害或误导性内容,尤其是在训练数据集包含反映社会偏见或不准确信息的情况下。
此外,生成式 AI 模型有时可能会产生无意义、不一致或事实不准确的输出,尽管它们可能在表面上看起来合乎逻辑且可信。这被称为“幻觉”问题,即模型生成的内容并未基于事实知识或提供的背景。以下是两个著名的真实案例。加拿大航空的 AI 聊天机器人向一名乘客提供了有关航空公司丧亲票价政策的误导性信息。该聊天机器人错误地表示,即使在旅行已发生后,乘客仍可追溯申请减价的丧亲票价,这与加拿大航空的实际政策相矛盾。乘客依赖该聊天机器人“幻觉”般的回应,最终在航空公司拒绝兑现聊天机器人建议时,成功提起小额索赔案件并获胜(www.forbes.com/sites/marisagarcia/2024/02/19/what-air-canada-lost-in-remarkable-lying-ai-chatbot-case/
)。此外,巴西的一名联邦法官使用 ChatGPT AI 系统来研究他所撰写判决的法律先例。然而,AI 提供了伪造的信息,引用了不存在的最高法院裁决作为该法官判决的依据(g1.globo.com/politica/blog/daniela-lima/post/2023/11/13/juiz-usa-inteligencia-artificial-para-fazer-decisao-e-cita-jurisprudencia-falsa-cnj-investiga-caso.ghtml
)。
尽管存在这些挑战,生成式 AI 仍是一个快速发展的领域,研究人员和开发者正在积极解决这些问题。诸如微调、提示工程和使用外部知识来源(例如,知识库或 RAG)等技术,正在被探索以提高生成式 AI 模型的可靠性、安全性和事实准确性。
在接下来的章节中,我们将深入探讨如何使用 Amazon Bedrock 及其基础模型、知识库和基于代理的架构来构建和部署生成式 AI 应用的实际操作。
使用 Amazon Bedrock 来与基础模型进行协作
Amazon Bedrock 提供了一套基础模型,可作为构建生成式 AI 应用的模块。了解每个模型的能力和预期使用场景非常重要,以便为你的应用选择合适的模型。
在 Amazon Bedrock 中可用的模型包括语言模型、计算机视觉模型和多模态模型。语言模型擅长理解和生成类人文本,可以用于文本摘要、问答和内容生成等任务。而计算机视觉模型则擅长分析和理解视觉数据,非常适合图像识别、物体检测和场景理解等应用。
如同名称所示,多模态模型可以同时处理多种模态。这使得它适用于图像标注、视觉问答和数据图表分析等任务。
需要注意的是,每个模型都有其自身的优点和局限性,选择模型时应根据应用的具体需求来决定。例如,如果你的应用主要处理基于文本的任务,那么像 Llama 这样的语言模型可能是最合适的选择。然而,如果你需要处理文本和图像,那么像 Claude 这样的多模态模型则会更为合适。
为了将 Amazon Bedrock 的基础模型有效集成到我们的生成型 AI 应用程序中,请按照以下步骤操作:
- 要使用 Amazon Bedrock 上的可用基础模型,首先需要激活它们。进入 AWS 控制台,搜索 Amazon Bedrock 页面。然后,点击 修改模型访问权限 (图 11.1)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_01.jpg
图 11.1 – 修改 Amazon Bedrock 上的模型访问权限
- 在下一页,选择 Claude 3 Sonnet 和 Claude 3 Haiku Anthropic 模型。这些是我们将用于生成型 AI 应用程序的基础模型。如果你愿意尝试和实验不同的模型,也可以选择所有可用的模型 (图 11.2)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_02.jpg
图 11.2 – 请求访问 Anthropic 的 Claude 3 模型
- 点击 下一步,在下一页上查看更改并点击 提交。这些模型可能需要几分钟才能获得访问权限。
一旦获得访问权限,我们就拥有了开发生成型 AI 应用程序所需的一切。让我们开始吧。
在 Kubernetes 上构建生成型 AI 应用程序
在这一节中,我们将使用 Streamlit 构建一个生成型 AI 应用程序。该应用程序架构的示意图如 图 11.3 所示。在这个应用程序中,用户将能够选择与之交互的基础模型。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_03.jpg
图 11.3 – 基础模型的应用架构
让我们从应用程序的 Python 代码开始。完整的代码可以在 GitHub 上的 第十一章/streamlit-claude/app
文件夹中找到。我们将逐块分析代码:
-
创建一个名为
app
的文件夹,并在其中创建一个main.py
代码文件。首先,我们导入必要的文件并创建一个客户端以访问 Amazon Bedrock 运行时 API:import boto3 from langchain_community.chat_models import BedrockChat from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler bedrock = boto3.client(service_name='bedrock-runtime', region_name="us-east-1")
-
接下来,我们定义一个重要的参数字典,用于与 Claude 进行交互:
inference_modifier = { "max_tokens": 4096, "temperature": 0.5, "top_k": 250, "top_p": 1, "stop_sequences": ["\n\nHuman:"], }
-
接下来,我们将配置一个函数,以允许选择首选的基础模型。通过选择,我们将返回一个可以通过 Langchain 访问 Bedrock 的模型对象:
def choose_model(option): modelId = "" if option == "Claude 3 Haiku": modelId = "anthropic.claude-3-haiku-20240307-v1:0" elif option == "Claude 3 Sonnet": modelId = "anthropic.claude-3-sonnet-20240229-v1:0" model = BedrockChat( model_id=modelId, client=bedrock, model_kwargs=inference_modifier, streaming=True, callbacks=[StreamingStdOutCallbackHandler()], ) return model
-
现在,我们将添加一个小功能,用于重置对话历史记录:
def reset_conversation(): st.session_state.messages = []
-
接下来,我们将开始开发
main
函数,并为应用程序界面添加一些小部件。以下代码创建了一个侧边栏。在其中,我们添加了一个选择框,选项包括 Claude 3 Haiku 和 Claude 3 Sonnet,我们写了一个确认消息告诉用户他们正在与哪个模型对话,并添加了一个choose_model
函数来返回连接到 Bedrock 的类,并写下应用程序的标题,Chat with Claude 3:def main(): with st.sidebar: option = st.selectbox( "What model do you want to talk to?", ("Claude 3 Haiku", "Claude 3 Sonnet") ) st.write(f"You are talking to **{option}**") st.button('Reset Chat', on_click=reset_conversation) model = choose_model(option) st.title("Chat with Claude 3")
-
接下来,如果
st.session_state
中尚未存在聊天历史记录,我们将初始化一个空列表。st.session_state
是一个 Streamlit 对象,可以在应用程序重新运行时保持数据的持久性。然后,我们遍历st.session_state
中的messages
列表,并在聊天消息容器中显示每条消息。st.chat_message
函数根据指定的角色(例如user
或assistant
)创建聊天消息容器。st.markdown
函数在容器内显示消息内容:if "messages" not in st.session_state: st.session_state.messages = [] for message in st.session_state.messages: with st.chat_message(message["role"]): st.markdown(message["content"])
-
接下来,我们处理用户输入并显示对话。
st.chat_input
函数创建一个输入框,用户可以在其中输入提示。如果用户输入了提示,将执行以下步骤:(1)将用户的提示以user
角色添加到st.session_state
中的messages
列表中;(2)将用户的提示以user
角色显示在聊天消息容器中;(3)调用model.stream(prompt)
函数,将用户的提示发送到 Bedrock 模型并实时返回响应。st.write_stream
函数实时显示流式响应;(4)助手的响应以assistant
角色添加到st.session_state
中的messages
列表中:if prompt := st.chat_input("Enter your prompt here"): st.session_state.messages.append( {"role": "user", "content": prompt} ) with st.chat_message("user"): st.markdown(prompt) with st.chat_message("assistant"): response = st.write_stream( model.stream(prompt) ) st.session_state.messages.append( {"role": "assistant", "content": response} )
-
最后,我们调用主函数来启动 Streamlit 应用程序:
if __name__ == "__main__": main()
如果你想在本地运行此应用程序,这里有一个
requirements.txt
文件:boto3==1.34.22 langchain-community==0.0.33 langchain==0.1.16 streamlit==1.34.0 pip install -r requirements.txt
如果你已经安装了所需的库,使用
aws configure
命令对 AWS CLI 进行身份验证,然后通过以下命令在本地启动应用程序:streamlit run main.py
这是在构建用于部署的容器镜像之前测试应用程序的绝妙方法。你可以随心所欲地测试和修改应用程序。
准备好后,现在,让我们构建一个用于部署的容器镜像。
-
以下是一个简单的Dockerfile,用于构建镜像:
Dockerfile
FROM python:3.9-slim WORKDIR /app RUN apt-get update && apt-get install -y \ build-essential \ curl \ software-properties-common \ git \ && rm -rf /var/lib/apt/lists/* COPY app /app/ EXPOSE 8501 HEALTHCHECK CMD curl --fail http://localhost:8501/_stcore/health RUN pip3 install -r requirements.txt ENTRYPOINT ["streamlit", "run", "main.py", "--server.port=8501", "--server.address=0.0.0.0"]
这个 Dockerfile 从 Python 3.9 slim 基础镜像开始,并将工作目录设置为
/app
。它接着安装应用程序所需的各种系统包,例如build-essential
、curl
、software-properties-common
和git
。应用程序代码被复制到/app
目录,容器暴露8501
端口。健康检查会检查 Streamlit 应用程序是否在localhost:8501/_stcore/health
正常运行。通过pip3
安装所需的 Python 包,依据requirements.txt
文件。最后,ENTRYPOINT
命令通过运行streamlit run main.py
启动 Streamlit 应用程序,并指定服务器端口和地址。 -
要在本地构建镜像,请输入以下命令:
docker build --platform linux/amd64 -t <YOUR_USERNAME>/chat-with-claude:v1 .
记得将
<YOUR_USERNAME>
替换成你实际的 Docker Hub 用户名。然后,使用以下命令推送镜像:docker push <YOUR_USERNAME>/chat-with-claude:v1
记住,这个镜像将在 Docker Hub 上公开可用。不要在代码中或作为环境变量放入任何认证凭据或敏感数据!
现在,让我们在 Kubernetes 上部署我们的应用程序。
部署 Streamlit 应用程序
正如我们之前所看到的,要在 Kubernetes 上部署我们的应用程序,我们需要一个 Deployment
和一个 Service
.yaml
定义。我们可以将两者合并到一个文件中:
-
首先,创建一个
deploy_chat_with_claude.yaml
文件,内容如下:deploy_chat_with_claude.yaml
apiVersion: apps/v1 kind: Deployment metadata: name: chat-with-claude spec: replicas: 1 selector: matchLabels: app: chat-with-claude template: metadata: labels: app: chat-with-claude spec: containers: - name: chat-with-claude image: docker.io/neylsoncrepalde/chat-with-claude:v1 ports: - containerPort: 8501 env: - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: aws-credentials key: aws_access_key_id - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: aws-credentials key: aws_secret_access_key
代码的第一部分定义了一个名为
chat-with-claude
的Deployment
资源。它使用一个预先构建的镜像(你可以将其更改为自己的新镜像),并在容器中打开8501
端口,供外部访问。spec.template.spec.containers.env
块会将 AWS 凭证作为环境变量挂载到容器中,这些凭证来自名为aws-credentials
的密钥。 -
代码的第二部分定义了一个
LoadBalancer
服务,为Deployment
中定义的 pod 提供服务,该服务监听8501
端口并将流量转发到容器中的8501
端口。别忘了---
,它是分隔多个资源的必需项:--- apiVersion: v1 kind: Service metadata: name: chat-with-claude spec: type: LoadBalancer ports: - port: 8501 targetPort: 8501 selector: app: chat-with-claude
-
现在,我们将创建命名空间和密钥,并使用以下命令部署应用程序:
kubectl create namespace genai kubectl create secret generic aws-credentials --from-literal=aws_access_key_id=<YOUR_ACCESS_KEY_ID> --from-literal=aws_secret_access_key="<YOUR_SECRET_ACCESS_KEY>" -n genai kubectl apply -f deploy_chat_with_claude.yaml -n genai
-
就这些。等待几分钟,直到
LoadBalancer
启动并运行,然后使用以下命令检查其 URL:kubectl get svc -n genai
-
现在,粘贴带有
:8501
的 URL 以定义正确的端口,瞧瞧!(图 11.4)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_04.jpg
图 11.4 – Chat with Claude 3 应用程序 UI
现在,和助手玩一玩。试试 Haiku 和 Sonnet,并注意它们在速度和回答质量上的差异。试几次之后,你会注意到,向基础模型提问特定问题会导致幻觉。比如问模型:“你是谁?”你会得到一个惊喜(还会笑出声)。这个模型需要上下文。
在下一节中,我们将使用 RAG 提供一些上下文。
使用知识库构建 Amazon Bedrock 的 RAG
RAG 是一种用于生成 AI 模型的技术,通过在生成过程中为基础模型提供额外的上下文和知识。其工作原理是首先从知识库或文档语料库中检索相关信息,然后使用这些检索到的信息来增强输入到生成模型的内容。
RAG 是为生成 AI 模型提供上下文的良好选择,因为它允许模型访问并利用外部知识源,这可以显著提高生成输出的质量、准确性和相关性。如果没有 RAG,模型将仅限于在训练过程中学到的知识和模式,而这些可能并不总是充足或最新的,尤其是对于特定领域或快速发展的主题。
RAG 的一个关键优势是它使模型能够利用大型知识库或文档集合,这些内容在模型的训练数据中包含是不现实或不可能的。这使得模型能够生成更为知情和专业的输出,因为它可以从大量相关信息中提取内容。此外,RAG 有助于缓解如幻觉和偏见等问题,因为模型可以访问权威和事实性的来源。
然而,RAG 也存在一些局限性。生成输出的质量在很大程度上依赖于检索信息的相关性和准确性,而这些因素会受到知识库质量、检索机制效果以及模型是否能够正确整合检索信息的影响。此外,RAG 还可能带来计算开销和延迟,因为它需要在生成过程之前执行额外的检索步骤。
要使用 RAG 构建 AI 助手,我们将使用亚马逊 Bedrock 服务中的知识库功能,这是 Bedrock 中允许你无缝创建和管理知识库的功能。让我们开始吧。
对于我们的练习,我们将构建一个能够提供 AWS 能力计划信息的 AI 助手。该助手架构的视觉表示如 图 11.5 所示:
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_05.jpg
图 11.5 – 亚马逊 Bedrock 应用架构的知识库
AWS 能力计划是 AWS 提供的一个验证计划,旨在认可在特定解决方案领域中展示出技术熟练度和客户成功的合作伙伴。AWS 能力授予 AWS 合作伙伴网络 (APN) 成员,这些成员经过与特定 AWS 服务或工作负载相关的技术验证,确保他们具备提供一致且高质量解决方案的专业能力。这些能力涵盖了 DevOps、迁移、数据与分析、机器学习和安全等多个领域。每个能力都有自己的规则文档,理解起来可能相当具有挑战性。
-
首先,我们将收集一些有关该程序的上下文信息。在 GitHub 上的第十一章
/claude-kb/knowledge-base/
文件夹下,您将找到一段 Python 代码,该代码将收集关于对话式 AI、数据与分析、DevOps、教育、能源、金融服务、机器学习和安全程序的信息。将此代码保存到本地后,使用以下命令安装 Beautiful Soup 库:pip install "beautifulsoup4==4.12.2" python get_competency_data.py
几秒钟后,数据应保存到您本地的机器上。
-
接下来,创建一个 S3 存储桶并上传这些文件。这将成为我们 RAG 层的基础。
-
接下来,进入 AWS 控制台中的Bedrock页面。在侧边菜单中,点击知识库,然后点击创建知识库(图 11.6)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_06.jpg
图 11.6 – 知识库首页
-
在下一页面,选择一个名称为您的知识库,并在IAM 权限部分选择创建并使用新服务角色。然后,点击下一步。
-
接下来,您将配置数据源。为数据源名称选择一个您喜欢的名称。对于数据源位置,确保选中了此 AWS 账户选项框。然后,在S3 URI部分,点击浏览 S3,搜索包含 AWS Competency 数据集的 S3 存储桶(我们在步骤 2中创建的存储桶)。该配置的示例如图 11.7所示。选择 S3 存储桶后,点击下一步。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_07.jpg
图 11.7 – 选择知识库的数据源
接下来,我们将选择嵌入模型。此嵌入模型负责将文本或图像文件转换为称为嵌入的向量表示。这些嵌入捕捉输入数据的语义和上下文信息,从而支持高效的相似性比较和检索操作。默认情况下,Bedrock 的嵌入模型 Amazon Titan 应该是可用的。如果不可用,请在控制台中按照相同的过程申请访问权限。
- 在下一页面的嵌入模型部分,选择Titan Embeddings G1 - Text。在向量数据库部分,确保选中了快速创建新的向量存储选项。此快速创建选项基于 OpenSearch Serverless 创建一个向量数据库。将其他选项保持未选中状态,然后点击下一步。
注意
OpenSearch 是一个开源分布式搜索与分析引擎,基于 Apache Lucene,并来源于 Elasticsearch。它是 RAG 向量数据库的一个优秀选择,因为它提供了高效的全文搜索和最近邻搜索功能,适用于向量嵌入的检索。OpenSearch 支持稠密向量索引与检索,非常适合存储和查询大量的向量嵌入,这对于 RAG 模型的检索组件至关重要。
-
接下来,检查信息是否正确提供。如果一切看起来正常,点击创建知识库。请耐心等待,创建过程将需要几分钟才能完成。
-
知识库启动并运行后,返回 Bedrock 中的知识库页面,点击你刚创建的知识库。在下一个页面中,滚动直到找到数据源部分(如图 11.8所示)。选择数据源并点击同步,以开始嵌入文本内容。这也需要几分钟。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_08.jpg
图 11.8 – 同步知识库与其数据源
在“同步”准备好之后,我们已经具备了运行生成式 AI 助手与 RAG 所需的一切。现在,是时候调整代码,让 Claude 与知识库配合使用。
调整 RAG 检索代码
我们将从之前开发的代码开始,使用纯 Claude 模型。由于我们只需要进行一些小修改,因此不需要重新查看整个代码。我们将仔细看看必要的修改。RAG 应用程序的完整代码可以在github.com/PacktPublishing/Bigdata-on-Kubernetes/tree/main/Chapter11/claude-kb/app
文件夹中找到。如果你不想自定义代码,可以使用我为此示例提供的现成 docker 镜像。
-
首先,我们需要额外的导入:
import os from botocore.client import Config from langchain.prompts import PromptTemplate from langchain.retrievers.bedrock import AmazonKnowledgeBasesRetriever from langchain.chains import RetrievalQA
在这里,我们导入
os
库以获取环境变量。Config
类将帮助构建一个配置对象,以访问bedrock-agent
API。所有其他导入与访问知识库并将检索到的文档与 AI 响应合并有关。 -
接下来,我们将从环境变量中获取 Amazon Bedrock 服务的知识库 ID。这是一个非常有用的方法。如果将来需要更改知识库,就无需重新构建镜像,只需更改环境变量。然后,我们设置一些配置并为
bedrock-agent-runtime
API(用于知识库)创建一个客户端:kb_id = os.getenv("KB_ID") bedrock_config = Config(connect_timeout=120, read_timeout=120, retries={'max_attempts': 0}) bedrock_agent_client = boto3.client( "bedrock-agent-runtime", config=bedrock_config, region_name = "us-east-1" )
-
接下来,我们将配置一个提示模板,帮助我们将从知识库中检索的文档与用户问题串联起来。最后,我们将实例化一个对象,该对象将保存模板,并接收文档和用户问题作为输入:
PROMPT_TEMPLATE = """ Human: You are a friendly AI assistant and provide answers to questions about AWS competency program for partners. Use the following pieces of information to provide a concise answer to the question enclosed in <question> tags. Don't use tags when you generate an answer. Answer in plain text, use bullets or lists if needed. If you don't know the answer, just say that you don't know, don't try to make up an answer. <context> {context} </context> <question> {question} </question> The response should be specific and use statistics or numbers when possible. Assistant:""" claude_prompt = PromptTemplate(template=PROMPT_TEMPLATE, input_variables=["context","question"])
-
设置完
choose_model()
函数后,我们需要实例化一个retriever
类,用于从知识库中拉取文档:retriever = AmazonKnowledgeBasesRetriever( knowledge_base_id=kb_id, retrieval_config={ "vectorSearchConfiguration": { "numberOfResults": 4 } }, client=bedrock_agent_client )
-
现在,在
main
函数中,我们将添加RetrievalQA
。这个类用于构建能够从知识库中检索相关信息的问答系统:qa = RetrievalQA.from_chain_type( llm=model, chain_type="stuff", retriever=retriever, return_source_documents=False, chain_type_kwargs={"prompt": claude_prompt} )
-
最后,我们将修改响应以提供完整的答案:
with st.chat_message("assistant"): response = qa.invoke(prompt)['result'] st.write(response)
就这样。代码已经准备好,可以构建新的镜像。你可以通过创建一个新的 Dockerfile,使用之前的代码重新构建它。在运行
docker build
命令时,记得选择一个不同的镜像名称(或者至少选择一个不同的版本)。 -
接下来,我们将开始部署。
.yaml
文件与上一节中的文件非常相似(但记得更改部署、服务、容器和标签的所有名称为rag-with-claude
)。该代码的完整版可在 GitHub 仓库中找到。我们只需声明知识库 ID 的环境变量。由于这不是敏感凭证,因此我们不需要使用 Kubernetes 秘密来处理它。我们将使用ConfigMap
。.yaml
文件中的spec.template.spec.container.env
部分应该如下所示:env: - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: aws-credentials key: aws_access_key_id - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: aws-credentials key: aws_secret_access_key - name: KB_ID valueFrom: configMapKeyRef: name: kb-config key: kb_id
注意,我们添加了一个新的环境变量
KB_ID
,它将从ConfigMap
导入。 -
要部署新应用程序,我们运行以下命令:
kubectl create configmap kb-config --from-literal=kb_id=<YOUR_KB_ID> -n genai kubectl apply -f deploy_chat_with_claude.yaml -n genai
我们运行前面的命令来部署应用程序。等待几分钟,直到
LoadBalancer
启动,然后使用以下命令:kubectl get svc -n genai
使用前面的命令获取
LoadBalancer
的 URL。将名为rag-with-claude
的服务复制并粘贴到浏览器中,并添加:8501
以连接到暴露的端口。瞧! 你应该能看到新应用程序在运行,正如图 11.9 所示。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_09.jpg
图 11.9 – RAG 应用 UI
尝试稍微玩一下这个应用程序。你会发现,如果你提问与其范围无关的问题(如 AWS 能力计划以外的内容),助手会告诉你它无法回答。
现在,我们将进入本章的最后部分,学习如何让生成性 AI 模型通过代理执行操作。
使用代理构建行动模型
代理是生成性 AI 世界中最新的功能。它们是强大的工具,通过允许生成性 AI 模型代表我们执行任务,实现了任务的自动化。它们充当生成性 AI 模型与外部系统或服务之间的中介,促进了现实世界任务的执行。
在后台,代理“理解”用户的需求,并调用执行操作的后端函数。代理能够执行的范围由 OpenAPI 架构定义,它将用于“理解”它的功能以及如何正确调用后端函数。
总结来说,要构建一个代理,我们需要一个 OpenAPI 架构、一个后端函数和一个知识库。知识库是可选的,但它能大大提升用户与 AI 助手的交互体验。
在本节的练习中,我们将构建一个“了解” AWS 能力计划相关信息的代理。该代理的应用架构的可视化表示如图 11.10所示。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_10.jpg
图 11.10 – 代理应用架构
这个代理将创建一个简单的工作表,包含用例信息,将工作表保存到 Amazon S3,并在 DynamoDB 表中注册该信息以供查询。我们开始吧:
-
首先,我们需要一个 OpenAPI 模式,定义代理可用的方法。在这种情况下,我们将定义两个方法。第一个方法
generateCaseSheet
注册用例信息并创建工作表。第二个方法checkCase
接收用例 ID 并返回相关信息。由于这是一个较长的 JSON 文件,我们不会在此展示。完整的代码可以在github.com/PacktPublishing/Bigdata-on-Kubernetes/tree/main/Chapter11/agent
文件夹中找到。复制这段代码并将其保存在 S3 桶中。 -
然后,我们将定义一个 Lambda 函数,作为代理的后端。该函数的完整 Python 代码可以在书籍的 GitHub 仓库中找到,位于 第十一章
/agent/function
文件夹中。在你的机器上,创建一个名为function
的文件夹,并将此代码保存为lambda_function.py
。这段代码定义了一个 Lambda 函数,作为 Bedrock 代理的后端。该函数处理两个不同的 API 路径:/generateCaseSheet
和/checkCase
。让我们逐块分析这段代码。在导入必要的文件夹之后,我们定义了两个辅助函数,从事件对象中提取参数值(get_named_parameter
和get_named_property
)。generateCaseSheet
函数负责根据提供的信息创建一个新的用例表格。它从事件对象中提取所需的参数,生成一个唯一的 ID,使用CaseTemplate
类创建一个新的 Excel 工作簿,将提供的参数填充到模板中,保存工作簿到临时文件,上传到 S3 桶,并将用例信息存储到 DynamoDB 表中。最后,它返回一个包含用例详情的响应对象。checkCase
函数根据提供的caseSheetId
参数从 DynamoDB 表中检索用例表格信息,并返回一个包含用例详情的响应对象。lambda_handler
函数是 Lambda 函数的入口点,它根据事件对象中的apiPath
值决定执行的操作。该函数根据操作构建相应的响应对象并返回。 -
接下来,在
function
文件夹中,创建一个名为lambda_requirements.txt
的新文件,在其中列出 Lambda 函数代码的依赖项。在lambda_requirements.txt
文件中输入openpyxl==3.0.10
并保存。 -
现在,在部署该函数之前,我们需要创建一个 IAM 角色,授予 Lambda 所需的权限。在 AWS 控制台中,进入 IAM 页面,选择侧边菜单中的 角色,然后点击 创建新角色。
-
在下一页,选择AWS 服务作为受信实体类型,并选择Lambda作为使用案例(如图 11.11所示)。点击下一步。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_11.jpg
图 11.11 – 选择受信实体和 AWS 服务
-
现在,我们将选择权限策略。选择Administrator Access并点击下一步。记住,拥有这样的开放权限在生产环境中不是一个好做法。你应该仅为所需的操作和资源设置权限。
-
然后,为你的 IAM 角色选择一个名称(例如
BDOK-Lambda-service-role
),并点击创建角色。 -
然后,你将再次看到 IAM 角色页面。搜索你创建的角色并点击它(图 11.12)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_12.jpg
图 11.12 – 选择你创建的 IAM 角色
-
在角色页面,你将看到角色的Amazon 资源名称(ARN)。复制并保存它,稍后我们将需要这个名称来部署 Lambda 函数。
-
接下来,在你创建的
function
文件夹内,创建一个名为worksheet
的新文件夹。从github.com/PacktPublishing/Bigdata-on-Kubernetes/tree/main/Chapter11/agent/function/worksheet
复制两个文件,第一个命名为__init__.py
,第二个命名为template.py
,将这些代码文件放入worksheet
文件夹中。这段代码包含一个名为CaseTemplate
的类,它使用openpyxl
Python 库构建一个 Excel 工作表。 -
接下来,复制
github.com/PacktPublishing/Bigdata-on-Kubernetes/tree/main/Chapter11/agent/scripts
文件夹中的另外两个文件,分别命名为build_lambda_package.sh
和create_lambda_function.sh
。这些文件包含 bash 代码,将为 Lambda 函数安装依赖项并将其部署到 AWS。 -
现在,我们将部署 Lambda 函数。这是检查项目结构是否正确的好时机。文件夹和文件结构应该如下所示:
├── app │ └── main.py ├── function │ ├── lambda_function.py │ ├── lambda_requirements.txt │ ├── test_event.json │ └── worksheet │ ├── __init__.py │ └── template.py ├── openapi_schema.json └── scripts ├── build_lambda_package.sh scripts folder and run the following commands:
sh build_lambda_package.sh
sh create_lambda_function.sh “<YOUR_ROLE_ARN>”
记得将<YOUR_ROLE_ARN>
替换为你实际的 Lambda IAM 角色 ARN。现在,我们还有一些工作要做。接下来,我们将创建 DynamoDB 表格,用于存储关于使用案例的信息。
创建 DynamoDB 表
DynamoDB 是一个完全托管的 NoSQL 数据库服务。它是一个键值和文档数据库,可以在任何规模下提供单数毫秒级的性能。DynamoDB 已针对运行无服务器应用进行了优化,并且设计上能够根据需求自动向上或向下扩展,无需预配置或管理服务器。它特别适合需要在任何规模下对数据进行低延迟读写访问的应用程序。其极低的延迟使其成为 AI 助手应用程序的一个非常好的选择。让我们开始吧:
-
在 AWS 控制台中,导航到DynamoDB页面。在侧边菜单中,点击表格,然后点击创建表格。
-
在下一页中,填写
case-sheets
和caseSheetId
。记得选择数字,表示此条目为数字,如图 11.13所示。将所有其他配置保留为默认,然后点击创建表格。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_13.jpg
图 11.13 – 创建 DynamoDB 表格
几秒钟后,你应该能够看到已准备好的 DynamoDB 表格。现在,我们将配置 Bedrock 代理。
配置代理
现在,在本节的最后部分,我们将配置一个 Bedrock 代理,并将其链接到其后端 Lambda 函数和知识库数据库。让我们开始吧:
-
首先,在 AWS 控制台中,搜索
Bedrock
,然后在侧边菜单中点击代理。 -
在弹出框中,输入你的代理名称(
aws-competency-agent
),然后点击创建。 -
接下来,你将看到代理配置页面。滚动到选择模型,然后选择 Anthropic 模型Claude 3 Haiku(你也可以根据需要尝试其他可用的模型)。
-
在
你是一个友好的 AI 助手。你的主要目标是帮助 AWS 合作伙伴公司为 AWS 能力认证计划创建案例表格,注册这些案例,并向用户提供已注册案例的信息。当你生成案例表格时,总是要向用户展示案例表格的 ID(id)、客户的名称(client)和案例名称(casename),并确认案例已成功创建。同时,回答用户关于你能做什么以及如何帮助他们的问题。
这是代理配置中非常重要的一部分。可以根据这些指令进行多次操作。此屏幕的示例如图 11.14所示。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_14.jpg
图 11.14 – 配置代理的指令
-
之后,点击页面顶部的保存按钮,让 AWS 创建必要的权限策略。
-
接下来,滚动到操作组部分,点击添加。
-
在下一页中,为你的操作组选择一个名称。对于操作组类型,选择使用 API 模式定义。在操作组调用中,选择选择一个现有的 Lambda 函数,然后选择我们刚才创建的 Lambda 函数(图 11.15)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_15.jpg
图 11.15 – 为代理的操作组选择 Lambda 函数
- 现在,在操作组架构部分,选择选择现有 API 架构,然后点击浏览 S3,搜索我们保存在 S3 上的 OpenAPI 架构(图 11.16)。然后,点击创建。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_16.jpg
图 11.16 – 选择 OpenAPI 架构
-
接下来,在知识库部分,点击添加。
-
选择我们之前创建的知识库,并为代理输入一些使用说明。例如:
此知识库包含以下 AWS 能力程序的信息:对话式 AI、数据与分析、DevOps、教育、能源、金融服务、机器学习和安全
。确保知识库状态设置为启用(图 11.17)。点击保存 并退出。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_17.jpg
图 11.17 – 将知识库附加到代理
-
现在,您已返回到代理的编辑页面。这里不需要其他操作,所以您可以点击顶部的准备,使代理准备运行,然后点击保存 并退出。
-
现在,您将被带回代理的主页。向下滚动到别名部分并点击创建。
-
输入一个别名名称(例如
aws-competency
),然后点击创建别名。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_18.jpg
图 11.18 – 创建别名
-
现在,最后一步是为 Lambda 注册一个权限,以便这个代理能够触发函数执行。在代理的主页上,复制代理 ARN。
-
接下来,转到Lambda页面,点击我们为此练习创建的函数。在函数的主页上,向下滚动,点击配置,然后点击侧边菜单中的权限(图 11.19)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_19.jpg
图 11.19 – Lambda 权限
-
再次向下滚动到基于资源的策略声明部分,点击添加权限。
-
在下一页中,填写
lambda:InvokeFunction
(图 11.20)。然后,点击保存。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_20.jpg
图 11.20 – 配置 Bedrock 代理的 Lambda 权限
这就是我们让代理运行所需的所有配置。现在,是时候进行部署了。让我们把 Streamlit 应用程序部署到 Kubernetes 上。
在 Kubernetes 上部署应用程序
在 Kubernetes 上部署代理 Streamlit 应用程序遵循与之前部署的其他两个应用程序相同的路径。唯一不同的是我们必须创建一个新的configmap
,并包含代理的 ID 及其别名 ID:
-
进入 AWS 控制台中的代理页面,复制代理的 ID(位于顶部区域)和别名 ID(位于底部区域)。
-
现在,使用以下命令创建带有这些参数的
configmap
:kubectl create configmap agent-config --from-literal=agent_alias_id=<YOUR_ALIAS_ID> --from-literal=agent_id=<YOUR_AGENT_ID> -n genai
记得将
<YOUR_ALIAS_ID>
和<YOUR_AGENT_ID>
占位符替换为实际值。 -
如果你想自定义应用程序,可以构建一个自定义镜像。如果不想,可以使用来自 DockerHub 的现成镜像(
hub.docker.com/r/neylsoncrepalde/chat-with-claude-agent
)。 -
接下来,我们将为应用程序和服务部署定义一个
deploy_agent.yaml
文件。该文件的内容可以在github.com/PacktPublishing/Bigdata-on-Kubernetes/tree/main/Chapter11/agent
文件夹中找到。 -
将此文件复制到本地后,现在运行以下命令:
kubectl apply -f deploy_agent.yaml -n genai
-
等待几秒钟,直到
LoadBalancer
启动完成。然后,运行以下命令以获取LoadBalancer
的 URL:kubectl get svc -n genai
将其粘贴到浏览器中,并添加正确的端口(
:8501
),查看魔法的发生(图 11.21)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_21.jpg
图 11.21 – AWS 能力代理应用程序 UI
尝试插入一个用于创建新用例的提示,如图 11.18所示。同时,你也可以通过传递用例的 ID 来查看该用例的具体信息(图 11.22)。
https://github.com/OpenDocCN/freelearn-devops-pt6-zh/raw/master/docs/bgdt-k8s/img/B21927_11_22.jpg
图 11.22 – 使用代理检查用例信息
玩一玩。提出关于能力计划的问题,尝试注册不同的用例。同时,你也可以检查 AWS DynamoDB,查看我们创建的表中摄取的信息,并查看 S3 中代理创建的 Excel 文件。
就是这样!恭喜你!你刚刚部署了一个完整的生成式 AI 代理应用程序,它可以通过自然语言在 Kubernetes 上为你执行任务。
总结
在本章中,我们探索了生成式 AI 的激动人心的世界,并学习了如何在 Kubernetes 上利用其力量。我们首先了解了生成式 AI 的基本概念、其底层机制以及它与传统机器学习方法的区别。
然后,我们利用 Amazon Bedrock,一个全面的服务套件,来构建和部署生成式 AI 应用程序。我们学习了如何使用 Bedrock 的基础模型,如 Claude 3 Haiku 和 Claude 3 Sonnet,并将它们集成到 Streamlit 应用程序中,以提供交互式用户体验。
接下来,我们深入探讨了 RAG 的概念,它将生成式 AI 的强大功能与外部知识库相结合。我们使用 Amazon Bedrock 构建了一个 RAG 系统,使我们的应用程序能够访问和利用大量的结构化数据,从而提高生成输出的准确性和相关性。
最后,我们探讨了 Amazon Bedrock 的代理功能,这是一个强大的特性,允许生成式 AI 模型自动化任务并代替我们采取行动。我们学习了如何构建代理,通过 OpenAPI 架构定义其能力,并创建作为代理后台的 Lambda 函数。
在本章中,我们通过实践经验学习了如何在 Kubernetes 上构建和部署生成式 AI 应用。通过本章获得的技能和知识,在当今快速发展的技术环境中具有不可估量的价值。生成式 AI 正在改变各行各业,彻底革新我们与 AI 的互动方式及其应用。通过掌握本章中介绍的工具和技术,你将能够构建创新且智能的应用,生成类人内容,利用外部知识来源,并实现任务自动化。
在下一章,我们将讨论一些构建生产就绪的 Kubernetes 环境所需的重要要点,这些内容由于篇幅原因在本书中未能展开。
更多推荐
所有评论(0)