Files
go-cart-actor/pkg/proxy/remotehost.go
matst80 330093bdec
All checks were successful
Build and Publish / BuildAndDeployAmd64 (push) Successful in 36s
Build and Publish / BuildAndDeployArm64 (push) Successful in 4m29s
more
2025-11-28 14:33:26 +01:00

241 lines
5.8 KiB
Go

package proxy
import (
"context"
"errors"
"fmt"
"io"
"log"
"net/http"
"time"
"git.k6n.net/go-cart-actor/pkg/messages"
"go.opentelemetry.io/contrib/bridges/otelslog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)
// RemoteHost mirrors the lightweight controller used for remote node
// interaction. It bundles the gRPC control-plane client and the HTTP client
// used to reach one remote host, together with a counter of failed calls that
// IsHealthy evaluates.
type RemoteHost struct {
// host is the remote host name exactly as passed to NewRemoteHost.
host string
// httpBase is the base URL ("http://<host>:8080") that Proxy prefixes to the
// incoming request URI.
httpBase string
// conn is the gRPC client connection behind controlClient; Close closes it.
conn *grpc.ClientConn
// transport is the HTTP transport used by client.
transport *http.Transport
// client is the HTTP client Proxy uses to forward requests.
client *http.Client
// controlClient issues the control-plane RPCs (Ping, Apply, Negotiate, ...).
controlClient messages.ControlPlaneClient
// missedPings counts failed remote calls; successful calls reset it to zero.
// NOTE(review): it is read and written without synchronization — confirm
// that callers serialize access.
missedPings int
}
// name is the instrumentation scope name shared by the tracer, meter and
// logger declared below.
const name = "proxy"
// Package-level OpenTelemetry handles, all created under the scope name above.
var (
// tracer creates the span that Proxy starts for each forwarded request.
tracer = otel.Tracer(name)
// meter is not referenced in the visible part of this file.
meter = otel.Meter(name)
// logger is the otelslog logger Proxy uses for its structured log line.
logger = otelslog.NewLogger(name)
)
// NewRemoteHost builds a RemoteHost for host.
//
// The gRPC control-plane client targets "<host>:1337" without transport
// security, and the HTTP client used for proxying targets
// "http://<host>:8080" with a 10 second overall request timeout. If
// grpc.NewClient fails, the error is logged and returned unchanged.
func NewRemoteHost(host string) (*RemoteHost, error) {
	grpcTarget := host + ":1337"
	grpcConn, err := grpc.NewClient(
		grpcTarget,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		log.Printf("AddRemote: dial %s failed: %v", grpcTarget, err)
		return nil, err
	}

	// Keep-alives stay enabled (the zero value of DisableKeepAlives) so that
	// proxied requests can reuse pooled connections to the remote host.
	rt := &http.Transport{
		MaxIdleConns:        100,
		MaxIdleConnsPerHost: 100,
		IdleConnTimeout:     2 * time.Minute,
	}

	// missedPings is left at its zero value: a new host starts out healthy.
	remote := &RemoteHost{
		host:          host,
		httpBase:      fmt.Sprintf("http://%s:8080", host),
		conn:          grpcConn,
		transport:     rt,
		client:        &http.Client{Transport: rt, Timeout: 10 * time.Second},
		controlClient: messages.NewControlPlaneClient(grpcConn),
	}
	return remote, nil
}
// Name returns the host name this RemoteHost was created with.
func (h *RemoteHost) Name() string { return h.host }
// Close releases the resources held for the remote host.
//
// Idle HTTP connections pooled by the transport are closed first, then the
// gRPC connection is closed. The error reported by the gRPC connection's
// Close is returned to the caller instead of being discarded; nil is returned
// when there is no connection. Nil fields are skipped, so Close is safe on a
// partially initialised RemoteHost.
func (h *RemoteHost) Close() error {
	if h.transport != nil {
		// Without this, pooled keep-alive connections linger until their
		// idle timeout expires even though the host is gone.
		h.transport.CloseIdleConnections()
	}
	if h.conn == nil {
		return nil
	}
	return h.conn.Close()
}
// Ping checks that the remote host answers the control-plane Ping RPC.
//
// Each attempt is bounded by a one second timeout. A failed attempt is logged,
// increments the missed-call counter, and is retried after a 200ms pause for
// as long as IsHealthy still holds. Ping returns true, with the counter reset
// to zero, as soon as an attempt succeeds on a host that is still healthy. It
// returns false once the host is no longer healthy.
//
// NOTE(review): a successful attempt on a host that is already unhealthy
// still returns false and leaves the counter untouched. That matches the
// previous behaviour; confirm whether a successful ping should instead let
// the host recover.
func (h *RemoteHost) Ping() bool {
	// Seed err with a non-nil sentinel so the loop body runs at least once.
	var err error = errors.ErrUnsupported
	for err != nil {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		_, err = h.controlClient.Ping(ctx, &messages.Empty{})
		cancel()
		if err != nil {
			h.missedPings++
			log.Printf("Ping %s failed (%d) %v", h.host, h.missedPings, err)
		}
		if !h.IsHealthy() {
			return false
		}
		if err != nil {
			// Pause only before a retry; a successful ping returns right
			// away instead of paying the back-off delay.
			time.Sleep(200 * time.Millisecond)
		}
	}
	h.missedPings = 0
	return true
}
// Apply sends the given mutations to the remote host for the actor identified
// by id and returns the Accepted flag of the response.
//
// Every mutation is packed into an anypb.Any and all of them travel in a
// single ApplyRequest, bounded by a 5 second timeout derived from ctx. A
// packing failure is returned wrapped and leaves the missed-call counter
// untouched. An RPC failure is logged, increments the counter and is returned
// as is; a successful RPC resets the counter.
func (h *RemoteHost) Apply(ctx context.Context, id uint64, mutation ...proto.Message) (bool, error) {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	packed := make([]*anypb.Any, 0, len(mutation))
	for _, m := range mutation {
		wrapped, packErr := anypb.New(m)
		if packErr != nil {
			return false, fmt.Errorf("failed to pack message: %w", packErr)
		}
		packed = append(packed, wrapped)
	}

	req := &messages.ApplyRequest{Id: id, Messages: packed}
	resp, err := h.controlClient.Apply(ctx, req)
	if err == nil {
		h.missedPings = 0
		return resp.Accepted, nil
	}

	h.missedPings++
	log.Printf("Apply %s failed: %v", h.host, err)
	return false, err
}
// Negotiate sends knownHosts to the remote host through the Negotiate RPC and
// returns the Hosts list from its response.
//
// The call is bounded by a 5 second timeout. A failure is logged, increments
// the missed-call counter and is returned together with a nil slice; a
// successful call resets the counter.
func (h *RemoteHost) Negotiate(knownHosts []string) ([]string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	req := &messages.NegotiateRequest{KnownHosts: knownHosts}
	reply, err := h.controlClient.Negotiate(ctx, req)
	if err == nil {
		h.missedPings = 0
		return reply.Hosts, nil
	}

	h.missedPings++
	log.Printf("Negotiate %s failed: %v", h.host, err)
	return nil, err
}
// GetActorIds asks the remote host, via the GetLocalActorIds RPC, for the ids
// it reports as local. The call is bounded by a 3 second timeout.
//
// On failure the error is logged, the missed-call counter is incremented and
// an empty, non-nil slice is returned. On success the counter is reset, in
// line with the other RPC wrappers on RemoteHost, and the ids from the reply
// are returned.
func (h *RemoteHost) GetActorIds() []uint64 {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	reply, err := h.controlClient.GetLocalActorIds(ctx, &messages.Empty{})
	if err != nil {
		log.Printf("Init remote %s: GetCartIds error: %v", h.host, err)
		h.missedPings++
		return []uint64{}
	}

	// A completed round trip shows the host is reachable, so earlier
	// failures no longer count against it.
	h.missedPings = 0
	return reply.GetIds()
}
// AnnounceOwnership tells the remote host, through the AnnounceOwnership RPC,
// that ownerHost owns the given ids.
//
// The call is bounded by a 5 second timeout so that an unresponsive peer
// cannot block the caller indefinitely. A failure is logged and increments
// the missed-call counter; a successful call resets it. No error is returned
// to the caller.
func (h *RemoteHost) AnnounceOwnership(ownerHost string, uids []uint64) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	_, err := h.controlClient.AnnounceOwnership(ctx, &messages.OwnershipAnnounce{
		Host: ownerHost,
		Ids:  uids,
	})
	if err != nil {
		log.Printf("ownership announce to %s failed: %v", h.host, err)
		h.missedPings++
		return
	}
	h.missedPings = 0
}
// AnnounceExpiry tells the remote host, through the AnnounceExpiry RPC, that
// the given ids have expired.
//
// The call is bounded by a 5 second timeout so that an unresponsive peer
// cannot block the caller indefinitely. A failure is logged and increments
// the missed-call counter; a successful call resets it. No error is returned
// to the caller.
//
// NOTE(review): the announcement's Host field is filled with h.host, i.e. the
// name of the remote being called rather than of the announcing node —
// confirm this is what the receiver expects.
func (h *RemoteHost) AnnounceExpiry(uids []uint64) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	_, err := h.controlClient.AnnounceExpiry(ctx, &messages.ExpiryAnnounce{
		Host: h.host,
		Ids:  uids,
	})
	if err != nil {
		log.Printf("expiry announce to %s failed: %v", h.host, err)
		h.missedPings++
		return
	}
	h.missedPings = 0
}
// Proxy forwards the incoming request r to the remote host over HTTP and
// streams the upstream response back through w.
//
// The upstream URL is httpBase followed by the request URI of r. All request
// headers are copied, after which X-Forwarded-Host is set to r.Host so that
// the upstream request carries exactly one value for it. The upstream status
// code and headers are relayed, and X-Cart-Owner-Routed: true is set on the
// response. The work is recorded in a "remote_proxy" span tagged with id.
//
// The boolean result is false when the upstream request could not be built or
// sent; in that case a 502 has already been written to w and the error is
// returned. It is true once the upstream response has been received; a
// failure while copying the response body is returned alongside true.
func (h *RemoteHost) Proxy(id uint64, w http.ResponseWriter, r *http.Request) (bool, error) {
	target := fmt.Sprintf("%s%s", h.httpBase, r.URL.RequestURI())

	ctx, span := tracer.Start(r.Context(), "remote_proxy")
	defer span.End()
	span.SetAttributes(
		attribute.String("component", "proxy"),
		attribute.String("cartid", fmt.Sprintf("%d", id)),
		attribute.String("host", h.host),
		attribute.String("method", r.Method),
		attribute.String("target", target),
	)
	logger.InfoContext(ctx, "proxying request", "cartid", id, "host", h.host, "method", r.Method)

	req, err := http.NewRequestWithContext(ctx, r.Method, target, r.Body)
	if err != nil {
		span.RecordError(err)
		http.Error(w, "proxy build error", http.StatusBadGateway)
		return false, err
	}

	for k, v := range r.Header {
		for _, vv := range v {
			req.Header.Add(k, vv)
		}
	}
	// Set after copying: Set replaces any X-Forwarded-Host the caller sent,
	// whereas setting it first let the copy loop append the caller's value
	// as a second entry.
	req.Header.Set("X-Forwarded-Host", r.Host)

	res, err := h.client.Do(req)
	if err != nil {
		span.RecordError(err)
		http.Error(w, "proxy request error", http.StatusBadGateway)
		return false, err
	}
	defer res.Body.Close()
	span.SetAttributes(attribute.Int("status_code", res.StatusCode))

	for k, v := range res.Header {
		for _, vv := range v {
			w.Header().Add(k, vv)
		}
	}
	w.Header().Set("X-Cart-Owner-Routed", "true")
	w.WriteHeader(res.StatusCode)

	// The status line is already sent, so a copy failure can only be
	// reported to the caller, not turned into an error response.
	if _, copyErr := io.Copy(w, res.Body); copyErr != nil {
		span.RecordError(copyErr)
		return true, copyErr
	}
	return true, nil
}
// IsHealthy reports whether the missed-call counter is still below the limit
// of three. The counter is incremented by failed remote calls and reset to
// zero by successful ones.
func (h *RemoteHost) IsHealthy() bool {
	// maxMissedPings is the number of recorded failures at which the host
	// stops being considered healthy.
	const maxMissedPings = 3
	return h.missedPings < maxMissedPings
}