Compare commits
3 Commits
e0207a8638
...
16948fcbdb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
16948fcbdb | ||
|
|
3942ea911e | ||
|
|
1c589e0558 |
@@ -14,12 +14,14 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.tornberg.me/go-cart-actor/pkg/actor"
|
||||||
"git.tornberg.me/go-cart-actor/pkg/cart"
|
"git.tornberg.me/go-cart-actor/pkg/cart"
|
||||||
)
|
)
|
||||||
|
|
||||||
type FileServer struct {
|
type FileServer struct {
|
||||||
// Define fields here
|
// Define fields here
|
||||||
dataDir string
|
dataDir string
|
||||||
|
storage actor.LogStorage[cart.CartGrain]
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFileServer(dataDir string) *FileServer {
|
func NewFileServer(dataDir string) *FileServer {
|
||||||
@@ -28,6 +30,27 @@ func NewFileServer(dataDir string) *FileServer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isValidId(id string) (uint64, bool) {
|
||||||
|
if nr, err := strconv.ParseUint(id, 10, 64); err == nil {
|
||||||
|
return nr, true
|
||||||
|
}
|
||||||
|
if nr, ok := cart.ParseCartId(id); ok {
|
||||||
|
return uint64(nr), true
|
||||||
|
}
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func isValidFileId(name string) (uint64, bool) {
|
||||||
|
|
||||||
|
parts := strings.Split(name, ".")
|
||||||
|
if len(parts) > 1 && parts[1] == "events" {
|
||||||
|
idStr := parts[0]
|
||||||
|
|
||||||
|
return isValidId(idStr)
|
||||||
|
}
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
var cartFileRe = regexp.MustCompile(`^(\d+)\.events\.log$`)
|
var cartFileRe = regexp.MustCompile(`^(\d+)\.events\.log$`)
|
||||||
|
|
||||||
func listCartFiles(dir string) ([]CartFileInfo, error) {
|
func listCartFiles(dir string) ([]CartFileInfo, error) {
|
||||||
@@ -43,18 +66,8 @@ func listCartFiles(dir string) ([]CartFileInfo, error) {
|
|||||||
if e.IsDir() {
|
if e.IsDir() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
name := e.Name()
|
id, valid := isValidFileId(e.Name())
|
||||||
var idStr string
|
if !valid {
|
||||||
var id uint64
|
|
||||||
var parseErr error
|
|
||||||
parts := strings.Split(name, ".")
|
|
||||||
if len(parts) > 1 && parts[1] == "events" {
|
|
||||||
idStr = parts[0]
|
|
||||||
id, parseErr = strconv.ParseUint(idStr, 10, 64)
|
|
||||||
} else {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if parseErr != nil {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -62,30 +75,32 @@ func listCartFiles(dir string) ([]CartFileInfo, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
info.Sys()
|
||||||
out = append(out, CartFileInfo{
|
out = append(out, CartFileInfo{
|
||||||
ID: idStr,
|
ID: fmt.Sprintf("%d", id),
|
||||||
CartId: cart.CartId(id),
|
CartId: cart.CartId(id),
|
||||||
Size: info.Size(),
|
Size: info.Size(),
|
||||||
Modified: info.ModTime(),
|
Modified: info.ModTime(),
|
||||||
|
System: info.Sys(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func readRawLogLines(path string) ([]string, error) {
|
func readRawLogLines(path string) ([]json.RawMessage, error) {
|
||||||
fh, err := os.Open(path)
|
fh, err := os.Open(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer fh.Close()
|
defer fh.Close()
|
||||||
lines := make([]string, 0, 64)
|
lines := make([]json.RawMessage, 0, 64)
|
||||||
s := bufio.NewScanner(fh)
|
s := bufio.NewScanner(fh)
|
||||||
// increase buffer to handle larger JSON lines
|
// increase buffer to handle larger JSON lines
|
||||||
buf := make([]byte, 0, 1024*1024)
|
buf := make([]byte, 0, 1024*1024)
|
||||||
s.Buffer(buf, 1024*1024)
|
s.Buffer(buf, 1024*1024)
|
||||||
for s.Scan() {
|
for s.Scan() {
|
||||||
line := strings.TrimSpace(s.Text())
|
line := s.Bytes()
|
||||||
if line == "" {
|
if line == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
lines = append(lines, line)
|
lines = append(lines, line)
|
||||||
@@ -118,40 +133,41 @@ func (fs *FileServer) CartsHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type JsonError struct {
|
||||||
|
Error string `json:"error"`
|
||||||
|
}
|
||||||
|
|
||||||
func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) {
|
func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
idStr := r.PathValue("id")
|
idStr := r.PathValue("id")
|
||||||
if idStr == "" {
|
if idStr == "" {
|
||||||
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "missing id"})
|
writeJSON(w, http.StatusBadRequest, JsonError{Error: "missing id"})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
id, ok := isValidId(idStr)
|
||||||
id, err := strconv.ParseUint(idStr, 10, 64)
|
if !ok {
|
||||||
if err != nil {
|
writeJSON(w, http.StatusBadRequest, JsonError{Error: "invalid id"})
|
||||||
if cartId, ok := cart.ParseCartId(idStr); !ok {
|
|
||||||
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid id"})
|
|
||||||
return
|
|
||||||
} else {
|
|
||||||
id = uint64(cartId)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// reconstruct state from event log if present
|
// reconstruct state from event log if present
|
||||||
grain := cart.NewCartGrain(id, time.Now())
|
grain := cart.NewCartGrain(id, time.Now())
|
||||||
if globalDisk != nil {
|
|
||||||
_ = globalDisk.LoadEvents(id, grain)
|
err := fs.storage.LoadEvents(id, grain)
|
||||||
|
if err != nil {
|
||||||
|
writeJSON(w, http.StatusInternalServerError, JsonError{Error: err.Error()})
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
path := filepath.Join(fs.dataDir, fmt.Sprintf("%d.events.log", id))
|
path := filepath.Join(fs.dataDir, fmt.Sprintf("%d.events.log", id))
|
||||||
info, err := os.Stat(path)
|
info, err := os.Stat(path)
|
||||||
if err != nil && errors.Is(err, os.ErrNotExist) {
|
if err != nil && errors.Is(err, os.ErrNotExist) {
|
||||||
writeJSON(w, http.StatusNotFound, map[string]string{"error": "cart not found"})
|
writeJSON(w, http.StatusNotFound, JsonError{Error: "cart not found"})
|
||||||
return
|
return
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
writeJSON(w, http.StatusInternalServerError, JsonError{Error: err.Error()})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
lines, err := readRawLogLines(path)
|
lines, err := readRawLogLines(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
writeJSON(w, http.StatusInternalServerError, JsonError{Error: err.Error()})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
writeJSON(w, http.StatusOK, map[string]any{
|
writeJSON(w, http.StatusOK, map[string]any{
|
||||||
@@ -163,6 +179,7 @@ func (fs *FileServer) CartHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
"size": info.Size(),
|
"size": info.Size(),
|
||||||
"modified": info.ModTime(),
|
"modified": info.ModTime(),
|
||||||
"path": path,
|
"path": path,
|
||||||
|
"system": info.Sys(),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,7 +10,6 @@ import (
|
|||||||
|
|
||||||
actor "git.tornberg.me/go-cart-actor/pkg/actor"
|
actor "git.tornberg.me/go-cart-actor/pkg/actor"
|
||||||
"git.tornberg.me/go-cart-actor/pkg/cart"
|
"git.tornberg.me/go-cart-actor/pkg/cart"
|
||||||
messages "git.tornberg.me/go-cart-actor/pkg/messages"
|
|
||||||
"github.com/matst80/slask-finder/pkg/messaging"
|
"github.com/matst80/slask-finder/pkg/messaging"
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
)
|
)
|
||||||
@@ -20,6 +19,7 @@ type CartFileInfo struct {
|
|||||||
CartId cart.CartId `json:"cartId"`
|
CartId cart.CartId `json:"cartId"`
|
||||||
Size int64 `json:"size"`
|
Size int64 `json:"size"`
|
||||||
Modified time.Time `json:"modified"`
|
Modified time.Time `json:"modified"`
|
||||||
|
System any `json:"system"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func envOrDefault(key, def string) string {
|
func envOrDefault(key, def string) string {
|
||||||
@@ -31,24 +31,6 @@ func envOrDefault(key, def string) string {
|
|||||||
|
|
||||||
var globalDisk *actor.DiskStorage[cart.CartGrain]
|
var globalDisk *actor.DiskStorage[cart.CartGrain]
|
||||||
|
|
||||||
func buildRegistry() actor.MutationRegistry {
|
|
||||||
reg := actor.NewMutationRegistry()
|
|
||||||
reg.RegisterMutations(
|
|
||||||
actor.NewMutation(cart.AddItem, func() *messages.AddItem { return &messages.AddItem{} }),
|
|
||||||
actor.NewMutation(cart.ChangeQuantity, func() *messages.ChangeQuantity { return &messages.ChangeQuantity{} }),
|
|
||||||
actor.NewMutation(cart.RemoveItem, func() *messages.RemoveItem { return &messages.RemoveItem{} }),
|
|
||||||
actor.NewMutation(cart.InitializeCheckout, func() *messages.InitializeCheckout { return &messages.InitializeCheckout{} }),
|
|
||||||
actor.NewMutation(cart.OrderCreated, func() *messages.OrderCreated { return &messages.OrderCreated{} }),
|
|
||||||
actor.NewMutation(cart.RemoveDelivery, func() *messages.RemoveDelivery { return &messages.RemoveDelivery{} }),
|
|
||||||
actor.NewMutation(cart.SetDelivery, func() *messages.SetDelivery { return &messages.SetDelivery{} }),
|
|
||||||
actor.NewMutation(cart.SetPickupPoint, func() *messages.SetPickupPoint { return &messages.SetPickupPoint{} }),
|
|
||||||
actor.NewMutation(cart.ClearCart, func() *messages.ClearCartRequest { return &messages.ClearCartRequest{} }),
|
|
||||||
actor.NewMutation(cart.AddVoucher, func() *messages.AddVoucher { return &messages.AddVoucher{} }),
|
|
||||||
actor.NewMutation(cart.RemoveVoucher, func() *messages.RemoveVoucher { return &messages.RemoveVoucher{} }),
|
|
||||||
)
|
|
||||||
return reg
|
|
||||||
}
|
|
||||||
|
|
||||||
func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) error {
|
func startMutationConsumer(ctx context.Context, conn *amqp.Connection, hub *Hub) error {
|
||||||
ch, err := conn.Channel()
|
ch, err := conn.Channel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -99,7 +81,7 @@ func main() {
|
|||||||
|
|
||||||
_ = os.MkdirAll(dataDir, 0755)
|
_ = os.MkdirAll(dataDir, 0755)
|
||||||
|
|
||||||
reg := buildRegistry()
|
reg := cart.NewCartMultationRegistry()
|
||||||
globalDisk = actor.NewDiskStorage[cart.CartGrain](dataDir, reg)
|
globalDisk = actor.NewDiskStorage[cart.CartGrain](dataDir, reg)
|
||||||
|
|
||||||
fs := NewFileServer(dataDir)
|
fs := NewFileServer(dataDir)
|
||||||
|
|||||||
@@ -108,42 +108,7 @@ func main() {
|
|||||||
|
|
||||||
controlPlaneConfig := actor.DefaultServerConfig()
|
controlPlaneConfig := actor.DefaultServerConfig()
|
||||||
|
|
||||||
reg := actor.NewMutationRegistry()
|
reg := cart.NewCartMultationRegistry()
|
||||||
reg.RegisterMutations(
|
|
||||||
actor.NewMutation(cart.AddItem, func() *messages.AddItem {
|
|
||||||
return &messages.AddItem{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(cart.ChangeQuantity, func() *messages.ChangeQuantity {
|
|
||||||
return &messages.ChangeQuantity{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(cart.RemoveItem, func() *messages.RemoveItem {
|
|
||||||
return &messages.RemoveItem{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(cart.InitializeCheckout, func() *messages.InitializeCheckout {
|
|
||||||
return &messages.InitializeCheckout{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(cart.OrderCreated, func() *messages.OrderCreated {
|
|
||||||
return &messages.OrderCreated{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(cart.RemoveDelivery, func() *messages.RemoveDelivery {
|
|
||||||
return &messages.RemoveDelivery{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(cart.SetDelivery, func() *messages.SetDelivery {
|
|
||||||
return &messages.SetDelivery{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(cart.SetPickupPoint, func() *messages.SetPickupPoint {
|
|
||||||
return &messages.SetPickupPoint{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(cart.ClearCart, func() *messages.ClearCartRequest {
|
|
||||||
return &messages.ClearCartRequest{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(cart.AddVoucher, func() *messages.AddVoucher {
|
|
||||||
return &messages.AddVoucher{}
|
|
||||||
}),
|
|
||||||
actor.NewMutation(cart.RemoveVoucher, func() *messages.RemoveVoucher {
|
|
||||||
return &messages.RemoveVoucher{}
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
diskStorage := actor.NewDiskStorage[cart.CartGrain]("data", reg)
|
diskStorage := actor.NewDiskStorage[cart.CartGrain]("data", reg)
|
||||||
poolConfig := actor.GrainPoolConfig[cart.CartGrain]{
|
poolConfig := actor.GrainPoolConfig[cart.CartGrain]{
|
||||||
MutationRegistry: reg,
|
MutationRegistry: reg,
|
||||||
|
|||||||
48
pkg/cart/cart-mutation-helper.go
Normal file
48
pkg/cart/cart-mutation-helper.go
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
package cart
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.tornberg.me/go-cart-actor/pkg/actor"
|
||||||
|
messages "git.tornberg.me/go-cart-actor/pkg/messages"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewCartMultationRegistry() actor.MutationRegistry {
|
||||||
|
|
||||||
|
reg := actor.NewMutationRegistry()
|
||||||
|
reg.RegisterMutations(
|
||||||
|
actor.NewMutation(AddItem, func() *messages.AddItem {
|
||||||
|
return &messages.AddItem{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(ChangeQuantity, func() *messages.ChangeQuantity {
|
||||||
|
return &messages.ChangeQuantity{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(RemoveItem, func() *messages.RemoveItem {
|
||||||
|
return &messages.RemoveItem{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(InitializeCheckout, func() *messages.InitializeCheckout {
|
||||||
|
return &messages.InitializeCheckout{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(OrderCreated, func() *messages.OrderCreated {
|
||||||
|
return &messages.OrderCreated{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(RemoveDelivery, func() *messages.RemoveDelivery {
|
||||||
|
return &messages.RemoveDelivery{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(SetDelivery, func() *messages.SetDelivery {
|
||||||
|
return &messages.SetDelivery{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(SetPickupPoint, func() *messages.SetPickupPoint {
|
||||||
|
return &messages.SetPickupPoint{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(ClearCart, func() *messages.ClearCartRequest {
|
||||||
|
return &messages.ClearCartRequest{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(AddVoucher, func() *messages.AddVoucher {
|
||||||
|
return &messages.AddVoucher{}
|
||||||
|
}),
|
||||||
|
actor.NewMutation(RemoveVoucher, func() *messages.RemoveVoucher {
|
||||||
|
return &messages.RemoveVoucher{}
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
return reg
|
||||||
|
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user