diff --git a/cmd/backoffice/fileserver.go b/cmd/backoffice/fileserver.go index 8005005..3efd62c 100644 --- a/cmd/backoffice/fileserver.go +++ b/cmd/backoffice/fileserver.go @@ -8,17 +8,20 @@ import ( "net/http" "os" "path/filepath" + "regexp" "sort" "strconv" "strings" "time" + "git.tornberg.me/go-cart-actor/pkg/actor" "git.tornberg.me/go-cart-actor/pkg/cart" ) type FileServer struct { // Define fields here dataDir string + storage actor.LogStorage[cart.CartGrain] } func NewFileServer(dataDir string) *FileServer { @@ -27,21 +30,29 @@ func NewFileServer(dataDir string) *FileServer { } } +func isValidId(id string) (uint64, bool) { + if nr, err := strconv.ParseUint(id, 10, 64); err == nil { + return nr, true + } + if nr, ok := cart.ParseCartId(id); ok { + return uint64(nr), true + } + return 0, false +} + func isValidFileId(name string) (uint64, bool) { parts := strings.Split(name, ".") if len(parts) > 1 && parts[1] == "events" { idStr := parts[0] - if _, err := strconv.ParseUint(idStr, 10, 64); err != nil { - return 0, false - } - if id, err := strconv.ParseUint(idStr, 10, 64); err == nil { - return id, true - } + + return isValidId(idStr) } return 0, false } +var cartFileRe = regexp.MustCompile(`^(\d+)\.events\.log$`) + func listCartFiles(dir string) ([]CartFileInfo, error) { entries, err := os.ReadDir(dir) if err != nil { @@ -76,20 +87,20 @@ func listCartFiles(dir string) ([]CartFileInfo, error) { return out, nil } -func readRawLogLines(path string) ([]string, error) { +func readRawLogLines(path string) ([]json.RawMessage, error) { fh, err := os.Open(path) if err != nil { return nil, err } defer fh.Close() - lines := make([]string, 0, 64) + lines := make([]json.RawMessage, 0, 64) s := bufio.NewScanner(fh) // increase buffer to handle larger JSON lines buf := make([]byte, 0, 1024*1024) s.Buffer(buf, 1024*1024) for s.Scan() { - line := strings.TrimSpace(s.Text()) - if line == "" { + line := s.Bytes() + if line == nil { continue } lines = append(lines, line) @@ -122,40 +133,41 @@ func (fs *FileServer) CartsHandler(w http.ResponseWriter, r *http.Request) { }) } +type JsonError struct { + Error string `json:"error"` +} + func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) { idStr := r.PathValue("id") if idStr == "" { - writeJSON(w, http.StatusBadRequest, map[string]string{"error": "missing id"}) + writeJSON(w, http.StatusBadRequest, JsonError{Error: "missing id"}) return } - - id, err := strconv.ParseUint(idStr, 10, 64) - if err != nil { - if cartId, ok := cart.ParseCartId(idStr); !ok { - writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid id"}) - return - } else { - id = uint64(cartId) - } + id, ok := isValidId(idStr) + if !ok { + writeJSON(w, http.StatusBadRequest, JsonError{Error: "invalid id"}) } // reconstruct state from event log if present grain := cart.NewCartGrain(id, time.Now()) - if globalDisk != nil { - _ = globalDisk.LoadEvents(id, grain) + + err := fs.storage.LoadEvents(id, grain) + if err != nil { + writeJSON(w, http.StatusInternalServerError, JsonError{Error: err.Error()}) + return } path := filepath.Join(fs.dataDir, fmt.Sprintf("%d.events.log", id)) info, err := os.Stat(path) if err != nil && errors.Is(err, os.ErrNotExist) { - writeJSON(w, http.StatusNotFound, map[string]string{"error": "cart not found"}) + writeJSON(w, http.StatusNotFound, JsonError{Error: "cart not found"}) return } else if err != nil { - writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + writeJSON(w, http.StatusInternalServerError, JsonError{Error: err.Error()}) return } lines, err := readRawLogLines(path) if err != nil { - writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + writeJSON(w, http.StatusInternalServerError, JsonError{Error: err.Error()}) return } writeJSON(w, http.StatusOK, map[string]any{ @@ -167,6 +179,7 @@ func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) { "size": info.Size(), "modified": info.ModTime(), "path": path, + "system": info.Sys(), }, }) } diff --git a/cmd/backoffice/main.go b/cmd/backoffice/main.go index 1b1c03b..8a72597 100644 --- a/cmd/backoffice/main.go +++ b/cmd/backoffice/main.go @@ -6,7 +6,6 @@ import ( "log" "net/http" "os" - "regexp" "time" actor "git.tornberg.me/go-cart-actor/pkg/actor" @@ -30,8 +29,6 @@ func envOrDefault(key, def string) string { return def } -var cartFileRe = regexp.MustCompile(`^(\d+)\.events\.log$`) - var globalDisk *actor.DiskStorage[cart.CartGrain] func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) error { diff --git a/cmd/cart/main.go b/cmd/cart/main.go index cd6d558..d1e71c5 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -99,6 +99,11 @@ type MutationContext struct { VoucherService voucher.Service } +type CartChangeEvent struct { + CartId cart.CartId `json:"cartId"` + Mutations []actor.ApplyResult `json:"mutations"` +} + func main() { controlPlaneConfig := actor.DefaultServerConfig() @@ -138,7 +143,12 @@ func main() { fmt.Errorf("failed to connect to RabbitMQ: %w", err) } - amqpListener := actor.NewAmqpListener(conn) + amqpListener := actor.NewAmqpListener(conn, func(id uint64, msg []actor.ApplyResult) (any, error) { + return &CartChangeEvent{ + CartId: cart.CartId(id), + Mutations: msg, + }, nil + }) amqpListener.DefineTopics() pool.AddListener(amqpListener) diff --git a/pkg/actor/disk_storage.go b/pkg/actor/disk_storage.go index 08daf71..deff401 100644 --- a/pkg/actor/disk_storage.go +++ b/pkg/actor/disk_storage.go @@ -10,8 +10,6 @@ import ( "time" "github.com/gogo/protobuf/proto" - "github.com/matst80/slask-finder/pkg/messaging" - amqp "github.com/rabbitmq/amqp091-go" ) type QueueEvent struct { @@ -31,46 +29,6 @@ type LogStorage[V any] interface { AppendMutations(id uint64, msg ...proto.Message) error } -type LogListener interface { - AppendMutations(id uint64, msg ...ApplyResult) -} - -type AmqpListener struct { - conn *amqp.Connection -} - -func NewAmqpListener(conn *amqp.Connection) *AmqpListener { - return &AmqpListener{ - conn: conn, - } -} - -func (l *AmqpListener) DefineTopics() { - ch, err := l.conn.Channel() - if err != nil { - log.Fatalf("Failed to open a channel: %v", err) - } - defer ch.Close() - if err := messaging.DefineTopic(ch, "cart", "mutation"); err != nil { - log.Fatalf("Failed to declare topic mutation: %v", err) - } -} - -type CartEvent struct { - Id uint64 `json:"id"` - Mutations []ApplyResult `json:"mutations"` -} - -func (l *AmqpListener) AppendMutations(id uint64, msg ...ApplyResult) { - err := messaging.SendChange(l.conn, "cart", "mutation", &CartEvent{ - Id: id, - Mutations: msg, - }) - if err != nil { - log.Printf("Failed to send mutation event: %v", err) - } -} - func NewDiskStorage[V any](path string, registry MutationRegistry) *DiskStorage[V] { return &DiskStorage[V]{ StateStorage: NewState(registry), diff --git a/pkg/actor/log_listerner.go b/pkg/actor/log_listerner.go new file mode 100644 index 0000000..3990c77 --- /dev/null +++ b/pkg/actor/log_listerner.go @@ -0,0 +1,47 @@ +package actor + +import ( + "log" + + "github.com/matst80/slask-finder/pkg/messaging" + amqp "github.com/rabbitmq/amqp091-go" +) + +type LogListener interface { + AppendMutations(id uint64, msg ...ApplyResult) +} + +type AmqpListener struct { + conn *amqp.Connection + transformer func(id uint64, msg []ApplyResult) (any, error) +} + +func NewAmqpListener(conn *amqp.Connection, transformer func(id uint64, msg []ApplyResult) (any, error)) *AmqpListener { + return &AmqpListener{ + conn: conn, + transformer: transformer, + } +} + +func (l *AmqpListener) DefineTopics() { + ch, err := l.conn.Channel() + if err != nil { + log.Fatalf("Failed to open a channel: %v", err) + } + defer ch.Close() + if err := messaging.DefineTopic(ch, "cart", "mutation"); err != nil { + log.Fatalf("Failed to declare topic mutation: %v", err) + } +} + +func (l *AmqpListener) AppendMutations(id uint64, msg ...ApplyResult) { + data, err := l.transformer(id, msg) + if err != nil { + log.Printf("Failed to transform mutation event: %v", err) + return + } + err = messaging.SendChange(l.conn, "cart", "mutation", data) + if err != nil { + log.Printf("Failed to send mutation event: %v", err) + } +}