diff --git a/api/formance.com/v1beta1/ledger_types.go b/api/formance.com/v1beta1/ledger_types.go index cbd80deb..637d2428 100644 --- a/api/formance.com/v1beta1/ledger_types.go +++ b/api/formance.com/v1beta1/ledger_types.go @@ -17,54 +17,16 @@ limitations under the License. package v1beta1 import ( - "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -type LockingStrategyRedisConfig struct { - Uri string `json:"uri,omitempty"` - // +optional - // +kubebuilder:default:=false - TLS bool `json:"tls"` - // +optional - // +kubebuilder:default:=false - InsecureTLS bool `json:"insecure,omitempty"` - // +optional - Duration time.Duration `json:"duration,omitempty"` - // +optional - Retry time.Duration `json:"retry,omitempty"` -} - -type LockingStrategy struct { - // +kubebuilder:Enum:={memory,redis} - // +kubebuilder:default:=memory - // +optional - Strategy string `json:"strategy,omitempty"` - // +optional - Redis *LockingStrategyRedisConfig `json:"redis"` -} -type DeploymentStrategy string - -const ( - DeploymentStrategySingle = "single" - DeploymentStrategyMonoWriterMultipleReader = "single-writer" -) type LedgerSpec struct { ModuleProperties `json:",inline"` StackDependency `json:",inline"` // +optional Auth *AuthConfig `json:"auth,omitempty"` - //+kubebuilder:Enum:={single, single-writer} - //+kubebuilder:default:=single - //+optional - // Deprecated. - DeploymentStrategy DeploymentStrategy `json:"deploymentStrategy,omitempty"` - // Locking is intended for ledger v1 only - //+optional - Locking *LockingStrategy `json:"locking,omitempty"` } type LedgerStatus struct { @@ -73,18 +35,8 @@ type LedgerStatus struct { // Ledger is the module allowing to install a ledger instance. // -// The ledger is actually a stateful application on the writer part. -// So we cannot scale the ledger as we want without prior configuration. -// -// So, the ledger can run in two modes : -// * single instance: Only one instance will be deployed. We cannot scale in that mode. -// * single writer / multiple reader: In this mode, we will have a single writer and multiple readers if needed. -// -// Use setting `ledger.deployment-strategy` with either the value : -// - single : For the single instance mode. -// - single-writer: For the single writer / multiple reader mode. -// Under the hood, the operator create two deployments and force the scaling of the writer to stay at 1. -// Then you can scale the deployment of the reader to the value you want. +// The ledger is a stateful application that manages financial transactions +// and maintains an immutable audit trail. // // +kubebuilder:object:root=true // +kubebuilder:subresource:status diff --git a/api/formance.com/v1beta1/zz_generated.deepcopy.go b/api/formance.com/v1beta1/zz_generated.deepcopy.go index df8b1d60..03a1cff7 100644 --- a/api/formance.com/v1beta1/zz_generated.deepcopy.go +++ b/api/formance.com/v1beta1/zz_generated.deepcopy.go @@ -1365,11 +1365,6 @@ func (in *LedgerSpec) DeepCopyInto(out *LedgerSpec) { *out = new(AuthConfig) **out = **in } - if in.Locking != nil { - in, out := &in.Locking, &out.Locking - *out = new(LockingStrategy) - (*in).DeepCopyInto(*out) - } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LedgerSpec. @@ -1398,41 +1393,6 @@ func (in *LedgerStatus) DeepCopy() *LedgerStatus { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *LockingStrategy) DeepCopyInto(out *LockingStrategy) { - *out = *in - if in.Redis != nil { - in, out := &in.Redis, &out.Redis - *out = new(LockingStrategyRedisConfig) - **out = **in - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LockingStrategy. -func (in *LockingStrategy) DeepCopy() *LockingStrategy { - if in == nil { - return nil - } - out := new(LockingStrategy) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *LockingStrategyRedisConfig) DeepCopyInto(out *LockingStrategyRedisConfig) { - *out = *in -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LockingStrategyRedisConfig. -func (in *LockingStrategyRedisConfig) DeepCopy() *LockingStrategyRedisConfig { - if in == nil { - return nil - } - out := new(LockingStrategyRedisConfig) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ModuleProperties) DeepCopyInto(out *ModuleProperties) { *out = *in diff --git a/config/crd/bases/formance.com_ledgers.yaml b/config/crd/bases/formance.com_ledgers.yaml index f54870df..60dff8bf 100644 --- a/config/crd/bases/formance.com_ledgers.yaml +++ b/config/crd/bases/formance.com_ledgers.yaml @@ -40,20 +40,8 @@ spec: Ledger is the module allowing to install a ledger instance. - The ledger is actually a stateful application on the writer part. - So we cannot scale the ledger as we want without prior configuration. - - - So, the ledger can run in two modes : - * single instance: Only one instance will be deployed. We cannot scale in that mode. - * single writer / multiple reader: In this mode, we will have a single writer and multiple readers if needed. - - - Use setting `ledger.deployment-strategy` with either the value : - - single : For the single instance mode. - - single-writer: For the single writer / multiple reader mode. - Under the hood, the operator create two deployments and force the scaling of the writer to stay at 1. - Then you can scale the deployment of the reader to the value you want. + The ledger is a stateful application that manages financial transactions + and maintains an immutable audit trail. properties: apiVersion: description: |- @@ -85,48 +73,12 @@ spec: default: false description: Allow to enable debug mode on the module type: boolean - deploymentStrategy: - default: single - description: Deprecated. - type: string dev: default: false description: |- Allow to enable dev mode on the module Dev mode is used to allow some application to do custom setup in development mode (allow insecure certificates for example) type: boolean - locking: - description: Locking is intended for ledger v1 only - properties: - redis: - properties: - duration: - description: |- - A Duration represents the elapsed time between two instants - as an int64 nanosecond count. The representation limits the - largest representable duration to approximately 290 years. - format: int64 - type: integer - insecure: - default: false - type: boolean - retry: - description: |- - A Duration represents the elapsed time between two instants - as an int64 nanosecond count. The representation limits the - largest representable duration to approximately 290 years. - format: int64 - type: integer - tls: - default: false - type: boolean - uri: - type: string - type: object - strategy: - default: memory - type: string - type: object stack: description: Stack indicates the stack on which the module is installed type: string diff --git a/docs/09-Configuration reference/02-Custom Resource Definitions.md b/docs/09-Configuration reference/02-Custom Resource Definitions.md index abc15e1a..f77d0c2f 100644 --- a/docs/09-Configuration reference/02-Custom Resource Definitions.md +++ b/docs/09-Configuration reference/02-Custom Resource Definitions.md @@ -741,20 +741,8 @@ Gateway is the Schema for the gateways API Ledger is the module allowing to install a ledger instance. -The ledger is actually a stateful application on the writer part. -So we cannot scale the ledger as we want without prior configuration. - - -So, the ledger can run in two modes : -* single instance: Only one instance will be deployed. We cannot scale in that mode. -* single writer / multiple reader: In this mode, we will have a single writer and multiple readers if needed. - - -Use setting `ledger.deployment-strategy` with either the value : - - single : For the single instance mode. - - single-writer: For the single writer / multiple reader mode. - Under the hood, the operator create two deployments and force the scaling of the writer to stay at 1. - Then you can scale the deployment of the reader to the value you want. +The ledger is a stateful application that manages financial transactions +and maintains an immutable audit trail. @@ -806,82 +794,6 @@ Use setting `ledger.deployment-strategy` with either the value : | `dev` _boolean_ | Allow to enable dev mode on the module
Dev mode is used to allow some application to do custom setup in development mode (allow insecure certificates for example) | false | | | `version` _string_ | Version allow to override global version defined at stack level for a specific module | | | | `stack` _string_ | Stack indicates the stack on which the module is installed | | | -| `deploymentStrategy` _[DeploymentStrategy](#deploymentstrategy)_ | Deprecated. | single | | -| `locking` _[LockingStrategy](#lockingstrategy)_ | Locking is intended for ledger v1 only | | | - -###### DeploymentStrategy - -_Underlying type:_ _string_ - - - - - - - - - - - - - - - - - - -###### LockingStrategy - - - - - - - - - - - - - - - - - - - -| Field | Description | Default | Validation | -| --- | --- | --- | --- | -| `strategy` _string_ | | memory | | -| `redis` _[LockingStrategyRedisConfig](#lockingstrategyredisconfig)_ | | | | - -###### LockingStrategyRedisConfig - - - - - - - - - - - - - - - - - - - -| Field | Description | Default | Validation | -| --- | --- | --- | --- | -| `uri` _string_ | | | | -| `tls` _boolean_ | | false | | -| `insecure` _boolean_ | | false | | -| `duration` _string_ | | | | -| `retry` _string_ | | | | diff --git a/helm/crds/Chart.yaml b/helm/crds/Chart.yaml index 1e185aa1..050b3229 100644 --- a/helm/crds/Chart.yaml +++ b/helm/crds/Chart.yaml @@ -12,9 +12,9 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: "2.8.3" +version: "3.0.0" # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: v2.8.3 +appVersion: v3.0.0 diff --git a/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_ledgers.formance.com.yaml b/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_ledgers.formance.com.yaml index ae2478c4..b06c46bd 100644 --- a/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_ledgers.formance.com.yaml +++ b/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_ledgers.formance.com.yaml @@ -40,20 +40,8 @@ spec: Ledger is the module allowing to install a ledger instance. - The ledger is actually a stateful application on the writer part. - So we cannot scale the ledger as we want without prior configuration. - - - So, the ledger can run in two modes : - * single instance: Only one instance will be deployed. We cannot scale in that mode. - * single writer / multiple reader: In this mode, we will have a single writer and multiple readers if needed. - - - Use setting `ledger.deployment-strategy` with either the value : - - single : For the single instance mode. - - single-writer: For the single writer / multiple reader mode. - Under the hood, the operator create two deployments and force the scaling of the writer to stay at 1. - Then you can scale the deployment of the reader to the value you want. + The ledger is a stateful application that manages financial transactions + and maintains an immutable audit trail. properties: apiVersion: description: |- @@ -85,48 +73,12 @@ spec: default: false description: Allow to enable debug mode on the module type: boolean - deploymentStrategy: - default: single - description: Deprecated. - type: string dev: default: false description: |- Allow to enable dev mode on the module Dev mode is used to allow some application to do custom setup in development mode (allow insecure certificates for example) type: boolean - locking: - description: Locking is intended for ledger v1 only - properties: - redis: - properties: - duration: - description: |- - A Duration represents the elapsed time between two instants - as an int64 nanosecond count. The representation limits the - largest representable duration to approximately 290 years. - format: int64 - type: integer - insecure: - default: false - type: boolean - retry: - description: |- - A Duration represents the elapsed time between two instants - as an int64 nanosecond count. The representation limits the - largest representable duration to approximately 290 years. - format: int64 - type: integer - tls: - default: false - type: boolean - uri: - type: string - type: object - strategy: - default: memory - type: string - type: object stack: description: Stack indicates the stack on which the module is installed type: string diff --git a/helm/operator/Chart.yaml b/helm/operator/Chart.yaml index 75288d0e..2405a1bc 100644 --- a/helm/operator/Chart.yaml +++ b/helm/operator/Chart.yaml @@ -12,14 +12,14 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: "2.15.0" +version: "3.0.0" # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: "v2.15.0" +appVersion: "v3.0.0" dependencies: - name: operator-crds - version: "2.X" + version: "3.X" repository: "file://../crds" condition: operator-crds.create diff --git a/internal/resources/ledgers/assets.go b/internal/resources/ledgers/assets.go index 1363d6bf..d9937852 100644 --- a/internal/resources/ledgers/assets.go +++ b/internal/resources/ledgers/assets.go @@ -2,11 +2,7 @@ package ledgers import ( "embed" - _ "embed" ) -//go:embed assets/Caddyfile.gotpl -var Caddyfile string - -//go:embed assets/reindex +//go:embed assets/reindex/v2.0.0/*.yaml var reindexStreams embed.FS diff --git a/internal/resources/ledgers/assets/Caddyfile.gotpl b/internal/resources/ledgers/assets/Caddyfile.gotpl deleted file mode 100644 index dfcef55e..00000000 --- a/internal/resources/ledgers/assets/Caddyfile.gotpl +++ /dev/null @@ -1,30 +0,0 @@ -{ - {{ if .Debug }}debug{{ end }} -} - -:8080 { - {{- if .EnableOpenTelemetry }} - tracing { - span gateway - } - {{- end }} - log { - output stdout - {{- if .Debug }} - level DEBUG - {{- end }} - } - - handle { - method GET - reverse_proxy ledger-read:8080 { - header_up Host {upstream_hostport} - } - } - - handle { - reverse_proxy ledger-write:8080 { - header_up Host {upstream_hostport} - } - } -} \ No newline at end of file diff --git a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex.yaml b/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex.yaml deleted file mode 100644 index 8db8e175..00000000 --- a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex.yaml +++ /dev/null @@ -1,16 +0,0 @@ -input: - http_server: - path: / - -output: - broker: - outputs: - - http_client: - verb: POST - url: http://localhost:4195/ledger_reindex_volumes - - http_client: - verb: POST - url: http://localhost:4195/ledger_reindex_transactions - - http_client: - verb: POST - url: http://localhost:4195/ledger_reindex_accounts diff --git a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_accounts.yaml b/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_accounts.yaml deleted file mode 100644 index 56dfae15..00000000 --- a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_accounts.yaml +++ /dev/null @@ -1,60 +0,0 @@ -input: - http_server: - path: / - -pipeline: - processors: - - bloblang: | - meta ledger = this.ledger - meta batchSize = 100 - - postgres_query: - service: ledger - query: 'select count(*) as accounts_count from "${! meta("ledger") }".accounts' - - unarchive: - format: json_array - - bloblang: | - meta loopCount = (this.accounts_count.number() / meta("batchSize").number()).ceil() - meta loopIndex = 0 - - bloblang: | - root = if meta("loopCount") == "0" { - deleted() - } - - while: - check: 'meta("loopIndex") < meta("loopCount")' - processors: - - postgres_query: - service: ledger - query: | - select address, metadata - from "${! meta("ledger") }".accounts - offset ${! meta("loopIndex").number() * meta("batchSize").number() } - limit ${! meta("batchSize") } - - bloblang: - meta loopIndex = meta("loopIndex").number() + 1 - - unarchive: - format: json_array - - bloblang: | - root = this.assign({ - "metadata": this.metadata.parse_json() - }) - - bloblang: | - root = { - "document": { - "data": { - "address": this.address, - "ledger": meta("ledger"), - "metadata": this.metadata - }, - "indexed": { - "address": this.address, - "ledger": meta("ledger") - }, - "kind": "ACCOUNT", - "ledger": meta("ledger") - }, - "id": "ACCOUNT-%s-%s".format(meta("ledger"), this.address), - "action": "upsert" - } - -output: - resource: elasticsearch diff --git a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_all.yaml b/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_all.yaml deleted file mode 100644 index 1a86110b..00000000 --- a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_all.yaml +++ /dev/null @@ -1,18 +0,0 @@ -input: - http_server: - path: / - -pipeline: - processors: - - postgres_query: - service: ledger - query: 'select * from "_system".ledgers' - - unarchive: - format: json_array - -output: - broker: - outputs: - - http_client: - verb: POST - url: http://localhost:4195/ledger_reindex diff --git a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_transactions.yaml b/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_transactions.yaml deleted file mode 100644 index a8b5cefe..00000000 --- a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_transactions.yaml +++ /dev/null @@ -1,76 +0,0 @@ -input: - http_server: - path: / - -pipeline: - processors: - - bloblang: | - meta ledger = this.ledger - meta batchSize = 100 - - postgres_query: - service: ledger - query: 'select count(*) as transactions_count from "${! meta("ledger") }".transactions' - - unarchive: - format: json_array - - bloblang: | - meta loopCount = (this.transactions_count.number() / meta("batchSize").number()).ceil() - meta loopIndex = 0 - - bloblang: | - root = if meta("loopCount") == "0" { - deleted() - } - - while: - check: 'meta("loopIndex") < meta("loopCount")' - processors: - - postgres_query: - service: ledger - query: | - select id, timestamp, reference, metadata, postings - from "${! meta("ledger") }".transactions - offset ${! meta("loopIndex").number() * meta("batchSize").number() } - limit ${! meta("batchSize") } - - bloblang: - meta loopIndex = meta("loopIndex").number() + 1 - - unarchive: - format: json_array - - bloblang: | - root = this.assign({ - "postings": this.postings.parse_json(), - "metadata": this.metadata.parse_json() - }) - - bloblang: | - root = { - "id": "TRANSACTION-%s-%s".format(meta("ledger"), this.id), - "action": "upsert", - "document": { - "data": { - "postings": this.postings, - "reference": this.reference, - "txid": this.txid, - "timestamp": this.timestamp, - "metadata": if this.metadata { this.metadata } else {{}}, - "ledger": meta("ledger") - }, - "indexed": { - "reference": this.reference, - "txid": this.id, - "timestamp": this.timestamp, - "asset": this.postings.map_each(p -> p.asset), - "source": this.postings.map_each(p -> p.source), - "destination": this.postings.map_each(p -> p.destination), - "amount": this.postings.map_each(p -> if p.asset.contains("/") { - [ - p.amount, - p.amount / range(0, p.asset.split("/").index(1).number()).fold(1, t -> t.tally * 10) # amount / pow(10, decimal part of asset) - ] - } else { [ p.amount ] }).flatten().map_each(v -> "%v".format(v)), - "ledger": meta("ledger") - }, - "kind": "TRANSACTION", - "ledger": meta("ledger"), - "when": this.date - } - } - -output: - resource: elasticsearch diff --git a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_volumes.yaml b/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_volumes.yaml deleted file mode 100644 index 62223860..00000000 --- a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_volumes.yaml +++ /dev/null @@ -1,60 +0,0 @@ -input: - http_server: - path: / - -pipeline: - processors: - - bloblang: | - meta ledger = this.ledger - meta batchSize = 100 - - postgres_query: - service: ledger - query: 'select count(*) as volumes_count from "${! meta("ledger") }".volumes' - - unarchive: - format: json_array - - bloblang: | - meta loopCount = (this.volumes_count.number() / meta("batchSize").number()).ceil() - meta loopIndex = 0 - - bloblang: | - root = if meta("loopCount") == "0" { - deleted() - } - - while: - check: 'meta("loopIndex") < meta("loopCount")' - processors: - - postgres_query: - service: ledger - query: | - select account, asset, input, output - from "${! meta("ledger") }".volumes - offset ${! meta("loopIndex").number() * meta("batchSize").number() } - limit ${! meta("batchSize") } - - bloblang: - meta loopIndex = meta("loopIndex").number() + 1 - - unarchive: - format: json_array - - bloblang: | - root = { - "id": "ASSET-%s-%s-%s".format(meta("ledger"), this.account, this.asset), - "action": "upsert", - "document": { - "data": { - "name": this.asset, - "input": this.input, - "output": this.output, - "account": this.account, - "ledger": meta("ledger") - }, - "indexed": { - "account": this.account, - "name": this.asset, - "ledger": meta("ledger") - }, - "kind": "ASSET", - "ledger": meta("ledger"), - "when": this.date - } - } - -output: - resource: elasticsearch diff --git a/internal/resources/ledgers/deployments.go b/internal/resources/ledgers/deployments.go index bd9d628f..00109dc9 100644 --- a/internal/resources/ledgers/deployments.go +++ b/internal/resources/ledgers/deployments.go @@ -4,11 +4,9 @@ import ( "fmt" "github.com/formancehq/operator/internal/resources/auths" "golang.org/x/mod/semver" - "strconv" "github.com/formancehq/operator/internal/resources/brokers" "github.com/formancehq/operator/internal/resources/brokertopics" - "github.com/formancehq/operator/internal/resources/caddy" "k8s.io/apimachinery/pkg/types" "github.com/formancehq/operator/api/formance.com/v1beta1" @@ -16,8 +14,8 @@ import ( "github.com/formancehq/operator/internal/resources/applications" "github.com/formancehq/operator/internal/resources/databases" "github.com/formancehq/operator/internal/resources/gateways" + "github.com/formancehq/operator/internal/resources/jobs" "github.com/formancehq/operator/internal/resources/registries" - "github.com/formancehq/operator/internal/resources/services" "github.com/formancehq/operator/internal/resources/settings" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -25,54 +23,11 @@ import ( ) const ( - ConditionTypeDeploymentStrategy = "LedgerDeploymentStrategy" - ReasonLedgerSingle = "Single" - ReasonLedgerMonoWriterMultipleReader = "MonoWriterMultipleReader" + ConditionTypeRestore = "Restore" + ConditionTypeReindexSchedulerEnabled = "ReindexSchedulerEnabled" ) -func hasDeploymentStrategyChanged(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, strategy string) (err error) { - condition := v1beta1.NewCondition(ConditionTypeDeploymentStrategy, ledger.Generation).SetReason( - func() string { - switch strategy { - case v1beta1.DeploymentStrategySingle: - return ReasonLedgerSingle - case v1beta1.DeploymentStrategyMonoWriterMultipleReader: - return ReasonLedgerMonoWriterMultipleReader - default: - return "unknown strategy" - } - }(), - ).SetMessage("Deployment strategy initialized") - - defer func() { - ledger.GetConditions().AppendOrReplace(*condition, v1beta1.AndConditions( - v1beta1.ConditionTypeMatch(ConditionTypeDeploymentStrategy), - v1beta1.ConditionGenerationMatch(ledger.Generation), - )) - }() - - // There is no generation 0, so we can't check for a change in strategy - // Uninstall is useless if the ledger deployment strategy has not changed - if ledger.GetConditions().Check(v1beta1.AndConditions( - v1beta1.ConditionTypeMatch(ConditionTypeDeploymentStrategy), - v1beta1.ConditionReasonMatch(condition.Reason), - v1beta1.ConditionGenerationMatch(ledger.Generation-1), - )) || ledger.GetGeneration() == 1 { - return - } - - condition.SetMessage("Deployment strategy has changed") - switch strategy { - case v1beta1.DeploymentStrategySingle: - return uninstallLedgerMonoWriterMultipleReader(ctx, stack) - case v1beta1.DeploymentStrategyMonoWriterMultipleReader: - return core.DeleteIfExists[*appsv1.Deployment](ctx, core.GetNamespacedResourceName(stack.Name, "ledger")) - default: - return fmt.Errorf("unknown deployment strategy %s", strategy) - } -} - -func installLedger(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, database *v1beta1.Database, imageConfiguration *registries.ImageConfiguration, version string, isV2 bool) (err error) { +func installLedger(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, database *v1beta1.Database, imageConfiguration *registries.ImageConfiguration, version string) (err error) { if !semver.IsValid(version) || semver.Compare(version, "v2.2.0-alpha") > 0 { if err := uninstallLedgerMonoWriterMultipleReader(ctx, stack); err != nil { @@ -89,65 +44,23 @@ func installLedger(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledge return nil } - deploymentStrategySettings, err := settings.GetStringOrDefault(ctx, stack.Name, v1beta1.DeploymentStrategySingle, "ledger", "deployment-strategy") - if err != nil { - return err - } - - if ledger.Spec.DeploymentStrategy == v1beta1.DeploymentStrategyMonoWriterMultipleReader { - deploymentStrategySettings = v1beta1.DeploymentStrategyMonoWriterMultipleReader - } - - if err = hasDeploymentStrategyChanged(ctx, stack, ledger, deploymentStrategySettings); err != nil { - return err - } - - switch deploymentStrategySettings { - case v1beta1.DeploymentStrategySingle: - return installLedgerSingleInstance(ctx, stack, ledger, database, imageConfiguration, isV2) - case v1beta1.DeploymentStrategyMonoWriterMultipleReader: - return installLedgerMonoWriterMultipleReader(ctx, stack, ledger, database, imageConfiguration, isV2) - default: - return fmt.Errorf("unknown deployment strategy %s", deploymentStrategySettings) - } + // For older versions, just use single instance deployment + return installLedgerSingleInstance(ctx, stack, ledger, database, imageConfiguration) } -func installLedgerSingleInstance( - ctx core.Context, - stack *v1beta1.Stack, - ledger *v1beta1.Ledger, - database *v1beta1.Database, - imageConfiguration *registries.ImageConfiguration, - v2 bool, -) error { - container, err := createLedgerContainerFull(ctx, stack, v2) +func installLedgerSingleInstance(ctx core.Context, stack *v1beta1.Stack, + ledger *v1beta1.Ledger, database *v1beta1.Database, imageConfiguration *registries.ImageConfiguration) error { + container, err := createLedgerContainerFull(ctx, stack) if err != nil { return err } - err = setCommonAPIContainerConfiguration(ctx, stack, ledger, imageConfiguration, database, container, v2) + err = setCommonAPIContainerConfiguration(ctx, stack, ledger, imageConfiguration, database, container) if err != nil { return err } - if !v2 && ledger.Spec.Locking != nil && ledger.Spec.Locking.Strategy == "redis" { - container.Env = append(container.Env, - core.Env("NUMARY_LOCK_STRATEGY", "redis"), - core.Env("NUMARY_LOCK_STRATEGY_REDIS_URL", ledger.Spec.Locking.Redis.Uri), - core.Env("NUMARY_LOCK_STRATEGY_REDIS_TLS_ENABLED", strconv.FormatBool(ledger.Spec.Locking.Redis.TLS)), - core.Env("NUMARY_LOCK_STRATEGY_REDIS_TLS_INSECURE", strconv.FormatBool(ledger.Spec.Locking.Redis.InsecureTLS)), - ) - - if ledger.Spec.Locking.Redis.Duration != 0 { - container.Env = append(container.Env, core.Env("NUMARY_LOCK_STRATEGY_REDIS_DURATION", ledger.Spec.Locking.Redis.Duration.String())) - } - - if ledger.Spec.Locking.Redis.Retry != 0 { - container.Env = append(container.Env, core.Env("NUMARY_LOCK_STRATEGY_REDIS_RETRY", ledger.Spec.Locking.Redis.Retry.String())) - } - } - - if err := createDeployment(ctx, stack, ledger, "ledger", *container, v2, 1, imageConfiguration); err != nil { + if err := createDeployment(ctx, stack, ledger, "ledger", *container, 1, imageConfiguration); err != nil { return err } @@ -236,7 +149,7 @@ func installLedgerStateless(ctx core.Context, stack *v1beta1.Stack, ledger *v1be container.Env = append(container.Env, core.Env("BULK_MAX_SIZE", fmt.Sprint(*bulkMaxSize))) } - err = setCommonAPIContainerConfiguration(ctx, stack, ledger, imageConfiguration, database, &container, true) + err = setCommonAPIContainerConfiguration(ctx, stack, ledger, imageConfiguration, database, &container) if err != nil { return err } @@ -283,7 +196,7 @@ func installLedgerWorker(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1 Args: []string{"worker"}, } - err := setCommonContainerConfiguration(ctx, stack, ledger, imageConfiguration, database, &container, true) + err := setCommonContainerConfiguration(ctx, stack, ledger, imageConfiguration, database, &container) if err != nil { return err } @@ -343,43 +256,17 @@ func installLedgerWorker(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1 return nil } -func installLedgerMonoWriterMultipleReader(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, database *v1beta1.Database, imageConfiguration *registries.ImageConfiguration, v2 bool) error { - - createDeployment := func(name string, container corev1.Container, replicas uint64) error { - err := setCommonAPIContainerConfiguration(ctx, stack, ledger, imageConfiguration, database, &container, v2) - if err != nil { - return err - } - - if err := createDeployment(ctx, stack, ledger, name, container, v2, replicas, imageConfiguration); err != nil { - return err - } - - if _, err := services.Create(ctx, ledger, name, services.WithDefault(name)); err != nil { - return err - } - - return nil - } - - container, err := createLedgerContainerWriteOnly(ctx, stack, v2) - if err != nil { - return err - } - if err := createDeployment("ledger-write", *container, 1); err != nil { - return err - } - - container = createLedgerContainerReadOnly(v2) - if err := createDeployment("ledger-read", *container, 0); err != nil { - return err - } - - if err := createGatewayDeployment(ctx, stack, ledger); err != nil { - return err - } - - return nil +func getUpgradeContainer(ctx core.Context, stack *v1beta1.Stack, database *v1beta1.Database, imageConfiguration *registries.ImageConfiguration, version string) (corev1.Container, error) { + return databases.MigrateDatabaseContainer(ctx, stack, imageConfiguration, database, + func(m *databases.MigrationConfiguration) { + if core.IsLower(version, "v2.0.0-rc.6") { + m.Command = []string{"buckets", "upgrade-all"} + } + m.AdditionalEnv = []corev1.EnvVar{ + core.Env("STORAGE_POSTGRES_CONN_STRING", "$(POSTGRES_URI)"), + } + }, + ) } func uninstallLedgerMonoWriterMultipleReader(ctx core.Context, stack *v1beta1.Stack) error { @@ -410,22 +297,13 @@ func uninstallLedgerMonoWriterMultipleReader(ctx core.Context, stack *v1beta1.St return nil } -func createDeployment(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, name string, container corev1.Container, v2 bool, replicas uint64, imageConfiguration *registries.ImageConfiguration) error { +func createDeployment(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, name string, container corev1.Container, replicas uint64, imageConfiguration *registries.ImageConfiguration) error { serviceAccountName, err := settings.GetAWSServiceAccount(ctx, stack.Name) if err != nil { return err } - var volumes []corev1.Volume - if !v2 { - volumes = []corev1.Volume{{ - Name: "config", - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{}, - }, - }} - } - + // No volumes needed for v2 tpl := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -435,7 +313,6 @@ func createDeployment(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Le Spec: corev1.PodSpec{ ImagePullSecrets: imageConfiguration.PullSecrets, Containers: []corev1.Container{container}, - Volumes: volumes, ServiceAccountName: serviceAccountName, }, }, @@ -448,21 +325,17 @@ func createDeployment(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Le Install(ctx) } -func setCommonContainerConfiguration(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, imageConfiguration *registries.ImageConfiguration, database *v1beta1.Database, container *corev1.Container, v2 bool) error { +func setCommonContainerConfiguration(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, imageConfiguration *registries.ImageConfiguration, database *v1beta1.Database, container *corev1.Container) error { - prefix := "" - if !v2 { - prefix = "NUMARY_" - } env := make([]corev1.EnvVar, 0) - otlpEnv, err := settings.GetOTELEnvVarsWithPrefix(ctx, stack.Name, core.LowerCamelCaseKind(ctx, ledger), prefix, " ") + otlpEnv, err := settings.GetOTELEnvVarsWithPrefix(ctx, stack.Name, core.LowerCamelCaseKind(ctx, ledger), "", " ") if err != nil { return err } env = append(env, otlpEnv...) - env = append(env, core.GetDevEnvVarsWithPrefix(stack, ledger, prefix)...) + env = append(env, core.GetDevEnvVarsWithPrefix(stack, ledger, "")...) - postgresEnvVar, err := databases.PostgresEnvVarsWithPrefix(ctx, stack, database, prefix) + postgresEnvVar, err := databases.PostgresEnvVarsWithPrefix(ctx, stack, database, "") if err != nil { return err } @@ -470,30 +343,25 @@ func setCommonContainerConfiguration(ctx core.Context, stack *v1beta1.Stack, led container.Image = imageConfiguration.GetFullImageName() container.Env = append(container.Env, env...) - container.Env = append(container.Env, core.Env(fmt.Sprintf("%sSTORAGE_POSTGRES_CONN_STRING", prefix), fmt.Sprintf("$(%sPOSTGRES_URI)", prefix))) - container.Env = append(container.Env, core.Env(fmt.Sprintf("%sSTORAGE_DRIVER", prefix), "postgres")) + container.Env = append(container.Env, core.Env("STORAGE_POSTGRES_CONN_STRING", "$(POSTGRES_URI)")) + container.Env = append(container.Env, core.Env("STORAGE_DRIVER", "postgres")) return nil } -func setCommonAPIContainerConfiguration(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, imageConfiguration *registries.ImageConfiguration, database *v1beta1.Database, container *corev1.Container, v2 bool) error { - - prefix := "" - if !v2 { - prefix = "NUMARY_" - } +func setCommonAPIContainerConfiguration(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, imageConfiguration *registries.ImageConfiguration, database *v1beta1.Database, container *corev1.Container) error { - if err := setCommonContainerConfiguration(ctx, stack, ledger, imageConfiguration, database, container, v2); err != nil { + if err := setCommonContainerConfiguration(ctx, stack, ledger, imageConfiguration, database, container); err != nil { return err } - authEnvVars, err := auths.ProtectedAPIEnvVarsWithPrefix(ctx, stack, "ledger", ledger.Spec.Auth, prefix) + authEnvVars, err := auths.ProtectedAPIEnvVarsWithPrefix(ctx, stack, "ledger", ledger.Spec.Auth, "") if err != nil { return err } container.Env = append(container.Env, authEnvVars...) - gatewayEnv, err := gateways.EnvVarsIfEnabledWithPrefix(ctx, stack.Name, prefix) + gatewayEnv, err := gateways.EnvVarsIfEnabledWithPrefix(ctx, stack.Name, "") if err != nil { return err } @@ -504,28 +372,16 @@ func setCommonAPIContainerConfiguration(ctx core.Context, stack *v1beta1.Stack, return nil } -func createBaseLedgerContainer(v2 bool) *corev1.Container { +func createBaseLedgerContainer() *corev1.Container { ret := &corev1.Container{ Name: "ledger", } - var bindFlag = "BIND" - if !v2 { - bindFlag = "NUMARY_SERVER_HTTP_BIND_ADDRESS" - } - ret.Env = append(ret.Env, core.Env(bindFlag, ":8080")) - if !v2 { - ret.VolumeMounts = []corev1.VolumeMount{{ - Name: "config", - ReadOnly: false, - MountPath: "/root/.numary", - }} - } - + ret.Env = append(ret.Env, core.Env("BIND", ":8080")) return ret } -func createLedgerContainerFull(ctx core.Context, stack *v1beta1.Stack, v2 bool) (*corev1.Container, error) { - container := createBaseLedgerContainer(v2) +func createLedgerContainerFull(ctx core.Context, stack *v1beta1.Stack) (*corev1.Container, error) { + container := createBaseLedgerContainer() var broker *v1beta1.Broker if t, err := brokertopics.Find(ctx, stack, "ledger"); err != nil { @@ -543,75 +399,62 @@ func createLedgerContainerFull(ctx core.Context, stack *v1beta1.Stack, v2 bool) if !broker.Status.Ready { return nil, core.NewPendingError().WithMessage("broker not ready") } - prefix := "" - if !v2 { - prefix = "NUMARY_" - } - brokerEnvVar, err := brokers.GetEnvVarsWithPrefix(ctx, broker.Status.URI, stack.Name, "ledger", prefix) + brokerEnvVar, err := brokers.GetEnvVarsWithPrefix(ctx, broker.Status.URI, stack.Name, "ledger", "") if err != nil { return nil, err } container.Env = append(container.Env, brokerEnvVar...) - container.Env = append(container.Env, brokers.GetPublisherEnvVars(stack, broker, "ledger", prefix)...) + container.Env = append(container.Env, brokers.GetPublisherEnvVars(stack, broker, "ledger", "")...) } - if v2 { - hasDependency, err := core.HasDependency(ctx, stack.Name, &v1beta1.Analytics{}) - if err != nil { - return nil, err - } - if hasDependency { - container.Env = append(container.Env, core.Env("EMIT_LOGS", "true")) - } + hasDependency, err := core.HasDependency(ctx, stack.Name, &v1beta1.Analytics{}) + if err != nil { + return nil, err + } + if hasDependency { + container.Env = append(container.Env, core.Env("EMIT_LOGS", "true")) + } - logsBatchSize, err := settings.GetInt(ctx, stack.Name, "ledger", "logs", "max-batch-size") - if err != nil { - return nil, err - } - if logsBatchSize != nil && *logsBatchSize != 0 { - container.Env = append(container.Env, core.Env("LEDGER_BATCH_SIZE", fmt.Sprint(*logsBatchSize))) - } + logsBatchSize, err := settings.GetInt(ctx, stack.Name, "ledger", "logs", "max-batch-size") + if err != nil { + return nil, err + } + if logsBatchSize != nil && *logsBatchSize != 0 { + container.Env = append(container.Env, core.Env("LEDGER_BATCH_SIZE", fmt.Sprint(*logsBatchSize))) } return container, nil } -func createLedgerContainerWriteOnly(ctx core.Context, stack *v1beta1.Stack, v2 bool) (*corev1.Container, error) { - return createLedgerContainerFull(ctx, stack, v2) -} - -func createLedgerContainerReadOnly(v2 bool) *corev1.Container { - container := createBaseLedgerContainer(v2) - container.Env = append(container.Env, core.Env("READ_ONLY", "true")) - return container -} - -func createGatewayDeployment(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger) error { - - caddyfileConfigMap, err := caddy.CreateCaddyfileConfigMap(ctx, stack, "ledger", Caddyfile, map[string]any{ - "Debug": stack.Spec.Debug || ledger.Spec.Debug, - }, core.WithController[*corev1.ConfigMap](ctx.GetScheme(), ledger)) +func migrate(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, database *v1beta1.Database, imageConfiguration *registries.ImageConfiguration, version string) error { + serviceAccountName, err := settings.GetAWSServiceAccount(ctx, stack.Name) if err != nil { return err } - env := make([]corev1.EnvVar, 0) - env = append(env, core.GetDevEnvVars(stack, ledger)...) - - caddyImage, err := registries.GetCaddyImage(ctx, stack) + upgradeContainer, err := getUpgradeContainer(ctx, stack, database, imageConfiguration, version) if err != nil { return err } - tpl, err := caddy.DeploymentTemplate(ctx, stack, ledger, caddyfileConfigMap, caddyImage, env) - if err != nil { - return err - } + return jobs.Handle(ctx, ledger, "migrate-v2", upgradeContainer, + jobs.PreCreate(func() error { + list := &appsv1.DeploymentList{} + if err := ctx.GetClient().List(ctx, list, client.InNamespace(stack.Name)); err != nil { + return err + } - tpl.Name = "ledger-gateway" - return applications. - New(ledger, tpl). - Install(ctx) + for _, item := range list.Items { + if controller := metav1.GetControllerOf(&item); controller != nil && controller.UID == ledger.GetUID() { + if err := ctx.GetClient().Delete(ctx, &item); err != nil { + return err + } + } + } + return nil + }), + jobs.WithServiceAccount(serviceAccountName), + ) } diff --git a/internal/resources/ledgers/init.go b/internal/resources/ledgers/init.go index d9fca522..88d180ea 100644 --- a/internal/resources/ledgers/init.go +++ b/internal/resources/ledgers/init.go @@ -18,10 +18,6 @@ package ledgers import ( _ "embed" - "fmt" - "github.com/formancehq/operator/internal/resources/jobs" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/client" "github.com/formancehq/operator/api/formance.com/v1beta1" . "github.com/formancehq/operator/internal/core" @@ -58,20 +54,11 @@ func Reconcile(ctx Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, versio return err } - isV2 := false - if !semver.IsValid(version) || semver.Compare(version, "v2.0.0-alpha") > 0 { - isV2 = true - } - if err := benthosstreams.LoadFromFileSystem(ctx, benthos.Streams, ledger, "streams/ledger", "ingestion"); err != nil { return err } - streamsVersion := "v1.0.0" - if isV2 { - streamsVersion = "v2.0.0" - } - if err := benthosstreams.LoadFromFileSystem(ctx, reindexStreams, ledger, fmt.Sprintf("assets/reindex/%s", streamsVersion), "reindex"); err != nil { + if err := benthosstreams.LoadFromFileSystem(ctx, reindexStreams, ledger, "assets/reindex/v2.0.0", "reindex"); err != nil { return err } @@ -95,46 +82,16 @@ func Reconcile(ctx Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, versio return NewPendingError().WithMessage("database not ready") } - if isV2 && databases.GetSavedModuleVersion(database) != version { - err := databases.Migrate( - ctx, - stack, - ledger, - imageConfiguration, - database, - jobs.Mutator(func(t *batchv1.Job) error { - if IsLower(version, "v2.0.0-rc.6") { - t.Spec.Template.Spec.Containers[0].Command = []string{"buckets", "upgrade-all"} - } - t.Spec.Template.Spec.Containers[0].Env = append(t.Spec.Template.Spec.Containers[0].Env, Env("STORAGE_POSTGRES_CONN_STRING", "$(POSTGRES_URI)")) - - return nil - }), - jobs.PreCreate(func() error { - list := &appsv1.DeploymentList{} - if err := ctx.GetClient().List(ctx, list, client.InNamespace(stack.Name)); err != nil { - return err - } - - for _, item := range list.Items { - if controller := metav1.GetControllerOf(&item); controller != nil && controller.UID == ledger.GetUID() { - if err := ctx.GetClient().Delete(ctx, &item); err != nil { - return err - } - } - } - - return nil - }), - ) + if databases.GetSavedModuleVersion(database) != version { + err := migrate(ctx, stack, ledger, database, imageConfiguration, version) if err != nil { isV2_2 := !semver.IsValid(version) || semver.Compare(version, "v2.2.0-alpha") > 0 if !isV2_2 { return err } - if IsApplicationError(err) { // Start the ledger even if migrations are not terminated - return installLedger(ctx, stack, ledger, database, imageConfiguration, version, isV2) + if IsApplicationError(err) { + return installLedger(ctx, stack, ledger, database, imageConfiguration, version) } return err @@ -144,7 +101,7 @@ func Reconcile(ctx Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, versio } } - return installLedger(ctx, stack, ledger, database, imageConfiguration, version, isV2) + return installLedger(ctx, stack, ledger, database, imageConfiguration, version) } func init() {