Merge branch 'main' of https://git.tornberg.me/mats/go-cart-actor
All checks were successful
Build and Publish / Metadata (push) Successful in 11s
Build and Publish / BuildAndDeployAmd64 (push) Successful in 49s
Build and Publish / BuildAndDeployArm64 (push) Successful in 4m37s

This commit is contained in:
2025-11-10 19:47:23 +01:00
14 changed files with 588 additions and 106 deletions

View File

@@ -8,6 +8,10 @@ import (
"time"
messages "git.tornberg.me/go-cart-actor/pkg/messages"
"go.opentelemetry.io/contrib/bridges/otelslog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
@@ -19,9 +23,74 @@ type ControlServer[V any] struct {
pool GrainPool[V]
}
const name = "grpc_server"
var (
tracer = otel.Tracer(name)
meter = otel.Meter(name)
logger = otelslog.NewLogger(name)
pingCalls metric.Int64Counter
negotiateCalls metric.Int64Counter
getLocalActorIdsCalls metric.Int64Counter
announceOwnershipCalls metric.Int64Counter
announceExpiryCalls metric.Int64Counter
closingCalls metric.Int64Counter
)
func init() {
var err error
pingCalls, err = meter.Int64Counter("grpc.ping_calls",
metric.WithDescription("Number of ping calls"),
metric.WithUnit("{calls}"))
if err != nil {
panic(err)
}
negotiateCalls, err = meter.Int64Counter("grpc.negotiate_calls",
metric.WithDescription("Number of negotiate calls"),
metric.WithUnit("{calls}"))
if err != nil {
panic(err)
}
getLocalActorIdsCalls, err = meter.Int64Counter("grpc.get_local_actor_ids_calls",
metric.WithDescription("Number of get local actor ids calls"),
metric.WithUnit("{calls}"))
if err != nil {
panic(err)
}
announceOwnershipCalls, err = meter.Int64Counter("grpc.announce_ownership_calls",
metric.WithDescription("Number of announce ownership calls"),
metric.WithUnit("{calls}"))
if err != nil {
panic(err)
}
announceExpiryCalls, err = meter.Int64Counter("grpc.announce_expiry_calls",
metric.WithDescription("Number of announce expiry calls"),
metric.WithUnit("{calls}"))
if err != nil {
panic(err)
}
closingCalls, err = meter.Int64Counter("grpc.closing_calls",
metric.WithDescription("Number of closing calls"),
metric.WithUnit("{calls}"))
if err != nil {
panic(err)
}
}
func (s *ControlServer[V]) AnnounceOwnership(ctx context.Context, req *messages.OwnershipAnnounce) (*messages.OwnerChangeAck, error) {
ctx, span := tracer.Start(ctx, "grpc_announce_ownership")
defer span.End()
span.SetAttributes(
attribute.String("component", "controlplane"),
attribute.String("host", req.Host),
attribute.Int("id_count", len(req.Ids)),
)
logger.InfoContext(ctx, "announce ownership", "host", req.Host, "id_count", len(req.Ids))
announceOwnershipCalls.Add(ctx, 1, metric.WithAttributes(attribute.String("host", req.Host)))
err := s.pool.HandleOwnershipChange(req.Host, req.Ids)
if err != nil {
span.RecordError(err)
return &messages.OwnerChangeAck{
Accepted: false,
Message: "owner change failed",
@@ -36,7 +105,20 @@ func (s *ControlServer[V]) AnnounceOwnership(ctx context.Context, req *messages.
}
func (s *ControlServer[V]) AnnounceExpiry(ctx context.Context, req *messages.ExpiryAnnounce) (*messages.OwnerChangeAck, error) {
ctx, span := tracer.Start(ctx, "grpc_announce_expiry")
defer span.End()
span.SetAttributes(
attribute.String("component", "controlplane"),
attribute.String("host", req.Host),
attribute.Int("id_count", len(req.Ids)),
)
logger.InfoContext(ctx, "announce expiry", "host", req.Host, "id_count", len(req.Ids))
announceExpiryCalls.Add(ctx, 1, metric.WithAttributes(attribute.String("host", req.Host)))
err := s.pool.HandleRemoteExpiry(req.Host, req.Ids)
if err != nil {
span.RecordError(err)
}
return &messages.OwnerChangeAck{
Accepted: err == nil,
Message: "expiry acknowledged",
@@ -45,15 +127,28 @@ func (s *ControlServer[V]) AnnounceExpiry(ctx context.Context, req *messages.Exp
// ControlPlane: Ping
func (s *ControlServer[V]) Ping(ctx context.Context, _ *messages.Empty) (*messages.PingReply, error) {
host := s.pool.Hostname()
pingCalls.Add(ctx, 1, metric.WithAttributes(attribute.String("host", host)))
// log.Printf("got ping")
return &messages.PingReply{
Host: s.pool.Hostname(),
Host: host,
UnixTime: time.Now().Unix(),
}, nil
}
// ControlPlane: Negotiate (merge host views)
func (s *ControlServer[V]) Negotiate(ctx context.Context, req *messages.NegotiateRequest) (*messages.NegotiateReply, error) {
ctx, span := tracer.Start(ctx, "grpc_negotiate")
defer span.End()
span.SetAttributes(
attribute.String("component", "controlplane"),
attribute.Int("known_hosts_count", len(req.KnownHosts)),
)
logger.InfoContext(ctx, "negotiate", "known_hosts_count", len(req.KnownHosts))
negotiateCalls.Add(ctx, 1)
s.pool.Negotiate(req.KnownHosts)
return &messages.NegotiateReply{Hosts: req.GetKnownHosts()}, nil
@@ -61,13 +156,33 @@ func (s *ControlServer[V]) Negotiate(ctx context.Context, req *messages.Negotiat
// ControlPlane: GetCartIds (locally owned carts only)
func (s *ControlServer[V]) GetLocalActorIds(ctx context.Context, _ *messages.Empty) (*messages.ActorIdsReply, error) {
return &messages.ActorIdsReply{Ids: s.pool.GetLocalIds()}, nil
ctx, span := tracer.Start(ctx, "grpc_get_local_actor_ids")
defer span.End()
ids := s.pool.GetLocalIds()
span.SetAttributes(
attribute.String("component", "controlplane"),
attribute.Int("id_count", len(ids)),
)
logger.InfoContext(ctx, "get local actor ids", "id_count", len(ids))
getLocalActorIdsCalls.Add(ctx, 1)
return &messages.ActorIdsReply{Ids: ids}, nil
}
// ControlPlane: Closing (peer shutdown notification)
func (s *ControlServer[V]) Closing(ctx context.Context, req *messages.ClosingNotice) (*messages.OwnerChangeAck, error) {
if req.GetHost() != "" {
s.pool.RemoveHost(req.GetHost())
ctx, span := tracer.Start(ctx, "grpc_closing")
defer span.End()
host := req.GetHost()
span.SetAttributes(
attribute.String("component", "controlplane"),
attribute.String("host", host),
)
logger.InfoContext(ctx, "closing notice", "host", host)
closingCalls.Add(ctx, 1, metric.WithAttributes(attribute.String("host", host)))
if host != "" {
s.pool.RemoveHost(host)
}
return &messages.OwnerChangeAck{
Accepted: true,

View File

@@ -88,19 +88,49 @@ var (
ErrMissingReservation = errors.New("missing reservation")
)
func makeKeysAndArgs(req ...ReserveRequest) ([]string, []string) {
keys := make([]string, len(req))
args := make([]string, len(req))
for i, r := range req {
if r.Quantity <= 0 {
return nil, nil
}
keys[i] = getInventoryKey(r.SKU, r.LocationID)
args[i] = strconv.Itoa(int(r.Quantity))
}
return keys, args
}
func (s *RedisInventoryService) ReservationCheck(req ...ReserveRequest) error {
if len(req) == 0 {
return ErrMissingReservation
}
keys, args := makeKeysAndArgs(req...)
if keys == nil || args == nil {
return ErrInvalidQuantity
}
cmd := reservationCheck.Run(s.ctx, s.client, keys, args)
if err := cmd.Err(); err != nil {
return err
}
if val, err := cmd.Int(); err != nil {
return err
} else if val != 1 {
return ErrInsufficientInventory
}
return nil
}
func (s *RedisInventoryService) ReserveInventory(req ...ReserveRequest) error {
if len(req) == 0 {
return ErrMissingReservation
}
keys := make([]string, len(req))
args := make([]string, len(req))
for i, r := range req {
if r.Quantity <= 0 {
return ErrInvalidQuantity
}
keys[i] = getInventoryKey(r.SKU, r.LocationID)
args[i] = strconv.Itoa(int(r.Quantity))
keys, args := makeKeysAndArgs(req...)
if keys == nil || args == nil {
return ErrInvalidQuantity
}
cmd := reserveScript.Run(s.ctx, s.client, keys, args)
if err := cmd.Err(); err != nil {
@@ -114,6 +144,47 @@ func (s *RedisInventoryService) ReserveInventory(req ...ReserveRequest) error {
return nil
}
var reservationCheck = redis.NewScript(`
-- Get the number of keys passed
local num_keys = #KEYS
-- Ensure the number of keys matches the number of quantities
if num_keys ~= #ARGV then
return {err = "Script requires the same number of keys and quantities."}
end
local new_values = {}
local payload = {}
-- ---
-- 1. CHECK PHASE
-- ---
-- Loop through all keys to check their values first
for i = 1, num_keys do
local key = KEYS[i]
local quantity_to_check = tonumber(ARGV[i])
-- Fail if the quantity is not a valid number
if not quantity_to_check then
return {err = "Invalid quantity provided for key: " .. key}
end
-- Get the current value stored at the key
local current_val = tonumber(redis.call('GET', key))
-- Check the condition
-- Fail if:
-- 1. The key doesn't exist (current_val is nil)
-- 2. The value is not > the required quantity
if not current_val or current_val <= quantity_to_check then
-- Return 0 to indicate the operation failed and no changes were made
return 0
end
end
return 1
`)
var reserveScript = redis.NewScript(`
-- Get the number of keys passed
local num_keys = #KEYS

View File

@@ -10,6 +10,9 @@ import (
"time"
messages "git.tornberg.me/go-cart-actor/pkg/messages"
"go.opentelemetry.io/contrib/bridges/otelslog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
@@ -26,6 +29,14 @@ type RemoteHost struct {
missedPings int
}
const name = "proxy"
var (
tracer = otel.Tracer(name)
meter = otel.Meter(name)
logger = otelslog.NewLogger(name)
)
func NewRemoteHost(host string) (*RemoteHost, error) {
target := fmt.Sprintf("%s:1337", host)
@@ -49,7 +60,7 @@ func NewRemoteHost(host string) (*RemoteHost, error) {
return &RemoteHost{
host: host,
httpBase: fmt.Sprintf("http://%s:8080/cart", host),
httpBase: fmt.Sprintf("http://%s:8080", host),
conn: conn,
transport: transport,
client: client,
@@ -146,8 +157,22 @@ func (h *RemoteHost) AnnounceExpiry(uids []uint64) {
func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request) (bool, error) {
target := fmt.Sprintf("%s%s", h.httpBase, r.URL.RequestURI())
req, err := http.NewRequestWithContext(r.Context(), r.Method, target, r.Body)
log.Printf("proxy target: %s, method: %s", target, r.Method)
ctx, span := tracer.Start(r.Context(), "remote_proxy")
defer span.End()
span.SetAttributes(
attribute.String("component", "proxy"),
attribute.String("cartid", fmt.Sprintf("%d", id)),
attribute.String("host", h.host),
attribute.String("method", r.Method),
attribute.String("target", target),
)
logger.InfoContext(ctx, "proxying request", "cartid", id, "host", h.host, "method", r.Method)
req, err := http.NewRequestWithContext(ctx, r.Method, target, r.Body)
if err != nil {
span.RecordError(err)
http.Error(w, "proxy build error", http.StatusBadGateway)
return false, err
}
@@ -161,10 +186,12 @@ func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request) (b
}
res, err := h.client.Do(req)
if err != nil {
span.RecordError(err)
http.Error(w, "proxy request error", http.StatusBadGateway)
return false, err
}
defer res.Body.Close()
span.SetAttributes(attribute.Int("status_code", res.StatusCode))
for k, v := range res.Header {
for _, vv := range v {
w.Header().Add(k, vv)
@@ -175,6 +202,7 @@ func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request) (b
w.WriteHeader(res.StatusCode)
_, copyErr := io.Copy(w, res.Body)
if copyErr != nil {
span.RecordError(copyErr)
return true, copyErr
}
return true, nil