From f543ed1d7422a3c654d0e4960ee43708ccea354d Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 08:34:08 +0200 Subject: [PATCH 01/23] add backoffice and move stuff --- Dockerfile | 8 + cmd/backoffice/fileserver.go | 149 +++++++ cmd/backoffice/main.go | 401 +++++++++++++++++- cmd/cart/checkout_builder.go | 4 +- cmd/cart/main.go | 50 +-- cmd/cart/pool-server.go | 59 +-- cmd/cart/product-fetcher.go | 14 +- deployment/deployment.yaml | 123 ++++++ {cmd => pkg}/cart/cart-grain.go | 35 +- {cmd => pkg}/cart/cart_grain_totals_test.go | 2 +- {cmd => pkg}/cart/cart_id.go | 2 +- {cmd => pkg}/cart/cart_id_test.go | 2 +- {cmd => pkg}/cart/mutation_add_item.go | 2 +- {cmd => pkg}/cart/mutation_add_voucher.go | 2 +- {cmd => pkg}/cart/mutation_change_quantity.go | 2 +- .../cart/mutation_initialize_checkout.go | 2 +- {cmd => pkg}/cart/mutation_order_created.go | 2 +- {cmd => pkg}/cart/mutation_remove_delivery.go | 2 +- {cmd => pkg}/cart/mutation_remove_item.go | 2 +- {cmd => pkg}/cart/mutation_set_delivery.go | 2 +- .../cart/mutation_set_pickup_point.go | 2 +- {cmd => pkg}/cart/price.go | 2 +- 22 files changed, 774 insertions(+), 95 deletions(-) create mode 100644 cmd/backoffice/fileserver.go rename {cmd => pkg}/cart/cart-grain.go (94%) rename {cmd => pkg}/cart/cart_grain_totals_test.go (99%) rename {cmd => pkg}/cart/cart_id.go (99%) rename {cmd => pkg}/cart/cart_id_test.go (99%) rename {cmd => pkg}/cart/mutation_add_item.go (99%) rename {cmd => pkg}/cart/mutation_add_voucher.go (98%) rename {cmd => pkg}/cart/mutation_change_quantity.go (99%) rename {cmd => pkg}/cart/mutation_initialize_checkout.go (99%) rename {cmd => pkg}/cart/mutation_order_created.go (99%) rename {cmd => pkg}/cart/mutation_remove_delivery.go (99%) rename {cmd => pkg}/cart/mutation_remove_item.go (98%) rename {cmd => pkg}/cart/mutation_set_delivery.go (99%) rename {cmd => pkg}/cart/mutation_set_pickup_point.go (99%) rename {cmd => pkg}/cart/price.go (99%) diff --git a/Dockerfile b/Dockerfile index 1fa7879..dc7bb9d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -59,6 +59,13 @@ RUN --mount=type=cache,target=/go/build-cache \ -X main.BuildDate=${BUILD_DATE}" \ -o /out/go-cart-actor ./cmd/cart +RUN --mount=type=cache,target=/go/build-cache \ + go build -trimpath -ldflags="-s -w \ + -X main.Version=${VERSION} \ + -X main.GitCommit=${GIT_COMMIT} \ + -X main.BuildDate=${BUILD_DATE}" \ + -o /out/go-cart-backoffice ./cmd/backoffice + ############################ # Runtime Stage ############################ @@ -67,6 +74,7 @@ FROM gcr.io/distroless/static-debian12:nonroot AS runtime WORKDIR / COPY --from=build /out/go-cart-actor /go-cart-actor +COPY --from=build /out/go-cart-backoffice /go-cart-backoffice # Document (not expose forcibly) typical ports: 8080 (HTTP), 1337 (gRPC) EXPOSE 8080 1337 diff --git a/cmd/backoffice/fileserver.go b/cmd/backoffice/fileserver.go new file mode 100644 index 0000000..f4632bd --- /dev/null +++ b/cmd/backoffice/fileserver.go @@ -0,0 +1,149 @@ +package main + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "net/http" + "os" + "path/filepath" + "sort" + "strconv" + "strings" +) + +type FileServer struct { + // Define fields here + dataDir string +} + +func NewFileServer(dataDir string) *FileServer { + return &FileServer{ + dataDir: dataDir, + } +} + +func listCartFiles(dir string) ([]CartFileInfo, error) { + entries, err := os.ReadDir(dir) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return []CartFileInfo{}, nil + } + return nil, err + } + out := make([]CartFileInfo, 0) + for _, e := range entries { + if e.IsDir() { + continue + } + name := e.Name() + m := cartFileRe.FindStringSubmatch(name) + if m == nil { + continue + } + idStr := m[1] + id, err := strconv.ParseUint(idStr, 10, 64) + if err != nil { + continue + } + p := filepath.Join(dir, name) + info, err := e.Info() + if err != nil { + continue + } + out = append(out, CartFileInfo{ + ID: id, + Path: p, + Size: info.Size(), + Modified: info.ModTime(), + }) + } + return out, nil +} + +func readRawLogLines(path string) ([]string, error) { + fh, err := os.Open(path) + if err != nil { + return nil, err + } + defer fh.Close() + lines := make([]string, 0, 64) + s := bufio.NewScanner(fh) + // increase buffer to handle larger JSON lines + buf := make([]byte, 0, 1024*1024) + s.Buffer(buf, 1024*1024) + for s.Scan() { + line := strings.TrimSpace(s.Text()) + if line == "" { + continue + } + lines = append(lines, line) + } + if err := s.Err(); err != nil { + return nil, err + } + return lines, nil +} + +func writeJSON(w http.ResponseWriter, status int, v any) { + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Cache-Control", "no-cache") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(v) +} + +func (fs *FileServer) CartsHandler(w http.ResponseWriter, r *http.Request) { + list, err := listCartFiles(fs.dataDir) + if err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + return + } + // sort by modified desc + sort.Slice(list, func(i, j int) bool { return list[i].Modified.After(list[j].Modified) }) + writeJSON(w, http.StatusOK, map[string]any{ + "count": len(list), + "carts": list, + }) +} + +func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) { + idStr := r.PathValue("id") + if idStr == "" { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "missing id"}) + return + } + // allow both decimal id and filename-like with suffix + if strings.HasSuffix(idStr, ".events.log") { + idStr = strings.TrimSuffix(idStr, ".events.log") + } + id, err := strconv.ParseUint(idStr, 10, 64) + if err != nil { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid id"}) + return + } + path := filepath.Join(fs.dataDir, fmt.Sprintf("%d.events.log", id)) + info, err := os.Stat(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + writeJSON(w, http.StatusNotFound, map[string]string{"error": "cart not found"}) + return + } + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + return + } + lines, err := readRawLogLines(path) + if err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "id": id, + "rawLog": lines, + "meta": map[string]any{ + "size": info.Size(), + "modified": info.ModTime(), + "path": path, + }, + }) +} diff --git a/cmd/backoffice/main.go b/cmd/backoffice/main.go index 1f2ab91..d5df2f4 100644 --- a/cmd/backoffice/main.go +++ b/cmd/backoffice/main.go @@ -1,5 +1,402 @@ package main -func main() { - // Your code here +import ( + "bufio" + "context" + "crypto/sha1" + "encoding/base64" + "encoding/binary" + "errors" + "io" + "log" + "net" + "net/http" + "os" + "os/signal" + "regexp" + "strings" + "syscall" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +type CartFileInfo struct { + ID uint64 `json:"id"` + Path string `json:"path"` + Size int64 `json:"size"` + Modified time.Time `json:"modified"` +} + +func envOrDefault(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} + +var cartFileRe = regexp.MustCompile(`^(\d+)\.events\.log$`) + +// WebSocket hub to broadcast live mutation events to connected clients. +type Hub struct { + register chan *Client + unregister chan *Client + broadcast chan []byte + clients map[*Client]bool +} + +type Client struct { + hub *Hub + conn net.Conn + send chan []byte +} + +func NewHub() *Hub { + return &Hub{ + register: make(chan *Client), + unregister: make(chan *Client), + broadcast: make(chan []byte, 1024), + clients: make(map[*Client]bool), + } +} + +func (h *Hub) Run() { + for { + select { + case c := <-h.register: + h.clients[c] = true + case c := <-h.unregister: + if _, ok := h.clients[c]; ok { + delete(h.clients, c) + close(c.send) + _ = c.conn.Close() + } + case msg := <-h.broadcast: + for c := range h.clients { + select { + case c.send <- msg: + default: + // Client is slow or dead; drop it. + delete(h.clients, c) + close(c.send) + _ = c.conn.Close() + } + } + } + } +} + +func computeAccept(key string) string { + const magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" + h := sha1.New() + h.Write([]byte(key + magic)) + return base64.StdEncoding.EncodeToString(h.Sum(nil)) +} + +func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request) { + if !strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") || strings.ToLower(r.Header.Get("Upgrade")) != "websocket" { + http.Error(w, "upgrade required", http.StatusBadRequest) + return + } + key := r.Header.Get("Sec-WebSocket-Key") + if key == "" { + http.Error(w, "missing Sec-WebSocket-Key", http.StatusBadRequest) + return + } + accept := computeAccept(key) + + hj, ok := w.(http.Hijacker) + if !ok { + http.Error(w, "websocket not supported", http.StatusInternalServerError) + return + } + conn, buf, err := hj.Hijack() + if err != nil { + return + } + + // Write the upgrade response + response := "HTTP/1.1 101 Switching Protocols\r\n" + + "Upgrade: websocket\r\n" + + "Connection: Upgrade\r\n" + + "Sec-WebSocket-Accept: " + accept + "\r\n" + + "\r\n" + if _, err := buf.WriteString(response); err != nil { + _ = conn.Close() + return + } + if err := buf.Flush(); err != nil { + _ = conn.Close() + return + } + + client := &Client{ + hub: h, + conn: conn, + send: make(chan []byte, 256), + } + h.register <- client + go client.writePump() + go client.readPump() +} + +func writeWSFrame(w io.Writer, opcode byte, payload []byte) error { + // FIN set, opcode as provided + header := []byte{0x80 | (opcode & 0x0F)} + l := len(payload) + switch { + case l < 126: + header = append(header, byte(l)) + case l <= 65535: + ext := make([]byte, 2) + binary.BigEndian.PutUint16(ext, uint16(l)) + header = append(header, 126) + header = append(header, ext...) + default: + ext := make([]byte, 8) + binary.BigEndian.PutUint64(ext, uint64(l)) + header = append(header, 127) + header = append(header, ext...) + } + if _, err := w.Write(header); err != nil { + return err + } + if l > 0 { + if _, err := w.Write(payload); err != nil { + return err + } + } + return nil +} + +func (c *Client) readPump() { + defer func() { + c.hub.unregister <- c + }() + reader := bufio.NewReader(c.conn) + for { + // Read first two bytes + b1, err := reader.ReadByte() + if err != nil { + return + } + b2, err := reader.ReadByte() + if err != nil { + return + } + opcode := b1 & 0x0F + masked := (b2 & 0x80) != 0 + length := int64(b2 & 0x7F) + if length == 126 { + ext := make([]byte, 2) + if _, err := io.ReadFull(reader, ext); err != nil { + return + } + length = int64(binary.BigEndian.Uint16(ext)) + } else if length == 127 { + ext := make([]byte, 8) + if _, err := io.ReadFull(reader, ext); err != nil { + return + } + length = int64(binary.BigEndian.Uint64(ext)) + } + var maskKey [4]byte + if masked { + if _, err := io.ReadFull(reader, maskKey[:]); err != nil { + return + } + } + + // Read payload + if opcode == 0x9 && length <= 125 { // Ping -> respond with Pong + payload := make([]byte, length) + if _, err := io.ReadFull(reader, payload); err != nil { + return + } + // Unmask if masked + if masked { + for i := int64(0); i < length; i++ { + payload[i] ^= maskKey[i%4] + } + } + _ = writeWSFrame(c.conn, 0xA, payload) // best-effort pong + continue + } + + // Close frame + if opcode == 0x8 { + // Drain payload if any, then exit + if _, err := io.CopyN(io.Discard, reader, length); err != nil { + return + } + return + } + + // For other frames, just discard payload + if _, err := io.CopyN(io.Discard, reader, length); err != nil { + return + } + } +} + +func (c *Client) writePump() { + ticker := time.NewTicker(30 * time.Second) + defer func() { + ticker.Stop() + _ = c.conn.Close() + }() + for { + select { + case msg, ok := <-c.send: + if !ok { + // try to send close frame + _ = writeWSFrame(c.conn, 0x8, nil) + return + } + if err := writeWSFrame(c.conn, 0x1, msg); err != nil { + return + } + case <-ticker.C: + // Send a ping to keep connections alive behind proxies + _ = writeWSFrame(c.conn, 0x9, []byte("ping")) + } + } +} + +func startMutationConsumer(ctx context.Context, amqpURL string, hub *Hub) error { + conn, err := amqp.Dial(amqpURL) + if err != nil { + return err + } + ch, err := conn.Channel() + if err != nil { + _ = conn.Close() + return err + } + + // declare exchange (idempotent) + if err := ch.ExchangeDeclare( + "cart", // name + "topic", // type + true, // durable + false, // autoDelete + false, // internal + false, // noWait + nil, // args + ); err != nil { + _ = ch.Close() + _ = conn.Close() + return err + } + // declare an exclusive, auto-deleted queue by default + q, err := ch.QueueDeclare( + "", // name -> let server generate + false, // durable + true, // autoDelete + true, // exclusive + false, // noWait + nil, // args + ) + if err != nil { + _ = ch.Close() + _ = conn.Close() + return err + } + if err := ch.QueueBind(q.Name, "mutation", "cart", false, nil); err != nil { + _ = ch.Close() + _ = conn.Close() + return err + } + msgs, err := ch.Consume(q.Name, "backoffice", true, true, false, false, nil) + if err != nil { + _ = ch.Close() + _ = conn.Close() + return err + } + go func() { + defer ch.Close() + defer conn.Close() + for { + select { + case <-ctx.Done(): + return + case m, ok := <-msgs: + if !ok { + return + } + // Log and broadcast to all websocket clients + log.Printf("mutation event: %s", string(m.Body)) + if hub != nil { + select { + case hub.broadcast <- m.Body: + default: + // if hub queue is full, drop to avoid blocking + } + } + } + } + }() + return nil +} + +func main() { + dataDir := envOrDefault("DATA_DIR", "data") + addr := envOrDefault("ADDR", ":8080") + amqpURL := os.Getenv("AMQP_URL") + + _ = os.MkdirAll(dataDir, 0755) + + fs := NewFileServer(dataDir) + + hub := NewHub() + go hub.Run() + + mux := http.NewServeMux() + mux.HandleFunc("GET /carts", fs.CartsHandler) + mux.HandleFunc("GET /cart/{id}", fs.CartHandler) + mux.HandleFunc("/ws", hub.ServeWS) + mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + }) + mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + }) + + srv := &http.Server{ + Addr: addr, + Handler: mux, + ReadTimeout: 10 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 60 * time.Second, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if amqpURL != "" { + if err := startMutationConsumer(ctx, amqpURL, hub); err != nil { + log.Printf("AMQP listener disabled: %v", err) + } else { + log.Printf("AMQP listener connected") + } + } + + go func() { + log.Printf("backoffice HTTP listening on %s (dataDir=%s)", addr, dataDir) + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Fatalf("http server error: %v", err) + } + }() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + <-sigs + log.Printf("shutting down...") + + shutdownCtx, cancel2 := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel2() + _ = srv.Shutdown(shutdownCtx) + cancel() } diff --git a/cmd/cart/checkout_builder.go b/cmd/cart/checkout_builder.go index cca5c30..e4a152a 100644 --- a/cmd/cart/checkout_builder.go +++ b/cmd/cart/checkout_builder.go @@ -3,6 +3,8 @@ package main import ( "encoding/json" "fmt" + + "git.tornberg.me/go-cart-actor/pkg/cart" ) // CheckoutMeta carries the external / URL metadata required to build a @@ -33,7 +35,7 @@ type CheckoutMeta struct { // // If you later need to support different tax rates per line, you can extend // CartItem / Delivery to expose that data and propagate it here. -func BuildCheckoutOrderPayload(grain *CartGrain, meta *CheckoutMeta) ([]byte, *CheckoutOrder, error) { +func BuildCheckoutOrderPayload(grain *cart.CartGrain, meta *CheckoutMeta) ([]byte, *CheckoutOrder, error) { if grain == nil { return nil, nil, fmt.Errorf("nil grain") } diff --git a/cmd/cart/main.go b/cmd/cart/main.go index 88813db..cf44881 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -13,6 +13,7 @@ import ( "time" "git.tornberg.me/go-cart-actor/pkg/actor" + "git.tornberg.me/go-cart-actor/pkg/cart" "git.tornberg.me/go-cart-actor/pkg/discovery" messages "git.tornberg.me/go-cart-actor/pkg/messages" "git.tornberg.me/go-cart-actor/pkg/proxy" @@ -46,7 +47,7 @@ func init() { } type App struct { - pool *actor.SimpleGrainPool[CartGrain] + pool *actor.SimpleGrainPool[cart.CartGrain] } var podIp = os.Getenv("POD_IP") @@ -104,57 +105,48 @@ func main() { reg := actor.NewMutationRegistry() reg.RegisterMutations( - actor.NewMutation(AddItem, func() *messages.AddItem { + actor.NewMutation(cart.AddItem, func() *messages.AddItem { return &messages.AddItem{} }), - actor.NewMutation(ChangeQuantity, func() *messages.ChangeQuantity { + actor.NewMutation(cart.ChangeQuantity, func() *messages.ChangeQuantity { return &messages.ChangeQuantity{} }), - actor.NewMutation(RemoveItem, func() *messages.RemoveItem { + actor.NewMutation(cart.RemoveItem, func() *messages.RemoveItem { return &messages.RemoveItem{} }), - actor.NewMutation(InitializeCheckout, func() *messages.InitializeCheckout { + actor.NewMutation(cart.InitializeCheckout, func() *messages.InitializeCheckout { return &messages.InitializeCheckout{} }), - actor.NewMutation(OrderCreated, func() *messages.OrderCreated { + actor.NewMutation(cart.OrderCreated, func() *messages.OrderCreated { return &messages.OrderCreated{} }), - actor.NewMutation(RemoveDelivery, func() *messages.RemoveDelivery { + actor.NewMutation(cart.RemoveDelivery, func() *messages.RemoveDelivery { return &messages.RemoveDelivery{} }), - actor.NewMutation(SetDelivery, func() *messages.SetDelivery { + actor.NewMutation(cart.SetDelivery, func() *messages.SetDelivery { return &messages.SetDelivery{} }), - actor.NewMutation(SetPickupPoint, func() *messages.SetPickupPoint { + actor.NewMutation(cart.SetPickupPoint, func() *messages.SetPickupPoint { return &messages.SetPickupPoint{} }), - actor.NewMutation(ClearCart, func() *messages.ClearCartRequest { + actor.NewMutation(cart.ClearCart, func() *messages.ClearCartRequest { return &messages.ClearCartRequest{} }), - actor.NewMutation(AddVoucher, func() *messages.AddVoucher { + actor.NewMutation(cart.AddVoucher, func() *messages.AddVoucher { return &messages.AddVoucher{} }), - actor.NewMutation(RemoveVoucher, func() *messages.RemoveVoucher { + actor.NewMutation(cart.RemoveVoucher, func() *messages.RemoveVoucher { return &messages.RemoveVoucher{} }), ) - diskStorage := actor.NewDiskStorage[CartGrain]("data", reg) - poolConfig := actor.GrainPoolConfig[CartGrain]{ + diskStorage := actor.NewDiskStorage[cart.CartGrain]("data", reg) + poolConfig := actor.GrainPoolConfig[cart.CartGrain]{ MutationRegistry: reg, Storage: diskStorage, - Spawn: func(id uint64) (actor.Grain[CartGrain], error) { + Spawn: func(id uint64) (actor.Grain[cart.CartGrain], error) { grainSpawns.Inc() - ret := &CartGrain{ - lastItemId: 0, - lastDeliveryId: 0, - Deliveries: []*CartDelivery{}, - Id: CartId(id), - Items: []*CartItem{}, - TotalPrice: NewPrice(), - } + ret := cart.NewCartGrain(id, time.Now()) // Set baseline lastChange at spawn; replay may update it to last event timestamp. - ret.lastChange = time.Now() - ret.lastAccess = time.Now() err := diskStorage.LoadEvents(id, ret) @@ -185,7 +177,7 @@ func main() { amqpListener.DefineTopics() pool.AddListener(amqpListener) - grpcSrv, err := actor.NewControlServer[*CartGrain](controlPlaneConfig, pool) + grpcSrv, err := actor.NewControlServer[*cart.CartGrain](controlPlaneConfig, pool) if err != nil { log.Fatalf("Error starting control plane gRPC server: %v\n", err) } @@ -282,14 +274,14 @@ func main() { w.Write([]byte("no cart id to checkout is empty")) return } - parsed, ok := ParseCartId(cookie.Value) + parsed, ok := cart.ParseCartId(cookie.Value) if !ok { w.WriteHeader(http.StatusBadRequest) w.Write([]byte("invalid cart id format")) return } cartId := parsed - syncedServer.ProxyHandler(func(w http.ResponseWriter, r *http.Request, cartId CartId) error { + syncedServer.ProxyHandler(func(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error { order, err = syncedServer.CreateOrUpdateCheckout(r.Host, cartId) if err != nil { return err @@ -443,7 +435,7 @@ func triggerOrderCompleted(syncedServer *PoolServer, order *CheckoutOrder) error OrderId: order.ID, Status: order.Status, } - cid, ok := ParseCartId(order.MerchantReference1) + cid, ok := cart.ParseCartId(order.MerchantReference1) if !ok { return fmt.Errorf("invalid cart id in order reference: %s", order.MerchantReference1) } diff --git a/cmd/cart/pool-server.go b/cmd/cart/pool-server.go index f4f4e69..fae39e4 100644 --- a/cmd/cart/pool-server.go +++ b/cmd/cart/pool-server.go @@ -11,18 +11,19 @@ import ( "time" "git.tornberg.me/go-cart-actor/pkg/actor" + "git.tornberg.me/go-cart-actor/pkg/cart" messages "git.tornberg.me/go-cart-actor/pkg/messages" "git.tornberg.me/go-cart-actor/pkg/voucher" "github.com/gogo/protobuf/proto" ) type PoolServer struct { - actor.GrainPool[*CartGrain] + actor.GrainPool[*cart.CartGrain] pod_name string klarnaClient *KlarnaClient } -func NewPoolServer(pool actor.GrainPool[*CartGrain], pod_name string, klarnaClient *KlarnaClient) *PoolServer { +func NewPoolServer(pool actor.GrainPool[*cart.CartGrain], pod_name string, klarnaClient *KlarnaClient) *PoolServer { return &PoolServer{ GrainPool: pool, pod_name: pod_name, @@ -30,11 +31,11 @@ func NewPoolServer(pool actor.GrainPool[*CartGrain], pod_name string, klarnaClie } } -func (s *PoolServer) ApplyLocal(id CartId, mutation ...proto.Message) (*actor.MutationResult[*CartGrain], error) { +func (s *PoolServer) ApplyLocal(id cart.CartId, mutation ...proto.Message) (*actor.MutationResult[*cart.CartGrain], error) { return s.Apply(uint64(id), mutation...) } -func (s *PoolServer) GetCartHandler(w http.ResponseWriter, r *http.Request, id CartId) error { +func (s *PoolServer) GetCartHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error { grain, err := s.Get(uint64(id)) if err != nil { return err @@ -43,7 +44,7 @@ func (s *PoolServer) GetCartHandler(w http.ResponseWriter, r *http.Request, id C return s.WriteResult(w, grain) } -func (s *PoolServer) AddSkuToCartHandler(w http.ResponseWriter, r *http.Request, id CartId) error { +func (s *PoolServer) AddSkuToCartHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error { sku := r.PathValue("sku") msg, err := GetItemAddMessage(sku, 1, getCountryFromHost(r.Host), nil) if err != nil { @@ -73,7 +74,7 @@ func (s *PoolServer) WriteResult(w http.ResponseWriter, result any) error { return err } -func (s *PoolServer) DeleteItemHandler(w http.ResponseWriter, r *http.Request, id CartId) error { +func (s *PoolServer) DeleteItemHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error { itemIdString := r.PathValue("itemId") itemId, err := strconv.ParseInt(itemIdString, 10, 64) @@ -93,7 +94,7 @@ type SetDeliveryRequest struct { PickupPoint *messages.PickupPoint `json:"pickupPoint,omitempty"` } -func (s *PoolServer) SetDeliveryHandler(w http.ResponseWriter, r *http.Request, id CartId) error { +func (s *PoolServer) SetDeliveryHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error { delivery := SetDeliveryRequest{} err := json.NewDecoder(r.Body).Decode(&delivery) @@ -111,7 +112,7 @@ func (s *PoolServer) SetDeliveryHandler(w http.ResponseWriter, r *http.Request, return s.WriteResult(w, data) } -func (s *PoolServer) SetPickupPointHandler(w http.ResponseWriter, r *http.Request, id CartId) error { +func (s *PoolServer) SetPickupPointHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error { deliveryIdString := r.PathValue("deliveryId") deliveryId, err := strconv.ParseInt(deliveryIdString, 10, 64) @@ -138,7 +139,7 @@ func (s *PoolServer) SetPickupPointHandler(w http.ResponseWriter, r *http.Reques return s.WriteResult(w, reply) } -func (s *PoolServer) RemoveDeliveryHandler(w http.ResponseWriter, r *http.Request, id CartId) error { +func (s *PoolServer) RemoveDeliveryHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error { deliveryIdString := r.PathValue("deliveryId") deliveryId, err := strconv.Atoi(deliveryIdString) @@ -152,7 +153,7 @@ func (s *PoolServer) RemoveDeliveryHandler(w http.ResponseWriter, r *http.Reques return s.WriteResult(w, reply) } -func (s *PoolServer) QuantityChangeHandler(w http.ResponseWriter, r *http.Request, id CartId) error { +func (s *PoolServer) QuantityChangeHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error { changeQuantity := messages.ChangeQuantity{} err := json.NewDecoder(r.Body).Decode(&changeQuantity) if err != nil { @@ -197,7 +198,7 @@ func getMultipleAddMessages(items []Item, country string) []proto.Message { return msgs } -func (s *PoolServer) SetCartItemsHandler(w http.ResponseWriter, r *http.Request, id CartId) error { +func (s *PoolServer) SetCartItemsHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error { setCartItems := SetCartItems{} err := json.NewDecoder(r.Body).Decode(&setCartItems) if err != nil { @@ -215,7 +216,7 @@ func (s *PoolServer) SetCartItemsHandler(w http.ResponseWriter, r *http.Request, return s.WriteResult(w, reply) } -func (s *PoolServer) AddMultipleItemHandler(w http.ResponseWriter, r *http.Request, id CartId) error { +func (s *PoolServer) AddMultipleItemHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error { setCartItems := SetCartItems{} err := json.NewDecoder(r.Body).Decode(&setCartItems) if err != nil { @@ -236,7 +237,7 @@ type AddRequest struct { StoreId *string `json:"storeId"` } -func (s *PoolServer) AddSkuRequestHandler(w http.ResponseWriter, r *http.Request, id CartId) error { +func (s *PoolServer) AddSkuRequestHandler(w http.ResponseWriter, r *http.Request, id cart.CartId) error { addRequest := AddRequest{Quantity: 1} err := json.NewDecoder(r.Body).Decode(&addRequest) if err != nil { @@ -287,7 +288,7 @@ func getLocale(country string) string { return "sv-se" } -func (s *PoolServer) CreateOrUpdateCheckout(host string, id CartId) (*CheckoutOrder, error) { +func (s *PoolServer) CreateOrUpdateCheckout(host string, id cart.CartId) (*CheckoutOrder, error) { country := getCountryFromHost(host) meta := &CheckoutMeta{ Terms: fmt.Sprintf("https://%s/terms", host), @@ -319,7 +320,7 @@ func (s *PoolServer) CreateOrUpdateCheckout(host string, id CartId) (*CheckoutOr } } -func (s *PoolServer) ApplyCheckoutStarted(klarnaOrder *CheckoutOrder, id CartId) (*actor.MutationResult[*CartGrain], error) { +func (s *PoolServer) ApplyCheckoutStarted(klarnaOrder *CheckoutOrder, id cart.CartId) (*actor.MutationResult[*cart.CartGrain], error) { // Persist initialization state via mutation (best-effort) return s.ApplyLocal(id, &messages.InitializeCheckout{ OrderId: klarnaOrder.ID, @@ -341,12 +342,12 @@ func (s *PoolServer) ApplyCheckoutStarted(klarnaOrder *CheckoutOrder, id CartId) // } // -func CookieCartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) { +func CookieCartIdHandler(fn func(cartId cart.CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { - var id CartId + var id cart.CartId cookie, err := r.Cookie("cartid") if err != nil || cookie.Value == "" { - id = MustNewCartId() + id = cart.MustNewCartId() http.SetCookie(w, &http.Cookie{ Name: "cartid", Value: id.String(), @@ -358,9 +359,9 @@ func CookieCartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.R }) w.Header().Set("Set-Cart-Id", id.String()) } else { - parsed, ok := ParseCartId(cookie.Value) + parsed, ok := cart.ParseCartId(cookie.Value) if !ok { - id = MustNewCartId() + id = cart.MustNewCartId() http.SetCookie(w, &http.Cookie{ Name: "cartid", Value: id.String(), @@ -388,7 +389,7 @@ func CookieCartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.R // Removed leftover legacy block after CookieCartIdHandler (obsolete code referencing cid/legacy) -func (s *PoolServer) RemoveCartCookie(w http.ResponseWriter, r *http.Request, cartId CartId) error { +func (s *PoolServer) RemoveCartCookie(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error { // Clear cart cookie (breaking change: do not issue a new legacy id here) http.SetCookie(w, &http.Cookie{ Name: "cartid", @@ -403,17 +404,17 @@ func (s *PoolServer) RemoveCartCookie(w http.ResponseWriter, r *http.Request, ca return nil } -func CartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) { +func CartIdHandler(fn func(cartId cart.CartId, w http.ResponseWriter, r *http.Request) error) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { - var id CartId + var id cart.CartId raw := r.PathValue("id") // If no id supplied, generate a new one if raw == "" { - id := MustNewCartId() + id := cart.MustNewCartId() w.Header().Set("Set-Cart-Id", id.String()) } else { // Parse base62 cart id - if parsedId, ok := ParseCartId(raw); !ok { + if parsedId, ok := cart.ParseCartId(raw); !ok { w.WriteHeader(http.StatusBadRequest) w.Write([]byte("cart id is invalid")) return @@ -431,8 +432,8 @@ func CartIdHandler(fn func(cartId CartId, w http.ResponseWriter, r *http.Request } } -func (s *PoolServer) ProxyHandler(fn func(w http.ResponseWriter, r *http.Request, cartId CartId) error) func(cartId CartId, w http.ResponseWriter, r *http.Request) error { - return func(cartId CartId, w http.ResponseWriter, r *http.Request) error { +func (s *PoolServer) ProxyHandler(fn func(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error) func(cartId cart.CartId, w http.ResponseWriter, r *http.Request) error { + return func(cartId cart.CartId, w http.ResponseWriter, r *http.Request) error { if ownerHost, ok := s.OwnerHost(uint64(cartId)); ok { handled, err := ownerHost.Proxy(uint64(cartId), w, r) if err == nil && handled { @@ -449,7 +450,7 @@ type AddVoucherRequest struct { VoucherCode string `json:"code"` } -func (s *PoolServer) AddVoucherHandler(w http.ResponseWriter, r *http.Request, cartId CartId) error { +func (s *PoolServer) AddVoucherHandler(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error { data := &AddVoucherRequest{} json.NewDecoder(r.Body).Decode(data) v := voucher.Service{} @@ -469,7 +470,7 @@ func (s *PoolServer) AddVoucherHandler(w http.ResponseWriter, r *http.Request, c return nil } -func (s *PoolServer) RemoveVoucherHandler(w http.ResponseWriter, r *http.Request, cartId CartId) error { +func (s *PoolServer) RemoveVoucherHandler(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error { idStr := r.PathValue("voucherId") id, err := strconv.ParseInt(idStr, 10, 64) diff --git a/cmd/cart/product-fetcher.go b/cmd/cart/product-fetcher.go index 23ea6fd..ef6edbc 100644 --- a/cmd/cart/product-fetcher.go +++ b/cmd/cart/product-fetcher.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" + "git.tornberg.me/go-cart-actor/pkg/cart" messages "git.tornberg.me/go-cart-actor/pkg/messages" "github.com/matst80/slask-finder/pkg/index" ) @@ -53,19 +54,19 @@ func ToItemAddMessage(item *index.DataItem, storeId *string, qty int, country st return nil } - stock := StockStatus(0) + stock := cart.StockStatus(0) centralStockValue, ok := item.GetStringFieldValue(3) if storeId == nil { if ok { pureNumber := strings.Replace(centralStockValue, "+", "", -1) if centralStock, err := strconv.ParseInt(pureNumber, 10, 64); err == nil { - stock = StockStatus(centralStock) + stock = cart.StockStatus(centralStock) } } } else { storeStock, ok := item.Stock.GetStock()[*storeId] if ok { - stock = StockStatus(storeStock) + stock = cart.StockStatus(storeStock) } } @@ -119,3 +120,10 @@ func getTax(articleType string) int32 { return 2500 } } + +func getInt(data float64, ok bool) (int, error) { + if !ok { + return 0, fmt.Errorf("invalid type") + } + return int(data), nil +} diff --git a/deployment/deployment.yaml b/deployment/deployment.yaml index 8f63485..8be6099 100644 --- a/deployment/deployment.yaml +++ b/deployment/deployment.yaml @@ -9,6 +9,94 @@ type: Opaque --- apiVersion: apps/v1 kind: Deployment +metadata: + labels: + app: cart-backoffice + arch: amd64 + name: cart-backoffice-x86 +spec: + replicas: 3 + selector: + matchLabels: + app: cart-backoffice + arch: amd64 + template: + metadata: + labels: + app: cart-backoffice + arch: amd64 + spec: + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: kubernetes.io/arch + operator: NotIn + values: + - arm64 + volumes: + - name: data + nfs: + path: /i-data/7a8af061/nfs/cart-actor + server: 10.10.1.10 + imagePullSecrets: + - name: regcred + serviceAccountName: default + containers: + - image: registry.knatofs.se/go-cart-actor-amd64:latest + name: cart-actor-amd64 + imagePullPolicy: Always + command: ["/go-cart-backoffice"] + lifecycle: + preStop: + exec: + command: ["sleep", "15"] + ports: + - containerPort: 8080 + name: web + livenessProbe: + httpGet: + path: /livez + port: web + failureThreshold: 1 + periodSeconds: 10 + readinessProbe: + httpGet: + path: /readyz + port: web + failureThreshold: 2 + initialDelaySeconds: 2 + periodSeconds: 10 + volumeMounts: + - mountPath: "/data" + name: data + resources: + limits: + memory: "768Mi" + requests: + memory: "70Mi" + cpu: "1200m" + env: + - name: TZ + value: "Europe/Stockholm" + - name: KLARNA_API_USERNAME + valueFrom: + secretKeyRef: + name: klarna-api-credentials + key: username + - name: KLARNA_API_PASSWORD + valueFrom: + secretKeyRef: + name: klarna-api-credentials + key: password + - name: AMQP_URL + value: "amqp://admin:12bananer@rabbitmq.dev:5672/" + # - name: BASE_URL + # value: "https://s10n-no.tornberg.me" +--- +apiVersion: apps/v1 +kind: Deployment metadata: labels: app: cart-actor @@ -222,6 +310,17 @@ spec: - name: web port: 8080 --- +kind: Service +apiVersion: v1 +metadata: + name: cart-backoffice +spec: + selector: + app: cart-backoffice + ports: + - name: web + port: 8080 +--- apiVersion: networking.k8s.io/v1 kind: Ingress metadata: @@ -250,3 +349,27 @@ spec: name: cart-actor port: number: 8080 +--- +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: cart-backend-ingress + annotations: + cert-manager.io/cluster-issuer: letsencrypt-prod +spec: + ingressClassName: nginx + tls: + - hosts: + - slask-cart.tornberg.me + secretName: cart-backoffice-actor-tls-secret + rules: + - host: slask-cart.tornberg.me + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: cart-backoffice + port: + number: 8080 diff --git a/cmd/cart/cart-grain.go b/pkg/cart/cart-grain.go similarity index 94% rename from cmd/cart/cart-grain.go rename to pkg/cart/cart-grain.go index 3448274..85aaed1 100644 --- a/cmd/cart/cart-grain.go +++ b/pkg/cart/cart-grain.go @@ -1,8 +1,7 @@ -package main +package cart import ( "encoding/json" - "fmt" "slices" "sync" "time" @@ -138,6 +137,22 @@ func (v *Voucher) AppliesTo(cart *CartGrain) ([]*CartItem, bool) { return cart.Items, true } +func NewCartGrain(id uint64, ts time.Time) *CartGrain { + return &CartGrain{ + lastItemId: 0, + lastDeliveryId: 0, + lastVoucherId: 0, + lastAccess: ts, + lastChange: ts, + TotalDiscount: NewPrice(), + Vouchers: []*Voucher{}, + Deliveries: []*CartDelivery{}, + Id: CartId(id), + Items: []*CartItem{}, + TotalPrice: NewPrice(), + } +} + func (c *CartGrain) GetId() uint64 { return uint64(c.Id) } @@ -155,22 +170,6 @@ func (c *CartGrain) GetCurrentState() (*CartGrain, error) { return c, nil } -func getInt(data float64, ok bool) (int, error) { - if !ok { - return 0, fmt.Errorf("invalid type") - } - return int(data), nil -} - -// func (c *CartGrain) AddItem(sku string, qty int, country string, storeId *string) (*CartGrain, error) { -// cartItem, err := getItemData(sku, qty, country) -// if err != nil { -// return nil, err -// } -// cartItem.StoreId = storeId -// return c.Apply(cartItem, false) -// } - func (c *CartGrain) GetState() ([]byte, error) { return json.Marshal(c) } diff --git a/cmd/cart/cart_grain_totals_test.go b/pkg/cart/cart_grain_totals_test.go similarity index 99% rename from cmd/cart/cart_grain_totals_test.go rename to pkg/cart/cart_grain_totals_test.go index ae0dd7e..135a6a7 100644 --- a/cmd/cart/cart_grain_totals_test.go +++ b/pkg/cart/cart_grain_totals_test.go @@ -1,4 +1,4 @@ -package main +package cart import ( "testing" diff --git a/cmd/cart/cart_id.go b/pkg/cart/cart_id.go similarity index 99% rename from cmd/cart/cart_id.go rename to pkg/cart/cart_id.go index 6039101..e57c2b2 100644 --- a/cmd/cart/cart_id.go +++ b/pkg/cart/cart_id.go @@ -1,4 +1,4 @@ -package main +package cart import ( "crypto/rand" diff --git a/cmd/cart/cart_id_test.go b/pkg/cart/cart_id_test.go similarity index 99% rename from cmd/cart/cart_id_test.go rename to pkg/cart/cart_id_test.go index f7d1883..272bf41 100644 --- a/cmd/cart/cart_id_test.go +++ b/pkg/cart/cart_id_test.go @@ -1,4 +1,4 @@ -package main +package cart import ( "encoding/json" diff --git a/cmd/cart/mutation_add_item.go b/pkg/cart/mutation_add_item.go similarity index 99% rename from cmd/cart/mutation_add_item.go rename to pkg/cart/mutation_add_item.go index 459b9cc..9a3a092 100644 --- a/cmd/cart/mutation_add_item.go +++ b/pkg/cart/mutation_add_item.go @@ -1,4 +1,4 @@ -package main +package cart import ( "fmt" diff --git a/cmd/cart/mutation_add_voucher.go b/pkg/cart/mutation_add_voucher.go similarity index 98% rename from cmd/cart/mutation_add_voucher.go rename to pkg/cart/mutation_add_voucher.go index c29af7c..85bffe0 100644 --- a/cmd/cart/mutation_add_voucher.go +++ b/pkg/cart/mutation_add_voucher.go @@ -1,4 +1,4 @@ -package main +package cart import ( "slices" diff --git a/cmd/cart/mutation_change_quantity.go b/pkg/cart/mutation_change_quantity.go similarity index 99% rename from cmd/cart/mutation_change_quantity.go rename to pkg/cart/mutation_change_quantity.go index 1de8745..7ab1416 100644 --- a/cmd/cart/mutation_change_quantity.go +++ b/pkg/cart/mutation_change_quantity.go @@ -1,4 +1,4 @@ -package main +package cart import ( "fmt" diff --git a/cmd/cart/mutation_initialize_checkout.go b/pkg/cart/mutation_initialize_checkout.go similarity index 99% rename from cmd/cart/mutation_initialize_checkout.go rename to pkg/cart/mutation_initialize_checkout.go index dcc2d50..aa1a43c 100644 --- a/cmd/cart/mutation_initialize_checkout.go +++ b/pkg/cart/mutation_initialize_checkout.go @@ -1,4 +1,4 @@ -package main +package cart import ( "fmt" diff --git a/cmd/cart/mutation_order_created.go b/pkg/cart/mutation_order_created.go similarity index 99% rename from cmd/cart/mutation_order_created.go rename to pkg/cart/mutation_order_created.go index a197929..00d6914 100644 --- a/cmd/cart/mutation_order_created.go +++ b/pkg/cart/mutation_order_created.go @@ -1,4 +1,4 @@ -package main +package cart import ( "fmt" diff --git a/cmd/cart/mutation_remove_delivery.go b/pkg/cart/mutation_remove_delivery.go similarity index 99% rename from cmd/cart/mutation_remove_delivery.go rename to pkg/cart/mutation_remove_delivery.go index 3ec92f9..dc38824 100644 --- a/cmd/cart/mutation_remove_delivery.go +++ b/pkg/cart/mutation_remove_delivery.go @@ -1,4 +1,4 @@ -package main +package cart import ( "fmt" diff --git a/cmd/cart/mutation_remove_item.go b/pkg/cart/mutation_remove_item.go similarity index 98% rename from cmd/cart/mutation_remove_item.go rename to pkg/cart/mutation_remove_item.go index c5ecd3c..e12a647 100644 --- a/cmd/cart/mutation_remove_item.go +++ b/pkg/cart/mutation_remove_item.go @@ -1,4 +1,4 @@ -package main +package cart import ( "fmt" diff --git a/cmd/cart/mutation_set_delivery.go b/pkg/cart/mutation_set_delivery.go similarity index 99% rename from cmd/cart/mutation_set_delivery.go rename to pkg/cart/mutation_set_delivery.go index 8dc9df3..a853958 100644 --- a/cmd/cart/mutation_set_delivery.go +++ b/pkg/cart/mutation_set_delivery.go @@ -1,4 +1,4 @@ -package main +package cart import ( "fmt" diff --git a/cmd/cart/mutation_set_pickup_point.go b/pkg/cart/mutation_set_pickup_point.go similarity index 99% rename from cmd/cart/mutation_set_pickup_point.go rename to pkg/cart/mutation_set_pickup_point.go index caf72d2..057769b 100644 --- a/cmd/cart/mutation_set_pickup_point.go +++ b/pkg/cart/mutation_set_pickup_point.go @@ -1,4 +1,4 @@ -package main +package cart import ( "fmt" diff --git a/cmd/cart/price.go b/pkg/cart/price.go similarity index 99% rename from cmd/cart/price.go rename to pkg/cart/price.go index 4d66ee4..2f060e8 100644 --- a/cmd/cart/price.go +++ b/pkg/cart/price.go @@ -1,4 +1,4 @@ -package main +package cart import ( "encoding/json" -- 2.49.1 From a1ba48053aac65d9a574214230b9b89f7e70fa1a Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 08:47:05 +0200 Subject: [PATCH 02/23] update backoffice --- cmd/backoffice/fileserver.go | 10 ++ cmd/backoffice/hub.go | 248 +++++++++++++++++++++++++++++++ cmd/backoffice/main.go | 275 ++++------------------------------- deployment/deployment.yaml | 2 +- 4 files changed, 287 insertions(+), 248 deletions(-) create mode 100644 cmd/backoffice/hub.go diff --git a/cmd/backoffice/fileserver.go b/cmd/backoffice/fileserver.go index f4632bd..227e7ae 100644 --- a/cmd/backoffice/fileserver.go +++ b/cmd/backoffice/fileserver.go @@ -11,6 +11,9 @@ import ( "sort" "strconv" "strings" + "time" + + "git.tornberg.me/go-cart-actor/pkg/cart" ) type FileServer struct { @@ -122,6 +125,12 @@ func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid id"}) return } + // reconstruct state from event log if present + grain := cart.NewCartGrain(id, time.Now()) + if globalDisk != nil { + _ = globalDisk.LoadEvents(id, grain) + } + path := filepath.Join(fs.dataDir, fmt.Sprintf("%d.events.log", id)) info, err := os.Stat(path) if err != nil { @@ -139,6 +148,7 @@ func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) { } writeJSON(w, http.StatusOK, map[string]any{ "id": id, + "state": grain, "rawLog": lines, "meta": map[string]any{ "size": info.Size(), diff --git a/cmd/backoffice/hub.go b/cmd/backoffice/hub.go new file mode 100644 index 0000000..67be2cd --- /dev/null +++ b/cmd/backoffice/hub.go @@ -0,0 +1,248 @@ +package main + +import ( + "bufio" + "crypto/sha1" + "encoding/base64" + "encoding/binary" + "io" + "net" + "net/http" + "strings" + "time" +) + +// Hub manages websocket clients and broadcasts messages to them. +type Hub struct { + register chan *Client + unregister chan *Client + broadcast chan []byte + clients map[*Client]bool +} + +// Client represents a single websocket client connection. +type Client struct { + hub *Hub + conn net.Conn + send chan []byte +} + +// NewHub constructs a new Hub instance. +func NewHub() *Hub { + return &Hub{ + register: make(chan *Client), + unregister: make(chan *Client), + broadcast: make(chan []byte, 1024), + clients: make(map[*Client]bool), + } +} + +// Run starts the hub event loop. +func (h *Hub) Run() { + for { + select { + case c := <-h.register: + h.clients[c] = true + case c := <-h.unregister: + if _, ok := h.clients[c]; ok { + delete(h.clients, c) + close(c.send) + _ = c.conn.Close() + } + case msg := <-h.broadcast: + for c := range h.clients { + select { + case c.send <- msg: + default: + // Client is slow or dead; drop it. + delete(h.clients, c) + close(c.send) + _ = c.conn.Close() + } + } + } + } +} + +// computeAccept computes the Sec-WebSocket-Accept header value. +func computeAccept(key string) string { + const magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" + h := sha1.New() + h.Write([]byte(key + magic)) + return base64.StdEncoding.EncodeToString(h.Sum(nil)) +} + +// ServeWS upgrades the HTTP request to a WebSocket connection and registers a client. +func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request) { + if !strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") || strings.ToLower(r.Header.Get("Upgrade")) != "websocket" { + http.Error(w, "upgrade required", http.StatusBadRequest) + return + } + key := r.Header.Get("Sec-WebSocket-Key") + if key == "" { + http.Error(w, "missing Sec-WebSocket-Key", http.StatusBadRequest) + return + } + accept := computeAccept(key) + + hj, ok := w.(http.Hijacker) + if !ok { + http.Error(w, "websocket not supported", http.StatusInternalServerError) + return + } + conn, buf, err := hj.Hijack() + if err != nil { + return + } + + // Write the upgrade response + response := "HTTP/1.1 101 Switching Protocols\r\n" + + "Upgrade: websocket\r\n" + + "Connection: Upgrade\r\n" + + "Sec-WebSocket-Accept: " + accept + "\r\n" + + "\r\n" + if _, err := buf.WriteString(response); err != nil { + _ = conn.Close() + return + } + if err := buf.Flush(); err != nil { + _ = conn.Close() + return + } + + client := &Client{ + hub: h, + conn: conn, + send: make(chan []byte, 256), + } + h.register <- client + go client.writePump() + go client.readPump() +} + +// writeWSFrame writes a single WebSocket frame to the writer. +func writeWSFrame(w io.Writer, opcode byte, payload []byte) error { + // FIN set, opcode as provided + header := []byte{0x80 | (opcode & 0x0F)} + l := len(payload) + switch { + case l < 126: + header = append(header, byte(l)) + case l <= 65535: + ext := make([]byte, 2) + binary.BigEndian.PutUint16(ext, uint16(l)) + header = append(header, 126) + header = append(header, ext...) + default: + ext := make([]byte, 8) + binary.BigEndian.PutUint64(ext, uint64(l)) + header = append(header, 127) + header = append(header, ext...) + } + if _, err := w.Write(header); err != nil { + return err + } + if l > 0 { + if _, err := w.Write(payload); err != nil { + return err + } + } + return nil +} + +// readPump handles control frames from the client and discards other incoming frames. +// This server is broadcast-only to clients. +func (c *Client) readPump() { + defer func() { + c.hub.unregister <- c + }() + reader := bufio.NewReader(c.conn) + for { + // Read first two bytes + b1, err := reader.ReadByte() + if err != nil { + return + } + b2, err := reader.ReadByte() + if err != nil { + return + } + opcode := b1 & 0x0F + masked := (b2 & 0x80) != 0 + length := int64(b2 & 0x7F) + if length == 126 { + ext := make([]byte, 2) + if _, err := io.ReadFull(reader, ext); err != nil { + return + } + length = int64(binary.BigEndian.Uint16(ext)) + } else if length == 127 { + ext := make([]byte, 8) + if _, err := io.ReadFull(reader, ext); err != nil { + return + } + length = int64(binary.BigEndian.Uint64(ext)) + } + var maskKey [4]byte + if masked { + if _, err := io.ReadFull(reader, maskKey[:]); err != nil { + return + } + } + + // Handle Ping -> Pong + if opcode == 0x9 && length <= 125 { + payload := make([]byte, length) + if _, err := io.ReadFull(reader, payload); err != nil { + return + } + // Unmask if masked + if masked { + for i := int64(0); i < length; i++ { + payload[i] ^= maskKey[i%4] + } + } + _ = writeWSFrame(c.conn, 0xA, payload) // best-effort pong + continue + } + + // Close frame + if opcode == 0x8 { + // Drain payload if any, then exit + if _, err := io.CopyN(io.Discard, reader, length); err != nil { + return + } + return + } + + // For other frames, just discard payload + if _, err := io.CopyN(io.Discard, reader, length); err != nil { + return + } + } +} + +// writePump sends queued messages to the client and pings periodically to keep the connection alive. +func (c *Client) writePump() { + ticker := time.NewTicker(30 * time.Second) + defer func() { + ticker.Stop() + _ = c.conn.Close() + }() + for { + select { + case msg, ok := <-c.send: + if !ok { + // try to send close frame + _ = writeWSFrame(c.conn, 0x8, nil) + return + } + if err := writeWSFrame(c.conn, 0x1, msg); err != nil { + return + } + case <-ticker.C: + // Send a ping to keep connections alive behind proxies + _ = writeWSFrame(c.conn, 0x9, []byte("ping")) + } + } +} diff --git a/cmd/backoffice/main.go b/cmd/backoffice/main.go index d5df2f4..2f11eba 100644 --- a/cmd/backoffice/main.go +++ b/cmd/backoffice/main.go @@ -1,23 +1,17 @@ package main import ( - "bufio" "context" - "crypto/sha1" - "encoding/base64" - "encoding/binary" "errors" - "io" "log" - "net" "net/http" "os" - "os/signal" "regexp" - "strings" - "syscall" "time" + actor "git.tornberg.me/go-cart-actor/pkg/actor" + "git.tornberg.me/go-cart-actor/pkg/cart" + messages "git.tornberg.me/go-cart-actor/pkg/messages" amqp "github.com/rabbitmq/amqp091-go" ) @@ -37,230 +31,24 @@ func envOrDefault(key, def string) string { var cartFileRe = regexp.MustCompile(`^(\d+)\.events\.log$`) -// WebSocket hub to broadcast live mutation events to connected clients. -type Hub struct { - register chan *Client - unregister chan *Client - broadcast chan []byte - clients map[*Client]bool -} +var globalDisk *actor.DiskStorage[cart.CartGrain] -type Client struct { - hub *Hub - conn net.Conn - send chan []byte -} - -func NewHub() *Hub { - return &Hub{ - register: make(chan *Client), - unregister: make(chan *Client), - broadcast: make(chan []byte, 1024), - clients: make(map[*Client]bool), - } -} - -func (h *Hub) Run() { - for { - select { - case c := <-h.register: - h.clients[c] = true - case c := <-h.unregister: - if _, ok := h.clients[c]; ok { - delete(h.clients, c) - close(c.send) - _ = c.conn.Close() - } - case msg := <-h.broadcast: - for c := range h.clients { - select { - case c.send <- msg: - default: - // Client is slow or dead; drop it. - delete(h.clients, c) - close(c.send) - _ = c.conn.Close() - } - } - } - } -} - -func computeAccept(key string) string { - const magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" - h := sha1.New() - h.Write([]byte(key + magic)) - return base64.StdEncoding.EncodeToString(h.Sum(nil)) -} - -func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request) { - if !strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") || strings.ToLower(r.Header.Get("Upgrade")) != "websocket" { - http.Error(w, "upgrade required", http.StatusBadRequest) - return - } - key := r.Header.Get("Sec-WebSocket-Key") - if key == "" { - http.Error(w, "missing Sec-WebSocket-Key", http.StatusBadRequest) - return - } - accept := computeAccept(key) - - hj, ok := w.(http.Hijacker) - if !ok { - http.Error(w, "websocket not supported", http.StatusInternalServerError) - return - } - conn, buf, err := hj.Hijack() - if err != nil { - return - } - - // Write the upgrade response - response := "HTTP/1.1 101 Switching Protocols\r\n" + - "Upgrade: websocket\r\n" + - "Connection: Upgrade\r\n" + - "Sec-WebSocket-Accept: " + accept + "\r\n" + - "\r\n" - if _, err := buf.WriteString(response); err != nil { - _ = conn.Close() - return - } - if err := buf.Flush(); err != nil { - _ = conn.Close() - return - } - - client := &Client{ - hub: h, - conn: conn, - send: make(chan []byte, 256), - } - h.register <- client - go client.writePump() - go client.readPump() -} - -func writeWSFrame(w io.Writer, opcode byte, payload []byte) error { - // FIN set, opcode as provided - header := []byte{0x80 | (opcode & 0x0F)} - l := len(payload) - switch { - case l < 126: - header = append(header, byte(l)) - case l <= 65535: - ext := make([]byte, 2) - binary.BigEndian.PutUint16(ext, uint16(l)) - header = append(header, 126) - header = append(header, ext...) - default: - ext := make([]byte, 8) - binary.BigEndian.PutUint64(ext, uint64(l)) - header = append(header, 127) - header = append(header, ext...) - } - if _, err := w.Write(header); err != nil { - return err - } - if l > 0 { - if _, err := w.Write(payload); err != nil { - return err - } - } - return nil -} - -func (c *Client) readPump() { - defer func() { - c.hub.unregister <- c - }() - reader := bufio.NewReader(c.conn) - for { - // Read first two bytes - b1, err := reader.ReadByte() - if err != nil { - return - } - b2, err := reader.ReadByte() - if err != nil { - return - } - opcode := b1 & 0x0F - masked := (b2 & 0x80) != 0 - length := int64(b2 & 0x7F) - if length == 126 { - ext := make([]byte, 2) - if _, err := io.ReadFull(reader, ext); err != nil { - return - } - length = int64(binary.BigEndian.Uint16(ext)) - } else if length == 127 { - ext := make([]byte, 8) - if _, err := io.ReadFull(reader, ext); err != nil { - return - } - length = int64(binary.BigEndian.Uint64(ext)) - } - var maskKey [4]byte - if masked { - if _, err := io.ReadFull(reader, maskKey[:]); err != nil { - return - } - } - - // Read payload - if opcode == 0x9 && length <= 125 { // Ping -> respond with Pong - payload := make([]byte, length) - if _, err := io.ReadFull(reader, payload); err != nil { - return - } - // Unmask if masked - if masked { - for i := int64(0); i < length; i++ { - payload[i] ^= maskKey[i%4] - } - } - _ = writeWSFrame(c.conn, 0xA, payload) // best-effort pong - continue - } - - // Close frame - if opcode == 0x8 { - // Drain payload if any, then exit - if _, err := io.CopyN(io.Discard, reader, length); err != nil { - return - } - return - } - - // For other frames, just discard payload - if _, err := io.CopyN(io.Discard, reader, length); err != nil { - return - } - } -} - -func (c *Client) writePump() { - ticker := time.NewTicker(30 * time.Second) - defer func() { - ticker.Stop() - _ = c.conn.Close() - }() - for { - select { - case msg, ok := <-c.send: - if !ok { - // try to send close frame - _ = writeWSFrame(c.conn, 0x8, nil) - return - } - if err := writeWSFrame(c.conn, 0x1, msg); err != nil { - return - } - case <-ticker.C: - // Send a ping to keep connections alive behind proxies - _ = writeWSFrame(c.conn, 0x9, []byte("ping")) - } - } +func buildRegistry() actor.MutationRegistry { + reg := actor.NewMutationRegistry() + reg.RegisterMutations( + actor.NewMutation(cart.AddItem, func() *messages.AddItem { return &messages.AddItem{} }), + actor.NewMutation(cart.ChangeQuantity, func() *messages.ChangeQuantity { return &messages.ChangeQuantity{} }), + actor.NewMutation(cart.RemoveItem, func() *messages.RemoveItem { return &messages.RemoveItem{} }), + actor.NewMutation(cart.InitializeCheckout, func() *messages.InitializeCheckout { return &messages.InitializeCheckout{} }), + actor.NewMutation(cart.OrderCreated, func() *messages.OrderCreated { return &messages.OrderCreated{} }), + actor.NewMutation(cart.RemoveDelivery, func() *messages.RemoveDelivery { return &messages.RemoveDelivery{} }), + actor.NewMutation(cart.SetDelivery, func() *messages.SetDelivery { return &messages.SetDelivery{} }), + actor.NewMutation(cart.SetPickupPoint, func() *messages.SetPickupPoint { return &messages.SetPickupPoint{} }), + actor.NewMutation(cart.ClearCart, func() *messages.ClearCartRequest { return &messages.ClearCartRequest{} }), + actor.NewMutation(cart.AddVoucher, func() *messages.AddVoucher { return &messages.AddVoucher{} }), + actor.NewMutation(cart.RemoveVoucher, func() *messages.RemoveVoucher { return &messages.RemoveVoucher{} }), + ) + return reg } func startMutationConsumer(ctx context.Context, amqpURL string, hub *Hub) error { @@ -346,6 +134,9 @@ func main() { _ = os.MkdirAll(dataDir, 0755) + reg := buildRegistry() + globalDisk = actor.NewDiskStorage[cart.CartGrain](dataDir, reg) + fs := NewFileServer(dataDir) hub := NewHub() @@ -383,20 +174,10 @@ func main() { } } - go func() { - log.Printf("backoffice HTTP listening on %s (dataDir=%s)", addr, dataDir) - if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - log.Fatalf("http server error: %v", err) - } - }() + log.Printf("backoffice HTTP listening on %s (dataDir=%s)", addr, dataDir) + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Fatalf("http server error: %v", err) + } - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - <-sigs - log.Printf("shutting down...") - - shutdownCtx, cancel2 := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel2() - _ = srv.Shutdown(shutdownCtx) - cancel() + // server stopped } diff --git a/deployment/deployment.yaml b/deployment/deployment.yaml index 8be6099..72097db 100644 --- a/deployment/deployment.yaml +++ b/deployment/deployment.yaml @@ -15,7 +15,7 @@ metadata: arch: amd64 name: cart-backoffice-x86 spec: - replicas: 3 + replicas: 1 selector: matchLabels: app: cart-backoffice -- 2.49.1 From f1e82c74d84a5fb5b5888b5420de8bfa3586b694 Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 08:52:29 +0200 Subject: [PATCH 03/23] livez endpoint --- cmd/backoffice/main.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cmd/backoffice/main.go b/cmd/backoffice/main.go index 2f11eba..f83232b 100644 --- a/cmd/backoffice/main.go +++ b/cmd/backoffice/main.go @@ -154,6 +154,10 @@ func main() { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("ok")) }) + mux.HandleFunc("/livez", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + }) srv := &http.Server{ Addr: addr, -- 2.49.1 From 9ba9117a0f1941a1981e41601062a67c0284901e Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 09:07:09 +0200 Subject: [PATCH 04/23] allow cors --- cmd/backoffice/main.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/cmd/backoffice/main.go b/cmd/backoffice/main.go index f83232b..56fcf54 100644 --- a/cmd/backoffice/main.go +++ b/cmd/backoffice/main.go @@ -159,9 +159,22 @@ func main() { _, _ = w.Write([]byte("ok")) }) + // Global CORS middleware allowing all origins and handling preflight + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, PATCH, DELETE, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With") + w.Header().Set("Access-Control-Expose-Headers", "*") + if r.Method == http.MethodOptions { + w.WriteHeader(http.StatusNoContent) + return + } + mux.ServeHTTP(w, r) + }) + srv := &http.Server{ Addr: addr, - Handler: mux, + Handler: handler, ReadTimeout: 10 * time.Second, WriteTimeout: 30 * time.Second, IdleTimeout: 60 * time.Second, -- 2.49.1 From e1302a8ffaf1d588b83b67a6c9fff9a02873f9df Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 09:22:00 +0200 Subject: [PATCH 05/23] update --- cmd/backoffice/fileserver.go | 56 ++++++++++++++++++++++++++++-------- cmd/backoffice/main.go | 9 +++--- 2 files changed, 49 insertions(+), 16 deletions(-) diff --git a/cmd/backoffice/fileserver.go b/cmd/backoffice/fileserver.go index 227e7ae..a797c99 100644 --- a/cmd/backoffice/fileserver.go +++ b/cmd/backoffice/fileserver.go @@ -41,13 +41,18 @@ func listCartFiles(dir string) ([]CartFileInfo, error) { continue } name := e.Name() - m := cartFileRe.FindStringSubmatch(name) - if m == nil { + var id uint64 + var parseErr error + if strings.HasPrefix(name, "{") && strings.HasSuffix(name, "}.events.log") { + idStr := strings.TrimSuffix(strings.TrimPrefix(name, "{"), "}.events.log") + id, parseErr = strconv.ParseUint(idStr, 10, 64) + } else if strings.HasSuffix(name, ".events.log") { + base := strings.TrimSuffix(name, ".events.log") + id, parseErr = strconv.ParseUint(base, 10, 64) + } else { continue } - idStr := m[1] - id, err := strconv.ParseUint(idStr, 10, 64) - if err != nil { + if parseErr != nil { continue } p := filepath.Join(dir, name) @@ -57,6 +62,7 @@ func listCartFiles(dir string) ([]CartFileInfo, error) { } out = append(out, CartFileInfo{ ID: id, + CartId: cart.CartId(id), Path: p, Size: info.Size(), Modified: info.ModTime(), @@ -104,9 +110,20 @@ func (fs *FileServer) CartsHandler(w http.ResponseWriter, r *http.Request) { } // sort by modified desc sort.Slice(list, func(i, j int) bool { return list[i].Modified.After(list[j].Modified) }) + carts := make([]map[string]any, 0, len(list)) + for _, it := range list { + carts = append(carts, map[string]any{ + "id": it.ID, + "cartId": cart.CartId(it.ID).String(), + "filename": filepath.Base(it.Path), + "path": it.Path, + "size": it.Size, + "modified": it.Modified, + }) + } writeJSON(w, http.StatusOK, map[string]any{ - "count": len(list), - "carts": list, + "count": len(carts), + "carts": carts, }) } @@ -116,8 +133,12 @@ func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusBadRequest, map[string]string{"error": "missing id"}) return } - // allow both decimal id and filename-like with suffix - if strings.HasSuffix(idStr, ".events.log") { + // normalize idStr: support "{123}.events.log", "{123}", "123.events.log", "123" + if strings.HasPrefix(idStr, "{") && strings.HasSuffix(idStr, "}.events.log") { + idStr = strings.TrimSuffix(strings.TrimPrefix(idStr, "{"), "}.events.log") + } else if strings.HasPrefix(idStr, "{") && strings.HasSuffix(idStr, "}") { + idStr = strings.TrimSuffix(strings.TrimPrefix(idStr, "{"), "}") + } else if strings.HasSuffix(idStr, ".events.log") { idStr = strings.TrimSuffix(idStr, ".events.log") } id, err := strconv.ParseUint(idStr, 10, 64) @@ -133,11 +154,21 @@ func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) { path := filepath.Join(fs.dataDir, fmt.Sprintf("%d.events.log", id)) info, err := os.Stat(path) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - writeJSON(w, http.StatusNotFound, map[string]string{"error": "cart not found"}) + if err != nil && errors.Is(err, os.ErrNotExist) { + // try brace-wrapped filename as fallback + alt := filepath.Join(fs.dataDir, fmt.Sprintf("{%d}.events.log", id)) + if fi, err2 := os.Stat(alt); err2 == nil { + path = alt + info = fi + } else { + if errors.Is(err2, os.ErrNotExist) { + writeJSON(w, http.StatusNotFound, map[string]string{"error": "cart not found"}) + return + } + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err2.Error()}) return } + } else if err != nil { writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) return } @@ -148,6 +179,7 @@ func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) { } writeJSON(w, http.StatusOK, map[string]any{ "id": id, + "cartId": cart.CartId(id).String(), "state": grain, "rawLog": lines, "meta": map[string]any{ diff --git a/cmd/backoffice/main.go b/cmd/backoffice/main.go index 56fcf54..8697dea 100644 --- a/cmd/backoffice/main.go +++ b/cmd/backoffice/main.go @@ -16,10 +16,11 @@ import ( ) type CartFileInfo struct { - ID uint64 `json:"id"` - Path string `json:"path"` - Size int64 `json:"size"` - Modified time.Time `json:"modified"` + ID uint64 `json:"id"` + CartId cart.CartId `json:"cart_id"` + Path string `json:"path"` + Size int64 `json:"size"` + Modified time.Time `json:"modified"` } func envOrDefault(key, def string) string { -- 2.49.1 From 21fd5c844690d583e4c40a82f3a6c97cd7be2831 Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 09:31:54 +0200 Subject: [PATCH 06/23] id fix --- cmd/backoffice/fileserver.go | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/cmd/backoffice/fileserver.go b/cmd/backoffice/fileserver.go index a797c99..4d878ee 100644 --- a/cmd/backoffice/fileserver.go +++ b/cmd/backoffice/fileserver.go @@ -43,12 +43,13 @@ func listCartFiles(dir string) ([]CartFileInfo, error) { name := e.Name() var id uint64 var parseErr error - if strings.HasPrefix(name, "{") && strings.HasSuffix(name, "}.events.log") { - idStr := strings.TrimSuffix(strings.TrimPrefix(name, "{"), "}.events.log") + parts := strings.Split(name, ".") + if len(parts) == 2 && parts[1] == "events.log" { + idStr := parts[0] + id, parseErr = strconv.ParseUint(idStr, 10, 64) + } else if len(parts) == 3 && parts[1] == "events" && parts[2] == "log" { + idStr := parts[0] id, parseErr = strconv.ParseUint(idStr, 10, 64) - } else if strings.HasSuffix(name, ".events.log") { - base := strings.TrimSuffix(name, ".events.log") - id, parseErr = strconv.ParseUint(base, 10, 64) } else { continue } @@ -133,18 +134,15 @@ func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusBadRequest, map[string]string{"error": "missing id"}) return } - // normalize idStr: support "{123}.events.log", "{123}", "123.events.log", "123" - if strings.HasPrefix(idStr, "{") && strings.HasSuffix(idStr, "}.events.log") { - idStr = strings.TrimSuffix(strings.TrimPrefix(idStr, "{"), "}.events.log") - } else if strings.HasPrefix(idStr, "{") && strings.HasSuffix(idStr, "}") { - idStr = strings.TrimSuffix(strings.TrimPrefix(idStr, "{"), "}") - } else if strings.HasSuffix(idStr, ".events.log") { - idStr = strings.TrimSuffix(idStr, ".events.log") - } + id, err := strconv.ParseUint(idStr, 10, 64) if err != nil { - writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid id"}) - return + if cartId, ok := cart.ParseCartId(idStr); !ok { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid id"}) + return + } else { + id = uint64(cartId) + } } // reconstruct state from event log if present grain := cart.NewCartGrain(id, time.Now()) -- 2.49.1 From a5d61ce56f34c91b9571f3769923b18486998517 Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 09:35:35 +0200 Subject: [PATCH 07/23] fix --- cmd/backoffice/fileserver.go | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/cmd/backoffice/fileserver.go b/cmd/backoffice/fileserver.go index 4d878ee..d437ef7 100644 --- a/cmd/backoffice/fileserver.go +++ b/cmd/backoffice/fileserver.go @@ -153,19 +153,8 @@ func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) { path := filepath.Join(fs.dataDir, fmt.Sprintf("%d.events.log", id)) info, err := os.Stat(path) if err != nil && errors.Is(err, os.ErrNotExist) { - // try brace-wrapped filename as fallback - alt := filepath.Join(fs.dataDir, fmt.Sprintf("{%d}.events.log", id)) - if fi, err2 := os.Stat(alt); err2 == nil { - path = alt - info = fi - } else { - if errors.Is(err2, os.ErrNotExist) { - writeJSON(w, http.StatusNotFound, map[string]string{"error": "cart not found"}) - return - } - writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err2.Error()}) - return - } + writeJSON(w, http.StatusNotFound, map[string]string{"error": "cart not found"}) + return } else if err != nil { writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) return -- 2.49.1 From b918f406df9cb25bdbc943a421428fee067e0f42 Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 09:52:43 +0200 Subject: [PATCH 08/23] listen to correct topic --- cmd/backoffice/main.go | 60 ++++++++++-------------------------------- 1 file changed, 14 insertions(+), 46 deletions(-) diff --git a/cmd/backoffice/main.go b/cmd/backoffice/main.go index 8697dea..50df960 100644 --- a/cmd/backoffice/main.go +++ b/cmd/backoffice/main.go @@ -3,6 +3,7 @@ package main import ( "context" "errors" + "fmt" "log" "net/http" "os" @@ -12,6 +13,7 @@ import ( actor "git.tornberg.me/go-cart-actor/pkg/actor" "git.tornberg.me/go-cart-actor/pkg/cart" messages "git.tornberg.me/go-cart-actor/pkg/messages" + "github.com/matst80/slask-finder/pkg/messaging" amqp "github.com/rabbitmq/amqp091-go" ) @@ -52,59 +54,21 @@ func buildRegistry() actor.MutationRegistry { return reg } -func startMutationConsumer(ctx context.Context, amqpURL string, hub *Hub) error { - conn, err := amqp.Dial(amqpURL) - if err != nil { - return err - } +func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) error { ch, err := conn.Channel() if err != nil { _ = conn.Close() return err } + msgs, err := messaging.DeclareBindAndConsume(ch, "cart", "mutation") + if err != nil { + _ = ch.Close() + return err + } - // declare exchange (idempotent) - if err := ch.ExchangeDeclare( - "cart", // name - "topic", // type - true, // durable - false, // autoDelete - false, // internal - false, // noWait - nil, // args - ); err != nil { - _ = ch.Close() - _ = conn.Close() - return err - } - // declare an exclusive, auto-deleted queue by default - q, err := ch.QueueDeclare( - "", // name -> let server generate - false, // durable - true, // autoDelete - true, // exclusive - false, // noWait - nil, // args - ) - if err != nil { - _ = ch.Close() - _ = conn.Close() - return err - } - if err := ch.QueueBind(q.Name, "mutation", "cart", false, nil); err != nil { - _ = ch.Close() - _ = conn.Close() - return err - } - msgs, err := ch.Consume(q.Name, "backoffice", true, true, false, false, nil) - if err != nil { - _ = ch.Close() - _ = conn.Close() - return err - } go func() { defer ch.Close() - defer conn.Close() + for { select { case <-ctx.Done(): @@ -185,7 +149,11 @@ func main() { defer cancel() if amqpURL != "" { - if err := startMutationConsumer(ctx, amqpURL, hub); err != nil { + conn, err := amqp.Dial(amqpURL) + if err != nil { + fmt.Errorf("failed to connect to RabbitMQ: %w", err) + } + if err := startMutationConsumer(ctx, conn, hub); err != nil { log.Printf("AMQP listener disabled: %v", err) } else { log.Printf("AMQP listener connected") -- 2.49.1 From 9d202af55be2068b586f43fcbf81928392ffd5d8 Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 12:51:25 +0200 Subject: [PATCH 09/23] more logs --- cmd/backoffice/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/backoffice/main.go b/cmd/backoffice/main.go index 50df960..7ee7975 100644 --- a/cmd/backoffice/main.go +++ b/cmd/backoffice/main.go @@ -75,7 +75,8 @@ func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) return case m, ok := <-msgs: if !ok { - return + log.Print("no message") + continue } // Log and broadcast to all websocket clients log.Printf("mutation event: %s", string(m.Body)) -- 2.49.1 From abca60490bed217cbf22d98736a0d99cae6e4c9d Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 12:52:49 +0200 Subject: [PATCH 10/23] test build --- .gitea/workflows/build.yaml | 4 ++++ cmd/backoffice/main.go | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.gitea/workflows/build.yaml b/.gitea/workflows/build.yaml index 7a3f23a..70abbf6 100644 --- a/.gitea/workflows/build.yaml +++ b/.gitea/workflows/build.yaml @@ -47,6 +47,10 @@ jobs: docker push registry.knatofs.se/go-cart-actor-amd64:${{ needs.Metadata.outputs.version }} - name: Apply deployment manifests run: kubectl apply -f deployment/deployment.yaml -n cart + - name: Rollout amd64 backoffice deployment (pin to version) + run: | + kubectl set image deployment/cart-backoffice-x86 -n cart cart-backoffice-amd64=registry.knatofs.se/go-cart-actor-amd64:${{ needs.Metadata.outputs.version }} + kubectl rollout status deployment/cart-backoffice-x86 -n cart - name: Rollout amd64 deployment (pin to version) run: | kubectl set image deployment/cart-actor-x86 -n cart cart-actor-amd64=registry.knatofs.se/go-cart-actor-amd64:${{ needs.Metadata.outputs.version }} diff --git a/cmd/backoffice/main.go b/cmd/backoffice/main.go index 7ee7975..f28ed8a 100644 --- a/cmd/backoffice/main.go +++ b/cmd/backoffice/main.go @@ -75,7 +75,7 @@ func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) return case m, ok := <-msgs: if !ok { - log.Print("no message") + log.Print("no message, would have closed") continue } // Log and broadcast to all websocket clients -- 2.49.1 From a8a697d113b694d3f6209f079283817feddddeb1 Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 14:40:03 +0200 Subject: [PATCH 11/23] ack messages --- cmd/backoffice/main.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cmd/backoffice/main.go b/cmd/backoffice/main.go index f28ed8a..03fbdd1 100644 --- a/cmd/backoffice/main.go +++ b/cmd/backoffice/main.go @@ -3,7 +3,6 @@ package main import ( "context" "errors" - "fmt" "log" "net/http" "os" @@ -75,11 +74,12 @@ func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) return case m, ok := <-msgs: if !ok { - log.Print("no message, would have closed") + log.Fatalf("connection closed") continue } // Log and broadcast to all websocket clients log.Printf("mutation event: %s", string(m.Body)) + if hub != nil { select { case hub.broadcast <- m.Body: @@ -87,6 +87,9 @@ func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) // if hub queue is full, drop to avoid blocking } } + if err := m.Ack(false); err != nil { + log.Printf("error acknowledging message: %v", err) + } } } }() @@ -152,7 +155,7 @@ func main() { if amqpURL != "" { conn, err := amqp.Dial(amqpURL) if err != nil { - fmt.Errorf("failed to connect to RabbitMQ: %w", err) + log.Fatalf("failed to connect to RabbitMQ: %w", err) } if err := startMutationConsumer(ctx, conn, hub); err != nil { log.Printf("AMQP listener disabled: %v", err) -- 2.49.1 From 4268616cbe817a3d7a3b16fcfb80f95317740f08 Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 14:46:28 +0200 Subject: [PATCH 12/23] dont pin --- .gitea/workflows/build.yaml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.gitea/workflows/build.yaml b/.gitea/workflows/build.yaml index 70abbf6..9c8df3f 100644 --- a/.gitea/workflows/build.yaml +++ b/.gitea/workflows/build.yaml @@ -49,8 +49,7 @@ jobs: run: kubectl apply -f deployment/deployment.yaml -n cart - name: Rollout amd64 backoffice deployment (pin to version) run: | - kubectl set image deployment/cart-backoffice-x86 -n cart cart-backoffice-amd64=registry.knatofs.se/go-cart-actor-amd64:${{ needs.Metadata.outputs.version }} - kubectl rollout status deployment/cart-backoffice-x86 -n cart + kubectl rollout restart deployment/cart-backoffice-x86 -n cart - name: Rollout amd64 deployment (pin to version) run: | kubectl set image deployment/cart-actor-x86 -n cart cart-actor-amd64=registry.knatofs.se/go-cart-actor-amd64:${{ needs.Metadata.outputs.version }} -- 2.49.1 From 55d0595161c5fcfb18e4a5804102598037e0cec5 Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 17:06:45 +0200 Subject: [PATCH 13/23] append asap --- .gitea/workflows/build.yaml | 2 +- cmd/cart/main.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.gitea/workflows/build.yaml b/.gitea/workflows/build.yaml index 9c8df3f..1e328a8 100644 --- a/.gitea/workflows/build.yaml +++ b/.gitea/workflows/build.yaml @@ -47,7 +47,7 @@ jobs: docker push registry.knatofs.se/go-cart-actor-amd64:${{ needs.Metadata.outputs.version }} - name: Apply deployment manifests run: kubectl apply -f deployment/deployment.yaml -n cart - - name: Rollout amd64 backoffice deployment (pin to version) + - name: Rollout amd64 backoffice deployment run: | kubectl rollout restart deployment/cart-backoffice-x86 -n cart - name: Rollout amd64 deployment (pin to version) diff --git a/cmd/cart/main.go b/cmd/cart/main.go index cf44881..3179b3e 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -183,7 +183,7 @@ func main() { } defer grpcSrv.GracefulStop() - go diskStorage.SaveLoop(10 * time.Second) + // go diskStorage.SaveLoop(10 * time.Second) go func(hw discovery.Discovery) { if hw == nil { -- 2.49.1 From 9dc5bab7c5d9538e8b08cb163029674253260e49 Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 18:02:30 +0200 Subject: [PATCH 14/23] test --- cmd/backoffice/fileserver.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/cmd/backoffice/fileserver.go b/cmd/backoffice/fileserver.go index d437ef7..b86f1f8 100644 --- a/cmd/backoffice/fileserver.go +++ b/cmd/backoffice/fileserver.go @@ -41,15 +41,12 @@ func listCartFiles(dir string) ([]CartFileInfo, error) { continue } name := e.Name() - var id uint64 + var id int64 var parseErr error parts := strings.Split(name, ".") - if len(parts) == 2 && parts[1] == "events.log" { + if len(parts) > 1 && parts[1] == "events" { idStr := parts[0] - id, parseErr = strconv.ParseUint(idStr, 10, 64) - } else if len(parts) == 3 && parts[1] == "events" && parts[2] == "log" { - idStr := parts[0] - id, parseErr = strconv.ParseUint(idStr, 10, 64) + id, parseErr = strconv.ParseInt(idStr, 10, 64) } else { continue } @@ -62,7 +59,7 @@ func listCartFiles(dir string) ([]CartFileInfo, error) { continue } out = append(out, CartFileInfo{ - ID: id, + ID: uint64(id), CartId: cart.CartId(id), Path: p, Size: info.Size(), -- 2.49.1 From 8456184973e37a5deab6234886e1845efbe9879c Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 18:09:04 +0200 Subject: [PATCH 15/23] less overhead --- cmd/backoffice/fileserver.go | 19 ++++--------------- cmd/backoffice/main.go | 3 +-- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/cmd/backoffice/fileserver.go b/cmd/backoffice/fileserver.go index b86f1f8..de8ca01 100644 --- a/cmd/backoffice/fileserver.go +++ b/cmd/backoffice/fileserver.go @@ -53,7 +53,7 @@ func listCartFiles(dir string) ([]CartFileInfo, error) { if parseErr != nil { continue } - p := filepath.Join(dir, name) + info, err := e.Info() if err != nil { continue @@ -61,7 +61,6 @@ func listCartFiles(dir string) ([]CartFileInfo, error) { out = append(out, CartFileInfo{ ID: uint64(id), CartId: cart.CartId(id), - Path: p, Size: info.Size(), Modified: info.ModTime(), }) @@ -108,20 +107,10 @@ func (fs *FileServer) CartsHandler(w http.ResponseWriter, r *http.Request) { } // sort by modified desc sort.Slice(list, func(i, j int) bool { return list[i].Modified.After(list[j].Modified) }) - carts := make([]map[string]any, 0, len(list)) - for _, it := range list { - carts = append(carts, map[string]any{ - "id": it.ID, - "cartId": cart.CartId(it.ID).String(), - "filename": filepath.Base(it.Path), - "path": it.Path, - "size": it.Size, - "modified": it.Modified, - }) - } + writeJSON(w, http.StatusOK, map[string]any{ - "count": len(carts), - "carts": carts, + "count": len(list), + "carts": list, }) } diff --git a/cmd/backoffice/main.go b/cmd/backoffice/main.go index 03fbdd1..f65d44a 100644 --- a/cmd/backoffice/main.go +++ b/cmd/backoffice/main.go @@ -18,8 +18,7 @@ import ( type CartFileInfo struct { ID uint64 `json:"id"` - CartId cart.CartId `json:"cart_id"` - Path string `json:"path"` + CartId cart.CartId `json:"cartId"` Size int64 `json:"size"` Modified time.Time `json:"modified"` } -- 2.49.1 From c61adb3b9d7b04575d09188797d333e87a0d9257 Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 18:21:16 +0200 Subject: [PATCH 16/23] clean and change type --- cmd/backoffice/fileserver.go | 9 +++++---- cmd/backoffice/main.go | 2 +- data/1.prot | Bin 1064 -> 0 bytes data/4.prot | Bin 498 -> 0 bytes data/5.prot | Bin 165 -> 0 bytes data/state.gob | Bin 101 -> 0 bytes data/state.gob.bak | Bin 79 -> 0 bytes 7 files changed, 6 insertions(+), 5 deletions(-) delete mode 100644 data/1.prot delete mode 100644 data/4.prot delete mode 100644 data/5.prot delete mode 100644 data/state.gob delete mode 100644 data/state.gob.bak diff --git a/cmd/backoffice/fileserver.go b/cmd/backoffice/fileserver.go index de8ca01..e0ae87e 100644 --- a/cmd/backoffice/fileserver.go +++ b/cmd/backoffice/fileserver.go @@ -41,12 +41,13 @@ func listCartFiles(dir string) ([]CartFileInfo, error) { continue } name := e.Name() - var id int64 + var idStr string + var id uint64 var parseErr error parts := strings.Split(name, ".") if len(parts) > 1 && parts[1] == "events" { - idStr := parts[0] - id, parseErr = strconv.ParseInt(idStr, 10, 64) + idStr = parts[0] + id, parseErr = strconv.ParseUint(idStr, 10, 64) } else { continue } @@ -59,7 +60,7 @@ func listCartFiles(dir string) ([]CartFileInfo, error) { continue } out = append(out, CartFileInfo{ - ID: uint64(id), + ID: idStr, CartId: cart.CartId(id), Size: info.Size(), Modified: info.ModTime(), diff --git a/cmd/backoffice/main.go b/cmd/backoffice/main.go index f65d44a..1d0cfc1 100644 --- a/cmd/backoffice/main.go +++ b/cmd/backoffice/main.go @@ -17,7 +17,7 @@ import ( ) type CartFileInfo struct { - ID uint64 `json:"id"` + ID string `json:"id"` CartId cart.CartId `json:"cartId"` Size int64 `json:"size"` Modified time.Time `json:"modified"` diff --git a/data/1.prot b/data/1.prot deleted file mode 100644 index f8153e3a10543190fd54697cbf0adc4d67f65944..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1064 zcmZQ%U}6ZZ(@SRnf({S?1Oki_M|QF(v6@(#nwe^81SJ+1B&8M=RVoBzDwrtv=BK3Q zC^#wPW@Z=VrzDo-7b$3%D7ZUm8u{yI<|d}6>Zg>&m!~GhyBJy+7#J8D7#JCuSz4MJ z=!4DEFM=7TTac-1qMHjcPB%#xVH{8}JxMRCAe|BDw(wE6kri&A2XVL^OECebw=Iz5 z21;);rtvGWnVACMFdbUxLu}D623b-JwnR4(Xo+q? yQNAw15?zobx=5DjrsWsuLhOK~O-6>}P=9bi{lUQ~1oZZg>&m!~GhyBJy+7#J8D7#JCuSz4MJ z=!4DEFM=7TTac-1qMHjcPB%#xVH{8}JxMRCAe|BDw(>#jwu(XPHg<;pQbyo7<$^>h P0~bq4YH^7W6C)D<0>g)Z diff --git a/data/5.prot b/data/5.prot deleted file mode 100644 index d45fb228a0d189eed043c089281253d0b9020c28..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 165 zcmZQ%U}AVSQ!kwX2quCEAP``bnDOQ>qY|5ishOFDv6hZ+VzN_yezrnDQNDtqnS!qg zm}O$2Z)#}d?xdg*T#%TYs^FB9n4GO?6r`V-o0y)epHdcIo|+WzVrXGtU|?uuW(mZm uhWZdQ^>Y)GlYmC*78K>{8k*_mnt)j*7C;lzlXQzg2I?k(4AjdiNCyC6MJ*!$ diff --git a/data/state.gob b/data/state.gob deleted file mode 100644 index f1608e092261212ce96b3a6984f717c65eeb9202..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 101 zcmd=8-^jwq^uLLL@qYs&3j+ghJtL6Yz`)4HsKCJB29#hHFl0alf6m4HYY{L(6=e8x OE>`e|fGMV^@jU`dd0Gizo Date: Wed, 15 Oct 2025 19:44:51 +0200 Subject: [PATCH 17/23] update --- cmd/cart/main.go | 12 +++++++++- pkg/actor/disk_storage.go | 42 ---------------------------------- pkg/actor/log_listerner.go | 47 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 43 deletions(-) create mode 100644 pkg/actor/log_listerner.go diff --git a/cmd/cart/main.go b/cmd/cart/main.go index 3179b3e..79bfd11 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -99,6 +99,11 @@ type MutationContext struct { VoucherService voucher.Service } +type CartChangeEvent struct { + CartId cart.CartId `json:"cartId"` + Mutations []actor.ApplyResult `json:"mutations"` +} + func main() { controlPlaneConfig := actor.DefaultServerConfig() @@ -173,7 +178,12 @@ func main() { fmt.Errorf("failed to connect to RabbitMQ: %w", err) } - amqpListener := actor.NewAmqpListener(conn) + amqpListener := actor.NewAmqpListener(conn, func(id uint64, msg []actor.ApplyResult) (any, error) { + return &CartChangeEvent{ + CartId: cart.CartId(id), + Mutations: msg, + }, nil + }) amqpListener.DefineTopics() pool.AddListener(amqpListener) diff --git a/pkg/actor/disk_storage.go b/pkg/actor/disk_storage.go index 08daf71..deff401 100644 --- a/pkg/actor/disk_storage.go +++ b/pkg/actor/disk_storage.go @@ -10,8 +10,6 @@ import ( "time" "github.com/gogo/protobuf/proto" - "github.com/matst80/slask-finder/pkg/messaging" - amqp "github.com/rabbitmq/amqp091-go" ) type QueueEvent struct { @@ -31,46 +29,6 @@ type LogStorage[V any] interface { AppendMutations(id uint64, msg ...proto.Message) error } -type LogListener interface { - AppendMutations(id uint64, msg ...ApplyResult) -} - -type AmqpListener struct { - conn *amqp.Connection -} - -func NewAmqpListener(conn *amqp.Connection) *AmqpListener { - return &AmqpListener{ - conn: conn, - } -} - -func (l *AmqpListener) DefineTopics() { - ch, err := l.conn.Channel() - if err != nil { - log.Fatalf("Failed to open a channel: %v", err) - } - defer ch.Close() - if err := messaging.DefineTopic(ch, "cart", "mutation"); err != nil { - log.Fatalf("Failed to declare topic mutation: %v", err) - } -} - -type CartEvent struct { - Id uint64 `json:"id"` - Mutations []ApplyResult `json:"mutations"` -} - -func (l *AmqpListener) AppendMutations(id uint64, msg ...ApplyResult) { - err := messaging.SendChange(l.conn, "cart", "mutation", &CartEvent{ - Id: id, - Mutations: msg, - }) - if err != nil { - log.Printf("Failed to send mutation event: %v", err) - } -} - func NewDiskStorage[V any](path string, registry MutationRegistry) *DiskStorage[V] { return &DiskStorage[V]{ StateStorage: NewState(registry), diff --git a/pkg/actor/log_listerner.go b/pkg/actor/log_listerner.go new file mode 100644 index 0000000..3990c77 --- /dev/null +++ b/pkg/actor/log_listerner.go @@ -0,0 +1,47 @@ +package actor + +import ( + "log" + + "github.com/matst80/slask-finder/pkg/messaging" + amqp "github.com/rabbitmq/amqp091-go" +) + +type LogListener interface { + AppendMutations(id uint64, msg ...ApplyResult) +} + +type AmqpListener struct { + conn *amqp.Connection + transformer func(id uint64, msg []ApplyResult) (any, error) +} + +func NewAmqpListener(conn *amqp.Connection, transformer func(id uint64, msg []ApplyResult) (any, error)) *AmqpListener { + return &AmqpListener{ + conn: conn, + transformer: transformer, + } +} + +func (l *AmqpListener) DefineTopics() { + ch, err := l.conn.Channel() + if err != nil { + log.Fatalf("Failed to open a channel: %v", err) + } + defer ch.Close() + if err := messaging.DefineTopic(ch, "cart", "mutation"); err != nil { + log.Fatalf("Failed to declare topic mutation: %v", err) + } +} + +func (l *AmqpListener) AppendMutations(id uint64, msg ...ApplyResult) { + data, err := l.transformer(id, msg) + if err != nil { + log.Printf("Failed to transform mutation event: %v", err) + return + } + err = messaging.SendChange(l.conn, "cart", "mutation", data) + if err != nil { + log.Printf("Failed to send mutation event: %v", err) + } +} -- 2.49.1 From e0207a8638a11ea94247f925644fb334079de6b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mats=20T=C3=B6rnberg?= Date: Wed, 15 Oct 2025 19:47:42 +0200 Subject: [PATCH 18/23] update --- cmd/backoffice/fileserver.go | 3 +++ cmd/backoffice/main.go | 3 --- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/backoffice/fileserver.go b/cmd/backoffice/fileserver.go index e0ae87e..e937a29 100644 --- a/cmd/backoffice/fileserver.go +++ b/cmd/backoffice/fileserver.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "path/filepath" + "regexp" "sort" "strconv" "strings" @@ -27,6 +28,8 @@ func NewFileServer(dataDir string) *FileServer { } } +var cartFileRe = regexp.MustCompile(`^(\d+)\.events\.log$`) + func listCartFiles(dir string) ([]CartFileInfo, error) { entries, err := os.ReadDir(dir) if err != nil { diff --git a/cmd/backoffice/main.go b/cmd/backoffice/main.go index 1d0cfc1..2b656f7 100644 --- a/cmd/backoffice/main.go +++ b/cmd/backoffice/main.go @@ -6,7 +6,6 @@ import ( "log" "net/http" "os" - "regexp" "time" actor "git.tornberg.me/go-cart-actor/pkg/actor" @@ -30,8 +29,6 @@ func envOrDefault(key, def string) string { return def } -var cartFileRe = regexp.MustCompile(`^(\d+)\.events\.log$`) - var globalDisk *actor.DiskStorage[cart.CartGrain] func buildRegistry() actor.MutationRegistry { -- 2.49.1 From 1c589e0558506fc8660f2adcb47eb58317e532ff Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 22:20:33 +0200 Subject: [PATCH 19/23] add sys to se what we get --- cmd/backoffice/fileserver.go | 33 ++++++++++++++++++++------------- cmd/backoffice/main.go | 1 + 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/cmd/backoffice/fileserver.go b/cmd/backoffice/fileserver.go index e0ae87e..8005005 100644 --- a/cmd/backoffice/fileserver.go +++ b/cmd/backoffice/fileserver.go @@ -27,6 +27,21 @@ func NewFileServer(dataDir string) *FileServer { } } +func isValidFileId(name string) (uint64, bool) { + + parts := strings.Split(name, ".") + if len(parts) > 1 && parts[1] == "events" { + idStr := parts[0] + if _, err := strconv.ParseUint(idStr, 10, 64); err != nil { + return 0, false + } + if id, err := strconv.ParseUint(idStr, 10, 64); err == nil { + return id, true + } + } + return 0, false +} + func listCartFiles(dir string) ([]CartFileInfo, error) { entries, err := os.ReadDir(dir) if err != nil { @@ -40,18 +55,8 @@ func listCartFiles(dir string) ([]CartFileInfo, error) { if e.IsDir() { continue } - name := e.Name() - var idStr string - var id uint64 - var parseErr error - parts := strings.Split(name, ".") - if len(parts) > 1 && parts[1] == "events" { - idStr = parts[0] - id, parseErr = strconv.ParseUint(idStr, 10, 64) - } else { - continue - } - if parseErr != nil { + id, valid := isValidFileId(e.Name()) + if !valid { continue } @@ -59,11 +64,13 @@ func listCartFiles(dir string) ([]CartFileInfo, error) { if err != nil { continue } + info.Sys() out = append(out, CartFileInfo{ - ID: idStr, + ID: fmt.Sprintf("%d", id), CartId: cart.CartId(id), Size: info.Size(), Modified: info.ModTime(), + System: info.Sys(), }) } return out, nil diff --git a/cmd/backoffice/main.go b/cmd/backoffice/main.go index 1d0cfc1..df3f33d 100644 --- a/cmd/backoffice/main.go +++ b/cmd/backoffice/main.go @@ -21,6 +21,7 @@ type CartFileInfo struct { CartId cart.CartId `json:"cartId"` Size int64 `json:"size"` Modified time.Time `json:"modified"` + System any `json:"system"` } func envOrDefault(key, def string) string { -- 2.49.1 From 3942ea911ed05712a5b4b0ed02fbe1e8517cddc1 Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 22:27:36 +0200 Subject: [PATCH 20/23] clean --- cmd/backoffice/main.go | 21 +------------- cmd/cart/main.go | 37 +----------------------- pkg/cart/cart-mutation-helper.go | 48 ++++++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 56 deletions(-) create mode 100644 pkg/cart/cart-mutation-helper.go diff --git a/cmd/backoffice/main.go b/cmd/backoffice/main.go index df3f33d..1b1c03b 100644 --- a/cmd/backoffice/main.go +++ b/cmd/backoffice/main.go @@ -11,7 +11,6 @@ import ( actor "git.tornberg.me/go-cart-actor/pkg/actor" "git.tornberg.me/go-cart-actor/pkg/cart" - messages "git.tornberg.me/go-cart-actor/pkg/messages" "github.com/matst80/slask-finder/pkg/messaging" amqp "github.com/rabbitmq/amqp091-go" ) @@ -35,24 +34,6 @@ var cartFileRe = regexp.MustCompile(`^(\d+)\.events\.log$`) var globalDisk *actor.DiskStorage[cart.CartGrain] -func buildRegistry() actor.MutationRegistry { - reg := actor.NewMutationRegistry() - reg.RegisterMutations( - actor.NewMutation(cart.AddItem, func() *messages.AddItem { return &messages.AddItem{} }), - actor.NewMutation(cart.ChangeQuantity, func() *messages.ChangeQuantity { return &messages.ChangeQuantity{} }), - actor.NewMutation(cart.RemoveItem, func() *messages.RemoveItem { return &messages.RemoveItem{} }), - actor.NewMutation(cart.InitializeCheckout, func() *messages.InitializeCheckout { return &messages.InitializeCheckout{} }), - actor.NewMutation(cart.OrderCreated, func() *messages.OrderCreated { return &messages.OrderCreated{} }), - actor.NewMutation(cart.RemoveDelivery, func() *messages.RemoveDelivery { return &messages.RemoveDelivery{} }), - actor.NewMutation(cart.SetDelivery, func() *messages.SetDelivery { return &messages.SetDelivery{} }), - actor.NewMutation(cart.SetPickupPoint, func() *messages.SetPickupPoint { return &messages.SetPickupPoint{} }), - actor.NewMutation(cart.ClearCart, func() *messages.ClearCartRequest { return &messages.ClearCartRequest{} }), - actor.NewMutation(cart.AddVoucher, func() *messages.AddVoucher { return &messages.AddVoucher{} }), - actor.NewMutation(cart.RemoveVoucher, func() *messages.RemoveVoucher { return &messages.RemoveVoucher{} }), - ) - return reg -} - func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) error { ch, err := conn.Channel() if err != nil { @@ -103,7 +84,7 @@ func main() { _ = os.MkdirAll(dataDir, 0755) - reg := buildRegistry() + reg := cart.NewCartMultationRegistry() globalDisk = actor.NewDiskStorage[cart.CartGrain](dataDir, reg) fs := NewFileServer(dataDir) diff --git a/cmd/cart/main.go b/cmd/cart/main.go index 3179b3e..cd6d558 100644 --- a/cmd/cart/main.go +++ b/cmd/cart/main.go @@ -103,42 +103,7 @@ func main() { controlPlaneConfig := actor.DefaultServerConfig() - reg := actor.NewMutationRegistry() - reg.RegisterMutations( - actor.NewMutation(cart.AddItem, func() *messages.AddItem { - return &messages.AddItem{} - }), - actor.NewMutation(cart.ChangeQuantity, func() *messages.ChangeQuantity { - return &messages.ChangeQuantity{} - }), - actor.NewMutation(cart.RemoveItem, func() *messages.RemoveItem { - return &messages.RemoveItem{} - }), - actor.NewMutation(cart.InitializeCheckout, func() *messages.InitializeCheckout { - return &messages.InitializeCheckout{} - }), - actor.NewMutation(cart.OrderCreated, func() *messages.OrderCreated { - return &messages.OrderCreated{} - }), - actor.NewMutation(cart.RemoveDelivery, func() *messages.RemoveDelivery { - return &messages.RemoveDelivery{} - }), - actor.NewMutation(cart.SetDelivery, func() *messages.SetDelivery { - return &messages.SetDelivery{} - }), - actor.NewMutation(cart.SetPickupPoint, func() *messages.SetPickupPoint { - return &messages.SetPickupPoint{} - }), - actor.NewMutation(cart.ClearCart, func() *messages.ClearCartRequest { - return &messages.ClearCartRequest{} - }), - actor.NewMutation(cart.AddVoucher, func() *messages.AddVoucher { - return &messages.AddVoucher{} - }), - actor.NewMutation(cart.RemoveVoucher, func() *messages.RemoveVoucher { - return &messages.RemoveVoucher{} - }), - ) + reg := cart.NewCartMultationRegistry() diskStorage := actor.NewDiskStorage[cart.CartGrain]("data", reg) poolConfig := actor.GrainPoolConfig[cart.CartGrain]{ MutationRegistry: reg, diff --git a/pkg/cart/cart-mutation-helper.go b/pkg/cart/cart-mutation-helper.go new file mode 100644 index 0000000..891f9e1 --- /dev/null +++ b/pkg/cart/cart-mutation-helper.go @@ -0,0 +1,48 @@ +package cart + +import ( + "git.tornberg.me/go-cart-actor/pkg/actor" + messages "git.tornberg.me/go-cart-actor/pkg/messages" +) + +func NewCartMultationRegistry() actor.MutationRegistry { + + reg := actor.NewMutationRegistry() + reg.RegisterMutations( + actor.NewMutation(AddItem, func() *messages.AddItem { + return &messages.AddItem{} + }), + actor.NewMutation(ChangeQuantity, func() *messages.ChangeQuantity { + return &messages.ChangeQuantity{} + }), + actor.NewMutation(RemoveItem, func() *messages.RemoveItem { + return &messages.RemoveItem{} + }), + actor.NewMutation(InitializeCheckout, func() *messages.InitializeCheckout { + return &messages.InitializeCheckout{} + }), + actor.NewMutation(OrderCreated, func() *messages.OrderCreated { + return &messages.OrderCreated{} + }), + actor.NewMutation(RemoveDelivery, func() *messages.RemoveDelivery { + return &messages.RemoveDelivery{} + }), + actor.NewMutation(SetDelivery, func() *messages.SetDelivery { + return &messages.SetDelivery{} + }), + actor.NewMutation(SetPickupPoint, func() *messages.SetPickupPoint { + return &messages.SetPickupPoint{} + }), + actor.NewMutation(ClearCart, func() *messages.ClearCartRequest { + return &messages.ClearCartRequest{} + }), + actor.NewMutation(AddVoucher, func() *messages.AddVoucher { + return &messages.AddVoucher{} + }), + actor.NewMutation(RemoveVoucher, func() *messages.RemoveVoucher { + return &messages.RemoveVoucher{} + }), + ) + return reg + +} -- 2.49.1 From 96315383d4932c1571c39850ecafff3b0a442ed5 Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 22:41:13 +0200 Subject: [PATCH 21/23] change name --- cmd/backoffice/fileserver.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/backoffice/fileserver.go b/cmd/backoffice/fileserver.go index 3efd62c..9260dde 100644 --- a/cmd/backoffice/fileserver.go +++ b/cmd/backoffice/fileserver.go @@ -171,10 +171,10 @@ func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) { return } writeJSON(w, http.StatusOK, map[string]any{ - "id": id, - "cartId": cart.CartId(id).String(), - "state": grain, - "rawLog": lines, + "id": id, + "cartId": cart.CartId(id).String(), + "state": grain, + "mutations": lines, "meta": map[string]any{ "size": info.Size(), "modified": info.ModTime(), -- 2.49.1 From 9537dc671f4880e244b60592ac1ff43a2927b578 Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 22:45:53 +0200 Subject: [PATCH 22/23] slask --- cmd/backoffice/fileserver.go | 3 ++- cmd/backoffice/main.go | 6 ++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/cmd/backoffice/fileserver.go b/cmd/backoffice/fileserver.go index 9260dde..0480f8a 100644 --- a/cmd/backoffice/fileserver.go +++ b/cmd/backoffice/fileserver.go @@ -24,9 +24,10 @@ type FileServer struct { storage actor.LogStorage[cart.CartGrain] } -func NewFileServer(dataDir string) *FileServer { +func NewFileServer(dataDir string, storage actor.LogStorage[cart.CartGrain]) *FileServer { return &FileServer{ dataDir: dataDir, + storage: storage, } } diff --git a/cmd/backoffice/main.go b/cmd/backoffice/main.go index 8a72597..dab72d7 100644 --- a/cmd/backoffice/main.go +++ b/cmd/backoffice/main.go @@ -29,8 +29,6 @@ func envOrDefault(key, def string) string { return def } -var globalDisk *actor.DiskStorage[cart.CartGrain] - func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) error { ch, err := conn.Channel() if err != nil { @@ -82,9 +80,9 @@ func main() { _ = os.MkdirAll(dataDir, 0755) reg := cart.NewCartMultationRegistry() - globalDisk = actor.NewDiskStorage[cart.CartGrain](dataDir, reg) + diskStorage := actor.NewDiskStorage[cart.CartGrain](dataDir, reg) - fs := NewFileServer(dataDir) + fs := NewFileServer(dataDir, diskStorage) hub := NewHub() go hub.Run() -- 2.49.1 From bad15e7f70c2e001a50d74ce44b2e14cb535b7d8 Mon Sep 17 00:00:00 2001 From: matst80 Date: Wed, 15 Oct 2025 23:12:02 +0200 Subject: [PATCH 23/23] asdf --- cmd/backoffice/fileserver.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/backoffice/fileserver.go b/cmd/backoffice/fileserver.go index 0480f8a..ad7602d 100644 --- a/cmd/backoffice/fileserver.go +++ b/cmd/backoffice/fileserver.go @@ -147,6 +147,7 @@ func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) { id, ok := isValidId(idStr) if !ok { writeJSON(w, http.StatusBadRequest, JsonError{Error: "invalid id"}) + return } // reconstruct state from event log if present grain := cart.NewCartGrain(id, time.Now()) -- 2.49.1