ref: dff530bd93b9b7200d5d492b4ccb86c17daadf21
dir: /format/pack/ingest/stream_scan.go/
package ingest
import (
"encoding/binary"
"fmt"
"io"
deltaapply "codeberg.org/lindenii/furgit/format/delta/apply"
packfmt "codeberg.org/lindenii/furgit/format/pack"
"codeberg.org/lindenii/furgit/internal/compress/zlib"
"codeberg.org/lindenii/furgit/internal/intconv"
"codeberg.org/lindenii/furgit/objectheader"
"codeberg.org/lindenii/furgit/objectid"
"codeberg.org/lindenii/furgit/objecttype"
)
// streamPackAndScan copies src into the temp .pack file while scanning every
// packed entry, recording per-entry metadata and the trailing pack hash.
func streamPackAndScan(state *ingestState) error {
	hashImpl, err := state.algo.New()
	if err != nil {
		return err
	}
	state.stream = newStreamScanner(state.src, state.packFile, hashImpl, state.algo.Size())
	if err := readAndValidatePackHeader(state); err != nil {
		return err
	}
	// Pre-size all per-entry bookkeeping from the declared object count.
	count := state.objectCountHeader
	state.records = make([]objectRecord, 0, count)
	state.ofsDeltas = make([]ofsDeltaRef, 0, count)
	state.refDeltas = make([]refDeltaRef, 0, count)
	for range count {
		entryStart := state.stream.consumed
		entryEnd, err := scanOneEntry(state, entryStart)
		if err != nil {
			return err
		}
		// Every byte the entry claims must be exactly what the stream consumed.
		if entryEnd != state.stream.consumed {
			return fmt.Errorf("format/pack/ingest: internal stream offset mismatch")
		}
	}
	if err := state.stream.finishAndFlushTrailer(); err != nil {
		return err
	}
	trailer := state.stream.packTrailer
	if len(trailer) != state.algo.Size() {
		return fmt.Errorf("format/pack/ingest: invalid trailer size")
	}
	packHash, err := objectid.FromBytes(state.algo, trailer)
	if err != nil {
		return err
	}
	state.packHash = packHash
	return state.stream.flush()
}
// readAndValidatePackHeader reads the fixed 12-byte pack header from the
// stream and validates its signature, version, and declared object count,
// storing the count into state on success.
func readAndValidatePackHeader(state *ingestState) error {
	var header [12]byte
	if err := state.stream.readFull(header[:]); err != nil {
		return &ErrInvalidPackHeader{Reason: fmt.Sprintf("read header: %v", err)}
	}
	signature := binary.BigEndian.Uint32(header[0:4])
	if signature != packfmt.Signature {
		return &ErrInvalidPackHeader{Reason: "signature mismatch"}
	}
	version := binary.BigEndian.Uint32(header[4:8])
	if !packfmt.VersionSupported(version) {
		return &ErrInvalidPackHeader{Reason: fmt.Sprintf("unsupported version %d", version)}
	}
	state.objectCountHeader = binary.BigEndian.Uint32(header[8:12])
	// An empty pack is rejected up front; every ingest expects objects.
	if state.objectCountHeader == 0 {
		return &ErrInvalidPackHeader{Reason: "zero objects"}
	}
	return nil
}
// scanOneEntry scans one pack entry from the stream starting at startOffset,
// validates its size and offset accounting, appends exactly one objectRecord
// to state, and returns the stream offset immediately after the entry.
func scanOneEntry(state *ingestState, startOffset uint64) (uint64, error) {
	// CRC accounting brackets the raw bytes of the whole entry
	// (prefix + compressed payload) for later per-entry checksums.
	state.stream.beginEntryCRC()
	record, err := parseEntryPrefix(state, startOffset)
	if err != nil {
		return 0, err
	}
	// Inflate (or discard) the payload. For base objects oid is the computed
	// object ID; for deltas it is the zero value.
	contentLen, consumedInput, oid, err := drainEntryPayload(state, record)
	if err != nil {
		return 0, err
	}
	// The inflated length must match the size declared in the entry prefix.
	if contentLen != record.declaredSize {
		return 0, &ErrMalformedPackEntry{
			Offset: startOffset,
			Reason: fmt.Sprintf("inflated size mismatch got %d want %d", contentLen, record.declaredSize),
		}
	}
	endOffset := startOffset + uint64(record.headerLen) + consumedInput
	// The entry cannot claim to end beyond what the stream has actually
	// consumed so far; that would indicate corrupt bookkeeping.
	if endOffset > state.stream.consumed {
		return 0, &ErrMalformedPackEntry{
			Offset: startOffset,
			Reason: fmt.Sprintf("entry end offset overflow got %d > stream %d", endOffset, state.stream.consumed),
		}
	}
	record.packedLen = endOffset - startOffset
	record.dataOffset = startOffset + uint64(record.headerLen)
	// Defensive: packedLen smaller than headerLen would mean the unsigned
	// arithmetic above wrapped around.
	if record.packedLen < uint64(record.headerLen) {
		return 0, &ErrMalformedPackEntry{Offset: startOffset, Reason: "negative payload span"}
	}
	crc, err := state.stream.endEntryCRC()
	if err != nil {
		return 0, err
	}
	record.crc32 = crc
	// Base (non-delta) objects are fully resolved immediately; delta entries
	// are resolved later once their base object is available.
	if packfmt.IsBaseObjectType(record.packedType) {
		record.objectID = oid
		record.realType = record.packedType
		record.resolved = true
	}
	recordIdx := len(state.records)
	state.records = append(state.records, record)
	state.offsetToRecord[record.offset] = recordIdx
	if record.resolved {
		state.objectToRecord[record.objectID] = recordIdx
	}
	// Queue delta entries for base resolution after the scan completes.
	switch record.packedType {
	case objecttype.TypeOfsDelta:
		state.ofsDeltas = append(state.ofsDeltas, ofsDeltaRef{
			baseOffset: record.baseOffset,
			recordIdx:  recordIdx,
		})
	case objecttype.TypeRefDelta:
		state.refDeltas = append(state.refDeltas, refDeltaRef{
			baseObject: record.baseObject,
			recordIdx:  recordIdx,
		})
	case objecttype.TypeInvalid,
		objecttype.TypeCommit,
		objecttype.TypeTree,
		objecttype.TypeBlob,
		objecttype.TypeTag,
		objecttype.TypeFuture:
		// No delta bookkeeping needed; cases listed explicitly to keep the
		// switch exhaustive for linters.
	default:
	}
	return endOffset, nil
}
// parseEntryPrefix parses one entry prefix (packed type, varint-encoded
// inflated size, and any delta base reference) from the stream. On success
// record.headerLen counts every prefix byte consumed and the stream is left
// positioned at the entry's compressed payload.
func parseEntryPrefix(state *ingestState, startOffset uint64) (objectRecord, error) {
	var record objectRecord
	record.offset = startOffset
	first, err := state.stream.ReadByte()
	if err != nil {
		return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("read first header byte: %v", err)}
	}
	// First byte layout: 1 continuation bit, 3 type bits, 4 low size bits.
	record.packedType = objecttype.Type((first >> 4) & 0x07)
	size := int64(first & 0x0f)
	headerLen := uint32(1)
	shift := uint(4)
	b := first
	// Size varint: each continuation byte carries 7 more bits, LSB first,
	// while the high bit of the previous byte is set.
	for b&0x80 != 0 {
		b, err = state.stream.ReadByte()
		if err != nil {
			return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("read size continuation: %v", err)}
		}
		headerLen++
		size |= int64(b&0x7f) << shift
		shift += 7
	}
	// NOTE(review): shift is unbounded, and Go shifts of 64+ yield 0, so an
	// absurdly long varint silently drops bits. The negative check below plus
	// the later inflated-size comparison catch the practical cases — confirm
	// whether a hard cap on continuation bytes is wanted here.
	if size < 0 {
		return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: "negative declared size"}
	}
	record.declaredSize = size
	switch record.packedType {
	case objecttype.TypeCommit, objecttype.TypeTree, objecttype.TypeBlob, objecttype.TypeTag:
		// Base object types carry no additional prefix bytes.
	case objecttype.TypeRefDelta:
		// ref-delta: the prefix continues with the full raw base object ID.
		baseRaw := make([]byte, state.algo.Size())
		err := state.stream.readFull(baseRaw)
		if err != nil {
			return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("read ref base: %v", err)}
		}
		baseID, err := objectid.FromBytes(state.algo, baseRaw)
		if err != nil {
			return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("parse ref base: %v", err)}
		}
		record.baseObject = baseID
		baseRawLen, err := intconv.IntToUint32(len(baseRaw))
		if err != nil {
			return record, err
		}
		headerLen += baseRawLen
	case objecttype.TypeOfsDelta:
		// ofs-delta: the prefix continues with a backwards distance to the
		// base entry earlier in this same pack.
		dist, consumed, err := readOfsDistanceFromStream(state.stream)
		if err != nil {
			return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: err.Error()}
		}
		// The base must start strictly before this entry, within the pack.
		if startOffset <= dist {
			return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: "ofs base offset out of bounds"}
		}
		record.baseOffset = startOffset - dist
		consumedUint32, err := intconv.IntToUint32(consumed)
		if err != nil {
			return record, err
		}
		headerLen += consumedUint32
	case objecttype.TypeInvalid, objecttype.TypeFuture:
		return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("unsupported object type %d", record.packedType)}
	default:
		return record, &ErrMalformedPackEntry{Offset: startOffset, Reason: fmt.Sprintf("unsupported object type %d", record.packedType)}
	}
	record.headerLen = headerLen
	return record, nil
}
// drainEntryPayload inflates one entry payload from stream and returns
// (inflatedLength, consumedInput, oidForBaseEntry). consumedInput is taken
// from reader.InputConsumed — presumably the count of compressed input bytes
// the zlib stream consumed. The object ID is only computed for base
// (non-delta) entries and is the zero value otherwise.
func drainEntryPayload(state *ingestState, record objectRecord) (int64, uint64, objectid.ObjectID, error) {
	var zero objectid.ObjectID
	reader, err := zlib.NewReader(state.stream)
	if err != nil {
		return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("open zlib stream: %v", err)}
	}
	// Close error deliberately ignored: inflation errors already surface via
	// io.Copy below, and the stream position comes from InputConsumed.
	defer func() { _ = reader.Close() }()
	var total int64
	if packfmt.IsBaseObjectType(record.packedType) {
		// Prepend the encoded object header before hashing so the resulting
		// ID matches how standalone objects are identified.
		header, ok := objectheader.Encode(record.packedType, record.declaredSize)
		if !ok {
			return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: "encode object header"}
		}
		hashImpl, err := state.algo.New()
		if err != nil {
			return 0, 0, zero, err
		}
		// hash.Hash.Write never returns an error, so it is safe to discard.
		_, _ = hashImpl.Write(header)
		n, err := io.Copy(hashImpl, reader)
		if err != nil {
			return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("inflate base object: %v", err)}
		}
		total = n
		oid, err := objectid.FromBytes(state.algo, hashImpl.Sum(nil))
		if err != nil {
			return 0, 0, zero, err
		}
		return total, reader.InputConsumed(), oid, nil
	}
	if record.packedType == objecttype.TypeOfsDelta || record.packedType == objecttype.TypeRefDelta {
		// Delta payloads cannot be hashed yet (their base may be unresolved),
		// so decompress to io.Discard purely to validate the zlib stream and
		// advance the input position.
		n, err := io.Copy(io.Discard, reader)
		if err != nil {
			return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: fmt.Sprintf("inflate delta payload: %v", err)}
		}
		total = n
		return total, reader.InputConsumed(), zero, nil
	}
	return 0, 0, zero, &ErrMalformedPackEntry{Offset: record.offset, Reason: "unsupported payload type"}
}
// readOfsDistanceFromStream reads one ofs-delta encoded distance.
func readOfsDistanceFromStream(reader io.ByteReader) (uint64, int, error) {
first, err := reader.ReadByte()
if err != nil {
return 0, 0, fmt.Errorf("read ofs distance first byte: %w", err)
}
dist := uint64(first & 0x7f)
consumed := 1
b := first
for b&0x80 != 0 {
b, err = reader.ReadByte()
if err != nil {
return 0, 0, fmt.Errorf("read ofs distance continuation: %w", err)
}
consumed++
dist = ((dist + 1) << 7) + uint64(b&0x7f)
}
return dist, consumed, nil
}
// finalizeStreamPackHash consumes trailer bytes and verifies stream integrity.
// readDeltaHeaderSizes reads source and destination sizes from one delta payload.
func readDeltaHeaderSizes(payload []byte) (int, int, error) {
reader := &byteSliceReader{data: payload}
return deltaapply.ReadHeaderSizes(reader)
}
// byteSliceReader adapts a []byte to io.ByteReader, yielding the bytes in
// order and io.EOF once the slice is exhausted.
type byteSliceReader struct {
	data []byte // backing bytes
	pos  int    // index of the next byte to return
}

// ReadByte returns the next unread byte, or io.EOF when none remain.
func (reader *byteSliceReader) ReadByte() (byte, error) {
	if reader.pos >= len(reader.data) {
		return 0, io.EOF
	}
	next := reader.data[reader.pos]
	reader.pos++
	return next, nil
}