Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions api/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
Expand All @@ -52,6 +53,7 @@ type (
GRPCServer struct {
port string
svr *grpc.Server
sem *semaphore.Weighted
}

// GRPCHandler contains the pointer to api coreservice
Expand Down Expand Up @@ -87,6 +89,7 @@ func NewGRPCServer(core CoreService, bds *blockDAOService, grpcPort int) *GRPCSe
return nil
}

sem := semaphore.NewWeighted(_maxRequestLimit)
gSvr := grpc.NewServer(
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_prometheus.StreamServerInterceptor,
Expand All @@ -96,6 +99,20 @@ func NewGRPCServer(core CoreService, bds *blockDAOService, grpcPort int) *GRPCSe
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_prometheus.UnaryServerInterceptor,
otelgrpc.UnaryServerInterceptor(),
grpc.UnaryServerInterceptor(func(
ctx context.Context,
req any,
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (any, error) {
acquireCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := sem.Acquire(acquireCtx, 1); err != nil {
return nil, status.Error(codes.ResourceExhausted, "server busy")
}
defer sem.Release(1)
return handler(ctx, req)
}),
grpc_recovery.UnaryServerInterceptor(RecoveryInterceptor()),
)),
grpc.KeepaliveEnforcementPolicy(kaep),
Expand All @@ -114,6 +131,7 @@ func NewGRPCServer(core CoreService, bds *blockDAOService, grpcPort int) *GRPCSe
return &GRPCServer{
port: ":" + strconv.Itoa(grpcPort),
svr: gSvr,
sem: sem,
}
}

Expand Down
13 changes: 13 additions & 0 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"go.uber.org/zap"
"golang.org/x/sync/semaphore"

apitypes "github.com/iotexproject/iotex-core/v2/api/types"
"github.com/iotexproject/iotex-core/v2/pkg/log"
Expand All @@ -24,6 +25,7 @@ type (
// hTTPHandler handles requests from http protocol
hTTPHandler struct {
msgHandler Web3Handler
sem *semaphore.Weighted
}
)

Expand Down Expand Up @@ -60,6 +62,7 @@ func (hSvr *HTTPServer) Stop(ctx context.Context) error {
func newHTTPHandler(web3Handler Web3Handler) *hTTPHandler {
return &hTTPHandler{
msgHandler: web3Handler,
sem: semaphore.NewWeighted(_maxRequestLimit),
}
}

Expand All @@ -71,6 +74,16 @@ func (handler *hTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)

ctx, span := tracer.NewSpan(req.Context(), "http")
defer span.End()
acquireCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := handler.sem.Acquire(acquireCtx, 1); err != nil {
w.WriteHeader(http.StatusTooManyRequests)
log.L().Error("fail to acquire semaphore", zap.Error(err))
return
}
// TODO: add metrics
defer handler.sem.Release(1)

if err := handler.msgHandler.HandlePOSTReq(ctx, req.Body,
apitypes.NewResponseWriter(
func(resp interface{}) (int, error) {
Expand Down
2 changes: 2 additions & 0 deletions api/web3server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const (
_metamaskBalanceContractAddr = "io1k8uw2hrlvnfq8s2qpwwc24ws2ru54heenx8chr"
// _defaultBatchRequestLimit is the default maximum number of items in a batch.
_defaultBatchRequestLimit = 100 // Maximum number of items in a batch.
// _maxRequestLimit is the maximum number of concurrent requests.
_maxRequestLimit = 100
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to make maxRequestLimit configurable.

)

type (
Expand Down