feature/backoffice (#6)
All checks were successful
Build and Publish / Metadata (push) Successful in 12s
Build and Publish / BuildAndDeployAmd64 (push) Successful in 1m23s
Build and Publish / BuildAndDeployArm64 (push) Successful in 4m11s

Co-authored-by: matst80 <mats.tornberg@gmail.com>
Reviewed-on: https://git.tornberg.me/mats/go-cart-actor/pulls/6
Co-authored-by: Mats Törnberg <mats@tornberg.me>
Co-committed-by: Mats Törnberg <mats@tornberg.me>
This commit was merged in pull request #6.
This commit is contained in:
2025-10-16 09:45:58 +02:00
committed by mats
parent 8c2bcf5e75
commit 8682daf481
32 changed files with 907 additions and 164 deletions

View File

@@ -13,6 +13,7 @@ import (
"time"
"git.tornberg.me/go-cart-actor/pkg/actor"
"git.tornberg.me/go-cart-actor/pkg/cart"
"git.tornberg.me/go-cart-actor/pkg/discovery"
messages "git.tornberg.me/go-cart-actor/pkg/messages"
"git.tornberg.me/go-cart-actor/pkg/proxy"
@@ -46,7 +47,7 @@ func init() {
}
type App struct {
pool *actor.SimpleGrainPool[CartGrain]
pool *actor.SimpleGrainPool[cart.CartGrain]
}
var podIp = os.Getenv("POD_IP")
@@ -98,63 +99,24 @@ type MutationContext struct {
VoucherService voucher.Service
}
type CartChangeEvent struct {
CartId cart.CartId `json:"cartId"`
Mutations []actor.ApplyResult `json:"mutations"`
}
func main() {
controlPlaneConfig := actor.DefaultServerConfig()
reg := actor.NewMutationRegistry()
reg.RegisterMutations(
actor.NewMutation(AddItem, func() *messages.AddItem {
return &messages.AddItem{}
}),
actor.NewMutation(ChangeQuantity, func() *messages.ChangeQuantity {
return &messages.ChangeQuantity{}
}),
actor.NewMutation(RemoveItem, func() *messages.RemoveItem {
return &messages.RemoveItem{}
}),
actor.NewMutation(InitializeCheckout, func() *messages.InitializeCheckout {
return &messages.InitializeCheckout{}
}),
actor.NewMutation(OrderCreated, func() *messages.OrderCreated {
return &messages.OrderCreated{}
}),
actor.NewMutation(RemoveDelivery, func() *messages.RemoveDelivery {
return &messages.RemoveDelivery{}
}),
actor.NewMutation(SetDelivery, func() *messages.SetDelivery {
return &messages.SetDelivery{}
}),
actor.NewMutation(SetPickupPoint, func() *messages.SetPickupPoint {
return &messages.SetPickupPoint{}
}),
actor.NewMutation(ClearCart, func() *messages.ClearCartRequest {
return &messages.ClearCartRequest{}
}),
actor.NewMutation(AddVoucher, func() *messages.AddVoucher {
return &messages.AddVoucher{}
}),
actor.NewMutation(RemoveVoucher, func() *messages.RemoveVoucher {
return &messages.RemoveVoucher{}
}),
)
diskStorage := actor.NewDiskStorage[CartGrain]("data", reg)
poolConfig := actor.GrainPoolConfig[CartGrain]{
reg := cart.NewCartMultationRegistry()
diskStorage := actor.NewDiskStorage[cart.CartGrain]("data", reg)
poolConfig := actor.GrainPoolConfig[cart.CartGrain]{
MutationRegistry: reg,
Storage: diskStorage,
Spawn: func(id uint64) (actor.Grain[CartGrain], error) {
Spawn: func(id uint64) (actor.Grain[cart.CartGrain], error) {
grainSpawns.Inc()
ret := &CartGrain{
lastItemId: 0,
lastDeliveryId: 0,
Deliveries: []*CartDelivery{},
Id: CartId(id),
Items: []*CartItem{},
TotalPrice: NewPrice(),
}
ret := cart.NewCartGrain(id, time.Now())
// Set baseline lastChange at spawn; replay may update it to last event timestamp.
ret.lastChange = time.Now()
ret.lastAccess = time.Now()
err := diskStorage.LoadEvents(id, ret)
@@ -181,17 +143,22 @@ func main() {
fmt.Errorf("failed to connect to RabbitMQ: %w", err)
}
amqpListener := actor.NewAmqpListener(conn)
amqpListener := actor.NewAmqpListener(conn, func(id uint64, msg []actor.ApplyResult) (any, error) {
return &CartChangeEvent{
CartId: cart.CartId(id),
Mutations: msg,
}, nil
})
amqpListener.DefineTopics()
pool.AddListener(amqpListener)
grpcSrv, err := actor.NewControlServer[*CartGrain](controlPlaneConfig, pool)
grpcSrv, err := actor.NewControlServer[*cart.CartGrain](controlPlaneConfig, pool)
if err != nil {
log.Fatalf("Error starting control plane gRPC server: %v\n", err)
}
defer grpcSrv.GracefulStop()
go diskStorage.SaveLoop(10 * time.Second)
// go diskStorage.SaveLoop(10 * time.Second)
go func(hw discovery.Discovery) {
if hw == nil {
@@ -282,14 +249,14 @@ func main() {
w.Write([]byte("no cart id to checkout is empty"))
return
}
parsed, ok := ParseCartId(cookie.Value)
parsed, ok := cart.ParseCartId(cookie.Value)
if !ok {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("invalid cart id format"))
return
}
cartId := parsed
syncedServer.ProxyHandler(func(w http.ResponseWriter, r *http.Request, cartId CartId) error {
syncedServer.ProxyHandler(func(w http.ResponseWriter, r *http.Request, cartId cart.CartId) error {
order, err = syncedServer.CreateOrUpdateCheckout(r.Host, cartId)
if err != nil {
return err
@@ -443,7 +410,7 @@ func triggerOrderCompleted(syncedServer *PoolServer, order *CheckoutOrder) error
OrderId: order.ID,
Status: order.Status,
}
cid, ok := ParseCartId(order.MerchantReference1)
cid, ok := cart.ParseCartId(order.MerchantReference1)
if !ok {
return fmt.Errorf("invalid cart id in order reference: %s", order.MerchantReference1)
}