slask
Some checks failed
Build and Publish / BuildAndDeployAmd64 (push) Successful in 38s
Build and Publish / BuildAndDeployArm64 (push) Has been cancelled

This commit is contained in:
matst80
2025-12-02 09:04:42 +01:00
parent 060b3dfbf0
commit ee1b96fece
6 changed files with 254 additions and 76 deletions

View File

@@ -3,6 +3,7 @@ package proxy
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
@@ -22,7 +23,7 @@ import (
// RemoteHost mirrors the lightweight controller used for remote node
// interaction.
type RemoteHost struct {
type RemoteHost[V any] struct {
host string
httpBase string
conn *grpc.ClientConn
@@ -67,7 +68,7 @@ func (m *MockResponseWriter) WriteHeader(statusCode int) {
m.StatusCode = statusCode
}
func NewRemoteHost(host string) (*RemoteHost, error) {
func NewRemoteHost[V any](host string) (*RemoteHost[V], error) {
target := fmt.Sprintf("%s:1337", host)
@@ -88,7 +89,7 @@ func NewRemoteHost(host string) (*RemoteHost, error) {
}
client := &http.Client{Transport: transport, Timeout: 10 * time.Second}
return &RemoteHost{
return &RemoteHost[V]{
host: host,
httpBase: fmt.Sprintf("http://%s:8080", host),
conn: conn,
@@ -99,18 +100,18 @@ func NewRemoteHost(host string) (*RemoteHost, error) {
}, nil
}
func (h *RemoteHost) Name() string {
func (h *RemoteHost[V]) Name() string {
return h.host
}
func (h *RemoteHost) Close() error {
func (h *RemoteHost[V]) Close() error {
if h.conn != nil {
h.conn.Close()
}
return nil
}
func (h *RemoteHost) Ping() bool {
func (h *RemoteHost[V]) Ping() bool {
var err error = errors.ErrUnsupported
for err != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
@@ -130,7 +131,17 @@ func (h *RemoteHost) Ping() bool {
return true
}
func (h *RemoteHost) Apply(ctx context.Context, id uint64, mutation ...proto.Message) (bool, error) {
func (h *RemoteHost[V]) Get(ctx context.Context, id uint64, grain *V) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
reply, error := h.controlClient.Get(ctx, &messages.GetRequest{Id: id})
if error != nil {
return error
}
return json.Unmarshal(reply.Grain.Value, grain)
}
func (h *RemoteHost[V]) Apply(ctx context.Context, id uint64, mutation ...proto.Message) (bool, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
@@ -156,7 +167,7 @@ func (h *RemoteHost) Apply(ctx context.Context, id uint64, mutation ...proto.Mes
return resp.Accepted, nil
}
func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) {
func (h *RemoteHost[V]) Negotiate(knownHosts []string) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
@@ -172,7 +183,7 @@ func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) {
return resp.Hosts, nil
}
func (h *RemoteHost) GetActorIds() []uint64 {
func (h *RemoteHost[V]) GetActorIds() []uint64 {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
reply, err := h.controlClient.GetLocalActorIds(ctx, &messages.Empty{})
@@ -184,7 +195,7 @@ func (h *RemoteHost) GetActorIds() []uint64 {
return reply.GetIds()
}
func (h *RemoteHost) AnnounceOwnership(ownerHost string, uids []uint64) {
func (h *RemoteHost[V]) AnnounceOwnership(ownerHost string, uids []uint64) {
_, err := h.controlClient.AnnounceOwnership(context.Background(), &messages.OwnershipAnnounce{
Host: ownerHost,
Ids: uids,
@@ -197,7 +208,7 @@ func (h *RemoteHost) AnnounceOwnership(ownerHost string, uids []uint64) {
h.missedPings = 0
}
func (h *RemoteHost) AnnounceExpiry(uids []uint64) {
func (h *RemoteHost[V]) AnnounceExpiry(uids []uint64) {
_, err := h.controlClient.AnnounceExpiry(context.Background(), &messages.ExpiryAnnounce{
Host: h.host,
Ids: uids,
@@ -210,7 +221,7 @@ func (h *RemoteHost) AnnounceExpiry(uids []uint64) {
h.missedPings = 0
}
func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request, customBody io.Reader) (bool, error) {
func (h *RemoteHost[V]) Proxy(id uint64, w http.ResponseWriter, r *http.Request, customBody io.Reader) (bool, error) {
target := fmt.Sprintf("%s%s", h.httpBase, r.URL.RequestURI())
ctx, span := tracer.Start(r.Context(), "remote_proxy")
@@ -267,6 +278,6 @@ func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request, cu
}
func (r *RemoteHost) IsHealthy() bool {
func (r *RemoteHost[V]) IsHealthy() bool {
return r.missedPings < 3
}