shithub: furgit

Download patch

ref: 564c8ecc84ed1bfc28ea3a0251020051906b8548
parent: 3a567b7738b49f4b49cd9efed5e721d9d286fb45
author: Runxi Yu <me@runxiyu.org>
date: Fri Feb 20 23:55:02 EST 2026

objectstore/loose: Add streaming writer

--- a/objectstore/loose/write_bytes.go
+++ b/objectstore/loose/write_bytes.go
@@ -1,38 +1,28 @@
 package loose
 
 import (
-	"compress/zlib"
-	"crypto/rand"
-	"errors"
-	"fmt"
-	"io/fs"
-	"os"
-	"path/filepath"
+	"bytes"
 
-	"codeberg.org/lindenii/furgit/objectheader"
 	"codeberg.org/lindenii/furgit/objectid"
 	"codeberg.org/lindenii/furgit/objecttype"
 )
 
-const tempObjectFilePrefix = "tmp_obj_"
-
 // WriteBytesFull writes a full serialized object as "type size\\x00content".
 func (store *Store) WriteBytesFull(raw []byte) (objectid.ObjectID, error) {
 	var zero objectid.ObjectID
 
-	if _, _, err := parseRaw(raw); err != nil {
+	writer, finalize, err := store.WriteWriterFull()
+	if err != nil {
 		return zero, err
 	}
-
-	id := store.algo.Sum(raw)
-	relPath, err := store.objectPath(id)
-	if err != nil {
+	if _, err := bytes.NewReader(raw).WriteTo(writer); err != nil {
+		_ = writer.Close()
 		return zero, err
 	}
-	if err := store.writeCompressedAtomic(relPath, raw); err != nil {
+	if err := writer.Close(); err != nil {
 		return zero, err
 	}
-	return id, nil
+	return finalize()
 }
 
 // WriteBytesContent writes typed content bytes as a loose object.
@@ -39,85 +29,16 @@
 func (store *Store) WriteBytesContent(ty objecttype.Type, content []byte) (objectid.ObjectID, error) {
 	var zero objectid.ObjectID
 
-	header, ok := objectheader.Encode(ty, int64(len(content)))
-	if !ok {
-		return zero, fmt.Errorf("objectstore/loose: failed to encode object header for type %d", ty)
-	}
-
-	raw := make([]byte, len(header)+len(content))
-	copy(raw, header)
-	copy(raw[len(header):], content)
-	return store.WriteBytesFull(raw)
-}
-
-// writeCompressedAtomic compresses raw and writes it to relPath atomically.
-func (store *Store) writeCompressedAtomic(relPath string, raw []byte) error {
-	if _, err := store.root.Stat(relPath); err == nil {
-		return nil
-	} else if !errors.Is(err, fs.ErrNotExist) {
-		return err
-	}
-
-	dir := filepath.Dir(relPath)
-	if err := store.root.MkdirAll(dir, 0o755); err != nil {
-		return err
-	}
-
-	tmpRelPath, tmpFile, err := store.createTempObjectFile(dir)
+	writer, finalize, err := store.WriteWriterContent(ty, int64(len(content)))
 	if err != nil {
-		return err
+		return zero, err
 	}
-
-	cleanup := true
-	defer func() {
-		if tmpFile != nil {
-			_ = tmpFile.Close()
-		}
-		if cleanup {
-			_ = store.root.Remove(tmpRelPath)
-		}
-	}()
-
-	zw := zlib.NewWriter(tmpFile)
-	if _, err := zw.Write(raw); err != nil {
-		_ = zw.Close()
-		return err
+	if _, err := bytes.NewReader(content).WriteTo(writer); err != nil {
+		_ = writer.Close()
+		return zero, err
 	}
-	if err := zw.Close(); err != nil {
-		return err
+	if err := writer.Close(); err != nil {
+		return zero, err
 	}
-	if err := tmpFile.Sync(); err != nil {
-		return err
-	}
-	if err := tmpFile.Close(); err != nil {
-		return err
-	}
-	tmpFile = nil
-
-	if err := store.root.Rename(tmpRelPath, relPath); err != nil {
-		if errors.Is(err, fs.ErrExist) {
-			return nil
-		}
-		return err
-	}
-
-	cleanup = false
-	return nil
-}
-
-// createTempObjectFile creates a unique temporary object file within dir.
-func (store *Store) createTempObjectFile(dir string) (string, *os.File, error) {
-	for range 16 {
-		relPath := filepath.Join(dir, tempObjectFilePrefix+rand.Text())
-		file, err := store.root.OpenFile(relPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o644)
-		if err == nil {
-			return relPath, file, nil
-		}
-		if errors.Is(err, fs.ErrExist) {
-			continue
-		}
-		return "", nil, err
-	}
-
-	return "", nil, errors.New("objectstore/loose: failed to create temporary object file")
+	return finalize()
 }
--- a/objectstore/loose/write_test.go
+++ b/objectstore/loose/write_test.go
@@ -2,6 +2,7 @@
 
 import (
 	"bytes"
+	"io"
 	"testing"
 
 	"codeberg.org/lindenii/furgit/internal/testgit"
@@ -10,12 +11,12 @@
 	"codeberg.org/lindenii/furgit/objecttype"
 )
 
-func TestLooseStoreWriteBytesContentAgainstGit(t *testing.T) {
+func TestLooseStoreWriteWriterContentAgainstGit(t *testing.T) {
 	testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) {
 		testRepo := testgit.NewBareRepo(t, algo)
 		store := openLooseStore(t, testRepo.Dir(), algo)
 
-		content := []byte("written-by-loose-store\n")
+		content := []byte("written-by-content-writer\n")
 		expectedHex := testRepo.RunInput(t, content, "hash-object", "-t", "blob", "--stdin")
 		expectedID, err := objectid.ParseHex(algo, expectedHex)
 		if err != nil {
@@ -22,35 +23,56 @@
 			t.Fatalf("ParseHex(expected): %v", err)
 		}
 
-		writtenID, err := store.WriteBytesContent(objecttype.TypeBlob, content)
+		writer, finalize, err := store.WriteWriterContent(objecttype.TypeBlob, int64(len(content)))
 		if err != nil {
-			t.Fatalf("WriteBytesContent: %v", err)
+			t.Fatalf("WriteWriterContent: %v", err)
 		}
-		if writtenID != expectedID {
-			t.Fatalf("WriteBytesContent id = %s, want %s", writtenID, expectedID)
+		if _, err := io.Copy(writer, bytes.NewReader(content)); err != nil {
+			t.Fatalf("WriteWriterContent write: %v", err)
 		}
-
-		gotBody := testRepo.CatFile(t, "blob", writtenID)
-		if !bytes.Equal(gotBody, content) {
-			t.Fatalf("git cat-file body mismatch")
+		if err := writer.Close(); err != nil {
+			t.Fatalf("WriteWriterContent close: %v", err)
 		}
-
-		writtenID2, err := store.WriteBytesContent(objecttype.TypeBlob, content)
+		writtenID, err := finalize()
 		if err != nil {
-			t.Fatalf("WriteBytesContent second write: %v", err)
+			t.Fatalf("WriteWriterContent finalize: %v", err)
 		}
-		if writtenID2 != expectedID {
-			t.Fatalf("WriteBytesContent second id = %s, want %s", writtenID2, expectedID)
+		if writtenID != expectedID {
+			t.Fatalf("WriteWriterContent id = %s, want %s", writtenID, expectedID)
 		}
-	})
+
+		gotBody := testRepo.CatFile(t, "blob", writtenID)
+		if !bytes.Equal(gotBody, content) {
+			t.Fatalf("git cat-file body mismatch")
+		}
+
+		// Writing the same object again should succeed and return the same ID.
+		writer, finalize, err = store.WriteWriterContent(objecttype.TypeBlob, int64(len(content)))
+		if err != nil {
+			t.Fatalf("WriteWriterContent second: %v", err)
+		}
+		if _, err := io.Copy(writer, bytes.NewReader(content)); err != nil {
+			t.Fatalf("WriteWriterContent second write: %v", err)
+		}
+		if err := writer.Close(); err != nil {
+			t.Fatalf("WriteWriterContent second close: %v", err)
+		}
+		writtenID2, err := finalize()
+		if err != nil {
+			t.Fatalf("WriteWriterContent second finalize: %v", err)
+		}
+		if writtenID2 != expectedID {
+			t.Fatalf("WriteWriterContent second id = %s, want %s", writtenID2, expectedID)
+		}
+	})
 }
 
-func TestLooseStoreWriteBytesFullAgainstGit(t *testing.T) {
+func TestLooseStoreWriteWriterFullAgainstGit(t *testing.T) {
 	testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) {
 		testRepo := testgit.NewBareRepo(t, algo)
 		store := openLooseStore(t, testRepo.Dir(), algo)
 
-		body := []byte("full-write-body\n")
+		body := []byte("full-writer-body\n")
 		header, ok := objectheader.Encode(objecttype.TypeBlob, int64(len(body)))
 		if !ok {
 			t.Fatalf("objectheader.Encode failed")
@@ -60,12 +82,22 @@
 		copy(raw[len(header):], body)
 
 		wantID := algo.Sum(raw)
-		gotID, err := store.WriteBytesFull(raw)
+		writer, finalize, err := store.WriteWriterFull()
 		if err != nil {
-			t.Fatalf("WriteBytesFull: %v", err)
+			t.Fatalf("WriteWriterFull: %v", err)
 		}
+		if _, err := io.Copy(writer, bytes.NewReader(raw)); err != nil {
+			t.Fatalf("WriteWriterFull write: %v", err)
+		}
+		if err := writer.Close(); err != nil {
+			t.Fatalf("WriteWriterFull close: %v", err)
+		}
+		gotID, err := finalize()
+		if err != nil {
+			t.Fatalf("WriteWriterFull finalize: %v", err)
+		}
 		if gotID != wantID {
-			t.Fatalf("WriteBytesFull id = %s, want %s", gotID, wantID)
+			t.Fatalf("WriteWriterFull id = %s, want %s", gotID, wantID)
 		}
 
 		gotBody := testRepo.CatFile(t, "blob", gotID)
@@ -75,19 +107,70 @@
 	})
 }
 
-func TestLooseStoreWriteValidationErrors(t *testing.T) {
+func TestLooseStoreWriterValidationErrors(t *testing.T) {
 	testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) {
 		testRepo := testgit.NewBareRepo(t, algo)
 		store := openLooseStore(t, testRepo.Dir(), algo)
 
-		if _, err := store.WriteBytesFull([]byte("blob 1\x00hello")); err == nil {
-			t.Fatalf("WriteBytesFull expected size/content mismatch error")
-		}
-		if _, err := store.WriteBytesFull([]byte("not-a-header")); err == nil {
-			t.Fatalf("WriteBytesFull expected malformed header error")
-		}
-		if _, err := store.WriteBytesContent(objecttype.TypeInvalid, []byte("x")); err == nil {
-			t.Fatalf("WriteBytesContent expected invalid type error")
-		}
+		t.Run("content overflow", func(t *testing.T) {
+			writer, finalize, err := store.WriteWriterContent(objecttype.TypeBlob, 1)
+			if err != nil {
+				t.Fatalf("WriteWriterContent: %v", err)
+			}
+			if _, err := writer.Write([]byte("hello")); err == nil {
+				t.Fatalf("expected overflow error")
+			}
+			_ = writer.Close()
+			if _, err := finalize(); err == nil {
+				t.Fatalf("expected finalize error after overflow")
+			}
+		})
+
+		t.Run("content short", func(t *testing.T) {
+			writer, finalize, err := store.WriteWriterContent(objecttype.TypeBlob, 5)
+			if err != nil {
+				t.Fatalf("WriteWriterContent: %v", err)
+			}
+			if _, err := writer.Write([]byte("x")); err != nil {
+				t.Fatalf("write short: %v", err)
+			}
+			if err := writer.Close(); err != nil {
+				t.Fatalf("close short: %v", err)
+			}
+			if _, err := finalize(); err == nil {
+				t.Fatalf("expected finalize error for short content")
+			}
+		})
+
+		t.Run("full malformed header", func(t *testing.T) {
+			writer, finalize, err := store.WriteWriterFull()
+			if err != nil {
+				t.Fatalf("WriteWriterFull: %v", err)
+			}
+			if _, err := writer.Write([]byte("not-a-header")); err != nil {
+				t.Fatalf("write malformed header bytes unexpectedly failed: %v", err)
+			}
+			if err := writer.Close(); err != nil {
+				t.Fatalf("close malformed header: %v", err)
+			}
+			if _, err := finalize(); err == nil {
+				t.Fatalf("expected finalize error for malformed header")
+			}
+		})
+
+		t.Run("full size mismatch", func(t *testing.T) {
+			writer, finalize, err := store.WriteWriterFull()
+			if err != nil {
+				t.Fatalf("WriteWriterFull: %v", err)
+			}
+			raw := []byte("blob 1\x00hello")
+			if _, err := io.Copy(writer, bytes.NewReader(raw)); err == nil {
+				t.Fatalf("expected overflow error")
+			}
+			_ = writer.Close()
+			if _, err := finalize(); err == nil {
+				t.Fatalf("expected finalize error after mismatch")
+			}
+		})
 	})
 }
--- /dev/null
+++ b/objectstore/loose/write_writer.go
@@ -1,0 +1,282 @@
+package loose
+
+import (
+	"bytes"
+	"compress/zlib"
+	"crypto/rand"
+	"errors"
+	"fmt"
+	"hash"
+	"io"
+	"io/fs"
+	"os"
+	"path/filepath"
+
+	"codeberg.org/lindenii/furgit/objectheader"
+	"codeberg.org/lindenii/furgit/objectid"
+	"codeberg.org/lindenii/furgit/objecttype"
+)
+
+const tempObjectFilePrefix = "tmp_obj_"
+
+// WriteWriterContent returns a writer for object content bytes.
+// The writer accepts exactly size bytes. After closing the writer,
+// call finalize to atomically publish the loose object and get its ID.
+func (store *Store) WriteWriterContent(ty objecttype.Type, size int64) (io.WriteCloser, func() (objectid.ObjectID, error), error) {
+	if size < 0 {
+		return nil, nil, errors.New("objectstore/loose: negative content size")
+	}
+
+	header, ok := objectheader.Encode(ty, size)
+	if !ok {
+		return nil, nil, fmt.Errorf("objectstore/loose: failed to encode object header for type %d", ty)
+	}
+
+	writer, err := store.newStreamWriter(false)
+	if err != nil {
+		return nil, nil, err
+	}
+	writer.headerDone = true
+	writer.expectedContentLeft = size
+	if err := writer.writeRawChunk(header); err != nil {
+		_ = writer.Close()
+		return nil, nil, err
+	}
+
+	return writer, writer.Finalize, nil
+}
+
+// WriteWriterFull returns a writer for full raw object bytes:
+// "type size\0content". After closing the writer, call finalize
+// to atomically publish the loose object and get its ID.
+func (store *Store) WriteWriterFull() (io.WriteCloser, func() (objectid.ObjectID, error), error) {
+	writer, err := store.newStreamWriter(true)
+	if err != nil {
+		return nil, nil, err
+	}
+	return writer, writer.Finalize, nil
+}
+
+// streamWriter incrementally hashes and deflates an object into a temp file.
+// Finalize validates size accounting and atomically renames the temp file.
+type streamWriter struct {
+	// store owns path and root operations used by this write session.
+	store *Store
+	// file is the temporary destination file under objects/.
+	file *os.File
+	// zw compresses raw object bytes into file.
+	zw *zlib.Writer
+	// hash receives the same raw bytes used to compute the resulting object ID.
+	hash hash.Hash
+
+	// tmpRelPath is the relative path of file under the objects root.
+	tmpRelPath string
+
+	// fullMode selects full-object input ("type size\0content") as opposed to content-only input.
+	fullMode bool
+
+	// headerBuf accumulates header bytes while fullMode parses up to the first NUL.
+	headerBuf []byte
+	// headerDone reports whether the full-object header has been parsed.
+	headerDone bool
+	// expectedContentLeft tracks remaining declared content bytes.
+	expectedContentLeft int64
+
+	closed    bool
+	finalized bool
+	finalID   objectid.ObjectID
+	finalErr  error
+}
+
+// newStreamWriter creates a stream writer with a temp file rooted in objects/.
+func (store *Store) newStreamWriter(fullMode bool) (*streamWriter, error) {
+	hashFn, err := store.algo.New()
+	if err != nil {
+		return nil, err
+	}
+
+	tmpRelPath, file, err := store.createTempObjectFile(".")
+	if err != nil {
+		return nil, err
+	}
+
+	return &streamWriter{
+		store:      store,
+		file:       file,
+		zw:         zlib.NewWriter(file),
+		hash:       hashFn,
+		tmpRelPath: tmpRelPath,
+		fullMode:   fullMode,
+		headerBuf:  make([]byte, 0, 64),
+	}, nil
+}
+
+// Write validates and writes raw bytes into the stream.
+// In full mode, it parses and enforces the streamed header-declared content size.
+func (writer *streamWriter) Write(src []byte) (int, error) {
+	if writer.finalized {
+		return 0, errors.New("objectstore/loose: write after finalize")
+	}
+	if writer.closed {
+		return 0, errors.New("objectstore/loose: write after close")
+	}
+
+	if writer.fullMode {
+		if err := writer.acceptFull(src); err != nil {
+			return 0, err
+		}
+	} else {
+		if err := writer.acceptContent(int64(len(src))); err != nil {
+			return 0, err
+		}
+	}
+
+	if err := writer.writeRawChunk(src); err != nil {
+		return 0, err
+	}
+	return len(src), nil
+}
+
+// acceptFull validates and accounts raw full-object input.
+func (writer *streamWriter) acceptFull(src []byte) error {
+	if !writer.headerDone {
+		if nul := bytes.IndexByte(src, 0); nul >= 0 {
+			headerChunkLen := nul + 1
+			writer.headerBuf = append(writer.headerBuf, src[:headerChunkLen]...)
+			_, size, _, ok := objectheader.Parse(writer.headerBuf)
+			if !ok {
+				return errors.New("objectstore/loose: malformed object header")
+			}
+			writer.headerDone = true
+			writer.expectedContentLeft = size
+			return writer.acceptContent(int64(len(src) - headerChunkLen))
+		}
+
+		writer.headerBuf = append(writer.headerBuf, src...)
+		return nil
+	}
+
+	return writer.acceptContent(int64(len(src)))
+}
+
+// acceptContent validates and accounts content byte counts.
+func (writer *streamWriter) acceptContent(n int64) error {
+	if n > writer.expectedContentLeft {
+		return errors.New("objectstore/loose: object content exceeds declared size")
+	}
+	writer.expectedContentLeft -= n
+	return nil
+}
+
+// writeRawChunk forwards raw bytes to the hash and deflate pipeline.
+func (writer *streamWriter) writeRawChunk(src []byte) error {
+	if _, err := writer.hash.Write(src); err != nil {
+		return err
+	}
+	if _, err := writer.zw.Write(src); err != nil {
+		return err
+	}
+	return nil
+}
+
+// Close flushes and closes the underlying zlib stream and temp file.
+// It is safe to call multiple times.
+func (writer *streamWriter) Close() error {
+	if writer.closed {
+		return nil
+	}
+	writer.closed = true
+
+	errZlib := writer.zw.Close()
+	errSync := writer.file.Sync()
+	errFile := writer.file.Close()
+	writer.file = nil
+	return errors.Join(errZlib, errSync, errFile)
+}
+
+// Finalize validates write completeness and atomically publishes the object.
+// Publication is no-clobber: it links tmpRelPath to the object path and treats
+// existing destination objects as success.
+func (writer *streamWriter) Finalize() (objectid.ObjectID, error) {
+	if writer.finalized {
+		return writer.finalID, writer.finalErr
+	}
+	writer.finalized = true
+
+	var zero objectid.ObjectID
+
+	if !writer.closed {
+		if err := writer.Close(); err != nil {
+			writer.finalErr = err
+			return zero, err
+		}
+	}
+
+	if writer.fullMode && !writer.headerDone {
+		writer.finalErr = errors.New("objectstore/loose: missing full object header")
+		return zero, writer.finalErr
+	}
+	if writer.expectedContentLeft != 0 {
+		writer.finalErr = errors.New("objectstore/loose: object content shorter than declared size")
+		return zero, writer.finalErr
+	}
+
+	idBytes := writer.hash.Sum(nil)
+	id, err := objectid.FromBytes(writer.store.algo, idBytes)
+	if err != nil {
+		writer.finalErr = err
+		return zero, err
+	}
+
+	relPath, err := writer.store.objectPath(id)
+	if err != nil {
+		writer.finalErr = err
+		return zero, err
+	}
+
+	dir := filepath.Dir(relPath)
+	if err := writer.store.root.MkdirAll(dir, 0o755); err != nil {
+		writer.finalErr = err
+		return zero, err
+	}
+
+	cleanup := true
+	defer func() {
+		if cleanup {
+			_ = writer.store.root.Remove(writer.tmpRelPath)
+		}
+	}()
+
+	if err := writer.store.root.Link(writer.tmpRelPath, relPath); err != nil {
+		if errors.Is(err, fs.ErrExist) {
+			writer.finalID = id
+			cleanup = false
+			_ = writer.store.root.Remove(writer.tmpRelPath)
+			return id, nil
+		}
+		writer.finalErr = err
+		return zero, err
+	}
+
+	writer.finalID = id
+	cleanup = false
+	return id, nil
+}
+
+// createTempObjectFile creates a unique temporary object file within dir.
+// The returned path is relative to the objects root.
+func (store *Store) createTempObjectFile(dir string) (string, *os.File, error) {
+	for range 16 {
+		relPath := filepath.Join(dir, tempObjectFilePrefix+rand.Text())
+		file, err := store.root.OpenFile(relPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o644)
+		if err == nil {
+			return relPath, file, nil
+		}
+		if errors.Is(err, fs.ErrExist) {
+			continue
+		}
+		return "", nil, err
+	}
+
+	return "", nil, errors.New("objectstore/loose: failed to create temporary object file")
+}
--