Compare commits
2 Commits
1c31ff9d80
...
c57b8a2c53
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c57b8a2c53 | ||
|
|
c4572bf057 |
@@ -3,6 +3,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"slices"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
messages "git.tornberg.me/go-cart-actor/proto"
|
messages "git.tornberg.me/go-cart-actor/proto"
|
||||||
@@ -118,6 +119,43 @@ func (c *CartGrain) GetStorageMessage(since int64) []StorableMessage {
|
|||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *CartGrain) GetState() ([]byte, error) {
|
||||||
|
return json.Marshal(c)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CartGrain) ItemsWithDelivery() []int {
|
||||||
|
ret := make([]int, 0, len(c.Items))
|
||||||
|
for _, item := range c.Items {
|
||||||
|
for _, delivery := range c.Deliveries {
|
||||||
|
for _, id := range delivery.Items {
|
||||||
|
if item.Id == id {
|
||||||
|
ret = append(ret, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CartGrain) ItemsWithoutDelivery() []int {
|
||||||
|
ret := make([]int, 0, len(c.Items))
|
||||||
|
hasDelivery := c.ItemsWithDelivery()
|
||||||
|
for _, item := range c.Items {
|
||||||
|
found := false
|
||||||
|
for _, id := range hasDelivery {
|
||||||
|
if item.Id == id {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !found {
|
||||||
|
ret = append(ret, item.Id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
func (c *CartGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) {
|
func (c *CartGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) {
|
||||||
if message.TimeStamp == nil {
|
if message.TimeStamp == nil {
|
||||||
now := time.Now().Unix()
|
now := time.Now().Unix()
|
||||||
@@ -195,21 +233,29 @@ func (c *CartGrain) HandleMessage(message *Message, isReplay bool) ([]byte, erro
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case AddDeliveryType:
|
case SetDeliveryType:
|
||||||
msg, ok := message.Content.(*messages.SetDelivery)
|
msg, ok := message.Content.(*messages.SetDelivery)
|
||||||
if !ok {
|
if !ok {
|
||||||
err = fmt.Errorf("expected SetDelivery")
|
err = fmt.Errorf("expected SetDelivery")
|
||||||
} else {
|
} else {
|
||||||
c.lastDeliveryId++
|
c.lastDeliveryId++
|
||||||
items := make([]int, 0)
|
items := make([]int, 0)
|
||||||
|
withDelivery := c.ItemsWithDelivery()
|
||||||
|
if len(msg.Items) == 0 {
|
||||||
|
items = append(items, c.ItemsWithoutDelivery()...)
|
||||||
|
} else {
|
||||||
for _, id := range msg.Items {
|
for _, id := range msg.Items {
|
||||||
for _, item := range c.Items {
|
for _, item := range c.Items {
|
||||||
if item.Id == int(id) {
|
if item.Id == int(id) {
|
||||||
|
if slices.Contains(withDelivery, item.Id) {
|
||||||
|
return nil, fmt.Errorf("item already has delivery")
|
||||||
|
}
|
||||||
items = append(items, int(item.Id))
|
items = append(items, int(item.Id))
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
c.Deliveries = append(c.Deliveries, CartDelivery{
|
c.Deliveries = append(c.Deliveries, CartDelivery{
|
||||||
Provider: msg.Provider,
|
Provider: msg.Provider,
|
||||||
Price: 49,
|
Price: 49,
|
||||||
|
|||||||
@@ -7,6 +7,15 @@ import (
|
|||||||
messages "git.tornberg.me/go-cart-actor/proto"
|
messages "git.tornberg.me/go-cart-actor/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func GetMessage(t uint16, data interface{}) *Message {
|
||||||
|
ts := time.Now().Unix()
|
||||||
|
return &Message{
|
||||||
|
TimeStamp: &ts,
|
||||||
|
Type: t,
|
||||||
|
Content: data,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestAddToCart(t *testing.T) {
|
func TestAddToCart(t *testing.T) {
|
||||||
grain, err := spawn(ToCartId("kalle"))
|
grain, err := spawn(ToCartId("kalle"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -15,19 +24,14 @@ func TestAddToCart(t *testing.T) {
|
|||||||
if len(grain.Items) != 0 {
|
if len(grain.Items) != 0 {
|
||||||
t.Errorf("Expected 0 items, got %d\n", len(grain.Items))
|
t.Errorf("Expected 0 items, got %d\n", len(grain.Items))
|
||||||
}
|
}
|
||||||
addItem := messages.AddItem{
|
msg := GetMessage(AddItemType, &messages.AddItem{
|
||||||
Quantity: 2,
|
Quantity: 2,
|
||||||
Price: 100,
|
Price: 100,
|
||||||
Sku: "123",
|
Sku: "123",
|
||||||
Name: "Test item",
|
Name: "Test item",
|
||||||
Image: "test.jpg",
|
Image: "test.jpg",
|
||||||
}
|
})
|
||||||
ts := time.Now().Unix()
|
|
||||||
msg := &Message{
|
|
||||||
TimeStamp: &ts,
|
|
||||||
Type: AddItemType,
|
|
||||||
Content: &addItem,
|
|
||||||
}
|
|
||||||
result, err := grain.HandleMessage(msg, false)
|
result, err := grain.HandleMessage(msg, false)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -59,3 +63,109 @@ func TestAddToCart(t *testing.T) {
|
|||||||
t.Errorf("Expected total price 400, got %d\n", grain.TotalPrice)
|
t.Errorf("Expected total price 400, got %d\n", grain.TotalPrice)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSetDelivery(t *testing.T) {
|
||||||
|
grain, err := spawn(ToCartId("kalle"))
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error spawning: %v\n", err)
|
||||||
|
}
|
||||||
|
if len(grain.Items) != 0 {
|
||||||
|
t.Errorf("Expected 0 items, got %d\n", len(grain.Items))
|
||||||
|
}
|
||||||
|
msg := GetMessage(AddItemType, &messages.AddItem{
|
||||||
|
Quantity: 2,
|
||||||
|
Price: 100,
|
||||||
|
Sku: "123",
|
||||||
|
Name: "Test item",
|
||||||
|
Image: "test.jpg",
|
||||||
|
})
|
||||||
|
|
||||||
|
grain.HandleMessage(msg, false)
|
||||||
|
|
||||||
|
msg = GetMessage(AddItemType, &messages.AddItem{
|
||||||
|
Quantity: 2,
|
||||||
|
Price: 100,
|
||||||
|
Sku: "123",
|
||||||
|
Name: "Test item",
|
||||||
|
Image: "test.jpg",
|
||||||
|
})
|
||||||
|
|
||||||
|
result, err := grain.HandleMessage(msg, false)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error handling message: %v\n", err)
|
||||||
|
}
|
||||||
|
if len(result) == 0 {
|
||||||
|
t.Errorf("Expected result, got nil\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
setDelivery := GetMessage(SetDeliveryType, &messages.SetDelivery{
|
||||||
|
Provider: "test",
|
||||||
|
Items: []int64{1},
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err = grain.HandleMessage(setDelivery, false)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error handling message: %v\n", err)
|
||||||
|
}
|
||||||
|
if len(grain.Deliveries) != 1 {
|
||||||
|
t.Errorf("Expected 1 delivery, got %d\n", len(grain.Deliveries))
|
||||||
|
}
|
||||||
|
if len(grain.Deliveries[0].Items) != 1 {
|
||||||
|
t.Errorf("Expected 1 items in delivery, got %d\n", len(grain.Deliveries[0].Items))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSetDeliveryOnAll(t *testing.T) {
|
||||||
|
grain, err := spawn(ToCartId("kalle"))
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error spawning: %v\n", err)
|
||||||
|
}
|
||||||
|
if len(grain.Items) != 0 {
|
||||||
|
t.Errorf("Expected 0 items, got %d\n", len(grain.Items))
|
||||||
|
}
|
||||||
|
msg := GetMessage(AddItemType, &messages.AddItem{
|
||||||
|
Quantity: 2,
|
||||||
|
Price: 100,
|
||||||
|
Sku: "123",
|
||||||
|
Name: "Test item",
|
||||||
|
Image: "test.jpg",
|
||||||
|
})
|
||||||
|
|
||||||
|
grain.HandleMessage(msg, false)
|
||||||
|
|
||||||
|
msg = GetMessage(AddItemType, &messages.AddItem{
|
||||||
|
Quantity: 2,
|
||||||
|
Price: 100,
|
||||||
|
Sku: "1233",
|
||||||
|
Name: "Test item2",
|
||||||
|
Image: "test.jpg",
|
||||||
|
})
|
||||||
|
|
||||||
|
result, err := grain.HandleMessage(msg, false)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error handling message: %v\n", err)
|
||||||
|
}
|
||||||
|
if len(result) == 0 {
|
||||||
|
t.Errorf("Expected result, got nil\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
setDelivery := GetMessage(SetDeliveryType, &messages.SetDelivery{
|
||||||
|
Provider: "test",
|
||||||
|
Items: []int64{},
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err = grain.HandleMessage(setDelivery, false)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error handling message: %v\n", err)
|
||||||
|
}
|
||||||
|
if len(grain.Deliveries) != 1 {
|
||||||
|
t.Errorf("Expected 1 delivery, got %d\n", len(grain.Deliveries))
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(grain.Deliveries[0].Items) != 2 {
|
||||||
|
t.Errorf("Expected 2 items in delivery, got %d\n", len(grain.Deliveries[0].Items))
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|||||||
@@ -88,3 +88,42 @@ func TestItemRequest(t *testing.T) {
|
|||||||
t.Errorf("Expected sku '123', got %s\n", r.Sku)
|
t.Errorf("Expected sku '123', got %s\n", r.Sku)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSetDeliveryMssage(t *testing.T) {
|
||||||
|
h, err := GetMessageHandler(SetDeliveryType)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error getting message handler: %v\n", err)
|
||||||
|
}
|
||||||
|
if h == nil {
|
||||||
|
t.Errorf("Expected message handler, got nil\n")
|
||||||
|
}
|
||||||
|
message := Message{
|
||||||
|
Type: SetDeliveryType,
|
||||||
|
Content: &messages.SetDelivery{
|
||||||
|
Provider: "test",
|
||||||
|
Items: []int64{1, 2},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var b bytes.Buffer
|
||||||
|
err = h.Write(&message, &b)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error writing message: %v\n", err)
|
||||||
|
}
|
||||||
|
result, err := h.Read(b.Bytes())
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error reading message: %v\n", err)
|
||||||
|
}
|
||||||
|
if result == nil {
|
||||||
|
t.Errorf("Expected result, got nil\n")
|
||||||
|
}
|
||||||
|
r, ok := result.(*messages.SetDelivery)
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("Expected AddRequest, got %T\n", result)
|
||||||
|
}
|
||||||
|
if len(r.Items) != 2 {
|
||||||
|
t.Errorf("Expected 2 items, got %d\n", len(r.Items))
|
||||||
|
}
|
||||||
|
if r.Provider != "test" {
|
||||||
|
t.Errorf("Expected provider 'test', got %s\n", r.Provider)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ package main
|
|||||||
const (
|
const (
|
||||||
AddRequestType = 1
|
AddRequestType = 1
|
||||||
AddItemType = 2
|
AddItemType = 2
|
||||||
AddDeliveryType = 3
|
//AddDeliveryType = 3
|
||||||
RemoveItemType = 4
|
RemoveItemType = 4
|
||||||
RemoveDeliveryType = 5
|
RemoveDeliveryType = 5
|
||||||
ChangeQuantityType = 6
|
ChangeQuantityType = 6
|
||||||
|
|||||||
31
rpc-pool.go
31
rpc-pool.go
@@ -4,6 +4,10 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||||
)
|
)
|
||||||
|
|
||||||
type RemoteGrainPool struct {
|
type RemoteGrainPool struct {
|
||||||
@@ -41,13 +45,36 @@ func NewRemoteGrain(id CartId, host string) (*RemoteGrain, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
remoteCartLatency = promauto.NewCounter(prometheus.CounterOpts{
|
||||||
|
Name: "cart_remote_grain_calls_total_latency",
|
||||||
|
Help: "The total latency of remote grains",
|
||||||
|
})
|
||||||
|
remoteCartCallsTotal = promauto.NewCounter(prometheus.CounterOpts{
|
||||||
|
Name: "cart_remote_grain_calls_total",
|
||||||
|
Help: "The total number of calls to remote grains",
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
|
func MeasureLatency(fn func() ([]byte, error)) ([]byte, error) {
|
||||||
|
start := time.Now()
|
||||||
|
data, err := fn()
|
||||||
|
if err != nil {
|
||||||
|
return data, err
|
||||||
|
}
|
||||||
|
elapsed := time.Since(start).Milliseconds()
|
||||||
|
remoteCartLatency.Add(float64(elapsed))
|
||||||
|
remoteCartCallsTotal.Inc()
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) {
|
func (g *RemoteGrain) HandleMessage(message *Message, isReplay bool) ([]byte, error) {
|
||||||
|
|
||||||
data, err := GetData(message.Write)
|
data, err := GetData(message.Write)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
reply, err := g.Call(RemoteHandleMutation, g.Id, RemoteHandleMutationReply, data)
|
reply, err := MeasureLatency(func() ([]byte, error) { return g.Call(RemoteHandleMutation, g.Id, RemoteHandleMutationReply, data) })
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -61,7 +88,7 @@ func (g *RemoteGrain) GetId() CartId {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *RemoteGrain) GetCurrentState() ([]byte, error) {
|
func (g *RemoteGrain) GetCurrentState() ([]byte, error) {
|
||||||
return g.Call(RemoteGetState, g.Id, RemoteGetStateReply, []byte{})
|
return MeasureLatency(func() ([]byte, error) { return g.Call(RemoteGetState, g.Id, RemoteGetStateReply, []byte{}) })
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRemoteGrainPool(addr string) *RemoteGrainPool {
|
func NewRemoteGrainPool(addr string) *RemoteGrainPool {
|
||||||
|
|||||||
@@ -41,7 +41,9 @@ func (h *GrainHandler) RemoteHandleMessageHandler(id CartId, data []byte) (uint3
|
|||||||
fmt.Println("Error reading message:", err)
|
fmt.Println("Error reading message:", err)
|
||||||
return RemoteHandleMutationReply, nil, err
|
return RemoteHandleMutationReply, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
replyData, err := h.pool.Process(id, msg)
|
replyData, err := h.pool.Process(id, msg)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("Error handling message:", err)
|
fmt.Println("Error handling message:", err)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user