From 7de5ec524894ff97dfe983db7487634e7239f4fb Mon Sep 17 00:00:00 2001 From: Alexey Date: Mon, 4 Aug 2025 13:36:30 +0300 Subject: [PATCH] add sqlite support --- internal/server/sv1/lua_handler.go | 358 +++++++++++++++++++++++++++-- 1 file changed, 336 insertions(+), 22 deletions(-) diff --git a/internal/server/sv1/lua_handler.go b/internal/server/sv1/lua_handler.go index 6fd60bf..7963f57 100644 --- a/internal/server/sv1/lua_handler.go +++ b/internal/server/sv1/lua_handler.go @@ -3,6 +3,7 @@ package sv1 // TODO: make a lua state pool using sync.Pool import ( + "database/sql" "fmt" "io" "log/slog" @@ -12,12 +13,322 @@ import ( "path/filepath" "strconv" "strings" + "sync" "github.com/akyaiy/GoSally-mvp/internal/colors" "github.com/akyaiy/GoSally-mvp/internal/server/rpc" lua "github.com/yuin/gopher-lua" + _ "modernc.org/sqlite" ) +type DBConnection struct { + dbPath string + log bool + logger *slog.Logger + writeChan chan *dbWriteRequest + closeChan chan struct{} +} + +type dbWriteRequest struct { + query string + args []interface{} + resCh chan *dbWriteResult +} + +type dbWriteResult struct { + rowsAffected int64 + err error +} + +var dbMutexMap = make(map[string]*sync.RWMutex) +var dbGlobalMutex sync.Mutex + +func getDBMutex(dbPath string) *sync.RWMutex { + dbGlobalMutex.Lock() + defer dbGlobalMutex.Unlock() + + if mtx, ok := dbMutexMap[dbPath]; ok { + return mtx + } + + mtx := &sync.RWMutex{} + dbMutexMap[dbPath] = mtx + return mtx +} + +func loadDBMod(llog *slog.Logger) func(*lua.LState) int { + llog.Debug("import module db-sqlite") + return func(L *lua.LState) int { + dbMod := L.NewTable() + + L.SetField(dbMod, "connect", L.NewFunction(func(L *lua.LState) int { + dbPath := L.CheckString(1) + + logQueries := false + if L.GetTop() >= 2 { + opts := L.CheckTable(2) + if val := opts.RawGetString("log"); val != lua.LNil { + logQueries = lua.LVAsBool(val) + } + } + + conn := &DBConnection{ + dbPath: dbPath, + log: logQueries, + logger: llog, + writeChan: make(chan *dbWriteRequest, 100), + closeChan: make(chan struct{}), + } + + go conn.processWrites() + + ud := L.NewUserData() + ud.Value = conn + L.SetMetatable(ud, L.GetTypeMetatable("gosally_db")) + + L.Push(ud) + return 1 + })) + + mt := L.NewTypeMetatable("gosally_db") + L.SetField(mt, "__index", L.SetFuncs(L.NewTable(), map[string]lua.LGFunction{ + "exec": dbExec, + "query": dbQuery, + "close": dbClose, + })) + + L.SetField(dbMod, "__gosally_internal", lua.LString("0")) + L.Push(dbMod) + return 1 + } +} + +func (conn *DBConnection) processWrites() { + for { + select { + case req := <-conn.writeChan: + mtx := getDBMutex(conn.dbPath) + mtx.Lock() + + db, err := sql.Open("sqlite", conn.dbPath+"?_busy_timeout=5000&_journal_mode=WAL&_sync=NORMAL&_cache_size=-10000") + if err == nil { + _, err = db.Exec("PRAGMA journal_mode=WAL;") + if err == nil { + res, execErr := db.Exec(req.query, req.args...) + if execErr == nil { + rows, _ := res.RowsAffected() + req.resCh <- &dbWriteResult{rowsAffected: rows} + } else { + req.resCh <- &dbWriteResult{err: execErr} + } + } + db.Close() + } + + if err != nil { + req.resCh <- &dbWriteResult{err: err} + } + + mtx.Unlock() + case <-conn.closeChan: + return + } + } +} + +func dbExec(L *lua.LState) int { + ud := L.CheckUserData(1) + conn, ok := ud.Value.(*DBConnection) + if !ok { + L.Push(lua.LNil) + L.Push(lua.LString("invalid database connection")) + return 2 + } + + query := L.CheckString(2) + + var args []any + if L.GetTop() >= 3 { + params := L.CheckTable(3) + params.ForEach(func(k lua.LValue, v lua.LValue) { + args = append(args, ConvertLuaTypesToGolang(v)) + }) + } + + if conn.log { + conn.logger.Info("DB Exec", + slog.String("query", query), + slog.Any("params", args)) + } + + resCh := make(chan *dbWriteResult, 1) + conn.writeChan <- &dbWriteRequest{ + query: query, + args: args, + resCh: resCh, + } + + ctx := L.NewTable() + L.SetField(ctx, "done", lua.LBool(false)) + + var result lua.LValue = lua.LNil + var errorMsg lua.LValue = lua.LNil + + L.SetField(ctx, "wait", L.NewFunction(func(lL *lua.LState) int { + res := <-resCh + L.SetField(ctx, "done", lua.LBool(true)) + + if res.err != nil { + errorMsg = lua.LString(res.err.Error()) + result = lua.LNil + } else { + result = lua.LNumber(res.rowsAffected) + errorMsg = lua.LNil + } + + if res.err != nil { + lL.Push(lua.LNil) + lL.Push(lua.LString(res.err.Error())) + return 2 + } + lL.Push(lua.LNumber(res.rowsAffected)) + lL.Push(lua.LNil) + return 2 + })) + + L.SetField(ctx, "check", L.NewFunction(func(lL *lua.LState) int { + select { + case res := <-resCh: + lL.SetField(ctx, "done", lua.LBool(true)) + if res.err != nil { + errorMsg = lua.LString(res.err.Error()) + result = lua.LNil + lL.Push(lua.LNil) + lL.Push(lua.LString(res.err.Error())) + return 2 + } else { + result = lua.LNumber(res.rowsAffected) + errorMsg = lua.LNil + lL.Push(lua.LNumber(res.rowsAffected)) + lL.Push(lua.LNil) + return 2 + } + default: + lL.Push(result) + lL.Push(errorMsg) + return 2 + } + })) + + L.Push(ctx) + L.Push(lua.LNil) + return 2 +} + +func dbQuery(L *lua.LState) int { + ud := L.CheckUserData(1) + conn, ok := ud.Value.(*DBConnection) + if !ok { + L.Push(lua.LNil) + L.Push(lua.LString("invalid database connection")) + return 2 + } + + query := L.CheckString(2) + + var args []any + if L.GetTop() >= 3 { + params := L.CheckTable(3) + params.ForEach(func(k lua.LValue, v lua.LValue) { + args = append(args, ConvertLuaTypesToGolang(v)) + }) + } + + if conn.log { + conn.logger.Info("DB Query", + slog.String("query", query), + slog.Any("params", args)) + } + + mtx := getDBMutex(conn.dbPath) + mtx.RLock() + defer mtx.RUnlock() + + db, err := sql.Open("sqlite", conn.dbPath+"?_busy_timeout=5000&_journal_mode=WAL&_sync=NORMAL&_cache_size=-10000") + if err != nil { + L.Push(lua.LNil) + L.Push(lua.LString(err.Error())) + return 2 + } + defer db.Close() + + rows, err := db.Query(query, args...) + if err != nil { + L.Push(lua.LNil) + L.Push(lua.LString(fmt.Sprintf("query failed: %v", err))) + return 2 + } + defer rows.Close() + + columns, err := rows.Columns() + if err != nil { + L.Push(lua.LNil) + L.Push(lua.LString(fmt.Sprintf("get columns failed: %v", err))) + return 2 + } + + result := L.NewTable() + colCount := len(columns) + values := make([]any, colCount) + valuePtrs := make([]any, colCount) + + for rows.Next() { + for i := range columns { + valuePtrs[i] = &values[i] + } + + if err := rows.Scan(valuePtrs...); err != nil { + L.Push(lua.LNil) + L.Push(lua.LString(fmt.Sprintf("scan failed: %v", err))) + return 2 + } + + rowTable := L.NewTable() + for i, col := range columns { + val := values[i] + if val == nil { + L.SetField(rowTable, col, lua.LNil) + } else { + L.SetField(rowTable, col, ConvertGolangTypesToLua(L, val)) + } + } + result.Append(rowTable) + } + + if err := rows.Err(); err != nil { + L.Push(lua.LNil) + L.Push(lua.LString(fmt.Sprintf("rows iteration failed: %v", err))) + return 2 + } + + L.Push(result) + return 1 +} + +func dbClose(L *lua.LState) int { + ud := L.CheckUserData(1) + conn, ok := ud.Value.(*DBConnection) + if !ok { + L.Push(lua.LFalse) + L.Push(lua.LString("invalid database connection")) + return 2 + } + + close(conn.closeChan) + L.Push(lua.LTrue) + return 1 +} + func addInitiatorHeaders(sid string, req *http.Request, headers http.Header) { clientIP := req.RemoteAddr if forwardedFor := req.Header.Get("X-Forwarded-For"); forwardedFor != "" { @@ -38,14 +349,15 @@ func addInitiatorHeaders(sid string, req *http.Request, headers http.Header) { // I will be only glad. // TODO: make this huge function more harmonious by dividing responsibilities func (h *HandlerV1) handleLUA(sid string, r *http.Request, req *rpc.RPCRequest, path string) *rpc.RPCResponse { - h.x.SLog.Debug("handling LUA", slog.String("session-id", sid)) + llog := h.x.SLog.With(slog.String("session-id", sid)) + llog.Debug("handling LUA") L := lua.NewState() defer L.Close() seed := rand.Int() loadSessionMod := func(lL *lua.LState) int { - h.x.SLog.Debug("import module session", slog.String("script", path)) + llog.Debug("import module session", slog.String("script", path)) sessionMod := lL.NewTable() inTable := lL.NewTable() paramsTable := lL.NewTable() @@ -60,6 +372,7 @@ func (h *HandlerV1) handleLUA(sid string, r *http.Request, req *rpc.RPCRequest, resultTable := lL.NewTable() lL.SetField(outTable, "result", resultTable) + lL.SetField(inTable, "address", lua.LString(r.RemoteAddr)) lL.SetField(sessionMod, "request", inTable) lL.SetField(sessionMod, "response", outTable) @@ -71,14 +384,14 @@ func (h *HandlerV1) handleLUA(sid string, r *http.Request, req *rpc.RPCRequest, } loadLogMod := func(lL *lua.LState) int { - h.x.SLog.Debug("import module log", slog.String("script", path)) + llog.Debug("import module log", slog.String("script", path)) logMod := lL.NewTable() logFuncs := map[string]func(string, ...any){ - "info": h.x.SLog.Info, - "debug": h.x.SLog.Debug, - "error": h.x.SLog.Error, - "warn": h.x.SLog.Warn, + "info": llog.Info, + "debug": llog.Debug, + "error": llog.Error, + "warn": llog.Warn, } for name, logFunc := range logFuncs { @@ -118,7 +431,7 @@ func (h *HandlerV1) handleLUA(sid string, r *http.Request, req *rpc.RPCRequest, } loadNetMod := func(lL *lua.LState) int { - h.x.SLog.Debug("import module net", slog.String("script", path)) + llog.Debug("import module net", slog.String("script", path)) netMod := lL.NewTable() netModhttp := lL.NewTable() @@ -152,7 +465,7 @@ func (h *HandlerV1) handleLUA(sid string, r *http.Request, req *rpc.RPCRequest, } if logRequest { - h.x.SLog.Info("HTTP GET request", + llog.Info("HTTP GET request", slog.String("script", path), slog.String("url", url), slog.Int("status", resp.StatusCode), @@ -213,7 +526,7 @@ func (h *HandlerV1) handleLUA(sid string, r *http.Request, req *rpc.RPCRequest, } if logRequest { - h.x.SLog.Info("HTTP POST request", + llog.Info("HTTP POST request", slog.String("script", path), slog.String("url", url), slog.String("content_type", contentType), @@ -249,32 +562,33 @@ func (h *HandlerV1) handleLUA(sid string, r *http.Request, req *rpc.RPCRequest, L.PreloadModule("internal.session", loadSessionMod) L.PreloadModule("internal.log", loadLogMod) L.PreloadModule("internal.net", loadNetMod) + L.PreloadModule("internal.database-sqlite", loadDBMod(llog)) - h.x.SLog.Debug("preparing environment", slog.String("session-id", sid)) + llog.Debug("preparing environment") prep := filepath.Join(*h.x.Config.Conf.Node.ComDir, "_prepare.lua") if _, err := os.Stat(prep); err == nil { if err := L.DoFile(prep); err != nil { - h.x.SLog.Error("script error", slog.String("script", path), slog.String("error", err.Error())) + llog.Error("script error", slog.String("script", path), slog.String("error", err.Error())) return rpc.NewError(rpc.ErrInternalError, rpc.ErrInternalErrorS, nil, req.ID) } } - h.x.SLog.Debug("executing script", slog.String("script", path), slog.String("session-id", sid)) + llog.Debug("executing script", slog.String("script", path)) if err := L.DoFile(path); err != nil { - h.x.SLog.Error("script error", slog.String("script", path), slog.String("error", err.Error())) + llog.Error("script error", slog.String("script", path), slog.String("error", err.Error())) return rpc.NewError(rpc.ErrInternalError, rpc.ErrInternalErrorS, nil, req.ID) } pkg := L.GetGlobal("package") pkgTbl, ok := pkg.(*lua.LTable) if !ok { - h.x.SLog.Error("script error", slog.String("script", path), slog.String("error", "package not found")) + llog.Error("script error", slog.String("script", path), slog.String("error", "package not found")) return rpc.NewError(rpc.ErrInternalError, rpc.ErrInternalErrorS, nil, req.ID) } loaded := pkgTbl.RawGetString("loaded") loadedTbl, ok := loaded.(*lua.LTable) if !ok { - h.x.SLog.Error("script error", slog.String("script", path), slog.String("error", "package.loaded not found")) + llog.Error("script error", slog.String("script", path), slog.String("error", "package.loaded not found")) return rpc.NewError(rpc.ErrInternalError, rpc.ErrInternalErrorS, nil, req.ID) } @@ -288,7 +602,7 @@ func (h *HandlerV1) handleLUA(sid string, r *http.Request, req *rpc.RPCRequest, tag := sessionTbl.RawGetString("__gosally_internal") if tag.Type() != lua.LTString || tag.String() != fmt.Sprint(seed) { - h.x.SLog.Debug("stock session module is not imported: wrong seed", slog.String("script", path), slog.String("session-id", sid)) + llog.Debug("stock session module is not imported: wrong seed", slog.String("script", path)) return rpc.NewResponse(map[string]any{ "responsible-node": h.cs.UUID32, }, req.ID) @@ -297,12 +611,12 @@ func (h *HandlerV1) handleLUA(sid string, r *http.Request, req *rpc.RPCRequest, outVal := sessionTbl.RawGetString("response") outTbl, ok := outVal.(*lua.LTable) if !ok { - h.x.SLog.Error("script error", slog.String("script", path), slog.String("error", "response is not a table")) + llog.Error("script error", slog.String("script", path), slog.String("error", "response is not a table")) return rpc.NewError(rpc.ErrInternalError, rpc.ErrInternalErrorS, nil, req.ID) } if errVal := outTbl.RawGetString("error"); errVal != lua.LNil { - h.x.SLog.Debug("catch error table", slog.String("script", path), slog.String("session-id", sid)) + llog.Debug("catch error table", slog.String("script", path)) if errTbl, ok := errVal.(*lua.LTable); ok { code := rpc.ErrInternalError message := rpc.ErrInternalErrorS @@ -318,10 +632,10 @@ func (h *HandlerV1) handleLUA(sid string, r *http.Request, req *rpc.RPCRequest, if tbl, ok := rawData.(*lua.LTable); ok { tbl.ForEach(func(k, v lua.LValue) { data[k.String()] = ConvertLuaTypesToGolang(v) }) } else { - h.x.SLog.Error("the script terminated with an error", slog.String("code", strconv.Itoa(code)), slog.String("message", message)) - return rpc.NewError(code, message, rawData, req.ID) + llog.Error("the script terminated with an error", slog.String("code", strconv.Itoa(code)), slog.String("message", message)) + return rpc.NewError(code, message, ConvertLuaTypesToGolang(rawData), req.ID) } - h.x.SLog.Error("the script terminated with an error", slog.String("code", strconv.Itoa(code)), slog.String("message", message)) + llog.Error("the script terminated with an error", slog.String("code", strconv.Itoa(code)), slog.String("message", message)) return rpc.NewError(code, message, data, req.ID) } return rpc.NewError(rpc.ErrInternalError, rpc.ErrInternalErrorS, nil, req.ID)