ref: ae5c818674e2c9ca950ca7a9bf93f1283e7411b7
parent: 1df19d6ebe4dccf7de3bcf16cf6037d169832ce3
author: Runxi Yu <me@runxiyu.org>
date: Sun Mar 8 08:03:26 EDT 2026
receivepack, format/pack/ingest: Two-stage ingestion
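
Ingest no longer writes anything on its own: it reads and validates the
PACK header, then returns a Pending operation. The caller inspects
Header().ObjectCount and finalizes exactly once, either with Continue,
which streams the pack into a destination root and writes the .idx (and
optionally .rev) artifacts, or with Discard, which verifies the trailer
of a zero-object pack without writing any files. receive-pack discards
the empty pack sent by ref-update-only pushes, so such pushes no longer
create a quarantine or promote an empty pack.

The intended call pattern, as a sketch with error handling elided:

    pending, err := ingest.Ingest(src, algo, opts)
    if pending.Header().ObjectCount == 0 {
        discarded, err := pending.Discard()
        // discarded.PackHash is verified; nothing was written
    } else {
        result, err := pending.Continue(destinationRoot)
        // pack and index files now exist under destinationRoot
    }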
--- a/cmd/index-pack/main.go
+++ b/cmd/index-pack/main.go
@@ -85,12 +85,28 @@
 	defer func() { _ = destinationRoot.Close() }()
-	result, err := ingest.Ingest(os.Stdin, destinationRoot, algo, ingest.Options{
+	pending, err := ingest.Ingest(os.Stdin, algo, ingest.Options{
 		FixThin:            fixThin,
 		WriteRev:           writeRev,
 		Base:               base,
 		RequireTrailingEOF: true,
 	})
+	if err != nil {
+		return err
+	}
+
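+	// A zero-object pack carries only a header and trailer; verify it and
+	// report the hash without writing any pack files.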
+	if pending.Header().ObjectCount == 0 {
+		discarded, err := pending.Discard()
+		if err != nil {
+			return err
+		}
+
+		_, _ = fmt.Fprintf(os.Stdout, "pack\t%s\n", discarded.PackHash.String())
+
+		return nil
+	}
+
+	result, err := pending.Continue(destinationRoot)
 	if err != nil {
 		return err
 	}
--- a/format/pack/ingest/api.go
+++ b/format/pack/ingest/api.go
@@ -1,6 +1,9 @@
 package ingest
 
 import (
+	"bufio"
+	"bytes"
+	"errors"
 	"io"
 	"os"
@@ -48,23 +51,138 @@
ThinFixed bool
}
-// Ingest ingests one pack stream from src into destination.
-//
-// Ingest performs streaming pack read/write/verification, delta resolution,
-// optional thin fixup, then writes .idx and optionally .rev.
-//
-// destination ownership and lifecycle are managed by the caller.
-// Ingest does not perform quarantine promotion/migration.
+// HeaderInfo describes the parsed PACK header.
+type HeaderInfo struct {
+	Version     uint32
+	ObjectCount uint32
+}
+
+// DiscardResult describes one successful Discard call.
+type DiscardResult struct {
+	PackHash    objectid.ObjectID
+	ObjectCount uint32
+}
+
+// Pending is one started ingest operation awaiting Continue or Discard.
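+//
+// Exactly one of Continue or Discard may be called; a second call fails
+// with ErrAlreadyFinalized.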
+type Pending struct {
+	reader    *bufio.Reader
+	algo      objectid.Algorithm
+	opts      Options
+	header    HeaderInfo
+	headerRaw [packHeaderSize]byte
+
+	finalized bool
+}
+
+// Ingest reads and validates one PACK header, returning one pending operation.
 func Ingest(
 	src io.Reader,
-	destination *os.Root,
 	algo objectid.Algorithm,
 	opts Options,
-) (Result, error) {
-	state, err := newIngestState(src, destination, algo, opts)
+) (*Pending, error) {
+	if algo.Size() == 0 {
+		return nil, objectid.ErrInvalidAlgorithm
+	}
+
+	reader := bufio.NewReader(src)
+
+	header, headerRaw, err := readAndValidatePackHeader(reader)
 	if err != nil {
+		return nil, err
+	}
+
+	return &Pending{
+		reader:    reader,
+		algo:      algo,
+		opts:      opts,
+		header:    header,
+		headerRaw: headerRaw,
+	}, nil
+}
+
+// Header returns parsed PACK header info.
+func (pending *Pending) Header() HeaderInfo {
+	return pending.header
+}
+
+// Continue ingests the pack stream into destination and writes pack artifacts.
+func (pending *Pending) Continue(destination *os.Root) (Result, error) {
+	if pending.finalized {
+		return Result{}, ErrAlreadyFinalized
+	}
+
+	pending.finalized = true
+
+	if pending.header.ObjectCount == 0 {
+		return Result{}, ErrZeroObjectContinue
+	}
+
+	state, err := newIngestState(
+		pending.reader,
+		destination,
+		pending.algo,
+		pending.opts,
+		pending.header,
+		pending.headerRaw,
+	)
+	if err != nil {
 		return Result{}, err
 	}
 	return ingest(state)
+}
+
+// Discard consumes and verifies one zero-object pack stream without writing files.
+func (pending *Pending) Discard() (DiscardResult, error) {
+	if pending.finalized {
+		return DiscardResult{}, ErrAlreadyFinalized
+	}
+
+	pending.finalized = true
+
+	if pending.header.ObjectCount != 0 {
+		return DiscardResult{}, ErrNonZeroDiscard
+	}
+
+	hashImpl, err := pending.algo.New()
+	if err != nil {
+		return DiscardResult{}, err
+	}
+
+	_, _ = hashImpl.Write(pending.headerRaw[:])
+
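+	// A zero-object pack consists of only the header and trailer, so the
+	// expected trailer is the hash of the 12 header bytes alone.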
+	trailer := make([]byte, pending.algo.Size())
+
+	_, err = io.ReadFull(pending.reader, trailer)
+	if err != nil {
+		return DiscardResult{}, &PackTrailerMismatchError{}
+	}
+
+	computed := hashImpl.Sum(nil)
+	if !bytes.Equal(computed, trailer) {
+		return DiscardResult{}, &PackTrailerMismatchError{}
+	}
+
+	if pending.opts.RequireTrailingEOF {
+		var probe [1]byte
+
+		n, err := pending.reader.Read(probe[:])
+		if n > 0 || err == nil {
+			return DiscardResult{}, errors.New("format/pack/ingest: pack has trailing garbage")
+		}
+
+		if err != io.EOF {
+			return DiscardResult{}, err
+		}
+	}
+
+	packHash, err := objectid.FromBytes(pending.algo, trailer)
+	if err != nil {
+		return DiscardResult{}, err
+	}
+
+	return DiscardResult{
+		PackHash:    packHash,
+		ObjectCount: 0,
+	}, nil
 }
--- a/format/pack/ingest/errors.go
+++ b/format/pack/ingest/errors.go
@@ -66,3 +66,12 @@
}
 var errExternalThinBase = errors.New("format/pack/ingest: external thin base required")
+
+var (
+	// ErrAlreadyFinalized indicates Continue/Discard already called.
+	ErrAlreadyFinalized = errors.New("format/pack/ingest: operation already finalized")
+	// ErrZeroObjectContinue indicates Continue was called for a zero-object pack.
+	ErrZeroObjectContinue = errors.New("format/pack/ingest: cannot continue zero-object pack")
+	// ErrNonZeroDiscard indicates Discard was called for a non-zero-object pack.
+	ErrNonZeroDiscard = errors.New("format/pack/ingest: cannot discard non-zero pack")
+)
--- a/format/pack/ingest/header.go
+++ b/format/pack/ingest/header.go
@@ -3,32 +3,47 @@
 import (
 	"encoding/binary"
 	"fmt"
+	"io"
 
 	"codeberg.org/lindenii/furgit/format/pack"
 )
-// readAndValidatePackHeader reads and validates PACK header from the stream.
-func readAndValidatePackHeader(state *ingestState) error {
-	var hdr [12]byte
+const packHeaderSize = 12
 
-	err := state.stream.readFull(hdr[:])
+// readAndValidatePackHeader reads one PACK header from src and validates it.
+func readAndValidatePackHeader(src io.Reader) (HeaderInfo, [packHeaderSize]byte, error) {
+	var hdr [packHeaderSize]byte
+
+	_, err := io.ReadFull(src, hdr[:])
 	if err != nil {
-		return &InvalidPackHeaderError{Reason: fmt.Sprintf("read header: %v", err)}
+		return HeaderInfo{}, [packHeaderSize]byte{}, &InvalidPackHeaderError{
+			Reason: fmt.Sprintf("read header: %v", err),
+		}
 	}
 
+	header, err := parseAndValidatePackHeader(hdr)
+	if err != nil {
+		return HeaderInfo{}, [packHeaderSize]byte{}, err
+	}
+
+	return header, hdr, nil
+}
+
+// parseAndValidatePackHeader validates one already-read PACK header.
+func parseAndValidatePackHeader(hdr [packHeaderSize]byte) (HeaderInfo, error) {
 	if binary.BigEndian.Uint32(hdr[:4]) != pack.Signature {
-		return &InvalidPackHeaderError{Reason: "signature mismatch"}
+		return HeaderInfo{}, &InvalidPackHeaderError{Reason: "signature mismatch"}
 	}
 
 	version := binary.BigEndian.Uint32(hdr[4:8])
 	if !pack.VersionSupported(version) {
-		return &InvalidPackHeaderError{Reason: fmt.Sprintf("unsupported version %d", version)}
+		return HeaderInfo{}, &InvalidPackHeaderError{
+			Reason: fmt.Sprintf("unsupported version %d", version),
+		}
 	}
 
-	state.objectCountHeader = binary.BigEndian.Uint32(hdr[8:12])
-	if state.objectCountHeader == 0 {
-		return &InvalidPackHeaderError{Reason: "zero objects"}
-	}
-
-	return nil
+	return HeaderInfo{
+		Version:     version,
+		ObjectCount: binary.BigEndian.Uint32(hdr[8:12]),
+	}, nil
 }
--- a/format/pack/ingest/ingest_test.go
+++ b/format/pack/ingest/ingest_test.go
@@ -2,7 +2,9 @@
 import (
 	"bytes"
+	"encoding/binary"
 	"errors"
+	"io"
 	"io/fs"
 	"os"
 	"path/filepath"
@@ -26,6 +28,20 @@
return r.reader.Read(p)
}
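+
+// beginAndContinue runs the two-stage API the way the old one-shot
+// Ingest call did, for tests that exercise a complete ingestion.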
+func beginAndContinue(
+	src io.Reader,
+	packRoot *os.Root,
+	algo objectid.Algorithm,
+	opts ingest.Options,
+) (ingest.Result, error) {
+	pending, err := ingest.Ingest(src, algo, opts)
+	if err != nil {
+		return ingest.Result{}, err
+	}
+
+	return pending.Continue(packRoot)
+}
+
// fixturePath returns one fixture file path for the selected algorithm.
 func fixturePath(t *testing.T, algo objectid.Algorithm, name string) string {
 	t.Helper()
@@ -173,7 +189,7 @@
packRoot := receiver.OpenPackRoot(t)
-	result, err := ingest.Ingest(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{
+	result, err := beginAndContinue(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{
 		WriteRev:           true,
 		RequireTrailingEOF: true,
 	})
@@ -221,7 +237,7 @@
 	receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
 	packRoot := receiver.OpenPackRoot(t)
 
-	_, err := ingest.Ingest(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{
+	_, err := beginAndContinue(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{
 		WriteRev:           true,
 		RequireTrailingEOF: true,
 	})
@@ -257,7 +273,7 @@
packRoot := receiver.OpenPackRoot(t)
-	_, err := ingest.Ingest(bytes.NewReader(basePack), packRoot, algo, ingest.Options{
+	_, err := beginAndContinue(bytes.NewReader(basePack), packRoot, algo, ingest.Options{
 		RequireTrailingEOF: true,
 	})
 	if err != nil {
@@ -266,7 +282,7 @@
receiverRepo := receiver.OpenRepository(t)
-	result, err := ingest.Ingest(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{
+	result, err := beginAndContinue(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{
 		FixThin:  true,
 		WriteRev: true,
 		Base:     receiverRepo.Objects(),
@@ -301,7 +317,7 @@
 	receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
 	packRoot := receiver.OpenPackRoot(t)
 
-	_, err := ingest.Ingest(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{
+	_, err := beginAndContinue(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{
 		WriteRev:           true,
 		RequireTrailingEOF: true,
 	})
@@ -326,6 +342,74 @@
})
}
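+
+// zeroObjectPackBytes builds the smallest valid pack: a version-2 header
+// declaring zero objects, followed by the hash trailer over those 12 bytes.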
+func zeroObjectPackBytes(t *testing.T, algo objectid.Algorithm) []byte {
+	t.Helper()
+
+	hashImpl, err := algo.New()
+	if err != nil {
+		t.Fatalf("algo.New: %v", err)
+	}
+
+	var header [12]byte
+	copy(header[:4], []byte{'P', 'A', 'C', 'K'})
+	binary.BigEndian.PutUint32(header[4:8], 2)
+	binary.BigEndian.PutUint32(header[8:12], 0)
+
+	_, _ = hashImpl.Write(header[:])
+
+	return append(header[:], hashImpl.Sum(nil)...)
+}
+
+func TestIngestDiscardZeroObjectPack(t *testing.T) {
+	t.Parallel()
+
+	testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
+		packBytes := zeroObjectPackBytes(t, algo)
+
+		pending, err := ingest.Ingest(bytes.NewReader(packBytes), algo, ingest.Options{
+			RequireTrailingEOF: true,
+		})
+		if err != nil {
+			t.Fatalf("Ingest: %v", err)
+		}
+
+		if pending.Header().ObjectCount != 0 {
+			t.Fatalf("ObjectCount = %d, want 0", pending.Header().ObjectCount)
+		}
+
+		discarded, err := pending.Discard()
+		if err != nil {
+			t.Fatalf("Discard: %v", err)
+		}
+
+		if discarded.ObjectCount != 0 {
+			t.Fatalf("Discard.ObjectCount = %d, want 0", discarded.ObjectCount)
+		}
+	})
+}
+
+func TestIngestContinueRejectsZeroObjectPack(t *testing.T) {
+	t.Parallel()
+
+	testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
+		packBytes := zeroObjectPackBytes(t, algo)
+		receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
+		packRoot := receiver.OpenPackRoot(t)
+
+		pending, err := ingest.Ingest(bytes.NewReader(packBytes), algo, ingest.Options{
+			RequireTrailingEOF: true,
+		})
+		if err != nil {
+			t.Fatalf("Ingest: %v", err)
+		}
+
+		_, err = pending.Continue(packRoot)
+		if !errors.Is(err, ingest.ErrZeroObjectContinue) {
+			t.Fatalf("Continue error = %v, want ErrZeroObjectContinue", err)
+		}
+	})
+}
+
 func TestIngestCanFinishWithoutTrailingEOF(t *testing.T) {
 	t.Parallel()
@@ -336,7 +420,7 @@
 	receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
 	packRoot := receiver.OpenPackRoot(t)
 
-	result, err := ingest.Ingest(&noExtraReadReader{reader: bytes.NewReader(packBytes)}, packRoot, algo, ingest.Options{
+	result, err := beginAndContinue(&noExtraReadReader{reader: bytes.NewReader(packBytes)}, packRoot, algo, ingest.Options{
 		WriteRev: true,
 	})
 	if err != nil {
--- a/format/pack/ingest/scan.go
+++ b/format/pack/ingest/scan.go
@@ -23,7 +23,7 @@
utils.WriteProgressf(state.opts.Progress, "validating pack header...\r")
- err = readAndValidatePackHeader(state)
+ err = seedStreamWithPackHeader(state)
 	if err != nil {
 		return err
}
@@ -74,4 +74,31 @@
state.packHash = packHash
return state.stream.flush()
+}
+
+// seedStreamWithPackHeader writes the already-validated PACK header to output,
+// seeds the running pack hash, and advances stream offset accounting.
+func seedStreamWithPackHeader(state *ingestState) error {
+	written := 0
+	for written < len(state.packHeaderRaw) {
+		n, err := state.packFile.Write(state.packHeaderRaw[written:])
+		if err != nil {
+			return &DestinationWriteError{Op: fmt.Sprintf("write pack header: %v", err)}
+		}
+
+		if n == 0 {
+			return &DestinationWriteError{Op: "write pack header: short write"}
+		}
+
+		written += n
+	}
+
+	_, err := state.stream.hash.Write(state.packHeaderRaw[:])
+	if err != nil {
+		return err
+	}
+
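+	// The 12 header bytes were consumed by Ingest before this state existed,
+	// so seed the stream's offset accounting to match.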
+	state.stream.consumed = packHeaderSize
+
+	return nil
}
--- a/format/pack/ingest/state.go
+++ b/format/pack/ingest/state.go
@@ -18,6 +18,8 @@
algo objectid.Algorithm
opts Options
+ packHeaderRaw [packHeaderSize]byte
+
packFile *os.File
packTmpName string
idxFile *os.File
@@ -47,6 +49,8 @@
destination *os.Root,
algo objectid.Algorithm,
opts Options,
+ header HeaderInfo,
+ headerRaw [packHeaderSize]byte,
 ) (*ingestState, error) {
 	if algo.Size() == 0 {
 		return nil, objectid.ErrInvalidAlgorithm
@@ -53,12 +57,14 @@
}
 	return &ingestState{
-		src:            src,
-		destination:    destination,
-		algo:           algo,
-		opts:           opts,
-		offsetToRecord: make(map[uint64]int),
-		objectToRecord: make(map[objectid.ObjectID]int),
-		baseCache:      newDeltaBaseCache(defaultDeltaBaseCacheMaxBytes),
+		src:               src,
+		destination:       destination,
+		algo:              algo,
+		opts:              opts,
+		packHeaderRaw:     headerRaw,
+		objectCountHeader: header.ObjectCount,
+		offsetToRecord:    make(map[uint64]int),
+		objectToRecord:    make(map[objectid.ObjectID]int),
+		baseCache:         newDeltaBaseCache(defaultDeltaBaseCacheMaxBytes),
}, nil
}
--- a/receivepack/int_test.go
+++ b/receivepack/int_test.go
@@ -797,6 +797,61 @@
})
}
+func TestReceivePackGitPushRefUpdateWithoutNewObjectsSucceeds(t *testing.T) {
+	t.Parallel()
+
+	testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
+		t.Parallel()
+
+		sender := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo})
+		blobID, treeID := sender.MakeSingleFileTree(t, "base.txt", []byte("base\n"))
+		commitID := sender.CommitTree(t, treeID, "base")
+		sender.UpdateRef(t, "refs/heads/main", commitID)
+
+		receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
+		receiver.HashObject(t, "blob", sender.RunBytes(t, "cat-file", "blob", blobID.String()))
+		receiver.HashObject(t, "tree", sender.RunBytes(t, "cat-file", "tree", treeID.String()))
+		receiver.HashObject(t, "commit", sender.RunBytes(t, "cat-file", "commit", commitID.String()))
+		receiver.UpdateRef(t, "refs/heads/main", commitID)
+
+		repo := receiver.OpenRepository(t)
+		objectsRoot := receiver.OpenObjectsRoot(t)
+
+		stdout, stderr, clientErr, serverErr := runGitPushFD(
+			t,
+			sender,
+			receivepack.Options{
+				Algorithm:       algo,
+				Refs:            repo.Refs(),
+				ExistingObjects: repo.Objects(),
+				ObjectsRoot:     objectsRoot,
+			},
+			"push", "--porcelain", "fd::3,4/test", "refs/heads/main:refs/heads/topic",
+		)
+		if clientErr != nil {
+			t.Fatalf("git push failed: %v\nstdout=%s\nstderr=%s", clientErr, stdout, stderr)
+		}
+
+		if serverErr != nil {
+			t.Fatalf("ReceivePack: %v", serverErr)
+		}
+
+		resolved, err := receiver.OpenRepository(t).Refs().ResolveFully("refs/heads/topic")
+		if err != nil {
+			t.Fatalf("ResolveFully(topic): %v", err)
+		}
+
+		if resolved.ID != commitID {
+			t.Fatalf("refs/heads/topic = %s, want %s", resolved.ID, commitID)
+		}
+
+		packs := receiver.Run(t, "count-objects", "-v")
+		if !strings.Contains(packs, "packs: 0") {
+			t.Fatalf("count-objects output shows unexpected promoted pack: %q", packs)
+		}
+	})
+}
+
 func TestReceivePackGitPushAtomicDelete(t *testing.T) {
 	t.Parallel()
--- a/receivepack/service/execute.go
+++ b/receivepack/service/execute.go
@@ -77,7 +77,7 @@
return result, nil
}
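 
+	// quarantineRoot is nil when the push carried a zero-object pack that
+	// was verified and discarded without creating a quarantine.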
-	if req.PackExpected {
+	if req.PackExpected && quarantineRoot != nil {
 		// Git migrates quarantined objects into permanent storage immediately
// before starting ref updates.
utils.WriteProgressf(service.opts.Progress, "promoting quarantine...\r")
--- a/receivepack/service/ingest_quarantine.go
+++ b/receivepack/service/ingest_quarantine.go
@@ -34,6 +34,51 @@
return "", nil, false
}
+	pending, err := ingest.Ingest(
+		req.Pack,
+		service.opts.Algorithm,
+		ingest.Options{
+			FixThin:  true,
+			WriteRev: true,
+			Base:     service.opts.ExistingObjects,
+			Progress: service.opts.Progress,
+		},
+	)
+	if err != nil {
+		utils.WriteProgressf(service.opts.Progress, "unpack failed: %v\n", err)
+
+		result.UnpackError = err.Error()
+		fillCommandErrors(result, commands, err.Error())
+
+		return "", nil, false
+	}
+
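+	// A ref-update-only push sends a zero-object pack. Verify and discard
+	// it without creating a quarantine, since there is nothing to promote.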
+	if pending.Header().ObjectCount == 0 {
+		discarded, err := pending.Discard()
+		if err != nil {
+			utils.WriteProgressf(service.opts.Progress, "unpack failed: %v\n", err)
+
+			result.UnpackError = err.Error()
+			fillCommandErrors(result, commands, err.Error())
+
+			return "", nil, false
+		}
+
+		result.Ingest = &ingest.Result{
+			PackHash:    discarded.PackHash,
+			ObjectCount: discarded.ObjectCount,
+		}
+
+		utils.WriteProgressf(
+			service.opts.Progress,
+			"unpacking: done (%d objects, %s).\n",
+			discarded.ObjectCount,
+			discarded.PackHash,
+		)
+
+		return "", nil, true
+	}
+
utils.WriteProgressf(service.opts.Progress, "creating quarantine...\r")
quarantineName, quarantineRoot, err := service.createQuarantineRoot()
@@ -62,17 +107,7 @@
utils.WriteProgressf(service.opts.Progress, "creating quarantine: done.\n")
utils.WriteProgressf(service.opts.Progress, "unpacking...\r")
-	ingested, err := ingest.Ingest(
-		req.Pack,
-		quarantinePackRoot,
-		service.opts.Algorithm,
-		ingest.Options{
-			FixThin:  true,
-			WriteRev: true,
-			Base:     service.opts.ExistingObjects,
-			Progress: service.opts.Progress,
-		},
-	)
+ ingested, err := pending.Continue(quarantinePackRoot)
_ = quarantinePackRoot.Close()
--