shithub: furgit

Download patch

ref: ae5c818674e2c9ca950ca7a9bf93f1283e7411b7
parent: 1df19d6ebe4dccf7de3bcf16cf6037d169832ce3
author: Runxi Yu <me@runxiyu.org>
date: Sun Mar 8 08:03:26 EDT 2026

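receivepack, format/pack/ingest: Two-stage ingestion

ingest.Ingest now only reads and validates the PACK header and returns a
*Pending. The caller then either calls Continue to ingest the pack into a
destination, or calls Discard to verify a zero-object pack without writing any
files. Zero-object packs, previously rejected as invalid, are now accepted.

receivepack uses Discard for pushes that carry no new objects, so such pushes
no longer create or promote a quarantine.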

--- a/cmd/index-pack/main.go
+++ b/cmd/index-pack/main.go
@@ -85,12 +85,28 @@
 
 	defer func() { _ = destinationRoot.Close() }()
 
-	result, err := ingest.Ingest(os.Stdin, destinationRoot, algo, ingest.Options{
+	pending, err := ingest.Ingest(os.Stdin, algo, ingest.Options{
 		FixThin:            fixThin,
 		WriteRev:           writeRev,
 		Base:               base,
 		RequireTrailingEOF: true,
 	})
+	if err != nil {
+		return err
+	}
+
+	if pending.Header().ObjectCount == 0 { // no objects to index: verify the pack and write no files
+		discarded, err := pending.Discard()
+		if err != nil {
+			return err
+		}
+
+		_, _ = fmt.Fprintf(os.Stdout, "pack\t%s\n", discarded.PackHash.String())
+
+		return nil
+	}
+
+	result, err := pending.Continue(destinationRoot)
 	if err != nil {
 		return err
 	}
--- a/format/pack/ingest/api.go
+++ b/format/pack/ingest/api.go
@@ -1,6 +1,9 @@
 package ingest
 
 import (
+	"bufio"
+	"bytes"
+	"errors"
 	"io"
 	"os"
 
@@ -48,23 +51,163 @@
 	ThinFixed bool
 }
 
-// Ingest ingests one pack stream from src into destination.
-//
-// Ingest performs streaming pack read/write/verification, delta resolution,
-// optional thin fixup, then writes .idx and optionally .rev.
-//
-// destination ownership and lifecycle are managed by the caller.
-// Ingest does not perform quarantine promotion/migration.
+// HeaderInfo describes the parsed PACK header.
+type HeaderInfo struct {
+	Version     uint32
+	ObjectCount uint32
+}
+
+// DiscardResult describes one successful Discard call.
+//
+// ObjectCount is always zero, because Discard only accepts zero-object packs.
+type DiscardResult struct {
+	PackHash    objectid.ObjectID
+	ObjectCount uint32
+}
+
+// Pending is one started ingest operation awaiting Continue or Discard.
+//
+// Continue and Discard may be called at most once in total: after the first
+// call, whether or not it succeeded, both return ErrAlreadyFinalized. Pending
+// performs no synchronization, so its methods must not be called concurrently.
+type Pending struct {
+	reader    *bufio.Reader
+	algo      objectid.Algorithm
+	opts      Options
+	header    HeaderInfo
+	headerRaw [packHeaderSize]byte
+
+	finalized bool
+}
+
+// Ingest reads and validates one PACK header from src, returning one pending
+// operation.
+//
+// Callers inspect Header, then call Continue for a pack with objects or
+// Discard for a zero-object pack. src is wrapped in a bufio.Reader, so Ingest
+// may read ahead of the header; callers must not read src directly afterwards.
 func Ingest(
 	src io.Reader,
-	destination *os.Root,
 	algo objectid.Algorithm,
 	opts Options,
-) (Result, error) {
-	state, err := newIngestState(src, destination, algo, opts)
+) (*Pending, error) {
+	if algo.Size() == 0 {
+		return nil, objectid.ErrInvalidAlgorithm
+	}
+
+	reader := bufio.NewReader(src)
+
+	header, headerRaw, err := readAndValidatePackHeader(reader)
 	if err != nil {
+		return nil, err
+	}
+
+	return &Pending{
+		reader:    reader,
+		algo:      algo,
+		opts:      opts,
+		header:    header,
+		headerRaw: headerRaw,
+	}, nil
+}
+
+// Header returns parsed PACK header info.
+func (pending *Pending) Header() HeaderInfo {
+	return pending.header
+}
+
+// Continue ingests the pack stream into destination and writes pack artifacts.
+//
+// Continue performs streaming pack read/write/verification, delta
+// resolution, optional thin fixup, then writes .idx and optionally .rev.
+// destination ownership and lifecycle are managed by the caller;
+// Continue does not perform quarantine promotion/migration.
+//
+// Continue returns ErrZeroObjectContinue for a zero-object pack; use
+// Discard for those instead.
+func (pending *Pending) Continue(destination *os.Root) (Result, error) {
+	if pending.finalized {
+		return Result{}, ErrAlreadyFinalized
+	}
+
+	pending.finalized = true
+
+	if pending.header.ObjectCount == 0 {
+		return Result{}, ErrZeroObjectContinue
+	}
+
+	state, err := newIngestState(
+		pending.reader,
+		destination,
+		pending.algo,
+		pending.opts,
+		pending.header,
+		pending.headerRaw,
+	)
+	if err != nil {
 		return Result{}, err
 	}
 
 	return ingest(state)
+}
+
+// Discard consumes and verifies one zero-object pack stream without writing files.
+//
+// It hashes the header, reads the trailing pack checksum and compares the
+// two, and, when Options.RequireTrailingEOF is set, requires the stream to
+// end right after the checksum. Discard returns ErrNonZeroDiscard for a pack
+// with objects; use Continue for those instead.
+func (pending *Pending) Discard() (DiscardResult, error) {
+	if pending.finalized {
+		return DiscardResult{}, ErrAlreadyFinalized
+	}
+
+	pending.finalized = true
+
+	if pending.header.ObjectCount != 0 {
+		return DiscardResult{}, ErrNonZeroDiscard
+	}
+
+	hashImpl, err := pending.algo.New()
+	if err != nil {
+		return DiscardResult{}, err
+	}
+
+	// A zero-object pack is its header immediately followed by the checksum.
+	_, _ = hashImpl.Write(pending.headerRaw[:])
+
+	trailer := make([]byte, pending.algo.Size())
+
+	_, err = io.ReadFull(pending.reader, trailer)
+	if err != nil {
+		return DiscardResult{}, &PackTrailerMismatchError{}
+	}
+
+	computed := hashImpl.Sum(nil)
+	if !bytes.Equal(computed, trailer) {
+		return DiscardResult{}, &PackTrailerMismatchError{}
+	}
+
+	if pending.opts.RequireTrailingEOF {
+		// Unlike a single Read, ReadByte retries empty (0, nil) reads, so a
+		// reader that returns no data without EOF is not mistaken for garbage.
+		_, err = pending.reader.ReadByte()
+		if err == nil {
+			return DiscardResult{}, errors.New("format/pack/ingest: pack has trailing garbage")
+		}
+
+		if !errors.Is(err, io.EOF) {
+			return DiscardResult{}, err
+		}
+	}
+
+	packHash, err := objectid.FromBytes(pending.algo, trailer)
+	if err != nil {
+		return DiscardResult{}, err
+	}
+
+	return DiscardResult{
+		PackHash:    packHash,
+		ObjectCount: 0,
+	}, nil
 }
--- a/format/pack/ingest/errors.go
+++ b/format/pack/ingest/errors.go
@@ -66,3 +66,12 @@
 }
 
 var errExternalThinBase = errors.New("format/pack/ingest: external thin base required")
+
+var (
+	// ErrAlreadyFinalized is returned by Continue or Discard after either was already called.
+	ErrAlreadyFinalized = errors.New("format/pack/ingest: operation already finalized")
+	// ErrZeroObjectContinue is returned by Continue for a pack whose header declares zero objects.
+	ErrZeroObjectContinue = errors.New("format/pack/ingest: cannot continue zero-object pack")
+	// ErrNonZeroDiscard is returned by Discard for a pack whose header declares objects.
+	ErrNonZeroDiscard = errors.New("format/pack/ingest: cannot discard non-zero pack")
+)
--- a/format/pack/ingest/header.go
+++ b/format/pack/ingest/header.go
@@ -3,32 +3,47 @@
 import (
 	"encoding/binary"
 	"fmt"
+	"io"
 
 	"codeberg.org/lindenii/furgit/format/pack"
 )
 
-// readAndValidatePackHeader reads and validates PACK header from the stream.
-func readAndValidatePackHeader(state *ingestState) error {
-	var hdr [12]byte
+const packHeaderSize = 12 // "PACK" signature, version, object count: 4 bytes each
 
-	err := state.stream.readFull(hdr[:])
+// readAndValidatePackHeader reads and validates one PACK header, also returning its raw bytes.
+func readAndValidatePackHeader(src io.Reader) (HeaderInfo, [packHeaderSize]byte, error) {
+	var hdr [packHeaderSize]byte
+
+	_, err := io.ReadFull(src, hdr[:])
 	if err != nil {
-		return &InvalidPackHeaderError{Reason: fmt.Sprintf("read header: %v", err)}
+		return HeaderInfo{}, [packHeaderSize]byte{}, &InvalidPackHeaderError{
+			Reason: fmt.Sprintf("read header: %v", err),
+		}
 	}
 
+	header, err := parseAndValidatePackHeader(hdr)
+	if err != nil {
+		return HeaderInfo{}, [packHeaderSize]byte{}, err
+	}
+
+	return header, hdr, nil
+}
+
+// parseAndValidatePackHeader checks signature and version; any object count, even zero, is accepted.
+func parseAndValidatePackHeader(hdr [packHeaderSize]byte) (HeaderInfo, error) {
 	if binary.BigEndian.Uint32(hdr[:4]) != pack.Signature {
-		return &InvalidPackHeaderError{Reason: "signature mismatch"}
+		return HeaderInfo{}, &InvalidPackHeaderError{Reason: "signature mismatch"}
 	}
 
 	version := binary.BigEndian.Uint32(hdr[4:8])
 	if !pack.VersionSupported(version) {
-		return &InvalidPackHeaderError{Reason: fmt.Sprintf("unsupported version %d", version)}
+		return HeaderInfo{}, &InvalidPackHeaderError{
+			Reason: fmt.Sprintf("unsupported version %d", version),
+		}
 	}
 
-	state.objectCountHeader = binary.BigEndian.Uint32(hdr[8:12])
-	if state.objectCountHeader == 0 {
-		return &InvalidPackHeaderError{Reason: "zero objects"}
-	}
-
-	return nil
+	return HeaderInfo{
+		Version:     version,
+		ObjectCount: binary.BigEndian.Uint32(hdr[8:12]),
+	}, nil
 }
--- a/format/pack/ingest/ingest_test.go
+++ b/format/pack/ingest/ingest_test.go
@@ -2,7 +2,9 @@
 
 import (
 	"bytes"
+	"encoding/binary"
 	"errors"
+	"io"
 	"io/fs"
 	"os"
 	"path/filepath"
@@ -26,6 +28,21 @@
 	return r.reader.Read(p)
 }
 
+// beginAndContinue calls ingest.Ingest on src and then Continue into packRoot.
+func beginAndContinue(
+	src io.Reader,
+	packRoot *os.Root,
+	algo objectid.Algorithm,
+	opts ingest.Options,
+) (ingest.Result, error) {
+	pending, err := ingest.Ingest(src, algo, opts)
+	if err != nil {
+		return ingest.Result{}, err
+	}
+
+	return pending.Continue(packRoot)
+}
+
 // fixturePath returns one fixture file path for the selected algorithm.
 func fixturePath(t *testing.T, algo objectid.Algorithm, name string) string {
 	t.Helper()
@@ -173,7 +190,7 @@
 
 		packRoot := receiver.OpenPackRoot(t)
 
-		result, err := ingest.Ingest(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{
+		result, err := beginAndContinue(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{
 			WriteRev:           true,
 			RequireTrailingEOF: true,
 		})
@@ -221,7 +238,7 @@
 		receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
 		packRoot := receiver.OpenPackRoot(t)
 
-		_, err := ingest.Ingest(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{
+		_, err := beginAndContinue(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{
 			WriteRev:           true,
 			RequireTrailingEOF: true,
 		})
@@ -257,7 +274,7 @@
 
 		packRoot := receiver.OpenPackRoot(t)
 
-		_, err := ingest.Ingest(bytes.NewReader(basePack), packRoot, algo, ingest.Options{
+		_, err := beginAndContinue(bytes.NewReader(basePack), packRoot, algo, ingest.Options{
 			RequireTrailingEOF: true,
 		})
 		if err != nil {
@@ -266,7 +283,7 @@
 
 		receiverRepo := receiver.OpenRepository(t)
 
-		result, err := ingest.Ingest(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{
+		result, err := beginAndContinue(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{
 			FixThin:            true,
 			WriteRev:           true,
 			Base:               receiverRepo.Objects(),
@@ -301,7 +318,7 @@
 		receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
 		packRoot := receiver.OpenPackRoot(t)
 
-		_, err := ingest.Ingest(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{
+		_, err := beginAndContinue(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{
 			WriteRev:           true,
 			RequireTrailingEOF: true,
 		})
@@ -326,6 +343,141 @@
 	})
 }
 
+// zeroObjectPackBytes builds a valid version-2 pack with no objects for algo.
+func zeroObjectPackBytes(t *testing.T, algo objectid.Algorithm) []byte {
+	t.Helper()
+
+	hashImpl, err := algo.New()
+	if err != nil {
+		t.Fatalf("algo.New: %v", err)
+	}
+
+	var header [12]byte
+	copy(header[:4], []byte{'P', 'A', 'C', 'K'})
+	binary.BigEndian.PutUint32(header[4:8], 2)
+	binary.BigEndian.PutUint32(header[8:12], 0)
+
+	_, _ = hashImpl.Write(header[:])
+
+	return append(header[:], hashImpl.Sum(nil)...)
+}
+
+func TestIngestDiscardZeroObjectPack(t *testing.T) {
+	t.Parallel()
+
+	testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
+		packBytes := zeroObjectPackBytes(t, algo)
+
+		pending, err := ingest.Ingest(bytes.NewReader(packBytes), algo, ingest.Options{
+			RequireTrailingEOF: true,
+		})
+		if err != nil {
+			t.Fatalf("Ingest: %v", err)
+		}
+
+		if pending.Header().ObjectCount != 0 {
+			t.Fatalf("ObjectCount = %d, want 0", pending.Header().ObjectCount)
+		}
+
+		discarded, err := pending.Discard()
+		if err != nil {
+			t.Fatalf("Discard: %v", err)
+		}
+
+		if discarded.ObjectCount != 0 {
+			t.Fatalf("Discard.ObjectCount = %d, want 0", discarded.ObjectCount)
+		}
+	})
+}
+
+func TestIngestContinueRejectsZeroObjectPack(t *testing.T) {
+	t.Parallel()
+
+	testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
+		packBytes := zeroObjectPackBytes(t, algo)
+		receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
+		packRoot := receiver.OpenPackRoot(t)
+
+		pending, err := ingest.Ingest(bytes.NewReader(packBytes), algo, ingest.Options{
+			RequireTrailingEOF: true,
+		})
+		if err != nil {
+			t.Fatalf("Ingest: %v", err)
+		}
+
+		_, err = pending.Continue(packRoot)
+		if !errors.Is(err, ingest.ErrZeroObjectContinue) {
+			t.Fatalf("Continue error = %v, want ErrZeroObjectContinue", err)
+		}
+	})
+}
+
+func TestIngestDiscardRejectsTrailingGarbage(t *testing.T) {
+	t.Parallel()
+
+	testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
+		packBytes := append(zeroObjectPackBytes(t, algo), 'x')
+
+		pending, err := ingest.Ingest(bytes.NewReader(packBytes), algo, ingest.Options{
+			RequireTrailingEOF: true,
+		})
+		if err != nil {
+			t.Fatalf("Ingest: %v", err)
+		}
+
+		_, err = pending.Discard()
+		if err == nil {
+			t.Fatal("Discard accepted a pack with trailing garbage")
+		}
+	})
+}
+
+func TestIngestDiscardRejectsCorruptTrailer(t *testing.T) {
+	t.Parallel()
+
+	testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
+		packBytes := zeroObjectPackBytes(t, algo)
+		packBytes[len(packBytes)-1] ^= 0xff
+
+		pending, err := ingest.Ingest(bytes.NewReader(packBytes), algo, ingest.Options{
+			RequireTrailingEOF: true,
+		})
+		if err != nil {
+			t.Fatalf("Ingest: %v", err)
+		}
+
+		var mismatch *ingest.PackTrailerMismatchError
+
+		_, err = pending.Discard()
+		if !errors.As(err, &mismatch) {
+			t.Fatalf("Discard error = %v, want PackTrailerMismatchError", err)
+		}
+	})
+}
+
+func TestIngestPendingFinalizesOnce(t *testing.T) {
+	t.Parallel()
+
+	testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
+		packBytes := zeroObjectPackBytes(t, algo)
+
+		pending, err := ingest.Ingest(bytes.NewReader(packBytes), algo, ingest.Options{})
+		if err != nil {
+			t.Fatalf("Ingest: %v", err)
+		}
+
+		_, err = pending.Discard()
+		if err != nil {
+			t.Fatalf("Discard: %v", err)
+		}
+
+		_, err = pending.Discard()
+		if !errors.Is(err, ingest.ErrAlreadyFinalized) {
+			t.Fatalf("second Discard error = %v, want ErrAlreadyFinalized", err)
+		}
+	})
+}
+
 func TestIngestCanFinishWithoutTrailingEOF(t *testing.T) {
 	t.Parallel()
 
@@ -336,7 +488,7 @@
 		receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
 		packRoot := receiver.OpenPackRoot(t)
 
-		result, err := ingest.Ingest(&noExtraReadReader{reader: bytes.NewReader(packBytes)}, packRoot, algo, ingest.Options{
+		result, err := beginAndContinue(&noExtraReadReader{reader: bytes.NewReader(packBytes)}, packRoot, algo, ingest.Options{
 			WriteRev: true,
 		})
 		if err != nil {
--- a/format/pack/ingest/scan.go
+++ b/format/pack/ingest/scan.go
@@ -23,7 +23,7 @@
 
 	utils.WriteProgressf(state.opts.Progress, "validating pack header...\r")
 
-	err = readAndValidatePackHeader(state)
+	err = seedStreamWithPackHeader(state)
 	if err != nil {
 		return err
 	}
@@ -74,4 +74,31 @@
 	state.packHash = packHash
 
 	return state.stream.flush()
+}
+
+// seedStreamWithPackHeader replays the already-validated PACK header that Ingest
+// read: it writes it to packFile, hashes it, and counts it as consumed.
+func seedStreamWithPackHeader(state *ingestState) error {
+	written := 0
+	for written < len(state.packHeaderRaw) {
+		n, err := state.packFile.Write(state.packHeaderRaw[written:])
+		if err != nil {
+			return &DestinationWriteError{Op: fmt.Sprintf("write pack header: %v", err)}
+		}
+
+		if n == 0 { // defensive only: os.File.Write already returns an error on short writes
+			return &DestinationWriteError{Op: "write pack header: short write"}
+		}
+
+		written += n
+	}
+
+	_, err := state.stream.hash.Write(state.packHeaderRaw[:])
+	if err != nil {
+		return err
+	}
+
+	state.stream.consumed = packHeaderSize
+
+	return nil
 }
--- a/format/pack/ingest/state.go
+++ b/format/pack/ingest/state.go
@@ -18,6 +18,8 @@
 	algo        objectid.Algorithm
 	opts        Options
 
+	packHeaderRaw [packHeaderSize]byte // raw PACK header already consumed by Ingest
+
 	packFile    *os.File
 	packTmpName string
 	idxFile     *os.File
@@ -47,6 +49,8 @@
 	destination *os.Root,
 	algo objectid.Algorithm,
 	opts Options,
+	header HeaderInfo,
+	headerRaw [packHeaderSize]byte,
 ) (*ingestState, error) {
 	if algo.Size() == 0 {
 		return nil, objectid.ErrInvalidAlgorithm
@@ -53,12 +57,14 @@
 	}
 
 	return &ingestState{
-		src:            src,
-		destination:    destination,
-		algo:           algo,
-		opts:           opts,
-		offsetToRecord: make(map[uint64]int),
-		objectToRecord: make(map[objectid.ObjectID]int),
-		baseCache:      newDeltaBaseCache(defaultDeltaBaseCacheMaxBytes),
+		src:               src,
+		destination:       destination,
+		algo:              algo,
+		opts:              opts,
+		packHeaderRaw:     headerRaw,
+		objectCountHeader: header.ObjectCount,
+		offsetToRecord:    make(map[uint64]int),
+		objectToRecord:    make(map[objectid.ObjectID]int),
+		baseCache:         newDeltaBaseCache(defaultDeltaBaseCacheMaxBytes),
 	}, nil
 }
--- a/receivepack/int_test.go
+++ b/receivepack/int_test.go
@@ -797,6 +797,61 @@
 	})
 }
 
+func TestReceivePackGitPushRefUpdateWithoutNewObjectsSucceeds(t *testing.T) {
+	t.Parallel()
+
+	testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
+		t.Parallel()
+
+		sender := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo})
+		blobID, treeID := sender.MakeSingleFileTree(t, "base.txt", []byte("base\n"))
+		commitID := sender.CommitTree(t, treeID, "base")
+		sender.UpdateRef(t, "refs/heads/main", commitID)
+
+		receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
+		receiver.HashObject(t, "blob", sender.RunBytes(t, "cat-file", "blob", blobID.String()))
+		receiver.HashObject(t, "tree", sender.RunBytes(t, "cat-file", "tree", treeID.String()))
+		receiver.HashObject(t, "commit", sender.RunBytes(t, "cat-file", "commit", commitID.String()))
+		receiver.UpdateRef(t, "refs/heads/main", commitID) // receiver already holds every object being pushed
+
+		repo := receiver.OpenRepository(t)
+		objectsRoot := receiver.OpenObjectsRoot(t)
+
+		stdout, stderr, clientErr, serverErr := runGitPushFD(
+			t,
+			sender,
+			receivepack.Options{
+				Algorithm:       algo,
+				Refs:            repo.Refs(),
+				ExistingObjects: repo.Objects(),
+				ObjectsRoot:     objectsRoot,
+			},
+			"push", "--porcelain", "fd::3,4/test", "refs/heads/main:refs/heads/topic",
+		)
+		if clientErr != nil {
+			t.Fatalf("git push failed: %v\nstdout=%s\nstderr=%s", clientErr, stdout, stderr)
+		}
+
+		if serverErr != nil {
+			t.Fatalf("ReceivePack: %v", serverErr)
+		}
+
+		resolved, err := receiver.OpenRepository(t).Refs().ResolveFully("refs/heads/topic")
+		if err != nil {
+			t.Fatalf("ResolveFully(topic): %v", err)
+		}
+
+		if resolved.ID != commitID {
+			t.Fatalf("refs/heads/topic = %s, want %s", resolved.ID, commitID)
+		}
+
+		packs := receiver.Run(t, "count-objects", "-v") // an empty pack must be discarded, not promoted
+		if !strings.Contains(packs, "packs: 0") {
+			t.Fatalf("count-objects output shows unexpected promoted pack: %q", packs)
+		}
+	})
+}
+
 func TestReceivePackGitPushAtomicDelete(t *testing.T) {
 	t.Parallel()
 
--- a/receivepack/service/execute.go
+++ b/receivepack/service/execute.go
@@ -77,7 +77,7 @@
 		return result, nil
 	}
 
-	if req.PackExpected {
+	if req.PackExpected && quarantineRoot != nil { // presumably nil when a zero-object pack was discarded
 		// Git migrates quarantined objects into permanent storage immediately
 		// before starting ref updates.
 		utils.WriteProgressf(service.opts.Progress, "promoting quarantine...\r")
--- a/receivepack/service/ingest_quarantine.go
+++ b/receivepack/service/ingest_quarantine.go
@@ -34,6 +34,51 @@
 		return "", nil, false
 	}
 
+	pending, err := ingest.Ingest(
+		req.Pack,
+		service.opts.Algorithm,
+		ingest.Options{
+			FixThin:  true,
+			WriteRev: true,
+			Base:     service.opts.ExistingObjects,
+			Progress: service.opts.Progress,
+		},
+	)
+	if err != nil {
+		utils.WriteProgressf(service.opts.Progress, "unpack failed: %v\n", err)
+
+		result.UnpackError = err.Error()
+		fillCommandErrors(result, commands, err.Error())
+
+		return "", nil, false
+	}
+
+	if pending.Header().ObjectCount == 0 { // nothing to store: verify and drop the pack, skipping quarantine
+		discarded, err := pending.Discard()
+		if err != nil {
+			utils.WriteProgressf(service.opts.Progress, "unpack failed: %v\n", err)
+
+			result.UnpackError = err.Error()
+			fillCommandErrors(result, commands, err.Error())
+
+			return "", nil, false
+		}
+
+		result.Ingest = &ingest.Result{
+			PackHash:    discarded.PackHash,
+			ObjectCount: discarded.ObjectCount,
+		}
+
+		utils.WriteProgressf(
+			service.opts.Progress,
+			"unpacking: done (%d objects, %s).\n",
+			discarded.ObjectCount,
+			discarded.PackHash,
+		)
+
+		return "", nil, true // no quarantine was created, so there is nothing to promote
+	}
+
 	utils.WriteProgressf(service.opts.Progress, "creating quarantine...\r")
 
 	quarantineName, quarantineRoot, err := service.createQuarantineRoot()
@@ -62,17 +107,7 @@
 	utils.WriteProgressf(service.opts.Progress, "creating quarantine: done.\n")
 	utils.WriteProgressf(service.opts.Progress, "unpacking...\r")
 
-	ingested, err := ingest.Ingest(
-		req.Pack,
-		quarantinePackRoot,
-		service.opts.Algorithm,
-		ingest.Options{
-			FixThin:  true,
-			WriteRev: true,
-			Base:     service.opts.ExistingObjects,
-			Progress: service.opts.Progress,
-		},
-	)
+	ingested, err := pending.Continue(quarantinePackRoot)
 
 	_ = quarantinePackRoot.Close()
 
--