/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1beta1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// TemplateSpec defines the desired state of Template
type TemplateDefinition struct {
Language string `json:"language,omitempty"`
BatchModeProcessing bool `json:"batchModeProcessing,omitempty"`
Data string `json:"data,omitempty"`
FileName string `json:"fileName,omitempty"`
}
type TemplateSpec struct {
Query string `json:"query,omitempty"`
TemplateDefinition TemplateDefinition `json:"template,omitempty"`
}
// TemplateStatus defines the observed state of Template
type TemplateStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
}
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// Template is the Schema for the templates API
type Template struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec TemplateSpec `json:"spec,omitempty"`
Status TemplateStatus `json:"status,omitempty"`
}
// +kubebuilder:object:root=true
// TemplateList contains a list of Template
type TemplateList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []Template `json:"items"`
}
func init() {
SchemeBuilder.Register(&Template{}, &TemplateList{})
}
//go:build !ignore_autogenerated
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by controller-gen. DO NOT EDIT.
package v1beta1
import (
runtime "k8s.io/apimachinery/pkg/runtime"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Template) DeepCopyInto(out *Template) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
out.Spec = in.Spec
out.Status = in.Status
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Template.
func (in *Template) DeepCopy() *Template {
if in == nil {
return nil
}
out := new(Template)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *Template) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TemplateDefinition) DeepCopyInto(out *TemplateDefinition) {
*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemplateDefinition.
func (in *TemplateDefinition) DeepCopy() *TemplateDefinition {
if in == nil {
return nil
}
out := new(TemplateDefinition)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TemplateList) DeepCopyInto(out *TemplateList) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ListMeta.DeepCopyInto(&out.ListMeta)
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]Template, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemplateList.
func (in *TemplateList) DeepCopy() *TemplateList {
if in == nil {
return nil
}
out := new(TemplateList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *TemplateList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TemplateSpec) DeepCopyInto(out *TemplateSpec) {
*out = *in
out.TemplateDefinition = in.TemplateDefinition
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemplateSpec.
func (in *TemplateSpec) DeepCopy() *TemplateSpec {
if in == nil {
return nil
}
out := new(TemplateSpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TemplateStatus) DeepCopyInto(out *TemplateStatus) {
*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemplateStatus.
func (in *TemplateStatus) DeepCopy() *TemplateStatus {
if in == nil {
return nil
}
out := new(TemplateStatus)
in.DeepCopyInto(out)
return out
}
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"crypto/tls"
"flag"
_ "kumquat/renderer/cue"
"kumquat/repository"
"os"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
kumquatv1beta1 "kumquat/api/v1beta1"
"kumquat/internal/controller"
// +kubebuilder:scaffold:imports
)
var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(kumquatv1beta1.AddToScheme(scheme))
// +kubebuilder:scaffold:scheme
}
func main() {
var metricsAddr string
var enableLeaderElection bool
var probeAddr string
var secureMetrics bool
var enableHTTP2 bool
var tlsOpts []func(*tls.Config)
flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
"Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.BoolVar(&secureMetrics, "metrics-secure", true,
"If set, the metrics endpoint is served securely via HTTPS. Use --metrics-secure=false to use HTTP instead.")
flag.BoolVar(&enableHTTP2, "enable-http2", false,
"If set, HTTP/2 will be enabled for the metrics and webhook servers")
opts := zap.Options{
Development: true,
}
opts.BindFlags(flag.CommandLine)
flag.Parse()
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
// if the enable-http2 flag is false (the default), http/2 should be disabled
// due to its vulnerabilities. More specifically, disabling http/2 will
// prevent from being vulnerable to the HTTP/2 Stream Cancellation and
// Rapid Reset CVEs. For more information see:
// - https://github.com/advisories/GHSA-qppj-fm5r-hxr3
// - https://github.com/advisories/GHSA-4374-p667-p6c8
disableHTTP2 := func(c *tls.Config) {
setupLog.Info("disabling http/2")
c.NextProtos = []string{"http/1.1"}
}
if !enableHTTP2 {
tlsOpts = append(tlsOpts, disableHTTP2)
}
webhookServer := webhook.NewServer(webhook.Options{
TLSOpts: tlsOpts,
})
// Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server.
// More info:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.18.4/pkg/metrics/server
// - https://book.kubebuilder.io/reference/metrics.html
metricsServerOptions := metricsserver.Options{
BindAddress: metricsAddr,
SecureServing: secureMetrics,
// TODO(user): TLSOpts is used to allow configuring the TLS config used for the server. If certificates are
// not provided, self-signed certificates will be generated by default. This option is not recommended for
// production environments as self-signed certificates do not offer the same level of trust and security
// as certificates issued by a trusted Certificate Authority (CA). The primary risk is potentially allowing
// unauthorized access to sensitive metrics data. Consider replacing with CertDir, CertName, and KeyName
// to provide certificates, ensuring the server communicates using trusted and secure certificates.
TLSOpts: tlsOpts,
}
if secureMetrics {
// FilterProvider is used to protect the metrics endpoint with authn/authz.
// These configurations ensure that only authorized users and service accounts
// can access the metrics endpoint. The RBAC are configured in 'config/rbac/kustomization.yaml'. More info:
// https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.18.4/pkg/metrics/filters#WithAuthenticationAndAuthorization
metricsServerOptions.FilterProvider = filters.WithAuthenticationAndAuthorization
}
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Metrics: metricsServerOptions,
WebhookServer: webhookServer,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "857969d7.guidewire.com",
// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily
// when the Manager ends. This requires the binary to immediately end when the
// Manager is stopped, otherwise, this setting is unsafe. Setting this significantly
// speeds up voluntary leader transitions as the new leader don't have to wait
// LeaseDuration time first.
//
// In the default scaffold provided, the program ends immediately after
// the manager stops, so would be fine to enable this option. However,
// if you are doing or is intended to do any operation such as perform cleanups
// after the manager stops then its usage might be unsafe.
// LeaderElectionReleaseOnCancel: true,
})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
// create the dynamic k8s client
k8sClient, err := controller.NewDynamicK8sClient(mgr.GetClient(), mgr.GetRESTMapper())
if err != nil {
setupLog.Error(err, "unable to create dynamic k8s client")
os.Exit(1)
}
rep, err := repository.NewSQLiteRepository()
if err != nil {
setupLog.Error(err, "unable to create repository")
os.Exit(1)
}
if err = (&controller.TemplateReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
K8sClient: k8sClient,
Repository: rep,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Template")
os.Exit(1)
}
// +kubebuilder:scaffold:builder
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up ready check")
os.Exit(1)
}
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}
package controller
import (
"context"
"fmt"
kumquatv1beta1 "kumquat/api/v1beta1"
"kumquat/repository"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
type DynamicReconciler struct {
client client.Client
gvk schema.GroupVersionKind
k8sClient K8sClient
watchManager WatchManager
repository repository.Repository
}
func NewDynamicReconciler(client client.Client, gvk schema.GroupVersionKind, k8sClient K8sClient, wm WatchManager, repo repository.Repository) *DynamicReconciler {
return &DynamicReconciler{
client: client,
gvk: gvk,
k8sClient: k8sClient,
watchManager: wm,
repository: repo,
}
}
func (r *DynamicReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
log := log.FromContext(ctx)
log.Info("Reconciling dynamic resource", "GVK", r.gvk, "name", req.Name, "namespace", req.Namespace)
resource, err := r.fetchResource(ctx, req)
if err != nil {
return reconcile.Result{}, err
}
if resource == nil {
log.Info("Resource deleted", "GVK", r.gvk, "name", req.Name, "namespace", req.Namespace)
// set group to core if it is empty
group := r.gvk.Group
if r.gvk.Group == "" {
group = "core"
}
err = DeleteResourceFromDatabaseByNameAndNameSpace(r.repository, r.gvk.Kind, group, req.Namespace, req.Name)
if err != nil {
return reconcile.Result{}, err
}
err = r.findAndReProcessAffectedTemplates(ctx)
if err != nil {
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}
err = UpsertResourceToDatabase(r.repository, resource, ctx)
if err != nil {
return reconcile.Result{}, err
}
err = r.findAndReProcessAffectedTemplates(ctx)
if err != nil {
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}
// reconcileTemplates reconciles the templates associated with the resource.
func (r *DynamicReconciler) findAndReProcessAffectedTemplates(ctx context.Context) error {
log := log.FromContext(ctx)
var templates []string
templatesMap := r.watchManager.GetManagedTemplates()
for templateName, gvks := range templatesMap {
if _, exists := gvks[r.gvk]; exists {
log.Info("Reconciling template", "templateName", templateName)
templates = append(templates, templateName)
}
}
for _, templateName := range templates {
if err := r.processTemplate(ctx, templateName); err != nil {
if err.Error() == "not found" {
log.Info("Resource not found", "templateName", templateName)
} else {
fmt.Println("Error in processTemplate", err)
return err
}
}
}
return nil
}
// fetchResource fetches the resource from the cluster.
func (r *DynamicReconciler) fetchResource(
ctx context.Context,
req reconcile.Request,
) (*unstructured.Unstructured, error) {
resource := &unstructured.Unstructured{}
resource.SetGroupVersionKind(r.gvk)
err := r.client.Get(ctx, req.NamespacedName, resource)
if err != nil {
return nil, client.IgnoreNotFound(err)
}
return resource, nil
}
// processTemplate processes a single template.
func (r *DynamicReconciler) processTemplate(ctx context.Context, templateName string) error {
log := log.FromContext(ctx)
template, err := r.k8sClient.Get(ctx, "kumquat.guidewire.com", "Template", "templates", templateName)
if err != nil {
log.Error(err, "unable to get template", "templateName", templateName)
return err
}
templateObj := &kumquatv1beta1.Template{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(template.Object, templateObj); err != nil {
log.Error(err, "unable to convert unstructured to template")
return err
}
return ProcessTemplateResources(templateObj, r.repository, log, r.k8sClient, r.watchManager)
}
package controller
import (
"context"
"fmt"
"sigs.k8s.io/controller-runtime/pkg/client"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
)
func NewDynamicK8sClient(client client.Client, restMapper meta.RESTMapper) (K8sClient, error) {
return &DynamicK8sClient{
client: client,
restMapper: restMapper,
}, nil
}
// K8sClient interface remains the same
type K8sClient interface {
Create(ctx context.Context, obj *unstructured.Unstructured) (*unstructured.Unstructured, error)
CreateOrUpdate(ctx context.Context, obj *unstructured.Unstructured) (*unstructured.Unstructured, error)
List(ctx context.Context, group, kind, namespace string) (*unstructured.UnstructuredList, error)
Get(ctx context.Context, group, kind, namespace, name string) (*unstructured.Unstructured, error)
Update(ctx context.Context, group, kind, namespace string, obj *unstructured.Unstructured) (
*unstructured.Unstructured, error)
Delete(ctx context.Context, group, kind, namespace, name string) error
GetPreferredGVK(group, kind string) (schema.GroupVersionKind, error)
}
type DynamicK8sClient struct {
client client.Client
restMapper meta.RESTMapper
}
// Implement the methods using client.Client
func (k *DynamicK8sClient) Create(ctx context.Context, obj *unstructured.Unstructured) (
*unstructured.Unstructured, error) {
err := k.client.Create(ctx, obj)
if err != nil {
return nil, err
}
return obj, nil
}
func (k *DynamicK8sClient) CreateOrUpdate(ctx context.Context, obj *unstructured.Unstructured) (
*unstructured.Unstructured, error) {
existing := &unstructured.Unstructured{}
existing.SetGroupVersionKind(obj.GroupVersionKind())
key := client.ObjectKey{
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
}
err := k.client.Get(ctx, key, existing)
if err != nil {
if client.IgnoreNotFound(err) != nil {
return nil, err
}
// Not found, create
err = k.client.Create(ctx, obj)
if err != nil {
return nil, err
}
return obj, nil
}
// Resource exists, update
obj.SetResourceVersion(existing.GetResourceVersion())
err = k.client.Update(ctx, obj)
if err != nil {
return nil, err
}
return obj, nil
}
func (k *DynamicK8sClient) List(ctx context.Context, group, kind, namespace string) (
*unstructured.UnstructuredList, error) {
gvk, err := k.GetPreferredGVK(group, kind)
if err != nil {
return nil, err
}
list := &unstructured.UnstructuredList{}
list.SetGroupVersionKind(gvk)
err = k.client.List(ctx, list, client.InNamespace(namespace))
if err != nil {
return nil, err
}
return list, nil
}
func (k *DynamicK8sClient) Get(ctx context.Context, group, kind, namespace, name string) (
*unstructured.Unstructured, error) {
gvk, err := k.GetPreferredGVK(group, kind)
if err != nil {
return nil, err
}
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(gvk)
key := client.ObjectKey{
Namespace: namespace,
Name: name,
}
err = k.client.Get(ctx, key, obj)
if err != nil {
return nil, err
}
return obj, nil
}
func (k *DynamicK8sClient) Update(ctx context.Context, group, kind, namespace string, obj *unstructured.Unstructured) (
*unstructured.Unstructured, error) {
err := k.client.Update(ctx, obj)
if err != nil {
return nil, err
}
return obj, nil
}
func (k *DynamicK8sClient) Delete(ctx context.Context, group, kind, namespace, name string) error {
gvk, err := k.GetPreferredGVK(group, kind)
if err != nil {
return err
}
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(gvk)
obj.SetNamespace(namespace)
obj.SetName(name)
err = k.client.Delete(ctx, obj)
if err != nil {
return err
}
return nil
}
func (k *DynamicK8sClient) GetPreferredGVK(group, kind string) (schema.GroupVersionKind, error) {
partialGVK := schema.GroupVersionKind{
Group: group,
Kind: kind,
}
mapping, err := k.restMapper.RESTMapping(partialGVK.GroupKind())
if err != nil {
return schema.GroupVersionKind{}, fmt.Errorf("failed to get GVK from RESTMapper: %v", err)
}
return mapping.GroupVersionKind, nil
}
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"strings"
kumquatTemplate "kumquat/template"
mapset "github.com/deckarep/golang-set/v2"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/source"
"sigs.k8s.io/yaml"
kumquatv1beta1 "kumquat/api/v1beta1"
"kumquat/repository"
)
const templateFinalizer = "kumquat.guidewire.com/finalizer"
// containsString checks if a string is in a slice
func containsString(slice []string, s string) bool {
for _, item := range slice {
if item == s {
return true
}
}
return false
}
// removeString removes a string from a slice
func removeString(slice []string, s string) []string {
var result []string
for _, item := range slice {
if item != s {
result = append(result, item)
}
}
return result
}
// EnsureFinalizer adds a finalizer to the resource if not present
func (r *TemplateReconciler) EnsureFinalizer(template *kumquatv1beta1.Template) bool {
if !containsString(template.GetFinalizers(), templateFinalizer) {
template.SetFinalizers(append(template.GetFinalizers(), templateFinalizer))
return true
}
return false
}
// RemoveFinalizer removes the finalizer from the resource
func (r *TemplateReconciler) RemoveFinalizer(template *kumquatv1beta1.Template) bool {
if containsString(template.GetFinalizers(), templateFinalizer) {
template.SetFinalizers(removeString(template.GetFinalizers(), templateFinalizer))
return true
}
return false
}
// TemplateReconciler reconciles a Template object
type TemplateReconciler struct {
client.Client
Scheme *runtime.Scheme
WatchManager WatchManager
K8sClient K8sClient
Repository repository.Repository
}
func (r *TemplateReconciler) handleDeletion(
ctx context.Context,
log logr.Logger,
template *kumquatv1beta1.Template,
) (ctrl.Result, error) {
log.Info("template deleted", "name", template.Name)
r.WatchManager.RemoveWatch(template.Name)
err := deleteAssociatedResources(template, r.Repository, log, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}
if r.RemoveFinalizer(template) {
err := r.Update(ctx, template)
if err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
func deleteAssociatedResources(
template *kumquatv1beta1.Template,
re repository.Repository,
log logr.Logger,
k8sClient K8sClient,
) error {
template.GetObjectKind().SetGroupVersionKind(schema.GroupVersionKind{
Group: "kumquat.guidewire.com",
Version: "v1beta1",
Kind: "Template",
})
objMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(template)
if err != nil {
log.Error(err, "failed to convert template to unstructured map")
return err
}
resource, err := repository.MakeResource(objMap)
if err != nil {
log.Error(err, "unable to make resource from object")
return err
}
t, err := kumquatTemplate.NewTemplate(resource)
if err != nil {
log.Error(err, "unable to create template from resource")
return err
}
o, err := t.Evaluate(re)
if err != nil {
log.Error(err, "unable to evaluate template")
return err
}
fmt.Println(o.Output)
for i := 0; i < o.Output.ResourceCount(); i++ {
out, err := o.Output.ResultString(i)
if err != nil {
log.Error(err, "unable to get result string")
return err
}
fmt.Println(out)
err = deleteResourceFromCluster(out, log, k8sClient)
if err != nil {
return err
}
}
return nil
}
func deleteResourceFromCluster(out string, log logr.Logger, k8sClient K8sClient) error {
jsonData, err := yaml.YAMLToJSON([]byte(out))
if err != nil {
log.Error(err, "unable to convert YAML to JSON")
return err
}
unstructuredObj := &unstructured.Unstructured{}
err = unstructuredObj.UnmarshalJSON(jsonData)
if err != nil {
log.Error(err, "unable to unmarshal JSON")
return err
}
context := context.TODO()
err = k8sClient.Delete(context,
unstructuredObj.GetObjectKind().GroupVersionKind().Group,
unstructuredObj.GetKind(),
unstructuredObj.GetNamespace(),
unstructuredObj.GetName())
if err != nil {
if strings.Contains(err.Error(), "not found") {
log.Info("resource already deleted", "resource", unstructuredObj.GetName())
} else {
log.Error(err, "unable to delete resource")
return err
}
}
return nil
}
// +kubebuilder:rbac:groups=kumquat.guidewire.com,resources=templates,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=kumquat.guidewire.com,resources=templates/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=kumquat.guidewire.com,resources=templates/finalizers,verbs=update
// +kubebuilder:rbac:groups=*,resources=*,verbs=*
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the Template object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.18.4/pkg/reconcile
func (r *TemplateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
template := &kumquatv1beta1.Template{}
err := r.Get(ctx, req.NamespacedName, template)
if err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
if !template.DeletionTimestamp.IsZero() {
return r.handleDeletion(ctx, log, template)
}
if r.EnsureFinalizer(template) {
err := r.Update(ctx, template)
if err != nil {
return ctrl.Result{}, err
}
}
gvkList, err := extractGVKsFromQuery(template.Spec.Query, r.Repository, log, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}
for _, gvk := range gvkList {
err := addDataToDatabase(gvk.Group, gvk.Kind, log, r.K8sClient, r.Repository)
if err != nil {
log.Error(err, "unable to add data to database", "gvk", gvk)
}
}
data, err := r.Repository.Query(template.Spec.Query)
fmt.Println(template, "this is template")
if err != nil {
log.Error(err, "unable to query database", "query", template.Spec.Query)
return ctrl.Result{}, err
}
fmt.Println(len(data.Results), "found in the database")
err = applyTemplateResources(template, r.Repository, log, r.K8sClient, r.WatchManager)
if err != nil {
return ctrl.Result{}, err
}
err = r.WatchManager.UpdateWatch(template.Name, gvkList)
if err != nil {
log.Error(err, "unable to update watch for resource", "template", template.Name)
}
return ctrl.Result{}, nil
}
func extractGVKsFromQuery(
query string,
re repository.Repository,
log logr.Logger,
k8sClient K8sClient,
) ([]schema.GroupVersionKind, error) {
tableNames := re.ExtractTableNamesFromQuery(query)
gvkList := make([]schema.GroupVersionKind, 0, len(tableNames))
for _, tableName := range tableNames {
gvk, err := BuildTableGVK(tableName, log, k8sClient)
if err != nil {
log.Error(err, "unable to build GVK for table", "table", tableName)
return nil, err
}
gvkList = append(gvkList, gvk)
}
return gvkList, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *TemplateReconciler) SetupWithManager(mgr ctrl.Manager) error {
c, err := controller.New("template-controller", mgr,
controller.Options{
Reconciler: r,
SkipNameValidation: ptr.To(true),
})
if err != nil {
return err
}
err = c.Watch(source.Kind(
mgr.GetCache(),
&kumquatv1beta1.Template{},
&handler.TypedEnqueueRequestForObject[*kumquatv1beta1.Template]{}))
if err != nil {
return err
}
r.WatchManager = NewWatchManager(mgr, r.K8sClient, r.Repository)
return nil
}
func BuildTableGVK(tableName string, log logr.Logger, k8sClient K8sClient) (schema.GroupVersionKind, error) {
dotIndex := strings.Index(tableName, ".")
if dotIndex == -1 {
return schema.GroupVersionKind{}, fmt.Errorf("invalid table name format")
}
kind := tableName[:dotIndex]
group := tableName[dotIndex+1:]
// The core API group is represented by the empty string in Kubernetes API calls
if group == "core" {
group = ""
}
gvk, err := k8sClient.GetPreferredGVK(group, kind)
if err != nil {
return schema.GroupVersionKind{}, err
}
return gvk, nil
}
func addDataToDatabase(group string, kind string, log logr.Logger, k8sClient K8sClient, repo repository.Repository) error {
fmt.Println("Adding data to database for", group, kind)
context := context.TODO()
data, err := k8sClient.List(context, group, kind, "")
if err != nil {
return err
}
log.Info("found in the cluster", "count", len(data.Items))
for _, item := range data.Items {
err := UpsertResourceToDatabase(repo, &item, context)
if err != nil {
return err
}
}
return nil
}
func GetTemplateResourceFromCluster(kind string, group string, name string, log logr.Logger,
k8sClient K8sClient) (*unstructured.Unstructured, error) {
context := context.TODO()
data, error := k8sClient.Get(context, group, kind, "", name)
if error != nil {
return &unstructured.Unstructured{}, error
}
return data, nil
}
var ProcessTemplateResources = processTemplateResources
// applyTemplateResources applies the resources generated from the template.
func applyTemplateResources(
template *kumquatv1beta1.Template, re repository.Repository, log logr.Logger, k8sClient K8sClient, wm WatchManager) error {
return ProcessTemplateResources(template, re, log, k8sClient, wm)
}
func processTemplateResources(
template *kumquatv1beta1.Template,
re repository.Repository,
log logr.Logger,
k8sClient K8sClient,
wm WatchManager,
) error {
objMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(template)
if err != nil {
log.Error(err, "failed to convert template to unstructured map")
return err
}
resource, err := repository.MakeResource(objMap)
if err != nil {
log.Error(err, "unable to make resource from object")
return err
}
t, err := kumquatTemplate.NewTemplate(resource)
if err != nil {
log.Error(err, "unable to create template from resource")
return err
}
o, err := t.Evaluate(re)
if err != nil {
log.Error(err, "unable to evaluate template")
return err
}
desiredResourceIdentifiersSet := mapset.NewSet[ResourceIdentifier]()
desiredResources := make(map[ResourceIdentifier]*unstructured.Unstructured)
// Process each resource generated by the template
for i := 0; i < o.Output.ResourceCount(); i++ {
out, err := o.Output.ResultString(i)
if err != nil {
log.Error(err, "unable to get result string")
return err
}
jsonData, err := yaml.YAMLToJSON([]byte(out))
if err != nil {
log.Error(err, "unable to convert YAML to JSON")
return err
}
unstructuredObj := &unstructured.Unstructured{}
err = unstructuredObj.UnmarshalJSON(jsonData)
if err != nil {
log.Error(err, "unable to unmarshal JSON")
return err
}
ri := ResourceIdentifier{
Group: unstructuredObj.GroupVersionKind().Group,
Kind: unstructuredObj.GroupVersionKind().Kind,
Namespace: unstructuredObj.GetNamespace(),
Name: unstructuredObj.GetName(),
}
desiredResources[ri] = unstructuredObj
desiredResourceIdentifiersSet.Add(ri)
}
// Retrieve existing resources generated by this template
existingResourceIdentifiers := wm.GetGeneratedResources(template.Name)
if existingResourceIdentifiers == nil {
existingResourceIdentifiers = mapset.NewSet[ResourceIdentifier]()
}
fmt.Println("Existing resources", existingResourceIdentifiers)
// Convert the existing resources to a set for easy comparison
resourcesToDelete := existingResourceIdentifiers.Difference(desiredResourceIdentifiersSet)
// Delete unwanted resources
for ri := range resourcesToDelete.Iter() {
err := deleteResource(ri, k8sClient, log)
if err != nil {
log.Error(err, "unable to delete resource", "resource", ri)
return err
}
}
// Apply desired resources (create or update)
for ri := range desiredResourceIdentifiersSet.Iter() {
_, err := k8sClient.CreateOrUpdate(context.Background(), desiredResources[ri])
if err != nil {
log.Error(err, "unable to create or update resource", "resource", ri)
return err
}
}
// Update the generatedResources in WatchManager
wm.UpdateGeneratedResources(template.Name, desiredResourceIdentifiersSet)
return nil
}
func deleteResource(ri ResourceIdentifier, k8sClient K8sClient, log logr.Logger) error {
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(schema.GroupVersionKind{
Group: ri.Group,
Kind: ri.Kind,
})
obj.SetNamespace(ri.Namespace)
obj.SetName(ri.Name)
err := k8sClient.Delete(context.Background(), obj.GroupVersionKind().Group,
obj.GetKind(), obj.GetNamespace(), obj.GetName())
if err != nil && !strings.Contains(err.Error(), "not found") {
log.Error(err, "unable to delete resource", "resource", ri)
return err
}
log.Info("Deleted resource", "resource", ri)
return nil
}
package controller
import (
"context"
"fmt"
"kumquat/repository"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/controller-runtime/pkg/log"
)
func DeleteResourceFromDatabaseByNameAndNameSpace(
repo repository.Repository, kind, group, namespace, name string) error {
tableName := kind + "." + group
err := repo.Delete(namespace, name, tableName)
if err != nil {
log.Log.Error(err, "unable to delete record")
return err
}
log.Log.Info("Record deleted", "table", tableName, "namespace", namespace, "name", name)
return nil
}
func UpsertResourceToDatabase(
repo repository.Repository, resource *unstructured.Unstructured, ctx context.Context) error {
makedResource, err := repository.MakeResource(resource.Object)
if err != nil {
return fmt.Errorf("error creating resource: %w", err)
}
return repo.Upsert(makedResource)
}
// deleteTableFromDataBase deletes a table from the database.
func deleteTableFromDataBase(repo repository.Repository, tableName string) error {
err := repo.DropTable(tableName)
if err != nil {
// if the table does not exist, return nil
if err.Error() == "table does not exist: "+tableName {
return nil
}
log.Log.Error(err, "unable to drop table")
return err
}
log.Log.Info("Table dropped", "tableName", tableName)
return nil
}
package controller
import (
"context"
"fmt"
"kumquat/repository"
"sync"
mapset "github.com/deckarep/golang-set/v2"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/source"
)
type WatchManager interface {
UpdateGeneratedResources(templateName string, resourceSet mapset.Set[ResourceIdentifier])
UpdateWatch(templateName string, newGVKs []schema.GroupVersionKind) error
RemoveWatch(templateName string)
GetGeneratedResources(templateName string) mapset.Set[ResourceIdentifier]
GetManagedTemplates() map[string]map[schema.GroupVersionKind]struct{}
}
// ControllerEntry represents a dynamically managed controller.
type ControllerEntry struct {
controller controller.Controller
cancelFunc context.CancelFunc
ctx context.Context
}
type ResourceIdentifier struct {
Group string
Kind string
Namespace string
Name string
}
// WatchManager manages dynamic watches.
type watchManager struct {
refCounts map[schema.GroupVersionKind]int
watchedResources map[schema.GroupVersionKind]ControllerEntry
templates map[string]map[schema.GroupVersionKind]struct{}
mu sync.Mutex
cache cache.Cache
client client.Client
scheme *runtime.Scheme
mgr manager.Manager
K8sClient K8sClient
generatedResources map[string]mapset.Set[ResourceIdentifier]
repository repository.Repository
}
// NewWatchManager creates a new WatchManager instance.
func NewWatchManager(mgr manager.Manager, k8sClient K8sClient, repo repository.Repository) WatchManager {
wm := &watchManager{
watchedResources: make(map[schema.GroupVersionKind]ControllerEntry),
refCounts: make(map[schema.GroupVersionKind]int),
templates: make(map[string]map[schema.GroupVersionKind]struct{}),
generatedResources: make(map[string]mapset.Set[ResourceIdentifier]),
cache: mgr.GetCache(),
scheme: mgr.GetScheme(),
mgr: mgr,
K8sClient: k8sClient,
client: mgr.GetClient(),
repository: repo,
}
return wm
}
func (wm *watchManager) GetManagedTemplates() map[string]map[schema.GroupVersionKind]struct{} {
wm.mu.Lock()
defer wm.mu.Unlock()
return wm.templates
}
// AddWatch adds a watch for the specified template and GVKs.
func (wm *watchManager) AddWatch(templateName string, gvks []schema.GroupVersionKind) error {
if _, exists := wm.templates[templateName]; !exists {
wm.templates[templateName] = make(map[schema.GroupVersionKind]struct{})
}
for _, gvk := range gvks {
if _, exists := wm.templates[templateName][gvk]; exists {
continue
}
wm.templates[templateName][gvk] = struct{}{}
if wm.refCounts[gvk] == 0 {
tableName := gvk.Kind + "." + gvk.Group
if err := deleteTableFromDataBase(wm.repository, tableName); err != nil {
return err
}
if err := wm.startWatching(gvk); err != nil {
return err
}
}
wm.refCounts[gvk]++
log.Log.Info("Incremented watch reference count", "gvk", gvk, "count", wm.refCounts[gvk])
}
return nil
}
func (wm *watchManager) UpdateGeneratedResources(templateName string, resources mapset.Set[ResourceIdentifier]) {
wm.mu.Lock()
defer wm.mu.Unlock()
wm.generatedResources[templateName] = resources
}
// UpdateWatch updates the watch for the specified template with new GVKs.
func (wm *watchManager) UpdateWatch(templateName string, newGVKs []schema.GroupVersionKind) error {
wm.mu.Lock()
defer wm.mu.Unlock()
if _, exists := wm.templates[templateName]; !exists {
log.Log.Info("Resource template not found", "templateName", templateName)
return wm.AddWatch(templateName, newGVKs)
}
oldGVKs := wm.templates[templateName]
removedGVKs := make(map[schema.GroupVersionKind]struct{})
addedGVKs := make(map[schema.GroupVersionKind]struct{})
for gvk := range oldGVKs {
removedGVKs[gvk] = struct{}{}
}
for _, gvk := range newGVKs {
if _, exists := removedGVKs[gvk]; exists {
delete(removedGVKs, gvk)
} else {
addedGVKs[gvk] = struct{}{}
}
}
for gvk := range removedGVKs {
wm.removeWatchForGVK(templateName, gvk)
}
for gvk := range addedGVKs {
if err := wm.addWatchForGVK(templateName, gvk); err != nil {
return err
}
}
return nil
}
// RemoveWatch removes the watch for the specified template.
func (wm *watchManager) RemoveWatch(templateName string) {
wm.mu.Lock()
defer wm.mu.Unlock()
log.Log.Info("Removing watch", "templateName", templateName)
if watchedGVKs, exists := wm.templates[templateName]; exists {
for gvk := range watchedGVKs {
wm.refCounts[gvk]--
if wm.refCounts[gvk] <= 0 {
wm.stopWatching(gvk)
delete(wm.refCounts, gvk)
}
}
delete(wm.templates, templateName)
}
for gvk, count := range wm.refCounts {
log.Log.Info("Reference count", "gvk", gvk, "count", count)
}
wm.logActiveControllers()
}
// addWatchForGVK adds a watch for a specific GVK.
func (wm *watchManager) addWatchForGVK(templateName string, gvk schema.GroupVersionKind) error {
wm.templates[templateName][gvk] = struct{}{}
if wm.refCounts[gvk] == 0 {
if err := wm.startWatching(gvk); err != nil {
log.Log.Error(err, "unable to start watching", "gvk", gvk)
return err
}
}
wm.refCounts[gvk]++
log.Log.Info("Incremented watch reference count", "gvk", gvk, "count", wm.refCounts[gvk])
return nil
}
// removeWatchForGVK removes a watch for a specific GVK.
func (wm *watchManager) removeWatchForGVK(templateName string, gvk schema.GroupVersionKind) {
wm.refCounts[gvk]--
if wm.refCounts[gvk] <= 0 {
wm.stopWatching(gvk)
delete(wm.refCounts, gvk)
}
delete(wm.templates[templateName], gvk)
log.Log.Info("Decremented watch reference count", "gvk", gvk, "count", wm.refCounts[gvk])
}
// startWatching starts watching a specific GVK.
func (wm *watchManager) startWatching(gvk schema.GroupVersionKind) error {
log.Log.Info("Starting watch", "gvk", gvk)
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(gvk)
dynamicReconciler := NewDynamicReconciler(wm.client, gvk, wm.K8sClient, wm, wm.repository)
c, err := controller.NewUnmanaged("dynamic-controller-"+gvk.Kind, wm.mgr, controller.Options{
Reconciler: dynamicReconciler,
// Skip the name check introduced in v0.19.0 of controller-runtime via
// https://github.com/kubernetes-sigs/controller-runtime/pull/2902; we managed the controller lifecycle
// ourselves and it is not necessary to have unique names.
SkipNameValidation: ptr.To(true),
})
if err != nil {
fmt.Printf("Error creating controller: %v\n", err)
return err
}
kindSource := source.Kind(wm.mgr.GetCache(), obj, &unstructuredEventHandler{})
err = c.Watch(kindSource)
if err != nil {
return err
}
ctx, cancelFunc := context.WithCancel(context.Background())
wm.watchedResources[gvk] = ControllerEntry{controller: c, cancelFunc: cancelFunc, ctx: ctx}
go func() {
if err := c.Start(ctx); err != nil && err != context.Canceled {
log.Log.Error(err, "unable to start controller", "gvk", gvk)
}
}()
return nil
}
// stopWatching stops watching a specific GVK.
func (wm *watchManager) stopWatching(gvk schema.GroupVersionKind) {
log.Log.Info("Stopping watch", "gvk", gvk)
if entry, exists := wm.watchedResources[gvk]; exists {
entry.cancelFunc()
<-entry.ctx.Done()
delete(wm.watchedResources, gvk)
}
}
// logs all active controllers.
func (wm *watchManager) logActiveControllers() {
log.Log.Info("Listing all active controllers:")
for gvk, entry := range wm.watchedResources {
log.Log.Info("Active controller", "gvk", gvk, "context", entry.ctx)
}
}
// DeleteRecord deletes a record from the specified table.
func DeleteRecord(table, namespace, name string, repo repository.Repository) error {
err := repo.Delete(namespace, name, table)
if err != nil {
log.Log.Error(err, "unable to delete record")
return err
}
log.Log.Info("Record deleted", "table", table, "namespace", namespace, "name", name)
return nil
}
func (wm *watchManager) GetGeneratedResources(templateName string) mapset.Set[ResourceIdentifier] {
wm.mu.Lock()
defer wm.mu.Unlock()
return wm.generatedResources[templateName]
}
// unstructuredEventHandler handles events for unstructured resources.
type unstructuredEventHandler struct{}
func (h *unstructuredEventHandler) Create(
ctx context.Context,
evt event.TypedCreateEvent[*unstructured.Unstructured],
q workqueue.TypedRateLimitingInterface[ctrl.Request],
) {
q.Add(ctrl.Request{NamespacedName: client.ObjectKeyFromObject(evt.Object)})
}
func (h *unstructuredEventHandler) Update(
ctx context.Context,
evt event.TypedUpdateEvent[*unstructured.Unstructured],
q workqueue.TypedRateLimitingInterface[ctrl.Request],
) {
q.Add(ctrl.Request{NamespacedName: client.ObjectKeyFromObject(evt.ObjectNew)})
}
func (h *unstructuredEventHandler) Delete(
ctx context.Context,
evt event.TypedDeleteEvent[*unstructured.Unstructured],
q workqueue.TypedRateLimitingInterface[ctrl.Request],
) {
q.Add(ctrl.Request{NamespacedName: client.ObjectKeyFromObject(evt.Object)})
}
func (h *unstructuredEventHandler) Generic(
ctx context.Context,
evt event.TypedGenericEvent[*unstructured.Unstructured],
q workqueue.TypedRateLimitingInterface[ctrl.Request],
) {
q.Add(ctrl.Request{NamespacedName: client.ObjectKeyFromObject(evt.Object)})
}
package main
import (
"flag"
"fmt"
"kumquat/repository"
"kumquat/store"
"kumquat/template"
"log/slog"
"os"
)
func main() {
var repo repository.Repository
repo, err := repository.NewSQLiteRepository()
if err != nil {
slog.Error("Unable to create repository", "err", err)
panic(err)
}
defer repo.Close() //nolint:errcheck
inDir := flag.String("in", "sampledata", "directory path to read Kubernetes resources")
flag.Parse()
err = repository.LoadYAMLFromDirectoryTree(os.DirFS("."), *inDir, repo)
if err != nil {
slog.Error("Unable to load directory tree", "err", err)
panic(err)
}
tplrs, err := repo.Query(
/* sql */ `SELECT template.data AS tpl FROM "` + template.TemplateResourceType + `" AS template`,
)
if err != nil {
slog.Error("Unable to find template", "err", err)
panic(err)
}
// Process every Template
templates := make([]*template.Template, 0, len(tplrs.Results))
for _, tplrs := range tplrs.Results {
tplres := tplrs["tpl"]
t, err := template.NewTemplate(tplres)
if err != nil {
fmt.Printf("%v\n", err)
continue
}
templates = append(templates, t)
fmt.Printf("Loaded Template %s\n", t.Name())
}
for _, t := range templates {
o, err := t.Evaluate(repo)
if err != nil {
fmt.Printf("%v\n", err)
continue
}
generateOutput(o)
}
}
func generateOutput(o *template.TemplateOutput) {
// for loop over data
for i := 0; i < o.Output.ResourceCount(); i++ {
out, err := o.Output.ResultString(i)
if err != nil {
panic(err)
}
fileName := o.FileNames[i]
// use WriteFile function in store package to write the output to a file
err = store.WriteToFile(fileName, "", out)
if err != nil {
panic(err)
}
}
}
package cue
import (
"fmt"
"kumquat/renderer"
"strings"
cuelang "cuelang.org/go/cue"
"cuelang.org/go/cue/cuecontext"
cue_errors "cuelang.org/go/cue/errors"
"gopkg.in/yaml.v3"
)
func init() {
err := renderer.Register("cue", func(template, source string) (renderer.Renderer, error) {
return NewCUERenderer(template, source)
})
if err != nil {
panic(err)
}
}
type CUERenderer struct {
config string
source string
ctx *cuelang.Context
}
func NewCUERenderer(template string, source string) (*CUERenderer, error) {
tpl := CUERenderer{
config: template,
source: source,
ctx: cuecontext.New(),
}
return &tpl, nil
}
func (r *CUERenderer) Render(results any, output *renderer.Output) error {
err := r.evaluate(results, output)
if err != nil {
return fmt.Errorf("error evaluating CUE template '%s': %w", r.source, err)
}
return nil
}
func (t *CUERenderer) evaluate(r any, o *renderer.Output) error {
data := t.ctx.Encode(map[string]any{"DATA": r})
compiled := t.ctx.CompileString(t.config, cuelang.Scope(data))
v := compiled.Eval()
if v.Err() != nil {
return newRendererError(v.Err())
}
return appendOutput(o, v)
}
func appendOutput(o *renderer.Output, v cuelang.Value) error {
switch t := v.Kind(); t {
case cuelang.ListKind:
var output []map[string]any
err := v.Decode(&output)
if err != nil {
return fmt.Errorf("error decoding output: %w", err)
}
var outputs []string
for i := 0; i < len(output); i++ {
outputByteArray, err := yaml.Marshal(output[i])
if err != nil {
return fmt.Errorf("error decoding output: %w", err)
}
outputs = append(outputs, string(outputByteArray))
}
o.Append(strings.Join(outputs, "---\n"))
case cuelang.StructKind:
var output map[string]any
err := v.Decode(&output)
if err != nil {
return fmt.Errorf("error decoding output: %w", err)
}
// convert output to string
outputByteArray, err := yaml.Marshal(output)
if err != nil {
return fmt.Errorf("error decoding output: %w", err)
}
o.Append(string(outputByteArray))
case cuelang.BottomKind:
return fmt.Errorf("output is nothing concrete")
default:
return fmt.Errorf("output is unsupported type '%v'", t)
}
return nil
}
func newRendererError(err error) *renderer.Error {
pos := cue_errors.Positions(err)
if len(pos) > 0 {
return renderer.NewError(err, pos[0].Line(), pos[0].Column())
}
return renderer.NewError(err, 0, 0)
}
package renderer
import "fmt"
type LookupError struct {
rendererName string
}
func (e *LookupError) Error() string {
return fmt.Sprintf("unknown renderer '%s'", e.rendererName)
}
type Error struct {
err error
line int
column int
}
func NewError(err error, line, column int) *Error {
if line == 0 {
column = 0
}
return &Error{err, line, column}
}
// Error returns the error message.
func (e *Error) Error() string {
if e.line == 0 {
return fmt.Sprintf("[line ?, column ?] %s", e.err.Error())
}
if e.column == 0 {
return fmt.Sprintf("[line %d, column ?] %s", e.line, e.err.Error())
}
return fmt.Sprintf("[line %d, column %d] %s", e.line, e.column, e.err.Error())
}
// Unwrap returns the wrapped error.
func (e *Error) Unwrap() error {
return e.err
}
// Line returns the line number where the error occurred; 0 if unknown.
func (e *Error) Line() int {
return e.line
}
// Column returns the column number where the error occurred; 0 if unknown.
func (e *Error) Column() int {
return e.column
}
package gotemplate
import (
"bytes"
"errors"
"fmt"
"kumquat/renderer"
"regexp"
"strconv"
"text/template"
)
func init() {
err := renderer.Register("gotemplate", func(template, source string) (renderer.Renderer, error) {
return NewGoRenderer(template, source)
})
if err != nil {
panic(err)
}
}
type GoRenderer struct {
template *template.Template
source string
}
func (r *GoRenderer) Render(results any, output *renderer.Output) error {
var buffer bytes.Buffer
err := r.template.Execute(&buffer, results)
if err != nil {
return fmt.Errorf("error executing Go template: %w", newRendererError(err, r.source))
}
output.Append(buffer.String())
return nil
}
func NewGoRenderer(tmpl string, source string) (*GoRenderer, error) {
t, err := template.New(source).Option("missingkey=zero").Parse(tmpl)
if err != nil {
return nil, fmt.Errorf("error parsing Go template: %w", newRendererError(err, source))
}
return &GoRenderer{template: t, source: source}, nil
}
var lineRE = regexp.MustCompile(`^template: .*?:(\d+)(:(\d+))?: (.*)`)
func newRendererError(err error, source string) *renderer.Error {
s := err.Error()
// parse the line number from the error message
var line, column int
matches := lineRE.FindStringSubmatch(s)
if matches != nil {
// parse error returns 0; same as line unknown
line, _ = strconv.Atoi(matches[1])
// parse error returns 0; same as column unknown
column, _ = strconv.Atoi(matches[3])
s = matches[4]
}
// replace "sourcefile:line#" with "line line#" in the remainder of the error message
re := regexp.MustCompile(source + `:(\d+)`)
s = re.ReplaceAllString(s, "line $1")
return renderer.NewError(errors.New(s), line, column)
}
package jsonnet
import (
"encoding/json"
"errors"
"fmt"
"kumquat/renderer"
"reflect"
js "github.com/google/go-jsonnet"
"github.com/google/go-jsonnet/ast"
)
func init() {
err := renderer.Register("jsonnet", func(template, source string) (renderer.Renderer, error) {
return NewJsonnetRenderer(template, source)
})
if err != nil {
panic(err)
}
}
type JsonnetRenderer struct {
vm *js.VM
template ast.Node
}
func NewJsonnetRenderer(template string, source string) (*JsonnetRenderer, error) {
parsed, err := js.SnippetToAST(source, template)
if err != nil {
return nil, fmt.Errorf("error parsing Jsonnet: %w", newRendererError(err))
}
return &JsonnetRenderer{template: parsed, vm: js.MakeVM()}, nil
}
func newRendererError(err error) *renderer.Error {
re, ok := err.(js.RuntimeError)
if ok {
for _, frame := range re.StackTrace {
if frame.Loc.Begin.Line > 0 {
return renderer.NewError(err, frame.Loc.Begin.Line, frame.Loc.Begin.Column)
}
}
} else {
// Not a RuntimeError, maybe we can find a StaticError
for e := err; e != nil; e = errors.Unwrap(err) {
ev := reflect.ValueOf(e)
mv := ev.MethodByName("Loc") // StaticError has method Loc() returning LocationRange
if mv != reflect.ValueOf(nil) {
rvs := mv.Call(nil) // Call Loc() to get LocationRange
bv := rvs[0].FieldByName("Begin") // LocationRange has a Begin field of type Location
lv := bv.FieldByName("Line") // Location has Line
cv := bv.FieldByName("Column") // Location has Column
return renderer.NewError(err, int(lv.Int()), int(cv.Int()))
}
}
}
return renderer.NewError(err, 0, 0)
}
func (r *JsonnetRenderer) Render(results any, output *renderer.Output) error {
b, err := json.Marshal(results)
if err != nil {
return fmt.Errorf("error converting results to Jsonnet: %w", err)
}
code := string(b)
r.vm.ExtCode("data", code)
js, err := r.vm.Evaluate(r.template)
if err != nil {
return fmt.Errorf("error rendering Jsonnet: %w", newRendererError(err))
}
output.Append(js)
return nil
}
package renderer
import (
"fmt"
"kumquat/repository"
)
type Renderer interface {
// Render the results and add the output to the provided output object. The results may be a single result
// of type `map[string]any` or a slice of such results (i.e. `[]map[string]any`).
Render(results any, output *Output) error
}
// Render the results using the provided renderer. If batchMode is true, the renderer will be called once
// with all the results; otherwise, the renderer will be called once for each result.
func Render(renderer Renderer, results []map[string]repository.Resource, batchMode bool) (*Output, error) {
var o *Output
resultsWithoutResources := StripResourcesFromResults(results)
if batchMode {
o = NewOutput(1)
err := renderer.Render(resultsWithoutResources, o)
if err != nil {
return nil, err
}
} else {
o = NewOutput(len(results))
for _, r := range resultsWithoutResources {
err := renderer.Render(r, o)
if err != nil {
return nil, err
}
}
}
return o, nil
}
type RendererMaker func(string, string) (Renderer, error)
var rendererRegistry map[string]RendererMaker
func Register(name string, f RendererMaker) error {
if rendererRegistry == nil {
rendererRegistry = make(map[string]RendererMaker)
}
if rendererRegistry[name] != nil {
return fmt.Errorf("renderer '%s' already registered", name)
}
fmt.Printf("Renderer '%s' registered.\n", name)
rendererRegistry[name] = f
return nil
}
func MakeRenderer(name, template, source string) (Renderer, error) {
f, ok := rendererRegistry[name]
if !ok {
return nil, &LookupError{rendererName: name}
}
return f(template, source)
}
type Output struct {
output []string
}
func NewOutput(expectedSize int) *Output {
return &Output{output: make([]string, 0, expectedSize)}
}
func NewOutputFromSlice(output []string) *Output {
return &Output{output: output}
}
func (o *Output) Append(result string) {
o.output = append(o.output, result)
}
func (o *Output) ResourceCount() int {
return len(o.output)
}
func (o *Output) ResultString(resource int) (string, error) {
if resource < 0 || resource >= len(o.output) {
return "", fmt.Errorf("resource index out of range")
}
return o.output[resource], nil
}
package renderer
import (
"kumquat/repository"
)
// stripResourcesFromResults removes the Resource objects from the results, replacing them with the underlying content.
func StripResourcesFromResults(results []map[string]repository.Resource) []map[string]any {
strippedResults := make([]map[string]any, len(results))
for i, result := range results {
stripped := make(map[string]any)
for k, v := range result {
stripped[k] = v.Content()
}
strippedResults[i] = stripped
}
return strippedResults
}
package repository
import (
"fmt"
"io/fs"
"strings"
"gopkg.in/yaml.v3"
)
type Resource struct {
group string
version string
kind string
namespace string
name string
content map[string]any
}
// Group returns the API group of the resource.
func (r Resource) Group() string {
return r.group
}
// Version returns the API version of the resource.
func (r Resource) Version() string {
return r.version
}
// Kind returns the kind of the resource.
func (r Resource) Kind() string {
return r.kind
}
// Namespace returns the namespace of the resource.
func (r Resource) Namespace() string {
return r.namespace
}
// Name returns the name of the resource.
func (r Resource) Name() string {
return r.name
}
// Content returns the content of the resource.
// Do not alter the returned map or the resource could become inconsistent.
func (r Resource) Content() map[string]any {
return r.content
}
// MakeResource creates a new Resource from a map of the resource's content.
func MakeResource(content map[string]any) (Resource, error) {
apiVersionRaw, ok := content["apiVersion"]
if !ok {
return Resource{}, fmt.Errorf("missing apiVersion")
}
apiVersion, ok := apiVersionRaw.(string)
if !ok {
return Resource{}, fmt.Errorf("apiVersion '%#v' must be a string but it is %T", apiVersionRaw, apiVersionRaw)
}
var group, version string
if apiVersion == "v1" {
group = "core"
version = "v1"
} else {
var found bool
group, version, found = strings.Cut(apiVersion, "/")
if !found {
return Resource{}, fmt.Errorf("missing '/' separator in apiVersion'%s'", apiVersion)
}
if len(group) == 0 {
return Resource{}, fmt.Errorf("missing group in apiVersion '%s'", apiVersion)
}
if len(version) == 0 {
return Resource{}, fmt.Errorf("missing version in apiVersion '%s'", apiVersion)
}
}
kindRaw, ok := content["kind"]
if !ok {
return Resource{}, fmt.Errorf("missing kind")
}
kind, ok := kindRaw.(string)
if !ok {
return Resource{}, fmt.Errorf("kind '%#v' must be a string but it is %T", kindRaw, kindRaw)
}
metadataRaw, ok := content["metadata"]
if !ok {
return Resource{}, fmt.Errorf("missing metadata")
}
metadata, ok := metadataRaw.(map[string]any)
if !ok {
return Resource{}, fmt.Errorf("metadata '%#v' must be a map but it is %T", metadataRaw, metadataRaw)
}
namespaceRaw, ok := metadata["namespace"]
if !ok {
namespaceRaw = ""
}
namespace, ok := namespaceRaw.(string)
if !ok {
return Resource{}, fmt.Errorf("metadata.namespace '%#v' must be a string but it is %T", namespaceRaw, namespaceRaw)
}
nameRaw, ok := metadata["name"]
if !ok {
return Resource{}, fmt.Errorf("missing name")
}
name, ok := nameRaw.(string)
if !ok {
return Resource{}, fmt.Errorf("metadata.name '%#v' must be a string but it is %T", nameRaw, nameRaw)
}
res := Resource{
group: group,
version: version,
kind: kind,
namespace: namespace,
name: name,
content: content,
}
return res, nil
}
type Repository interface {
Query(query string) (ResultSet, error)
Close() error
Upsert(resource Resource) error
Delete(namespace, name, table string) error
ExtractTableNamesFromQuery(query string) []string
DropTable(table string) error
}
type ResultSet struct {
Names []string
Results []map[string]Resource
}
// LoadYAMLFromDirectoryTree loads all YAML files from a directory tree into a repository.
func LoadYAMLFromDirectoryTree(filesystem fs.FS, directory string, repo Repository) error {
return fs.WalkDir(filesystem, directory, func(path string, d fs.DirEntry, pathErr error) error {
if pathErr != nil {
return pathErr
}
if d.IsDir() {
return nil
}
data, err := fs.ReadFile(filesystem, path)
if err != nil {
return fmt.Errorf("file '%s': %w", path, err)
}
parsed := make(map[string]any)
err = yaml.Unmarshal(data, &parsed)
if err != nil {
return fmt.Errorf("file '%s': %w", path, err)
}
res, err := MakeResource(parsed)
if err != nil {
return fmt.Errorf("file '%s': %w", path, err)
}
return repo.Upsert(res)
})
}
package repository
import (
"database/sql"
"encoding/json"
"fmt"
"log/slog"
"regexp"
"strings"
"sync"
_ "github.com/mattn/go-sqlite3"
"sigs.k8s.io/controller-runtime/pkg/log"
)
type SQLiteRepository struct {
db *sql.DB
StoredKinds map[string]bool
mu sync.Mutex
}
type GroupKind struct {
Group string
Kind string
}
var tableErrorRegexp = regexp.MustCompile(`^no such table: (.*)$`)
func (r *SQLiteRepository) Query(query string) (ResultSet, error) {
r.mu.Lock()
defer r.mu.Unlock()
slog.Debug("Running query", "query", query)
var rows *sql.Rows
var err error
for rows, err = r.db.Query(query); err != nil; rows, err = r.db.Query(query) {
m := tableErrorRegexp.FindStringSubmatch(err.Error())
// print m to see the table name
log.Log.Info("Error running qsdsduery", "error", err.Error(), "table", m)
if m != nil {
fmt.Println("Table not found, creating table", "table", m[1])
missingTable := m[1]
err := r.createTable(missingTable)
if err != nil {
return ResultSet{}, fmt.Errorf("unable to create empty table to run query: %w", err)
}
} else {
return ResultSet{}, fmt.Errorf("error running query: %w", err)
}
}
defer rows.Close() //nolint:errcheck
/*
* Process Columns
*/
columnNames, err := rows.Columns()
if err != nil {
return ResultSet{}, fmt.Errorf("error getting query result column information: %w", err)
}
/*
* Allocate temporary storage to Scan() into
*/
columnValues := make([]any, len(columnNames))
columnValuePtrs := make([]any, len(columnNames))
for i := 0; i < len(columnNames); i++ {
columnValuePtrs[i] = &columnValues[i]
}
/*
* Process Rows
*/
var results = make([]map[string]Resource, 0)
for rows.Next() {
err = rows.Scan(columnValuePtrs...)
if err != nil {
return ResultSet{}, fmt.Errorf("error while scanning query result: %w", err)
}
var result = make(map[string]Resource)
for i, columnName := range columnNames {
var parsed any
err = json.Unmarshal([]byte(columnValues[i].(string)), &parsed)
if err != nil {
return ResultSet{}, fmt.Errorf("error while unmarshaling column '%s' to JSON: %w",
strings.Trim(columnName, "'"), err)
}
switch v := parsed.(type) {
case map[string]any:
res, err := MakeResource(v)
if err != nil {
return ResultSet{}, fmt.Errorf("error retrieving resource from column '%s': %w",
strings.Trim(columnName, "'"), err)
}
result[columnName] = res
default:
return ResultSet{}, fmt.Errorf("expected JSON object in column '%s' but got %T",
strings.Trim(columnName, "'"), v)
}
}
results = append(results, result)
}
err = rows.Err()
if err != nil {
return ResultSet{}, fmt.Errorf("error while iterating query result: %w", err)
}
resultset := ResultSet{
Names: columnNames,
Results: results,
}
return resultset, nil
}
func (r *SQLiteRepository) Close() error {
return r.db.Close()
}
func (r *SQLiteRepository) createTable(table string) error {
if r.StoredKinds[table] {
return fmt.Errorf("table already exists: %s", table)
}
_, err := r.db.Exec( /* sql */ `CREATE TABLE "` + table +
`" (namespace TEXT NOT NULL, name TEXT NOT NULL, data TEXT NOT NULL, PRIMARY KEY (namespace, name)) STRICT`)
if err != nil {
return fmt.Errorf("unable to create table: %w", err)
}
r.StoredKinds[table] = true
return nil
}
func (r *SQLiteRepository) Delete(namespace, name, table string) error {
r.mu.Lock()
defer r.mu.Unlock()
// Prepare the SQL statement to delete the record
query := `DELETE FROM "` + table + `" WHERE namespace = ? AND name = ?`
// Execute the query
_, err := r.db.Exec(query, namespace, name)
if err != nil {
return fmt.Errorf("unable to delete record: %w", err)
}
return nil
}
func (r *SQLiteRepository) Upsert(resource Resource) error {
r.mu.Lock()
defer r.mu.Unlock()
byteJSON, err := json.Marshal(resource.Content())
if err != nil {
return fmt.Errorf("unable to encode resource as JSON: %w", err)
}
table := resource.Kind() + "." + resource.Group()
slog.Debug("Upserting resource", "table", table, "namespace", resource.Namespace(), "name", resource.Name())
contentJSON := string(byteJSON)
if !r.StoredKinds[table] {
err := r.createTable(table)
if err != nil {
return err
}
}
_, err = r.db.Exec( /* sql */ `INSERT INTO "`+table+`" (namespace, name, data) VALUES (?,?,?)
ON CONFLICT(namespace, name) DO UPDATE SET data=excluded.data`,
resource.Namespace(), resource.Name(), contentJSON)
if err != nil {
return fmt.Errorf("unable to upsert resource: %w", err)
}
return nil
}
func (r *SQLiteRepository) ExtractTableNamesFromQuery(query string) []string {
// Extract table names from query
tableNames := make([]string, 0)
tableNameSet := make(map[string]struct{})
// Find all table names in the query, including subqueries and quoted table names
tableNameRegexp := regexp.MustCompile(`(?i)(?:FROM|JOIN|CROSS JOIN)\s+["]?(\w+(\.\w+)*|[\w.]+)["]?`)
tableNameMatches := tableNameRegexp.FindAllStringSubmatch(query, -1)
// Extract table names from the matches and add to set to avoid duplicates
for _, match := range tableNameMatches {
tableName := match[1]
if _, exists := tableNameSet[tableName]; !exists {
tableNames = append(tableNames, tableName)
tableNameSet[tableName] = struct{}{}
}
}
return tableNames
}
func (r *SQLiteRepository) DropTable(table string) error {
if !r.StoredKinds[table] {
return fmt.Errorf("table does not exist: %s", table)
}
_, err := r.db.Exec( /* sql */ `DROP TABLE "` + table + `"`)
if err != nil {
return fmt.Errorf("unable to drop table: %w", err)
}
delete(r.StoredKinds, table)
log.Log.Info("Table dropped", "tableName", table)
return nil
}
func NewSQLiteRepository() (*SQLiteRepository, error) {
db, err := sql.Open("sqlite3", ":memory:")
if err != nil {
return nil, err
}
repo := &SQLiteRepository{
db: db,
StoredKinds: make(map[string]bool),
mu: sync.Mutex{},
}
return repo, nil
}
package store
import (
"os"
"path/filepath"
)
// writeFile takes a file name, file path, and a string content, then writes the content to the specified file.
func WriteToFile(fileName, filePath, content string) error {
// Combine the file path and file name to get the full file path
fullPath := filepath.Join(filePath, fileName)
// Create the directory path if it doesn't exist
if err := os.MkdirAll(filepath.Dir(fullPath), 0755); err != nil {
return err
}
// Convert the string content to a byte slice, as WriteFile expects []byte
byteContent := []byte(content)
// Write the content to the file
err := os.WriteFile(fullPath, byteContent, 0644)
if err != nil {
return err
}
return nil
}
package template
import (
"fmt"
"strings"
)
type FieldValidationError struct {
template string
field string
err error
}
func (e *FieldValidationError) Error() string {
return fmt.Sprintf("'%s'.%s: %v", e.template, e.field, e.Unwrap())
}
func (e *FieldValidationError) Unwrap() error {
return e.err
}
type ValidationErrors struct {
template string
errors []error
}
func NewValidationErrors(template string) *ValidationErrors {
return &ValidationErrors{template: template}
}
func (e *ValidationErrors) Error() string {
if !e.HasErrors() {
return fmt.Sprintf("Template '%s': unspecified validation error", e.template)
}
var errs []string = make([]string, len(e.errors)+1)
errs[0] = fmt.Sprintf("Invalid %s '%s':", TemplateKind, e.template)
for i, err := range e.errors {
errs[i+1] = err.Error()
}
return strings.Join(errs, "\n")
}
func (e *ValidationErrors) Unwrap() []error {
return e.errors
}
func (e *ValidationErrors) Append(err error) {
if err != nil {
if e.errors == nil {
e.errors = make([]error, 0, 1)
}
e.errors = append(e.errors, err)
}
}
func (e *ValidationErrors) HasErrors() bool {
return len(e.errors) > 0
}
func (e *ValidationErrors) Template() string {
return e.template
}
package template
import (
"errors"
"fmt"
"kumquat/renderer"
"kumquat/renderer/gotemplate"
"kumquat/repository"
"strings"
)
const TemplateAPIGroup = "kumquat.guidewire.com"
const TemplateKind = "Template"
const TemplateResourceType = TemplateKind + "." + TemplateAPIGroup
type Template struct {
name string
query string
fileNameTemplate renderer.Renderer
renderer renderer.Renderer
batchMode bool
}
type TemplateOutput struct {
Output *renderer.Output
FileNames []string
}
func NewTemplate(r repository.Resource) (*Template, error) {
// name is guaranteed to be present since it comes from a Resource
name := r.Content()["metadata"].(map[string]any)["name"].(string)
var validationError = &ValidationErrors{template: name}
// apiVersion is guaranteed to be a string with a "/" in it because it comes from a Resource
apiVersion := r.Content()["apiVersion"].(string)
s := strings.SplitN(apiVersion, "/", 2)
if len(s) != 2 || s[0] != TemplateAPIGroup {
validationError.Append(&FieldValidationError{
template: name,
field: "apiVersion",
err: fmt.Errorf("'%s' should be '%s'", s[0], TemplateAPIGroup),
})
}
if s[1] != "v1beta1" {
validationError.Append(&FieldValidationError{
template: name,
field: "apiVersion",
err: fmt.Errorf("'%s' unsupported; supported versions are: 'v1beta1'", s[1]),
})
}
// kind is guaranteed to be a string because it comes from a Resource
kind := r.Content()["kind"].(string)
if kind != TemplateKind {
validationError.Append(&FieldValidationError{
template: name,
field: "kind",
err: fmt.Errorf("'%s' should be '%s'", kind, TemplateKind),
})
}
spec, ok := r.Content()["spec"].(map[string]any)
if !ok {
validationError.Append(&FieldValidationError{
template: name,
field: "spec",
err: errors.New("missing or not a map"),
})
}
var query string
var template map[string]any
if len(spec) > 0 {
query, _ = spec["query"].(string)
if query == "" {
validationError.Append(&FieldValidationError{
template: name,
field: "spec.query",
err: errors.New("missing or not a string"),
})
}
template, _ = spec["template"].(map[string]any)
if len(template) == 0 {
validationError.Append(&FieldValidationError{
template: name,
field: "spec.template",
err: errors.New("missing or not a map"),
})
}
}
var lang, data, fileNameTemplate string
if len(template) > 0 {
data, _ = template["data"].(string)
if data == "" {
validationError.Append(&FieldValidationError{
template: name,
field: "spec.template.data",
err: errors.New("missing or not a string"),
})
}
lang, _ = template["language"].(string)
if lang == "" {
validationError.Append(&FieldValidationError{
template: name,
field: "spec.template.language",
err: errors.New("missing or not a string"),
})
}
fileNameTemplate, _ = template["fileName"].(string)
if fileNameTemplate == "" {
validationError.Append(&FieldValidationError{
template: name,
field: "spec.template.fileName",
err: errors.New("missing or not a string"),
})
}
}
// batchModeProcessing false unless explicitly set to true in template
batchModeProcessing := false
if val, ok := template["batchModeProcessing"].(bool); ok {
batchModeProcessing = val
}
t := &Template{
name: name,
query: query,
batchMode: batchModeProcessing,
}
var err error
if lang != "" {
t.renderer, err = renderer.MakeRenderer(lang, data, name)
}
if err != nil {
if _, ok := err.(*renderer.LookupError); ok {
validationError.Append(&FieldValidationError{
template: name,
field: "spec.template.language",
err: err,
})
} else {
fmt.Printf("Error making renderer: %#v\n", err)
validationError.Append(&FieldValidationError{
template: name,
field: "spec.template.data",
err: err,
})
}
}
err = nil
if fileNameTemplate != "" {
t.fileNameTemplate, err = gotemplate.NewGoRenderer(fileNameTemplate, name)
}
if err != nil {
validationError.Append(&FieldValidationError{
template: name,
field: "spec.template.fileName",
err: err,
})
}
if validationError.HasErrors() {
return nil, validationError
}
return t, nil
}
// Evaluate runs the query and renders the results using the template.
func (t *Template) Evaluate(repo repository.Repository) (*TemplateOutput, error) {
resultset, err := repo.Query(t.query)
if err != nil {
return nil, fmt.Errorf("query failed in Template '%s': %w", t.name, err)
}
output, err := renderer.Render(t.renderer, resultset.Results, t.batchMode)
if err != nil {
return nil, fmt.Errorf("error rendering Template '%s': %w", t.name, err)
}
fileNamesOutput, err := renderer.Render(t.fileNameTemplate, resultset.Results, t.batchMode)
if err != nil {
return nil, fmt.Errorf("error rendering fileName in Template '%s': %w", t.name, err)
}
// TODO revisit this when we start outputting resources to Kubernetes directly
// When putting results in files, these invariants should hold:
if t.batchMode {
if fileNamesOutput.ResourceCount() != 1 {
return nil, fmt.Errorf("expected one file name in batch mode; got %d", fileNamesOutput.ResourceCount())
}
if output.ResourceCount() != 1 {
return nil, fmt.Errorf("expected one rendered resource in batch mode; got %d", output.ResourceCount())
}
} else {
if fileNamesOutput.ResourceCount() != len(resultset.Results) {
return nil, fmt.Errorf("expected %d file names; got %d", len(resultset.Results), fileNamesOutput.ResourceCount())
}
if output.ResourceCount() != len(resultset.Results) {
return nil, fmt.Errorf("expected %d rendered resources; got %d", len(resultset.Results), output.ResourceCount())
}
}
fileNames := make([]string, fileNamesOutput.ResourceCount())
for i := 0; i < fileNamesOutput.ResourceCount(); i++ {
fileName, err := fileNamesOutput.ResultString(i)
if err != nil {
return nil, fmt.Errorf("error getting filename: %w", err)
}
fileNames[i] = strings.TrimSpace(fileName)
}
return &TemplateOutput{Output: output, FileNames: fileNames}, nil
}
func (t *Template) BatchMode() bool {
return t.batchMode
}
func (t *Template) Name() string {
return t.name
}
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package utils
import (
"fmt"
"os"
"os/exec"
"strings"
. "github.com/onsi/ginkgo/v2" //nolint:golint,revive
)
const (
prometheusOperatorVersion = "v0.72.0"
prometheusOperatorURL = "https://github.com/prometheus-operator/prometheus-operator/" +
"releases/download/%s/bundle.yaml"
certmanagerVersion = "v1.14.4"
certmanagerURLTmpl = "https://github.com/jetstack/cert-manager/releases/download/%s/cert-manager.yaml"
)
func warnError(err error) {
_, _ = fmt.Fprintf(GinkgoWriter, "warning: %v\n", err)
}
// InstallPrometheusOperator installs the prometheus Operator to be used to export the enabled metrics.
func InstallPrometheusOperator() error {
url := fmt.Sprintf(prometheusOperatorURL, prometheusOperatorVersion)
cmd := exec.Command("kubectl", "create", "-f", url)
_, err := Run(cmd)
return err
}
// Run executes the provided command within this context
func Run(cmd *exec.Cmd) ([]byte, error) {
dir, _ := GetProjectDir()
cmd.Dir = dir
if err := os.Chdir(cmd.Dir); err != nil {
_, _ = fmt.Fprintf(GinkgoWriter, "chdir dir: %s\n", err)
}
cmd.Env = append(os.Environ(), "GO111MODULE=on")
command := strings.Join(cmd.Args, " ")
_, _ = fmt.Fprintf(GinkgoWriter, "running: %s\n", command)
output, err := cmd.CombinedOutput()
if err != nil {
return output, fmt.Errorf("%s failed with error: (%v) %s", command, err, string(output))
}
return output, nil
}
// UninstallPrometheusOperator uninstalls the prometheus
func UninstallPrometheusOperator() {
url := fmt.Sprintf(prometheusOperatorURL, prometheusOperatorVersion)
cmd := exec.Command("kubectl", "delete", "-f", url)
if _, err := Run(cmd); err != nil {
warnError(err)
}
}
// UninstallCertManager uninstalls the cert manager
func UninstallCertManager() {
url := fmt.Sprintf(certmanagerURLTmpl, certmanagerVersion)
cmd := exec.Command("kubectl", "delete", "-f", url)
if _, err := Run(cmd); err != nil {
warnError(err)
}
}
// InstallCertManager installs the cert manager bundle.
func InstallCertManager() error {
url := fmt.Sprintf(certmanagerURLTmpl, certmanagerVersion)
cmd := exec.Command("kubectl", "apply", "-f", url)
if _, err := Run(cmd); err != nil {
return err
}
// Wait for cert-manager-webhook to be ready, which can take time if cert-manager
// was re-installed after uninstalling on a cluster.
cmd = exec.Command("kubectl", "wait", "deployment.apps/cert-manager-webhook",
"--for", "condition=Available",
"--namespace", "cert-manager",
"--timeout", "5m",
)
_, err := Run(cmd)
return err
}
// LoadImageToKindClusterWithName loads a local docker image to the kind cluster
func LoadImageToKindClusterWithName(name string) error {
cluster := "kind"
if v, ok := os.LookupEnv("KIND_CLUSTER"); ok {
cluster = v
}
kindOptions := []string{"load", "docker-image", name, "--name", cluster}
cmd := exec.Command("kind", kindOptions...)
_, err := Run(cmd)
return err
}
// GetNonEmptyLines converts given command output string into individual objects
// according to line breakers, and ignores the empty elements in it.
func GetNonEmptyLines(output string) []string {
var res []string
elements := strings.Split(output, "\n")
for _, element := range elements {
if element != "" {
res = append(res, element)
}
}
return res
}
// GetProjectDir will return the directory where the project is
func GetProjectDir() (string, error) {
wd, err := os.Getwd()
if err != nil {
return wd, err
}
wd = strings.Replace(wd, "/test/e2e", "", -1)
return wd, nil
}
// GetSubDirs returns a slice of subdirectory names under the given directory path.
func GetSubDirs(dir string) ([]string, error) {
var subDirs []string
// Read the contents of the directory
entries, err := os.ReadDir(dir)
if err != nil {
return nil, fmt.Errorf("error reading directory %q: %w", dir, err)
}
// Loop through the entries and collect directory names
for _, entry := range entries {
if entry.IsDir() {
subDirs = append(subDirs, entry.Name())
}
}
return subDirs, nil
}
func GetFiles(dir string) ([]string, error) {
var files []string
// Read the contents of the directory
entries, err := os.ReadDir(dir)
if err != nil {
return nil, fmt.Errorf("error reading directory %q: %w", dir, err)
}
// Loop through the entries and collect directory names
for _, entry := range entries {
if !entry.IsDir() {
files = append(files, entry.Name())
}
}
return files, nil
}