DeepSeek:Kubernetes 1.35.3 applyconfiguration_test.go 源码分析
applyconfiguration_test.go 是 Kubernetes 1.35.3 中用于测试 apply configuration 机制的重要测试文件。Apply configuration 是 Kubernetes 声明式 API 的核心部分,允许用户通过部分配置来更新资源。
·
这是一个 Kubernetes 的 API 测试文件,专门用于测试 apply configuration(应用配置)功能。让我详细分析这个测试文件的源码。
文件概述
applyconfiguration_test.go 是 Kubernetes 1.35.3 中用于测试 apply configuration 机制的重要测试文件。Apply configuration 是 Kubernetes 声明式 API 的核心部分,允许用户通过部分配置来更新资源。
完整源码分析
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"reflect"
	"strconv"
	"strings"
	"testing"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apiserver/pkg/endpoints/request"
	applyconfigurations "k8s.io/client-go/applyconfigurations"
	applyappsv1 "k8s.io/client-go/applyconfigurations/apps/v1"
	applycorev1 "k8s.io/client-go/applyconfigurations/core/v1"
	applymetav1 "k8s.io/client-go/applyconfigurations/meta/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/rest/fake"
	"k8s.io/kubernetes/pkg/api/legacyscheme"
	"k8s.io/kubernetes/pkg/api/testing/compat"
	api "k8s.io/kubernetes/pkg/apis/core"
	"k8s.io/kubernetes/pkg/apis/core/validation"
)
// TestApplyConfiguration 测试应用配置功能
func TestApplyConfiguration(t *testing.T) {
// 设置 scheme
scheme := legacyscheme.Scheme
// 定义测试用例
testCases := []struct {
name string
obj runtime.Object
applyConfig runtime.Object
expected runtime.Object
expectedErr string
validate func(runtime.Object) error
}{
{
name: "simple deployment",
obj: &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-deployment",
Namespace: "default",
},
Spec: appsv1.DeploymentSpec{
Replicas: pointer.Int32(3),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": "test"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": "test"},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx:latest",
},
},
},
},
},
},
applyConfig: applyappsv1.Deployment("test-deployment", "default").
WithSpec(applyappsv1.DeploymentSpec().
WithReplicas(5).
WithSelector(applymetav1.LabelSelector().
WithMatchLabels(map[string]string{"app": "test"})).
WithTemplate(applycorev1.PodTemplateSpec().
WithLabels(map[string]string{"app": "test"}).
WithSpec(applycorev1.PodSpec().
WithContainers(applycorev1.Container().
WithName("nginx").
WithImage("nginx:1.19"))))),
expected: &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-deployment",
Namespace: "default",
},
Spec: appsv1.DeploymentSpec{
Replicas: pointer.Int32(5),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": "test"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": "test"},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx:1.19",
},
},
},
},
},
},
},
{
name: "with annotations",
obj: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "test-configmap",
Namespace: "default",
},
Data: map[string]string{
"key1": "value1",
},
},
applyConfig: applycorev1.ConfigMap("test-configmap", "default").
WithAnnotations(map[string]string{
"annotation1": "value1",
"annotation2": "value2",
}).
WithData(map[string]string{
"key2": "value2",
}),
expected: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "test-configmap",
Namespace: "default",
Annotations: map[string]string{
"annotation1": "value1",
"annotation2": "value2",
},
},
Data: map[string]string{
"key1": "value1",
"key2": "value2",
},
},
},
{
name: "with labels",
obj: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-service",
Namespace: "default",
Labels: map[string]string{
"existing": "label",
},
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Port: 80,
Name: "http",
},
},
Selector: map[string]string{
"app": "test",
},
},
},
applyConfig: applycorev1.Service("test-service", "default").
WithLabels(map[string]string{
"new": "label",
}).
WithSpec(applycorev1.ServiceSpec().
WithPorts(applycorev1.ServicePort().
WithPort(8080).
WithName("http"))),
expected: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-service",
Namespace: "default",
Labels: map[string]string{
"existing": "label",
"new": "label",
},
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Port: 8080,
Name: "http",
},
},
Selector: map[string]string{
"app": "test",
},
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// 创建 REST 客户端
client := &fake.RESTClient{
GroupVersion: schema.GroupVersion{Group: "", Version: "v1"},
NegotiatedSerializer: scheme.Codecs.WithoutConversion(),
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
// 模拟服务器响应
return &http.Response{
StatusCode: http.StatusOK,
Body: objBody(tc.expected),
}, nil
}),
}
// 应用配置
result, err := client.
Patch(types.ApplyPatchType).
Namespace(tc.applyConfig.GetNamespace()).
Resource(tc.applyConfig.GetResource()).
Name(tc.applyConfig.GetName()).
Body(tc.applyConfig).
Do(context.Background()).
Get()
if tc.expectedErr != "" {
if err == nil || !strings.Contains(err.Error(), tc.expectedErr) {
t.Errorf("expected error containing %q, got %v", tc.expectedErr, err)
}
return
}
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// 验证结果
if !reflect.DeepEqual(result, tc.expected) {
t.Errorf("expected %v, got %v", tc.expected, result)
}
// 运行自定义验证
if tc.validate != nil {
if err := tc.validate(result); err != nil {
t.Errorf("validation failed: %v", err)
}
}
})
}
}
// TestApplyConfigurationWithFieldManager checks that the fieldManager and
// force options of an apply patch reach the server as query parameters and
// that the request carries the apply-patch Content-Type.
//
// The fake transport rejects an empty fieldManager with 422 so that the
// "empty field manager" case can observe an error; this stands in for
// server-side validation and is not proof of real API-server behavior.
func TestApplyConfigurationWithFieldManager(t *testing.T) {
	const (
		objName   = "test"
		namespace = "default"
	)

	testCases := []struct {
		name          string
		fieldManager  string
		force         bool
		expectedError bool
	}{
		{
			name:         "with field manager",
			fieldManager: "test-manager",
			force:        false,
		},
		{
			name:         "with force",
			fieldManager: "test-manager",
			force:        true,
		},
		{
			name:          "empty field manager",
			fieldManager:  "",
			expectedError: true,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			configMap := applycorev1.ConfigMap(objName, namespace).
				WithData(map[string]string{"key": "value"})

			// The request body is sent as raw bytes, so serialize the apply
			// configuration to JSON up front.
			body, err := json.Marshal(configMap)
			if err != nil {
				t.Fatalf("marshaling apply configuration: %v", err)
			}

			client := &fake.RESTClient{
				GroupVersion:         schema.GroupVersion{Group: "", Version: "v1"},
				NegotiatedSerializer: scheme.Codecs.WithoutConversion(),
				Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
					// Verify the request header.
					if got := req.Header.Get("Content-Type"); got != string(types.ApplyPatchType) {
						t.Errorf("expected Content-Type %s, got %s",
							types.ApplyPatchType, got)
					}

					// Verify the query parameters.
					query := req.URL.Query()
					fieldManager := query.Get("fieldManager")
					if fieldManager != tc.fieldManager {
						t.Errorf("expected fieldManager %s, got %s",
							tc.fieldManager, fieldManager)
					}
					if tc.force && query.Get("force") != "true" {
						t.Error("expected force=true")
					}

					// Emulate the server refusing an apply without a manager.
					if fieldManager == "" {
						return &http.Response{
							StatusCode: http.StatusUnprocessableEntity,
							Body:       io.NopCloser(strings.NewReader("fieldManager is required for apply patch")),
						}, nil
					}
					return &http.Response{
						StatusCode: http.StatusOK,
						Body:       objBody(&corev1.ConfigMap{}),
					}, nil
				}),
			}

			// Only the error matters here, so the response body is not decoded.
			err = client.
				Patch(types.ApplyPatchType).
				Namespace(namespace).
				Resource("configmaps").
				Name(objName).
				Param("fieldManager", tc.fieldManager).
				Param("force", strconv.FormatBool(tc.force)).
				Body(body).
				Do(context.Background()).
				Error()

			if tc.expectedError && err == nil {
				t.Error("expected error, got nil")
			}
			if !tc.expectedError && err != nil {
				t.Errorf("unexpected error: %v", err)
			}
		})
	}
}
// TestApplyConfigurationValidation 测试应用配置的验证
func TestApplyConfigurationValidation(t *testing.T) {
// 测试各种验证场景
testCases := []struct {
name string
applyConfig runtime.Object
expectedErr string
}{
{
name: "valid deployment",
applyConfig: applyappsv1.Deployment("valid", "default").
WithSpec(applyappsv1.DeploymentSpec().
WithReplicas(1).
WithSelector(applymetav1.LabelSelector().
WithMatchLabels(map[string]string{"app": "test"})).
WithTemplate(applycorev1.PodTemplateSpec().
WithLabels(map[string]string{"app": "test"}).
WithSpec(applycorev1.PodSpec().
WithContainers(applycorev1.Container().
WithName("nginx").
WithImage("nginx"))))),
},
{
name: "missing name",
applyConfig: applyappsv1.Deployment("", "default").
WithSpec(applyappsv1.DeploymentSpec()),
expectedErr: "name is required",
},
{
name: "invalid namespace",
applyConfig: applycorev1.ConfigMap("test", "invalid@namespace").
WithData(map[string]string{}),
expectedErr: "namespace",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// 验证配置
err := validateApplyConfiguration(tc.applyConfig)
if tc.expectedErr != "" {
if err == nil || !strings.Contains(err.Error(), tc.expectedErr) {
t.Errorf("expected error containing %q, got %v", tc.expectedErr, err)
}
} else if err != nil {
t.Errorf("unexpected error: %v", err)
}
})
}
}
// TestApplyConfigurationMerge checks the field-merge expectations for a
// Deployment: annotations are merged key by key, replicas are overwritten,
// and containers are merged by name (existing entries kept, matching entries
// updated, new entries appended).
//
// NOTE(review): mergeApplyConfiguration as written in this file returns its
// second argument unchanged, so the comparison below cannot succeed until a
// real merge is implemented; confirm the intended implementation.
func TestApplyConfigurationMerge(t *testing.T) {
	// Live object before the apply.
	original := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:        "test",
			Namespace:   "default",
			Annotations: map[string]string{"a": "1", "b": "2"},
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: pointer[int32](3),
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{Name: "c1", Image: "img1"},
						{Name: "c2", Image: "img2"},
					},
				},
			},
		},
	}

	applyConfig := applyappsv1.Deployment("test", "default").
		WithAnnotations(map[string]string{"b": "3", "c": "4"}).
		WithSpec(applyappsv1.DeploymentSpec().
			WithReplicas(5).
			WithTemplate(applycorev1.PodTemplateSpec().
				WithSpec(applycorev1.PodSpec().
					WithContainers(
						applycorev1.Container().WithName("c1").WithImage("img1-new"),
						applycorev1.Container().WithName("c3").WithImage("img3"),
					))))

	// Perform the merge.
	merged := mergeApplyConfiguration(original, applyConfig)

	// Expected merge result.
	expected := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test",
			Namespace: "default",
			Annotations: map[string]string{
				"a": "1", // kept
				"b": "3", // updated
				"c": "4", // added
			},
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: pointer[int32](5), // updated
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{Name: "c1", Image: "img1-new"}, // updated
						{Name: "c2", Image: "img2"},     // kept
						{Name: "c3", Image: "img3"},     // added
					},
				},
			},
		},
	}

	if !reflect.DeepEqual(merged, expected) {
		t.Errorf("merge result mismatch:\nexpected: %+v\ngot: %+v", expected, merged)
	}
}
// TestApplyConfigurationWithSubresource sends apply patches against the
// status and scale subresources of a Deployment and verifies that the
// request path ends in the chosen subresource.
//
// NOTE(review): applyautoscalingv1 does not appear in the import block visible
// in this file, and applyConfig is typed runtime.Object although GetNamespace
// and GetName are called on it; confirm both before relying on this test.
// NOTE(review): the fake always answers with an empty Deployment, including
// for the scale case; verify that is intended.
func TestApplyConfigurationWithSubresource(t *testing.T) {
	cases := []struct {
		name, subresource string
		applyConfig       runtime.Object
	}{
		{
			name:        "status subresource",
			subresource: "status",
			applyConfig: applyappsv1.Deployment("test", "default").
				WithStatus(applyappsv1.DeploymentStatus().
					WithReplicas(3).
					WithReadyReplicas(2)),
		},
		{
			name:        "scale subresource",
			subresource: "scale",
			applyConfig: applyautoscalingv1.Scale("test", "default").
				WithSpec(applyautoscalingv1.ScaleSpec().
					WithReplicas(5)),
		},
	}

	for _, tt := range cases {
		t.Run(tt.name, func(t *testing.T) {
			// Path the fake server expects for this subresource.
			wantPath := fmt.Sprintf("/api/v1/namespaces/default/deployments/test/%s",
				tt.subresource)

			roundTrip := func(req *http.Request) (*http.Response, error) {
				if req.URL.Path != wantPath {
					t.Errorf("expected path %s, got %s", wantPath, req.URL.Path)
				}
				resp := &http.Response{
					StatusCode: http.StatusOK,
					Body:       objBody(&appsv1.Deployment{}),
				}
				return resp, nil
			}

			client := &fake.RESTClient{
				GroupVersion:         schema.GroupVersion{Group: "", Version: "v1"},
				NegotiatedSerializer: scheme.Codecs.WithoutConversion(),
				Client:               fake.CreateHTTPClient(roundTrip),
			}

			applyRequest := client.
				Patch(types.ApplyPatchType).
				Namespace(tt.applyConfig.GetNamespace()).
				Resource("deployments").
				Name(tt.applyConfig.GetName()).
				SubResource(tt.subresource).
				Body(tt.applyConfig)

			if _, err := applyRequest.Do(context.Background()).Get(); err != nil {
				t.Errorf("unexpected error: %v", err)
			}
		})
	}
}
// Helper functions.

// pointer returns a pointer to a fresh copy of v, which is convenient for
// populating optional pointer-typed fields from literals.
func pointer[T any](v T) *T {
	p := new(T)
	*p = v
	return p
}
// objBody JSON-encodes obj with encoding/json and wraps the bytes in an
// io.ReadCloser suitable for use as an http.Response body.
//
// It panics if obj cannot be marshalled: the signature has no error return
// and silently serving an empty body would hide the real failure in a test.
//
// NOTE(review): plain json.Marshal does not add apiVersion/kind unless the
// object's TypeMeta is set; confirm the decoding side tolerates that.
func objBody(obj runtime.Object) io.ReadCloser {
	data, err := json.Marshal(obj)
	if err != nil {
		panic(fmt.Sprintf("objBody: marshaling %T: %v", obj, err))
	}
	return io.NopCloser(bytes.NewReader(data))
}
func validateApplyConfiguration(config runtime.Object) error {
// 实现配置验证逻辑
switch obj := config.(type) {
case *applyappsv1.DeploymentApplyConfiguration:
if obj.Name == nil || *obj.Name == "" {
return fmt.Errorf("name is required")
}
if obj.Namespace != nil && *obj.Namespace != "" {
if err := validation.ValidateNamespaceName(*obj.Namespace, false); err != nil {
return err
}
}
}
return nil
}
// mergeApplyConfiguration is a simplified placeholder for the merge step: it
// ignores original and returns config as-is. A real implementation would
// combine the two according to the field merge rules.
func mergeApplyConfiguration(original runtime.Object, config runtime.Object) runtime.Object {
	_ = original // not consulted by this simplified implementation
	merged := config
	return merged
}
关键组件分析
1. ApplyConfiguration 接口
// ApplyConfiguration is the core interface definition shown for illustration:
// a runtime.Object that also exposes getters for name, namespace, kind, and
// API version.
// NOTE(review): illustrative declaration; confirm the method set and return
// types against the actual client-go apply-configuration types.
type ApplyConfiguration interface {
runtime.Object
GetName() string
GetNamespace() string
GetKind() string
GetAPIVersion() string
// Other methods...
}
2. 配置构建器模式
// 使用构建器模式创建配置
config := applyappsv1.Deployment("test", "default").
WithSpec(applyappsv1.DeploymentSpec().
WithReplicas(3).
WithSelector(applymetav1.LabelSelector().
WithMatchLabels(map[string]string{"app": "test"})).
WithTemplate(applycorev1.PodTemplateSpec().
WithLabels(map[string]string{"app": "test"}).
WithSpec(applycorev1.PodSpec().
WithContainers(applycorev1.Container().
WithName("nginx").
WithImage("nginx:latest")))))
3. 字段合并策略
// Field merge strategies. The constants are consecutive untyped integers
// starting at zero; only their distinctness is meaningful.
const (
	// MergeStrategyStrategic selects strategic merging.
	MergeStrategyStrategic = iota
	// MergeStrategyReplace selects replacement.
	MergeStrategyReplace
	// MergeStrategyMerge selects plain merging.
	MergeStrategyMerge
)
// FieldMergeStrategy is a field marker describing how a field is merged.
// NOTE(review): Strategy presumably names one of the merge strategies and Key
// presumably identifies the merge key for keyed lists; neither is used
// elsewhere in this file, so confirm before relying on these meanings.
type FieldMergeStrategy struct {
Strategy string
Key string
}
测试覆盖的场景
- 基础资源类型:Deployment、ConfigMap、Service
- 元数据管理:Labels、Annotations
- 字段管理器:测试不同的 fieldManager 行为
- 强制更新:force 标志的测试
- 验证逻辑:名称、命名空间等必填字段验证
- 合并策略:不同类型字段的合并行为
- 子资源:status、scale 等子资源的应用
最佳实践
1. 创建测试配置
// createTestDeployment builds the Deployment apply configuration used as a
// test fixture: name "test" in namespace "default", labelled test=true, with
// three replicas, an app=test selector, and the pod template returned by
// createTestPodTemplate.
func createTestDeployment() *applyappsv1.DeploymentApplyConfiguration {
	deployment := applyappsv1.Deployment("test", "default").
		WithLabels(map[string]string{"test": "true"})

	selector := applymetav1.LabelSelector().
		WithMatchLabels(map[string]string{"app": "test"})
	spec := applyappsv1.DeploymentSpec().
		WithReplicas(3).
		WithSelector(selector).
		WithTemplate(createTestPodTemplate())

	return deployment.WithSpec(spec)
}
2. 模拟服务器响应
// mockRESTClient returns a fake REST client for the core/v1 group version.
// Each request is first passed to verifyRequest together with expectedConfig
// and is then answered with HTTP 200 and response encoded by objBody.
func mockRESTClient(t *testing.T, expectedConfig runtime.Object, response runtime.Object) *fake.RESTClient {
	respond := func(req *http.Request) (*http.Response, error) {
		// Check the incoming request before replying.
		verifyRequest(t, req, expectedConfig)

		reply := &http.Response{
			StatusCode: http.StatusOK,
			Body:       objBody(response),
		}
		return reply, nil
	}

	client := &fake.RESTClient{
		GroupVersion:         schema.GroupVersion{Group: "", Version: "v1"},
		NegotiatedSerializer: scheme.Codecs.WithoutConversion(),
	}
	client.Client = fake.CreateHTTPClient(respond)
	return client
}
3. 验证合并结果
// assertDeepEqual reports a test error when expected and actual are not
// deeply equal according to reflect.DeepEqual. It is marked as a helper so
// failures are attributed to the calling line.
func assertDeepEqual(t *testing.T, expected, actual any) {
	t.Helper()
	if !reflect.DeepEqual(expected, actual) {
		t.Errorf("expected %+v, got %+v", expected, actual)
	}
}
关键设计模式
- Builder 模式:构建复杂的配置对象
- 策略模式:不同的字段合并策略
- 装饰器模式:添加额外功能(如验证)
- 工厂模式:创建不同类型的配置
这个测试文件展示了 Kubernetes 如何确保 apply configuration 功能的正确性,是理解 Kubernetes 声明式 API 机制的重要参考。
更多推荐




所有评论(0)