Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions docs/generator-parameters.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ For each microservice, HydraGen supports a set of configuration parameters that
* **execution_mode**: Determines if the server responding at this endpoint should handle requests sequentially or in parallel, on multiple threads. Default: "sequential"
* **cpu_complexity**: CPU stress parameters.
* **network_complexity**: Network stress parameters.
* **resilience_patterns**: Resilience patterns parameters.

#### Format

Expand All @@ -121,14 +122,40 @@ For each microservice, HydraGen supports a set of configuration parameters that
{
"name": "<string>",
"execution_mode": "<string:sequential|parallel>",

"resilience_patterns": {...},
"cpu_complexity": {...},
"network_complexity": {...}
},
...
]
```

## Describing Resilience Patterns

Hydragen supports the injection of resilience patterns into the client generated code. Initially, Circuit Breaker pattern is supported.

### Circuit Breaker

The circuit breaker implementation use a simple strategy to change the circuit breaker state (_CLOSED, OPEN and HALF OPEN_). If some request exceeds the timeout, the circuit breaker implementation will change the state to _OPEN_ (This meansthat the failed request threshold it's one request). After a configured timeout, the pattern will send a request to the destination service and check the response status.

The circuit breaker is configured for the hole endpoint, but it must be enabled for each called service in Network stressor.

#### Required attributes

* **timeout**: Determines the timeout in seconds to consider the response as failed.
* **retry_timer**: Determines the number of seconds that the circuit breaker must wait to retry a request to the destination service connection.

#### Format

```json
"resilience_patterns" : {
"circuit_breaker": {
"timeout": <integer>,
"retry_timer: <integer>
}
}
```

## Describing Resource Stressors

HydraGen supports parameters to express the computational complexity or stress a microservice exerts on the different hardware resources. Initially, CPU-bounded or network-bounded tasks are implemented. The complexity of a CPU-bounded task can be described based on the time a busy-wait is executed, while the load on the network I/O can be described by specifying parameters such as the call forwarding mode and the request/response size for each service endpoint call.
Expand Down Expand Up @@ -168,7 +195,6 @@ The CPU stressor will lock threads for exclusive access while it is executing. T
"network_complexity": {
"forward_requests": "<string:synchronous|asynchronous>",
"response_payload_size": <integer:chars>,

"called_services": [...]
}
```
Expand All @@ -186,6 +212,7 @@ The CPU stressor will lock threads for exclusive access while it is executing. T
* **request_payload_size**: Determines the number of characters that will be sent in the request to the endpoint. Default: 0
* **port**: The port the server is responding to requests on. This is usually determined automatically.
* **protocol**: Determines if the call will be made using HTTP or gRPC. This is usually determined automatically.
* **active_circuit_breaker**: Determines if the endpoint circuit breaker will protect this service call.

#### Format

Expand All @@ -197,7 +224,8 @@ The CPU stressor will lock threads for exclusive access while it is executing. T
"port": "<string>",
"protocol": "<string:http|grpc>",
"traffic_forward_ratio": <integer>,
"request_payload_size": <integer:chars>
"request_payload_size": <integer:chars>,
"active_circuit_breaker": <boolean>
}
]
```
Expand Down
23 changes: 19 additions & 4 deletions emulator/src/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package client

import (
"application-emulator/src/generated/client"
"application-emulator/src/resilience/circuit_breaker"
"application-model/generated"
"context"
"fmt"
Expand All @@ -28,7 +29,7 @@ import (
)

// Sends a gRPC request to the specified endpoint
func GRPC(service, endpoint string, port int, payload string) (*generated.Response, error) {
func GRPC(service, endpoint string, port int, payload, sourceEndpoint string) (*generated.Response, error) {
var url string
// Omit the port if zero
if port == 0 {
Expand All @@ -46,11 +47,25 @@ func GRPC(service, endpoint string, port int, payload string) (*generated.Respon
}
defer conn.Close()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
circuitBreakerRegistry := circuit_breaker.GetCircuitBreakerRegistry()
cbName := circuitBreakerRegistry.BuildName(sourceEndpoint, service, endpoint)
circuitBreaker := circuitBreakerRegistry.GetCircuitBreaker(cbName)

var response *generated.Response
request := &generated.Request{
Payload: payload,
}
callOptions := []grpc.CallOption{}
response, err := client.CallGeneratedEndpoint(ctx, conn, service, endpoint, &generated.Request{Payload: payload}, callOptions...)

if circuitBreaker == nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

response, err = client.CallGeneratedEndpoint(ctx, conn, service, endpoint, request, callOptions...)
} else {
response, err = circuitBreaker.ProxyGRPC(conn, service, endpoint, request, callOptions...)
}

if err != nil {
return nil, err
}
Expand Down
23 changes: 21 additions & 2 deletions emulator/src/client/restful.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ limitations under the License.
package client

import (
"application-emulator/src/resilience/circuit_breaker"
"application-model/generated"
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"

"google.golang.org/protobuf/encoding/protojson"
Expand All @@ -30,7 +32,7 @@ import (
const useProtoJSON = true

// Sends a HTTP POST request to the specified endpoint
func POST(service, endpoint string, port int, payload string, headers http.Header) (int, *generated.Response, error) {
func POST(service, endpoint string, port int, payload string, headers http.Header, sourceEndpoint string) (int, *generated.Response, error) {
var url string
// Omit the port if zero
if port == 0 {
Expand Down Expand Up @@ -58,7 +60,24 @@ func POST(service, endpoint string, port int, payload string, headers http.Heade
}

// Send the request
response, err := http.DefaultClient.Do(request)
// Here it comes the circuit breaker

circuitBreakerRegistry := circuit_breaker.GetCircuitBreakerRegistry()
cbName := circuitBreakerRegistry.BuildName(sourceEndpoint, service, endpoint)
circuitBreaker := circuitBreakerRegistry.GetCircuitBreaker(cbName)

log.Printf("[CLIENT HTTP] Circuit breaker obtanied %v", circuitBreaker)

var response *http.Response
var err error

if circuitBreaker == nil {
response, err = http.DefaultClient.Do(request)
} else {
response, err = circuitBreaker.ProxyHTTP(request)
}

log.Printf("[CLIENT HTTP] Request returned from %s/%s", service, endpoint)
if err != nil {
return 0, nil, err
}
Expand Down
122 changes: 122 additions & 0 deletions emulator/src/resilience/circuit_breaker/implementation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package circuit_breaker

import (
"application-emulator/src/generated/client"
"application-model/generated"
"context"
"errors"
"log"
"net/http"
"sync"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type CircuitBreakerState string

const (
OPEN CircuitBreakerState = "OPEN"
CLOSED CircuitBreakerState = "CLOSED"
HALF_OPEN CircuitBreakerState = "HALF OPEN"
)

var GRPC_ERROR = status.Error(codes.Unavailable, "Service unavailable")
var HTTP_ERROR = errors.New("Service unavailable")

type RequestCallback func(ctx context.Context) (any, error)

type CircuitBreaker interface {
ProxyHTTP(request *http.Request) (*http.Response, error)
ProxyGRPC(conn *grpc.ClientConn, service, endpoint string, request *generated.Request, options ...grpc.CallOption) (*generated.Response, error)
ProcessRequest(cb RequestCallback, requestError error) (any, error)
}

type CircuitBreakerImpl struct {
State CircuitBreakerState
Timeout int // timeout in seconds
RetryTimer int // In how many seconds should we retry
lock sync.Mutex
EndpointProtected string
}

func (c *CircuitBreakerImpl) ProcessRequest(cb RequestCallback, requestError error) (any, error) {
log.Printf("[CIRCUIT BREAKER] Circuit breaker of %s in state %s\n", c.EndpointProtected, c.State)
c.lock.Lock()
if c.State == OPEN {
c.lock.Unlock()
return nil, requestError
}
if c.State == HALF_OPEN {
c.State = OPEN
}
c.lock.Unlock()

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
defer cancel()

response, err := cb(ctx)
log.Printf("[CIRCUIT BREAKER] Request sended and callback returned %v, with error %v\n", response, err)

if err != nil && errors.Is(err, context.DeadlineExceeded) {
log.Printf("[CIRCUIT BREAKER] Circuit breaker of %s timedout\n", c.EndpointProtected)
c.lock.Lock()
c.State = OPEN
c.lock.Unlock()
go func() {
time.Sleep(time.Second * time.Duration(c.RetryTimer))
c.lock.Lock()
c.State = HALF_OPEN
c.lock.Unlock()
return
}()
return nil, requestError
}

c.lock.Lock()
if c.State == OPEN || c.State == HALF_OPEN {
c.State = CLOSED
}
c.lock.Unlock()
return response, err
}

func (c *CircuitBreakerImpl) ProxyHTTP(request *http.Request) (*http.Response, error) {

response, err := c.ProcessRequest(func(ctx context.Context) (any, error) {
request = request.WithContext(ctx)
response, err := http.DefaultClient.Do(request)
return response, err
}, HTTP_ERROR)

if err != nil {
return nil, err
}

httpResponse, ok := response.(*http.Response)
log.Printf("[CIRCUIT BREAKER] The response returned from circuit breaker as %v", httpResponse)

if !ok {
return nil, errors.New("HTTP response from Circuit breaker callback broken")
}

return httpResponse, err
}

func (c *CircuitBreakerImpl) ProxyGRPC(conn *grpc.ClientConn, service, endpoint string, request *generated.Request, options ...grpc.CallOption) (*generated.Response, error) {
response, err := c.ProcessRequest(func(ctx context.Context) (any, error) {
return client.CallGeneratedEndpoint(ctx, conn, service, endpoint, request, options...)
}, GRPC_ERROR)

if err != nil {
return nil, err
}
grpcResponse, ok := response.(*generated.Response)
if !ok {
return nil, errors.New("GRPC response from Circuit breaker callback broken")
}

return grpcResponse, err
}
59 changes: 59 additions & 0 deletions emulator/src/resilience/circuit_breaker/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package circuit_breaker

import (
model "application-model"
"fmt"
"sync"
)

var instanceLock = &sync.Mutex{}
var registerLock = &sync.Mutex{}
var GetLock = &sync.Mutex{}

type CircuitBreakerRegister struct {
ProtectedEndpoints map[string]*CircuitBreakerImpl
}

var circuitBreakerInstance *CircuitBreakerRegister

func (cbr *CircuitBreakerRegister) BuildName(sourceEndpoint, destService, destEndpoint string) string {
return fmt.Sprintf("%s:%s/%s", sourceEndpoint, destService, destEndpoint)
}
func (cbr *CircuitBreakerRegister) RegisterEndpoint(endpoint string, config *model.CircuitBreakerConfig) {
registerLock.Lock()
defer registerLock.Unlock()
cbr.ProtectedEndpoints[endpoint] = &CircuitBreakerImpl{
State: CLOSED,
Timeout: config.Timeout,
RetryTimer: config.RetryTimer,
EndpointProtected: endpoint,
}
}

func (cbr *CircuitBreakerRegister) GetCircuitBreaker(endpoint string) *CircuitBreakerImpl {
GetLock.Lock()
defer GetLock.Unlock()

circuitBreaker, ok := cbr.ProtectedEndpoints[endpoint]
if !ok {
return nil
}
return circuitBreaker
}

func CheckCircuitBreakerConfig(endpoint *model.Endpoint) bool {
return endpoint.ResiliencePatterns.CircuitBreaker != nil
}

func GetCircuitBreakerRegistry() *CircuitBreakerRegister {
if circuitBreakerInstance == nil {
instanceLock.Lock()
defer instanceLock.Unlock()
if circuitBreakerInstance == nil {
circuitBreakerInstance = &CircuitBreakerRegister{
ProtectedEndpoints: make(map[string]*CircuitBreakerImpl),
}
}
}
return circuitBreakerInstance
}
Loading