ref: a4eeb727468a178a4de0dfc718828f26740484ac
parent: 11560391d1c134e9b56152f2e4bea3ba7d9932f2
author: Runxi Yu <runxiyu@umich.edu>
date: Mon Mar 30 10:28:13 EDT 2026
object,store/packed{,/internal/ingest}: Move from format/packfile/ingest
--- a/format/packfile/ingest/api.go
+++ /dev/null
@@ -1,196 +1,0 @@
-package ingest
-
-import (
- "bufio"
- "bytes"
- "errors"
- "io"
- "os"
-
- "codeberg.org/lindenii/furgit/common/iowrap"
- objectid "codeberg.org/lindenii/furgit/object/id"
- objectstore "codeberg.org/lindenii/furgit/object/store"
-)
-
-// Options controls one pack ingest operation.
-type Options struct {
- // FixThin appends missing local bases for thin packs.
- FixThin bool
- // WriteRev writes a .rev alongside the .pack and .idx.
- WriteRev bool
- // Base supplies existing objects for thin-pack fixup.
- Base objectstore.Reader
- // Progress receives human-readable progress messages.
- //
- // When nil, no progress output is emitted.
- Progress iowrap.WriteFlusher
- // RequireTrailingEOF requires the source to hit EOF after the pack trailer.
- //
- // This is suitable for exact pack-file readers, but should be disabled for
- // full-duplex transport streams like receive-pack where the peer keeps the
- // connection open to read the server response.
- RequireTrailingEOF bool
-}
-
-// Result describes one successful ingest transaction.
-type Result struct {
- // PackName is the destination-relative filename of the written .pack.
- PackName string
- // IdxName is the destination-relative filename of the written .idx.
- IdxName string
- // RevName is the destination-relative filename of the written .rev.
- //
- // RevName is empty when WriteRev is false.
- RevName string
- // PackHash is the final pack hash (same hash embedded in .idx/.rev trailers).
- PackHash objectid.ObjectID
- // ObjectCount is the final object count in the resulting pack.
- //
- // If thin fixup appends objects, this includes appended base objects.
- ObjectCount uint32
- // ThinFixed reports whether thin fixup appended local bases.
- ThinFixed bool
-}
-
-// HeaderInfo describes the parsed PACK header.
-type HeaderInfo struct {
- Version uint32
- ObjectCount uint32
-}
-
-// DiscardResult describes one successful Discard call.
-type DiscardResult struct {
- PackHash objectid.ObjectID
- ObjectCount uint32
-}
-
-// Pending is one started ingest operation awaiting Continue or Discard.
-//
-// Exactly one of Continue or Discard may be called.
-//
-// Labels: MT-Unsafe.
-type Pending struct {
- reader *bufio.Reader
- algo objectid.Algorithm
- opts Options
- header HeaderInfo
- headerRaw [packHeaderSize]byte
-
- finalized bool
-}
-
-// Ingest reads and validates one PACK header, returning one pending operation.
-//
-// Labels: Deps-Borrowed, Life-Parent.
-func Ingest(
- src io.Reader,
- algo objectid.Algorithm,
- opts Options,
-) (*Pending, error) {
- if algo.Size() == 0 {
- return nil, objectid.ErrInvalidAlgorithm
- }
-
- reader := bufio.NewReader(src)
-
- header, headerRaw, err := readAndValidatePackHeader(reader)
- if err != nil {
- return nil, err
- }
-
- return &Pending{
- reader: reader,
- algo: algo,
- opts: opts,
- header: header,
- headerRaw: headerRaw,
- }, nil
-}
-
-// Header returns parsed PACK header info.
-func (pending *Pending) Header() HeaderInfo {
- return pending.header
-}
-
-// Continue ingests the pack stream into destination and writes pack artifacts.
-//
-// Continue invalidates the receiver.
-//
-// Artifacts are published under content-addressed final names derived from the
-// resulting pack hash. If those final names already exist, Continue treats that
-// as success and removes its temporary files.
-func (pending *Pending) Continue(destination *os.Root) (Result, error) {
- pending.finalized = true
-
- if pending.header.ObjectCount == 0 {
- return Result{}, ErrZeroObjectContinue
- }
-
- state, err := newIngestState(
- pending.reader,
- destination,
- pending.algo,
- pending.opts,
- pending.header,
- pending.headerRaw,
- )
- if err != nil {
- return Result{}, err
- }
-
- return ingest(state)
-}
-
-// Discard consumes and verifies one zero-object pack stream without writing
-// files.
-//
-// Discard invalidates the receiver.
-func (pending *Pending) Discard() (DiscardResult, error) {
- pending.finalized = true
-
- if pending.header.ObjectCount != 0 {
- return DiscardResult{}, ErrNonZeroDiscard
- }
-
- hashImpl, err := pending.algo.New()
- if err != nil {
- return DiscardResult{}, err
- }
-
- _, _ = hashImpl.Write(pending.headerRaw[:])
-
- trailer := make([]byte, pending.algo.Size())
-
- _, err = io.ReadFull(pending.reader, trailer)
- if err != nil {
- return DiscardResult{}, &PackTrailerMismatchError{}
- }
-
- computed := hashImpl.Sum(nil)
- if !bytes.Equal(computed, trailer) {
- return DiscardResult{}, &PackTrailerMismatchError{}
- }
-
- if pending.opts.RequireTrailingEOF {
- var probe [1]byte
-
- n, err := pending.reader.Read(probe[:])
- if n > 0 || err == nil {
- return DiscardResult{}, errors.New("packfile/ingest: pack has trailing garbage")
- }
-
- if err != io.EOF {
- return DiscardResult{}, err
- }
- }
-
- packHash, err := objectid.FromBytes(pending.algo, trailer)
- if err != nil {
- return DiscardResult{}, err
- }
-
- return DiscardResult{
- PackHash: packHash,
- ObjectCount: 0,
- }, nil
-}
--- a/format/packfile/ingest/byteslice_reader.go
+++ /dev/null
@@ -1,21 +1,0 @@
-package ingest
-
-import "io"
-
-// byteSliceReader implements io.ByteReader on []byte.
-type byteSliceReader struct {
- data []byte
- pos int
-}
-
-// ReadByte reads one byte from receiver.
-func (reader *byteSliceReader) ReadByte() (byte, error) {
- if reader.pos >= len(reader.data) {
- return 0, io.EOF
- }
-
- b := reader.data[reader.pos]
- reader.pos++
-
- return b, nil
-}
--- a/format/packfile/ingest/cache.go
+++ /dev/null
@@ -1,53 +1,0 @@
-package ingest
-
-import (
- "codeberg.org/lindenii/furgit/internal/lru"
- objecttype "codeberg.org/lindenii/furgit/object/type"
-)
-
-// deltaBaseCacheKey identifies one resolved base by record index.
-type deltaBaseCacheKey struct {
- recordIdx int
-}
-
-// deltaBaseCacheValue stores one resolved base object payload.
-type deltaBaseCacheValue struct {
- realType objecttype.Type
- content []byte
-}
-
-// deltaBaseCache is a bounded LRU for resolved base payloads.
-type deltaBaseCache struct {
- lru *lru.Cache[deltaBaseCacheKey, deltaBaseCacheValue]
-}
-
-// newDeltaBaseCache creates one bounded base cache.
-func newDeltaBaseCache(maxBytes int64) *deltaBaseCache {
- return &deltaBaseCache{
- lru: lru.New(
- maxBytes,
- func(_ deltaBaseCacheKey, value deltaBaseCacheValue) int64 {
- return int64(len(value.content))
- },
- nil,
- ),
- }
-}
-
-// get returns one cache entry for recordIdx.
-func (cache *deltaBaseCache) get(recordIdx int) (objecttype.Type, []byte, bool) {
- value, ok := cache.lru.Get(deltaBaseCacheKey{recordIdx: recordIdx})
- if !ok {
- return objecttype.TypeInvalid, nil, false
- }
-
- return value.realType, value.content, true
-}
-
-// add stores one cache entry for recordIdx.
-func (cache *deltaBaseCache) add(recordIdx int, realType objecttype.Type, content []byte) {
- cache.lru.Add(deltaBaseCacheKey{recordIdx: recordIdx}, deltaBaseCacheValue{
- realType: realType,
- content: content,
- })
-}
--- a/format/packfile/ingest/counting_writer.go
+++ /dev/null
@@ -1,17 +1,0 @@
-package ingest
-
-import "io"
-
-// countingWriter counts bytes written to dst.
-type countingWriter struct {
- dst io.Writer
- n int
-}
-
-// Write writes src to dst and tracks output byte count.
-func (writer *countingWriter) Write(src []byte) (int, error) {
- n, err := writer.dst.Write(src)
- writer.n += n
-
- return n, err
-}
--- a/format/packfile/ingest/crc.go
+++ /dev/null
@@ -1,22 +1,0 @@
-package ingest
-
-import "fmt"
-
-// beginEntryCRC starts inline CRC accumulation for one packed entry.
-func (scanner *streamScanner) beginEntryCRC() {
- scanner.entryCRC = 0
- scanner.inEntryCRC = true
-}
-
-// endEntryCRC finishes inline CRC accumulation for one packed entry.
-func (scanner *streamScanner) endEntryCRC() (uint32, error) {
- if !scanner.inEntryCRC {
- return 0, fmt.Errorf("packfile/ingest: entry CRC not started")
- }
-
- crc := scanner.entryCRC
- scanner.entryCRC = 0
- scanner.inEntryCRC = false
-
- return crc, nil
-}
--- a/format/packfile/ingest/delta_header.go
+++ /dev/null
@@ -1,11 +1,0 @@
-package ingest
-
-import deltaapply "codeberg.org/lindenii/furgit/format/packfile/delta/apply"
-
-// finalizeStreamPackHash consumes trailer bytes and verifies stream integrity.
-// readDeltaHeaderSizes reads source and destination sizes from one delta payload.
-func readDeltaHeaderSizes(payload []byte) (int, int, error) {
- reader := &byteSliceReader{data: payload}
-
- return deltaapply.ReadHeaderSizes(reader)
-}
--- a/format/packfile/ingest/distance.go
+++ /dev/null
@@ -1,30 +1,0 @@
-package ingest
-
-import (
- "fmt"
- "io"
-)
-
-// readOfsDistanceFromStream reads one ofs-delta encoded distance.
-func readOfsDistanceFromStream(reader io.ByteReader) (uint64, int, error) {
- first, err := reader.ReadByte()
- if err != nil {- return 0, 0, fmt.Errorf("read ofs distance first byte: %w", err)- }
-
- dist := uint64(first & 0x7f)
- consumed := 1
-
- b := first
- for b&0x80 != 0 {
- b, err = reader.ReadByte()
- if err != nil {- return 0, 0, fmt.Errorf("read ofs distance continuation: %w", err)- }
-
- consumed++
- dist = ((dist + 1) << 7) + uint64(b&0x7f)
- }
-
- return dist, consumed, nil
-}
--- a/format/packfile/ingest/doc.go
+++ /dev/null
@@ -1,3 +1,0 @@
-// Package ingest implements streaming ingestion of one Git pack stream into a
-// destination root, producing .pack/.idx and optionally .rev.
-package ingest
--- a/format/packfile/ingest/drain.go
+++ /dev/null
@@ -1,67 +1,0 @@
-package ingest
-
-import (
- "fmt"
- "io"
-
- "codeberg.org/lindenii/furgit/internal/compress/zlib"
- objectheader "codeberg.org/lindenii/furgit/object/header"
- objectid "codeberg.org/lindenii/furgit/object/id"
- objecttype "codeberg.org/lindenii/furgit/object/type"
-)
-
-// drainEntryPayload inflates one entry payload from stream and returns
-// (inflatedLength, oidForBaseEntry).
-func drainEntryPayload(state *ingestState, record objectRecord) (int64, objectid.ObjectID, error) {
- var zero objectid.ObjectID
-
- reader, err := zlib.NewReader(state.stream)
- if err != nil {- return 0, zero, &MalformedPackEntryError{Offset: record.offset, Reason: fmt.Sprintf("open zlib stream: %v", err)}- }
-
- defer func() { _ = reader.Close() }()
-
- var total int64
-
- if record.packedType.IsBaseObject() {
- header, ok := objectheader.Encode(record.packedType, record.declaredSize)
- if !ok {
- return 0, zero, &MalformedPackEntryError{Offset: record.offset, Reason: "encode object header"}
- }
-
- hashImpl, err := state.algo.New()
- if err != nil {
- return 0, zero, err
- }
-
- _, _ = hashImpl.Write(header)
-
- n, err := io.Copy(hashImpl, reader)
- if err != nil {- return 0, zero, &MalformedPackEntryError{Offset: record.offset, Reason: fmt.Sprintf("inflate base object: %v", err)}- }
-
- total = n
-
- oid, err := objectid.FromBytes(state.algo, hashImpl.Sum(nil))
- if err != nil {
- return 0, zero, err
- }
-
- return total, oid, nil
- }
-
- if record.packedType == objecttype.TypeOfsDelta || record.packedType == objecttype.TypeRefDelta {
- n, err := io.Copy(io.Discard, reader)
- if err != nil {- return 0, zero, &MalformedPackEntryError{Offset: record.offset, Reason: fmt.Sprintf("inflate delta payload: %v", err)}- }
-
- total = n
-
- return total, zero, nil
- }
-
- return 0, zero, &MalformedPackEntryError{Offset: record.offset, Reason: "unsupported payload type"}-}
--- a/format/packfile/ingest/entry.go
+++ /dev/null
@@ -1,91 +1,0 @@
-package ingest
-
-import (
- "fmt"
-
- objecttype "codeberg.org/lindenii/furgit/object/type"
-)
-
-// scanOneEntry scans one pack entry from stream and appends one record.
-func scanOneEntry(state *ingestState, startOffset uint64) (uint64, error) {
- state.stream.beginEntryCRC()
-
- record, err := parseEntryPrefix(state, startOffset)
- if err != nil {
- return 0, err
- }
-
- payloadStartConsumed := state.stream.consumed
-
- contentLen, oid, err := drainEntryPayload(state, record)
- if err != nil {
- return 0, err
- }
-
- consumedInput := state.stream.consumed - payloadStartConsumed
-
- if contentLen != record.declaredSize {
- return 0, &MalformedPackEntryError{
- Offset: startOffset,
- Reason: fmt.Sprintf("inflated size mismatch got %d want %d", contentLen, record.declaredSize),
- }
- }
-
- endOffset := startOffset + uint64(record.headerLen) + consumedInput
- if endOffset > state.stream.consumed {
- return 0, &MalformedPackEntryError{
- Offset: startOffset,
- Reason: fmt.Sprintf("entry end offset overflow got %d > stream %d", endOffset, state.stream.consumed),
- }
- }
-
- record.packedLen = endOffset - startOffset
-
- record.dataOffset = startOffset + uint64(record.headerLen)
- if record.packedLen < uint64(record.headerLen) {
- return 0, &MalformedPackEntryError{Offset: startOffset, Reason: "negative payload span"}
- }
-
- crc, err := state.stream.endEntryCRC()
- if err != nil {
- return 0, err
- }
-
- record.crc32 = crc
-
- if record.packedType.IsBaseObject() {
- record.objectID = oid
- record.realType = record.packedType
- record.resolved = true
- }
-
- recordIdx := len(state.records)
- state.records = append(state.records, record)
-
- state.offsetToRecord[record.offset] = recordIdx
- if record.resolved {
- state.objectToRecord[record.objectID] = recordIdx
- }
-
- switch record.packedType {
- case objecttype.TypeOfsDelta:
- state.ofsDeltas = append(state.ofsDeltas, ofsDeltaRef{
- baseOffset: record.baseOffset,
- recordIdx: recordIdx,
- })
- case objecttype.TypeRefDelta:
- state.refDeltas = append(state.refDeltas, refDeltaRef{
- baseObject: record.baseObject,
- recordIdx: recordIdx,
- })
- case objecttype.TypeInvalid,
- objecttype.TypeCommit,
- objecttype.TypeTree,
- objecttype.TypeBlob,
- objecttype.TypeTag,
- objecttype.TypeFuture:
- default:
- }
-
- return endOffset, nil
-}
--- a/format/packfile/ingest/entry_header.go
+++ /dev/null
@@ -1,33 +1,0 @@
-package ingest
-
-import (
- "codeberg.org/lindenii/furgit/internal/intconv"
- objecttype "codeberg.org/lindenii/furgit/object/type"
-)
-
-// encodePackEntryHeader encodes one non-delta packed entry header.
-func encodePackEntryHeader(ty objecttype.Type, size int64) []byte {
- var out [16]byte
-
- n := 0
-
- s, err := intconv.Int64ToUint64(size)
- if err != nil {
- panic(err)
- }
-
- c := (uint8(ty) << 4) | byte(s&0x0f)
-
- s >>= 4
- for s != 0 {
- out[n] = c | 0x80
- n++
- c = byte(s & 0x7f)
- s >>= 7
- }
-
- out[n] = c
- n++
-
- return append([]byte(nil), out[:n]...)
-}
--- a/format/packfile/ingest/entry_prefix.go
+++ /dev/null
@@ -1,95 +1,0 @@
-package ingest
-
-import (
- "fmt"
-
- "codeberg.org/lindenii/furgit/internal/intconv"
- objectid "codeberg.org/lindenii/furgit/object/id"
- objecttype "codeberg.org/lindenii/furgit/object/type"
-)
-
-// parseEntryPrefix parses one entry prefix from stream.
-func parseEntryPrefix(state *ingestState, startOffset uint64) (objectRecord, error) {
- var record objectRecord
-
- record.offset = startOffset
-
- first, err := state.stream.ReadByte()
- if err != nil {- return record, &MalformedPackEntryError{Offset: startOffset, Reason: fmt.Sprintf("read first header byte: %v", err)}- }
-
- record.packedType = objecttype.Type((first >> 4) & 0x07)
- size := int64(first & 0x0f)
- headerLen := uint32(1)
- shift := uint(4)
- b := first
-
- for b&0x80 != 0 {
- b, err = state.stream.ReadByte()
- if err != nil {- return record, &MalformedPackEntryError{Offset: startOffset, Reason: fmt.Sprintf("read size continuation: %v", err)}- }
-
- headerLen++
- size |= int64(b&0x7f) << shift
- shift += 7
- }
-
- if size < 0 {
- return record, &MalformedPackEntryError{Offset: startOffset, Reason: "negative declared size"}
- }
-
- record.declaredSize = size
-
- switch record.packedType {
- case objecttype.TypeCommit, objecttype.TypeTree, objecttype.TypeBlob, objecttype.TypeTag:
- case objecttype.TypeRefDelta:
- baseRaw := make([]byte, state.algo.Size())
-
- err := state.stream.readFull(baseRaw)
- if err != nil {- return record, &MalformedPackEntryError{Offset: startOffset, Reason: fmt.Sprintf("read ref base: %v", err)}- }
-
- baseID, err := objectid.FromBytes(state.algo, baseRaw)
- if err != nil {- return record, &MalformedPackEntryError{Offset: startOffset, Reason: fmt.Sprintf("parse ref base: %v", err)}- }
-
- record.baseObject = baseID
-
- baseRawLen, err := intconv.IntToUint32(len(baseRaw))
- if err != nil {
- return record, err
- }
-
- headerLen += baseRawLen
- case objecttype.TypeOfsDelta:
- dist, consumed, err := readOfsDistanceFromStream(state.stream)
- if err != nil {
- return record, &MalformedPackEntryError{Offset: startOffset, Reason: err.Error()}
- }
-
- if startOffset <= dist {
- return record, &MalformedPackEntryError{Offset: startOffset, Reason: "ofs base offset out of bounds"}
- }
-
- record.baseOffset = startOffset - dist
-
- consumedUint32, err := intconv.IntToUint32(consumed)
- if err != nil {
- return record, err
- }
-
- headerLen += consumedUint32
- case objecttype.TypeInvalid, objecttype.TypeFuture:
- return record, &MalformedPackEntryError{Offset: startOffset, Reason: fmt.Sprintf("unsupported object type %d", record.packedType)}- default:
- return record, &MalformedPackEntryError{Offset: startOffset, Reason: fmt.Sprintf("unsupported object type %d", record.packedType)}- }
-
- record.headerLen = headerLen
-
- return record, nil
-}
--- a/format/packfile/ingest/errors.go
+++ /dev/null
@@ -1,75 +1,0 @@
-package ingest
-
-import (
- "errors"
- "fmt"
-)
-
-// InvalidPackHeaderError reports an invalid or unsupported pack header.
-type InvalidPackHeaderError struct {
- Reason string
-}
-
-// Error implements error.
-func (err *InvalidPackHeaderError) Error() string {
- return "packfile/ingest: invalid pack header: " + err.Reason
-}
-
-// PackTrailerMismatchError reports a mismatch between computed and trailer pack hash.
-type PackTrailerMismatchError struct{}
-
-// Error implements error.
-func (err *PackTrailerMismatchError) Error() string {
- return "packfile/ingest: pack trailer hash mismatch"
-}
-
-// ThinPackUnresolvedError reports unresolved REF deltas when FixThin is disabled
-// or when required bases cannot be found in Base.
-type ThinPackUnresolvedError struct {
- Count int
-}
-
-// Error implements error.
-func (err *ThinPackUnresolvedError) Error() string {
- return fmt.Sprintf("packfile/ingest: unresolved thin deltas: %d", err.Count)
-}
-
-// MalformedPackEntryError reports malformed entry encoding at one pack offset.
-type MalformedPackEntryError struct {
- Offset uint64
- Reason string
-}
-
-// Error implements error.
-func (err *MalformedPackEntryError) Error() string {
- return fmt.Sprintf("packfile/ingest: malformed pack entry at offset %d: %s", err.Offset, err.Reason)
-}
-
-// DeltaCycleError reports a detected cycle in delta dependency resolution.
-type DeltaCycleError struct {
- Offset uint64
-}
-
-// Error implements error.
-func (err *DeltaCycleError) Error() string {
- return fmt.Sprintf("packfile/ingest: delta cycle detected at offset %d", err.Offset)
-}
-
-// DestinationWriteError reports destination I/O failures.
-type DestinationWriteError struct {
- Op string
-}
-
-// Error implements error.
-func (err *DestinationWriteError) Error() string {
- return "packfile/ingest: destination write failure: " + err.Op
-}
-
-var errExternalThinBase = errors.New("packfile/ingest: external thin base required")
-
-var (
- // ErrZeroObjectContinue indicates Continue was called for a zero-object pack.
- ErrZeroObjectContinue = errors.New("packfile/ingest: cannot continue zero-object pack")
- // ErrNonZeroDiscard indicates Discard was called for a non-zero-object pack.
- ErrNonZeroDiscard = errors.New("packfile/ingest: cannot discard non-zero pack")
-)
--- a/format/packfile/ingest/file_section_writer.go
+++ /dev/null
@@ -1,22 +1,0 @@
-package ingest
-
-import "os"
-
-// fileSectionWriter writes sequentially to file via WriteAt at one base offset.
-type fileSectionWriter struct {
- file *os.File
- off int64
- pos int64
-}
-
-// Write writes src at current section position.
-func (writer *fileSectionWriter) Write(src []byte) (int, error) {
- if len(src) == 0 {
- return 0, nil
- }
-
- n, err := writer.file.WriteAt(src, writer.off+writer.pos)
- writer.pos += int64(n)
-
- return n, err
-}
--- a/format/packfile/ingest/fill.go
+++ /dev/null
@@ -1,44 +1,0 @@
-package ingest
-
-import (
- "errors"
- "fmt"
- "io"
-)
-
-// fill ensures at least min unread bytes are available in receiver's buffer.
-func (scanner *streamScanner) fill(minLen int) error {
- if minLen <= 0 {
- return nil
- }
-
- if minLen > len(scanner.buf) {
- return fmt.Errorf("packfile/ingest: fill(%d) exceeds scanner buffer", minLen)
- }
-
- for scanner.n-scanner.off < minLen {
- err := scanner.flushConsumedPrefix()
- if err != nil {
- return err
- }
-
- readN, err := scanner.src.Read(scanner.buf[scanner.n:])
- if readN > 0 {
- scanner.n += readN
- }
-
- if err != nil {
- if errors.Is(err, io.EOF) && scanner.n-scanner.off >= minLen {
- return nil
- }
-
- return err
- }
-
- if readN == 0 {
- return io.ErrNoProgress
- }
- }
-
- return nil
-}
--- a/format/packfile/ingest/finalize.go
+++ /dev/null
@@ -1,94 +1,0 @@
-package ingest
-
-import (
- "errors"
- "fmt"
- "io/fs"
- "strings"
-
- "codeberg.org/lindenii/furgit/internal/intconv"
-)
-
-// finalizeArtifacts links temporary files to final names and returns Result.
-func finalizeArtifacts(state *ingestState) (Result, error) {
- base := "pack-" + state.packHash.String()
- packFinal := base + ".pack"
- idxFinal := base + ".idx"
-
- revFinal := ""
- if state.opts.WriteRev {
- revFinal = base + ".rev"
- }
-
- err := linkTempToFinal(state, state.packTmpName, packFinal)
- if err != nil {
- return Result{}, err
- }
-
- err = linkTempToFinal(state, state.idxTmpName, idxFinal)
- if err != nil {
- return Result{}, err
- }
-
- if state.opts.WriteRev {
- err := linkTempToFinal(state, state.revTmpName, revFinal)
- if err != nil {
- return Result{}, err
- }
- }
-
- objectCount, err := intconv.IntToUint32(len(state.records))
- if err != nil {
- return Result{}, err
- }
-
- return Result{
- PackName: packFinal,
- IdxName: idxFinal,
- RevName: revFinal,
- PackHash: state.packHash,
- ObjectCount: objectCount,
- ThinFixed: state.thinFixed,
- }, nil
-}
-
-// rollbackTemporaryArtifacts removes temporary files after failure.
-func rollbackTemporaryArtifacts(state *ingestState) {
- if state.packTmpName != "" {
- _ = state.destination.Remove(state.packTmpName)
- }
-
- if state.idxTmpName != "" {- _ = state.destination.Remove(state.idxTmpName)
- }
-
- if state.revTmpName != "" {- _ = state.destination.Remove(state.revTmpName)
- }
-}
-
-// linkTempToFinal hard-links tmp to final, tolerating existing final paths.
-func linkTempToFinal(state *ingestState, tmp, final string) error {
- if tmp == "" || final == "" {
- return fmt.Errorf("packfile/ingest: invalid finalize names tmp=%q final=%q", tmp, final)
- }
-
- if strings.Contains(final, "/") {- return fmt.Errorf("packfile/ingest: final name must be leaf: %q", final)- }
-
- err := state.destination.Link(tmp, final)
- if err == nil {
- _ = state.destination.Remove(tmp)
-
- return nil
- }
-
- if errors.Is(err, fs.ErrExist) {
- _ = state.destination.Remove(tmp)
-
- return nil
- }
-
- return err
-}
--- a/format/packfile/ingest/flush.go
+++ /dev/null
@@ -1,37 +1,0 @@
-package ingest
-
-import "fmt"
-
-// flush writes all consumed-but-unflushed bytes to destination pack file.
-func (scanner *streamScanner) flush() error {
- return scanner.flushConsumedPrefix()
-}
-
-// flushConsumedPrefix writes scanner.buf[:scanner.off] and compacts unread
-// bytes to the start of buffer.
-func (scanner *streamScanner) flushConsumedPrefix() error {
- if scanner.off == 0 {
- return nil
- }
-
- written := 0
- for written < scanner.off {
- n, err := scanner.dstFile.Write(scanner.buf[written:scanner.off])
- if err != nil {
- return &DestinationWriteError{Op: fmt.Sprintf("write pack: %v", err)}
- }
-
- if n == 0 {- return &DestinationWriteError{Op: "write pack: short write"}- }
-
- written += n
- }
-
- unread := scanner.n - scanner.off
- copy(scanner.buf[:unread], scanner.buf[scanner.off:scanner.n])
- scanner.off = 0
- scanner.n = unread
-
- return nil
-}
--- a/format/packfile/ingest/hash.go
+++ /dev/null
@@ -1,27 +1,0 @@
-package ingest
-
-import (
- "fmt"
-
- objectheader "codeberg.org/lindenii/furgit/object/header"
- objectid "codeberg.org/lindenii/furgit/object/id"
- objecttype "codeberg.org/lindenii/furgit/object/type"
-)
-
-// hashCanonicalObject hashes canonical object bytes (header+content).
-func hashCanonicalObject(algo objectid.Algorithm, ty objecttype.Type, content []byte) (objectid.ObjectID, error) {
- header, ok := objectheader.Encode(ty, int64(len(content)))
- if !ok {
- return objectid.ObjectID{}, fmt.Errorf("packfile/ingest: encode object header for type %d", ty)
- }
-
- hashImpl, err := algo.New()
- if err != nil {
- return objectid.ObjectID{}, err
- }
-
- _, _ = hashImpl.Write(header)
- _, _ = hashImpl.Write(content)
-
- return objectid.FromBytes(algo, hashImpl.Sum(nil))
-}
--- a/format/packfile/ingest/header.go
+++ /dev/null
@@ -1,49 +1,0 @@
-package ingest
-
-import (
- "encoding/binary"
- "fmt"
- "io"
-
- "codeberg.org/lindenii/furgit/format/packfile"
-)
-
-const packHeaderSize = 12
-
-// readAndValidatePackHeader reads one PACK header from src and validates it.
-func readAndValidatePackHeader(src io.Reader) (HeaderInfo, [packHeaderSize]byte, error) {
- var hdr [packHeaderSize]byte
-
- _, err := io.ReadFull(src, hdr[:])
- if err != nil {
- return HeaderInfo{}, [packHeaderSize]byte{}, &InvalidPackHeaderError{
- Reason: fmt.Sprintf("read header: %v", err),
- }
- }
-
- header, err := parseAndValidatePackHeader(hdr)
- if err != nil {
- return HeaderInfo{}, [packHeaderSize]byte{}, err
- }
-
- return header, hdr, nil
-}
-
-// parseAndValidatePackHeader validates one already-read PACK header.
-func parseAndValidatePackHeader(hdr [packHeaderSize]byte) (HeaderInfo, error) {
- if binary.BigEndian.Uint32(hdr[:4]) != packfile.Signature {
- return HeaderInfo{}, &InvalidPackHeaderError{Reason: "signature mismatch"}
- }
-
- version := binary.BigEndian.Uint32(hdr[4:8])
- if !packfile.SupportedVersion(version) {
- return HeaderInfo{}, &InvalidPackHeaderError{
- Reason: fmt.Sprintf("unsupported version %d", version),
- }
- }
-
- return HeaderInfo{
- Version: version,
- ObjectCount: binary.BigEndian.Uint32(hdr[8:12]),
- }, nil
-}
--- a/format/packfile/ingest/idx_write.go
+++ /dev/null
@@ -1,262 +1,0 @@
-package ingest
-
-import (
- "bytes"
- "encoding/binary"
- "fmt"
- "hash"
- "io"
- "slices"
-
- "codeberg.org/lindenii/furgit/internal/intconv"
- "codeberg.org/lindenii/furgit/internal/progress"
-)
-
-const (
- idxMagicV2 = 0xff744f63
- idxVersionV2 = 2
-)
-
-// writeIdx writes idx v2 for resolved records.
-func writeIdx(state *ingestState) error {
- order := buildIdxOrder(state)
-
- hashImpl, err := state.algo.New()
- if err != nil {
- return err
- }
-
- write := func(src []byte) error {
- _, writeErr := state.idxFile.Write(src)
- if writeErr != nil {
- return writeErr
- }
-
- _, writeErr = hashImpl.Write(src)
- if writeErr != nil {
- return writeErr
- }
-
- return nil
- }
-
- var (
- scratch [8]byte
- fanout [256]uint32
- )
-
- writeProgressf(state, "writing index fanout...\r")
-
- for _, recordIdx := range order {
- idRaw := state.records[recordIdx].objectID.Bytes()
- fanout[idRaw[0]]++
- }
-
- binary.BigEndian.PutUint32(scratch[:4], idxMagicV2)
- binary.BigEndian.PutUint32(scratch[4:8], idxVersionV2)
-
- err = write(scratch[:8])
- if err != nil {
- return err
- }
-
- var cumulative uint32
- for i := range fanout {
- cumulative += fanout[i]
- binary.BigEndian.PutUint32(scratch[:4], cumulative)
-
- err := write(scratch[:4])
- if err != nil {
- return err
- }
- }
-
- writeProgressf(state, "writing index fanout: done.\n")
-
- largeOffsetCount := 0
-
- for idx := range state.records {
- if state.records[idx].offset >= 0x80000000 {
- largeOffsetCount++
- }
- }
-
- oidMeter := progress.New(progress.Options{
- Writer: state.opts.Progress,
- Title: "writing index object ids",
- Total: uint64(len(order)),
- })
-
- var oidDone uint64
-
- for _, recordIdx := range order {
- idRaw := state.records[recordIdx].objectID.Bytes()
-
- err := write(idRaw)
- if err != nil {
- return err
- }
-
- oidDone++
- oidMeter.Set(oidDone, 0)
- }
-
- if oidDone > 0 {
- oidMeter.Stop("done")
- }
-
- crcMeter := progress.New(progress.Options{
- Writer: state.opts.Progress,
- Title: "writing index crc32",
- Total: uint64(len(order)),
- })
-
- var crcDone uint64
-
- for _, recordIdx := range order {
- binary.BigEndian.PutUint32(scratch[:4], state.records[recordIdx].crc32)
-
- err := write(scratch[:4])
- if err != nil {
- return err
- }
-
- crcDone++
- crcMeter.Set(crcDone, 0)
- }
-
- if crcDone > 0 {
- crcMeter.Stop("done")
- }
-
- largeOffsets := make([]uint64, 0)
- offsetMeter := progress.New(progress.Options{
- Writer: state.opts.Progress,
- Title: "writing index offsets",
- Total: uint64(len(order)),
- })
-
- var offsetDone uint64
-
- for _, recordIdx := range order {
- offset := state.records[recordIdx].offset
- if offset >= 0x80000000 {
- largeOffsetIdx, err := intconv.IntToUint32(len(largeOffsets))
- if err != nil {
- return err
- }
-
- word := 0x80000000 | largeOffsetIdx
-
- largeOffsets = append(largeOffsets, offset)
-
- binary.BigEndian.PutUint32(scratch[:4], word)
- } else {
- binary.BigEndian.PutUint32(scratch[:4], uint32(offset))
- }
-
- err := write(scratch[:4])
- if err != nil {
- return err
- }
-
- offsetDone++
- offsetMeter.Set(offsetDone, 0)
- }
-
- if offsetDone > 0 {
- offsetMeter.Stop("done")
- }
-
- total, err := intconv.IntToUint64(largeOffsetCount)
- if err != nil {
- return err
- }
-
- largeOffsetMeter := progress.New(progress.Options{
- Writer: state.opts.Progress,
- Title: "writing index large offsets",
- Total: total,
- })
-
- var largeOffsetDone uint64
-
- for _, off := range largeOffsets {
- binary.BigEndian.PutUint64(scratch[:8], off)
-
- err := write(scratch[:8])
- if err != nil {
- return err
- }
-
- largeOffsetDone++
- largeOffsetMeter.Set(largeOffsetDone, 0)
- }
-
- if largeOffsetDone > 0 {
- largeOffsetMeter.Stop("done")
- }
-
- writeProgressf(state, "writing index trailer...\r")
-
- err = write(state.packHash.Bytes())
- if err != nil {
- return err
- }
-
- idxHash := hashImpl.Sum(nil)
-
- _, err = state.idxFile.Write(idxHash)
- if err != nil {
- return err
- }
-
- err = state.idxFile.Sync()
- if err != nil {
- return err
- }
-
- writeProgressf(state, "writing index trailer: done.\n")
-
- return nil
-}
-
-// buildIdxOrder returns record indexes sorted by ObjectID.
-func buildIdxOrder(state *ingestState) []int {
- out := make([]int, 0, len(state.records))
- for idx := range state.records {
- out = append(out, idx)
- }
-
- slices.SortFunc(out, func(a, b int) int {
- return bytes.Compare(state.records[a].objectID.Bytes(), state.records[b].objectID.Bytes())
- })
-
- return out
-}
-
-// verifyResolvedRecords checks that all records are fully resolved before index writing.
-func verifyResolvedRecords(state *ingestState) error {
- for idx, record := range state.records {
- if !record.resolved {
- return fmt.Errorf("packfile/ingest: unresolved record %d at offset %d", idx, record.offset)
- }
- }
-
- return nil
-}
-
-// writeAndHash writes src to dst and updates hash.
-func writeAndHash(dst io.Writer, hashImpl hash.Hash, src []byte) error {
- _, err := dst.Write(src)
- if err != nil {
- return err
- }
-
- _, err = hashImpl.Write(src)
- if err != nil {
- return err
- }
-
- return nil
-}
--- a/format/packfile/ingest/ingest.go
+++ /dev/null
@@ -1,68 +1,0 @@
-package ingest
-
-import (
- "fmt"
-)
-
-// ingest initializes transaction state and executes the ingest pipeline.
-func ingest(state *ingestState) (out Result, err error) {
- err = openTemporaryArtifacts(state)
- if err != nil {
- return Result{}, err
- }
-
- defer func() {
- _ = closeTemporaryArtifacts(state)
- if err != nil {
- rollbackTemporaryArtifacts(state)
- }
- }()
-
- err = streamPackAndScan(state)
- if err != nil {
- return Result{}, err
- }
-
- err = resolveAll(state)
- if err != nil {
- return Result{}, err
- }
-
- err = maybeFixThin(state)
- if err != nil {
- return Result{}, err
- }
-
- if state.thinFixed {
- err = resolveAll(state)
- if err != nil {
- return Result{}, err
- }
- }
-
- if len(state.unresolvedRefDeltas) > 0 {
- return Result{}, &ThinPackUnresolvedError{Count: len(state.unresolvedRefDeltas)}
- }
-
- err = verifyResolvedRecords(state)
- if err != nil {
- return Result{}, err
- }
-
- err = state.packFile.Sync()
- if err != nil {- return Result{}, &DestinationWriteError{Op: fmt.Sprintf("sync pack: %v", err)}- }
-
- err = writeIdx(state)
- if err != nil {
- return Result{}, err
- }
-
- err = writeRev(state)
- if err != nil {
- return Result{}, err
- }
-
- return finalizeArtifacts(state)
-}
--- a/format/packfile/ingest/ingest_test.go
+++ /dev/null
@@ -1,434 +1,0 @@
-package ingest_test
-
-import (
- "bytes"
- "encoding/binary"
- "errors"
- "io"
- "io/fs"
- "os"
- "path/filepath"
- "strings"
- "testing"
-
- "codeberg.org/lindenii/furgit/format/packfile/ingest"
- "codeberg.org/lindenii/furgit/internal/testgit"
- objectid "codeberg.org/lindenii/furgit/object/id"
-)
-
-type noExtraReadReader struct {
- reader *bytes.Reader
-}
-
-func (r *noExtraReadReader) Read(p []byte) (int, error) {
- if r.reader.Len() == 0 {
- return 0, errors.New("unexpected extra read after pack trailer")
- }
-
- return r.reader.Read(p)
-}
-
-func beginAndContinue(
- src io.Reader,
- packRoot *os.Root,
- algo objectid.Algorithm,
- opts ingest.Options,
-) (ingest.Result, error) {
- pending, err := ingest.Ingest(src, algo, opts)
- if err != nil {
- return ingest.Result{}, err
- }
-
- return pending.Continue(packRoot)
-}
-
-// fixturePath returns one fixture file path for the selected algorithm.
-func fixturePath(t *testing.T, algo objectid.Algorithm, name string) string {
- t.Helper()
-
- dir := algo.String()
- if dir == "" {- t.Fatalf("unsupported fixture algorithm: %v", algo)- }
-
- return filepath.Join("testdata", "fixtures", dir, name)-}
-
-// fixtureBytes reads one fixture file fully.
-func fixtureBytes(t *testing.T, algo objectid.Algorithm, name string) []byte {
- t.Helper()
-
- path := fixturePath(t, algo, name)
- dir := filepath.Dir(path)
- base := filepath.Base(path)
-
- root, err := os.OpenRoot(dir)
- if err != nil {- t.Fatalf("open fixture root %q: %v", dir, err)- }
-
- defer func() {
- err := root.Close()
- if err != nil {
- t.Fatalf("close fixture root %q: %v", dir, err)
- }
- }()
-
- data, err := root.ReadFile(base)
- if err != nil {- t.Fatalf("read fixture %q: %v", base, err)- }
-
- return data
-}
-
-// fixtureMetadata parses key=value metadata for one algorithm fixture set.
-func fixtureMetadata(t *testing.T, algo objectid.Algorithm) map[string]string {
- t.Helper()
-
- data := fixtureBytes(t, algo, "METADATA.txt")
-
- out := make(map[string]string)
- for line := range strings.SplitSeq(strings.TrimSpace(string(data)), "\n") {
- line = strings.TrimSpace(line)
- if line == "" {
- continue
- }
-
- key, value, ok := strings.Cut(line, "=")
- if !ok {- t.Fatalf("invalid fixture metadata line %q", line)- }
-
- out[strings.TrimSpace(key)] = strings.TrimSpace(value)
- }
-
- return out
-}
-
-// fixtureOID returns one fixture metadata object ID value.
-func fixtureOID(t *testing.T, algo objectid.Algorithm, key string) objectid.ObjectID {
- t.Helper()
-
- meta := fixtureMetadata(t, algo)
-
- hex, ok := meta[key]
- if !ok {- t.Fatalf("missing fixture metadata key %q", key)- }
-
- id, err := objectid.ParseHex(algo, hex)
- if err != nil {- t.Fatalf("parse fixture metadata oid %q: %v", hex, err)- }
-
- return id
-}
-
-// verifyReindexOracle regenerates idx/rev with upstream git index-pack and
-// compares bytes with files produced by ingest.
-func verifyReindexOracle(t *testing.T, repo *testgit.TestRepo, packName, idxName, revName string) {
- t.Helper()
-
- oracleDir := t.TempDir()
- oracleIdxPath := filepath.Join(oracleDir, "oracle.idx")
- _ = repo.Run(t, "index-pack", "--rev-index", "-o", oracleIdxPath, filepath.Join("objects", "pack", packName))- oracleRevPath := strings.TrimSuffix(oracleIdxPath, ".idx") + ".rev"
-
- packRoot := repo.OpenPackRoot(t)
-
- gotIdx, err := packRoot.ReadFile(idxName)
- if err != nil {- t.Fatalf("read idx: %v", err)- }
-
- oracleRoot, err := os.OpenRoot(oracleDir)
- if err != nil {- t.Fatalf("open oracle root: %v", err)- }
-
- defer func() {
- err := oracleRoot.Close()
- if err != nil {
- t.Fatalf("close oracle root: %v", err)
- }
- }()
-
- wantIdx, err := oracleRoot.ReadFile(filepath.Base(oracleIdxPath))
- if err != nil {- t.Fatalf("read oracle idx: %v", err)- }
-
- if !bytes.Equal(gotIdx, wantIdx) {
- t.Fatal("idx bytes differ from git index-pack output")
- }
-
- gotRev, err := packRoot.ReadFile(revName)
- if err != nil {- t.Fatalf("read rev: %v", err)- }
-
- wantRev, err := oracleRoot.ReadFile(filepath.Base(oracleRevPath))
- if err != nil {- t.Fatalf("read oracle rev: %v", err)- }
-
- if !bytes.Equal(gotRev, wantRev) {
- t.Fatal("rev bytes differ from git index-pack output")
- }
-}
-
-func TestIngestNonThinPackWritesPackIdxRev(t *testing.T) {
- t.Parallel()
-
- testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
- head := fixtureOID(t, algo, "head")
- packBytes := fixtureBytes(t, algo, "nonthin.pack")
-
- receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
-
- packRoot := receiver.OpenPackRoot(t)
-
- result, err := beginAndContinue(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{
- WriteRev: true,
- RequireTrailingEOF: true,
- })
- if err != nil {- t.Fatalf("Ingest: %v", err)- }
-
- if result.ThinFixed {
- t.Fatalf("ThinFixed = true, want false")
- }
-
- if result.RevName == "" {- t.Fatal("RevName is empty")- }
-
- _, err = packRoot.Stat(result.PackName)
- if err != nil {- t.Fatalf("stat pack: %v", err)- }
-
- _, err = packRoot.Stat(result.IdxName)
- if err != nil {- t.Fatalf("stat idx: %v", err)- }
-
- _, err = packRoot.Stat(result.RevName)
- if err != nil {- t.Fatalf("stat rev: %v", err)- }
-
- _ = receiver.Run(t, "verify-pack", "-v", filepath.Join("objects", "pack", result.IdxName))- verifyReindexOracle(t, receiver, result.PackName, result.IdxName, result.RevName)
-
- receiver.UpdateRef(t, "refs/heads/main", head)
- _ = receiver.Run(t, "fsck", "--full", "--strict", "--no-progress", "--no-dangling")
- })
-}
-
-func TestIngestThinPackWithoutFixReturnsUnresolved(t *testing.T) {
- t.Parallel()
-
- testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
- thinPack := fixtureBytes(t, algo, "thin.pack")
-
- receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
- packRoot := receiver.OpenPackRoot(t)
-
- _, err := beginAndContinue(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{
- WriteRev: true,
- RequireTrailingEOF: true,
- })
- if err == nil {- t.Fatal("Ingest error = nil, want error")- }
-
- if _, ok := errors.AsType[*ingest.ThinPackUnresolvedError](err); !ok {
- t.Fatalf("Ingest error type = %T (%v), want *ThinPackUnresolvedError", err, err)
- }
-
- entries, err := fs.ReadDir(packRoot.FS(), ".")
- if err != nil {- t.Fatalf("ReadDir(pack): %v", err)- }
-
- for _, entry := range entries {
- if strings.HasSuffix(entry.Name(), ".pack") {
- t.Fatalf("found finalized pack file after failure: %v", entry.Name())
- }
- }
- })
-}
-
-func TestIngestThinPackWithFixThin(t *testing.T) {
- t.Parallel()
-
- testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
- head := fixtureOID(t, algo, "head")
- basePack := fixtureBytes(t, algo, "base.pack")
- thinPack := fixtureBytes(t, algo, "thin.pack")
- receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
-
- packRoot := receiver.OpenPackRoot(t)
-
- _, err := beginAndContinue(bytes.NewReader(basePack), packRoot, algo, ingest.Options{
- RequireTrailingEOF: true,
- })
- if err != nil {- t.Fatalf("ingest base pack: %v", err)- }
-
- receiverRepo := receiver.OpenRepository(t)
-
- result, err := beginAndContinue(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{
- FixThin: true,
- WriteRev: true,
- Base: receiverRepo.Objects(),
- RequireTrailingEOF: true,
- })
- if err != nil {- t.Fatalf("Ingest(thin): %v", err)- }
-
- if !result.ThinFixed {
- t.Fatal("ThinFixed = false, want true")
- }
-
- _ = receiver.Run(t, "verify-pack", "-v", filepath.Join("objects", "pack", result.IdxName))- verifyReindexOracle(t, receiver, result.PackName, result.IdxName, result.RevName)
- receiver.UpdateRef(t, "refs/heads/main", head)
- _ = receiver.Run(t, "fsck", "--full", "--strict", "--no-progress", "--no-dangling")
- })
-}
-
-func TestIngestPackTrailerMismatch(t *testing.T) {
- t.Parallel()
-
- testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
- packBytes := fixtureBytes(t, algo, "nonthin.pack")
- if len(packBytes) == 0 {
- t.Fatal("empty pack stream")
- }
-
- packBytes[len(packBytes)-1] ^= 0xff
-
- receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
- packRoot := receiver.OpenPackRoot(t)
-
- _, err := beginAndContinue(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{
- WriteRev: true,
- RequireTrailingEOF: true,
- })
- if err == nil {- t.Fatal("Ingest error = nil, want error")- }
-
- if _, ok := errors.AsType[*ingest.PackTrailerMismatchError](err); !ok {
- t.Fatalf("Ingest error type = %T (%v), want *PackTrailerMismatchError", err, err)
- }
-
- entries, err := fs.ReadDir(packRoot.FS(), ".")
- if err != nil {- t.Fatalf("ReadDir(pack): %v", err)- }
-
- for _, entry := range entries {
- if strings.HasSuffix(entry.Name(), ".pack") {
- t.Fatalf("found finalized pack file after failure: %v", entry.Name())
- }
- }
- })
-}
-
-func zeroObjectPackBytes(t *testing.T, algo objectid.Algorithm) []byte {
- t.Helper()
-
- hashImpl, err := algo.New()
- if err != nil {- t.Fatalf("algo.New: %v", err)- }
-
- var header [12]byte
- copy(header[:4], []byte{'P', 'A', 'C', 'K'})
- binary.BigEndian.PutUint32(header[4:8], 2)
- binary.BigEndian.PutUint32(header[8:12], 0)
-
- _, _ = hashImpl.Write(header[:])
-
- return append(header[:], hashImpl.Sum(nil)...)
-}
-
-func TestIngestDiscardZeroObjectPack(t *testing.T) {
- t.Parallel()
-
- testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
- packBytes := zeroObjectPackBytes(t, algo)
-
- pending, err := ingest.Ingest(bytes.NewReader(packBytes), algo, ingest.Options{
- RequireTrailingEOF: true,
- })
- if err != nil {- t.Fatalf("Ingest: %v", err)- }
-
- if pending.Header().ObjectCount != 0 {
- t.Fatalf("ObjectCount = %d, want 0", pending.Header().ObjectCount)
- }
-
- discarded, err := pending.Discard()
- if err != nil {- t.Fatalf("Discard: %v", err)- }
-
- if discarded.ObjectCount != 0 {
- t.Fatalf("Discard.ObjectCount = %d, want 0", discarded.ObjectCount)
- }
- })
-}
-
-func TestIngestContinueRejectsZeroObjectPack(t *testing.T) {
- t.Parallel()
-
- testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
- packBytes := zeroObjectPackBytes(t, algo)
- receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
- packRoot := receiver.OpenPackRoot(t)
-
- pending, err := ingest.Ingest(bytes.NewReader(packBytes), algo, ingest.Options{
- RequireTrailingEOF: true,
- })
- if err != nil {- t.Fatalf("Ingest: %v", err)- }
-
- _, err = pending.Continue(packRoot)
- if !errors.Is(err, ingest.ErrZeroObjectContinue) {
- t.Fatalf("Continue error = %v, want ErrZeroObjectContinue", err)
- }
- })
-}
-
-func TestIngestCanFinishWithoutTrailingEOF(t *testing.T) {
- t.Parallel()
-
- testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
- head := fixtureOID(t, algo, "head")
- packBytes := fixtureBytes(t, algo, "nonthin.pack")
-
- receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
- packRoot := receiver.OpenPackRoot(t)
-
- result, err := beginAndContinue(&noExtraReadReader{reader: bytes.NewReader(packBytes)}, packRoot, algo, ingest.Options{
- WriteRev: true,
- })
- if err != nil {- t.Fatalf("Ingest without trailing EOF: %v", err)- }
-
- receiver.UpdateRef(t, "refs/heads/main", head)
- _ = receiver.Run(t, "verify-pack", "-v", filepath.Join("objects", "pack", result.IdxName))- _ = receiver.Run(t, "fsck", "--full", "--strict", "--no-progress", "--no-dangling")
- })
-}
--- a/format/packfile/ingest/progress_write.go
+++ /dev/null
@@ -1,11 +1,0 @@
-package ingest
-
-import "codeberg.org/lindenii/furgit/internal/utils"
-
-func writeProgressf(state *ingestState, format string, args ...any) {
- utils.BestEffortFprintf(state.opts.Progress, format, args...)
-
- if state.opts.Progress != nil {
- _ = state.opts.Progress.Flush()
- }
-}
--- a/format/packfile/ingest/record_content.go
+++ /dev/null
@@ -1,29 +1,0 @@
-package ingest
-
-import (
- "fmt"
-
- objecttype "codeberg.org/lindenii/furgit/object/type"
-)
-
-// readBaseRecordContent reads canonical base content for one non-delta record.
-func readBaseRecordContent(state *ingestState, idx int) (objecttype.Type, []byte, error) {
- record := state.records[idx]
- if !record.packedType.IsBaseObject() {
- return objecttype.TypeInvalid, nil, fmt.Errorf("packfile/ingest: record %d is not a base object", idx)
- }
-
- content, err := inflateRecordPayload(state, idx)
- if err != nil {
- return objecttype.TypeInvalid, nil, err
- }
-
- if int64(len(content)) != record.declaredSize {
- return objecttype.TypeInvalid, nil, &MalformedPackEntryError{
- Offset: record.offset,
- Reason: fmt.Sprintf("base content size mismatch got %d want %d", len(content), record.declaredSize),
- }
- }
-
- return record.packedType, content, nil
-}
--- a/format/packfile/ingest/record_delta.go
+++ /dev/null
@@ -1,60 +1,0 @@
-package ingest
-
-import (
- "fmt"
-
- deltaapply "codeberg.org/lindenii/furgit/format/packfile/delta/apply"
- objecttype "codeberg.org/lindenii/furgit/object/type"
-)
-
-// applyDeltaRecord applies one delta record onto base content.
-func applyDeltaRecord(state *ingestState, idx int, baseType objecttype.Type, baseContent []byte) (objecttype.Type, []byte, error) {
- record := state.records[idx]
- if record.packedType != objecttype.TypeOfsDelta && record.packedType != objecttype.TypeRefDelta {
- return objecttype.TypeInvalid, nil, fmt.Errorf("packfile/ingest: record %d is not a delta record", idx)
- }
-
- deltaPayload, err := inflateRecordPayload(state, idx)
- if err != nil {
- return objecttype.TypeInvalid, nil, err
- }
-
- if int64(len(deltaPayload)) != record.declaredSize {
- return objecttype.TypeInvalid, nil, &MalformedPackEntryError{
- Offset: record.offset,
- Reason: fmt.Sprintf("delta payload size mismatch got %d want %d", len(deltaPayload), record.declaredSize),
- }
- }
-
- srcSize, dstSize, err := readDeltaHeaderSizes(deltaPayload)
- if err != nil {
- return objecttype.TypeInvalid, nil, &MalformedPackEntryError{
- Offset: record.offset,
- Reason: fmt.Sprintf("read delta header: %v", err),
- }
- }
-
- if srcSize != len(baseContent) {
- return objecttype.TypeInvalid, nil, &MalformedPackEntryError{
- Offset: record.offset,
- Reason: fmt.Sprintf("delta source size mismatch got %d want %d", srcSize, len(baseContent)),
- }
- }
-
- content, err := deltaapply.Apply(baseContent, deltaPayload)
- if err != nil {
- return objecttype.TypeInvalid, nil, &MalformedPackEntryError{
- Offset: record.offset,
- Reason: fmt.Sprintf("apply delta: %v", err),
- }
- }
-
- if len(content) != dstSize {
- return objecttype.TypeInvalid, nil, &MalformedPackEntryError{
- Offset: record.offset,
- Reason: fmt.Sprintf("delta result size mismatch got %d want %d", len(content), dstSize),
- }
- }
-
- return baseType, content, nil
-}
--- a/format/packfile/ingest/record_inflate.go
+++ /dev/null
@@ -1,46 +1,0 @@
-package ingest
-
-import (
- "compress/zlib"
- "fmt"
- "io"
-
- "codeberg.org/lindenii/furgit/internal/intconv"
-)
-
-// inflateRecordPayload inflates one record's zlib payload from pack file.
-func inflateRecordPayload(state *ingestState, idx int) ([]byte, error) {
- record := state.records[idx]
- if record.packedLen < uint64(record.headerLen) {
- return nil, &MalformedPackEntryError{Offset: record.offset, Reason: "entry packed span underflow"}
- }
-
- compressedOffset := record.offset + uint64(record.headerLen)
- compressedLen := record.packedLen - uint64(record.headerLen)
-
- compressedOffsetInt64, err := intconv.Uint64ToInt64(compressedOffset)
- if err != nil {
- return nil, err
- }
-
- compressedLenInt64, err := intconv.Uint64ToInt64(compressedLen)
- if err != nil {
- return nil, err
- }
-
- section := io.NewSectionReader(state.packFile, compressedOffsetInt64, compressedLenInt64)
-
- reader, err := zlib.NewReader(section)
- if err != nil {- return nil, &MalformedPackEntryError{Offset: record.offset, Reason: fmt.Sprintf("open payload zlib: %v", err)}- }
-
- defer func() { _ = reader.Close() }()
-
- out, err := io.ReadAll(reader)
- if err != nil {- return nil, &MalformedPackEntryError{Offset: record.offset, Reason: fmt.Sprintf("inflate payload: %v", err)}- }
-
- return out, nil
-}
--- a/format/packfile/ingest/record_resolve.go
+++ /dev/null
@@ -1,116 +1,0 @@
-package ingest
-
-import (
- "fmt"
-
- objecttype "codeberg.org/lindenii/furgit/object/type"
-)
-
-// resolveRecord resolves one record and returns canonical type/content.
-func resolveRecord(state *ingestState, idx int, visiting map[int]struct{}) (objecttype.Type, []byte, error) {
- if idx < 0 || idx >= len(state.records) {
- return objecttype.TypeInvalid, nil, fmt.Errorf("packfile/ingest: record index out of bounds")
- }
-
- if _, ok := visiting[idx]; ok {
- return objecttype.TypeInvalid, nil, &DeltaCycleError{Offset: state.records[idx].offset}
- }
-
- visiting[idx] = struct{}{}
- defer delete(visiting, idx)
-
- record := &state.records[idx]
- if ty, content, ok := state.baseCache.get(idx); ok {
- return ty, content, nil
- }
-
- if record.packedType.IsBaseObject() {
- ty, content, err := readBaseRecordContent(state, idx)
- if err != nil {
- return objecttype.TypeInvalid, nil, err
- }
-
- if record.resolved {
- state.baseCache.add(idx, record.realType, content)
-
- return record.realType, content, nil
- }
-
- id, err := hashCanonicalObject(state.algo, ty, content)
- if err != nil {
- return objecttype.TypeInvalid, nil, err
- }
-
- record.objectID = id
- record.realType = ty
- record.resolved = true
- state.objectToRecord[id] = idx
- state.baseCache.add(idx, ty, content)
-
- return ty, content, nil
- }
-
- var (
- baseType objecttype.Type
- baseContent []byte
- err error
- )
- switch record.packedType {
- case objecttype.TypeOfsDelta:
- baseIdx, ok := state.offsetToRecord[record.baseOffset]
- if !ok {
- return objecttype.TypeInvalid, nil, &MalformedPackEntryError{
- Offset: record.offset,
- Reason: "missing ofs-delta base entry",
- }
- }
-
- baseType, baseContent, err = resolveRecord(state, baseIdx, visiting)
- if err != nil {
- return objecttype.TypeInvalid, nil, err
- }
- case objecttype.TypeRefDelta:
- baseIdx, ok := state.objectToRecord[record.baseObject]
- if ok {
- baseType, baseContent, err = resolveRecord(state, baseIdx, visiting)
- if err != nil {
- return objecttype.TypeInvalid, nil, err
- }
- } else {
- return objecttype.TypeInvalid, nil, errExternalThinBase
- }
- case objecttype.TypeInvalid,
- objecttype.TypeCommit,
- objecttype.TypeTree,
- objecttype.TypeBlob,
- objecttype.TypeTag,
- objecttype.TypeFuture:
- return objecttype.TypeInvalid, nil, &MalformedPackEntryError{
- Offset: record.offset,
- Reason: "unsupported delta type",
- }
- default:
- return objecttype.TypeInvalid, nil, &MalformedPackEntryError{
- Offset: record.offset,
- Reason: "unsupported delta type",
- }
- }
-
- ty, content, err := applyDeltaRecord(state, idx, baseType, baseContent)
- if err != nil {
- return objecttype.TypeInvalid, nil, err
- }
-
- id, err := hashCanonicalObject(state.algo, ty, content)
- if err != nil {
- return objecttype.TypeInvalid, nil, err
- }
-
- record.objectID = id
- record.realType = ty
- record.resolved = true
- state.objectToRecord[id] = idx
- state.baseCache.add(idx, ty, content)
-
- return ty, content, nil
-}
--- a/format/packfile/ingest/records.go
+++ /dev/null
@@ -1,46 +1,0 @@
-package ingest
-
-import (
- objectid "codeberg.org/lindenii/furgit/object/id"
- objecttype "codeberg.org/lindenii/furgit/object/type"
-)
-
-// objectRecord stores metadata for one packed object entry.
-type objectRecord struct {
- // offset is the entry start offset in the pack file.
- offset uint64
- // headerLen is packed entry header length in bytes.
- headerLen uint32
- // packedLen is total packed entry length in bytes.
- packedLen uint64
- // crc32 is the CRC over the full packed entry.
- crc32 uint32
- // packedType is the entry type tag from the pack stream.
- packedType objecttype.Type
- // realType is canonical object type after delta resolution.
- realType objecttype.Type
- // declaredSize is the declared output object size for this entry.
- declaredSize int64
- // dataOffset is compressed payload start offset for this entry.
- dataOffset uint64
- // baseOffset is OFS base offset when packedType is OFS delta.
- baseOffset uint64
- // baseObject is REF base object ID when packedType is REF delta.
- baseObject objectid.ObjectID
- // objectID is final resolved object ID.
- objectID objectid.ObjectID
- // resolved reports whether objectID/realType are finalized.
- resolved bool
-}
-
-// ofsDeltaRef maps one OFS delta record to its base offset.
-type ofsDeltaRef struct {
- baseOffset uint64
- recordIdx int
-}
-
-// refDeltaRef maps one REF delta record to its base object ID.
-type refDeltaRef struct {
- baseObject objectid.ObjectID
- recordIdx int
-}
--- a/format/packfile/ingest/resolve_all.go
+++ /dev/null
@@ -1,70 +1,0 @@
-package ingest
-
-import (
- "errors"
-
- "codeberg.org/lindenii/furgit/internal/progress"
-)
-
-// resolveAll resolves all delta records and finalizes ObjectID/RealType for every record.
-func resolveAll(state *ingestState) error {
- state.unresolvedRefDeltas = state.unresolvedRefDeltas[:0]
-
- var pending uint32
-
- for idx := range state.records {
- if !state.records[idx].resolved {
- pending++
- }
- }
-
- if pending == 0 {
- return nil
- }
-
- var done uint32
-
- meter := progress.New(progress.Options{
- Writer: state.opts.Progress,
- Title: "resolving deltas",
- Total: uint64(pending),
- })
-
- for idx := range state.records {
- if state.records[idx].resolved {
- continue
- }
-
- done++
- meter.Set(uint64(done), 0)
-
- visiting := make(map[int]struct{})
-
- ty, content, err := resolveRecord(state, idx, visiting)
- if err != nil {
- if errors.Is(err, errExternalThinBase) {
- state.unresolvedRefDeltas = append(state.unresolvedRefDeltas, idx)
-
- continue
- }
-
- return err
- }
-
- id, err := hashCanonicalObject(state.algo, ty, content)
- if err != nil {
- return err
- }
-
- record := &state.records[idx]
- record.realType = ty
- record.objectID = id
- record.resolved = true
- state.objectToRecord[id] = idx
- state.baseCache.add(idx, ty, content)
- }
-
- meter.Stop("done")-
- return nil
-}
--- a/format/packfile/ingest/rev_write.go
+++ /dev/null
@@ -1,137 +1,0 @@
-package ingest
-
-import (
- "encoding/binary"
- "slices"
-
- "codeberg.org/lindenii/furgit/internal/intconv"
- "codeberg.org/lindenii/furgit/internal/progress"
-)
-
-const (
- revMagic = 0x52494458
- revVersion = 1
-)
-
-// writeRev writes rev index for resolved records.
-func writeRev(state *ingestState) error {
- if !state.opts.WriteRev {
- return nil
- }
-
- idxOrder := buildIdxOrder(state)
-
- recordToIdxPos := make([]int, len(state.records))
- for pos, recordIdx := range idxOrder {
- recordToIdxPos[recordIdx] = pos
- }
-
- packOrder := buildPackOrder(state)
-
- hashImpl, err := state.algo.New()
- if err != nil {
- return err
- }
-
- var scratch [8]byte
-
- writeProgressf(state, "writing reverse index header...\r")
- binary.BigEndian.PutUint32(scratch[:4], revMagic)
-
- err = writeAndHash(state.revFile, hashImpl, scratch[:4])
- if err != nil {
- return err
- }
-
- binary.BigEndian.PutUint32(scratch[:4], revVersion)
-
- err = writeAndHash(state.revFile, hashImpl, scratch[:4])
- if err != nil {
- return err
- }
-
- binary.BigEndian.PutUint32(scratch[:4], state.algo.PackHashID())
-
- err = writeAndHash(state.revFile, hashImpl, scratch[:4])
- if err != nil {
- return err
- }
-
- writeProgressf(state, "writing reverse index header: done.\n")
-
- entriesMeter := progress.New(progress.Options{
- Writer: state.opts.Progress,
- Title: "writing reverse index entries",
- Total: uint64(len(packOrder)),
- })
-
- var entriesDone uint64
-
- for _, recordIdx := range packOrder {
- recordPos, err := intconv.IntToUint32(recordToIdxPos[recordIdx])
- if err != nil {
- return err
- }
-
- binary.BigEndian.PutUint32(scratch[:4], recordPos)
-
- err = writeAndHash(state.revFile, hashImpl, scratch[:4])
- if err != nil {
- return err
- }
-
- entriesDone++
- entriesMeter.Set(entriesDone, 0)
- }
-
- if entriesDone > 0 {- entriesMeter.Stop("done")- }
-
- writeProgressf(state, "writing reverse index trailer...\r")
-
- err = writeAndHash(state.revFile, hashImpl, state.packHash.Bytes())
- if err != nil {- return err
- }
-
- revHash := hashImpl.Sum(nil)
-
- _, err = state.revFile.Write(revHash)
- if err != nil {- return err
- }
-
- err = state.revFile.Sync()
- if err != nil {- return err
- }
-
- writeProgressf(state, "writing reverse index trailer: done.\n")
-
- return nil
-}
-
-// buildPackOrder returns record indexes sorted by pack offset.
-func buildPackOrder(state *ingestState) []int {- out := make([]int, 0, len(state.records))
- for idx := range state.records {- out = append(out, idx)
- }
-
- slices.SortFunc(out, func(a, b int) int {- offA := state.records[a].offset
-
- offB := state.records[b].offset
- switch {- case offA < offB:
- return -1
- case offA > offB:
- return 1
- default:
- return 0
- }
- })
-
- return out
-}
--- a/format/packfile/ingest/rewrite_header_trailer.go
+++ /dev/null
@@ -1,89 +1,0 @@
-package ingest
-
-import (
- "encoding/binary"
- "io"
-
- "codeberg.org/lindenii/furgit/internal/intconv"
- objectid "codeberg.org/lindenii/furgit/object/id"
-)
-
-// rewritePackHeaderAndTrailer rewrites object count and trailer hash using ReadAt/WriteAt.
-func rewritePackHeaderAndTrailer(state *ingestState) error {- var countRaw [4]byte
-
- recordCountUint32, err := intconv.IntToUint32(len(state.records))
- if err != nil {- return err
- }
-
- binary.BigEndian.PutUint32(countRaw[:], recordCountUint32)
-
- _, err = state.packFile.WriteAt(countRaw[:], 8)
- if err != nil {- return err
- }
-
- info, err := state.packFile.Stat()
- if err != nil {- return err
- }
-
- endWithoutTrailer := info.Size()
-
- hashImpl, err := state.algo.New()
- if err != nil {- return err
- }
-
- var (
- buf [128 << 10]byte
- pos int64
- )
- for pos < endWithoutTrailer {- want := int64(len(buf))
-
- remaining := endWithoutTrailer - pos
- if remaining < want {- want = remaining
- }
-
- n, err := state.packFile.ReadAt(buf[:want], pos)
- if err != nil && err != io.EOF {- return err
- }
-
- if n == 0 {- return io.ErrUnexpectedEOF
- }
-
- _, _ = hashImpl.Write(buf[:n])
- pos += int64(n)
- }
-
- sum := hashImpl.Sum(nil)
-
- _, err = state.packFile.WriteAt(sum, endWithoutTrailer)
- if err != nil {- return err
- }
-
- packHash, err := objectid.FromBytes(state.algo, sum)
- if err != nil {- return err
- }
-
- state.packHash = packHash
- state.objectCountHeader = recordCountUint32
-
- sumLenInt64 := int64(len(sum))
-
- newConsumed, err := intconv.Int64ToUint64(endWithoutTrailer + sumLenInt64)
- if err != nil {- return err
- }
-
- state.stream.consumed = newConsumed
-
- return nil
-}
--- a/format/packfile/ingest/scan.go
+++ /dev/null
@@ -1,105 +1,0 @@
-package ingest
-
-import (
- "fmt"
-
- "codeberg.org/lindenii/furgit/internal/progress"
- objectid "codeberg.org/lindenii/furgit/object/id"
-)
-
-// streamPackAndScan copies src into temp .pack while scanning packed entries.
-func streamPackAndScan(state *ingestState) error {- hashImpl, err := state.algo.New()
- if err != nil {- return err
- }
-
- state.stream = newStreamScanner(
- state.src,
- state.packFile,
- hashImpl,
- state.algo.Size(),
- )
-
- writeProgressf(state, "validating pack header...\r")
-
- err = seedStreamWithPackHeader(state)
- if err != nil {- return err
- }
-
- writeProgressf(state, "validating pack header: done.\n")
-
- state.records = make([]objectRecord, 0, state.objectCountHeader)
- state.ofsDeltas = make([]ofsDeltaRef, 0, state.objectCountHeader)
- state.refDeltas = make([]refDeltaRef, 0, state.objectCountHeader)
-
- total := state.objectCountHeader
- meter := progress.New(progress.Options{- Writer: state.opts.Progress,
- Title: "receiving objects",
- Total: uint64(total),
- Throughput: true,
- })
-
- for i := range total {- nextOffset, err := scanOneEntry(state, state.stream.consumed)
- if err != nil {- return err
- }
-
- if nextOffset != state.stream.consumed {- return fmt.Errorf("packfile/ingest: internal stream offset mismatch")- }
-
- done := i + 1
- meter.Set(uint64(done), state.stream.consumed)
- }
-
- meter.Stop("done")-
- err = state.stream.finishAndFlushTrailer(state.opts.RequireTrailingEOF)
- if err != nil {- return err
- }
-
- if len(state.stream.packTrailer) != state.algo.Size() {- return fmt.Errorf("packfile/ingest: invalid trailer size")- }
-
- packHash, err := objectid.FromBytes(state.algo, state.stream.packTrailer)
- if err != nil {- return err
- }
-
- state.packHash = packHash
-
- return state.stream.flush()
-}
-
-// seedStreamWithPackHeader writes the already-validated PACK header to output,
-// seeds the running pack hash, and advances stream offset accounting.
-func seedStreamWithPackHeader(state *ingestState) error {- written := 0
- for written < len(state.packHeaderRaw) {- n, err := state.packFile.Write(state.packHeaderRaw[written:])
- if err != nil {- return &DestinationWriteError{Op: fmt.Sprintf("write pack header: %v", err)}- }
-
- if n == 0 {- return &DestinationWriteError{Op: "write pack header: short write"}- }
-
- written += n
- }
-
- _, err := state.stream.hash.Write(state.packHeaderRaw[:])
- if err != nil {- return err
- }
-
- state.stream.consumed = packHeaderSize
-
- return nil
-}
--- a/format/packfile/ingest/state.go
+++ /dev/null
@@ -1,70 +1,0 @@
-package ingest
-
-import (
- "io"
- "os"
-
- objectid "codeberg.org/lindenii/furgit/object/id"
-)
-
-const (
- defaultDeltaBaseCacheMaxBytes = 32 << 20
-)
-
-// ingestState holds mutable state for one Ingest call.
-type ingestState struct {- src io.Reader
- destination *os.Root
- algo objectid.Algorithm
- opts Options
-
- packHeaderRaw [packHeaderSize]byte
-
- packFile *os.File
- packTmpName string
- idxFile *os.File
- idxTmpName string
- revFile *os.File
- revTmpName string
-
- stream *streamScanner
-
- records []objectRecord
- ofsDeltas []ofsDeltaRef
- refDeltas []refDeltaRef
- unresolvedRefDeltas []int
- offsetToRecord map[uint64]int
- objectToRecord map[objectid.ObjectID]int
-
- baseCache *deltaBaseCache
- packHash objectid.ObjectID
-
- objectCountHeader uint32
- thinFixed bool
-}
-
-// newIngestState constructs one call-local ingest state.
-func newIngestState(
- src io.Reader,
- destination *os.Root,
- algo objectid.Algorithm,
- opts Options,
- header HeaderInfo,
- headerRaw [packHeaderSize]byte,
-) (*ingestState, error) {- if algo.Size() == 0 {- return nil, objectid.ErrInvalidAlgorithm
- }
-
- return &ingestState{- src: src,
- destination: destination,
- algo: algo,
- opts: opts,
- packHeaderRaw: headerRaw,
- objectCountHeader: header.ObjectCount,
- offsetToRecord: make(map[uint64]int),
- objectToRecord: make(map[objectid.ObjectID]int),
- baseCache: newDeltaBaseCache(defaultDeltaBaseCacheMaxBytes),
- }, nil
-}
--- a/format/packfile/ingest/stream.go
+++ /dev/null
@@ -1,111 +1,0 @@
-package ingest
-
-import (
- "errors"
- "hash"
- "io"
- "os"
-)
-
-const streamScannerBufferSize = 64 << 10
-
-// streamScanner incrementally reads/consumes one pack stream while mirroring
-// consumed bytes into one destination pack file.
-type streamScanner struct {- src io.Reader
- dstFile *os.File
-
- // Input buffer window: buf[off:n] is unread.
- buf []byte
- off int
- n int
-
- // Absolute consumed stream bytes.
- consumed uint64
-
- // Running pack hash over consumed bytes while hashEnabled is true.
- hash hash.Hash
- hashSize int
- hashEnabled bool
-
- // Entry CRC state while one entry is being consumed.
- entryCRC uint32
- inEntryCRC bool
-
- packTrailer []byte
-}
-
-// newStreamScanner constructs one scanner with fixed input buffering.
-func newStreamScanner(src io.Reader, dstFile *os.File, hash hash.Hash, hashSize int) *streamScanner {- return &streamScanner{- src: src,
- dstFile: dstFile,
- buf: make([]byte, streamScannerBufferSize),
- hash: hash,
- hashSize: hashSize,
- hashEnabled: true,
- }
-}
-
-// Read implements io.Reader.
-func (scanner *streamScanner) Read(dst []byte) (int, error) {- if len(dst) == 0 {- return 0, nil
- }
-
- if scanner.n-scanner.off == 0 {- err := scanner.fill(1)
- if err != nil {- if errors.Is(err, io.EOF) {- return 0, io.EOF
- }
-
- return 0, err
- }
- }
-
- unread := scanner.n - scanner.off
- if unread == 0 {- return 0, io.EOF
- }
-
- n := min(len(dst), unread)
-
- copy(dst, scanner.buf[scanner.off:scanner.off+n])
-
- err := scanner.use(n)
- if err != nil {- return 0, err
- }
-
- return n, nil
-}
-
-// ReadByte implements io.ByteReader without allocation.
-func (scanner *streamScanner) ReadByte() (byte, error) {- if scanner.n-scanner.off == 0 {- err := scanner.fill(1)
- if err != nil {- return 0, err
- }
- }
-
- b := scanner.buf[scanner.off]
-
- err := scanner.use(1)
- if err != nil {- return 0, err
- }
-
- return b, nil
-}
-
-// readFull reads exactly len(dst) bytes through receiver.
-func (scanner *streamScanner) readFull(dst []byte) error {- _, err := io.ReadFull(scanner, dst)
- if err != nil {- return err
- }
-
- return nil
-}
--- a/format/packfile/ingest/temp.go
+++ /dev/null
@@ -1,103 +1,0 @@
-package ingest
-
-import (
- "crypto/rand"
- "errors"
- "fmt"
- "io/fs"
- "os"
-)
-
-// openTemporaryArtifacts creates/open temp pack/idx/(rev) files under destination.
-func openTemporaryArtifacts(state *ingestState) error {- packName, packFile, err := createTempFile(state.destination, "tmp_pack_")
- if err != nil {- return err
- }
-
- idxName, idxFile, err := createTempFile(state.destination, "tmp_idx_")
- if err != nil {- _ = packFile.Close()
- _ = state.destination.Remove(packName)
-
- return err
- }
-
- revName := ""
-
- var revFile *os.File
- if state.opts.WriteRev {- revName, revFile, err = createTempFile(state.destination, "tmp_rev_")
- if err != nil {- _ = idxFile.Close()
- _ = state.destination.Remove(idxName)
- _ = packFile.Close()
- _ = state.destination.Remove(packName)
-
- return err
- }
- }
-
- state.packTmpName = packName
- state.packFile = packFile
- state.idxTmpName = idxName
- state.idxFile = idxFile
- state.revTmpName = revName
- state.revFile = revFile
-
- return nil
-}
-
-// closeTemporaryArtifacts closes all temporary artifact file descriptors.
-func closeTemporaryArtifacts(state *ingestState) error {- var out error
-
- if state.packFile != nil {- err := state.packFile.Close()
- if err != nil && out == nil {- out = err
- }
-
- state.packFile = nil
- }
-
- if state.idxFile != nil {- err := state.idxFile.Close()
- if err != nil && out == nil {- out = err
- }
-
- state.idxFile = nil
- }
-
- if state.revFile != nil {- err := state.revFile.Close()
- if err != nil && out == nil {- out = err
- }
-
- state.revFile = nil
- }
-
- return out
-}
-
-// createTempFile creates one temporary file under root using prefix.
-func createTempFile(root *os.Root, prefix string) (string, *os.File, error) {
- for range 32 {
- name := prefix + rand.Text()
-
- file, err := root.OpenFile(name, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0o644)
- if err == nil {
- return name, file, nil
- }
-
- if errors.Is(err, fs.ErrExist) {
- continue
- }
-
- return "", nil, fmt.Errorf("packfile/ingest: create temp file %q: %w", name, err)
- }
-
- return "", nil, fmt.Errorf("packfile/ingest: unable to create temporary file for prefix %q", prefix)
-}
--- a/format/packfile/ingest/testdata/fixtures/sha1/METADATA.txt
+++ /dev/null
@@ -1,3 +1,0 @@
-format=sha1
-head=200c960359dad025b4170284c518919eb4a24305
-base=4bc507fc631ea78474d83c47548743c9f1dda0dc
binary files a/format/packfile/ingest/testdata/fixtures/sha1/base.pack /dev/null differ
binary files a/format/packfile/ingest/testdata/fixtures/sha1/nonthin.pack /dev/null differ
binary files a/format/packfile/ingest/testdata/fixtures/sha1/thin.pack /dev/null differ
--- a/format/packfile/ingest/testdata/fixtures/sha256/METADATA.txt
+++ /dev/null
@@ -1,3 +1,0 @@
-format=sha256
-head=35cc0f4cd1c73524187540494058d233a2ecbd071c85d496a2250d8e0c805ef8
-base=b4abe46895f0bb5aa22fd42d28d428413f265359734c288752e3c2d270eec276
binary files a/format/packfile/ingest/testdata/fixtures/sha256/base.pack /dev/null differ
binary files a/format/packfile/ingest/testdata/fixtures/sha256/nonthin.pack /dev/null differ
binary files a/format/packfile/ingest/testdata/fixtures/sha256/thin.pack /dev/null differ
--- a/format/packfile/ingest/thin_append.go
+++ /dev/null
@@ -1,91 +1,0 @@
-package ingest
-
-import (
- "compress/zlib"
- "hash/crc32"
- "io"
-
- "codeberg.org/lindenii/furgit/internal/intconv"
- objectid "codeberg.org/lindenii/furgit/object/id"
- objecttype "codeberg.org/lindenii/furgit/object/type"
-)
-
-// appendBaseObject appends one base object as a new packed non-delta entry.
-func appendBaseObject(state *ingestState, id objectid.ObjectID, realType objecttype.Type, content []byte) (int, error) {- start := state.stream.consumed
-
- header := encodePackEntryHeader(realType, int64(len(content)))
-
- startInt64, err := intconv.Uint64ToInt64(start)
- if err != nil {- return 0, err
- }
-
- _, err = state.packFile.WriteAt(header, startInt64)
- if err != nil {- return 0, err
- }
-
- headerLenInt64 := int64(len(header))
- section := &fileSectionWriter{file: state.packFile, off: startInt64 + headerLenInt64}- crc := crc32.NewIEEE()
-
- _, err = crc.Write(header)
- if err != nil {- return 0, err
- }
-
- counting := &countingWriter{dst: section}-
- zw := zlib.NewWriter(io.MultiWriter(counting, crc))
-
- _, err = zw.Write(content)
- if err != nil {- return 0, err
- }
-
- err = zw.Close()
- if err != nil {- return 0, err
- }
-
- headerLenUint64, err := intconv.IntToUint64(len(header))
- if err != nil {- return 0, err
- }
-
- countingNUint64, err := intconv.IntToUint64(counting.n)
- if err != nil {- return 0, err
- }
-
- packedLen := headerLenUint64 + countingNUint64
- end := start + packedLen
- state.stream.consumed = end
-
- headerLenUint32, err := intconv.IntToUint32(len(header))
- if err != nil {- return 0, err
- }
-
- record := objectRecord{- offset: start,
- headerLen: headerLenUint32,
- packedLen: packedLen,
- crc32: crc.Sum32(),
- packedType: realType,
- realType: realType,
- declaredSize: int64(len(content)),
- dataOffset: start + headerLenUint64,
- objectID: id,
- resolved: true,
- }
-
- recordIdx := len(state.records)
- state.records = append(state.records, record)
- state.offsetToRecord[start] = recordIdx
- state.objectToRecord[id] = recordIdx
- state.baseCache.add(recordIdx, realType, content)
-
- return recordIdx, nil
-}
--- a/format/packfile/ingest/thin_fix.go
+++ /dev/null
@@ -1,99 +1,0 @@
-package ingest
-
-import (
- "errors"
- "fmt"
-
- "codeberg.org/lindenii/furgit/internal/intconv"
- "codeberg.org/lindenii/furgit/internal/progress"
- objectstore "codeberg.org/lindenii/furgit/object/store"
-)
-
-// maybeFixThin appends missing bases and rewrites pack header/trailer when needed.
-func maybeFixThin(state *ingestState) error {- if len(state.unresolvedRefDeltas) == 0 {- return nil
- }
-
- writeProgressf(
- state,
- "fixing thin pack: %d unresolved bases\r",
- len(state.unresolvedRefDeltas),
- )
-
- if !state.opts.FixThin {- return &ThinPackUnresolvedError{Count: len(state.unresolvedRefDeltas)}- }
-
- if state.opts.Base == nil {- return &ThinPackUnresolvedError{Count: len(state.unresolvedRefDeltas)}- }
-
- hashSize := int64(state.algo.Size())
-
- info, err := state.packFile.Stat()
- if err != nil {- return err
- }
-
- size := info.Size()
- if size < hashSize {- return fmt.Errorf("packfile/ingest: pack too short to trim trailer")- }
-
- newEnd := size - hashSize
-
- err = state.packFile.Truncate(newEnd)
- if err != nil {- return err
- }
-
- consumed, err := intconv.Int64ToUint64(newEnd)
- if err != nil {- return err
- }
-
- state.stream.consumed = consumed
-
- baseIDs := unresolvedThinBaseIDs(state)
-
- total := len(baseIDs)
- meter := progress.New(progress.Options{- Writer: state.opts.Progress,
- Title: "fixing thin pack",
- Total: uint64(total),
- })
- meter.Set(0, 0)
-
- var appended uint64
-
- for _, id := range baseIDs {- ty, content, err := state.opts.Base.ReadBytesContent(id)
- if err != nil {- if errors.Is(err, objectstore.ErrObjectNotFound) {- continue
- }
-
- return fmt.Errorf("packfile/ingest: read thin base %s: %w", id, err)- }
-
- _, err = appendBaseObject(state, id, ty, content)
- if err != nil {- return err
- }
-
- state.thinFixed = true
-
- appended++
- meter.Set(appended, 0)
- }
-
- err = rewritePackHeaderAndTrailer(state)
- if err != nil {- return err
- }
-
- meter.Stop(fmt.Sprintf("appended %d/%d, done", appended, total))-
- return nil
-}
--- a/format/packfile/ingest/thin_unresolved.go
+++ /dev/null
@@ -1,34 +1,0 @@
-package ingest
-
-import (
- "bytes"
- "slices"
-
- objectid "codeberg.org/lindenii/furgit/object/id"
- objecttype "codeberg.org/lindenii/furgit/object/type"
-)
-
-// unresolvedThinBaseIDs returns sorted unique unresolved ref base IDs.
-func unresolvedThinBaseIDs(state *ingestState) []objectid.ObjectID {- seen := make(map[objectid.ObjectID]struct{})-
- for _, idx := range state.unresolvedRefDeltas {- record := state.records[idx]
- if record.packedType != objecttype.TypeRefDelta {- continue
- }
-
- seen[record.baseObject] = struct{}{}- }
-
- out := make([]objectid.ObjectID, 0, len(seen))
- for id := range seen {- out = append(out, id)
- }
-
- slices.SortFunc(out, func(a, b objectid.ObjectID) int {- return bytes.Compare(a.RawBytes(), b.RawBytes())
- })
-
- return out
-}
--- a/format/packfile/ingest/trailer.go
+++ /dev/null
@@ -1,58 +1,0 @@
-package ingest
-
-import (
- "bytes"
- "errors"
- "fmt"
- "io"
-)
-
-// finishAndFlushTrailer reads trailer hash bytes, verifies trailer checksum,
-// and optionally requires the source stream to hit EOF afterward.
-func (scanner *streamScanner) finishAndFlushTrailer(requireTrailingEOF bool) error {- if scanner.hashSize <= 0 {- return fmt.Errorf("packfile/ingest: invalid hash size")- }
-
- trailer := make([]byte, scanner.hashSize)
-
- scanner.hashEnabled = false
-
- err := scanner.readFull(trailer)
- if err != nil {- return &PackTrailerMismatchError{}- }
-
- scanner.packTrailer = append(scanner.packTrailer[:0], trailer...)
-
- if scanner.n-scanner.off > 0 {- return fmt.Errorf("packfile/ingest: pack has trailing garbage")- }
-
- if !requireTrailingEOF {- computed := scanner.hash.Sum(nil)
- if !bytes.Equal(computed, trailer) {- return &PackTrailerMismatchError{}- }
-
- return nil
- }
-
- var probe [1]byte
-
- n, err := scanner.Read(probe[:])
- if n > 0 || err == nil {- return fmt.Errorf("packfile/ingest: pack has trailing garbage")- }
-
- if !errors.Is(err, io.EOF) {- return err
- }
-
- computed := scanner.hash.Sum(nil)
- if !bytes.Equal(computed, trailer) {- return &PackTrailerMismatchError{}- }
-
- return nil
-}
--- a/format/packfile/ingest/use.go
+++ /dev/null
@@ -1,34 +1,0 @@
-package ingest
-
-import (
- "fmt"
- "hash/crc32"
-)
-
-// use consumes n unread bytes and updates accounting/checksum state.
-func (scanner *streamScanner) use(n int) error {- if n < 0 || n > scanner.n-scanner.off {- return fmt.Errorf("packfile/ingest: invalid consume length %d", n)- }
-
- if n == 0 {- return nil
- }
-
- chunk := scanner.buf[scanner.off : scanner.off+n]
- if scanner.hashEnabled {- _, err := scanner.hash.Write(chunk)
- if err != nil {- return err
- }
- }
-
- if scanner.inEntryCRC {- scanner.entryCRC = crc32.Update(scanner.entryCRC, crc32.IEEETable, chunk)
- }
-
- scanner.off += n
- scanner.consumed += uint64(n)
-
- return nil
-}
--- a/object/store/packed/doc.go
+++ b/object/store/packed/doc.go
@@ -1,3 +1,3 @@
-// Package packed provides Git object reading from pack/index files under an
-// objects/pack directory.
+// Package packed provides Git object reading from, and pack writing to,
+// an objects/pack directory.
package packed
--- /dev/null
+++ b/object/store/packed/internal/ingest/byteslice_reader.go
@@ -1,0 +1,21 @@
+package ingest
+
+import "io"
+
+// byteSliceReader implements io.ByteReader on []byte.
+type byteSliceReader struct {
+ data []byte
+ pos int
+}
+
+// ReadByte reads one byte from receiver.
+func (reader *byteSliceReader) ReadByte() (byte, error) {
+ if reader.pos >= len(reader.data) {
+ return 0, io.EOF
+ }
+
+ b := reader.data[reader.pos]
+ reader.pos++
+
+ return b, nil
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/cache.go
@@ -1,0 +1,53 @@
+package ingest
+
+import (
+ "codeberg.org/lindenii/furgit/internal/lru"
+ objecttype "codeberg.org/lindenii/furgit/object/type"
+)
+
+// deltaBaseCacheKey identifies one resolved base by record index.
+type deltaBaseCacheKey struct {
+ recordIdx int
+}
+
+// deltaBaseCacheValue stores one resolved base object payload.
+type deltaBaseCacheValue struct {
+ realType objecttype.Type
+ content []byte
+}
+
+// deltaBaseCache is a bounded LRU for resolved base payloads.
+type deltaBaseCache struct {
+ lru *lru.Cache[deltaBaseCacheKey, deltaBaseCacheValue]
+}
+
+// newDeltaBaseCache creates one bounded base cache.
+func newDeltaBaseCache(maxBytes int64) *deltaBaseCache {
+ return &deltaBaseCache{
+ lru: lru.New(
+ maxBytes,
+ func(_ deltaBaseCacheKey, value deltaBaseCacheValue) int64 {
+ return int64(len(value.content))
+ },
+ nil,
+ ),
+ }
+}
+
+// get returns one cache entry for recordIdx.
+func (cache *deltaBaseCache) get(recordIdx int) (objecttype.Type, []byte, bool) {
+ value, ok := cache.lru.Get(deltaBaseCacheKey{recordIdx: recordIdx})
+ if !ok {
+ return objecttype.TypeInvalid, nil, false
+ }
+
+ return value.realType, value.content, true
+}
+
+// add stores one cache entry for recordIdx.
+func (cache *deltaBaseCache) add(recordIdx int, realType objecttype.Type, content []byte) {
+ cache.lru.Add(deltaBaseCacheKey{recordIdx: recordIdx}, deltaBaseCacheValue{
+ realType: realType,
+ content: content,
+ })
+}
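
A sketch of how the resolver is expected to use this cache; the record index 7 and the content bytes are hypothetical. Eviction is by total content size, so a miss simply means re-inflating the base from the pack:

    cache := newDeltaBaseCache(32 << 20) // same bound as defaultDeltaBaseCacheMaxBytes
    cache.add(7, objecttype.TypeBlob, content)
    if ty, payload, ok := cache.get(7); ok {
        _ = ty      // objecttype.TypeBlob while the entry is resident
        _ = payload // the cached canonical content
    }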
--- /dev/null
+++ b/object/store/packed/internal/ingest/counting_writer.go
@@ -1,0 +1,17 @@
+package ingest
+
+import "io"
+
+// countingWriter counts bytes written to dst.
+type countingWriter struct {
+ dst io.Writer
+ n int
+}
+
+// Write writes src to dst and tracks output byte count.
+func (writer *countingWriter) Write(src []byte) (int, error) {
+ n, err := writer.dst.Write(src)
+ writer.n += n
+
+ return n, err
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/crc.go
@@ -1,0 +1,22 @@
+package ingest
+
+import "fmt"
+
+// beginEntryCRC starts inline CRC accumulation for one packed entry.
+func (scanner *streamScanner) beginEntryCRC() {
+ scanner.entryCRC = 0
+ scanner.inEntryCRC = true
+}
+
+// endEntryCRC finishes inline CRC accumulation for one packed entry.
+func (scanner *streamScanner) endEntryCRC() (uint32, error) {
+ if !scanner.inEntryCRC {
+ return 0, fmt.Errorf("packfile/ingest: entry CRC not started")
+ }
+
+ crc := scanner.entryCRC
+ scanner.entryCRC = 0
+ scanner.inEntryCRC = false
+
+ return crc, nil
+}
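
scanOneEntry brackets each entry with these two calls, so the CRC covers exactly the bytes the scanner consumes between them; a minimal sketch:

    scanner.beginEntryCRC()
    // ... consume the entry header and compressed payload through the scanner ...
    crc, err := scanner.endEntryCRC()
    if err != nil {
        return err
    }
    // crc is the per-entry value writeIdx later emits in the idx v2 CRC32 table.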
--- /dev/null
+++ b/object/store/packed/internal/ingest/delta_header.go
@@ -1,0 +1,10 @@
+package ingest
+
+import deltaapply "codeberg.org/lindenii/furgit/format/packfile/delta/apply"
+
+// readDeltaHeaderSizes reads source and destination sizes from one delta payload.
+func readDeltaHeaderSizes(payload []byte) (int, int, error) {
+ reader := &byteSliceReader{data: payload}
+
+ return deltaapply.ReadHeaderSizes(reader)
+}
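
Assuming deltaapply.ReadHeaderSizes parses Git's usual little-endian base-128 varints (low seven bits first, high bit as continuation), a two-varint payload decodes like this; the byte values are illustrative only:

    // 0x91 0x01 -> 0x11 + (0x01 << 7) = 145 (source size)
    // 0x0a      -> 10 (destination size)
    srcSize, dstSize, err := readDeltaHeaderSizes([]byte{0x91, 0x01, 0x0a})
    // srcSize == 145, dstSize == 10, err == nil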
--- /dev/null
+++ b/object/store/packed/internal/ingest/distance.go
@@ -1,0 +1,30 @@
+package ingest
+
+import (
+ "fmt"
+ "io"
+)
+
+// readOfsDistanceFromStream reads one ofs-delta encoded distance.
+func readOfsDistanceFromStream(reader io.ByteReader) (uint64, int, error) {
+ first, err := reader.ReadByte()
+ if err != nil {
+ return 0, 0, fmt.Errorf("read ofs distance first byte: %w", err)
+ }
+
+ dist := uint64(first & 0x7f)
+ consumed := 1
+
+ b := first
+ for b&0x80 != 0 {
+ b, err = reader.ReadByte()
+ if err != nil {
+ return 0, 0, fmt.Errorf("read ofs distance continuation: %w", err)
+ }
+
+ consumed++
+ dist = ((dist + 1) << 7) + uint64(b&0x7f)
+ }
+
+ return dist, consumed, nil
+}
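
The (dist + 1) << 7 re-bias matches upstream Git's ofs-delta distance encoding, which excludes values representable in fewer bytes. A hand-checked sketch against the loop above:

    // 0x91 -> low bits 0x11 = 17, continuation bit set
    // 0x2e -> dist = ((17 + 1) << 7) + 0x2e = 2350
    dist, consumed, err := readOfsDistanceFromStream(&byteSliceReader{data: []byte{0x91, 0x2e}})
    // dist == 2350, consumed == 2, err == nil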
--- /dev/null
+++ b/object/store/packed/internal/ingest/doc.go
@@ -1,0 +1,3 @@
+// Package ingest implements streaming ingestion of one Git pack stream into a
+// packed destination root, producing .pack/.idx and optionally .rev.
+package ingest
--- /dev/null
+++ b/object/store/packed/internal/ingest/drain.go
@@ -1,0 +1,67 @@
+package ingest
+
+import (
+ "fmt"
+ "io"
+
+ "codeberg.org/lindenii/furgit/internal/compress/zlib"
+ objectheader "codeberg.org/lindenii/furgit/object/header"
+ objectid "codeberg.org/lindenii/furgit/object/id"
+ objecttype "codeberg.org/lindenii/furgit/object/type"
+)
+
+// drainEntryPayload inflates one entry payload from stream and returns
+// (inflatedLength, oidForBaseEntry).
+func drainEntryPayload(state *ingestState, record objectRecord) (int64, objectid.ObjectID, error) {
+ var zero objectid.ObjectID
+
+ reader, err := zlib.NewReader(state.stream)
+ if err != nil {
+ return 0, zero, &MalformedPackEntryError{Offset: record.offset, Reason: fmt.Sprintf("open zlib stream: %v", err)}
+ }
+
+ defer func() { _ = reader.Close() }()
+
+ var total int64
+
+ if record.packedType.IsBaseObject() {
+ header, ok := objectheader.Encode(record.packedType, record.declaredSize)
+ if !ok {
+ return 0, zero, &MalformedPackEntryError{Offset: record.offset, Reason: "encode object header"}
+ }
+
+ hashImpl, err := state.algo.New()
+ if err != nil {
+ return 0, zero, err
+ }
+
+ _, _ = hashImpl.Write(header)
+
+ n, err := io.Copy(hashImpl, reader)
+ if err != nil {
+ return 0, zero, &MalformedPackEntryError{Offset: record.offset, Reason: fmt.Sprintf("inflate base object: %v", err)}
+ }
+
+ total = n
+
+ oid, err := objectid.FromBytes(state.algo, hashImpl.Sum(nil))
+ if err != nil {
+ return 0, zero, err
+ }
+
+ return total, oid, nil
+ }
+
+ if record.packedType == objecttype.TypeOfsDelta || record.packedType == objecttype.TypeRefDelta {
+ n, err := io.Copy(io.Discard, reader)
+ if err != nil {
+ return 0, zero, &MalformedPackEntryError{Offset: record.offset, Reason: fmt.Sprintf("inflate delta payload: %v", err)}
+ }
+
+ total = n
+
+ return total, zero, nil
+ }
+
+ return 0, zero, &MalformedPackEntryError{Offset: record.offset, Reason: "unsupported payload type"}
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/entry.go
@@ -1,0 +1,91 @@
+package ingest
+
+import (
+ "fmt"
+
+ objecttype "codeberg.org/lindenii/furgit/object/type"
+)
+
+// scanOneEntry scans one pack entry from stream and appends one record.
+func scanOneEntry(state *ingestState, startOffset uint64) (uint64, error) {+ state.stream.beginEntryCRC()
+
+ record, err := parseEntryPrefix(state, startOffset)
+ if err != nil {+ return 0, err
+ }
+
+ payloadStartConsumed := state.stream.consumed
+
+ contentLen, oid, err := drainEntryPayload(state, record)
+ if err != nil {+ return 0, err
+ }
+
+ consumedInput := state.stream.consumed - payloadStartConsumed
+
+ if contentLen != record.declaredSize {+ return 0, &MalformedPackEntryError{+ Offset: startOffset,
+ Reason: fmt.Sprintf("inflated size mismatch got %d want %d", contentLen, record.declaredSize),+ }
+ }
+
+ endOffset := startOffset + uint64(record.headerLen) + consumedInput
+ if endOffset > state.stream.consumed {+ return 0, &MalformedPackEntryError{+ Offset: startOffset,
+ Reason: fmt.Sprintf("entry end offset overflow got %d > stream %d", endOffset, state.stream.consumed),+ }
+ }
+
+ record.packedLen = endOffset - startOffset
+
+ record.dataOffset = startOffset + uint64(record.headerLen)
+ if record.packedLen < uint64(record.headerLen) {+ return 0, &MalformedPackEntryError{Offset: startOffset, Reason: "negative payload span"}+ }
+
+ crc, err := state.stream.endEntryCRC()
+ if err != nil {+ return 0, err
+ }
+
+ record.crc32 = crc
+
+ if record.packedType.IsBaseObject() {+ record.objectID = oid
+ record.realType = record.packedType
+ record.resolved = true
+ }
+
+ recordIdx := len(state.records)
+ state.records = append(state.records, record)
+
+ state.offsetToRecord[record.offset] = recordIdx
+ if record.resolved {+ state.objectToRecord[record.objectID] = recordIdx
+ }
+
+ switch record.packedType {+ case objecttype.TypeOfsDelta:
+ state.ofsDeltas = append(state.ofsDeltas, ofsDeltaRef{+ baseOffset: record.baseOffset,
+ recordIdx: recordIdx,
+ })
+ case objecttype.TypeRefDelta:
+ state.refDeltas = append(state.refDeltas, refDeltaRef{+ baseObject: record.baseObject,
+ recordIdx: recordIdx,
+ })
+ case objecttype.TypeInvalid,
+ objecttype.TypeCommit,
+ objecttype.TypeTree,
+ objecttype.TypeBlob,
+ objecttype.TypeTag,
+ objecttype.TypeFuture:
+ default:
+ }
+
+ return endOffset, nil
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/entry_header.go
@@ -1,0 +1,33 @@
+package ingest
+
+import (
+ "codeberg.org/lindenii/furgit/internal/intconv"
+ objecttype "codeberg.org/lindenii/furgit/object/type"
+)
+
+// encodePackEntryHeader encodes one non-delta packed entry header.
+func encodePackEntryHeader(ty objecttype.Type, size int64) []byte {
+ var out [16]byte
+
+ n := 0
+
+ s, err := intconv.Int64ToUint64(size)
+ if err != nil {
+ panic(err)
+ }
+
+ c := (uint8(ty) << 4) | byte(s&0x0f)
+
+ s >>= 4
+ for s != 0 {
+ out[n] = c | 0x80
+ n++
+ c = byte(s & 0x7f)
+ s >>= 7
+ }
+
+ out[n] = c
+ n++
+
+ return append([]byte(nil), out[:n]...)
+}
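
A worked example of the type-and-size encoding, assuming objecttype.TypeBlob carries Git's on-wire value 3: the first byte packs the type into bits 4-6 plus the low four size bits, and remaining size bits follow seven at a time:

    // size 300: (3 << 4) | (300 & 0x0f) = 0x3c, continuation bit -> 0xbc;
    // 300 >> 4 = 18 remains -> final byte 0x12.
    header := encodePackEntryHeader(objecttype.TypeBlob, 300)
    // header == []byte{0xbc, 0x12}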
--- /dev/null
+++ b/object/store/packed/internal/ingest/entry_prefix.go
@@ -1,0 +1,95 @@
+package ingest
+
+import (
+ "fmt"
+
+ "codeberg.org/lindenii/furgit/internal/intconv"
+ objectid "codeberg.org/lindenii/furgit/object/id"
+ objecttype "codeberg.org/lindenii/furgit/object/type"
+)
+
+// parseEntryPrefix parses one entry prefix from stream.
+func parseEntryPrefix(state *ingestState, startOffset uint64) (objectRecord, error) {+ var record objectRecord
+
+ record.offset = startOffset
+
+ first, err := state.stream.ReadByte()
+ if err != nil {+ return record, &MalformedPackEntryError{Offset: startOffset, Reason: fmt.Sprintf("read first header byte: %v", err)}+ }
+
+ record.packedType = objecttype.Type((first >> 4) & 0x07)
+ size := int64(first & 0x0f)
+ headerLen := uint32(1)
+ shift := uint(4)
+ b := first
+
+ for b&0x80 != 0 {+ b, err = state.stream.ReadByte()
+ if err != nil {+ return record, &MalformedPackEntryError{Offset: startOffset, Reason: fmt.Sprintf("read size continuation: %v", err)}+ }
+
+ headerLen++
+ size |= int64(b&0x7f) << shift
+ shift += 7
+ }
+
+ if size < 0 {+ return record, &MalformedPackEntryError{Offset: startOffset, Reason: "negative declared size"}+ }
+
+ record.declaredSize = size
+
+ switch record.packedType {+ case objecttype.TypeCommit, objecttype.TypeTree, objecttype.TypeBlob, objecttype.TypeTag:
+ case objecttype.TypeRefDelta:
+ baseRaw := make([]byte, state.algo.Size())
+
+ err := state.stream.readFull(baseRaw)
+ if err != nil {+ return record, &MalformedPackEntryError{Offset: startOffset, Reason: fmt.Sprintf("read ref base: %v", err)}+ }
+
+ baseID, err := objectid.FromBytes(state.algo, baseRaw)
+ if err != nil {+ return record, &MalformedPackEntryError{Offset: startOffset, Reason: fmt.Sprintf("parse ref base: %v", err)}+ }
+
+ record.baseObject = baseID
+
+ baseRawLen, err := intconv.IntToUint32(len(baseRaw))
+ if err != nil {+ return record, err
+ }
+
+ headerLen += baseRawLen
+ case objecttype.TypeOfsDelta:
+ dist, consumed, err := readOfsDistanceFromStream(state.stream)
+ if err != nil {+ return record, &MalformedPackEntryError{Offset: startOffset, Reason: err.Error()}+ }
+
+ if startOffset <= dist {+ return record, &MalformedPackEntryError{Offset: startOffset, Reason: "ofs base offset out of bounds"}+ }
+
+ record.baseOffset = startOffset - dist
+
+ consumedUint32, err := intconv.IntToUint32(consumed)
+ if err != nil {+ return record, err
+ }
+
+ headerLen += consumedUint32
+ case objecttype.TypeInvalid, objecttype.TypeFuture:
+ return record, &MalformedPackEntryError{Offset: startOffset, Reason: fmt.Sprintf("unsupported object type %d", record.packedType)}+ default:
+ return record, &MalformedPackEntryError{Offset: startOffset, Reason: fmt.Sprintf("unsupported object type %d", record.packedType)}+ }
+
+ record.headerLen = headerLen
+
+ return record, nil
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/errors.go
@@ -1,0 +1,68 @@
+package ingest
+
+import (
+ "errors"
+ "fmt"
+)
+
+// InvalidPackHeaderError reports an invalid or unsupported pack header.
+type InvalidPackHeaderError struct {
+ Reason string
+}
+
+// Error implements error.
+func (err *InvalidPackHeaderError) Error() string {
+ return "packfile/ingest: invalid pack header: " + err.Reason
+}
+
+// PackTrailerMismatchError reports a mismatch between computed and trailer pack hash.
+type PackTrailerMismatchError struct{}
+
+// Error implements error.
+func (err *PackTrailerMismatchError) Error() string {
+ return "packfile/ingest: pack trailer hash mismatch"
+}
+
+// ThinPackUnresolvedError reports unresolved REF deltas when FixThin is disabled
+// or when required bases cannot be found in Base.
+type ThinPackUnresolvedError struct {
+ Count int
+}
+
+// Error implements error.
+func (err *ThinPackUnresolvedError) Error() string {
+ return fmt.Sprintf("packfile/ingest: unresolved thin deltas: %d", err.Count)
+}
+
+// MalformedPackEntryError reports malformed entry encoding at one pack offset.
+type MalformedPackEntryError struct {
+ Offset uint64
+ Reason string
+}
+
+// Error implements error.
+func (err *MalformedPackEntryError) Error() string {
+ return fmt.Sprintf("packfile/ingest: malformed pack entry at offset %d: %s", err.Offset, err.Reason)
+}
+
+// DeltaCycleError reports a detected cycle in delta dependency resolution.
+type DeltaCycleError struct {
+ Offset uint64
+}
+
+// Error implements error.
+func (err *DeltaCycleError) Error() string {
+ return fmt.Sprintf("packfile/ingest: delta cycle detected at offset %d", err.Offset)
+}
+
+// DestinationWriteError reports destination I/O failures.
+type DestinationWriteError struct {
+ Op string
+}
+
+// Error implements error.
+func (err *DestinationWriteError) Error() string {
+ return "packfile/ingest: destination write failure: " + err.Op
+}
+
+var errExternalThinBase = errors.New("packfile/ingest: external thin base required")
--- /dev/null
+++ b/object/store/packed/internal/ingest/file_section_writer.go
@@ -1,0 +1,22 @@
+package ingest
+
+import "os"
+
+// fileSectionWriter writes sequentially to file via WriteAt at one base offset.
+type fileSectionWriter struct {
+ file *os.File
+ off int64
+ pos int64
+}
+
+// Write writes src at current section position.
+func (writer *fileSectionWriter) Write(src []byte) (int, error) {
+ if len(src) == 0 {
+ return 0, nil
+ }
+
+ n, err := writer.file.WriteAt(src, writer.off+writer.pos)
+ writer.pos += int64(n)
+
+ return n, err
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/fill.go
@@ -1,0 +1,44 @@
+package ingest
+
+import (
+ "errors"
+ "fmt"
+ "io"
+)
+
+// fill ensures at least minLen unread bytes are available in receiver's buffer.
+func (scanner *streamScanner) fill(minLen int) error {
+ if minLen <= 0 {
+ return nil
+ }
+
+ if minLen > len(scanner.buf) {
+ return fmt.Errorf("packfile/ingest: fill(%d) exceeds scanner buffer", minLen)
+ }
+
+ for scanner.n-scanner.off < minLen {
+ err := scanner.flushConsumedPrefix()
+ if err != nil {
+ return err
+ }
+
+ readN, err := scanner.src.Read(scanner.buf[scanner.n:])
+ if readN > 0 {
+ scanner.n += readN
+ }
+
+ if err != nil {
+ if errors.Is(err, io.EOF) && scanner.n-scanner.off >= minLen {
+ return nil
+ }
+
+ return err
+ }
+
+ if readN == 0 {
+ return io.ErrNoProgress
+ }
+ }
+
+ return nil
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/finalize.go
@@ -1,0 +1,94 @@
+package ingest
+
+import (
+ "errors"
+ "fmt"
+ "io/fs"
+ "strings"
+
+ "codeberg.org/lindenii/furgit/internal/intconv"
+)
+
+// finalizeArtifacts links temporary files to final names and returns Result.
+func finalizeArtifacts(state *ingestState) (Result, error) {
+ base := "pack-" + state.packHash.String()
+ packFinal := base + ".pack"
+ idxFinal := base + ".idx"
+
+ revFinal := ""
+ if state.opts.WriteRev {
+ revFinal = base + ".rev"
+ }
+
+ err := linkTempToFinal(state, state.packTmpName, packFinal)
+ if err != nil {
+ return Result{}, err
+ }
+
+ err = linkTempToFinal(state, state.idxTmpName, idxFinal)
+ if err != nil {
+ return Result{}, err
+ }
+
+ if state.opts.WriteRev {
+ err := linkTempToFinal(state, state.revTmpName, revFinal)
+ if err != nil {
+ return Result{}, err
+ }
+ }
+
+ objectCount, err := intconv.IntToUint32(len(state.records))
+ if err != nil {
+ return Result{}, err
+ }
+
+ return Result{
+ PackName: packFinal,
+ IdxName: idxFinal,
+ RevName: revFinal,
+ PackHash: state.packHash,
+ ObjectCount: objectCount,
+ ThinFixed: state.thinFixed,
+ }, nil
+}
+
+// rollbackTemporaryArtifacts removes temporary files after failure.
+func rollbackTemporaryArtifacts(state *ingestState) {
+ if state.packTmpName != "" {
+ _ = state.destination.Remove(state.packTmpName)
+ }
+
+ if state.idxTmpName != "" {
+ _ = state.destination.Remove(state.idxTmpName)
+ }
+
+ if state.revTmpName != "" {
+ _ = state.destination.Remove(state.revTmpName)
+ }
+}
+
+// linkTempToFinal hard-links tmp to final, tolerating existing final paths.
+func linkTempToFinal(state *ingestState, tmp, final string) error {
+ if tmp == "" || final == "" {
+ return fmt.Errorf("packfile/ingest: invalid finalize names tmp=%q final=%q", tmp, final)
+ }
+
+ if strings.Contains(final, "/") {
+ return fmt.Errorf("packfile/ingest: final name must be leaf: %q", final)
+ }
+
+ err := state.destination.Link(tmp, final)
+ if err == nil {
+ _ = state.destination.Remove(tmp)
+
+ return nil
+ }
+
+ if errors.Is(err, fs.ErrExist) {
+ _ = state.destination.Remove(tmp)
+
+ return nil
+ }
+
+ return err
+}
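
Because final names are derived from the pack hash, two racing ingests of the same pack produce byte-identical artifacts, which is why fs.ErrExist counts as success. The publish pattern, reduced to a sketch with hypothetical names:

    err := root.Link("tmp_pack_abc123", "pack-1234.pack")
    if err != nil && !errors.Is(err, fs.ErrExist) {
        return err // only genuinely unexpected link failures propagate
    }
    _ = root.Remove("tmp_pack_abc123") // the temp name is dropped either way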
--- /dev/null
+++ b/object/store/packed/internal/ingest/flush.go
@@ -1,0 +1,37 @@
+package ingest
+
+import "fmt"
+
+// flush writes all consumed-but-unflushed bytes to destination pack file.
+func (scanner *streamScanner) flush() error {
+ return scanner.flushConsumedPrefix()
+}
+
+// flushConsumedPrefix writes scanner.buf[:scanner.off] and compacts unread
+// bytes to the start of buffer.
+func (scanner *streamScanner) flushConsumedPrefix() error {
+ if scanner.off == 0 {
+ return nil
+ }
+
+ written := 0
+ for written < scanner.off {
+ n, err := scanner.dstFile.Write(scanner.buf[written:scanner.off])
+ if err != nil {
+ return &DestinationWriteError{Op: fmt.Sprintf("write pack: %v", err)}
+ }
+
+ if n == 0 {
+ return &DestinationWriteError{Op: "write pack: short write"}
+ }
+
+ written += n
+ }
+
+ unread := scanner.n - scanner.off
+ copy(scanner.buf[:unread], scanner.buf[scanner.off:scanner.n])
+ scanner.off = 0
+ scanner.n = unread
+
+ return nil
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/hash.go
@@ -1,0 +1,27 @@
+package ingest
+
+import (
+ "fmt"
+
+ objectheader "codeberg.org/lindenii/furgit/object/header"
+ objectid "codeberg.org/lindenii/furgit/object/id"
+ objecttype "codeberg.org/lindenii/furgit/object/type"
+)
+
+// hashCanonicalObject hashes canonical object bytes (header+content).
+func hashCanonicalObject(algo objectid.Algorithm, ty objecttype.Type, content []byte) (objectid.ObjectID, error) {
+ header, ok := objectheader.Encode(ty, int64(len(content)))
+ if !ok {
+ return objectid.ObjectID{}, fmt.Errorf("packfile/ingest: encode object header for type %d", ty)
+ }
+
+ hashImpl, err := algo.New()
+ if err != nil {
+ return objectid.ObjectID{}, err
+ }
+
+ _, _ = hashImpl.Write(header)
+ _, _ = hashImpl.Write(content)
+
+ return objectid.FromBytes(algo, hashImpl.Sum(nil))
+}
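
This is Git's canonical object hashing: the header "<type> <size>\x00" followed by content. A stdlib-only illustration for a SHA-1 repository (crypto/sha1 and fmt imports assumed):

    sum := sha1.Sum([]byte("blob 5\x00hello"))
    fmt.Printf("%x\n", sum) // b6fc4c620b67d95f953a5c1c1230aaab5db5a1b0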
--- /dev/null
+++ b/object/store/packed/internal/ingest/header.go
@@ -1,0 +1,54 @@
+package ingest
+
+import (
+ "encoding/binary"
+ "fmt"
+ "io"
+
+ "codeberg.org/lindenii/furgit/format/packfile"
+)
+
+const packHeaderSize = 12
+
+type packHeader struct {
+ Version uint32
+ ObjectCount uint32
+}
+
+// readAndValidatePackHeader reads one PACK header from src and validates it.
+func readAndValidatePackHeader(src io.Reader) (packHeader, [packHeaderSize]byte, error) {
+ var hdr [packHeaderSize]byte
+
+ _, err := io.ReadFull(src, hdr[:])
+ if err != nil {
+ return packHeader{}, [packHeaderSize]byte{}, &InvalidPackHeaderError{
+ Reason: fmt.Sprintf("read header: %v", err),
+ }
+ }
+
+ header, err := parseAndValidatePackHeader(hdr)
+ if err != nil {
+ return packHeader{}, [packHeaderSize]byte{}, err
+ }
+
+ return header, hdr, nil
+}
+
+// parseAndValidatePackHeader validates one already-read PACK header.
+func parseAndValidatePackHeader(hdr [packHeaderSize]byte) (packHeader, error) {
+ if binary.BigEndian.Uint32(hdr[:4]) != packfile.Signature {
+ return packHeader{}, &InvalidPackHeaderError{Reason: "signature mismatch"}
+ }
+
+ version := binary.BigEndian.Uint32(hdr[4:8])
+ if !packfile.SupportedVersion(version) {
+ return packHeader{}, &InvalidPackHeaderError{
+ Reason: fmt.Sprintf("unsupported version %d", version),
+ }
+ }
+
+ return packHeader{
+ Version: version,
+ ObjectCount: binary.BigEndian.Uint32(hdr[8:12]),
+ }, nil
+}
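
Assuming packfile.Signature is the big-endian encoding of "PACK" and version 2 is supported (the zero-object test below builds exactly this header), a minimal valid header parses as:

    var hdr [packHeaderSize]byte
    copy(hdr[:4], "PACK")
    binary.BigEndian.PutUint32(hdr[4:8], 2)  // version
    binary.BigEndian.PutUint32(hdr[8:12], 0) // object count
    header, err := parseAndValidatePackHeader(hdr)
    // header.Version == 2, header.ObjectCount == 0, err == nil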
--- /dev/null
+++ b/object/store/packed/internal/ingest/idx_write.go
@@ -1,0 +1,262 @@
+package ingest
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "hash"
+ "io"
+ "slices"
+
+ "codeberg.org/lindenii/furgit/internal/intconv"
+ "codeberg.org/lindenii/furgit/internal/progress"
+)
+
+const (
+ idxMagicV2 = 0xff744f63
+ idxVersionV2 = 2
+)
+
+// writeIdx writes idx v2 for resolved records.
+func writeIdx(state *ingestState) error {+ order := buildIdxOrder(state)
+
+ hashImpl, err := state.algo.New()
+ if err != nil {+ return err
+ }
+
+ write := func(src []byte) error {+ _, writeErr := state.idxFile.Write(src)
+ if writeErr != nil {+ return writeErr
+ }
+
+ _, writeErr = hashImpl.Write(src)
+ if writeErr != nil {+ return writeErr
+ }
+
+ return nil
+ }
+
+ var (
+ scratch [8]byte
+ fanout [256]uint32
+ )
+
+ writeProgressf(state, "writing index fanout...\r")
+
+ for _, recordIdx := range order {+ idRaw := state.records[recordIdx].objectID.Bytes()
+ fanout[idRaw[0]]++
+ }
+
+ binary.BigEndian.PutUint32(scratch[:4], idxMagicV2)
+ binary.BigEndian.PutUint32(scratch[4:8], idxVersionV2)
+
+ err = write(scratch[:8])
+ if err != nil {+ return err
+ }
+
+ var cumulative uint32
+ for i := range fanout {+ cumulative += fanout[i]
+ binary.BigEndian.PutUint32(scratch[:4], cumulative)
+
+ err := write(scratch[:4])
+ if err != nil {+ return err
+ }
+ }
+
+ writeProgressf(state, "writing index fanout: done.\n")
+
+ largeOffsetCount := 0
+
+ for idx := range state.records {+ if state.records[idx].offset >= 0x80000000 {+ largeOffsetCount++
+ }
+ }
+
+ oidMeter := progress.New(progress.Options{+ Writer: state.opts.Progress,
+ Title: "writing index object ids",
+ Total: uint64(len(order)),
+ })
+
+ var oidDone uint64
+
+ for _, recordIdx := range order {+ idRaw := state.records[recordIdx].objectID.Bytes()
+
+ err := write(idRaw)
+ if err != nil {+ return err
+ }
+
+ oidDone++
+ oidMeter.Set(oidDone, 0)
+ }
+
+ if oidDone > 0 {+ oidMeter.Stop("done")+ }
+
+ crcMeter := progress.New(progress.Options{+ Writer: state.opts.Progress,
+ Title: "writing index crc32",
+ Total: uint64(len(order)),
+ })
+
+ var crcDone uint64
+
+ for _, recordIdx := range order {+ binary.BigEndian.PutUint32(scratch[:4], state.records[recordIdx].crc32)
+
+ err := write(scratch[:4])
+ if err != nil {+ return err
+ }
+
+ crcDone++
+ crcMeter.Set(crcDone, 0)
+ }
+
+ if crcDone > 0 {+ crcMeter.Stop("done")+ }
+
+ largeOffsets := make([]uint64, 0)
+ offsetMeter := progress.New(progress.Options{+ Writer: state.opts.Progress,
+ Title: "writing index offsets",
+ Total: uint64(len(order)),
+ })
+
+ var offsetDone uint64
+
+ for _, recordIdx := range order {+ offset := state.records[recordIdx].offset
+ if offset >= 0x80000000 {+ largeOffsetIdx, err := intconv.IntToUint32(len(largeOffsets))
+ if err != nil {+ return err
+ }
+
+ word := 0x80000000 | largeOffsetIdx
+
+ largeOffsets = append(largeOffsets, offset)
+
+ binary.BigEndian.PutUint32(scratch[:4], word)
+ } else {+ binary.BigEndian.PutUint32(scratch[:4], uint32(offset))
+ }
+
+ err := write(scratch[:4])
+ if err != nil {+ return err
+ }
+
+ offsetDone++
+ offsetMeter.Set(offsetDone, 0)
+ }
+
+ if offsetDone > 0 {+ offsetMeter.Stop("done")+ }
+
+ total, err := intconv.IntToUint64(largeOffsetCount)
+ if err != nil {+ return err
+ }
+
+ largeOffsetMeter := progress.New(progress.Options{+ Writer: state.opts.Progress,
+ Title: "writing index large offsets",
+ Total: total,
+ })
+
+ var largeOffsetDone uint64
+
+ for _, off := range largeOffsets {+ binary.BigEndian.PutUint64(scratch[:8], off)
+
+ err := write(scratch[:8])
+ if err != nil {+ return err
+ }
+
+ largeOffsetDone++
+ largeOffsetMeter.Set(largeOffsetDone, 0)
+ }
+
+ if largeOffsetDone > 0 {+ largeOffsetMeter.Stop("done")+ }
+
+ writeProgressf(state, "writing index trailer...\r")
+
+ err = write(state.packHash.Bytes())
+ if err != nil {+ return err
+ }
+
+ idxHash := hashImpl.Sum(nil)
+
+ _, err = state.idxFile.Write(idxHash)
+ if err != nil {+ return err
+ }
+
+ err = state.idxFile.Sync()
+ if err != nil {+ return err
+ }
+
+ writeProgressf(state, "writing index trailer: done.\n")
+
+ return nil
+}
+
+// buildIdxOrder returns record indexes sorted by ObjectID.
+func buildIdxOrder(state *ingestState) []int {+ out := make([]int, 0, len(state.records))
+ for idx := range state.records {+ out = append(out, idx)
+ }
+
+ slices.SortFunc(out, func(a, b int) int {+ return bytes.Compare(state.records[a].objectID.Bytes(), state.records[b].objectID.Bytes())
+ })
+
+ return out
+}
+
+// verifyResolvedRecords checks that all records are fully resolved before index writing.
+func verifyResolvedRecords(state *ingestState) error {+ for idx, record := range state.records {+ if !record.resolved {+ return fmt.Errorf("packfile/ingest: unresolved record %d at offset %d", idx, record.offset)+ }
+ }
+
+ return nil
+}
+
+// writeAndHash writes src to dst and updates hash.
+func writeAndHash(dst io.Writer, hashImpl hash.Hash, src []byte) error {+ _, err := dst.Write(src)
+ if err != nil {+ return err
+ }
+
+ _, err = hashImpl.Write(src)
+ if err != nil {+ return err
+ }
+
+ return nil
+}
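
The fanout table written above stores cumulative counts per leading ID byte, which is what lets readers binary-search a narrow slice of the sorted ID table. A small sketch of the invariant, with three hypothetical objects whose IDs lead with 0x00, 0x00, and 0x03:

    counts := map[byte]uint32{0x00: 2, 0x03: 1}
    var fanout [256]uint32
    var cumulative uint32
    for i := range fanout {
        cumulative += counts[byte(i)]
        fanout[i] = cumulative
    }
    // fanout[0x00] == 2, fanout[0x02] == 2, fanout[0x03] == 3, fanout[0xff] == 3:
    // IDs starting 0x03 occupy positions fanout[0x02] .. fanout[0x03]-1.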
--- /dev/null
+++ b/object/store/packed/internal/ingest/ingest.go
@@ -1,0 +1,68 @@
+package ingest
+
+import (
+ "fmt"
+)
+
+// ingest initializes transaction state and executes the ingest pipeline.
+func ingest(state *ingestState) (out Result, err error) {
+ err = openTemporaryArtifacts(state)
+ if err != nil {
+ return Result{}, err
+ }
+
+ defer func() {
+ _ = closeTemporaryArtifacts(state)
+ if err != nil {
+ rollbackTemporaryArtifacts(state)
+ }
+ }()
+
+ err = streamPackAndScan(state)
+ if err != nil {
+ return Result{}, err
+ }
+
+ err = resolveAll(state)
+ if err != nil {
+ return Result{}, err
+ }
+
+ err = maybeFixThin(state)
+ if err != nil {
+ return Result{}, err
+ }
+
+ if state.thinFixed {
+ err = resolveAll(state)
+ if err != nil {
+ return Result{}, err
+ }
+ }
+
+ if len(state.unresolvedRefDeltas) > 0 {
+ return Result{}, &ThinPackUnresolvedError{Count: len(state.unresolvedRefDeltas)}
+ }
+
+ err = verifyResolvedRecords(state)
+ if err != nil {
+ return Result{}, err
+ }
+
+ err = state.packFile.Sync()
+ if err != nil {
+ return Result{}, &DestinationWriteError{Op: fmt.Sprintf("sync pack: %v", err)}
+ }
+
+ err = writeIdx(state)
+ if err != nil {
+ return Result{}, err
+ }
+
+ err = writeRev(state)
+ if err != nil {
+ return Result{}, err
+ }
+
+ return finalizeArtifacts(state)
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/ingest_test.go
@@ -1,0 +1,421 @@
+package ingest_test
+
+import (
+ "bytes"
+ "encoding/binary"
+ "errors"
+ "io"
+ "io/fs"
+ "os"
+ "path/filepath"
+ "strings"
+ "testing"
+
+ "codeberg.org/lindenii/furgit/internal/testgit"
+ objectid "codeberg.org/lindenii/furgit/object/id"
+ "codeberg.org/lindenii/furgit/object/store/packed/internal/ingest"
+)
+
+type noExtraReadReader struct {+ reader *bytes.Reader
+}
+
+func (r *noExtraReadReader) Read(p []byte) (int, error) {+ if r.reader.Len() == 0 {+ return 0, errors.New("unexpected extra read after pack trailer")+ }
+
+ return r.reader.Read(p)
+}
+
+func writePack(
+ src io.Reader,
+ packRoot *os.Root,
+ algo objectid.Algorithm,
+ opts ingest.Options,
+) (ingest.Result, error) {+ return ingest.WritePack(packRoot, algo, src, opts)
+}
+
+// fixturePath returns one fixture file path for the selected algorithm.
+func fixturePath(t *testing.T, algo objectid.Algorithm, name string) string {+ t.Helper()
+
+ dir := algo.String()
+ if dir == "" {+ t.Fatalf("unsupported fixture algorithm: %v", algo)+ }
+
+ return filepath.Join("testdata", "fixtures", dir, name)+}
+
+// fixtureBytes reads one fixture file fully.
+func fixtureBytes(t *testing.T, algo objectid.Algorithm, name string) []byte {+ t.Helper()
+
+ path := fixturePath(t, algo, name)
+ dir := filepath.Dir(path)
+ base := filepath.Base(path)
+
+ root, err := os.OpenRoot(dir)
+ if err != nil {+ t.Fatalf("open fixture root %q: %v", dir, err)+ }
+
+ defer func() {+ err := root.Close()
+ if err != nil {+ t.Fatalf("close fixture root %q: %v", dir, err)+ }
+ }()
+
+ data, err := root.ReadFile(base)
+ if err != nil {+ t.Fatalf("read fixture %q: %v", base, err)+ }
+
+ return data
+}
+
+// fixtureMetadata parses key=value metadata for one algorithm fixture set.
+func fixtureMetadata(t *testing.T, algo objectid.Algorithm) map[string]string {+ t.Helper()
+
+ data := fixtureBytes(t, algo, "METADATA.txt")
+
+ out := make(map[string]string)
+ for line := range strings.SplitSeq(strings.TrimSpace(string(data)), "\n") {+ line = strings.TrimSpace(line)
+ if line == "" {+ continue
+ }
+
+ key, value, ok := strings.Cut(line, "=")
+ if !ok {+ t.Fatalf("invalid fixture metadata line %q", line)+ }
+
+ out[strings.TrimSpace(key)] = strings.TrimSpace(value)
+ }
+
+ return out
+}
+
+// fixtureOID returns one fixture metadata object ID value.
+func fixtureOID(t *testing.T, algo objectid.Algorithm, key string) objectid.ObjectID {+ t.Helper()
+
+ meta := fixtureMetadata(t, algo)
+
+ hex, ok := meta[key]
+ if !ok {+ t.Fatalf("missing fixture metadata key %q", key)+ }
+
+ id, err := objectid.ParseHex(algo, hex)
+ if err != nil {+ t.Fatalf("parse fixture metadata oid %q: %v", hex, err)+ }
+
+ return id
+}
+
+// verifyReindexOracle regenerates idx/rev with upstream git index-pack and
+// compares bytes with files produced by ingest.
+func verifyReindexOracle(t *testing.T, repo *testgit.TestRepo, packName, idxName, revName string) {+ t.Helper()
+
+ oracleDir := t.TempDir()
+ oracleIdxPath := filepath.Join(oracleDir, "oracle.idx")
+ _ = repo.Run(t, "index-pack", "--rev-index", "-o", oracleIdxPath, filepath.Join("objects", "pack", packName))+ oracleRevPath := strings.TrimSuffix(oracleIdxPath, ".idx") + ".rev"
+
+ packRoot := repo.OpenPackRoot(t)
+
+ gotIdx, err := packRoot.ReadFile(idxName)
+ if err != nil {+ t.Fatalf("read idx: %v", err)+ }
+
+ oracleRoot, err := os.OpenRoot(oracleDir)
+ if err != nil {+ t.Fatalf("open oracle root: %v", err)+ }
+
+ defer func() {+ err := oracleRoot.Close()
+ if err != nil {+ t.Fatalf("close oracle root: %v", err)+ }
+ }()
+
+ wantIdx, err := oracleRoot.ReadFile(filepath.Base(oracleIdxPath))
+ if err != nil {+ t.Fatalf("read oracle idx: %v", err)+ }
+
+ if !bytes.Equal(gotIdx, wantIdx) {+ t.Fatal("idx bytes differ from git index-pack output")+ }
+
+ gotRev, err := packRoot.ReadFile(revName)
+ if err != nil {+ t.Fatalf("read rev: %v", err)+ }
+
+ wantRev, err := oracleRoot.ReadFile(filepath.Base(oracleRevPath))
+ if err != nil {+ t.Fatalf("read oracle rev: %v", err)+ }
+
+ if !bytes.Equal(gotRev, wantRev) {+ t.Fatal("rev bytes differ from git index-pack output")+ }
+}
+
+func TestIngestNonThinPackWritesPackIdxRev(t *testing.T) {+ t.Parallel()
+
+ testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper+ head := fixtureOID(t, algo, "head")
+ packBytes := fixtureBytes(t, algo, "nonthin.pack")
+
+ receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})+
+ packRoot := receiver.OpenPackRoot(t)
+
+ result, err := writePack(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{+ WriteRev: true,
+ RequireTrailingEOF: true,
+ })
+ if err != nil {+ t.Fatalf("Ingest: %v", err)+ }
+
+ if result.ThinFixed {+ t.Fatalf("ThinFixed = true, want false")+ }
+
+ if result.RevName == "" {+ t.Fatal("RevName is empty")+ }
+
+ _, err = packRoot.Stat(result.PackName)
+ if err != nil {+ t.Fatalf("stat pack: %v", err)+ }
+
+ _, err = packRoot.Stat(result.IdxName)
+ if err != nil {+ t.Fatalf("stat idx: %v", err)+ }
+
+ _, err = packRoot.Stat(result.RevName)
+ if err != nil {+ t.Fatalf("stat rev: %v", err)+ }
+
+ _ = receiver.Run(t, "verify-pack", "-v", filepath.Join("objects", "pack", result.IdxName))+ verifyReindexOracle(t, receiver, result.PackName, result.IdxName, result.RevName)
+
+ receiver.UpdateRef(t, "refs/heads/main", head)
+ _ = receiver.Run(t, "fsck", "--full", "--strict", "--no-progress", "--no-dangling")
+ })
+}
+
+func TestIngestThinPackWithoutFixReturnsUnresolved(t *testing.T) {+ t.Parallel()
+
+ testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper+ thinPack := fixtureBytes(t, algo, "thin.pack")
+
+ receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})+ packRoot := receiver.OpenPackRoot(t)
+
+ _, err := writePack(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{+ WriteRev: true,
+ RequireTrailingEOF: true,
+ })
+ if err == nil {+ t.Fatal("Ingest error = nil, want error")+ }
+
+ if _, ok := errors.AsType[*ingest.ThinPackUnresolvedError](err); !ok {+ t.Fatalf("Ingest error type = %T (%v), want *ThinPackUnresolvedError", err, err)+ }
+
+ entries, err := fs.ReadDir(packRoot.FS(), ".")
+ if err != nil {+ t.Fatalf("ReadDir(pack): %v", err)+ }
+
+ for _, entry := range entries {+ if strings.HasSuffix(entry.Name(), ".pack") {+ t.Fatalf("found finalized pack file after failure: %v", entry.Name())+ }
+ }
+ })
+}
+
+func TestIngestThinPackWithFixThin(t *testing.T) {+ t.Parallel()
+
+ testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper+ head := fixtureOID(t, algo, "head")
+ basePack := fixtureBytes(t, algo, "base.pack")
+ thinPack := fixtureBytes(t, algo, "thin.pack")
+ receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})+
+ packRoot := receiver.OpenPackRoot(t)
+
+ _, err := writePack(bytes.NewReader(basePack), packRoot, algo, ingest.Options{+ RequireTrailingEOF: true,
+ })
+ if err != nil {+ t.Fatalf("ingest base pack: %v", err)+ }
+
+ receiverRepo := receiver.OpenRepository(t)
+
+ result, err := writePack(bytes.NewReader(thinPack), packRoot, algo, ingest.Options{+ FixThin: true,
+ WriteRev: true,
+ Base: receiverRepo.Objects(),
+ RequireTrailingEOF: true,
+ })
+ if err != nil {+ t.Fatalf("Ingest(thin): %v", err)+ }
+
+ if !result.ThinFixed {+ t.Fatal("ThinFixed = false, want true")+ }
+
+ _ = receiver.Run(t, "verify-pack", "-v", filepath.Join("objects", "pack", result.IdxName))+ verifyReindexOracle(t, receiver, result.PackName, result.IdxName, result.RevName)
+ receiver.UpdateRef(t, "refs/heads/main", head)
+ _ = receiver.Run(t, "fsck", "--full", "--strict", "--no-progress", "--no-dangling")
+ })
+}
+
+func TestIngestPackTrailerMismatch(t *testing.T) {+ t.Parallel()
+
+ testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper+ packBytes := fixtureBytes(t, algo, "nonthin.pack")
+ if len(packBytes) == 0 {+ t.Fatal("empty pack stream")+ }
+
+ packBytes[len(packBytes)-1] ^= 0xff
+
+ receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})+ packRoot := receiver.OpenPackRoot(t)
+
+ _, err := writePack(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{+ WriteRev: true,
+ RequireTrailingEOF: true,
+ })
+ if err == nil {+ t.Fatal("Ingest error = nil, want error")+ }
+
+ if _, ok := errors.AsType[*ingest.PackTrailerMismatchError](err); !ok {+ t.Fatalf("Ingest error type = %T (%v), want *PackTrailerMismatchError", err, err)+ }
+
+ entries, err := fs.ReadDir(packRoot.FS(), ".")
+ if err != nil {+ t.Fatalf("ReadDir(pack): %v", err)+ }
+
+ for _, entry := range entries {+ if strings.HasSuffix(entry.Name(), ".pack") {+ t.Fatalf("found finalized pack file after failure: %v", entry.Name())+ }
+ }
+ })
+}
+
+func zeroObjectPackBytes(t *testing.T, algo objectid.Algorithm) []byte {+ t.Helper()
+
+ hashImpl, err := algo.New()
+ if err != nil {+ t.Fatalf("algo.New: %v", err)+ }
+
+ var header [12]byte
+ copy(header[:4], []byte{'P', 'A', 'C', 'K'})+ binary.BigEndian.PutUint32(header[4:8], 2)
+ binary.BigEndian.PutUint32(header[8:12], 0)
+
+ _, _ = hashImpl.Write(header[:])
+
+ return append(header[:], hashImpl.Sum(nil)...)
+}
+
+func TestIngestZeroObjectPackIsDiscardedInternally(t *testing.T) {+ t.Parallel()
+
+ testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper+ packBytes := zeroObjectPackBytes(t, algo)
+ receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})+ packRoot := receiver.OpenPackRoot(t)
+
+ result, err := writePack(bytes.NewReader(packBytes), packRoot, algo, ingest.Options{+ RequireTrailingEOF: true,
+ })
+ if err != nil {
+ t.Fatalf("WritePack: %v", err)
+ }
+
+ if result.ObjectCount != 0 {
+ t.Fatalf("ObjectCount = %d, want 0", result.ObjectCount)
+ }
+
+ if result.PackName != "" {
+ t.Fatalf("PackName = %q, want empty", result.PackName)
+ }
+
+ if result.IdxName != "" {
+ t.Fatalf("IdxName = %q, want empty", result.IdxName)
+ }
+
+ if result.RevName != "" {
+ t.Fatalf("RevName = %q, want empty", result.RevName)
+ }
+
+ entries, err := fs.ReadDir(packRoot.FS(), ".")
+ if err != nil {
+ t.Fatalf("ReadDir(pack): %v", err)
+ }
+
+ if len(entries) != 0 {
+ t.Fatalf("unexpected files after zero-object pack: %d", len(entries))
+ }
+ })
+}
+
+func TestIngestCanFinishWithoutTrailingEOF(t *testing.T) {
+ t.Parallel()
+
+ testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) { //nolint:thelper
+ head := fixtureOID(t, algo, "head")
+ packBytes := fixtureBytes(t, algo, "nonthin.pack")
+
+ receiver := testgit.NewRepo(t, testgit.RepoOptions{ObjectFormat: algo, Bare: true})
+ packRoot := receiver.OpenPackRoot(t)
+
+ result, err := writePack(&noExtraReadReader{reader: bytes.NewReader(packBytes)}, packRoot, algo, ingest.Options{
+ WriteRev: true,
+ })
+ if err != nil {
+ t.Fatalf("Ingest without trailing EOF: %v", err)
+ }
+
+ receiver.UpdateRef(t, "refs/heads/main", head)
+ _ = receiver.Run(t, "verify-pack", "-v", filepath.Join("objects", "pack", result.IdxName))
+ _ = receiver.Run(t, "fsck", "--full", "--strict", "--no-progress", "--no-dangling")
+ })
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/options.go
@@ -1,0 +1,26 @@
+package ingest
+
+import (
+ "codeberg.org/lindenii/furgit/common/iowrap"
+ objectstore "codeberg.org/lindenii/furgit/object/store"
+)
+
+// Options controls one pack ingest operation.
+type Options struct {
+ // FixThin appends missing local bases for thin packs.
+ FixThin bool
+ // WriteRev writes a .rev alongside the .pack and .idx.
+ WriteRev bool
+ // Base supplies existing objects for thin-pack fixup.
+ Base objectstore.Reader
+ // Progress receives human-readable progress messages.
+ //
+ // When nil, no progress output is emitted.
+ Progress iowrap.WriteFlusher
+ // RequireTrailingEOF requires the source to hit EOF after the pack trailer.
+ //
+ // This is suitable for exact pack-file readers, but should be disabled for
+ // full-duplex transport streams like receive-pack where the peer keeps the
+ // connection open to read the server response.
+ RequireTrailingEOF bool
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/progress_write.go
@@ -1,0 +1,11 @@
+package ingest
+
+import "codeberg.org/lindenii/furgit/internal/utils"
+
+func writeProgressf(state *ingestState, format string, args ...any) {
+ utils.BestEffortFprintf(state.opts.Progress, format, args...)
+
+ if state.opts.Progress != nil {
+ _ = state.opts.Progress.Flush()
+ }
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/record_content.go
@@ -1,0 +1,29 @@
+package ingest
+
+import (
+ "fmt"
+
+ objecttype "codeberg.org/lindenii/furgit/object/type"
+)
+
+// readBaseRecordContent reads canonical base content for one non-delta record.
+func readBaseRecordContent(state *ingestState, idx int) (objecttype.Type, []byte, error) {
+ record := state.records[idx]
+ if !record.packedType.IsBaseObject() {
+ return objecttype.TypeInvalid, nil, fmt.Errorf("packfile/ingest: record %d is not a base object", idx)
+ }
+
+ content, err := inflateRecordPayload(state, idx)
+ if err != nil {
+ return objecttype.TypeInvalid, nil, err
+ }
+
+ if int64(len(content)) != record.declaredSize {
+ return objecttype.TypeInvalid, nil, &MalformedPackEntryError{
+ Offset: record.offset,
+ Reason: fmt.Sprintf("base content size mismatch got %d want %d", len(content), record.declaredSize),
+ }
+ }
+
+ return record.packedType, content, nil
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/record_delta.go
@@ -1,0 +1,60 @@
+package ingest
+
+import (
+ "fmt"
+
+ deltaapply "codeberg.org/lindenii/furgit/format/packfile/delta/apply"
+ objecttype "codeberg.org/lindenii/furgit/object/type"
+)
+
+// applyDeltaRecord applies one delta record onto base content.
+func applyDeltaRecord(state *ingestState, idx int, baseType objecttype.Type, baseContent []byte) (objecttype.Type, []byte, error) {
+ record := state.records[idx]
+ if record.packedType != objecttype.TypeOfsDelta && record.packedType != objecttype.TypeRefDelta {
+ return objecttype.TypeInvalid, nil, fmt.Errorf("packfile/ingest: record %d is not a delta record", idx)
+ }
+
+ deltaPayload, err := inflateRecordPayload(state, idx)
+ if err != nil {
+ return objecttype.TypeInvalid, nil, err
+ }
+
+ if int64(len(deltaPayload)) != record.declaredSize {
+ return objecttype.TypeInvalid, nil, &MalformedPackEntryError{
+ Offset: record.offset,
+ Reason: fmt.Sprintf("delta payload size mismatch got %d want %d", len(deltaPayload), record.declaredSize),
+ }
+ }
+
+ srcSize, dstSize, err := readDeltaHeaderSizes(deltaPayload)
+ if err != nil {
+ return objecttype.TypeInvalid, nil, &MalformedPackEntryError{
+ Offset: record.offset,
+ Reason: fmt.Sprintf("read delta header: %v", err),
+ }
+ }
+
+ if srcSize != len(baseContent) {
+ return objecttype.TypeInvalid, nil, &MalformedPackEntryError{
+ Offset: record.offset,
+ Reason: fmt.Sprintf("delta source size mismatch got %d want %d", srcSize, len(baseContent)),
+ }
+ }
+
+ content, err := deltaapply.Apply(baseContent, deltaPayload)
+ if err != nil {
+ return objecttype.TypeInvalid, nil, &MalformedPackEntryError{
+ Offset: record.offset,
+ Reason: fmt.Sprintf("apply delta: %v", err),
+ }
+ }
+
+ if len(content) != dstSize {
+ return objecttype.TypeInvalid, nil, &MalformedPackEntryError{
+ Offset: record.offset,
+ Reason: fmt.Sprintf("delta result size mismatch got %d want %d", len(content), dstSize),
+ }
+ }
+
+ return baseType, content, nil
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/record_inflate.go
@@ -1,0 +1,46 @@
+package ingest
+
+import (
+ "compress/zlib"
+ "fmt"
+ "io"
+
+ "codeberg.org/lindenii/furgit/internal/intconv"
+)
+
+// inflateRecordPayload inflates one record's zlib payload from the pack file.
+func inflateRecordPayload(state *ingestState, idx int) ([]byte, error) {
+ record := state.records[idx]
+ if record.packedLen < uint64(record.headerLen) {
+ return nil, &MalformedPackEntryError{Offset: record.offset, Reason: "entry packed span underflow"}
+ }
+
+ compressedOffset := record.offset + uint64(record.headerLen)
+ compressedLen := record.packedLen - uint64(record.headerLen)
+
+ compressedOffsetInt64, err := intconv.Uint64ToInt64(compressedOffset)
+ if err != nil {
+ return nil, err
+ }
+
+ compressedLenInt64, err := intconv.Uint64ToInt64(compressedLen)
+ if err != nil {
+ return nil, err
+ }
+
+ section := io.NewSectionReader(state.packFile, compressedOffsetInt64, compressedLenInt64)
+
+ reader, err := zlib.NewReader(section)
+ if err != nil {
+ return nil, &MalformedPackEntryError{Offset: record.offset, Reason: fmt.Sprintf("open payload zlib: %v", err)}
+ }
+
+ defer func() { _ = reader.Close() }()
+
+ out, err := io.ReadAll(reader)
+ if err != nil {
+ return nil, &MalformedPackEntryError{Offset: record.offset, Reason: fmt.Sprintf("inflate payload: %v", err)}
+ }
+
+ return out, nil
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/record_resolve.go
@@ -1,0 +1,116 @@
+package ingest
+
+import (
+ "fmt"
+
+ objecttype "codeberg.org/lindenii/furgit/object/type"
+)
+
+// resolveRecord resolves one record and returns canonical type/content.
+func resolveRecord(state *ingestState, idx int, visiting map[int]struct{}) (objecttype.Type, []byte, error) {
+ if idx < 0 || idx >= len(state.records) {
+ return objecttype.TypeInvalid, nil, fmt.Errorf("packfile/ingest: record index out of bounds")
+ }
+
+ if _, ok := visiting[idx]; ok {
+ return objecttype.TypeInvalid, nil, &DeltaCycleError{Offset: state.records[idx].offset}
+ }
+
+ visiting[idx] = struct{}{}
+ defer delete(visiting, idx)
+
+ record := &state.records[idx]
+ if ty, content, ok := state.baseCache.get(idx); ok {
+ return ty, content, nil
+ }
+
+ if record.packedType.IsBaseObject() {
+ ty, content, err := readBaseRecordContent(state, idx)
+ if err != nil {
+ return objecttype.TypeInvalid, nil, err
+ }
+
+ if record.resolved {
+ state.baseCache.add(idx, record.realType, content)
+
+ return record.realType, content, nil
+ }
+
+ id, err := hashCanonicalObject(state.algo, ty, content)
+ if err != nil {
+ return objecttype.TypeInvalid, nil, err
+ }
+
+ record.objectID = id
+ record.realType = ty
+ record.resolved = true
+ state.objectToRecord[id] = idx
+ state.baseCache.add(idx, ty, content)
+
+ return ty, content, nil
+ }
+
+ var (
+ baseType objecttype.Type
+ baseContent []byte
+ err error
+ )
+ switch record.packedType {
+ case objecttype.TypeOfsDelta:
+ baseIdx, ok := state.offsetToRecord[record.baseOffset]
+ if !ok {
+ return objecttype.TypeInvalid, nil, &MalformedPackEntryError{
+ Offset: record.offset,
+ Reason: "missing ofs-delta base entry",
+ }
+ }
+
+ baseType, baseContent, err = resolveRecord(state, baseIdx, visiting)
+ if err != nil {
+ return objecttype.TypeInvalid, nil, err
+ }
+ case objecttype.TypeRefDelta:
+ baseIdx, ok := state.objectToRecord[record.baseObject]
+ if ok {
+ baseType, baseContent, err = resolveRecord(state, baseIdx, visiting)
+ if err != nil {
+ return objecttype.TypeInvalid, nil, err
+ }
+ } else {
+ return objecttype.TypeInvalid, nil, errExternalThinBase
+ }
+ case objecttype.TypeInvalid,
+ objecttype.TypeCommit,
+ objecttype.TypeTree,
+ objecttype.TypeBlob,
+ objecttype.TypeTag,
+ objecttype.TypeFuture:
+ return objecttype.TypeInvalid, nil, &MalformedPackEntryError{
+ Offset: record.offset,
+ Reason: "unsupported delta type",
+ }
+ default:
+ return objecttype.TypeInvalid, nil, &MalformedPackEntryError{
+ Offset: record.offset,
+ Reason: "unsupported delta type",
+ }
+ }
+
+ ty, content, err := applyDeltaRecord(state, idx, baseType, baseContent)
+ if err != nil {
+ return objecttype.TypeInvalid, nil, err
+ }
+
+ id, err := hashCanonicalObject(state.algo, ty, content)
+ if err != nil {
+ return objecttype.TypeInvalid, nil, err
+ }
+
+ record.objectID = id
+ record.realType = ty
+ record.resolved = true
+ state.objectToRecord[id] = idx
+ state.baseCache.add(idx, ty, content)
+
+ return ty, content, nil
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/records.go
@@ -1,0 +1,46 @@
+package ingest
+
+import (
+ objectid "codeberg.org/lindenii/furgit/object/id"
+ objecttype "codeberg.org/lindenii/furgit/object/type"
+)
+
+// objectRecord stores metadata for one packed object entry.
+type objectRecord struct {
+ // offset is the entry start offset in the pack file.
+ offset uint64
+ // headerLen is packed entry header length in bytes.
+ headerLen uint32
+ // packedLen is total packed entry length in bytes.
+ packedLen uint64
+ // crc32 is the CRC over the full packed entry.
+ crc32 uint32
+ // packedType is the entry type tag from the pack stream.
+ packedType objecttype.Type
+ // realType is canonical object type after delta resolution.
+ realType objecttype.Type
+ // declaredSize is the declared output object size for this entry.
+ declaredSize int64
+ // dataOffset is compressed payload start offset for this entry.
+ dataOffset uint64
+ // baseOffset is OFS base offset when packedType is OFS delta.
+ baseOffset uint64
+ // baseObject is REF base object ID when packedType is REF delta.
+ baseObject objectid.ObjectID
+ // objectID is final resolved object ID.
+ objectID objectid.ObjectID
+ // resolved reports whether objectID/realType are finalized.
+ resolved bool
+}
+
+// ofsDeltaRef maps one OFS delta record to its base offset.
+type ofsDeltaRef struct {
+ baseOffset uint64
+ recordIdx int
+}
+
+// refDeltaRef maps one REF delta record to its base object ID.
+type refDeltaRef struct {
+ baseObject objectid.ObjectID
+ recordIdx int
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/resolve_all.go
@@ -1,0 +1,70 @@
+package ingest
+
+import (
+ "errors"
+
+ "codeberg.org/lindenii/furgit/internal/progress"
+)
+
+// resolveAll resolves all delta records and finalizes ObjectID/RealType for every record.
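+//
+// Records whose ref-delta base is not present in the pack are collected into
+// unresolvedRefDeltas for later thin-pack fixup instead of failing the ingest.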
+func resolveAll(state *ingestState) error {
+ state.unresolvedRefDeltas = state.unresolvedRefDeltas[:0]
+
+ var pending uint32
+
+ for idx := range state.records {
+ if !state.records[idx].resolved {
+ pending++
+ }
+ }
+
+ if pending == 0 {
+ return nil
+ }
+
+ var done uint32
+
+ meter := progress.New(progress.Options{
+ Writer: state.opts.Progress,
+ Title: "resolving deltas",
+ Total: uint64(pending),
+ })
+
+ for idx := range state.records {
+ if state.records[idx].resolved {
+ continue
+ }
+
+ done++
+ meter.Set(uint64(done), 0)
+
+ visiting := make(map[int]struct{})
+
+ ty, content, err := resolveRecord(state, idx, visiting)
+ if err != nil {
+ if errors.Is(err, errExternalThinBase) {
+ state.unresolvedRefDeltas = append(state.unresolvedRefDeltas, idx)
+
+ continue
+ }
+
+ return err
+ }
+
+ id, err := hashCanonicalObject(state.algo, ty, content)
+ if err != nil {
+ return err
+ }
+
+ record := &state.records[idx]
+ record.realType = ty
+ record.objectID = id
+ record.resolved = true
+ state.objectToRecord[id] = idx
+ state.baseCache.add(idx, ty, content)
+ }
+
+ meter.Stop("done")+
+ return nil
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/result.go
@@ -1,0 +1,23 @@
+package ingest
+
+import objectid "codeberg.org/lindenii/furgit/object/id"
+
+// Result describes one successful ingest transaction.
+type Result struct {
+ // PackName is the destination-relative filename of the written .pack.
+ PackName string
+ // IdxName is the destination-relative filename of the written .idx.
+ IdxName string
+ // RevName is the destination-relative filename of the written .rev.
+ //
+ // RevName is empty when Options.WriteRev is false.
+ RevName string
+ // PackHash is the final pack hash (same hash embedded in .idx/.rev trailers).
+ PackHash objectid.ObjectID
+ // ObjectCount is the final object count in the resulting pack.
+ //
+ // If thin fixup appends objects, this includes appended base objects.
+ ObjectCount uint32
+ // ThinFixed reports whether thin fixup appended local bases.
+ ThinFixed bool
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/rev_write.go
@@ -1,0 +1,137 @@
+package ingest
+
+import (
+ "encoding/binary"
+ "slices"
+
+ "codeberg.org/lindenii/furgit/internal/intconv"
+ "codeberg.org/lindenii/furgit/internal/progress"
+)
+
+const (
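+ // revMagic is "RIDX" in ASCII, the reverse-index file signature.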
+ revMagic = 0x52494458
+ revVersion = 1
+)
+
+// writeRev writes rev index for resolved records.
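+//
+// Each 4-byte entry holds, for the object at that pack-offset-order position,
+// the object's position in idx (sorted) order.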
+func writeRev(state *ingestState) error {
+ if !state.opts.WriteRev {
+ return nil
+ }
+
+ idxOrder := buildIdxOrder(state)
+
+ recordToIdxPos := make([]int, len(state.records))
+ for pos, recordIdx := range idxOrder {
+ recordToIdxPos[recordIdx] = pos
+ }
+
+ packOrder := buildPackOrder(state)
+
+ hashImpl, err := state.algo.New()
+ if err != nil {
+ return err
+ }
+
+ var scratch [8]byte
+
+ writeProgressf(state, "writing reverse index header...\r")
+ binary.BigEndian.PutUint32(scratch[:4], revMagic)
+
+ err = writeAndHash(state.revFile, hashImpl, scratch[:4])
+ if err != nil {
+ return err
+ }
+
+ binary.BigEndian.PutUint32(scratch[:4], revVersion)
+
+ err = writeAndHash(state.revFile, hashImpl, scratch[:4])
+ if err != nil {
+ return err
+ }
+
+ binary.BigEndian.PutUint32(scratch[:4], state.algo.PackHashID())
+
+ err = writeAndHash(state.revFile, hashImpl, scratch[:4])
+ if err != nil {
+ return err
+ }
+
+ writeProgressf(state, "writing reverse index header: done.\n")
+
+ entriesMeter := progress.New(progress.Options{
+ Writer: state.opts.Progress,
+ Title: "writing reverse index entries",
+ Total: uint64(len(packOrder)),
+ })
+
+ var entriesDone uint64
+
+ for _, recordIdx := range packOrder {
+ recordPos, err := intconv.IntToUint32(recordToIdxPos[recordIdx])
+ if err != nil {
+ return err
+ }
+
+ binary.BigEndian.PutUint32(scratch[:4], recordPos)
+
+ err = writeAndHash(state.revFile, hashImpl, scratch[:4])
+ if err != nil {
+ return err
+ }
+
+ entriesDone++
+ entriesMeter.Set(entriesDone, 0)
+ }
+
+ if entriesDone > 0 {
+ entriesMeter.Stop("done")
+ }
+
+ writeProgressf(state, "writing reverse index trailer...\r")
+
+ err = writeAndHash(state.revFile, hashImpl, state.packHash.Bytes())
+ if err != nil {
+ return err
+ }
+
+ revHash := hashImpl.Sum(nil)
+
+ _, err = state.revFile.Write(revHash)
+ if err != nil {
+ return err
+ }
+
+ err = state.revFile.Sync()
+ if err != nil {
+ return err
+ }
+
+ writeProgressf(state, "writing reverse index trailer: done.\n")
+
+ return nil
+}
+
+// buildPackOrder returns record indexes sorted by pack offset.
+func buildPackOrder(state *ingestState) []int {
+ out := make([]int, 0, len(state.records))
+ for idx := range state.records {
+ out = append(out, idx)
+ }
+
+ slices.SortFunc(out, func(a, b int) int {
+ offA := state.records[a].offset
+
+ offB := state.records[b].offset
+ switch {
+ case offA < offB:
+ return -1
+ case offA > offB:
+ return 1
+ default:
+ return 0
+ }
+ })
+
+ return out
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/rewrite_header_trailer.go
@@ -1,0 +1,89 @@
+package ingest
+
+import (
+ "encoding/binary"
+ "io"
+
+ "codeberg.org/lindenii/furgit/internal/intconv"
+ objectid "codeberg.org/lindenii/furgit/object/id"
+)
+
+// rewritePackHeaderAndTrailer rewrites object count and trailer hash using ReadAt/WriteAt.
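+//
+// The pack is re-read and re-hashed in full: thin-pack fixup changes the
+// header object count and appends entries, so the original trailer hash no
+// longer matches the file contents.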
+func rewritePackHeaderAndTrailer(state *ingestState) error {
+ var countRaw [4]byte
+
+ recordCountUint32, err := intconv.IntToUint32(len(state.records))
+ if err != nil {
+ return err
+ }
+
+ binary.BigEndian.PutUint32(countRaw[:], recordCountUint32)
+
+ _, err = state.packFile.WriteAt(countRaw[:], 8)
+ if err != nil {
+ return err
+ }
+
+ info, err := state.packFile.Stat()
+ if err != nil {
+ return err
+ }
+
+ endWithoutTrailer := info.Size()
+
+ hashImpl, err := state.algo.New()
+ if err != nil {
+ return err
+ }
+
+ var (
+ buf [128 << 10]byte
+ pos int64
+ )
+ for pos < endWithoutTrailer {
+ want := int64(len(buf))
+
+ remaining := endWithoutTrailer - pos
+ if remaining < want {
+ want = remaining
+ }
+
+ n, err := state.packFile.ReadAt(buf[:want], pos)
+ if err != nil && err != io.EOF {
+ return err
+ }
+
+ if n == 0 {
+ return io.ErrUnexpectedEOF
+ }
+
+ _, _ = hashImpl.Write(buf[:n])
+ pos += int64(n)
+ }
+
+ sum := hashImpl.Sum(nil)
+
+ _, err = state.packFile.WriteAt(sum, endWithoutTrailer)
+ if err != nil {
+ return err
+ }
+
+ packHash, err := objectid.FromBytes(state.algo, sum)
+ if err != nil {
+ return err
+ }
+
+ state.packHash = packHash
+ state.objectCountHeader = recordCountUint32
+
+ sumLenInt64 := int64(len(sum))
+
+ newConsumed, err := intconv.Int64ToUint64(endWithoutTrailer + sumLenInt64)
+ if err != nil {
+ return err
+ }
+
+ state.stream.consumed = newConsumed
+
+ return nil
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/scan.go
@@ -1,0 +1,105 @@
+package ingest
+
+import (
+ "fmt"
+
+ "codeberg.org/lindenii/furgit/internal/progress"
+ objectid "codeberg.org/lindenii/furgit/object/id"
+)
+
+// streamPackAndScan copies src into temp .pack while scanning packed entries.
+func streamPackAndScan(state *ingestState) error {
+ hashImpl, err := state.algo.New()
+ if err != nil {
+ return err
+ }
+
+ state.stream = newStreamScanner(
+ state.src,
+ state.packFile,
+ hashImpl,
+ state.algo.Size(),
+ )
+
+ writeProgressf(state, "validating pack header...\r")
+
+ err = seedStreamWithPackHeader(state)
+ if err != nil {
+ return err
+ }
+
+ writeProgressf(state, "validating pack header: done.\n")
+
+ state.records = make([]objectRecord, 0, state.objectCountHeader)
+ state.ofsDeltas = make([]ofsDeltaRef, 0, state.objectCountHeader)
+ state.refDeltas = make([]refDeltaRef, 0, state.objectCountHeader)
+
+ total := state.objectCountHeader
+ meter := progress.New(progress.Options{
+ Writer: state.opts.Progress,
+ Title: "receiving objects",
+ Total: uint64(total),
+ Throughput: true,
+ })
+
+ for i := range total {
+ nextOffset, err := scanOneEntry(state, state.stream.consumed)
+ if err != nil {
+ return err
+ }
+
+ if nextOffset != state.stream.consumed {
+ return fmt.Errorf("packfile/ingest: internal stream offset mismatch")
+ }
+
+ done := i + 1
+ meter.Set(uint64(done), state.stream.consumed)
+ }
+
+ meter.Stop("done")+
+ err = state.stream.finishAndFlushTrailer(state.opts.RequireTrailingEOF)
+ if err != nil {+ return err
+ }
+
+ if len(state.stream.packTrailer) != state.algo.Size() {
+ return fmt.Errorf("packfile/ingest: invalid trailer size")
+ }
+
+ packHash, err := objectid.FromBytes(state.algo, state.stream.packTrailer)
+ if err != nil {
+ return err
+ }
+
+ state.packHash = packHash
+
+ return state.stream.flush()
+}
+
+// seedStreamWithPackHeader writes the already-validated PACK header to output,
+// seeds the running pack hash, and advances stream offset accounting.
+func seedStreamWithPackHeader(state *ingestState) error {
+ written := 0
+ for written < len(state.packHeaderRaw) {
+ n, err := state.packFile.Write(state.packHeaderRaw[written:])
+ if err != nil {
+ return &DestinationWriteError{Op: fmt.Sprintf("write pack header: %v", err)}
+ }
+
+ if n == 0 {
+ return &DestinationWriteError{Op: "write pack header: short write"}
+ }
+
+ written += n
+ }
+
+ _, err := state.stream.hash.Write(state.packHeaderRaw[:])
+ if err != nil {
+ return err
+ }
+
+ state.stream.consumed = packHeaderSize
+
+ return nil
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/state.go
@@ -1,0 +1,70 @@
+package ingest
+
+import (
+ "io"
+ "os"
+
+ objectid "codeberg.org/lindenii/furgit/object/id"
+)
+
+const (
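+ // defaultDeltaBaseCacheMaxBytes caps cached delta base content at 32 MiB.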
+ defaultDeltaBaseCacheMaxBytes = 32 << 20
+)
+
+// ingestState holds mutable state for one pack ingest operation.
+type ingestState struct {
+ src io.Reader
+ destination *os.Root
+ algo objectid.Algorithm
+ opts Options
+
+ packHeaderRaw [packHeaderSize]byte
+
+ packFile *os.File
+ packTmpName string
+ idxFile *os.File
+ idxTmpName string
+ revFile *os.File
+ revTmpName string
+
+ stream *streamScanner
+
+ records []objectRecord
+ ofsDeltas []ofsDeltaRef
+ refDeltas []refDeltaRef
+ unresolvedRefDeltas []int
+ offsetToRecord map[uint64]int
+ objectToRecord map[objectid.ObjectID]int
+
+ baseCache *deltaBaseCache
+ packHash objectid.ObjectID
+
+ objectCountHeader uint32
+ thinFixed bool
+}
+
+// newIngestState constructs one call-local ingest state.
+func newIngestState(
+ src io.Reader,
+ destination *os.Root,
+ algo objectid.Algorithm,
+ opts Options,
+ header packHeader,
+ headerRaw [packHeaderSize]byte,
+) (*ingestState, error) {
+ if algo.Size() == 0 {
+ return nil, objectid.ErrInvalidAlgorithm
+ }
+
+ return &ingestState{
+ src: src,
+ destination: destination,
+ algo: algo,
+ opts: opts,
+ packHeaderRaw: headerRaw,
+ objectCountHeader: header.ObjectCount,
+ offsetToRecord: make(map[uint64]int),
+ objectToRecord: make(map[objectid.ObjectID]int),
+ baseCache: newDeltaBaseCache(defaultDeltaBaseCacheMaxBytes),
+ }, nil
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/stream.go
@@ -1,0 +1,111 @@
+package ingest
+
+import (
+ "errors"
+ "hash"
+ "io"
+ "os"
+)
+
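+// streamScannerBufferSize is the fixed input window size (64 KiB).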
+const streamScannerBufferSize = 64 << 10
+
+// streamScanner incrementally reads/consumes one pack stream while mirroring
+// consumed bytes into one destination pack file.
+type streamScanner struct {
+ src io.Reader
+ dstFile *os.File
+
+ // Input buffer window: buf[off:n] is unread.
+ buf []byte
+ off int
+ n int
+
+ // Absolute consumed stream bytes.
+ consumed uint64
+
+ // Running pack hash over consumed bytes while hashEnabled is true.
+ hash hash.Hash
+ hashSize int
+ hashEnabled bool
+
+ // Entry CRC state while one entry is being consumed.
+ entryCRC uint32
+ inEntryCRC bool
+
+ packTrailer []byte
+}
+
+// newStreamScanner constructs one scanner with fixed input buffering.
+func newStreamScanner(src io.Reader, dstFile *os.File, hash hash.Hash, hashSize int) *streamScanner {
+ return &streamScanner{
+ src: src,
+ dstFile: dstFile,
+ buf: make([]byte, streamScannerBufferSize),
+ hash: hash,
+ hashSize: hashSize,
+ hashEnabled: true,
+ }
+}
+
+// Read implements io.Reader.
+func (scanner *streamScanner) Read(dst []byte) (int, error) {
+ if len(dst) == 0 {
+ return 0, nil
+ }
+
+ if scanner.n-scanner.off == 0 {
+ err := scanner.fill(1)
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ return 0, io.EOF
+ }
+
+ return 0, err
+ }
+ }
+
+ unread := scanner.n - scanner.off
+ if unread == 0 {
+ return 0, io.EOF
+ }
+
+ n := min(len(dst), unread)
+
+ copy(dst, scanner.buf[scanner.off:scanner.off+n])
+
+ err := scanner.use(n)
+ if err != nil {
+ return 0, err
+ }
+
+ return n, nil
+}
+
+// ReadByte implements io.ByteReader without allocation.
+func (scanner *streamScanner) ReadByte() (byte, error) {
+ if scanner.n-scanner.off == 0 {
+ err := scanner.fill(1)
+ if err != nil {
+ return 0, err
+ }
+ }
+
+ b := scanner.buf[scanner.off]
+
+ err := scanner.use(1)
+ if err != nil {
+ return 0, err
+ }
+
+ return b, nil
+}
+
+// readFull reads exactly len(dst) bytes through receiver.
+func (scanner *streamScanner) readFull(dst []byte) error {
+ _, err := io.ReadFull(scanner, dst)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/temp.go
@@ -1,0 +1,103 @@
+package ingest
+
+import (
+ "crypto/rand"
+ "errors"
+ "fmt"
+ "io/fs"
+ "os"
+)
+
+// openTemporaryArtifacts creates and opens temporary pack/idx/(rev) files under destination.
+func openTemporaryArtifacts(state *ingestState) error {
+ packName, packFile, err := createTempFile(state.destination, "tmp_pack_")
+ if err != nil {
+ return err
+ }
+
+ idxName, idxFile, err := createTempFile(state.destination, "tmp_idx_")
+ if err != nil {
+ _ = packFile.Close()
+ _ = state.destination.Remove(packName)
+
+ return err
+ }
+
+ revName := ""
+
+ var revFile *os.File
+ if state.opts.WriteRev {
+ revName, revFile, err = createTempFile(state.destination, "tmp_rev_")
+ if err != nil {
+ _ = idxFile.Close()
+ _ = state.destination.Remove(idxName)
+ _ = packFile.Close()
+ _ = state.destination.Remove(packName)
+
+ return err
+ }
+ }
+
+ state.packTmpName = packName
+ state.packFile = packFile
+ state.idxTmpName = idxName
+ state.idxFile = idxFile
+ state.revTmpName = revName
+ state.revFile = revFile
+
+ return nil
+}
+
+// closeTemporaryArtifacts closes all temporary artifact file descriptors.
+func closeTemporaryArtifacts(state *ingestState) error {
+ var out error
+
+ if state.packFile != nil {
+ err := state.packFile.Close()
+ if err != nil && out == nil {
+ out = err
+ }
+
+ state.packFile = nil
+ }
+
+ if state.idxFile != nil {
+ err := state.idxFile.Close()
+ if err != nil && out == nil {
+ out = err
+ }
+
+ state.idxFile = nil
+ }
+
+ if state.revFile != nil {
+ err := state.revFile.Close()
+ if err != nil && out == nil {
+ out = err
+ }
+
+ state.revFile = nil
+ }
+
+ return out
+}
+
+// createTempFile creates one temporary file under root using prefix.
+func createTempFile(root *os.Root, prefix string) (string, *os.File, error) {
+ for range 32 {
+ name := prefix + rand.Text()
+
+ file, err := root.OpenFile(name, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0o644)
+ if err == nil {
+ return name, file, nil
+ }
+
+ if errors.Is(err, fs.ErrExist) {
+ continue
+ }
+
+ return "", nil, fmt.Errorf("packfile/ingest: create temp file %q: %w", name, err)+ }
+
+ return "", nil, fmt.Errorf("packfile/ingest: unable to create temporary file for prefix %q", prefix)+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/testdata/fixtures/sha1/METADATA.txt
@@ -1,0 +1,3 @@
+format=sha1
+head=200c960359dad025b4170284c518919eb4a24305
+base=4bc507fc631ea78474d83c47548743c9f1dda0dc
Binary files /dev/null and b/object/store/packed/internal/ingest/testdata/fixtures/sha1/base.pack differ
Binary files /dev/null and b/object/store/packed/internal/ingest/testdata/fixtures/sha1/nonthin.pack differ
Binary files /dev/null and b/object/store/packed/internal/ingest/testdata/fixtures/sha1/thin.pack differ
--- /dev/null
+++ b/object/store/packed/internal/ingest/testdata/fixtures/sha256/METADATA.txt
@@ -1,0 +1,3 @@
+format=sha256
+head=35cc0f4cd1c73524187540494058d233a2ecbd071c85d496a2250d8e0c805ef8
+base=b4abe46895f0bb5aa22fd42d28d428413f265359734c288752e3c2d270eec276
Binary files /dev/null and b/object/store/packed/internal/ingest/testdata/fixtures/sha256/base.pack differ
Binary files /dev/null and b/object/store/packed/internal/ingest/testdata/fixtures/sha256/nonthin.pack differ
Binary files /dev/null and b/object/store/packed/internal/ingest/testdata/fixtures/sha256/thin.pack differ
--- /dev/null
+++ b/object/store/packed/internal/ingest/thin_append.go
@@ -1,0 +1,91 @@
+package ingest
+
+import (
+ "compress/zlib"
+ "hash/crc32"
+ "io"
+
+ "codeberg.org/lindenii/furgit/internal/intconv"
+ objectid "codeberg.org/lindenii/furgit/object/id"
+ objecttype "codeberg.org/lindenii/furgit/object/type"
+)
+
+// appendBaseObject appends one base object as a new packed non-delta entry.
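+//
+// The entry is written at the current end of the trailer-trimmed pack and
+// registered as already resolved, so pending deltas can use it as a base.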
+func appendBaseObject(state *ingestState, id objectid.ObjectID, realType objecttype.Type, content []byte) (int, error) {
+ start := state.stream.consumed
+
+ header := encodePackEntryHeader(realType, int64(len(content)))
+
+ startInt64, err := intconv.Uint64ToInt64(start)
+ if err != nil {
+ return 0, err
+ }
+
+ _, err = state.packFile.WriteAt(header, startInt64)
+ if err != nil {
+ return 0, err
+ }
+
+ headerLenInt64 := int64(len(header))
+ section := &fileSectionWriter{file: state.packFile, off: startInt64 + headerLenInt64}
+ crc := crc32.NewIEEE()
+
+ _, err = crc.Write(header)
+ if err != nil {
+ return 0, err
+ }
+
+ counting := &countingWriter{dst: section}
+
+ zw := zlib.NewWriter(io.MultiWriter(counting, crc))
+
+ _, err = zw.Write(content)
+ if err != nil {
+ return 0, err
+ }
+
+ err = zw.Close()
+ if err != nil {
+ return 0, err
+ }
+
+ headerLenUint64, err := intconv.IntToUint64(len(header))
+ if err != nil {
+ return 0, err
+ }
+
+ countingNUint64, err := intconv.IntToUint64(counting.n)
+ if err != nil {
+ return 0, err
+ }
+
+ packedLen := headerLenUint64 + countingNUint64
+ end := start + packedLen
+ state.stream.consumed = end
+
+ headerLenUint32, err := intconv.IntToUint32(len(header))
+ if err != nil {
+ return 0, err
+ }
+
+ record := objectRecord{
+ offset: start,
+ headerLen: headerLenUint32,
+ packedLen: packedLen,
+ crc32: crc.Sum32(),
+ packedType: realType,
+ realType: realType,
+ declaredSize: int64(len(content)),
+ dataOffset: start + headerLenUint64,
+ objectID: id,
+ resolved: true,
+ }
+
+ recordIdx := len(state.records)
+ state.records = append(state.records, record)
+ state.offsetToRecord[start] = recordIdx
+ state.objectToRecord[id] = recordIdx
+ state.baseCache.add(recordIdx, realType, content)
+
+ return recordIdx, nil
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/thin_fix.go
@@ -1,0 +1,99 @@
+package ingest
+
+import (
+ "errors"
+ "fmt"
+
+ "codeberg.org/lindenii/furgit/internal/intconv"
+ "codeberg.org/lindenii/furgit/internal/progress"
+ objectstore "codeberg.org/lindenii/furgit/object/store"
+)
+
+// maybeFixThin appends missing bases and rewrites pack header/trailer when needed.
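+//
+// Fixup truncates the original pack trailer, appends each locally-available
+// base as a full entry, then rewrites the header object count and trailer.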
+func maybeFixThin(state *ingestState) error {
+ if len(state.unresolvedRefDeltas) == 0 {
+ return nil
+ }
+
+ writeProgressf(
+ state,
+ "fixing thin pack: %d unresolved bases\r",
+ len(state.unresolvedRefDeltas),
+ )
+
+ if !state.opts.FixThin {
+ return &ThinPackUnresolvedError{Count: len(state.unresolvedRefDeltas)}
+ }
+
+ if state.opts.Base == nil {
+ return &ThinPackUnresolvedError{Count: len(state.unresolvedRefDeltas)}
+ }
+
+ hashSize := int64(state.algo.Size())
+
+ info, err := state.packFile.Stat()
+ if err != nil {
+ return err
+ }
+
+ size := info.Size()
+ if size < hashSize {
+ return fmt.Errorf("packfile/ingest: pack too short to trim trailer")
+ }
+
+ newEnd := size - hashSize
+
+ err = state.packFile.Truncate(newEnd)
+ if err != nil {
+ return err
+ }
+
+ consumed, err := intconv.Int64ToUint64(newEnd)
+ if err != nil {
+ return err
+ }
+
+ state.stream.consumed = consumed
+
+ baseIDs := unresolvedThinBaseIDs(state)
+
+ total := len(baseIDs)
+ meter := progress.New(progress.Options{
+ Writer: state.opts.Progress,
+ Title: "fixing thin pack",
+ Total: uint64(total),
+ })
+ meter.Set(0, 0)
+
+ var appended uint64
+
+ for _, id := range baseIDs {
+ ty, content, err := state.opts.Base.ReadBytesContent(id)
+ if err != nil {
+ if errors.Is(err, objectstore.ErrObjectNotFound) {
+ continue
+ }
+
+ return fmt.Errorf("packfile/ingest: read thin base %s: %w", id, err)
+ }
+
+ _, err = appendBaseObject(state, id, ty, content)
+ if err != nil {
+ return err
+ }
+
+ state.thinFixed = true
+
+ appended++
+ meter.Set(appended, 0)
+ }
+
+ err = rewritePackHeaderAndTrailer(state)
+ if err != nil {
+ return err
+ }
+
+ meter.Stop(fmt.Sprintf("appended %d/%d, done", appended, total))
+
+ return nil
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/thin_unresolved.go
@@ -1,0 +1,34 @@
+package ingest
+
+import (
+ "bytes"
+ "slices"
+
+ objectid "codeberg.org/lindenii/furgit/object/id"
+ objecttype "codeberg.org/lindenii/furgit/object/type"
+)
+
+// unresolvedThinBaseIDs returns sorted unique unresolved ref base IDs.
+func unresolvedThinBaseIDs(state *ingestState) []objectid.ObjectID {
+ seen := make(map[objectid.ObjectID]struct{})
+
+ for _, idx := range state.unresolvedRefDeltas {
+ record := state.records[idx]
+ if record.packedType != objecttype.TypeRefDelta {
+ continue
+ }
+
+ seen[record.baseObject] = struct{}{}
+ }
+
+ out := make([]objectid.ObjectID, 0, len(seen))
+ for id := range seen {
+ out = append(out, id)
+ }
+
+ slices.SortFunc(out, func(a, b objectid.ObjectID) int {
+ return bytes.Compare(a.RawBytes(), b.RawBytes())
+ })
+
+ return out
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/trailer.go
@@ -1,0 +1,58 @@
+package ingest
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "io"
+)
+
+// finishAndFlushTrailer reads trailer hash bytes, verifies trailer checksum,
+// and optionally requires the source stream to hit EOF afterward.
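+//
+// Hashing is disabled before the trailer is consumed so that the computed
+// pack hash covers exactly the bytes preceding the trailer.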
+func (scanner *streamScanner) finishAndFlushTrailer(requireTrailingEOF bool) error {
+ if scanner.hashSize <= 0 {
+ return fmt.Errorf("packfile/ingest: invalid hash size")
+ }
+
+ trailer := make([]byte, scanner.hashSize)
+
+ scanner.hashEnabled = false
+
+ err := scanner.readFull(trailer)
+ if err != nil {
+ return &PackTrailerMismatchError{}
+ }
+
+ scanner.packTrailer = append(scanner.packTrailer[:0], trailer...)
+
+ if scanner.n-scanner.off > 0 {
+ return fmt.Errorf("packfile/ingest: pack has trailing garbage")
+ }
+
+ if !requireTrailingEOF {
+ computed := scanner.hash.Sum(nil)
+ if !bytes.Equal(computed, trailer) {
+ return &PackTrailerMismatchError{}
+ }
+
+ return nil
+ }
+
+ var probe [1]byte
+
+ n, err := scanner.Read(probe[:])
+ if n > 0 || err == nil {
+ return fmt.Errorf("packfile/ingest: pack has trailing garbage")
+ }
+
+ if !errors.Is(err, io.EOF) {
+ return err
+ }
+
+ computed := scanner.hash.Sum(nil)
+ if !bytes.Equal(computed, trailer) {
+ return &PackTrailerMismatchError{}
+ }
+
+ return nil
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/use.go
@@ -1,0 +1,34 @@
+package ingest
+
+import (
+ "fmt"
+ "hash/crc32"
+)
+
+// use consumes n unread bytes and updates accounting/checksum state.
+func (scanner *streamScanner) use(n int) error {
+ if n < 0 || n > scanner.n-scanner.off {
+ return fmt.Errorf("packfile/ingest: invalid consume length %d", n)
+ }
+
+ if n == 0 {
+ return nil
+ }
+
+ chunk := scanner.buf[scanner.off : scanner.off+n]
+ if scanner.hashEnabled {
+ _, err := scanner.hash.Write(chunk)
+ if err != nil {
+ return err
+ }
+ }
+
+ if scanner.inEntryCRC {
+ scanner.entryCRC = crc32.Update(scanner.entryCRC, crc32.IEEETable, chunk)
+ }
+
+ scanner.off += n
+ scanner.consumed += uint64(n)
+
+ return nil
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/write.go
@@ -1,0 +1,50 @@
+package ingest
+
+import (
+ "bufio"
+ "io"
+ "os"
+
+ objectid "codeberg.org/lindenii/furgit/object/id"
+)
+
+// WritePack ingests one pack stream into destination and writes pack artifacts.
+//
+// Artifacts are published under content-addressed final names derived from the
+// resulting pack hash. If those final names already exist, WritePack treats
+// that as success and removes its temporary files.
+func WritePack(
+ destination *os.Root,
+ algo objectid.Algorithm,
+ src io.Reader,
+ opts Options,
+) (Result, error) {
+ if algo.Size() == 0 {
+ return Result{}, objectid.ErrInvalidAlgorithm
+ }
+
+ reader := bufio.NewReader(src)
+
+ header, headerRaw, err := readAndValidatePackHeader(reader)
+ if err != nil {
+ return Result{}, err
+ }
+
+ if header.ObjectCount == 0 {
+ return discardZeroObjectPack(reader, algo, opts, headerRaw)
+ }
+
+ state, err := newIngestState(
+ reader,
+ destination,
+ algo,
+ opts,
+ header,
+ headerRaw,
+ )
+ if err != nil {
+ return Result{}, err
+ }
+
+ return ingest(state)
+}
--- /dev/null
+++ b/object/store/packed/internal/ingest/write_empty.go
@@ -1,0 +1,58 @@
+package ingest
+
+import (
+ "bytes"
+ "errors"
+ "io"
+
+ objectid "codeberg.org/lindenii/furgit/object/id"
+)
+
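+// discardZeroObjectPack validates the trailer of a zero-object pack and
+// returns a Result without writing any artifacts to the destination.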
+func discardZeroObjectPack(
+ src io.Reader,
+ algo objectid.Algorithm,
+ opts Options,
+ headerRaw [packHeaderSize]byte,
+) (Result, error) {
+ hashImpl, err := algo.New()
+ if err != nil {
+ return Result{}, err
+ }
+
+ _, _ = hashImpl.Write(headerRaw[:])
+
+ trailer := make([]byte, algo.Size())
+
+ _, err = io.ReadFull(src, trailer)
+ if err != nil {
+ return Result{}, &PackTrailerMismatchError{}
+ }
+
+ computed := hashImpl.Sum(nil)
+ if !bytes.Equal(computed, trailer) {
+ return Result{}, &PackTrailerMismatchError{}
+ }
+
+ if opts.RequireTrailingEOF {
+ var probe [1]byte
+
+ n, err := src.Read(probe[:])
+ if n > 0 || err == nil {
+ return Result{}, errors.New("packfile/ingest: pack has trailing garbage")
+ }
+
+ if !errors.Is(err, io.EOF) {
+ return Result{}, err
+ }
+ }
+
+ packHash, err := objectid.FromBytes(algo, trailer)
+ if err != nil {
+ return Result{}, err
+ }
+
+ return Result{
+ PackHash: packHash,
+ ObjectCount: 0,
+ }, nil
+}
--- /dev/null
+++ b/object/store/packed/writer.go
@@ -1,0 +1,17 @@
+package packed
+
+import (
+ "io"
+
+ objectstore "codeberg.org/lindenii/furgit/object/store"
+ "codeberg.org/lindenii/furgit/object/store/packed/internal/ingest"
+)
+
+var _ objectstore.PackWriter = (*Store)(nil)
+
+// WritePack ingests one pack stream into the packed store.
+func (store *Store) WritePack(src io.Reader, _ objectstore.PackWriteOptions) error {
+ _, err := ingest.WritePack(store.root, store.algo, src, ingest.Options{})
+
+ return err
+}
--