284 lines
6.8 KiB
Go
284 lines
6.8 KiB
Go
package proxy
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"time"
|
|
|
|
"git.k6n.net/go-cart-actor/pkg/messages"
|
|
"go.opentelemetry.io/contrib/bridges/otelslog"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/protobuf/proto"
|
|
"google.golang.org/protobuf/types/known/anypb"
|
|
)
|
|
|
|
// RemoteHost mirrors the lightweight controller used for remote node
|
|
// interaction.
|
|
type RemoteHost[V any] struct {
|
|
host string
|
|
httpBase string
|
|
conn *grpc.ClientConn
|
|
transport *http.Transport
|
|
client *http.Client
|
|
controlClient messages.ControlPlaneClient
|
|
missedPings int
|
|
}
|
|
|
|
const name = "proxy"
|
|
|
|
var (
|
|
tracer = otel.Tracer(name)
|
|
meter = otel.Meter(name)
|
|
logger = otelslog.NewLogger(name)
|
|
)
|
|
|
|
// MockResponseWriter implements http.ResponseWriter to capture responses for proxy calls.
|
|
type MockResponseWriter struct {
|
|
StatusCode int
|
|
HeaderMap http.Header
|
|
Body *bytes.Buffer
|
|
}
|
|
|
|
func NewMockResponseWriter() *MockResponseWriter {
|
|
return &MockResponseWriter{
|
|
StatusCode: 200,
|
|
HeaderMap: make(http.Header),
|
|
Body: &bytes.Buffer{},
|
|
}
|
|
}
|
|
|
|
func (m *MockResponseWriter) Header() http.Header {
|
|
return m.HeaderMap
|
|
}
|
|
|
|
func (m *MockResponseWriter) Write(data []byte) (int, error) {
|
|
return m.Body.Write(data)
|
|
}
|
|
|
|
func (m *MockResponseWriter) WriteHeader(statusCode int) {
|
|
m.StatusCode = statusCode
|
|
}
|
|
|
|
func NewRemoteHost[V any](host string) (*RemoteHost[V], error) {
|
|
|
|
target := fmt.Sprintf("%s:1337", host)
|
|
|
|
conn, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
|
|
if err != nil {
|
|
log.Printf("AddRemote: dial %s failed: %v", target, err)
|
|
return nil, err
|
|
}
|
|
|
|
controlClient := messages.NewControlPlaneClient(conn)
|
|
|
|
transport := &http.Transport{
|
|
MaxIdleConns: 100,
|
|
MaxIdleConnsPerHost: 100,
|
|
DisableKeepAlives: false,
|
|
IdleConnTimeout: 120 * time.Second,
|
|
}
|
|
client := &http.Client{Transport: transport, Timeout: 10 * time.Second}
|
|
|
|
return &RemoteHost[V]{
|
|
host: host,
|
|
httpBase: fmt.Sprintf("http://%s:8080", host),
|
|
conn: conn,
|
|
transport: transport,
|
|
client: client,
|
|
controlClient: controlClient,
|
|
missedPings: 0,
|
|
}, nil
|
|
}
|
|
|
|
func (h *RemoteHost[V]) Name() string {
|
|
return h.host
|
|
}
|
|
|
|
func (h *RemoteHost[V]) Close() error {
|
|
if h.conn != nil {
|
|
h.conn.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (h *RemoteHost[V]) Ping() bool {
|
|
var err error = errors.ErrUnsupported
|
|
for err != nil {
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
|
_, err = h.controlClient.Ping(ctx, &messages.Empty{})
|
|
cancel()
|
|
if err != nil {
|
|
h.missedPings++
|
|
log.Printf("Ping %s failed (%d) %v", h.host, h.missedPings, err)
|
|
}
|
|
if !h.IsHealthy() {
|
|
return false
|
|
}
|
|
time.Sleep(time.Millisecond * 200)
|
|
}
|
|
|
|
h.missedPings = 0
|
|
return true
|
|
}
|
|
|
|
func (h *RemoteHost[V]) Get(ctx context.Context, id uint64, grain *V) error {
|
|
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
reply, error := h.controlClient.Get(ctx, &messages.GetRequest{Id: id})
|
|
if error != nil {
|
|
return error
|
|
}
|
|
return json.Unmarshal(reply.Grain.Value, grain)
|
|
}
|
|
|
|
func (h *RemoteHost[V]) Apply(ctx context.Context, id uint64, mutation ...proto.Message) (bool, error) {
|
|
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
|
|
toSend := make([]*anypb.Any, len(mutation))
|
|
for i, msg := range mutation {
|
|
anyMsg, err := anypb.New(msg)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to pack message: %w", err)
|
|
}
|
|
toSend[i] = anyMsg
|
|
}
|
|
|
|
resp, err := h.controlClient.Apply(ctx, &messages.ApplyRequest{
|
|
Id: id,
|
|
Messages: toSend,
|
|
})
|
|
if err != nil {
|
|
h.missedPings++
|
|
log.Printf("Apply %s failed: %v", h.host, err)
|
|
return false, err
|
|
}
|
|
h.missedPings = 0
|
|
return resp.Accepted, nil
|
|
}
|
|
|
|
func (h *RemoteHost[V]) Negotiate(knownHosts []string) ([]string, error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
resp, err := h.controlClient.Negotiate(ctx, &messages.NegotiateRequest{
|
|
KnownHosts: knownHosts,
|
|
})
|
|
if err != nil {
|
|
h.missedPings++
|
|
log.Printf("Negotiate %s failed: %v", h.host, err)
|
|
return nil, err
|
|
}
|
|
h.missedPings = 0
|
|
return resp.Hosts, nil
|
|
}
|
|
|
|
func (h *RemoteHost[V]) GetActorIds() []uint64 {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
defer cancel()
|
|
reply, err := h.controlClient.GetLocalActorIds(ctx, &messages.Empty{})
|
|
if err != nil {
|
|
log.Printf("Init remote %s: GetCartIds error: %v", h.host, err)
|
|
h.missedPings++
|
|
return []uint64{}
|
|
}
|
|
return reply.GetIds()
|
|
}
|
|
|
|
func (h *RemoteHost[V]) AnnounceOwnership(ownerHost string, uids []uint64) {
|
|
_, err := h.controlClient.AnnounceOwnership(context.Background(), &messages.OwnershipAnnounce{
|
|
Host: ownerHost,
|
|
Ids: uids,
|
|
})
|
|
if err != nil {
|
|
log.Printf("ownership announce to %s failed: %v", h.host, err)
|
|
h.missedPings++
|
|
return
|
|
}
|
|
h.missedPings = 0
|
|
}
|
|
|
|
func (h *RemoteHost[V]) AnnounceExpiry(uids []uint64) {
|
|
_, err := h.controlClient.AnnounceExpiry(context.Background(), &messages.ExpiryAnnounce{
|
|
Host: h.host,
|
|
Ids: uids,
|
|
})
|
|
if err != nil {
|
|
log.Printf("expiry announce to %s failed: %v", h.host, err)
|
|
h.missedPings++
|
|
return
|
|
}
|
|
h.missedPings = 0
|
|
}
|
|
|
|
func (h *RemoteHost[V]) Proxy(id uint64, w http.ResponseWriter, r *http.Request, customBody io.Reader) (bool, error) {
|
|
target := fmt.Sprintf("%s%s", h.httpBase, r.URL.RequestURI())
|
|
|
|
ctx, span := tracer.Start(r.Context(), "remote_proxy")
|
|
defer span.End()
|
|
span.SetAttributes(
|
|
attribute.String("component", "proxy"),
|
|
attribute.String("cartid", fmt.Sprintf("%d", id)),
|
|
attribute.String("host", h.host),
|
|
attribute.String("method", r.Method),
|
|
attribute.String("target", target),
|
|
)
|
|
logger.InfoContext(ctx, "proxying request", "cartid", id, "host", h.host, "method", r.Method)
|
|
|
|
var bdy io.Reader = r.Body
|
|
if customBody != nil {
|
|
bdy = customBody
|
|
}
|
|
req, err := http.NewRequestWithContext(ctx, r.Method, target, bdy)
|
|
if err != nil {
|
|
span.RecordError(err)
|
|
http.Error(w, "proxy build error", http.StatusBadGateway)
|
|
return false, err
|
|
}
|
|
//r.Body = io.NopCloser(bytes.NewReader(bodyCopy))
|
|
req.Header.Set("X-Forwarded-Host", r.Host)
|
|
|
|
for k, v := range r.Header {
|
|
for _, vv := range v {
|
|
req.Header.Add(k, vv)
|
|
}
|
|
}
|
|
res, err := h.client.Do(req)
|
|
if err != nil {
|
|
span.RecordError(err)
|
|
http.Error(w, "proxy request error", http.StatusBadGateway)
|
|
return false, err
|
|
}
|
|
defer res.Body.Close()
|
|
span.SetAttributes(attribute.Int("status_code", res.StatusCode))
|
|
for k, v := range res.Header {
|
|
for _, vv := range v {
|
|
w.Header().Add(k, vv)
|
|
}
|
|
}
|
|
w.Header().Set("X-Cart-Owner-Routed", "true")
|
|
|
|
w.WriteHeader(res.StatusCode)
|
|
_, copyErr := io.Copy(w, res.Body)
|
|
if copyErr != nil {
|
|
span.RecordError(copyErr)
|
|
return true, copyErr
|
|
}
|
|
return true, nil
|
|
|
|
}
|
|
|
|
func (r *RemoteHost[V]) IsHealthy() bool {
|
|
return r.missedPings < 3
|
|
}
|