package proxy import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "log" "net/http" "time" messages "git.k6n.net/go-cart-actor/proto/control" "go.opentelemetry.io/contrib/bridges/otelslog" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" ) // RemoteHost mirrors the lightweight controller used for remote node // interaction. type RemoteHost[V any] struct { host string httpBase string conn *grpc.ClientConn transport *http.Transport client *http.Client controlClient messages.ControlPlaneClient missedPings int } const name = "proxy" var ( tracer = otel.Tracer(name) meter = otel.Meter(name) logger = otelslog.NewLogger(name) ) // MockResponseWriter implements http.ResponseWriter to capture responses for proxy calls. type MockResponseWriter struct { StatusCode int HeaderMap http.Header Body *bytes.Buffer } func NewMockResponseWriter() *MockResponseWriter { return &MockResponseWriter{ StatusCode: 200, HeaderMap: make(http.Header), Body: &bytes.Buffer{}, } } func (m *MockResponseWriter) Header() http.Header { return m.HeaderMap } func (m *MockResponseWriter) Write(data []byte) (int, error) { return m.Body.Write(data) } func (m *MockResponseWriter) WriteHeader(statusCode int) { m.StatusCode = statusCode } func NewRemoteHost[V any](host string) (*RemoteHost[V], error) { target := fmt.Sprintf("%s:1337", host) conn, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Printf("AddRemote: dial %s failed: %v", target, err) return nil, err } controlClient := messages.NewControlPlaneClient(conn) transport := &http.Transport{ MaxIdleConns: 100, MaxIdleConnsPerHost: 100, DisableKeepAlives: false, IdleConnTimeout: 120 * time.Second, } client := &http.Client{Transport: transport, Timeout: 10 * time.Second} return &RemoteHost[V]{ host: host, httpBase: fmt.Sprintf("http://%s:8080", host), conn: conn, transport: transport, client: client, controlClient: controlClient, missedPings: 0, }, nil } func (h *RemoteHost[V]) Name() string { return h.host } func (h *RemoteHost[V]) Close() error { if h.conn != nil { h.conn.Close() } return nil } func (h *RemoteHost[V]) Ping() bool { var err error = errors.ErrUnsupported for err != nil { ctx, cancel := context.WithTimeout(context.Background(), time.Second) _, err = h.controlClient.Ping(ctx, &messages.Empty{}) cancel() if err != nil { h.missedPings++ log.Printf("Ping %s failed (%d) %v", h.host, h.missedPings, err) } if !h.IsHealthy() { return false } time.Sleep(time.Millisecond * 200) } h.missedPings = 0 return true } func (h *RemoteHost[V]) Get(ctx context.Context, id uint64, grain any) error { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() reply, error := h.controlClient.Get(ctx, &messages.GetRequest{Id: id}) if error != nil { return error } return json.Unmarshal(reply.Grain.Value, grain) } func (h *RemoteHost[V]) Apply(ctx context.Context, id uint64, mutation ...proto.Message) (bool, error) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() toSend := make([]*anypb.Any, len(mutation)) for i, msg := range mutation { anyMsg, err := anypb.New(msg) if err != nil { return false, fmt.Errorf("failed to pack message: %w", err) } toSend[i] = anyMsg } resp, err := h.controlClient.Apply(ctx, &messages.ApplyRequest{ Id: id, Messages: toSend, }) if err != nil { h.missedPings++ log.Printf("Apply %s failed: %v", h.host, err) return false, err } h.missedPings = 0 return resp.Accepted, nil } func (h *RemoteHost[V]) Negotiate(knownHosts []string) ([]string, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() resp, err := h.controlClient.Negotiate(ctx, &messages.NegotiateRequest{ KnownHosts: knownHosts, }) if err != nil { h.missedPings++ log.Printf("Negotiate %s failed: %v", h.host, err) return nil, err } h.missedPings = 0 return resp.Hosts, nil } func (h *RemoteHost[V]) GetActorIds() []uint64 { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() reply, err := h.controlClient.GetLocalActorIds(ctx, &messages.Empty{}) if err != nil { log.Printf("Init remote %s: GetCartIds error: %v", h.host, err) h.missedPings++ return []uint64{} } return reply.GetIds() } func (h *RemoteHost[V]) AnnounceOwnership(ownerHost string, uids []uint64) { _, err := h.controlClient.AnnounceOwnership(context.Background(), &messages.OwnershipAnnounce{ Host: ownerHost, Ids: uids, }) if err != nil { log.Printf("ownership announce to %s failed: %v", h.host, err) h.missedPings++ return } h.missedPings = 0 } func (h *RemoteHost[V]) AnnounceExpiry(uids []uint64) { _, err := h.controlClient.AnnounceExpiry(context.Background(), &messages.ExpiryAnnounce{ Host: h.host, Ids: uids, }) if err != nil { log.Printf("expiry announce to %s failed: %v", h.host, err) h.missedPings++ return } h.missedPings = 0 } func (h *RemoteHost[V]) Proxy(id uint64, w http.ResponseWriter, r *http.Request, customBody io.Reader) (bool, error) { target := fmt.Sprintf("%s%s", h.httpBase, r.URL.RequestURI()) ctx, span := tracer.Start(r.Context(), "remote_proxy") defer span.End() span.SetAttributes( attribute.String("component", "proxy"), attribute.String("cartid", fmt.Sprintf("%d", id)), attribute.String("host", h.host), attribute.String("method", r.Method), attribute.String("target", target), ) logger.InfoContext(ctx, "proxying request", "cartid", id, "host", h.host, "method", r.Method) var bdy io.Reader = r.Body if customBody != nil { bdy = customBody } req, err := http.NewRequestWithContext(ctx, r.Method, target, bdy) if err != nil { span.RecordError(err) http.Error(w, "proxy build error", http.StatusBadGateway) return false, err } //r.Body = io.NopCloser(bytes.NewReader(bodyCopy)) req.Header.Set("X-Forwarded-Host", r.Host) for k, v := range r.Header { for _, vv := range v { req.Header.Add(k, vv) } } res, err := h.client.Do(req) if err != nil { span.RecordError(err) http.Error(w, "proxy request error", http.StatusBadGateway) return false, err } defer res.Body.Close() span.SetAttributes(attribute.Int("status_code", res.StatusCode)) for k, v := range res.Header { for _, vv := range v { w.Header().Add(k, vv) } } w.Header().Set("X-Cart-Owner-Routed", "true") w.WriteHeader(res.StatusCode) _, copyErr := io.Copy(w, res.Body) if copyErr != nil { span.RecordError(copyErr) return true, copyErr } return true, nil } func (r *RemoteHost[V]) IsHealthy() bool { return r.missedPings < 3 }