Getting Started
We will create a sample project to show you how Kubebuilder works. This sample will:
- Reconcile a Memcached CR - which represents an instance of a Memcached deployed/managed on cluster
- Create a Deployment with the Memcached image
- Not allow more instances than the size defined in the CR that is applied
- Update the Memcached CR status
Create a project
First, create and navigate into a directory for your project. Then, initialize it using kubebuilder:
mkdir $GOPATH/memcached-operator
cd $GOPATH/memcached-operator
kubebuilder init --domain=example.com
Create the Memcached API (CRD):
Next, we’ll create the API that will be responsible for deploying and managing Memcached instances on the cluster.
kubebuilder create api --group cache --version v1alpha1 --kind Memcached
Understanding APIs
This command’s primary aim is to produce the Custom Resource (CR) and Custom Resource Definition (CRD) for the Memcached Kind.
It creates the API with the group cache.example.com and version v1alpha1, uniquely identifying the new CRD of the Memcached Kind.
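For reference, the scaffold records this Group/Version identity in api/v1alpha1/groupversion_info.go. A trimmed sketch of that file, roughly as generated, looks like this:
package v1alpha1

import (
    "k8s.io/apimachinery/pkg/runtime/schema"
    "sigs.k8s.io/controller-runtime/pkg/scheme"
)

var (
    // GroupVersion is the group version used to register these objects
    GroupVersion = schema.GroupVersion{Group: "cache.example.com", Version: "v1alpha1"}

    // SchemeBuilder is used to add Go types to the GroupVersionKind scheme
    SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion}

    // AddToScheme adds the types in this group-version to the given scheme
    AddToScheme = SchemeBuilder.AddToScheme
)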
By leveraging the Kubebuilder tool, we can define APIs and objects that represent our solutions on the cluster. While we’ve added only one Kind of resource in this example, we can have as many Groups and Kinds as necessary.
To make it easier to understand, think of CRDs as the definition of our custom Objects, while CRs are instances of them.
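To put that distinction in code form, here is a minimal, illustrative sketch that creates one such instance using the Go types we are about to define (k8sClient is an assumed controller-runtime client.Client and ctx an assumed context.Context; neither is part of this tutorial):
// The CRD defines the schema of the Memcached Kind; each object like this
// one is a CR, i.e. a single instance of that Kind on the cluster.
mc := &cachev1alpha1.Memcached{
    ObjectMeta: metav1.ObjectMeta{Name: "memcached-sample", Namespace: "default"},
    Spec:       cachev1alpha1.MemcachedSpec{Size: 1},
}
if err := k8sClient.Create(ctx, mc); err != nil {
    // handle the error
}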
Defining our API
Defining the Specs
Now, we will define the values that each instance of your Memcached resource on the cluster can assume. In this example, we will allow configuring the number of instances with the following:
type MemcachedSpec struct {
    ...
    Size int32 `json:"size,omitempty"`
}
Creating Status definitions
We also want to track the status of the operations performed to manage the Memcached CR(s). This lets users of our API inspect the Custom Resource and determine whether everything occurred successfully or whether any errors were encountered, just as they would with any resource from the Kubernetes API.
// MemcachedStatus defines the observed state of Memcached
type MemcachedStatus struct {
    Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"`
}
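The Conditions field uses the standard metav1.Condition type. As a minimal sketch of how a controller might record such a condition with the helpers from k8s.io/apimachinery/pkg/api/meta (the full implementation appears later in this tutorial):
// SetStatusCondition updates the condition with the same Type in place,
// or appends it if it does not exist yet.
meta.SetStatusCondition(&memcached.Status.Conditions, metav1.Condition{
    Type:    "Available",
    Status:  metav1.ConditionTrue,
    Reason:  "Reconciling",
    Message: "Deployment is up to date",
})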
Markers and validations
Furthermore, we want to validate the values set in our Custom Resource. To achieve this, we will use markers such as +kubebuilder:validation:Minimum=1.
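Markers are plain Go comments placed immediately above the field they constrain. For illustration only, a few other common CRD validation markers are shown below, attached to a hypothetical Flavor field that is not part of this tutorial:
// +kubebuilder:validation:Enum=small;medium;large
// +kubebuilder:validation:MaxLength=64
Flavor string `json:"flavor,omitempty"`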
Now, see our fully completed example.
Apache License
Copyright 2024 The Kubernetes authors.
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Imports
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// MemcachedSpec defines the desired state of Memcached.
type MemcachedSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
// Size defines the number of Memcached instances
// The following markers will use OpenAPI v3 schema to validate the value
// More info: https://book.kubebuilder.io/reference/markers/crd-validation.html
// +kubebuilder:validation:Minimum=1
// +kubebuilder:validation:Maximum=3
// +kubebuilder:validation:ExclusiveMaximum=false
Size int32 `json:"size,omitempty"`
}
// MemcachedStatus defines the observed state of Memcached.
type MemcachedStatus struct {
// Represents the observations of a Memcached's current state.
// Memcached.status.conditions.type are: "Available", "Progressing", and "Degraded"
// Memcached.status.conditions.status are one of True, False, Unknown.
// Memcached.status.conditions.reason the value should be a CamelCase string and producers of specific
// condition types may define expected values and meanings for this field, and whether the values
// are considered a guaranteed API.
// Memcached.status.conditions.Message is a human readable message indicating details about the transition.
// For further information see: https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#typical-status-properties
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"`
}
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// Memcached is the Schema for the memcacheds API.
type Memcached struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec MemcachedSpec `json:"spec,omitempty"`
Status MemcachedStatus `json:"status,omitempty"`
}
// +kubebuilder:object:root=true
// MemcachedList contains a list of Memcached.
type MemcachedList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []Memcached `json:"items"`
}
func init() {
SchemeBuilder.Register(&Memcached{}, &MemcachedList{})
}
Generating manifests with the specs and validations
To generate all required files:
- Run make generate to create the DeepCopy implementations in api/v1alpha1/zz_generated.deepcopy.go.
- Then, run make manifests to generate the CRD manifests under config/crd/bases and a sample for it under config/samples.
Both commands use controller-gen with different flags for code and manifest generation, respectively.
config/crd/bases/cache.example.com_memcacheds.yaml: Our Memcached CRD
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.16.5
name: memcacheds.cache.example.com
spec:
group: cache.example.com
names:
kind: Memcached
listKind: MemcachedList
plural: memcacheds
singular: memcached
scope: Namespaced
versions:
- name: v1alpha1
schema:
openAPIV3Schema:
description: Memcached is the Schema for the memcacheds API.
properties:
apiVersion:
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
metadata:
type: object
spec:
description: MemcachedSpec defines the desired state of Memcached.
properties:
size:
description: |-
Size defines the number of Memcached instances
The following markers will use OpenAPI v3 schema to validate the value
More info: https://book.kubebuilder.io/reference/markers/crd-validation.html
format: int32
maximum: 3
minimum: 1
type: integer
type: object
status:
description: MemcachedStatus defines the observed state of Memcached.
properties:
conditions:
items:
description: Condition contains details for one aspect of the current
state of this API Resource.
properties:
lastTransitionTime:
description: |-
lastTransitionTime is the last time the condition transitioned from one status to another.
This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable.
format: date-time
type: string
message:
description: |-
message is a human readable message indicating details about the transition.
This may be an empty string.
maxLength: 32768
type: string
observedGeneration:
description: |-
observedGeneration represents the .metadata.generation that the condition was set based upon.
For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date
with respect to the current state of the instance.
format: int64
minimum: 0
type: integer
reason:
description: |-
reason contains a programmatic identifier indicating the reason for the condition's last transition.
Producers of specific condition types may define expected values and meanings for this field,
and whether the values are considered a guaranteed API.
The value should be a CamelCase string.
This field may not be empty.
maxLength: 1024
minLength: 1
pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
type: string
status:
description: status of the condition, one of True, False, Unknown.
enum:
- "True"
- "False"
- Unknown
type: string
type:
description: type of condition in CamelCase or in foo.example.com/CamelCase.
maxLength: 316
pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
type: string
required:
- lastTransitionTime
- message
- reason
- status
- type
type: object
type: array
type: object
type: object
served: true
storage: true
subresources:
status: {}
Sample of Custom Resources
The manifests located under the config/samples directory serve as examples of Custom Resources that can be applied to the cluster.
In this particular example, applying the given resource to the cluster would generate a Deployment with a single Memcached instance (see size: 1).
apiVersion: cache.example.com/v1alpha1
kind: Memcached
metadata:
labels:
app.kubernetes.io/name: project
app.kubernetes.io/managed-by: kustomize
name: memcached-sample
spec:
# TODO(user): edit the following value to ensure the number
# of Pods/Instances your Operand must have on cluster
size: 1
Reconciliation Process
In a simplified way, Kubernetes works by allowing us to declare the desired state of our system, and then its controllers continuously observe the cluster and take actions to ensure that the actual state matches the desired state. For our custom APIs and controllers, the process is similar. Remember, we are extending Kubernetes’ behaviors and its APIs to fit our specific needs.
In our controller, we will implement a reconciliation process.
Essentially, the reconciliation process functions as a loop, continuously checking conditions and performing necessary actions until the desired state is achieved. This process will keep running until all conditions in the system align with the desired state defined in our implementation.
Here’s a pseudo-code example to illustrate this:
reconcile App {
// Check if a Deployment for the app exists, if not, create one
// If there's an error, then restart from the beginning of the reconcile
if err != nil {
return reconcile.Result{}, err
}
// Check if a Service for the app exists, if not, create one
// If there's an error, then restart from the beginning of the reconcile
if err != nil {
return reconcile.Result{}, err
}
// Look for Database CR/CRD
// Check the Database Deployment's replicas size
// If deployment.replicas size doesn't match cr.size, then update it
// Then, restart from the beginning of the reconcile. For example, by returning `reconcile.Result{Requeue: true}, nil`.
if err != nil {
return reconcile.Result{Requeue: true}, nil
}
...
// If at the end of the loop:
// Everything was executed successfully, and the reconcile can stop
return reconcile.Result{}, nil
}
In the context of our example
When our sample Custom Resource (CR) is applied to the cluster (i.e. kubectl apply -f config/samples/cache_v1alpha1_memcached.yaml),
we want to ensure that a Deployment is created for our Memcached image and that its number of replicas matches the count defined in the CR.
To achieve this, we need to first implement an operation that checks whether the Deployment for our Memcached instance already exists on the cluster. If it does not, the controller will create the Deployment accordingly. Therefore, our reconciliation process must include an operation to ensure that this desired state is consistently maintained. This operation would involve:
// Check if the deployment already exists, if not create a new one
found := &appsv1.Deployment{}
err = r.Get(ctx, types.NamespacedName{Name: memcached.Name, Namespace: memcached.Namespace}, found)
if err != nil && apierrors.IsNotFound(err) {
// Define a new deployment
dep := r.deploymentForMemcached()
// Create the Deployment on the cluster
if err = r.Create(ctx, dep); err != nil {
log.Error(err, "Failed to create new Deployment",
"Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
return ctrl.Result{}, err
}
...
}
Next, note that the deploymentForMemcached() function will need to define and return the Deployment that should be created on the cluster. This function should construct the Deployment object with the necessary specifications, as demonstrated in the following example:
dep := &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Replicas: &replicas,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Image: "memcached:1.6.26-alpine3.19",
Name: "memcached",
ImagePullPolicy: corev1.PullIfNotPresent,
Ports: []corev1.ContainerPort{{
ContainerPort: 11211,
Name: "memcached",
}},
Command: []string{"memcached", "--memory-limit=64", "-o", "modern", "-v"},
}},
},
},
},
}
Additionally, we need to implement a mechanism to verify that the number of Memcached replicas on the cluster matches the desired count specified in the Custom Resource (CR). If there is a discrepancy, the reconciliation must update the cluster to ensure consistency. This means that whenever a CR of the Memcached Kind is created or updated on the cluster, the controller will continuously reconcile the state until the actual number of replicas matches the desired count. The following example illustrates this process:
...
size := memcached.Spec.Size
if *found.Spec.Replicas != size {
found.Spec.Replicas = &size
if err = r.Update(ctx, found); err != nil {
log.Error(err, "Failed to update Deployment",
"Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
return ctrl.Result{}, err
}
...
Now, you can review the complete controller responsible for managing Custom Resources of the Memcached Kind. This controller ensures that the desired state is maintained in the cluster, making sure that our Memcached instance continues running with the number of replicas specified by the users.
internal/controller/memcached_controller.go: Our Controller Implementation
/*
Copyright 2024 The Kubernetes authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"time"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
cachev1alpha1 "example.com/memcached/api/v1alpha1"
)
// Definitions to manage status conditions
const (
// typeAvailableMemcached represents the status of the Deployment reconciliation
typeAvailableMemcached = "Available"
)
// MemcachedReconciler reconciles a Memcached object
type MemcachedReconciler struct {
client.Client
Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds/finalizers,verbs=update
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// It is essential for the controller's reconciliation loop to be idempotent. By following the Operator
// pattern you will create Controllers which provide a reconcile function
// responsible for synchronizing resources until the desired state is reached on the cluster.
// Breaking this recommendation goes against the design principles of controller-runtime
// and may lead to unforeseen consequences such as resources becoming stuck and requiring manual intervention.
// For further info:
// - About Operator Pattern: https://kubernetes.io/docs/concepts/extend-kubernetes/operator/
// - About Controllers: https://kubernetes.io/docs/concepts/architecture/controller/
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/reconcile
func (r *MemcachedReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
// Fetch the Memcached instance
// The purpose is to check if the Custom Resource for the Kind Memcached
// is applied on the cluster; if not, we return nil to stop the reconciliation
memcached := &cachev1alpha1.Memcached{}
err := r.Get(ctx, req.NamespacedName, memcached)
if err != nil {
if apierrors.IsNotFound(err) {
// If the custom resource is not found then it usually means that it was deleted or not created
// In this way, we will stop the reconciliation
log.Info("memcached resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
// Error reading the object - requeue the request.
log.Error(err, "Failed to get memcached")
return ctrl.Result{}, err
}
// Let's just set the status as Unknown when no status is available
if len(memcached.Status.Conditions) == 0 {
meta.SetStatusCondition(&memcached.Status.Conditions, metav1.Condition{Type: typeAvailableMemcached, Status: metav1.ConditionUnknown, Reason: "Reconciling", Message: "Starting reconciliation"})
if err = r.Status().Update(ctx, memcached); err != nil {
log.Error(err, "Failed to update Memcached status")
return ctrl.Result{}, err
}
// Let's re-fetch the memcached Custom Resource after updating the status
// so that we have the latest state of the resource on the cluster and we will avoid
// raising the error "the object has been modified, please apply
// your changes to the latest version and try again" which would re-trigger the reconciliation
// if we try to update it again in the following operations
if err := r.Get(ctx, req.NamespacedName, memcached); err != nil {
log.Error(err, "Failed to re-fetch memcached")
return ctrl.Result{}, err
}
}
// Check if the deployment already exists, if not create a new one
found := &appsv1.Deployment{}
err = r.Get(ctx, types.NamespacedName{Name: memcached.Name, Namespace: memcached.Namespace}, found)
if err != nil && apierrors.IsNotFound(err) {
// Define a new deployment
dep, err := r.deploymentForMemcached(memcached)
if err != nil {
log.Error(err, "Failed to define new Deployment resource for Memcached")
// The following implementation will update the status
meta.SetStatusCondition(&memcached.Status.Conditions, metav1.Condition{Type: typeAvailableMemcached,
Status: metav1.ConditionFalse, Reason: "Reconciling",
Message: fmt.Sprintf("Failed to create Deployment for the custom resource (%s): (%s)", memcached.Name, err)})
if err := r.Status().Update(ctx, memcached); err != nil {
log.Error(err, "Failed to update Memcached status")
return ctrl.Result{}, err
}
return ctrl.Result{}, err
}
log.Info("Creating a new Deployment",
"Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
if err = r.Create(ctx, dep); err != nil {
log.Error(err, "Failed to create new Deployment",
"Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
return ctrl.Result{}, err
}
// Deployment created successfully
// We will requeue the reconciliation so that we can ensure the state
// and move forward for the next operations
return ctrl.Result{RequeueAfter: time.Minute}, nil
} else if err != nil {
log.Error(err, "Failed to get Deployment")
// Let's return the error so that the reconciliation is re-triggered
return ctrl.Result{}, err
}
// The CRD API defines that the Memcached type has a MemcachedSpec.Size field
// to set the quantity of Deployment instances to the desired state on the cluster.
// Therefore, the following code will ensure the Deployment size is the same as defined
// via the Size spec of the Custom Resource which we are reconciling.
size := memcached.Spec.Size
if *found.Spec.Replicas != size {
found.Spec.Replicas = &size
if err = r.Update(ctx, found); err != nil {
log.Error(err, "Failed to update Deployment",
"Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
// Re-fetch the memcached Custom Resource before updating the status
// so that we have the latest state of the resource on the cluster and we will avoid
// raising the error "the object has been modified, please apply
// your changes to the latest version and try again" which would re-trigger the reconciliation
if err := r.Get(ctx, req.NamespacedName, memcached); err != nil {
log.Error(err, "Failed to re-fetch memcached")
return ctrl.Result{}, err
}
// The following implementation will update the status
meta.SetStatusCondition(&memcached.Status.Conditions, metav1.Condition{Type: typeAvailableMemcached,
Status: metav1.ConditionFalse, Reason: "Resizing",
Message: fmt.Sprintf("Failed to update the size for the custom resource (%s): (%s)", memcached.Name, err)})
if err := r.Status().Update(ctx, memcached); err != nil {
log.Error(err, "Failed to update Memcached status")
return ctrl.Result{}, err
}
return ctrl.Result{}, err
}
// Now that we have updated the size, we want to requeue the reconciliation
// so that we can ensure that we have the latest state of the resource before
// updating it again. This will also help ensure the desired state on the cluster
return ctrl.Result{Requeue: true}, nil
}
// The following implementation will update the status
meta.SetStatusCondition(&memcached.Status.Conditions, metav1.Condition{Type: typeAvailableMemcached,
Status: metav1.ConditionTrue, Reason: "Reconciling",
Message: fmt.Sprintf("Deployment for custom resource (%s) with %d replicas created successfully", memcached.Name, size)})
if err := r.Status().Update(ctx, memcached); err != nil {
log.Error(err, "Failed to update Memcached status")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&cachev1alpha1.Memcached{}).
Owns(&appsv1.Deployment{}).
Named("memcached").
Complete(r)
}
// deploymentForMemcached returns a Memcached Deployment object
func (r *MemcachedReconciler) deploymentForMemcached(
memcached *cachev1alpha1.Memcached) (*appsv1.Deployment, error) {
replicas := memcached.Spec.Size
image := "memcached:1.6.26-alpine3.19"
dep := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: memcached.Name,
Namespace: memcached.Namespace,
},
Spec: appsv1.DeploymentSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app.kubernetes.io/name": "project"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app.kubernetes.io/name": "project"},
},
Spec: corev1.PodSpec{
SecurityContext: &corev1.PodSecurityContext{
RunAsNonRoot: ptr.To(true),
SeccompProfile: &corev1.SeccompProfile{
Type: corev1.SeccompProfileTypeRuntimeDefault,
},
},
Containers: []corev1.Container{{
Image: image,
Name: "memcached",
ImagePullPolicy: corev1.PullIfNotPresent,
// Ensure restrictive context for the container
// More info: https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
SecurityContext: &corev1.SecurityContext{
RunAsNonRoot: ptr.To(true),
RunAsUser: ptr.To(int64(1001)),
AllowPrivilegeEscalation: ptr.To(false),
Capabilities: &corev1.Capabilities{
Drop: []corev1.Capability{
"ALL",
},
},
},
Ports: []corev1.ContainerPort{{
ContainerPort: 11211,
Name: "memcached",
}},
Command: []string{"memcached", "--memory-limit=64", "-o", "modern", "-v"},
}},
},
},
},
}
// Set the ownerRef for the Deployment
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/owners-dependents/
if err := ctrl.SetControllerReference(memcached, dep, r.Scheme); err != nil {
return nil, err
}
return dep, nil
}
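If you would like to exercise this reconciliation logic without a real cluster, the following is a minimal, hypothetical test sketch using controller-runtime’s fake client. It is an illustration under those assumptions, not part of the scaffolded project (whose generated tests use envtest instead):
package controller

import (
    "context"
    "testing"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client/fake"

    cachev1alpha1 "example.com/memcached/api/v1alpha1"
)

func TestReconcileCreatesDeployment(t *testing.T) {
    scheme := runtime.NewScheme()
    _ = clientgoscheme.AddToScheme(scheme)
    _ = cachev1alpha1.AddToScheme(scheme)

    mc := &cachev1alpha1.Memcached{
        ObjectMeta: metav1.ObjectMeta{Name: "memcached-sample", Namespace: "default"},
        Spec:       cachev1alpha1.MemcachedSpec{Size: 1},
    }

    // The fake client serves reads and writes from memory; WithStatusSubresource
    // lets r.Status().Update() behave as it would against a real API server.
    cl := fake.NewClientBuilder().
        WithScheme(scheme).
        WithObjects(mc).
        WithStatusSubresource(mc).
        Build()

    r := &MemcachedReconciler{Client: cl, Scheme: scheme}
    req := ctrl.Request{NamespacedName: types.NamespacedName{Name: "memcached-sample", Namespace: "default"}}

    // Drive the loop twice: the first pass sets the initial condition and
    // creates the Deployment, then requeues; the second pass verifies the size.
    for i := 0; i < 2; i++ {
        if _, err := r.Reconcile(context.Background(), req); err != nil {
            t.Fatalf("reconcile failed: %v", err)
        }
    }
}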
Diving Into the Controller Implementation
Setting the Manager to Watch Resources
The whole idea is to watch the resources that matter to the controller. When a resource that the controller is interested in changes, the Watch triggers the controller’s reconciliation loop, ensuring that the actual state of the resource matches the desired state defined in the controller’s logic.
Notice how we configured the Manager to monitor events such as the creation, update, or deletion of a Custom Resource (CR) of the Memcached kind, as well as any changes to the Deployment that the controller manages and owns:
// SetupWithManager sets up the controller with the Manager.
// The Deployment is also watched to ensure its
// desired state in the cluster.
func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
// Watch the Memcached Custom Resource and trigger reconciliation whenever it
//is created, updated, or deleted
For(&cachev1alpha1.Memcached{}).
// Watch the Deployment managed by the Memcached controller. If any changes occur to the Deployment
// owned and managed by this controller, it will trigger reconciliation, ensuring that the cluster
// state aligns with the desired state.
Owns(&appsv1.Deployment{}).
Complete(r)
}
But, How Does the Manager Know Which Resources Are Owned by It?
We do not want our Controller to watch any Deployment on the cluster and trigger our reconciliation loop. Instead, we only want to trigger reconciliation when the specific Deployment running our Memcached instance is changed. For example, if someone accidentally deletes our Deployment or changes the number of replicas, we want to trigger the reconciliation to ensure that it returns to the desired state.
The Manager knows which Deployment to observe because we set the ownerRef (Owner Reference):
if err := ctrl.SetControllerReference(memcached, dep, r.Scheme); err != nil {
return nil, err
}
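Under the hood, Owns() is essentially shorthand for a Watches() call with an owner-based event handler. A rough sketch of the equivalent wiring, assuming controller-runtime v0.19 and the sigs.k8s.io/controller-runtime/pkg/handler package, would be:
// Equivalent to Owns(&appsv1.Deployment{}): Deployment events are mapped back
// to the owning Memcached through the controller owner reference.
ctrl.NewControllerManagedBy(mgr).
    For(&cachev1alpha1.Memcached{}).
    Watches(
        &appsv1.Deployment{},
        handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(),
            &cachev1alpha1.Memcached{}, handler.OnlyControllerOwner()),
    ).
    Complete(r)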
Granting Permissions
It’s important to ensure that the Controller has the necessary permissions (i.e., to create, get, update, and list) on the resources it manages.
The RBAC permissions are configured via RBAC markers, which are used to generate and update the manifest files present in config/rbac/. These markers can be found (and should be defined) on the Reconcile() method of each controller; see how it is implemented in our example:
// +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds/finalizers,verbs=update
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
After making changes to the controller, run the make manifests command. This will prompt controller-gen to refresh the files located under config/rbac.
config/rbac/role.yaml: Our generated RBAC Role
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: manager-role
rules:
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- list
- watch
- apiGroups:
- apps
resources:
- deployments
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- cache.example.com
resources:
- memcacheds
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- cache.example.com
resources:
- memcacheds/finalizers
verbs:
- update
- apiGroups:
- cache.example.com
resources:
- memcacheds/status
verbs:
- get
- patch
- update
Manager (main.go)
The Manager in the cmd/main.go file is responsible for managing the controllers in your application.
cmd/main.go: Our main.go
/*
Copyright 2024 The Kubernetes authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"crypto/tls"
"flag"
"os"
"path/filepath"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/certwatcher"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
cachev1alpha1 "example.com/memcached/api/v1alpha1"
"example.com/memcached/internal/controller"
// +kubebuilder:scaffold:imports
)
var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(cachev1alpha1.AddToScheme(scheme))
// +kubebuilder:scaffold:scheme
}
func main() {
var metricsAddr string
var metricsCertPath, metricsCertName, metricsCertKey string
var webhookCertPath, webhookCertName, webhookCertKey string
var enableLeaderElection bool
var probeAddr string
var secureMetrics bool
var enableHTTP2 bool
var tlsOpts []func(*tls.Config)
flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
"Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.BoolVar(&secureMetrics, "metrics-secure", true,
"If set, the metrics endpoint is served securely via HTTPS. Use --metrics-secure=false to use HTTP instead.")
flag.StringVar(&webhookCertPath, "webhook-cert-path", "", "The directory that contains the webhook certificate.")
flag.StringVar(&webhookCertName, "webhook-cert-name", "tls.crt", "The name of the webhook certificate file.")
flag.StringVar(&webhookCertKey, "webhook-cert-key", "tls.key", "The name of the webhook key file.")
flag.StringVar(&metricsCertPath, "metrics-cert-path", "", "The directory that contains the metrics server certificate.")
flag.StringVar(&metricsCertName, "metrics-cert-name", "tls.crt", "The name of the metrics server certificate file.")
flag.StringVar(&metricsCertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.")
flag.BoolVar(&enableHTTP2, "enable-http2", false,
"If set, HTTP/2 will be enabled for the metrics and webhook servers")
opts := zap.Options{
Development: true,
}
opts.BindFlags(flag.CommandLine)
flag.Parse()
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
// if the enable-http2 flag is false (the default), http/2 should be disabled
// due to its vulnerabilities. More specifically, disabling http/2 will
// prevent from being vulnerable to the HTTP/2 Stream Cancellation and
// Rapid Reset CVEs. For more information see:
// - https://github.com/advisories/GHSA-qppj-fm5r-hxr3
// - https://github.com/advisories/GHSA-4374-p667-p6c8
disableHTTP2 := func(c *tls.Config) {
setupLog.Info("disabling http/2")
c.NextProtos = []string{"http/1.1"}
}
if !enableHTTP2 {
tlsOpts = append(tlsOpts, disableHTTP2)
}
// Create watchers for metrics and webhooks certificates
var metricsCertWatcher, webhookCertWatcher *certwatcher.CertWatcher
// Initial webhook TLS options
webhookTLSOpts := tlsOpts
if len(webhookCertPath) > 0 {
setupLog.Info("Initializing webhook certificate watcher using provided certificates",
"webhook-cert-path", webhookCertPath, "webhook-cert-name", webhookCertName, "webhook-cert-key", webhookCertKey)
var err error
webhookCertWatcher, err = certwatcher.New(
filepath.Join(webhookCertPath, webhookCertName),
filepath.Join(webhookCertPath, webhookCertKey),
)
if err != nil {
setupLog.Error(err, "Failed to initialize webhook certificate watcher")
os.Exit(1)
}
webhookTLSOpts = append(webhookTLSOpts, func(config *tls.Config) {
config.GetCertificate = webhookCertWatcher.GetCertificate
})
}
webhookServer := webhook.NewServer(webhook.Options{
TLSOpts: webhookTLSOpts,
})
// Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server.
// More info:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/metrics/server
// - https://book.kubebuilder.io/reference/metrics.html
metricsServerOptions := metricsserver.Options{
BindAddress: metricsAddr,
SecureServing: secureMetrics,
TLSOpts: tlsOpts,
}
if secureMetrics {
// FilterProvider is used to protect the metrics endpoint with authn/authz.
// These configurations ensure that only authorized users and service accounts
// can access the metrics endpoint. The RBAC are configured in 'config/rbac/kustomization.yaml'. More info:
// https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/metrics/filters#WithAuthenticationAndAuthorization
metricsServerOptions.FilterProvider = filters.WithAuthenticationAndAuthorization
}
// If the certificate is not specified, controller-runtime will automatically
// generate self-signed certificates for the metrics server. While convenient for development and testing,
// this setup is not recommended for production.
//
// TODO(user): If you enable certManager, uncomment the following lines:
// - [METRICS-WITH-CERTS] at config/default/kustomization.yaml to generate and use certificates
// managed by cert-manager for the metrics server.
// - [PROMETHEUS-WITH-CERTS] at config/prometheus/kustomization.yaml for TLS certification.
if len(metricsCertPath) > 0 {
setupLog.Info("Initializing metrics certificate watcher using provided certificates",
"metrics-cert-path", metricsCertPath, "metrics-cert-name", metricsCertName, "metrics-cert-key", metricsCertKey)
var err error
metricsCertWatcher, err = certwatcher.New(
filepath.Join(metricsCertPath, metricsCertName),
filepath.Join(metricsCertPath, metricsCertKey),
)
if err != nil {
setupLog.Error(err, "to initialize metrics certificate watcher", "error", err)
os.Exit(1)
}
metricsServerOptions.TLSOpts = append(metricsServerOptions.TLSOpts, func(config *tls.Config) {
config.GetCertificate = metricsCertWatcher.GetCertificate
})
}
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Metrics: metricsServerOptions,
WebhookServer: webhookServer,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "4b13cc52.example.com",
// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily
// when the Manager ends. This requires the binary to immediately end when the
// Manager is stopped, otherwise, this setting is unsafe. Setting this significantly
// speeds up voluntary leader transitions as the new leader doesn't have to wait
// LeaseDuration time first.
//
// In the default scaffold provided, the program ends immediately after
// the manager stops, so it would be fine to enable this option. However,
// if you are doing, or intend to do, any operation such as performing cleanups
// after the manager stops, then its usage might be unsafe.
// LeaderElectionReleaseOnCancel: true,
})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
if err = (&controller.MemcachedReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Memcached")
os.Exit(1)
}
// +kubebuilder:scaffold:builder
if metricsCertWatcher != nil {
setupLog.Info("Adding metrics certificate watcher to manager")
if err := mgr.Add(metricsCertWatcher); err != nil {
setupLog.Error(err, "unable to add metrics certificate watcher to manager")
os.Exit(1)
}
}
if webhookCertWatcher != nil {
setupLog.Info("Adding webhook certificate watcher to manager")
if err := mgr.Add(webhookCertWatcher); err != nil {
setupLog.Error(err, "unable to add webhook certificate watcher to manager")
os.Exit(1)
}
}
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up ready check")
os.Exit(1)
}
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}
Checking the Project running in the cluster
At this point, you can validate the project on the cluster by following the steps defined in the Quick Start; see: Run It On the Cluster
Next Steps
- To delve deeper into developing your solution, consider going through the CronJob Tutorial
- For insights on optimizing your approach, refer to the Best Practices documentation.