ref: 564c8ecc84ed1bfc28ea3a0251020051906b8548
parent: 3a567b7738b49f4b49cd9efed5e721d9d286fb45
author: Runxi Yu <me@runxiyu.org>
date: Fri Feb 20 23:55:02 EST 2026
objectstore/loose: Add streaming writer
--- a/objectstore/loose/write_bytes.go
+++ b/objectstore/loose/write_bytes.go
@@ -1,38 +1,28 @@
package loose
import (
- "compress/zlib"
- "crypto/rand"
- "errors"
- "fmt"
- "io/fs"
- "os"
- "path/filepath"
+ "bytes"
- "codeberg.org/lindenii/furgit/objectheader"
"codeberg.org/lindenii/furgit/objectid"
"codeberg.org/lindenii/furgit/objecttype"
)
-const tempObjectFilePrefix = "tmp_obj_"
-
// WriteBytesFull writes a full serialized object as "type size\\x00content".
func (store *Store) WriteBytesFull(raw []byte) (objectid.ObjectID, error) {var zero objectid.ObjectID
- if _, _, err := parseRaw(raw); err != nil {+ writer, finalize, err := store.WriteWriterFull()
+ if err != nil {return zero, err
}
-
- id := store.algo.Sum(raw)
- relPath, err := store.objectPath(id)
- if err != nil {+ if _, err := bytes.NewReader(raw).WriteTo(writer); err != nil {+ _ = writer.Close()
return zero, err
}
- if err := store.writeCompressedAtomic(relPath, raw); err != nil {+ if err := writer.Close(); err != nil {return zero, err
}
- return id, nil
+ return finalize()
}
// WriteBytesContent writes typed content bytes as a loose object.
@@ -39,85 +29,16 @@
func (store *Store) WriteBytesContent(ty objecttype.Type, content []byte) (objectid.ObjectID, error) {var zero objectid.ObjectID
- header, ok := objectheader.Encode(ty, int64(len(content)))
- if !ok {- return zero, fmt.Errorf("objectstore/loose: failed to encode object header for type %d", ty)- }
-
- raw := make([]byte, len(header)+len(content))
- copy(raw, header)
- copy(raw[len(header):], content)
- return store.WriteBytesFull(raw)
-}
-
-// writeCompressedAtomic compresses raw and writes it to relPath atomically.
-func (store *Store) writeCompressedAtomic(relPath string, raw []byte) error {- if _, err := store.root.Stat(relPath); err == nil {- return nil
- } else if !errors.Is(err, fs.ErrNotExist) {- return err
- }
-
- dir := filepath.Dir(relPath)
- if err := store.root.MkdirAll(dir, 0o755); err != nil {- return err
- }
-
- tmpRelPath, tmpFile, err := store.createTempObjectFile(dir)
+ writer, finalize, err := store.WriteWriterContent(ty, int64(len(content)))
if err != nil {- return err
+ return zero, err
}
-
- cleanup := true
- defer func() {- if tmpFile != nil {- _ = tmpFile.Close()
- }
- if cleanup {- _ = store.root.Remove(tmpRelPath)
- }
- }()
-
- zw := zlib.NewWriter(tmpFile)
- if _, err := zw.Write(raw); err != nil {- _ = zw.Close()
- return err
+ if _, err := bytes.NewReader(content).WriteTo(writer); err != nil {+ _ = writer.Close()
+ return zero, err
}
- if err := zw.Close(); err != nil {- return err
+ if err := writer.Close(); err != nil {+ return zero, err
}
- if err := tmpFile.Sync(); err != nil {- return err
- }
- if err := tmpFile.Close(); err != nil {- return err
- }
- tmpFile = nil
-
- if err := store.root.Rename(tmpRelPath, relPath); err != nil {- if errors.Is(err, fs.ErrExist) {- return nil
- }
- return err
- }
-
- cleanup = false
- return nil
-}
-
-// createTempObjectFile creates a unique temporary object file within dir.
-func (store *Store) createTempObjectFile(dir string) (string, *os.File, error) {- for range 16 {- relPath := filepath.Join(dir, tempObjectFilePrefix+rand.Text())
- file, err := store.root.OpenFile(relPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o644)
- if err == nil {- return relPath, file, nil
- }
- if errors.Is(err, fs.ErrExist) {- continue
- }
- return "", nil, err
- }
-
- return "", nil, errors.New("objectstore/loose: failed to create temporary object file")+ return finalize()
}
--- a/objectstore/loose/write_test.go
+++ b/objectstore/loose/write_test.go
@@ -2,6 +2,7 @@
import (
"bytes"
+ "io"
"testing"
"codeberg.org/lindenii/furgit/internal/testgit"
@@ -10,12 +11,12 @@
"codeberg.org/lindenii/furgit/objecttype"
)
-func TestLooseStoreWriteBytesContentAgainstGit(t *testing.T) {+func TestLooseStoreWriteWriterContentAgainstGit(t *testing.T) { testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) {testRepo := testgit.NewBareRepo(t, algo)
store := openLooseStore(t, testRepo.Dir(), algo)
- content := []byte("written-by-loose-store\n")+ content := []byte("written-by-content-writer\n")expectedHex := testRepo.RunInput(t, content, "hash-object", "-t", "blob", "--stdin")
expectedID, err := objectid.ParseHex(algo, expectedHex)
if err != nil {@@ -22,35 +23,56 @@
t.Fatalf("ParseHex(expected): %v", err)}
- writtenID, err := store.WriteBytesContent(objecttype.TypeBlob, content)
+ writer, finalize, err := store.WriteWriterContent(objecttype.TypeBlob, int64(len(content)))
if err != nil {- t.Fatalf("WriteBytesContent: %v", err)+ t.Fatalf("WriteWriterContent: %v", err)}
- if writtenID != expectedID {- t.Fatalf("WriteBytesContent id = %s, want %s", writtenID, expectedID)+ if _, err := io.Copy(writer, bytes.NewReader(content)); err != nil {+ t.Fatalf("WriteWriterContent write: %v", err)}
-
- gotBody := testRepo.CatFile(t, "blob", writtenID)
- if !bytes.Equal(gotBody, content) {- t.Fatalf("git cat-file body mismatch")+ if err := writer.Close(); err != nil {+ t.Fatalf("WriteWriterContent close: %v", err)}
-
- writtenID2, err := store.WriteBytesContent(objecttype.TypeBlob, content)
+ writtenID, err := finalize()
if err != nil {- t.Fatalf("WriteBytesContent second write: %v", err)+ t.Fatalf("WriteWriterContent finalize: %v", err)}
- if writtenID2 != expectedID {- t.Fatalf("WriteBytesContent second id = %s, want %s", writtenID2, expectedID)+ if writtenID != expectedID {+ t.Fatalf("WriteWriterContent id = %s, want %s", writtenID, expectedID)}
- })
+
+ gotBody := testRepo.CatFile(t, "blob", writtenID)
+ if !bytes.Equal(gotBody, content) {+ t.Fatalf("git cat-file body mismatch")+ }
+
+ // Writing the same object again should succeed and return the same ID.
+ writer, finalize, err = store.WriteWriterContent(objecttype.TypeBlob, int64(len(content)))
+ if err != nil {+ t.Fatalf("WriteWriterContent second: %v", err)+ }
+ if _, err := io.Copy(writer, bytes.NewReader(content)); err != nil {+ t.Fatalf("WriteWriterContent second write: %v", err)+ }
+ if err := writer.Close(); err != nil {+ t.Fatalf("WriteWriterContent second close: %v", err)+ }
+ writtenID2, err := finalize()
+ if err != nil {+ t.Fatalf("WriteWriterContent second finalize: %v", err)+ }
+ if writtenID2 != expectedID {+ t.Fatalf("WriteWriterContent second id = %s, want %s", writtenID2, expectedID)+ }
+ })
}
-func TestLooseStoreWriteBytesFullAgainstGit(t *testing.T) {+func TestLooseStoreWriteWriterFullAgainstGit(t *testing.T) { testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) {testRepo := testgit.NewBareRepo(t, algo)
store := openLooseStore(t, testRepo.Dir(), algo)
- body := []byte("full-write-body\n")+ body := []byte("full-writer-body\n")header, ok := objectheader.Encode(objecttype.TypeBlob, int64(len(body)))
if !ok { t.Fatalf("objectheader.Encode failed")@@ -60,12 +82,22 @@
copy(raw[len(header):], body)
wantID := algo.Sum(raw)
- gotID, err := store.WriteBytesFull(raw)
+ writer, finalize, err := store.WriteWriterFull()
if err != nil {- t.Fatalf("WriteBytesFull: %v", err)+ t.Fatalf("WriteWriterFull: %v", err)}
+ if _, err := io.Copy(writer, bytes.NewReader(raw)); err != nil {+ t.Fatalf("WriteWriterFull write: %v", err)+ }
+ if err := writer.Close(); err != nil {+ t.Fatalf("WriteWriterFull close: %v", err)+ }
+ gotID, err := finalize()
+ if err != nil {+ t.Fatalf("WriteWriterFull finalize: %v", err)+ }
if gotID != wantID {- t.Fatalf("WriteBytesFull id = %s, want %s", gotID, wantID)+ t.Fatalf("WriteWriterFull id = %s, want %s", gotID, wantID)}
gotBody := testRepo.CatFile(t, "blob", gotID)
@@ -75,19 +107,70 @@
})
}
-func TestLooseStoreWriteValidationErrors(t *testing.T) {+func TestLooseStoreWriterValidationErrors(t *testing.T) { testgit.ForEachAlgorithm(t, func(t *testing.T, algo objectid.Algorithm) {testRepo := testgit.NewBareRepo(t, algo)
store := openLooseStore(t, testRepo.Dir(), algo)
- if _, err := store.WriteBytesFull([]byte("blob 1\x00hello")); err == nil {- t.Fatalf("WriteBytesFull expected size/content mismatch error")- }
- if _, err := store.WriteBytesFull([]byte("not-a-header")); err == nil {- t.Fatalf("WriteBytesFull expected malformed header error")- }
- if _, err := store.WriteBytesContent(objecttype.TypeInvalid, []byte("x")); err == nil {- t.Fatalf("WriteBytesContent expected invalid type error")- }
+ t.Run("content overflow", func(t *testing.T) {+ writer, finalize, err := store.WriteWriterContent(objecttype.TypeBlob, 1)
+ if err != nil {+ t.Fatalf("WriteWriterContent: %v", err)+ }
+ if _, err := writer.Write([]byte("hello")); err == nil {+ t.Fatalf("expected overflow error")+ }
+ _ = writer.Close()
+ if _, err := finalize(); err == nil {+ t.Fatalf("expected finalize error after overflow")+ }
+ })
+
+ t.Run("content short", func(t *testing.T) {+ writer, finalize, err := store.WriteWriterContent(objecttype.TypeBlob, 5)
+ if err != nil {+ t.Fatalf("WriteWriterContent: %v", err)+ }
+ if _, err := writer.Write([]byte("x")); err != nil {+ t.Fatalf("write short: %v", err)+ }
+ if err := writer.Close(); err != nil {+ t.Fatalf("close short: %v", err)+ }
+ if _, err := finalize(); err == nil {+ t.Fatalf("expected finalize error for short content")+ }
+ })
+
+ t.Run("full malformed header", func(t *testing.T) {+ writer, finalize, err := store.WriteWriterFull()
+ if err != nil {+ t.Fatalf("WriteWriterFull: %v", err)+ }
+ if _, err := writer.Write([]byte("not-a-header")); err != nil {+ t.Fatalf("write malformed header bytes unexpectedly failed: %v", err)+ }
+ if err := writer.Close(); err != nil {+ t.Fatalf("close malformed header: %v", err)+ }
+ if _, err := finalize(); err == nil {+ t.Fatalf("expected finalize error for malformed header")+ }
+ })
+
+ t.Run("full size mismatch", func(t *testing.T) {+ writer, finalize, err := store.WriteWriterFull()
+ if err != nil {+ t.Fatalf("WriteWriterFull: %v", err)+ }
+ raw := []byte("blob 1\x00hello")+ if _, err := io.Copy(writer, bytes.NewReader(raw)); err == nil {+ t.Fatalf("expected overflow error")+ }
+ _ = writer.Close()
+ if _, err := finalize(); err == nil {+ t.Fatalf("expected finalize error after mismatch")+ }
+ })
})
}
--- /dev/null
+++ b/objectstore/loose/write_writer.go
@@ -1,0 +1,282 @@
+package loose
+
+import (
+ "bytes"
+ "compress/zlib"
+ "crypto/rand"
+ "errors"
+ "fmt"
+ "hash"
+ "io"
+ "io/fs"
+ "os"
+ "path/filepath"
+
+ "codeberg.org/lindenii/furgit/objectheader"
+ "codeberg.org/lindenii/furgit/objectid"
+ "codeberg.org/lindenii/furgit/objecttype"
+)
+
+const tempObjectFilePrefix = "tmp_obj_"
+
+// WriteWriterContent returns a writer for object content bytes.
+// The writer accepts exactly size bytes. After closing the writer,
+// call finalize to atomically publish the loose object and get its ID.
+func (store *Store) WriteWriterContent(ty objecttype.Type, size int64) (io.WriteCloser, func() (objectid.ObjectID, error), error) {+ if size < 0 {+ return nil, nil, errors.New("objectstore/loose: negative content size")+ }
+
+ header, ok := objectheader.Encode(ty, size)
+ if !ok {+ return nil, nil, fmt.Errorf("objectstore/loose: failed to encode object header for type %d", ty)+ }
+
+ writer, err := store.newStreamWriter(false)
+ if err != nil {+ return nil, nil, err
+ }
+ writer.headerDone = true
+ writer.expectedContentLeft = size
+ if err := writer.writeRawChunk(header); err != nil {+ _ = writer.Close()
+ return nil, nil, err
+ }
+
+ return writer, writer.Finalize, nil
+}
+
+// WriteWriterFull returns a writer for full raw object bytes:
+// "type size\0content". After closing the writer, call finalize
+// to atomically publish the loose object and get its ID.
+func (store *Store) WriteWriterFull() (io.WriteCloser, func() (objectid.ObjectID, error), error) {+ writer, err := store.newStreamWriter(true)
+ if err != nil {+ return nil, nil, err
+ }
+ return writer, writer.Finalize, nil
+}
+
+// streamWriter incrementally hashes and deflates an object into a temp file.
+// Finalize validates size accounting and atomically renames the temp file.
+type streamWriter struct {+ // store owns path and root operations used by this write session.
+ store *Store
+ // file is the temporary destination file under objects/.
+ file *os.File
+ // zw compresses raw object bytes into file.
+ zw *zlib.Writer
+ // hash receives the same raw bytes used to compute the resulting object ID.
+ hash hash.Hash
+
+ // tmpRelPath is the relative path of file under the objects root.
+ tmpRelPath string
+
+ // fullMode selects full-object input ("type size\0content") as opposed to content-only input.+ fullMode bool
+
+ // headerBuf accumulates header bytes while fullMode parses up to the first NUL.
+ headerBuf []byte
+ // headerDone reports whether the full-object header has been parsed.
+ headerDone bool
+ // expectedContentLeft tracks remaining declared content bytes.
+ expectedContentLeft int64
+
+ closed bool
+ finalized bool
+ finalID objectid.ObjectID
+ finalErr error
+}
+
+// newStreamWriter creates a stream writer with a temp file rooted in objects/.
+func (store *Store) newStreamWriter(fullMode bool) (*streamWriter, error) {+ hashFn, err := store.algo.New()
+ if err != nil {+ return nil, err
+ }
+
+ tmpRelPath, file, err := store.createTempObjectFile(".")+ if err != nil {+ return nil, err
+ }
+
+ return &streamWriter{+ store: store,
+ file: file,
+ zw: zlib.NewWriter(file),
+ hash: hashFn,
+ tmpRelPath: tmpRelPath,
+ fullMode: fullMode,
+ headerBuf: make([]byte, 0, 64),
+ }, nil
+}
+
+// Write validates and writes raw bytes into the stream.
+// In full mode, it parses and enforces the streamed header-declared content size.
+func (writer *streamWriter) Write(src []byte) (int, error) {+ if writer.finalized {+ return 0, errors.New("objectstore/loose: write after finalize")+ }
+ if writer.closed {+ return 0, errors.New("objectstore/loose: write after close")+ }
+
+ if writer.fullMode {+ if err := writer.acceptFull(src); err != nil {+ return 0, err
+ }
+ } else {+ if err := writer.acceptContent(int64(len(src))); err != nil {+ return 0, err
+ }
+ }
+
+ if err := writer.writeRawChunk(src); err != nil {+ return 0, err
+ }
+ return len(src), nil
+}
+
+// acceptFull validates and accounts raw full-object input.
+func (writer *streamWriter) acceptFull(src []byte) error {+ if !writer.headerDone {+ if nul := bytes.IndexByte(src, 0); nul >= 0 {+ headerChunkLen := nul + 1
+ writer.headerBuf = append(writer.headerBuf, src[:headerChunkLen]...)
+ _, size, _, ok := objectheader.Parse(writer.headerBuf)
+ if !ok {+ return errors.New("objectstore/loose: malformed object header")+ }
+ writer.headerDone = true
+ writer.expectedContentLeft = size
+ return writer.acceptContent(int64(len(src) - headerChunkLen))
+ }
+
+ writer.headerBuf = append(writer.headerBuf, src...)
+ return nil
+ }
+
+ return writer.acceptContent(int64(len(src)))
+}
+
+// acceptContent validates and accounts content byte counts.
+func (writer *streamWriter) acceptContent(n int64) error {+ if n > writer.expectedContentLeft {+ return errors.New("objectstore/loose: object content exceeds declared size")+ }
+ writer.expectedContentLeft -= n
+ return nil
+}
+
+// writeRawChunk forwards raw bytes to the hash and deflate pipeline.
+func (writer *streamWriter) writeRawChunk(src []byte) error {+ if _, err := writer.hash.Write(src); err != nil {+ return err
+ }
+ if _, err := writer.zw.Write(src); err != nil {+ return err
+ }
+ return nil
+}
+
+// Close flushes and closes the underlying zlib stream and temp file.
+// It is safe to call multiple times.
+func (writer *streamWriter) Close() error {+ if writer.closed {+ return nil
+ }
+ writer.closed = true
+
+ errZlib := writer.zw.Close()
+ errSync := writer.file.Sync()
+ errFile := writer.file.Close()
+ writer.file = nil
+ return errors.Join(errZlib, errSync, errFile)
+}
+
+// Finalize validates write completeness and atomically publishes the object.
+// Publication is no-clobber: it links tmpRelPath to the object path and treats
+// existing destination objects as success.
+func (writer *streamWriter) Finalize() (objectid.ObjectID, error) {+ if writer.finalized {+ return writer.finalID, writer.finalErr
+ }
+ writer.finalized = true
+
+ var zero objectid.ObjectID
+
+ if !writer.closed {+ if err := writer.Close(); err != nil {+ writer.finalErr = err
+ return zero, err
+ }
+ }
+
+ if writer.fullMode && !writer.headerDone {+ writer.finalErr = errors.New("objectstore/loose: missing full object header")+ return zero, writer.finalErr
+ }
+ if writer.expectedContentLeft != 0 {+ writer.finalErr = errors.New("objectstore/loose: object content shorter than declared size")+ return zero, writer.finalErr
+ }
+
+ idBytes := writer.hash.Sum(nil)
+ id, err := objectid.FromBytes(writer.store.algo, idBytes)
+ if err != nil {+ writer.finalErr = err
+ return zero, err
+ }
+
+ relPath, err := writer.store.objectPath(id)
+ if err != nil {+ writer.finalErr = err
+ return zero, err
+ }
+
+ dir := filepath.Dir(relPath)
+ if err := writer.store.root.MkdirAll(dir, 0o755); err != nil {+ writer.finalErr = err
+ return zero, err
+ }
+
+ cleanup := true
+ defer func() {+ if cleanup {+ _ = writer.store.root.Remove(writer.tmpRelPath)
+ }
+ }()
+
+ if err := writer.store.root.Link(writer.tmpRelPath, relPath); err != nil {+ if errors.Is(err, fs.ErrExist) {+ writer.finalID = id
+ cleanup = false
+ _ = writer.store.root.Remove(writer.tmpRelPath)
+ return id, nil
+ }
+ writer.finalErr = err
+ return zero, err
+ }
+
+ writer.finalID = id
+ cleanup = false
+ return id, nil
+}
+
+// createTempObjectFile creates a unique temporary object file within dir.
+// The returned path is relative to the objects root.
+func (store *Store) createTempObjectFile(dir string) (string, *os.File, error) {+ for range 16 {+ relPath := filepath.Join(dir, tempObjectFilePrefix+rand.Text())
+ file, err := store.root.OpenFile(relPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o644)
+ if err == nil {+ return relPath, file, nil
+ }
+ if errors.Is(err, fs.ErrExist) {+ continue
+ }
+ return "", nil, err
+ }
+
+ return "", nil, errors.New("objectstore/loose: failed to create temporary object file")+}
--