Chaos Mesh® is a Chaos Engineering system that runs on Kubernetes, developed by PingCAP, the company behind TiDB. In short, Chaos Mesh® creates chaos (simulated faults) in a cluster through "privileged" containers running inside the K8s cluster, according to the test scenarios described in CRD resources [1].
This article explores the practice of chaos engineering on Kubernetes clusters, walks through how Chaos Mesh® works via source-code analysis, and uses code examples to show how to develop a control plane for Chaos Mesh®. If you lack the background and want a high-level picture of Chaos Mesh®'s architecture, see the links in the endnotes at the end of this article.
The experimental code for this article lives in the mayocream/chaos-mesh-controlpanel-demo repository.
## How to Create Chaos

### Privileged Mode

chaos-daemon runs as a DaemonSet and needs elevated privileges on every node. Its Helm template either enables full privileged mode or grants an explicit set of Linux capabilities:

```yaml
apiVersion: apps/v1
kind: DaemonSet
spec:
  template:
    metadata: ...
    spec:
      containers:
        - name: chaos-daemon
          securityContext:
            {{- if .Values.chaosDaemon.privileged }}
            privileged: true
            capabilities:
              add:
                - SYS_PTRACE
            {{- else }}
            capabilities:
              add:
                - SYS_PTRACE
                - NET_ADMIN
                - MKNOD
                - SYS_CHROOT
                - SYS_ADMIN
                - KILL
                # CAP_IPC_LOCK is used to lock memory
                - IPC_LOCK
            {{- end }}
```
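To see what these capabilities buy: injecting most of the faults below means entering a target process's Linux namespaces from the daemon. Here is a minimal sketch, not Chaos Mesh code, of joining another process's network namespace with golang.org/x/sys/unix (the PID is a placeholder; this operation needs CAP_SYS_ADMIN, and inspecting /proc of other processes needs CAP_SYS_PTRACE):

```go
package main

import (
	"fmt"
	"os"
	"runtime"

	"golang.org/x/sys/unix"
)

// enterNetNS switches the calling OS thread into the network
// namespace of the given process.
func enterNetNS(pid int) error {
	runtime.LockOSThread() // setns only affects the calling OS thread
	defer runtime.UnlockOSThread()

	f, err := os.Open(fmt.Sprintf("/proc/%d/ns/net", pid))
	if err != nil {
		return err
	}
	defer f.Close()

	// Join the target network namespace.
	return unix.Setns(int(f.Fd()), unix.CLONE_NEWNET)
}

func main() {
	if err := enterNetNS(1234); err != nil { // 1234 is a placeholder PID
		fmt.Fprintln(os.Stderr, "setns:", err)
	}
}
```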
### Killing Pods

PodKill is the simplest action: the controller resolves the target pod and deletes it through the API server, using the grace period from the chaos spec. `Impl` embeds a controller-runtime client:

```go
import (
	"context"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

type Impl struct {
	client.Client
}

func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
	...
	err = impl.Get(ctx, namespacedName, &pod)
	if err != nil {
		// TODO: handle this error
		return v1alpha1.NotInjected, err
	}

	err = impl.Delete(ctx, &pod, &client.DeleteOptions{
		GracePeriodSeconds: &podchaos.Spec.GracePeriod, // PeriodSeconds has to be set specifically
	})
	...
	return v1alpha1.Injected, nil
}
```
PodFailure takes a different approach: instead of deleting the pod, it patches every container (and init container) image to a configured pause image, recording the original image in an annotation so it can be restored later:

```go
func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
	...
	pod := origin.DeepCopy()
	for index := range pod.Spec.Containers {
		originImage := pod.Spec.Containers[index].Image
		name := pod.Spec.Containers[index].Name

		key := annotation.GenKeyForImage(podchaos, name, false)
		if pod.Annotations == nil {
			pod.Annotations = make(map[string]string)
		}

		// If the annotation already exists, we can skip the reconcile for this container
		if _, ok := pod.Annotations[key]; ok {
			continue
		}

		pod.Annotations[key] = originImage
		pod.Spec.Containers[index].Image = config.ControllerCfg.PodFailurePauseImage
	}

	for index := range pod.Spec.InitContainers {
		originImage := pod.Spec.InitContainers[index].Image
		name := pod.Spec.InitContainers[index].Name

		key := annotation.GenKeyForImage(podchaos, name, true)
		if pod.Annotations == nil {
			pod.Annotations = make(map[string]string)
		}

		// If the annotation already exists, we can skip the reconcile for this container
		if _, ok := pod.Annotations[key]; ok {
			continue
		}

		pod.Annotations[key] = originImage
		pod.Spec.InitContainers[index].Image = config.ControllerCfg.PodFailurePauseImage
	}

	err = impl.Patch(ctx, pod, client.MergeFrom(&origin))
	if err != nil {
		// TODO: handle this error
		return v1alpha1.NotInjected, err
	}

	return v1alpha1.Injected, nil
}
```
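Recovery has to undo this patch. The actual restore logic lives in Chaos Mesh's recover path; here is a hedged sketch of the idea, reusing the names from the snippet above:

```go
// restoreImages puts back each container image recorded in the
// annotations during injection (hedged sketch, not Chaos Mesh source).
func restoreImages(pod *v1.Pod, podchaos *v1alpha1.PodChaos) {
	for i := range pod.Spec.Containers {
		key := annotation.GenKeyForImage(podchaos, pod.Spec.Containers[i].Name, false)
		if origin, ok := pod.Annotations[key]; ok {
			pod.Spec.Containers[i].Image = origin
			delete(pod.Annotations, key)
		}
	}
}
```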
### Network Faults

Network faults can't be injected through the API server alone; the controller-manager sends instructions to the chaos-daemon running on the target pod's node over gRPC. The client builder looks up the daemon's IP and dials it, with TLS if certificates are configured:

```go
func (b *ChaosDaemonClientBuilder) Build(ctx context.Context, pod *v1.Pod) (chaosdaemonclient.ChaosDaemonClientInterface, error) {
	...
	daemonIP, err := b.FindDaemonIP(ctx, pod)
	if err != nil {
		return nil, err
	}

	builder := grpcUtils.Builder(daemonIP, config.ControllerCfg.ChaosDaemonPort).WithDefaultTimeout()
	if config.ControllerCfg.TLSConfig.ChaosMeshCACert != "" {
		builder.TLSFromFile(config.ControllerCfg.TLSConfig.ChaosMeshCACert, config.ControllerCfg.TLSConfig.ChaosDaemonClientCert, config.ControllerCfg.TLSConfig.ChaosDaemonClientKey)
	} else {
		builder.Insecure()
	}

	cc, err := builder.Build()
	if err != nil {
		return nil, err
	}

	return chaosdaemonclient.New(cc), nil
}
```
On the daemon side, the gRPC server enforces mutual TLS when certificates are supplied (tls.RequireAndVerifyClientCert), so only a caller holding a valid client certificate can drive it:

```go
func newGRPCServer(containerRuntime string, reg prometheus.Registerer, tlsConf tlsConfig) (*grpc.Server, error) {
	...
	if tlsConf != (tlsConfig{}) {
		caCert, err := ioutil.ReadFile(tlsConf.CaCert)
		if err != nil {
			return nil, err
		}
		caCertPool := x509.NewCertPool()
		caCertPool.AppendCertsFromPEM(caCert)

		serverCert, err := tls.LoadX509KeyPair(tlsConf.Cert, tlsConf.Key)
		if err != nil {
			return nil, err
		}

		creds := credentials.NewTLS(&tls.Config{
			Certificates: []tls.Certificate{serverCert},
			ClientCAs:    caCertPool,
			ClientAuth:   tls.RequireAndVerifyClientCert,
		})

		grpcOpts = append(grpcOpts, grpc.Creds(creds))
	}

	s := grpc.NewServer(grpcOpts...)
	grpcMetrics.InitializeMetrics(s)

	pb.RegisterChaosDaemonServer(s, ds)
	reflection.Register(s)

	return s, nil
}
```
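For completeness, here is what the matching client side looks like with stock gRPC APIs. Chaos Mesh wraps this in its grpcUtils builder; the certificate paths below are placeholders:

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"io/ioutil"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

// dialChaosDaemon dials a gRPC server with mutual TLS: it trusts the
// given CA and presents a client certificate of its own.
func dialChaosDaemon(addr string) (*grpc.ClientConn, error) {
	caCert, err := ioutil.ReadFile("/etc/chaos-mesh/ca.crt") // placeholder path
	if err != nil {
		return nil, err
	}
	pool := x509.NewCertPool()
	pool.AppendCertsFromPEM(caCert)

	cert, err := tls.LoadX509KeyPair("/etc/chaos-mesh/client.crt", "/etc/chaos-mesh/client.key")
	if err != nil {
		return nil, err
	}

	creds := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      pool,
	})
	return grpc.Dial(addr, grpc.WithTransportCredentials(creds))
}
```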
The generated client interface enumerates the fault-injection RPCs the daemon exposes:

```go
// ChaosDaemonClient is the client API for ChaosDaemon service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to
// https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type ChaosDaemonClient interface {
	SetTcs(ctx context.Context, in *TcsRequest, opts ...grpc.CallOption) (*empty.Empty, error)
	FlushIPSets(ctx context.Context, in *IPSetsRequest, opts ...grpc.CallOption) (*empty.Empty, error)
	SetIptablesChains(ctx context.Context, in *IptablesChainsRequest, opts ...grpc.CallOption) (*empty.Empty, error)
	SetTimeOffset(ctx context.Context, in *TimeRequest, opts ...grpc.CallOption) (*empty.Empty, error)
	RecoverTimeOffset(ctx context.Context, in *TimeRequest, opts ...grpc.CallOption) (*empty.Empty, error)
	ContainerKill(ctx context.Context, in *ContainerRequest, opts ...grpc.CallOption) (*empty.Empty, error)
	ContainerGetPid(ctx context.Context, in *ContainerRequest, opts ...grpc.CallOption) (*ContainerResponse, error)
	ExecStressors(ctx context.Context, in *ExecStressRequest, opts ...grpc.CallOption) (*ExecStressResponse, error)
	CancelStressors(ctx context.Context, in *CancelStressRequest, opts ...grpc.CallOption) (*empty.Empty, error)
	ApplyIOChaos(ctx context.Context, in *ApplyIOChaosRequest, opts ...grpc.CallOption) (*ApplyIOChaosResponse, error)
	ApplyHttpChaos(ctx context.Context, in *ApplyHttpChaosRequest, opts ...grpc.CallOption) (*ApplyHttpChaosResponse, error)
	SetDNSServer(ctx context.Context, in *SetDNSServerRequest, opts ...grpc.CallOption) (*empty.Empty, error)
}
```
Taking network chaos as an example, the reconciler converts the chaos spec into a netem request and sends it together with the target container's ID:

```go
func (r *Reconciler) applyPod(ctx context.Context, pod *v1.Pod, networkchaos *v1alpha1.NetworkChaos) error {
	...
	pbClient := pb.NewChaosDaemonClient(c)
	containerId := pod.Status.ContainerStatuses[0].ContainerID

	netem, err := spec.ToNetem()
	if err != nil {
		return err
	}

	_, err = pbClient.SetNetem(ctx, &pb.NetemRequest{
		ContainerId: containerId,
		Netem:       netem,
	})

	return err
}
```
The daemon resolves the container ID to a PID, opens that PID's network namespace, and adds a netem qdisc to eth0 via netlink:

```go
func (s *Server) SetNetem(ctx context.Context, in *pb.NetemRequest) (*empty.Empty, error) {
	log.Info("Set netem", "Request", in)

	pid, err := s.crClient.GetPidFromContainerID(ctx, in.ContainerId)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "get pid from containerID error: %v", err)
	}

	if err := Apply(in.Netem, pid); err != nil {
		return nil, status.Errorf(codes.Internal, "netem apply error: %v", err)
	}

	return &empty.Empty{}, nil
}

// Apply applies a netem on eth0 in pid related namespace
func Apply(netem *pb.Netem, pid uint32) error {
	log.Info("Apply netem on PID", "pid", pid)

	ns, err := netns.GetFromPath(GenNetnsPath(pid))
	if err != nil {
		log.Error(err, "failed to find network namespace", "pid", pid)
		return errors.Trace(err)
	}
	defer ns.Close()

	handle, err := netlink.NewHandleAt(ns)
	if err != nil {
		log.Error(err, "failed to get handle at network namespace", "network namespace", ns)
		return err
	}

	link, err := handle.LinkByName("eth0") // TODO: check whether interface name is eth0
	if err != nil {
		log.Error(err, "failed to find eth0 interface")
		return errors.Trace(err)
	}

	netemQdisc := netlink.NewNetem(netlink.QdiscAttrs{
		LinkIndex: link.Attrs().Index,
		Handle:    netlink.MakeHandle(1, 0),
		Parent:    netlink.HANDLE_ROOT,
	}, ToNetlinkNetemAttrs(netem))

	if err = handle.QdiscAdd(netemQdisc); err != nil {
		if !strings.Contains(err.Error(), "file exists") {
			log.Error(err, "failed to add Qdisc")
			return errors.Trace(err)
		}
	}

	return nil
}
```
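The netlink calls above amount to running `tc qdisc add dev eth0 root netem ...` inside the pod's network namespace. A standalone sketch with github.com/vishvananda/netlink that adds a 100 ms delay qdisc in the current namespace (run as root; eth0 is assumed to exist):

```go
package main

import (
	"log"
	"time"

	"github.com/vishvananda/netlink"
)

func main() {
	link, err := netlink.LinkByName("eth0")
	if err != nil {
		log.Fatal(err)
	}

	// Equivalent to: tc qdisc add dev eth0 root netem delay 100ms
	qdisc := netlink.NewNetem(netlink.QdiscAttrs{
		LinkIndex: link.Attrs().Index,
		Handle:    netlink.MakeHandle(1, 0),
		Parent:    netlink.HANDLE_ROOT,
	}, netlink.NetemQdiscAttrs{
		// netem latency is expressed in microseconds
		Latency: uint32(100 * time.Millisecond / time.Microsecond),
	})

	if err := netlink.QdiscAdd(qdisc); err != nil {
		log.Fatal(err)
	}
}
```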
### Stress Testing

CPU and memory stress is delegated to stress-ng, which (together with the other fault-injection tools) is installed into the chaos-daemon image:

```dockerfile
RUN apt-get update && \
    apt-get install -y tzdata iptables ipset stress-ng iproute2 fuse util-linux procps curl && \
    rm -rf /var/lib/apt/lists/*
```
The StressChaos spec is normalized into stress-ng command-line flags before the daemon executes it:

```go
// Normalize the stressors to comply with stress-ng
func (in *Stressors) Normalize() (string, error) {
	stressors := ""
	if in.MemoryStressor != nil && in.MemoryStressor.Workers != 0 {
		stressors += fmt.Sprintf(" --vm %d --vm-keep", in.MemoryStressor.Workers)
		if len(in.MemoryStressor.Size) != 0 {
			if in.MemoryStressor.Size[len(in.MemoryStressor.Size)-1] != '%' {
				size, err := units.FromHumanSize(string(in.MemoryStressor.Size))
				if err != nil {
					return "", err
				}
				stressors += fmt.Sprintf(" --vm-bytes %d", size)
			} else {
				stressors += fmt.Sprintf(" --vm-bytes %s", in.MemoryStressor.Size)
			}
		}

		if in.MemoryStressor.Options != nil {
			for _, v := range in.MemoryStressor.Options {
				stressors += fmt.Sprintf(" %v ", v)
			}
		}
	}

	if in.CPUStressor != nil && in.CPUStressor.Workers != 0 {
		stressors += fmt.Sprintf(" --cpu %d", in.CPUStressor.Workers)
		if in.CPUStressor.Load != nil {
			stressors += fmt.Sprintf(" --cpu-load %d", *in.CPUStressor.Load)
		}

		if in.CPUStressor.Options != nil {
			for _, v := range in.CPUStressor.Options {
				stressors += fmt.Sprintf(" %v ", v)
			}
		}
	}

	return stressors, nil
}
```
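A hedged usage sketch of Normalize; the struct literal below is schematic, since the exact v1alpha1 field layout varies between Chaos Mesh versions:

```go
package main

import (
	"fmt"

	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
)

func main() {
	load := 50
	stressors := v1alpha1.Stressors{
		// Field layout is schematic; consult the v1alpha1 types of your version.
		MemoryStressor: &v1alpha1.MemoryStressor{
			Stressor: v1alpha1.Stressor{Workers: 2},
			Size:     "256MB",
		},
		CPUStressor: &v1alpha1.CPUStressor{
			Stressor: v1alpha1.Stressor{Workers: 1},
			Load:     &load,
		},
	}

	flags, err := stressors.Normalize()
	if err != nil {
		panic(err)
	}

	// Expected output (roughly):
	//   stress-ng --vm 2 --vm-keep --vm-bytes 256000000 --cpu 1 --cpu-load 50
	fmt.Println("stress-ng" + flags)
}
```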
### IO Injection

The IoChaos reconciler selects the target pods, translates the chaos spec into per-pod PodIoChaos actions, and registers a finalizer for each pod before committing the updates:

```go
// Apply implements the reconciler.InnerReconciler.Apply
func (r *Reconciler) Apply(ctx context.Context, req ctrl.Request, chaos v1alpha1.InnerObject) error {
	iochaos, ok := chaos.(*v1alpha1.IoChaos)
	if !ok {
		err := errors.New("chaos is not IoChaos")
		r.Log.Error(err, "chaos is not IoChaos", "chaos", chaos)
		return err
	}

	source := iochaos.Namespace + "/" + iochaos.Name
	m := podiochaosmanager.New(source, r.Log, r.Client)

	pods, err := utils.SelectAndFilterPods(ctx, r.Client, r.Reader, &iochaos.Spec)
	if err != nil {
		r.Log.Error(err, "failed to select and filter pods")
		return err
	}

	r.Log.Info("applying iochaos", "iochaos", iochaos)

	for _, pod := range pods {
		t := m.WithInit(types.NamespacedName{
			Name:      pod.Name,
			Namespace: pod.Namespace,
		})

		// TODO: support chaos on multiple volume
		t.SetVolumePath(iochaos.Spec.VolumePath)
		t.Append(v1alpha1.IoChaosAction{
			Type: iochaos.Spec.Action,
			Filter: v1alpha1.Filter{
				Path:    iochaos.Spec.Path,
				Percent: iochaos.Spec.Percent,
				Methods: iochaos.Spec.Methods,
			},
			Faults: []v1alpha1.IoFault{
				{
					Errno:  iochaos.Spec.Errno,
					Weight: 1,
				},
			},
			Latency:          iochaos.Spec.Delay,
			AttrOverrideSpec: iochaos.Spec.Attr,
			Source:           m.Source,
		})

		key, err := cache.MetaNamespaceKeyFunc(&pod)
		if err != nil {
			return err
		}
		iochaos.Finalizers = utils.InsertFinalizer(iochaos.Finalizers, key)
	}

	r.Log.Info("committing updates of podiochaos")
	err = m.Commit(ctx)
	if err != nil {
		r.Log.Error(err, "fail to commit")
		return err
	}

	r.Event(iochaos, v1.EventTypeNormal, utils.EventChaosInjected, "")

	return nil
}
```
The PodIoChaos handler then forwards the accumulated actions to the chaos-daemon, recording the returned process instance and start time back into the spec:

```go
// Apply flushes io configuration on pod
func (h *Handler) Apply(ctx context.Context, chaos *v1alpha1.PodIoChaos) error {
	h.Log.Info("updating io chaos", "pod", chaos.Namespace+"/"+chaos.Name, "spec", chaos.Spec)

	...

	res, err := pbClient.ApplyIoChaos(ctx, &pb.ApplyIoChaosRequest{
		Actions:     input,
		Volume:      chaos.Spec.VolumeMountPath,
		ContainerId: containerID,

		Instance:  chaos.Spec.Pid,
		StartTime: chaos.Spec.StartTime,
	})
	if err != nil {
		return err
	}

	chaos.Spec.Pid = res.Instance
	chaos.Spec.StartTime = res.StartTime
	chaos.OwnerReferences = []metav1.OwnerReference{
		{
			APIVersion: pod.APIVersion,
			Kind:       pod.Kind,
			Name:       pod.Name,
			UID:        pod.UID,
		},
	}

	return nil
}
```
On the daemon, ApplyIOChaos resolves the target PID and starts the toda binary inside the target's mount and PID namespaces through the background process manager:

```go
func (s *DaemonServer) ApplyIOChaos(ctx context.Context, in *pb.ApplyIOChaosRequest) (*pb.ApplyIOChaosResponse, error) {
	...
	pid, err := s.crClient.GetPidFromContainerID(ctx, in.ContainerId)
	if err != nil {
		log.Error(err, "error while getting PID")
		return nil, err
	}

	args := fmt.Sprintf("--path %s --verbose info", in.Volume)
	log.Info("executing", "cmd", todaBin+" "+args)

	processBuilder := bpm.DefaultProcessBuilder(todaBin, strings.Split(args, " ")...).
		EnableLocalMnt().
		SetIdentifier(in.ContainerId)

	if in.EnterNS {
		processBuilder = processBuilder.SetNS(pid, bpm.MountNS).SetNS(pid, bpm.PidNS)
	}

	...

	// JSON-RPC call
	client, err := jrpc.DialIO(ctx, receiver, caller)
	if err != nil {
		return nil, err
	}

	cmd := processBuilder.Build()
	procState, err := s.backgroundProcessManager.StartProcess(cmd)
	if err != nil {
		return nil, err
	}
	...
}
```
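Note the JSON-RPC client: the daemon keeps a channel open to the spawned toda process (presumably over its stdio pipes, given the receiver/caller pair passed to jrpc.DialIO), so IO faults can later be reconfigured or withdrawn without restarting the process.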
bpm, the background process manager, implements namespace entry by wrapping the command with nsexec:

```go
// GetNsPath returns corresponding namespace path
func GetNsPath(pid uint32, typ NsType) string {
	return fmt.Sprintf("%s/%d/ns/%s", DefaultProcPrefix, pid, string(typ))
}

// SetNS sets the namespace of the process
func (b *ProcessBuilder) SetNS(pid uint32, typ NsType) *ProcessBuilder {
	return b.SetNSOpt([]nsOption{{
		Typ:  typ,
		Path: GetNsPath(pid, typ),
	}})
}

// Build builds the process
func (b *ProcessBuilder) Build() *ManagedProcess {
	args := b.args
	cmd := b.cmd

	if len(b.nsOptions) > 0 {
		args = append([]string{"--", cmd}, args...)
		for _, option := range b.nsOptions {
			args = append([]string{"-" + nsArgMap[option.Typ], option.Path}, args...)
		}

		if b.localMnt {
			args = append([]string{"-l"}, args...)
		}
		cmd = nsexecPath
	}
	...
}
```
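So with EnterNS set, the final invocation looks roughly like `nsexec -l -p /proc/<pid>/ns/pid -m /proc/<pid>/ns/mnt -- toda --path <volume> --verbose info`; the flag letters here are an assumption read off nsArgMap, not verified output.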
import ( "context" "github.com/pingcap/chaos-mesh/api/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/client")func main() { ... delay := &chaosv1alpha1.NetworkChaos{ Spec: chaosv1alpha1.NetworkChaosSpec{...}, } k8sClient := client.New(conf, client.Options{ Scheme: scheme.Scheme }) k8sClient.Create(context.TODO(), delay) k8sClient.Delete(context.TODO(), delay)}
### Implementing Chaos

The demo control plane wraps this into helpers. applyPodKill creates a PodChaos CR; GenerateName lets the API server append a random suffix, so repeated experiments don't collide:

```go
package main

import (
	"context"
	"log"

	"controlpanel"

	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
	"github.com/pkg/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func applyPodKill(name, namespace string, labels map[string]string) error {
	cli, err := controlpanel.NewClient()
	if err != nil {
		return errors.Wrap(err, "create client")
	}

	cr := &v1alpha1.PodChaos{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: name,
			Namespace:    namespace,
		},
		Spec: v1alpha1.PodChaosSpec{
			Action: v1alpha1.PodKillAction,
			ContainerSelector: v1alpha1.ContainerSelector{
				PodSelector: v1alpha1.PodSelector{
					Mode: v1alpha1.OnePodMode,
					Selector: v1alpha1.PodSelectorSpec{
						Namespaces:     []string{namespace},
						LabelSelectors: labels,
					},
				},
			},
		},
	}

	if err := cli.Create(context.Background(), cr); err != nil {
		return errors.Wrap(err, "create podkill")
	}

	return nil
}
```
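A hypothetical invocation of this helper (the app=nginx label is a placeholder), which produced the log output below:

```go
func main() {
	// Kill one pod labeled app=nginx in the dev namespace.
	if err := applyPodKill("podkill", "dev", map[string]string{"app": "nginx"}); err != nil {
		log.Fatal(err)
	}
	log.Println("apply podkill")
}
```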
Running it produces:

```
I1021 00:51:55.225502   23781 request.go:665] Waited for 1.033116256s due to client-side throttling, not priority and fairness, request: GET:https://***
2021/10/21 00:51:56 apply podkill
```
And the created resource can be inspected with kubectl:

```
$ k describe podchaos.chaos-mesh.org -n dev podkillvjn77
Name:         podkillvjn77
Namespace:    dev
Labels:
```
Query helpers go through the client exposed by a controller-runtime manager:

```go
package controlpanel

import (
	"context"

	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
	"github.com/pkg/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func GetPodChaos(name, namespace string) (*v1alpha1.PodChaos, error) {
	cli := mgr.GetClient()

	item := new(v1alpha1.PodChaos)
	if err := cli.Get(context.Background(), client.ObjectKey{Name: name, Namespace: namespace}, item); err != nil {
		return nil, errors.Wrap(err, "get cr")
	}

	return item, nil
}

func ListPodChaos(namespace string, labels map[string]string) ([]v1alpha1.PodChaos, error) {
	cli := mgr.GetClient()

	list := new(v1alpha1.PodChaosList)
	if err := cli.List(context.Background(), list, client.InNamespace(namespace), client.MatchingLabels(labels)); err != nil {
		return nil, err
	}

	return list.Items, nil
}
```
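The `mgr` referenced above is a package-level controller-runtime manager. A hedged sketch of how the demo might initialize it, registering the chaos-mesh CRD types into the scheme:

```go
package controlpanel

import (
	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
	"k8s.io/apimachinery/pkg/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	ctrl "sigs.k8s.io/controller-runtime"
)

var mgr ctrl.Manager

func init() {
	scheme := runtime.NewScheme()
	// Register built-in types plus the chaos-mesh CRDs.
	_ = clientgoscheme.AddToScheme(scheme)
	_ = v1alpha1.AddToScheme(scheme)

	var err error
	mgr, err = ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
	if err != nil {
		panic(err)
	}
	// Note: the informer cache behind mgr.GetClient() only serves
	// reads once mgr.Start(ctx) is running, e.g. in a goroutine.
}
```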
Reads through this client don't hit the API server every time: the manager's client is backed by a shared informer cache. Fetching pods works in two steps (see the sketch below):

- an initial List fetches the full data set;
- a Watch then updates the cache whenever the data changes.
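Continuing the controlpanel package, a hedged sketch of reading pods through this cache-backed client (corev1 is k8s.io/api/core/v1):

```go
// ListPods reads pods from the manager's shared informer cache; after
// the initial List primes the cache, repeated calls stay local.
func ListPods(ctx context.Context, namespace string) ([]corev1.Pod, error) {
	cli := mgr.GetClient() // backed by the shared informer cache

	list := new(corev1.PodList)
	if err := cli.List(ctx, list, client.InNamespace(namespace)); err != nil {
		return nil, err
	}

	return list.Items, nil
}
```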
To wrap up: beyond one-shot injection, a complete chaos-engineering platform also needs chaos orchestration and the following platform features:

- Chaos injection: pod crashes, network faults, load testing, IO faults
- Event tracking
- Alert correlation
- Time-series telemetry

References:

- controller-runtime source code analysis
- つくって学ぶKubebuilder (a Japanese Kubebuilder tutorial)
- Kubebuilder Book (Chinese edition)
- kube-controller-manager source code analysis, part 3: the Informer mechanism
- kubebuilder 2.0 study notes: advanced usage
- Tricks in the client-go and Go source code
- Chaos Mesh: letting applications dance with chaos on Kubernetes
- Building your own filesystem, part 2: FUSE, a boon for developers
- stress-ng: a system stress-testing tool
- Dkron source code analysis
- A read-through guide to the RunC source: namespaces