2 Commits

Author SHA1 Message Date
matst80
c57b8a2c53 measure remote grain latency
All checks were successful
Build and Publish / BuildAndDeploy (push) Successful in 1m51s
2024-11-11 13:35:03 +01:00
matst80
c4572bf057 set deliveries on all items 2024-11-11 13:22:28 +01:00
6 changed files with 243 additions and 19 deletions

View File

@@ -3,6 +3,7 @@ package main
import (
"encoding/json"
"fmt"
"slices"
"time"
messages "git.tornberg.me/go-cart-actor/proto"
@@ -118,6 +119,43 @@ func (c *CartGrain) GetStorageMessage(since int64) []StorableMessage {
return ret
}
func (c *CartGrain) GetState() ([]byte, error) {
return json.Marshal(c)
}
func (c *CartGrain) ItemsWithDelivery() []int {
ret := make([]int, 0, len(c.Items))
for _, item := range c.Items {
for _, delivery := range c.Deliveries {
for _, id := range delivery.Items {
if item.Id == id {
ret = append(ret, id)
}
}
}
}
return ret
}
func (c *CartGrain) ItemsWithoutDelivery() []int {
ret := make([]int, 0, len(c.Items))
hasDelivery := c.ItemsWithDelivery()
for _, item := range c.Items {
found := false
for _, id := range hasDelivery {
if item.Id == id {
found = true
break
}
}
if !found {
ret = append(ret, item.Id)
}
}
return ret
}
func (c *CartGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) {
if message.TimeStamp == nil {
now := time.Now().Unix()
@@ -195,21 +233,29 @@ func (c *CartGrain) HandleMessage(message *Message, isReplay bool) ([]byte, erro
}
}
}
case AddDeliveryType:
case SetDeliveryType:
msg, ok := message.Content.(*messages.SetDelivery)
if !ok {
err = fmt.Errorf("expected SetDelivery")
} else {
c.lastDeliveryId++
items := make([]int, 0)
withDelivery := c.ItemsWithDelivery()
if len(msg.Items) == 0 {
items = append(items, c.ItemsWithoutDelivery()...)
} else {
for _, id := range msg.Items {
for _, item := range c.Items {
if item.Id == int(id) {
if slices.Contains(withDelivery, item.Id) {
return nil, fmt.Errorf("item already has delivery")
}
items = append(items, int(item.Id))
break
}
}
}
}
c.Deliveries = append(c.Deliveries, CartDelivery{
Provider: msg.Provider,
Price: 49,

View File

@@ -7,6 +7,15 @@ import (
messages "git.tornberg.me/go-cart-actor/proto"
)
func GetMessage(t uint16, data interface{}) *Message {
ts := time.Now().Unix()
return &Message{
TimeStamp: &ts,
Type: t,
Content: data,
}
}
func TestAddToCart(t *testing.T) {
grain, err := spawn(ToCartId("kalle"))
if err != nil {
@@ -15,19 +24,14 @@ func TestAddToCart(t *testing.T) {
if len(grain.Items) != 0 {
t.Errorf("Expected 0 items, got %d\n", len(grain.Items))
}
addItem := messages.AddItem{
msg := GetMessage(AddItemType, &messages.AddItem{
Quantity: 2,
Price: 100,
Sku: "123",
Name: "Test item",
Image: "test.jpg",
}
ts := time.Now().Unix()
msg := &Message{
TimeStamp: &ts,
Type: AddItemType,
Content: &addItem,
}
})
result, err := grain.HandleMessage(msg, false)
if err != nil {
@@ -59,3 +63,109 @@ func TestAddToCart(t *testing.T) {
t.Errorf("Expected total price 400, got %d\n", grain.TotalPrice)
}
}
func TestSetDelivery(t *testing.T) {
grain, err := spawn(ToCartId("kalle"))
if err != nil {
t.Errorf("Error spawning: %v\n", err)
}
if len(grain.Items) != 0 {
t.Errorf("Expected 0 items, got %d\n", len(grain.Items))
}
msg := GetMessage(AddItemType, &messages.AddItem{
Quantity: 2,
Price: 100,
Sku: "123",
Name: "Test item",
Image: "test.jpg",
})
grain.HandleMessage(msg, false)
msg = GetMessage(AddItemType, &messages.AddItem{
Quantity: 2,
Price: 100,
Sku: "123",
Name: "Test item",
Image: "test.jpg",
})
result, err := grain.HandleMessage(msg, false)
if err != nil {
t.Errorf("Error handling message: %v\n", err)
}
if len(result) == 0 {
t.Errorf("Expected result, got nil\n")
}
setDelivery := GetMessage(SetDeliveryType, &messages.SetDelivery{
Provider: "test",
Items: []int64{1},
})
_, err = grain.HandleMessage(setDelivery, false)
if err != nil {
t.Errorf("Error handling message: %v\n", err)
}
if len(grain.Deliveries) != 1 {
t.Errorf("Expected 1 delivery, got %d\n", len(grain.Deliveries))
}
if len(grain.Deliveries[0].Items) != 1 {
t.Errorf("Expected 1 items in delivery, got %d\n", len(grain.Deliveries[0].Items))
}
}
func TestSetDeliveryOnAll(t *testing.T) {
grain, err := spawn(ToCartId("kalle"))
if err != nil {
t.Errorf("Error spawning: %v\n", err)
}
if len(grain.Items) != 0 {
t.Errorf("Expected 0 items, got %d\n", len(grain.Items))
}
msg := GetMessage(AddItemType, &messages.AddItem{
Quantity: 2,
Price: 100,
Sku: "123",
Name: "Test item",
Image: "test.jpg",
})
grain.HandleMessage(msg, false)
msg = GetMessage(AddItemType, &messages.AddItem{
Quantity: 2,
Price: 100,
Sku: "1233",
Name: "Test item2",
Image: "test.jpg",
})
result, err := grain.HandleMessage(msg, false)
if err != nil {
t.Errorf("Error handling message: %v\n", err)
}
if len(result) == 0 {
t.Errorf("Expected result, got nil\n")
}
setDelivery := GetMessage(SetDeliveryType, &messages.SetDelivery{
Provider: "test",
Items: []int64{},
})
_, err = grain.HandleMessage(setDelivery, false)
if err != nil {
t.Errorf("Error handling message: %v\n", err)
}
if len(grain.Deliveries) != 1 {
t.Errorf("Expected 1 delivery, got %d\n", len(grain.Deliveries))
}
if len(grain.Deliveries[0].Items) != 2 {
t.Errorf("Expected 2 items in delivery, got %d\n", len(grain.Deliveries[0].Items))
}
}

View File

@@ -88,3 +88,42 @@ func TestItemRequest(t *testing.T) {
t.Errorf("Expected sku '123', got %s\n", r.Sku)
}
}
func TestSetDeliveryMssage(t *testing.T) {
h, err := GetMessageHandler(SetDeliveryType)
if err != nil {
t.Errorf("Error getting message handler: %v\n", err)
}
if h == nil {
t.Errorf("Expected message handler, got nil\n")
}
message := Message{
Type: SetDeliveryType,
Content: &messages.SetDelivery{
Provider: "test",
Items: []int64{1, 2},
},
}
var b bytes.Buffer
err = h.Write(&message, &b)
if err != nil {
t.Errorf("Error writing message: %v\n", err)
}
result, err := h.Read(b.Bytes())
if err != nil {
t.Errorf("Error reading message: %v\n", err)
}
if result == nil {
t.Errorf("Expected result, got nil\n")
}
r, ok := result.(*messages.SetDelivery)
if !ok {
t.Errorf("Expected AddRequest, got %T\n", result)
}
if len(r.Items) != 2 {
t.Errorf("Expected 2 items, got %d\n", len(r.Items))
}
if r.Provider != "test" {
t.Errorf("Expected provider 'test', got %s\n", r.Provider)
}
}

View File

@@ -3,7 +3,7 @@ package main
const (
AddRequestType = 1
AddItemType = 2
AddDeliveryType = 3
//AddDeliveryType = 3
RemoveItemType = 4
RemoveDeliveryType = 5
ChangeQuantityType = 6

View File

@@ -4,6 +4,10 @@ import (
"fmt"
"strings"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
type RemoteGrainPool struct {
@@ -41,13 +45,36 @@ func NewRemoteGrain(id CartId, host string) (*RemoteGrain, error) {
}, nil
}
var (
remoteCartLatency = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_remote_grain_calls_total_latency",
Help: "The total latency of remote grains",
})
remoteCartCallsTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "cart_remote_grain_calls_total",
Help: "The total number of calls to remote grains",
})
)
func MeasureLatency(fn func() ([]byte, error)) ([]byte, error) {
start := time.Now()
data, err := fn()
if err != nil {
return data, err
}
elapsed := time.Since(start).Milliseconds()
remoteCartLatency.Add(float64(elapsed))
remoteCartCallsTotal.Inc()
return data, nil
}
func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) {
data, err := GetData(message.Write)
if err != nil {
return nil, err
}
reply, err := g.Call(RemoteHandleMutation, g.Id, RemoteHandleMutationReply, data)
reply, err := MeasureLatency(func() ([]byte, error) { return g.Call(RemoteHandleMutation, g.Id, RemoteHandleMutationReply, data) })
if err != nil {
return nil, err
@@ -61,7 +88,7 @@ func (g *RemoteGrain) GetId() CartId {
}
func (g *RemoteGrain) GetCurrentState() ([]byte, error) {
return g.Call(RemoteGetState, g.Id, RemoteGetStateReply, []byte{})
return MeasureLatency(func() ([]byte, error) { return g.Call(RemoteGetState, g.Id, RemoteGetStateReply, []byte{}) })
}
func NewRemoteGrainPool(addr string) *RemoteGrainPool {

View File

@@ -41,7 +41,9 @@ func (h *GrainHandler) RemoteHandleMessageHandler(id CartId, data []byte) (uint3
fmt.Println("Error reading message:", err)
return RemoteHandleMutationReply, nil, err
}
replyData, err := h.pool.Process(id, msg)
if err != nil {
fmt.Println("Error handling message:", err)
}