shithub: neoventi

ref: dbb6fc6e19e87ac2e212637c1061d3af619d4f94
dir: /main.c/

View raw version
#include <u.h>
#include <libc.h>
#include <bio.h>

#include "whack.h"

/*
 * Big-endian accessors for on-disk and wire data; p must point to
 * unsigned bytes.  U32GET widens each byte to u32int before shifting so
 * a high bit never lands in the sign bit of a promoted int.
 */
#define	U8GET(p)	((p)[0])
#define	U16GET(p)	(((p)[0]<<8)|(p)[1])
#define	U32GET(p)	(((u32int)(p)[0]<<24)|((u32int)(p)[1]<<16)|((u32int)(p)[2]<<8)|(u32int)(p)[3])
#define	U64GET(p)	(((u64int)U32GET(p)<<32)|(u64int)U32GET((p)+4))
/* host name used to build the /dev/<machine>/... partition paths */
#define MACHINE "kaladin"

/* Venti message types; T-messages are requests, R-messages their replies. */
typedef enum {
	VtRerror	= 1,
	VtTping		= 2,
	VtRping		= 3,
	VtThello	= 4,
	VtRhello	= 5,
	VtTgoodbye	= 6,
	VtRgoodbye	= 7,	/* not used */
	VtTauth0	= 8,
	VtRauth0	= 9,
	VtTauth1	= 10,
	VtRauth1	= 11,
	VtTread		= 12,
	VtRread		= 13,
	VtTwrite	= 14,
	VtRwrite	= 15,
	VtTsync		= 16,
	VtRsync		= 17,
	VtTmax		= 18
} VtTag;

enum {
	/* partition layout */
	/* blank space at beginning of partition - useful for config
	 * or for when you accidentally {mount /dev/.../arenas /mnt/venti} */
	PartBlank	= 256*1024,
	/* FIXME make arena part head a full block? breaking change for new arena partitions, removes
	 * need for special casing and magic around it. Just read the damn block. */
	HeadSize	= 512,

	/* width of the fixed name fields */
	NameSize	= 64,

	/* header magic numbers */
	ArenaPartMagic	= 0xa9e4a5e7U,
	ISectMagic	= 0xd15c5ec7U,

	/* index bucket header and entry sizes, in bytes */
	IBucketSize	= 6,
	IEntrySize	= 38,

	/* arenas */
	MaxAMap		= 31*1024,	/* max. entries accepted in an address map */
	ClumpInfoSize	= 25,
	ABlockLog	= 9,	/* All reads are of 512 byte sectors??? Yikes. We should probably use a larger size, FIXME. */
};

/* A client connection: fd is the descriptor returned by accept. */
typedef struct {
	int	fd;
} VtConn;

/* One arena within an arena partition. */
typedef struct {
	char	name[NameSize];
	u32int	clumpmagic;
	u32int	clumpmax;	/* blocksize / ClumpInfoSize, set by initarena */
	u32int	blocksize;
	u32int	version;
	u32int	ctime;
	u32int	wtime;

	/* disk info */
	u64int	size;	/* bytes from base up to the final block */
	u64int	base;	/* partition offset just past the arena's first block */
	int	fd;	/* arena partition descriptor */

	struct {
		u32int	clumps;	/* read by arenadirsize */
	} memstats;
} VtArena;

/* An index section: one partition holding a contiguous range of index buckets. */
typedef struct {
	/* derived from blocksize when the section is opened */
	int	blocklog;	/* log2(blocksize) */
	int	buckmax;	/* max. entries in a index bucket */
	/* the index configuration table, between the header and the buckets */
	u32int	tabbase;	/* base address of index config table on disk */
	u32int	tabsize;	/* max. bytes in index config */

	/* from the on-disk section header */
	u32int	version;
	u32int	bucketmagic;	/* 0 unless the header is version 2 */
	char	name[NameSize];	/* text label */
	char	index[NameSize];	/* index owning the section */
	u32int	blocksize;	/* size of hash buckets in index */
	u32int	blockbase;	/* address of start of on disk index table */
	u32int	blocks;		/* total blocks on disk; some may be unused */
	u32int	start;		/* first bucket in this section */
	u32int	stop;		/* limit of buckets in this section */

	int	fd;		/* open section partition */
} VtISect;

/* One "name\tstart\tstop" map line: the named object covers [start, stop). */
typedef struct {
	char	name[NameSize];
	u64int	start;
	u64int	stop;
} MapEntry;

/* The index: its sections plus the map from index addresses to arenas. */
struct {
	u32int	blocksize;	/* block size from the index configuration */
	u32int	buckets;	/* total buckets: stop of the last section */
	VtISect	*sects;		/* sections, in the order they were opened */
	int	nsects;
	u32int	div;		/* first 32 bits of a score / div = bucket */
	u32int	namap;		/* entries in amap */
	MapEntry	*amap;	/* arena map of index address ranges */
} index;

VtArena *arenas;	/* every arena found in the arena partitions; starts nil */
u32int numarenas;	/* entries in arenas; starts 0 */

/*
 * Parse the leading decimal digits of s into *r.
 * Returns -1 on overflow (*r untouched), 1 when s is a non-empty
 * run of digits ending at NUL, and 0 otherwise (*r still holds the
 * value of any leading digits).
 */
int
stru64int(char *s, u64int *r)
{
	char *p;
	u64int acc, next, lim;

	lim = ~(u64int)0 / 10;
	acc = 0;
	for(p = s; *p >= '0' && *p <= '9'; p++){
		if(acc > lim)
			return -1;
		next = acc*10 + (*p - '0');
		if(next < acc)
			return -1;
		acc = next;
	}
	*r = acc;
	if(p == s || *p != '\0')
		return 0;
	return 1;
}

/* floor(log2(v)) for v >= 1; 0 for v == 0. */
int
u64log2(u64int v)
{
	int n;

	n = 0;
	while(v > 1){
		v >>= 1;
		n++;
	}
	return n;
}

/*
 * Parse s with stru64int and narrow the result to 32 bits.
 * Returns -1 if stru64int reports overflow or the value needs more than
 * 32 bits; otherwise stores it in *r and returns 0.
 */
int
stru32int(char *s, u32int *r)
{
	u64int v;

	if(stru64int(s, &v) < 0 || (v >> 32) != 0)
		return -1;
	*r = (u32int)v;
	return 0;
}

/*
 * Read one '\n'-terminated line from bio and parse it as an unsigned
 * 32-bit decimal number into *u32.
 * Returns 1 on success and 0 on EOF, an empty or non-numeric line, or
 * a value that does not fit in 32 bits.
 */
static int
Brdu32(Biobufhdr *bio, u32int *u32)
{
	char *line;
	u64int v;

	line = Brdline(bio, '\n');
	if(line == nil)
		return 0;
	/* Brdline leaves the delimiter in place and does not NUL-terminate */
	line[Blinelen(bio)-1] = 0;
	if(stru64int(line, &v) <= 0 || v > (u64int)0xFFFFFFFF)
		return 0;
	*u32 = (u32int)v;
	return 1;
}

/* No options are accepted yet: anything beyond the program name is fatal. */
void
parseargs(int argc, char **argv)
{
	if(argc == 1)
		return;
	sysfatal("TODO arg parsing");
}

/*
 * Parse an address map from b: a line holding the entry count, followed
 * by that many "name\tstart\tstop" lines.
 * On success *map is (re)allocated to hold the entries, *nmap is their
 * count, and 1 is returned.  If the count line is missing, malformed or
 * above MaxAMap, *nmap is set to 0 and 0 is returned, so callers that
 * ignore the result iterate over nothing.  Malformed entry lines are fatal.
 */
static int
parsemap(Biobufhdr *b, MapEntry **map, u32int *nmap)
{
	u32int i;
	char *s;
	char *fields[4];
	MapEntry *m, *e;

	if(!Brdu32(b, nmap) || *nmap > MaxAMap){
		*nmap = 0;
		return 0;
	}
	print("reading %d map entries\n", *nmap);
	m = realloc(*map, *nmap * sizeof(MapEntry));
	if(m == nil && *nmap != 0)
		sysfatal("oom");
	*map = m;
	for(i = 0; i < *nmap; i += 1){
		s = Brdline(b, '\n');
		if(s == nil)
			sysfatal("corrupt index map: truncated after %ud of %ud entries", i, *nmap);
		/* Brdline does not NUL-terminate; end the string at the delimiter
		 * so getfields and the number parsing stop at the end of this line */
		s[Blinelen(b)-1] = 0;
		/* allow one extra field so lines with too many fields are rejected */
		if(getfields(s, fields, nelem(fields), 0, "\t") != 3)
			sysfatal("corrupt index map: %s", s);
		e = &m[i];
		/* bounded copy; names longer than NameSize-1 bytes are truncated */
		strecpy(e->name, e->name + NameSize, fields[0]);
		if(stru64int(fields[1], &e->start) <= 0)
			sysfatal("corrupt index map: %s", fields[1]);
		if(stru64int(fields[2], &e->stop) <= 0)
			sysfatal("corrupt index map: %s", fields[2]);
		if(e->start > e->stop)
			sysfatal("corrupt index map: %s: start %llud > stop %llud", e->name, e->start, e->stop);
	}
	return 1;
}

/* Length in bytes of the partition open on fd; path is only for messages.
 * Fails fatally if the file cannot be stat'ed or reports length 0. */
static u64int
partlen(int fd, char *path)
{
	Dir *d;
	u64int n;

	if((d = dirfstat(fd)) == nil)
		sysfatal("Cannot stat partition %s", path);
	n = d->length;
	free(d);
	if(n == 0)
		sysfatal("can't determine size of partition %s", path);
	return n;
}

/*
 * Read the arena's trailer block (at base + size), check its magic and
 * name against arena, and copy its fields into arena.
 * Trailer offsets used here (big-endian):
 *	0 magic, 4 version, 8 name[NameSize], 8+NameSize clumps,
 *	+4 cclumps (unused), +8 ctime, +12 wtime, +16 clumpmagic (version 5)
 * NOTE(review): layout and ArenaMagic follow venti's unpackarena/dat.h.
 * Any mismatch or I/O error is fatal.
 */
static void
loadarena(VtArena *arena)
{
	enum {
		ArenaMagic = 0xf2a14eadU,	/* arena trailer magic */
		TrailerMin = 8 + NameSize + 20,	/* bytes examined below */
	};
	u32int magic;
	char *buf, name[NameSize];
	u8int *p;

	if(arena->blocksize < TrailerMin)
		sysfatal("arena %s: block size %ud too small for trailer", arena->name, arena->blocksize);
	buf = malloc(arena->blocksize);
	if(buf == nil)
		sysfatal("oom");
	p = (u8int*)buf;
	if(pread(arena->fd, buf, arena->blocksize, arena->base + arena->size) != arena->blocksize)
		sysfatal("failed to pread");
	magic = U32GET(p);
	arena->version = U32GET(p + 4);
	/* the on-disk name field need not be NUL-terminated */
	memmove(name, buf + 8, NameSize);
	name[NameSize-1] = 0;
	if(magic != ArenaMagic)
		sysfatal("bad arena magic for %s: %#ux", arena->name, magic);
	/* exact match: a prefix match would accept a different, longer name */
	if(strcmp(arena->name, name) != 0)
		sysfatal("arena name mismatch: %s vs %s, ver %d", arena->name, name, arena->version);
	p += 8 + NameSize;
	/* arenadirsize depends on the clump count */
	arena->memstats.clumps = U32GET(p);
	arena->ctime = U32GET(p + 8);
	arena->wtime = U32GET(p + 12);
	if(arena->version == 5)
		arena->clumpmagic = U32GET(p + 16);
	free(buf);
}

// FIXME see initarenapart and initarena/loadarena in venti/venti

/*
 * Set up arena from its map entry on the partition open on fd.
 * base skips the entry's first block.  size excludes both that block
 * and the final block, which loadarena then reads and checks.
 * Entries too small for those two blocks are fatal.
 */
static void
initarena(VtArena *arena, int fd, MapEntry entry, u32int blocksize)
{
	/* arenas[] comes from realloc; clear any field loadarena does not set */
	memset(arena, 0, sizeof *arena);
	if(entry.stop < entry.start || entry.stop - entry.start < 2*(u64int)blocksize)
		sysfatal("arena %s too small: [%llud, %llud)", entry.name, entry.start, entry.stop);
	arena->fd = fd;
	arena->blocksize = blocksize;
	arena->clumpmax = blocksize / ClumpInfoSize;
	arena->base = entry.start + blocksize;
	arena->size = entry.stop - entry.start - 2*(u64int)blocksize;
	memcpy(arena->name, entry.name, NameSize);
	loadarena(arena);
}

/*
 * Parse the arena map stored at [tabbase, tabbase+tabsize) of the arena
 * partition open on fd, and append one initialized VtArena per entry to
 * arenas.  path is only used in messages.  An unparsable table is fatal.
 */
static void
readarenatable(int fd, u32int tabbase, u32int tabsize, char *path, u32int blocksize)
{
	Biobufhdr bio;
	char *buf;
	MapEntry *map = nil;
	u32int nmap = 0;
	VtArena *a;

	buf = malloc(tabsize);
	if(buf == nil)
		sysfatal("oom; you're a loser: %r");
	if(Binits(&bio, fd, OREAD, (uchar*)buf, tabsize))
		sysfatal("failed to init biobuf: %r");
	if(Bseek(&bio, tabbase, 0) != tabbase)
		sysfatal("seek failed: %r");
	if(!parsemap(&bio, &map, &nmap))
		sysfatal("corrupt arena table in %s", path);
	print("found %d arenas\n", nmap);
	if(nmap > 0){
		/* a 0-byte realloc may legitimately return nil, so only grow when needed */
		a = realloc(arenas, sizeof(VtArena) * (nmap + numarenas));
		if(a == nil)
			sysfatal("oom");
		arenas = a;
	}
	/* entries are initialized last-to-first */
	for(; nmap > 0; nmap -= 1)
		initarena(&arenas[numarenas++], fd, map[nmap-1], blocksize);
	free(map);
	free(buf);
}

/*
 * Open the arena partition at path, validate its header (read at
 * PartBlank), locate the arena table that follows it and load every
 * arena it lists.  Any inconsistency is fatal.
 */
static void
initarenapart(char *path)
{
	u32int version, magic, blocksize, arenabase, tabbase, tabsize;
	u64int size;
	char buf[HeadSize];
	u8int *p = (void*)buf;
	int fd;

	if((fd = open(path, OREAD)) < 0)
		sysfatal("failed to open arena %s: %r", path);
	if(pread(fd, buf, HeadSize, PartBlank) != HeadSize)
		sysfatal("failed to read arena header table: %r");
	magic = U32GET(p);
	version = U32GET(p + 4);
	blocksize = U32GET(p + 8);
	arenabase = U32GET(p + 12);
	if(magic != ArenaPartMagic)
		sysfatal("bad arena partition magic number: %#ux expected ArenaPartMagic (%#ux)", magic, ArenaPartMagic);
	if(version != 3)
		sysfatal("bad arena partition version: only 3 is supported, found %d", version);
	/* 0 would pass the power-of-two test below; tiny blocks cannot hold an arena trailer */
	if(blocksize < HeadSize)
		sysfatal("invalid block size: %ud is smaller than %d", blocksize, HeadSize);
	if(blocksize & (blocksize - 1))
		sysfatal("invalid block size: %d is not a power of two", blocksize);
	/* Head is not perfectly aligned; table must be aligned as first block */
	tabbase = (PartBlank + HeadSize + blocksize - 1) & ~(blocksize - 1);
	if(tabbase >= arenabase)
		sysfatal("arena partition table overlaps with storage");
	tabsize = arenabase - tabbase;
	size = partlen(fd, path) & ~(u64int)(blocksize - 1);
	if(arenabase > size)
		sysfatal("arena partition %s: arena base %ud beyond end of partition (%llud bytes)", path, arenabase, size);

	readarenatable(fd, tabbase, tabsize, path, blocksize);
}

static void
initarenas(void)
{
	initarenapart("/dev/" MACHINE "/arenas");
}

static void
initisectpart(char *path)
{
	u32int magic;
	char buf[HeadSize];
	u8int *p = (void*)buf;

	index.sects = realloc(index.sects, sizeof(VtISect) * (index.nsects + 1));
	VtISect *sect = &index.sects[index.nsects++];
	
	if((sect->fd = open(path, OREAD)) < 0)
		sysfatal("failed to open index section");
	if(pread(sect->fd, buf, HeadSize, PartBlank) != HeadSize)
		sysfatal("failed to read index section header");
	magic = U32GET(p);
	sect->version = U32GET(p + 4);
	memcpy(sect->name, buf + 8, NameSize);
	memcpy(sect->index, buf + 8 + NameSize, NameSize);
	sect->blocksize = U32GET(p + 8 + 2*NameSize);
	sect->blockbase = U32GET(p + 12 + 2*NameSize);
	sect->blocks = U32GET(p + 16 + 2 * NameSize);
	sect->start = U32GET(p + 20 + 2 * NameSize);
	sect->stop = U32GET(p + 24 + 2 * NameSize);
	sect->index[NameSize-1] = 0;
	sect->name[NameSize-1] = 0;
	sect->bucketmagic = 0;
	if(magic != ISectMagic)
		sysfatal("invalid / corrupt index section");
	if(sect->version == 2)
		sect->bucketmagic = U32GET(p + 28 + 2*NameSize);
	else if(sect->version != 1)
		sysfatal("unrecognized index section version %d; only 1 and 2 are supported", sect->version);
	sect->buckmax = (sect->blocksize - IBucketSize) / IEntrySize;
	sect->blocklog = u64log2(sect->blocksize);
	if(sect->blocksize != (1 << sect->blocklog))
		sysfatal("Illegal or corrupt index section");
	sect->tabbase = (PartBlank + HeadSize + sect->blocksize - 1) & ~(sect->blocksize - 1);
	if(sect->tabbase >= sect->blockbase)
		sysfatal("illegal or corrupt index section: config table overlaps bucket store");
	sect->tabsize = sect->blockbase - sect->tabbase;
	if(sect->blockbase + (u64int)sect->blocks * sect->blocksize != partlen(sect->fd, path) & ~(u64int)(sect->blocksize - 1))
		sysfatal("invalid or corrupt index section header: invalid blocks");
	if(sect->stop - sect->start > sect->blocks)
		sysfatal("invalid or corrupt index section: section overflows available space");
	if(sect->stop < sect->start)
		sysfatal("invalid or corrupt index section: impossible range");
}

static void
parseindex(void)
{
	/* parse the index header from the first section */
	u32int version;
	int i;
	Biobufhdr bio;
	char *buf = malloc(index.sects[0].tabsize);
	char *line;
	if(Binits(&bio, index.sects[0].fd, OREAD, (uchar*)buf, index.sects[0].tabsize))
		sysfatal("failed to init biobuf: %r");
	if(Bseek(&bio, index.sects[0].tabbase, 0) != index.sects[0].tabbase)
		sysfatal("seek failed: %r");
	line = Brdline(&bio, '\n');
	if(memcmp(line, "venti index configuration", 25) != 0)
		sysfatal("invalid magic found in index header");
	if(!Brdu32(&bio, &version) || version != 1)
		sysfatal("failed to read version or version unsupported");
	print("Parsing index v1...\n");
	line = Brdline(&bio, '\n');
	if(Blinelen(&bio) >= NameSize)
		sysfatal("invalid or corrupt index: name too big");
	if(memcmp(line, index.sects[0].index, strlen(index.sects[0].index)) != 0)
		sysfatal("invalid or corrupt index: index/section mismatch");
	if(!Brdu32(&bio, &index.blocksize))
		sysfatal("invalid or corrupt index: failed to read blocksize");
	/* Section map, then arena map; see parseamap */
	/* Parse both maps, overwrite the section map; we don't need it */
	parsemap(&bio, &index.amap, &index.namap);
	parsemap(&bio, &index.amap, &index.namap);
	/* Validation code here */
	for(i = 0; i < index.nsects; i += 1){
		/* TODO validate section */
		index.buckets = index.sects[i].stop;
	}
	index.div = (((u64int)1<<32)+index.buckets-1) / index.buckets;
	if((((u64int)1 << 32) - 1) / index.div + 1 != index.buckets)
		sysfatal("corrupt index: divisor and buckets inconsistent");
	/* Lastly, maparenas */
}

static void
initindex(void)
{
	initisectpart("/dev/" MACHINE "/isect");
	parseindex();
}

/* Load on-disk state: the arena partition first, then the index. */
static void
init(void)
{
	initarenas();	/* arena partition and its arenas */
	initindex();	/* index section and index configuration */
}

/* Post-load consistency checks; none are performed yet. */
static void
validate(void)
{
}

/*
 * Venti version handshake: send our version line, then consume the
 * client's line up to '\n'.  On any write or read failure, tell the
 * client off, close the connection and exit.
 */
static void
vtversion(VtConn conn)
{
	char ch;

	if(fprint(conn.fd, "venti-02-neoventi\n") != 18)
		goto fail;
	for(;;){
		if(read(conn.fd, &ch, 1) != 1)
			goto fail;
		if(ch == '\n')
			return;
	}
fail:
	fprint(conn.fd, "FUCK OFF\n");
	close(conn.fd);
	sysfatal("venti handshake failed: %r");
}

/*
 * Read one venti message into buf: a 2-byte big-endian length, then
 * that many bytes (starting at buf+2).  buf must hold at least
 * 2 + 0xFFFF bytes.  Returns 1 on success, 0 with errstr set on a short
 * read or a message too short to hold op and tag.
 */
static int
vtrecv(VtConn conn, char *buf)
{
	u16int len;

	/* readn: a stream connection may deliver the message in pieces */
	if(readn(conn.fd, buf, 2) != 2){
		werrstr("Failed to read message size: %r");
		return 0;
	}
	/* bytes are unsigned on the wire; char may be signed */
	len = (uchar)buf[0]<<8 | (uchar)buf[1];
	if(len < 2){
		werrstr("message too short: %ud bytes", len);
		return 0;
	}
	if(readn(conn.fd, buf + 2, len) != len){
		werrstr("Failed to read message: %r");
		return 0;
	}
	return 1;
}

/*
 * Return (by value) the section holding bucket buck: the last section
 * whose start is <= buck.  Binary search over sects[1..nsects-1];
 * sects[0] is the fallback when none of those qualify.
 */
VtISect
isectforbucket(u32int buck)
{
	int lo, hi, mid;

	lo = 1;
	hi = index.nsects - 1;
	while(lo <= hi){
		mid = (lo + hi) >> 1;
		if(index.sects[mid].start > buck)
			hi = mid - 1;
		else
			lo = mid + 1;
	}
	return index.sects[lo - 1];
}

/*
 * Linear search of the nb index entries (IEntrySize bytes each) at
 * bucket for one whose leading 20-byte score equals score.
 * On a match stores its position in *entry and returns 1.  Otherwise
 * returns 0 and leaves *entry unchanged.
 */
static int
bucketlookup(u8int *bucket, u16int nb, u8int *score, u16int *entry)
{
	u16int i;

	/* valid entries are 0..nb-1; anything past them is stale or out of the block */
	for(i = 0; i < nb; i++){
		if(memcmp(&bucket[i * IEntrySize], score, 20) == 0){
			*entry = i;
			return 1;
		}
	}
	return 0;
}

/* Return (by value) the loaded arena named by arena map entry aindex; fatal if absent. */
static VtArena
arenafromindex(u64int aindex)
{
	char *want;
	u64int n;

	want = index.amap[aindex].name;
	n = 0;
	while(n < numarenas){
		if(strcmp(arenas[n].name, want) == 0)
			return arenas[n];
		n++;
	}
	sysfatal("arena not found");
	return arenas[0];
}

/* Index of the arena map entry whose [start, stop) contains addr; fatal if none. */
static u64int
aindexfromaddr(u64int addr)
{
	u64int i;
	MapEntry *e;

	for(i = 0; i < index.namap; i++){
		e = &index.amap[i];
		if(e->start <= addr && addr < e->stop)
			return i;
	}
	sysfatal("internal corruption: arena not found for arenaindex");
	return 0;
}

/*
 * Look score up in the index.  The score's first 32 bits divided by
 * index.div select a bucket, which is read from its section and searched.
 * On a hit, fills the outputs and returns 1:
 *	*arena	the arena holding the clump
 *	*addr	the clump's address relative to that arena's data
 *	*size	the size recorded in the index entry
 *	*blocks	the clump length in 1<<ABlockLog byte units
 * On a miss, returns 0 with errstr set.  I/O errors and bucket
 * corruption are fatal.
 */
static int
vtreadlookup(u8int *score, VtArena *arena, u64int *addr, u16int *size, u8int *blocks)
{
	u8int *buf, *ie;
	u16int bentries, entry;
	u64int aindex, bucket;
	VtISect sect;

	bucket = U32GET(score) / index.div;
	sect = isectforbucket(bucket);
	bucket -= sect.start;
	buf = malloc(sect.blocksize);
	if(buf == nil)
		sysfatal("OOM");
	if(pread(sect.fd, (char*)buf, sect.blocksize, sect.blockbase + (bucket << sect.blocklog)) != sect.blocksize)
		sysfatal("Failed to read bucket");
	bentries = U16GET(buf);
	/*
	 * A bucket whose magic differs from the section's has not been written
	 * under the current magic.  Venti's unpackibucket treats such a bucket
	 * as empty rather than corrupt, and so does this code.
	 */
	if(sect.bucketmagic && U32GET(buf + 2) != sect.bucketmagic)
		bentries = 0;
	if(bentries > sect.buckmax)
		sysfatal("index is corrupt: bucket has %ud entries, at most %d fit", bentries, sect.buckmax);
	if(!bucketlookup(buf + IBucketSize, bentries, score, &entry)){
		free(buf);
		werrstr("entry not found in bucket");
		return 0;
	}
	/* index entry: score[20] ... addr[8] at 26, size[2] at 34, blocks[1] at 37 */
	ie = buf + IBucketSize + entry*IEntrySize;
	*addr = U64GET(ie + 26);
	*size = U16GET(ie + 34);
	*blocks = ie[37];
	free(buf);
	aindex = aindexfromaddr(*addr);
	*arena = arenafromindex(aindex);
	*addr -= index.amap[aindex].start;
	return 1;
}

/*
 * Bytes at the end of the arena's data reserved for the directory:
 * enough whole blocks of ClumpInfoSize records for memstats.clumps
 * clumps, plus one block.  Computed in u32int, as before.
 */
static u64int
arenadirsize(VtArena arena)
{
	u32int perblock, dirblocks;

	perblock = arena.blocksize / ClumpInfoSize;
	dirblocks = arena.memstats.clumps / perblock + 1;
	return dirblocks * arena.blocksize;
}

/*
 * Read *size bytes at addr (relative to arena.base) into dbuf, one arena
 * block at a time.  The read is clamped so it never reaches the
 * directory region sized by arenadirsize.  *size is reduced to what was
 * actually read, and is 0 if addr is already past the data.
 * I/O errors are fatal.
 */
static void
vtreadarena(VtArena arena, u64int addr, uchar *dbuf, u16int *size)
{
	u64int dirsize, end;
	char *buf;
	u32int off, n, m;	/* u32int: a full 64K block would wrap u16int */

	dirsize = arenadirsize(arena);
	end = arena.size > dirsize ? arena.size - dirsize : 0;
	if(addr >= end)
		*size = 0;
	else if(addr + *size > end)
		*size = end - addr;
	if(*size == 0)
		return;
	buf = malloc(arena.blocksize);
	if(buf == nil)
		sysfatal("oom");
	addr += arena.base;
	off = addr & (arena.blocksize-1);
	addr -= off;
	for(n = 0; n < *size; n += m){
		/* Read the next block */
		if(pread(arena.fd, buf, arena.blocksize, addr) != arena.blocksize)
			sysfatal("pread failed!");
		m = arena.blocksize - off;
		if(m > *size - n)
			m = *size - n;
		memmove(&dbuf[n], &buf[off], m);
		off = 0;
		addr += arena.blocksize;
	}
	free(buf);
}

/*
 * Read the clump at addr (relative to the arena's data), spanning
 * blocks units of 1<<ABlockLog bytes.  Write its uncompressed contents
 * to dst.
 * Clump header offsets used here:
 *	5	compressed size
 *	7	uncompressed size
 *	29	encoding (1 stored, 2 whack-compressed)
 *	38	data
 * NOTE(review): dst must have room for the header's uncompressed size;
 * the caller sizes it from the index entry — confirm those always agree.
 * Returns 1 on success, 0 with errstr set if the clump is truncated,
 * too long, or uses an unknown encoding.  A decompression failure is fatal.
 */
static int
readclump(uchar *dst, VtArena arena, u64int addr, u8int blocks)
{
	enum { ClumpHdr = 38 };
	uchar *buf;
	u16int size, csize, got;
	u32int want;
	int ok;

	/* vtreadarena takes a 16-bit length */
	want = (u32int)blocks << ABlockLog;
	if(want < ClumpHdr || want > 0xFFFF){
		werrstr("bad clump length: %ud blocks", blocks);
		return 0;
	}
	buf = malloc(want);
	if(buf == nil)
		sysfatal("oom");
	got = want;
	vtreadarena(arena, addr, buf, &got);
	ok = 0;
	if(got < ClumpHdr){
		werrstr("short clump read: %ud bytes", got);
		goto out;
	}
	csize = U16GET(buf+5);
	size = U16GET(buf+7);
	switch(buf[29]){
	case 2:
		if(ClumpHdr + csize > got){
			werrstr("truncated compressed clump");
			goto out;
		}
		if(unwhack(dst, size, buf+ClumpHdr, csize) != size)
			sysfatal("decompression failed: %r");
		break;
	case 1:
		if(ClumpHdr + size > got){
			werrstr("truncated clump");
			goto out;
		}
		memcpy(dst, buf+ClumpHdr, size);
		break;
	default:
		werrstr("unknown clump encoding %d", buf[29]);
		goto out;
	}
	ok = 1;
out:
	free(buf);
	return ok;
}

/*
 * Send a VtRerror reply to the request req:
 *	size[2] VtRerror tag[1] error[s]
 * A string is a 2-byte big-endian count followed by its bytes.
 * err is truncated to ERRMAX bytes.  A failed write is fatal.
 */
static void
vtrerror(VtConn conn, char *req, char *err)
{
	uchar msg[6+ERRMAX];
	int n;

	n = strlen(err);
	if(n > ERRMAX)
		n = ERRMAX;
	msg[0] = (n+4)>>8;
	msg[1] = (n+4) & 0xFF;
	msg[2] = VtRerror;
	msg[3] = req[3];
	msg[4] = n>>8;
	msg[5] = n & 0xFF;
	memmove(msg+6, err, n);
	if(write(conn.fd, msg, n+6) != n+6)
		sysfatal("failed to write error: %r");
}

/*
 * Handle the VtTread in buf (score at offset 4): look the score up,
 * read and decode its clump, and reply
 *	size[2] VtRread tag[1] data
 * A lookup or clump-read failure is answered with VtRerror, leaving the
 * connection usable.  A failed write of the reply is fatal.
 */
static void
vtread(VtConn conn, char *buf)
{
	u8int *score;
	VtArena arena;
	u64int addr;
	u16int size;
	u8int blocks;
	uchar *dbuf;
	char err[ERRMAX];

	score = (u8int*)buf + 4;
	if(!vtreadlookup(score, &arena, &addr, &size, &blocks)){
		rerrstr(err, sizeof err);
		vtrerror(conn, buf, err);
		return;
	}
	/* the 16-bit size field also counts the op and tag bytes */
	if(size > 0xFFFF - 2){
		vtrerror(conn, buf, "block too large");
		return;
	}
	// Response: VtRread, msg tag, data
	dbuf = malloc(4 + size);
	if(dbuf == nil)
		sysfatal("oom");
	dbuf[0] = (size+2)>>8;
	dbuf[1] = (size+2) & 0xFF;
	dbuf[2] = VtRread;
	dbuf[3] = buf[3];
	if(!readclump(dbuf+4, arena, addr, blocks)){
		free(dbuf);
		rerrstr(err, sizeof err);
		vtrerror(conn, buf, err);
		return;
	}
	if(write(conn.fd, dbuf, size + 4) != size+4)
		sysfatal("failed to write data");
	free(dbuf);
}

/*
 * Dispatch one request in buf (op at buf[2], tag at buf[3]).
 * Returns 1 to keep serving the connection, 0 to hang up.  Unsupported
 * ops hang up this connection rather than exiting the whole server.
 */
static int
vtconnhandle(VtConn conn, char *buf)
{
	uchar rping[4];

	switch(buf[2]){
	case VtTread:
		vtread(conn, buf);
		return 1;
	case VtTping:
		/* VtRping carries only the tag: size[2]=2 VtRping tag[1] */
		rping[0] = 0;
		rping[1] = 2;
		rping[2] = VtRping;
		rping[3] = buf[3];
		return write(conn.fd, rping, 4) == 4;
	case VtTgoodbye:
		return 0;
	case VtTsync:
		print("we don't support vtsync yet. Hanging up!\n");
		return 0;
	default:
		print("unsupported message type %d. Hanging up!\n", buf[2]);
		return 0;
	}
	return 0;
}

static void
handle(int ctl, char *dir)
{
	char buf[0x10002];
	VtConn conn;
	conn.fd = accept(ctl, dir);
	if(conn.fd < 0)
		sysfatal("failed to accept connection: %r");
	print("received a connection at %s, fd %d\n", dir, conn.fd);
	vtversion(conn);
	if(!vtrecv(conn, buf))
		sysfatal("msg recv failed: %r");
	if(buf[2] != VtThello)
		sysfatal("received message before hello: %d", buf[2]);
	if(buf[4] != 0 || buf[5] != 2 || buf[6] != '0' || buf[7] != '2')
		sysfatal("unsupported protocol version requested in Thello: %d %d %d %d", buf[4], buf[5], buf[6], buf[7]);
	buf[2] = VtRhello;
	buf[6] = 'n';
	buf[7] = 'o';
	buf[1] = 8;
	if(write(conn.fd, buf, 10) != 10)
		sysfatal("failed to rhello: %r");
	while(vtrecv(conn, buf)){
		if(!vtconnhandle(conn, buf))
			break;
	}
	close(conn.fd);
}

/*
 * Announce on tcp!127.1!14011 and handle connections one at a time
 * until listen fails.  Failure to announce is fatal.
 */
static void
serve(void)
{
	char adir[NETPATHLEN], dir[NETPATHLEN];
	int afd, lcfd;

	afd = announce("tcp!127.1!14011", adir);
	if(afd < 0)
		sysfatal("%r");
	procsetname("neoventi/server");
	while((lcfd = listen(adir, dir)) >= 0){
		handle(lcfd, dir);
		close(lcfd);
	}
	fprint(2, "server has died\n");
}

void
main(int argc, char **argv)
{
	parseargs(argc, argv);
	print("Initializing neoventi build 1...\n");
	init();
	validate();
	serve();
	/* serve returns only when listen fails; exit explicitly instead of falling off main */
	exits("server died");
}