mirror of
https://github.com/akyaiy/GoSally-mvp.git
synced 2026-01-03 19:32:26 +00:00
add full jsonrpc-2.0 batch support
This commit is contained in:
@@ -5,16 +5,17 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"sync"
|
||||||
|
|
||||||
"github.com/akyaiy/GoSally-mvp/internal/server/rpc"
|
"github.com/akyaiy/GoSally-mvp/internal/server/rpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (gs *GatewayServer) Handle(w http.ResponseWriter, r *http.Request) {
|
func (gs *GatewayServer) Handle(w http.ResponseWriter, r *http.Request) {
|
||||||
var req rpc.RPCRequest
|
w.Header().Set("Content-Type", "application/json")
|
||||||
body, err := io.ReadAll(r.Body)
|
body, err := io.ReadAll(r.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rpc.WriteRouterError(w, http.StatusBadRequest, &rpc.RPCError{
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
rpc.WriteError(w, &rpc.RPCResponse{
|
||||||
JSONRPC: rpc.JSONRPCVersion,
|
JSONRPC: rpc.JSONRPCVersion,
|
||||||
ID: nil,
|
ID: nil,
|
||||||
Error: map[string]any{
|
Error: map[string]any{
|
||||||
@@ -26,55 +27,70 @@ func (gs *GatewayServer) Handle(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := json.Unmarshal(body, &req); err != nil {
|
// determine if the JSON-RPC request is a batch
|
||||||
rpc.WriteRouterError(w, http.StatusBadRequest, &rpc.RPCError{
|
var batch []rpc.RPCRequest
|
||||||
JSONRPC: rpc.JSONRPCVersion,
|
json.Unmarshal(body, &batch)
|
||||||
ID: nil,
|
var single rpc.RPCRequest
|
||||||
Error: map[string]any{
|
if batch == nil {
|
||||||
"code": rpc.ErrParseError,
|
if err := json.Unmarshal(body, &single); err != nil {
|
||||||
"message": rpc.ErrParseErrorS,
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
},
|
rpc.WriteError(w, &rpc.RPCResponse{
|
||||||
})
|
JSONRPC: rpc.JSONRPCVersion,
|
||||||
gs.log.Info("invalid request received", slog.String("issue", rpc.ErrParseErrorS))
|
ID: nil,
|
||||||
|
Error: map[string]any{
|
||||||
|
"code": rpc.ErrParseError,
|
||||||
|
"message": rpc.ErrParseErrorS,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
gs.log.Info("invalid request received", slog.String("issue", rpc.ErrParseErrorS))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resp := gs.Route(r, &single)
|
||||||
|
rpc.WriteResponse(w, resp)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if req.JSONRPC != rpc.JSONRPCVersion {
|
// handle batch
|
||||||
rpc.WriteRouterError(w, http.StatusBadRequest, &rpc.RPCError{
|
responses := make(chan rpc.RPCResponse, len(batch))
|
||||||
JSONRPC: rpc.JSONRPCVersion,
|
var wg sync.WaitGroup
|
||||||
ID: req.ID,
|
for _, m := range batch {
|
||||||
Error: map[string]any{
|
wg.Add(1)
|
||||||
"code": rpc.ErrInvalidRequest,
|
go func(req rpc.RPCRequest) {
|
||||||
"message": rpc.ErrInvalidRequestS,
|
defer wg.Done()
|
||||||
},
|
res := gs.Route(r, &req)
|
||||||
})
|
if res != nil {
|
||||||
gs.log.Info("invalid request received", slog.String("issue", rpc.ErrInvalidRequestS), slog.String("requested-version", req.JSONRPC))
|
responses <- *res
|
||||||
return
|
}
|
||||||
|
}(m)
|
||||||
}
|
}
|
||||||
|
wg.Wait()
|
||||||
|
close(responses)
|
||||||
|
|
||||||
gs.Route(w, r, req)
|
var result []rpc.RPCResponse
|
||||||
|
for res := range responses {
|
||||||
|
result = append(result, res)
|
||||||
|
}
|
||||||
|
if len(result) > 0 {
|
||||||
|
json.NewEncoder(w).Encode(result)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gs *GatewayServer) Route(w http.ResponseWriter, r *http.Request, req rpc.RPCRequest) {
|
func (gs *GatewayServer) Route(r *http.Request, req *rpc.RPCRequest) *rpc.RPCResponse {
|
||||||
server, ok := gs.servers[serversApiVer(req.Params.ContextVersion)]
|
if req.JSONRPC != rpc.JSONRPCVersion {
|
||||||
if !ok {
|
gs.log.Info("invalid request received", slog.String("issue", rpc.ErrInvalidRequestS), slog.String("requested-version", req.JSONRPC))
|
||||||
rpc.WriteRouterError(w, http.StatusBadRequest, &rpc.RPCError{
|
return rpc.NewError(rpc.ErrInvalidRequest, rpc.ErrInvalidRequestS, req.ID)
|
||||||
JSONRPC: rpc.JSONRPCVersion,
|
|
||||||
ID: req.ID,
|
|
||||||
Error: map[string]any{
|
|
||||||
"code": rpc.ErrContextVersion,
|
|
||||||
"message": rpc.ErrContextVersionS,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
gs.log.Info("invalid request received", slog.String("issue", rpc.ErrContextVersionS), slog.String("requested-version", req.Params.ContextVersion))
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
server, ok := gs.servers[serversApiVer(req.ContextVersion)]
|
||||||
|
if !ok {
|
||||||
|
gs.log.Info("invalid request received", slog.String("issue", rpc.ErrContextVersionS), slog.String("requested-version", req.ContextVersion))
|
||||||
|
return rpc.NewError(rpc.ErrContextVersion, rpc.ErrContextVersionS, req.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := server.Handle(r, req)
|
||||||
// checks if request is notification
|
// checks if request is notification
|
||||||
if req.ID == nil {
|
if req.ID == nil {
|
||||||
rr := httptest.NewRecorder()
|
return nil
|
||||||
server.Handle(rr, r, req)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
server.Handle(w, r, req)
|
return resp
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user