package main

import (
	"bufio"
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"encoding/xml"
	"errors"
	"flag"
	"fmt"
	"html/template"
	"math"
	"io"
	"log"
	"net/http"
	"os"
	"path"
	"strconv"
	"strings"
	"syscall"
	"time"

	"pipkin/url"
)

// Pipkin is the top-level server state: a shared HTTP client (reused across
// requests for connection pooling), the parsed configuration, a leveled
// logger, and the path to the template directory.
type Pipkin struct {
	client    *http.Client
	config    Config
	log       *Log
	templates string
}

// Config mirrors the JSON configuration file loaded at startup.
type Config struct {
	Host     string            `json:"host"`
	Port     string            `json:"port"`
	Buckets  map[string]Bucket `json:"buckets"`
	Hosts    map[string]Host   `json:"hosts"`
	Proxy    bool              `json:"proxied"`
	LogLevel string            `json:"loglevel"`
}

// Bucket holds the credentials and endpoint of one S3-compatible bucket,
// used to presign outbound requests.
type Bucket struct {
	KeyID    string `json:"keyID"`
	AppKey   string `json:"appKey"`
	Region   string `json:"region"`
	Endpoint string `json:"endpoint"`
	Host     string `json:"host"`
}

// Host maps an incoming Host header value to a bucket, plus per-host
// behavior: an optional index file, a key path prefix, and whether to
// auto-render directory listings.
type Host struct {
	AutoIndex bool   `json:"autoindex"`
	Bucket    string `json:"bucket"`
	Index     string `json:"index"`
	Path      string `json:"path"`
}

// Index is the decoded S3 XML listing (requested with list-type=2 — see
// presignQuery) plus the original request path; it is handed to the
// directory-listing template.
type Index struct {
	Path           string
	CommonPrefixes []Prefixes
	Contents       []Contents
	Name           string
	MaxKeys        uint
	KeyCount       uint
}

// Prefixes is one <CommonPrefixes> entry in the listing (a "subdirectory").
type Prefixes struct {
	Prefix string
}

// Contents is one <Contents> entry in the listing (an object key plus the
// metadata shown in the rendered index).
type Contents struct {
	ETag         string
	Key          string
	LastModified string
	Size         string
}

// Headers we receive from clients and forward on to S3 endpoints.
// Names are stored lowercase; matching is done case-insensitively.
var ALLOWED_REQUEST_HEADERS = [...]string{
	"accept",
	"accept-encoding",
	"accept-language",
	"date",
	"if-match",
	"if-modified-since",
	"if-none-match",
	"if-unmodified-since",
	"range",
	"x-amz-content-sha256",
	"x-amz-date",
	"x-amz-server-side-encryption-customer-algorithm",
	"x-amz-server-side-encryption-customer-key",
	"x-amz-server-side-encryption-customer-key-md5",
}

// Headers we receive from S3 and send back to requesting clients.
var ALLOWED_RESPONSE_HEADERS = [...]string{
	"accept-ranges",
	"content-encoding",
	"content-length",
	"content-range",
	"content-type",
	"last-modified",
}

// fetch transparently forwards HTTP GET and HEAD requests to S3 services.
//
// Host header values are indexed to buckets and may specify a unique path,
// index file, and directory-listing behavior on a host-by-host basis.
func (pk *Pipkin) fetch(w http.ResponseWriter, r *http.Request) { // We only support GET and HEAD methods; anything that can make S3 udpates // (DELETE, PUT...) is undesired here. if r.Method != "GET" && r.Method != "HEAD" { http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) return } // Host header is only useful when we're fronted by a proxy, otherwise we // should use r.Host. In most cases, setting Proxied to false will produce // the desired behavior. var hostStr string if pk.config.Proxy { hostStr = r.URL.Host } else { hostStr = strings.Split(r.Host, ":")[0] } host, ok := pk.config.Hosts[hostStr] if !ok { pk.log.Errorf("Host %s not found in configuration\n", hostStr) http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) return } // If an Index is configured and our key ends in "/", update our path to // include it, regardless of whether it exists in S3; takes precedence over // AutoIndex, and its absence results in a 404 response. key := r.URL.Path var err error if strings.HasSuffix(key, "/") && host.Index != "" { key = path.Join(key, host.Index) } // Pre-sign! We roll our own HMAC-SHA256 signature routines to avoid relying // on Amazon's (large) SDK; our memory footprint should be low. presigned, err := pk.presignQuery(r.Method, host, key) if err != nil { pk.log.Errorf("Failed to presign: %s %s %s %s: %s\n", r.Method, hostStr, host.Bucket, key, err.Error()) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return } req, err := http.NewRequest(r.Method, presigned, nil) if err != nil { pk.log.Errorf("Failed to create new request: %s %s %s\n", r.Method, presigned, err.Error()) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return } // Filter spurious headers; untrusted clients should not be messing with S3. 
for name, headers := range r.Header { for _, v := range headers { for _, a := range ALLOWED_REQUEST_HEADERS { if strings.ToLower(name) == a { req.Header.Set(name, v) } } } } // Perform outbound request to S3; status codes 200-399 are considered // valid. resp, err := pk.client.Do(req) if err != nil { pk.log.Errorf("Failed to perform request: %s %s %s\n", req.Method, req.URL.String(), err.Error()) http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) return } defer resp.Body.Close() statusOK := resp.StatusCode >= 200 && resp.StatusCode < 400 pk.log.Infof("%s %d %s %s %s\n", r.Method, resp.StatusCode, hostStr, host.Bucket, key) if !statusOK { http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) return } // If our key ends in "/" and AutoIndex is set to true, try to decode any // received XML as a directory index and return that instead, unless an // Index is configured, which takes precedence over AutoIndex and is handled // above. // // The downstream webserver should be configured to uniquely map directory // paths to indexes if such behavior is desired; index discovery logic and // state maintenance increases complexity, when we really want to be a // a minimal request shim with few behavioral assumptions. // // NGINX, for example, can map paths directly to index files, so we can use // directory index logic *and* point to index files for paths declared in // advance, if desired // // location = / { // proxy_pass http://127.0.0.1:8085/index.html; // .... 
// } if strings.HasSuffix(key, "/") { if host.AutoIndex { if mime, ok := resp.Header["Content-Type"]; ok && mime[0] == "application/xml" { reader := bufio.NewReader(resp.Body) decoder := xml.NewDecoder(reader) var index Index index.Path = r.URL.Path if err := decoder.Decode(&index); err != nil { pk.log.Errorf("Failed to decode XML response: %s: %s\n", req.URL.String(), err.Error()) http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) return } // We receive the same (empty) XML response from S3 when // requesting non-existent directories and empty directories, so // we'll return 404 for both cases. if index.KeyCount == 0 { pk.log.Infof("No keys received: %s %s %s %s\n", r.Method, hostStr, host.Bucket, key) http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) return } // FIXME: There is probably a more efficient way to prepare // paths for rendering, here. relativeDir := path.Join(host.Path, key) if strings.HasSuffix(key, "/") { relativeDir += "/" } for i := range index.CommonPrefixes { prefixTrim := strings.TrimPrefix(index.CommonPrefixes[i].Prefix, strings.TrimPrefix(relativeDir, "/"), ) index.CommonPrefixes[i].Prefix = prefixTrim } for i := range index.Contents { prefixTrim := strings.TrimPrefix(index.Contents[i].Key, strings.TrimPrefix(relativeDir, "/"), ) index.Contents[i].Key = prefixTrim } t, err := template.New("index.html").Funcs(template.FuncMap{ "prettyByteSize": prettyByteSize, "formatLastModified": formatLastModified, }).ParseFiles(path.Join(pk.templates, "index.html")) if err != nil { pk.log.Errorf("Failed to render template: %s %s %s: %s\n", hostStr, host.Bucket, key, err.Error()) http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) } if err := t.Execute(w, index); err != nil { pk.log.Errorf("Failed to render template: %s %s %s: %s\n", hostStr, host.Bucket, key, err.Error()) http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) } return } } else { http.Error(w, 
http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) return } } // Filter any S3-specific response headers; potentially sensitive // information exposure via IDs, tags... for name, headers := range resp.Header { for _, v := range headers { for _, a := range ALLOWED_RESPONSE_HEADERS { if strings.ToLower(name) == a { w.Header().Set(name, v) } } } } // Stream response buffer directly to client; memory-efficient over // io.ReadAll(). w.WriteHeader(resp.StatusCode) if _, err = io.Copy(w, resp.Body); err != nil { if errors.Is(err, syscall.ECONNRESET) { pk.log.Infof("Connection reset: %s %s %s: %s\n", hostStr, host.Bucket, key, err.Error()) } else if errors.Is(err, syscall.EPIPE) { pk.log.Infof("Broken pipe: %s %s %s: %s\n", hostStr, host.Bucket, key, err.Error()) } else { pk.log.Errorf("Error writing response body: %s %s %s: %s\n", hostStr, host.Bucket, key, err.Error()) } } } // presignQuery returns the request's signed URL containing query parameters // used to authenticate against S3 services (Amazon S3, Backblaze B2 ...). func (pk *Pipkin) presignQuery(method string, host Host, key string) (string, error) { amzBucket := pk.config.Buckets[host.Bucket] amzDateShort := time.Now().UTC().Format("20060102") amzDateLong := time.Now().UTC().Format("20060102T150405Z") // ISO 8601 amzAlgorithm := "AWS4-HMAC-SHA256" amzService := "s3" // ////aws4_request amzCred := fmt.Sprintf("%s/%s/%s/%s/aws4_request", amzBucket.KeyID, amzDateShort, amzBucket.Region, amzService) var amzKey, amzPrefix string amzKey = path.Join(host.Path, key) if strings.HasSuffix(key, "/") { // path.Join() strips trailing slash, but we need one for prefix if !strings.HasSuffix(amzKey, "/") { amzKey += "/" } // Request for non-root directory if len(amzKey) > 1 { amzPrefix = strings.TrimPrefix(amzKey, "/") } amzKey = "/" } // Seconds for which the generated presigned URL is valid; for example, // 86400 (24 hours). The minimum value we can set is 1, and the maximum is // 604800 (seven days). 
amzExpires := "86400" // The headers that we used to calculate the signature. The following // headers are required in the signature calculations: // - The HTTP host header. // - Any x-amz-* headers that we plan to add to the request. amzSignedHeaders := "host" // Populate URI key/value pairs; used in canonical request signature and in // the final signed URL we use to make our request to S3, with the // X-Amz-Signature header below added once the signature is calculated. uri := url.Values{} uri.Add("X-Amz-Algorithm", amzAlgorithm) uri.Add("X-Amz-Credential", amzCred) uri.Add("X-Amz-Date", amzDateLong) uri.Add("X-Amz-Expires", amzExpires) uri.Add("X-Amz-SignedHeaders", amzSignedHeaders) if strings.HasSuffix(amzKey, "/") { uri.Add("list-type", "2") uri.Add("max-keys", "50000") uri.Add("prefix", amzPrefix) uri.Add("delimiter", "/") } // GET // /test.txt // $encoded_canonicalURI // host:examplebucket.s3.amazonaws.com // // host // UNSIGNED-PAYLOAD canonicalRequest := fmt.Sprintf("%s\n%s\n%s\nhost:%s\n\n%s\n%s", method, strings.Replace(url.PathEscape(amzKey), "%2F", "/", -1), uri.Encode(), amzBucket.Host, amzSignedHeaders, "UNSIGNED-PAYLOAD") // //s3/aws4_request amzScope := fmt.Sprintf("%s/%s/%s/aws4_request", amzDateShort, amzBucket.Region, amzService) canonicalRequestSHA256 := sha256.Sum256([]byte(canonicalRequest)) canonicalRequestSHA256Hex := hex.EncodeToString(canonicalRequestSHA256[:]) // AWS4-HMAC-SHA256 // 20130524T000000Z // 20130524/us-east-1/s3/aws4_request // 3bfa292879f6447bbcda7001decf97f4a54dc650c8942174ae0a9121cf58ad04 strToSign := fmt.Sprintf("%s\n%s\n%s\n%s", amzAlgorithm, amzDateLong, amzScope, canonicalRequestSHA256Hex) dateKey := hmac.New(sha256.New, []byte("AWS4"+amzBucket.AppKey)) dateKey.Write([]byte(amzDateShort)) dateRegionKey := hmac.New(sha256.New, dateKey.Sum(nil)) dateRegionKey.Write([]byte(amzBucket.Region)) dateRegionServiceKey := hmac.New(sha256.New, dateRegionKey.Sum(nil)) dateRegionServiceKey.Write([]byte(amzService)) signingKey := 
hmac.New(sha256.New, dateRegionServiceKey.Sum(nil)) signingKey.Write([]byte("aws4_request")) amzSignatureSha256 := hmac.New(sha256.New, signingKey.Sum(nil)) amzSignatureSha256.Write([]byte(strToSign)) amzSignatureSha256Hex := hex.EncodeToString(amzSignatureSha256.Sum(nil)) signedURL, err := url.Parse(amzBucket.Endpoint) if err != nil { return "", err } uri.Add("X-Amz-Signature", amzSignatureSha256Hex) signedURL.Path = amzKey signedURL.RawQuery = uri.Encode() return signedURL.String(), nil } func prettyByteSize(b string) string { bf, _ := strconv.ParseFloat(b, 0) for _, unit := range []string{"", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"} { if math.Abs(bf) < 1024.0 { return fmt.Sprintf("%3.1f%sB", bf, unit) } bf /= 1024.0 } return fmt.Sprintf("%.1fYiB", bf) } func formatLastModified(value string) string { t, _ := time.Parse("2006-01-02T15:04:05.000Z", value) return t.Format(time.DateTime) } func main() { // We should re-use socket connections between requests. client := &http.Client{} pk := &Pipkin{client: client} var confPath string flag.StringVar(&confPath, "conf", "pipkin.json", "path to JSON config") flag.StringVar(&pk.templates, "templates", "./templates", "path to template directory") flag.Parse() confFile, err := os.ReadFile(confPath) if err != nil { log.Fatal(err) } err = json.Unmarshal([]byte(confFile), &pk.config) if err != nil { log.Fatal(err) } pk.log = NewLogger(pk.config.LogLevel) http.HandleFunc("/", pk.fetch) http.ListenAndServe(pk.config.Host+":"+pk.config.Port, nil) }