diff --git a/client/models.go b/client/models.go index 74b5e33..2dccc29 100644 --- a/client/models.go +++ b/client/models.go @@ -1,5 +1,10 @@ package anaml +type AnamlObject struct { + ID int `json:"id"` + Type string `json:"adt_type"` +} + // Entity .. type Entity struct { ID int `json:"id,omitempty"` @@ -255,10 +260,11 @@ type MetricsJob struct { } type Schedule struct { - Type string `json:"adt_type"` - StartTimeOfDay *string `json:"startTimeOfDay,omitempty"` - CronString string `json:"cronString,omitempty"` - RetryPolicy *RetryPolicy `json:"retryPolicy,omitempty"` + Type string `json:"adt_type"` + StartTimeOfDay *string `json:"startTimeOfDay,omitempty"` + CronString string `json:"cronString,omitempty"` + RetryPolicy *RetryPolicy `json:"retryPolicy,omitempty"` + DependentJobs []AnamlObject `json:"dependentJobs,omitempty"` } type RetryPolicy struct { @@ -677,6 +683,75 @@ func validRoles() []string { } } +func mapAnamlObjectToResource(v AnamlObject) (string, int) { + if v.Type == "anamlentitymapping" { + return "entity_mapping", v.ID + } else if v.Type == "anamlentity" { + return "entity", v.ID + } else if v.Type == "anamlentitypopulation" { + return "entity_population", v.ID + } else if v.Type == "anamleventstore" { + return "event_store", v.ID + } else if v.Type == "anamlfeature" { + return "feature", v.ID + } else if v.Type == "anamlfeaturetemplate" { + return "feature_template", v.ID + } else if v.Type == "anamlfeatureset" { + return "feature_set", v.ID + } else if v.Type == "anamlfeaturestore" { + return "feature_store", v.ID + } else if v.Type == "anamltable" { + return "table", v.ID + } else if v.Type == "anamltablecaching" { + return "caching", v.ID + } else if v.Type == "anamltablemonitoring" { + return "monitoring", v.ID + } else if v.Type == "anamlviewmaterialisation" { + return "view_materialisation_job", v.ID + } else if v.Type == "anamlmetricsset" { + return "metrics_set", v.ID + } else if v.Type == "anamlmetricsjob" { + return "metrics_job", v.ID + } else { + // Not a known resource, just show what it says. + return v.Type, v.ID + } +} + +func mapResourceToAnamlObject(v string, i int) *AnamlObject { + if v == "entity_mapping" { + return &AnamlObject{Type: "anamlentitymapping", ID: i} + } else if v == "entity" { + return &AnamlObject{Type: "anamlentity", ID: i} + } else if v == "entity_population" { + return &AnamlObject{Type: "anamlentitypopulation", ID: i} + } else if v == "event_store" { + return &AnamlObject{Type: "anamleventstore", ID: i} + } else if v == "feature" { + return &AnamlObject{Type: "anamlfeature", ID: i} + } else if v == "feature_template" { + return &AnamlObject{Type: "anamlfeaturetemplate", ID: i} + } else if v == "feature_set" { + return &AnamlObject{Type: "anamlfeatureset", ID: i} + } else if v == "feature_store" { + return &AnamlObject{Type: "anamlfeaturestore", ID: i} + } else if v == "table" { + return &AnamlObject{Type: "anamltable", ID: i} + } else if v == "caching" { + return &AnamlObject{Type: "anamltablecaching", ID: i} + } else if v == "monitoring" { + return &AnamlObject{Type: "anamltablemonitoring", ID: i} + } else if v == "view_materialisation_job" { + return &AnamlObject{Type: "anamlviewmaterialisation", ID: i} + } else if v == "metrics_set" { + return &AnamlObject{Type: "anamlmetricsset", ID: i} + } else if v == "metrics_job" { + return &AnamlObject{Type: "anamlmetricsjob", ID: i} + } else { + return nil + } +} + func mapRolesToBackend(frontend []string) []Role { vs := make([]Role, 0, len(frontend)) for _, v := range frontend { diff --git a/client/resource_event_store.go b/client/resource_event_store.go index fa1433e..98ae8a9 100644 --- a/client/resource_event_store.go +++ b/client/resource_event_store.go @@ -77,14 +77,21 @@ func ResourceEventStore() *schema.Resource { Optional: true, MaxItems: 1, Elem: dailyScheduleSchema(), - ConflictsWith: []string{"cron_schedule"}, + ConflictsWith: []string{"cron_schedule", "dependency_schedule"}, }, "cron_schedule": { Type: schema.TypeList, Optional: true, MaxItems: 1, Elem: cronScheduleSchema(), - ConflictsWith: []string{"daily_schedule"}, + ConflictsWith: []string{"daily_schedule", "dependency_schedule"}, + }, + "dependency_schedule": { + Type: schema.TypeList, + Optional: true, + MaxItems: 1, + Elem: dependencyScheduleSchema(), + ConflictsWith: []string{"daily_schedule", "cron_schedule"}, }, "cluster": { Type: schema.TypeString, @@ -229,7 +236,7 @@ func resourceEventStoreRead(d *schema.ResourceData, m interface{}) error { if err := d.Set("access_rules", flattenAccessRules(entity.AccessRules)); err != nil { return err } - daily, cron, err := parseSchedule(entity.Schedule) + daily, cron, dependency, err := parseSchedule(entity.Schedule) if err != nil { return err } @@ -239,6 +246,9 @@ func resourceEventStoreRead(d *schema.ResourceData, m interface{}) error { if err := d.Set("cron_schedule", cron); err != nil { return err } + if err := d.Set("dependency_schedule", dependency); err != nil { + return err + } return err } diff --git a/client/resource_feature_store.go b/client/resource_feature_store.go index 3eec1cc..ad96ce7 100644 --- a/client/resource_feature_store.go +++ b/client/resource_feature_store.go @@ -83,14 +83,21 @@ func ResourceFeatureStore() *schema.Resource { Optional: true, MaxItems: 1, Elem: dailyScheduleSchema(), - ConflictsWith: []string{"cron_schedule"}, + ConflictsWith: []string{"cron_schedule", "dependency_schedule"}, }, "cron_schedule": { Type: schema.TypeList, Optional: true, MaxItems: 1, Elem: cronScheduleSchema(), - ConflictsWith: []string{"daily_schedule"}, + ConflictsWith: []string{"daily_schedule", "dependency_schedule"}, + }, + "dependency_schedule": { + Type: schema.TypeList, + Optional: true, + MaxItems: 1, + Elem: dependencyScheduleSchema(), + ConflictsWith: []string{"daily_schedule", "cron_schedule"}, }, "destination": { Type: schema.TypeList, @@ -242,7 +249,7 @@ func resourceFeatureStoreRead(d *schema.ResourceData, m interface{}) error { } } - daily, cron, err := parseSchedule(FeatureStore.Schedule) + daily, cron, dependency, err := parseSchedule(FeatureStore.Schedule) if err != nil { return err } @@ -252,6 +259,9 @@ func resourceFeatureStoreRead(d *schema.ResourceData, m interface{}) error { if err := d.Set("cron_schedule", cron); err != nil { return err } + if err := d.Set("dependency_schedule", dependency); err != nil { + return err + } if FeatureStore.VersionTarget != nil { if FeatureStore.VersionTarget.Commit != nil { diff --git a/client/resource_metrics_job.go b/client/resource_metrics_job.go index f6c5202..2c8759c 100644 --- a/client/resource_metrics_job.go +++ b/client/resource_metrics_job.go @@ -66,14 +66,21 @@ func ResourceMetricsJob() *schema.Resource { Optional: true, MaxItems: 1, Elem: dailyScheduleSchema(), - ConflictsWith: []string{"cron_schedule"}, + ConflictsWith: []string{"cron_schedule", "dependency_schedule"}, }, "cron_schedule": { Type: schema.TypeList, Optional: true, MaxItems: 1, Elem: cronScheduleSchema(), - ConflictsWith: []string{"daily_schedule"}, + ConflictsWith: []string{"daily_schedule", "dependency_schedule"}, + }, + "dependency_schedule": { + Type: schema.TypeList, + Optional: true, + MaxItems: 1, + Elem: dependencyScheduleSchema(), + ConflictsWith: []string{"daily_schedule", "cron_schedule"}, }, "destination": { Type: schema.TypeList, @@ -159,7 +166,7 @@ func resourceMetricsJobRead(d *schema.ResourceData, m interface{}) error { return err } - daily, cron, err := parseSchedule(MetricsJob.Schedule) + daily, cron, dependency, err := parseSchedule(MetricsJob.Schedule) if err != nil { return err } @@ -169,6 +176,9 @@ func resourceMetricsJobRead(d *schema.ResourceData, m interface{}) error { if err := d.Set("cron_schedule", cron); err != nil { return err } + if err := d.Set("dependency_schedule", dependency); err != nil { + return err + } if MetricsJob.VersionTarget != nil { if MetricsJob.VersionTarget.Commit != nil { diff --git a/client/resource_table_caching.go b/client/resource_table_caching.go index ef549b3..e6cb50d 100644 --- a/client/resource_table_caching.go +++ b/client/resource_table_caching.go @@ -57,14 +57,21 @@ func ResourceTableCaching() *schema.Resource { Optional: true, MaxItems: 1, Elem: dailyScheduleSchema(), - ConflictsWith: []string{"cron_schedule"}, + ConflictsWith: []string{"cron_schedule", "dependency_schedule"}, }, "cron_schedule": { Type: schema.TypeList, Optional: true, MaxItems: 1, Elem: cronScheduleSchema(), - ConflictsWith: []string{"daily_schedule"}, + ConflictsWith: []string{"daily_schedule", "dependency_schedule"}, + }, + "dependency_schedule": { + Type: schema.TypeList, + Optional: true, + MaxItems: 1, + Elem: dependencyScheduleSchema(), + ConflictsWith: []string{"daily_schedule", "cron_schedule"}, }, "principal": { Type: schema.TypeString, @@ -255,7 +262,7 @@ func resourceTableCachingRead(d *schema.ResourceData, m interface{}) error { d.Set("retainment", nil) } - daily, cron, err := parseSchedule(TableCaching.Schedule) + daily, cron, dependency, err := parseSchedule(TableCaching.Schedule) if err != nil { return err } @@ -265,6 +272,9 @@ func resourceTableCachingRead(d *schema.ResourceData, m interface{}) error { if err := d.Set("cron_schedule", cron); err != nil { return err } + if err := d.Set("dependency_schedule", dependency); err != nil { + return err + } return err } diff --git a/client/resource_table_monitoring.go b/client/resource_table_monitoring.go index 18b5fc7..9926ce9 100644 --- a/client/resource_table_monitoring.go +++ b/client/resource_table_monitoring.go @@ -56,14 +56,21 @@ func ResourceTableMonitoring() *schema.Resource { Optional: true, MaxItems: 1, Elem: dailyScheduleSchema(), - ConflictsWith: []string{"cron_schedule"}, + ConflictsWith: []string{"cron_schedule", "dependency_schedule"}, }, "cron_schedule": { Type: schema.TypeList, Optional: true, MaxItems: 1, Elem: cronScheduleSchema(), - ConflictsWith: []string{"daily_schedule"}, + ConflictsWith: []string{"daily_schedule", "dependency_schedule"}, + }, + "dependency_schedule": { + Type: schema.TypeList, + Optional: true, + MaxItems: 1, + Elem: dependencyScheduleSchema(), + ConflictsWith: []string{"daily_schedule", "cron_schedule"}, }, "cluster": { Type: schema.TypeString, @@ -248,7 +255,7 @@ func resourceTableMonitoringRead(d *schema.ResourceData, m interface{}) error { return err } - daily, cron, err := parseSchedule(TableMonitoring.Schedule) + daily, cron, dependency, err := parseSchedule(TableMonitoring.Schedule) if err != nil { return err } @@ -258,6 +265,9 @@ func resourceTableMonitoringRead(d *schema.ResourceData, m interface{}) error { if err := d.Set("cron_schedule", cron); err != nil { return err } + if err := d.Set("dependency_schedule", dependency); err != nil { + return err + } return err } diff --git a/client/resource_view_materialisation_job.go b/client/resource_view_materialisation_job.go index 64a2a7d..fb9e276 100644 --- a/client/resource_view_materialisation_job.go +++ b/client/resource_view_materialisation_job.go @@ -50,14 +50,21 @@ func ResourceViewMaterialisationJob() *schema.Resource { Optional: true, MaxItems: 1, Elem: dailyScheduleSchema(), - ConflictsWith: []string{"cron_schedule"}, + ConflictsWith: []string{"cron_schedule", "dependency_schedule"}, }, "cron_schedule": { Type: schema.TypeList, Optional: true, MaxItems: 1, Elem: cronScheduleSchema(), - ConflictsWith: []string{"daily_schedule"}, + ConflictsWith: []string{"daily_schedule", "dependency_schedule"}, + }, + "dependency_schedule": { + Type: schema.TypeList, + Optional: true, + MaxItems: 1, + Elem: dependencyScheduleSchema(), + ConflictsWith: []string{"daily_schedule", "cron_schedule"}, }, "usagettl": { Type: schema.TypeString, @@ -158,7 +165,7 @@ func resourceViewMaterialisationJobRead(d *schema.ResourceData, m interface{}) e } if ViewMaterialisationJob.Type == "batch" { - daily, cron, err := parseSchedule(ViewMaterialisationJob.Schedule) + daily, cron, dependency, err := parseSchedule(ViewMaterialisationJob.Schedule) if err != nil { return err } @@ -168,6 +175,9 @@ func resourceViewMaterialisationJobRead(d *schema.ResourceData, m interface{}) e if err := d.Set("cron_schedule", cron); err != nil { return err } + if err := d.Set("dependency_schedule", dependency); err != nil { + return err + } } if ViewMaterialisationJob.Principal != nil { diff --git a/client/schedule.go b/client/schedule.go index e87cc97..bbf933a 100644 --- a/client/schedule.go +++ b/client/schedule.go @@ -2,6 +2,8 @@ package anaml import ( "errors" + "fmt" + "strconv" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" @@ -43,6 +45,44 @@ func cronScheduleSchema() *schema.Resource { } } +func dependencyScheduleSchema() *schema.Resource { + return &schema.Resource{ + Schema: map[string]*schema.Schema{ + "job": { + Type: schema.TypeList, + Description: "Jobs after which this task will be scheduled.", + Required: true, + + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "id": { + Type: schema.TypeString, + Required: true, + ValidateFunc: validateAnamlIdentifier(), + }, + "type": { + Type: schema.TypeString, + Description: "Type of the Job (resource type).", + Required: true, + ValidateFunc: validation.StringInSlice([]string{ + "event_store", "metrics_job", "monitoring", + "view_materialisation_job", "caching", + "feature_store", + }, false), + }, + }, + }, + }, + "fixed_retry_policy": { + Type: schema.TypeList, + Optional: true, + MaxItems: 1, + Elem: fixedRetryPolicySchema(), + }, + }, + } +} + func fixedRetryPolicySchema() *schema.Resource { return &schema.Resource{ Schema: map[string]*schema.Schema{ @@ -83,6 +123,9 @@ func composeSchedule(d *schema.ResourceData) (*Schedule, error) { if cronSchedule, _ := expandSingleMap(d.Get("cron_schedule")); cronSchedule != nil { return composeCronSchedule(cronSchedule) } + if dependencySchedule, _ := expandSingleMap(d.Get("dependency_schedule")); dependencySchedule != nil { + return composeDependencySchedule(dependencySchedule) + } return composeNeverSchedule(), nil } @@ -122,6 +165,36 @@ func composeCronSchedule(d map[string]interface{}) (*Schedule, error) { }, nil } +func composeDependencySchedule(d map[string]interface{}) (*Schedule, error) { + var retryPolicy *RetryPolicy + if fixedRetryPolicy, _ := expandSingleMap(d["fixed_retry_policy"]); fixedRetryPolicy != nil { + retryPolicy = composeFixedRetryPolicy(fixedRetryPolicy) + } else { + retryPolicy = composeNeverRetryPolicy() + } + + jobs := d["job"].([]interface{}) + work := make([]AnamlObject, 0, len(jobs)) + + for _, job := range jobs { + val := job.(map[string]interface{}) + id, _ := strconv.Atoi(val["id"].(string)) + rt := val["type"].(string) + + single := mapResourceToAnamlObject(rt, id) + if single == nil { + return nil, fmt.Errorf("Couldn't obtain anaml object from dependency definition %s", rt) + } + work = append(work, *single) + } + + return &Schedule{ + Type: "dependency", + DependentJobs: work, + RetryPolicy: retryPolicy, + }, nil +} + func composeFixedRetryPolicy(d map[string]interface{}) *RetryPolicy { return &RetryPolicy{ Type: "fixed", @@ -136,13 +209,14 @@ func composeNeverRetryPolicy() *RetryPolicy { } } -func parseSchedule(schedule *Schedule) ([]map[string]interface{}, []map[string]interface{}, error) { +func parseSchedule(schedule *Schedule) ([]map[string]interface{}, []map[string]interface{}, []map[string]interface{}, error) { if schedule == nil { - return nil, nil, errors.New("Schedule is null") + return nil, nil, nil, errors.New("Schedule is null") } daily := make([]map[string]interface{}, 0, 1) cron := make([]map[string]interface{}, 0, 1) + dependency := make([]map[string]interface{}, 0, 1) if schedule.Type == "daily" { single := make(map[string]interface{}) @@ -153,7 +227,7 @@ func parseSchedule(schedule *Schedule) ([]map[string]interface{}, []map[string]i if schedule.RetryPolicy.Type == "fixed" { fixedRetryPolicy, err := parseFixedRetryPolicy(schedule.RetryPolicy) if err != nil { - return nil, nil, err + return nil, nil, nil, err } single["fixed_retry_policy"] = fixedRetryPolicy } @@ -166,15 +240,37 @@ func parseSchedule(schedule *Schedule) ([]map[string]interface{}, []map[string]i if schedule.RetryPolicy.Type == "fixed" { fixedRetryPolicy, err := parseFixedRetryPolicy(schedule.RetryPolicy) if err != nil { - return nil, nil, err + return nil, nil, nil, err } single["fixed_retry_policy"] = fixedRetryPolicy } cron = append(cron, single) + } else if schedule.Type == "dependency" { + single := make(map[string]interface{}) + work := make([]map[string]interface{}, 0, len(schedule.DependentJobs)) + for _, depJob := range schedule.DependentJobs { + singleJob := make(map[string]interface{}) + rt, id := mapAnamlObjectToResource(depJob) + singleJob["id"] = strconv.Itoa(id) + singleJob["type"] = rt + work = append(work, singleJob) + } + + single["job"] = work + + if schedule.RetryPolicy.Type == "fixed" { + fixedRetryPolicy, err := parseFixedRetryPolicy(schedule.RetryPolicy) + if err != nil { + return nil, nil, nil, err + } + + single["fixed_retry_policy"] = fixedRetryPolicy + } + dependency = append(dependency, single) } - return daily, cron, nil + return daily, cron, dependency, nil } func parseFixedRetryPolicy(retryPolicy *RetryPolicy) ([]map[string]interface{}, error) {