shithub: ridefs

ref: 062872e1e79275fd20111be57068da7fb631c5fe
dir: /ridefs.c/

View raw version
#include <u.h>
#include <libc.h>
#include <fcall.h>
#include <thread.h>
#include <9p.h>
#include <stdio.h>
#include <json.h>

/*
 * Short type names for the structures used by the server.
 * NOTE(review): Rmsg is declared here but no struct Rmsg definition
 * is visible in this file — confirm whether it is still needed.
 */
typedef struct Evfifo Evfifo;
typedef struct Qfile Qfile;
typedef struct Client Client;
typedef struct Rmsg Rmsg;
typedef struct Rfid Rfid;

/*
 * Per-client event channel.
 * fsopen fills in and out with pipe(&ev.in), so the two fields must
 * stay adjacent and in this order.
 */
struct Evfifo {
	int in, out;  /* pipe ends: handlemsgs prints events to in, fsread reads them from out */
	int open;     /* number of fids that currently have the event file open */
	int pid;      /* pid of the forked message handler; 0 when none was started */
};

/* A file descriptor paired with an embedded reader/writer lock. */
struct Qfile {
	RWLock;
	int fd;  /* 0 is treated as "not open" by the code in this file */
};

/*
 * One slot of the client pool (cpool).
 * A slot is free when its reference count is zero; freeclient clears
 * the whole structure when the last reference goes away.
 */
struct Client {
	Ref;    /* number of fids (plus the clone itself) that refer to this client */
	QLock;  /* taken by fsopen around the ev.open update */

	/* config */
	char *addr;  /* interpreter address, set by "connect" written to the client's ctl file */

	/* internal */
	Evfifo ev;                  /* event pipe and message handler */
	Qfile qtext, tdata, tctl;   /* session text backing file; data and ctl fds returned by dial */
	char *qctl, *qinfo;         /* rendered contents of the ctl and info files */
};

enum {
	RHandshake,
	THandshake,
	TIdentify,
	RIdentify,
	TExecute,
	RAppendSessionOutput,
	RSetPromptType,
	CMDCOUNT
};

/* Wire names of the RIDE commands, indexed by the command enum above. */
static char *cmdtab[] = {
	"SupportedProtocols",
	"UsingProtocol",
	"Identify",
	"ReplyIdentify",
	"Execute",
	"AppendSessionOutput",
	"SetPromptType",
};

enum {
	Qroot,
		Qrctl,
		Qclone,
		Qclient,
			Qctl,
			Qtext,
			Qinfo,
			Qevent,
	QCOUNT
};

/*
 * Directory entry names, indexed by file kind.
 * Qclient has no fixed name (nil): its name is the client number.
 */
static char *nametab[] = {
	".",
		"ctl",
		"clone",
		nil,
			"ctl",
			"text",
			"info",
			"event",
};

/* Per-fid state, stored in Fid.aux. */
struct Rfid {
	Ref;
	RWLock *l;                /* lock taken by the fid's most recent worker process, if any */
	void (*unlock)(RWLock*);  /* matching release function (runlock or wunlock); used by fsflush */
	int kind;                 /* file kind (Qroot ... Qevent) */
	int client;               /* index into cpool, or -1 when the fid is not under a client */
	int pid;                  /* value returned by the most recent rfork for this fid */
};

#define VERSION 0            /* reported in the root ctl file and used as every qid version */
static Client *cpool;        /* client pool, nclients entries */
static char *mtpt;           /* mount point (-m) */
static char *service;        /* service name passed to postmountsrv (-s) */
static char *defport;        /* default port handed to netmkaddr (-p) */
static char *user;           /* owner reported for every file; set in fsattach */
static char *qrctl;          /* rendered contents of the root ctl file */
static long time0;           /* atime/mtime reported for every file; set in fsattach */
static uint nclients;        /* size of cpool */
static int pid;              /* server pid; set in fsattach */
static int debug;            /* non-zero enables protocol tracing on fd 2 */

/*
 * Allocate n zeroed bytes with emalloc9p and tag the block with the
 * caller's pc.
 */
void*
ecalloc(ulong n){
	void *mem;

	mem = emalloc9p(n);
	memset(mem, 0, n);
	setmalloctag(mem, getcallerpc(&n));
	return mem;
}

/*
 * Resize p to n bytes with erealloc9p and tag the block with the
 * caller's pc.
 */
void*
erealloc(void *p, ulong n){
	void *q;

	q = erealloc9p(p, n);
	/* getcallerpc wants the address of the FIRST argument, not the second */
	setrealloctag(q, getcallerpc(&p));
	return q;
}

/*
 * Duplicate s with estrdup9p and tag the copy with the caller's pc.
 * A nil s yields nil rather than an error.
 */
char*
estrdup(char *s){
	char *t;

	if(s == nil)
		return nil;

	t = estrdup9p(s);
	/* a fresh allocation carries a malloc tag, as in ecalloc */
	setmalloctag(t, getcallerpc(&s));
	return t;
}

Client*
clientref(int i){
	if(i < 0 || i > nclients)
		return nil;
	return &cpool[i];
}

int
genqrctl(void){
	if(qrctl)
		free(qrctl);
	qrctl = smprint(
		"version %d\n"
		"pid %d\n"
		"nclients %d\n"
		"debug %d\n"
		"defport %s\n",
		VERSION, pid, nclients, debug, defport);

	return strlen(qrctl);
}

int
genqctl(int client){
	Client *c;

	c = clientref(client);
	if(c->qctl)
		free(c->qctl);
	c->qctl = smprint(
		"%d\n"
		"connect %s\n"
		"pid %d\n",
		client, c->addr, c->ev.pid);

	return strlen(c->qctl);
}

/*
 * Claim the first unused pool slot: take a reference on it and
 * generate its ctl text. Returns the slot number, or -1 when every
 * slot is in use.
 */
int
mkclient(void){
	int slot;

	for(slot = 0; slot < nclients; slot++){
		if(cpool[slot].ref != 0)
			continue;
		incref(&cpool[slot]);
		genqctl(slot);
		return slot;
	}

	return -1;
}

void
freeclient(int i){
	Client *c;

	c = clientref(i);
	if(c == nil || c->ref == 0)
		return;
	if(decref(c))
		return;

	if(c->ev.in) close(c->ev.in);
	if(c->ev.out) close(c->ev.out);
	if(c->ev.pid > 0) postnote(PNPROC, c->ev.pid, "kill");
	if(c->qtext.fd) close(c->qtext.fd);
	if(c->tdata.fd) close(c->tdata.fd);
	if(c->tctl.fd) close(c->tctl.fd);
	if(c->qctl) free(c->qctl);
	if(c->qinfo) free(c->qinfo);

	memset(c, 0, sizeof(*c));
}

/*
 * Fill in the qid for file kind k of the given client.
 * The path carries the client number in the high 32 bits and the
 * kind in the low 32 bits; only Qroot and Qclient are directories.
 */
static void
mkqid(Qid *q, int k, int client){
	u64int hi, lo;

	hi = (u64int)client << 32;
	lo = k & 0xffffffff;

	q->path = hi | lo;
	q->vers = VERSION;
	if(k == Qroot || k == Qclient)
		q->type = QTDIR;
	else
		q->type = QTFILE;
}

char*
jsonesc(char *s){
	char c, *b;
	int i, j, sz;

	sz = 32;
	b = ecalloc(32);
	for(i = j = 0, c = s[i]; c != '\0'; c = s[++i]){
		if(i == sz-7){ sz *= 2; b = realloc(b, sz); }
		if(c >= 0 && c < 32 || c == '"' || c == '\\')
			j += sprintf(&b[j], "\\u%04x", c);
		else
			b[j] = c;
			j++;
	}

	return b;
}

long
writecmd(int fd, int cmd, ...){
	va_list v;
	char *j, *s;
	long n;
	char **strv;
	int i, strc;

	j = nil;
	strc = 0;
	strv = nil;
	va_start(v, cmd);
	switch(cmd){
	case THandshake:
		j = vsmprint("%d", v);
		break;
	case TIdentify:
		j = vsmprint("{\"apiVersion\":%d,\"identity\":%d}", v);
		break;
	case TExecute:
		strc = 1;
		strv = ecalloc(strc * sizeof(*strv));
		strv[0] = jsonesc(va_arg(v, char*));
		j = smprint("{\"text\":\"%s\",\"trace\":%d}", strv[0], va_arg(v, int));
		break;
	}
	va_end(v);

	switch(cmd){
	case THandshake: s = smprint("%s=%s", cmdtab[cmd], j); break;
	default: s = smprint("[\"%s\",%s]", cmdtab[cmd], j); break;
	}

	n = 8 + strlen(s);
	fprint(fd, "%c%c%c%cRIDE%s",
		(char)(24>>n & 0xff),
		(char)(16>>n & 0xff),
		(char)(8>>n & 0xff),
		(char)(n & 0xff),
		s);

	if(debug)
		fprint(2, "ridefs: T: %s\n", s);
	for(i = 0; i < strc; i++)
		free(strv[i]);
	if(strv) free(strv);
	free(j);
	free(s);
	return n;
}

/*
 * Read one RIDE message from fd.
 * On success *cmd is set to the command's index in cmdtab, *arg to
 * the message argument (owned by the caller, release with jsonfree)
 * and the payload length is returned.
 * Returns -1 on a read error or short read, a malformed frame, or a
 * command that is not in cmdtab; *arg is left untouched in that case.
 */
long
readcmd(int fd, int *cmd, JSON **arg){
	JSON *j, *v;
	JSONEl *e;
	char *pld;
	uchar hdr[8];
	long n, ret;
	int i;

	/* header: 4-byte big-endian total length, then the magic */
	if(readn(fd, hdr, 8) != 8)
		return -1;
	if(memcmp(&hdr[4], "RIDE", 4) != 0)
		return -1;

	n = (ulong)hdr[0]<<24 | (ulong)hdr[1]<<16 | (ulong)hdr[2]<<8 | (ulong)hdr[3];
	n -= 8; /* len + magic */
	if(n < 0)
		return -1;
	pld = ecalloc(n+1 + 32); /* NUL plus normalization overhead */
	if(readn(fd, pld, n) != n){
		free(pld);
		return -1;
	}

	/* Normalize bespoke handshake payload to JSON */
	if((j = jsonparse(pld)) == nil)
	if(sscanf(pld, "SupportedProtocols=%i", &i) == 1){
		sprintf(pld, "[\"SupportedProtocols\",%i]", i);
		j = jsonparse(pld);
	}
	free(pld);
	if(j == nil)
		return -1;

	/* every message is [name, argument] */
	ret = -1;
	if(j->t != JSONArray || (e = j->first) == nil || e->val == nil)
		goto end;
	v = e->val;
	if(v->t != JSONString)
		goto end;
	for(*cmd = 0; *cmd < CMDCOUNT; (*cmd)++)
		if(strcmp(v->s, cmdtab[*cmd]) == 0)
			break;
	if(*cmd == CMDCOUNT || e->next == nil)
		goto end;

	if(debug)
		fprint(2, "ridefs: R: %J\n", j);
	/* detach the argument so that jsonfree(j) below leaves it alone */
	*arg = e->next->val;
	e->next->val = nil;
	ret = n;

	end:
	jsonfree(j);
	return ret;
}

char*
rideinit(char *addr, int *fd, int *cfd, char **info){
	int i, cmd;
	char *b;
	JSON *j;
	JSONEl *d;
	FILE *f;

	b = netmkaddr(addr, "tcp", defport);
	if(debug)
		fprintf(stderr, "dialing %s\n", addr);
	if((*fd = dial(b, nil, nil, cfd)) < 0)
		return "failed to dial";

	if(readcmd(*fd, &cmd, &j) < 0)
		return "failed to read handshake";
	if(cmd != RHandshake || j->n != 2)
		return "unrecognized protocol";
	if(writecmd(*fd, THandshake, 2) < 0)
		return "failed to write handshake";
	jsonfree(j);

	if(writecmd(*fd, TIdentify, 1, 1) < 0)
		return "failed to send identification message";
	if(readcmd(*fd, &cmd, &j) < 0 || cmd != RIdentify)
		return "failed to receive identification reply";

	f = sopenw();
	for(i = 0, d = j->first; d != nil; d = d->next){
		switch(d->val->t){
		case JSONBool:
		case JSONNumber:
			i += fprintf(f, "%s %lld\n", d->name, d->val->n); break;
		case JSONString:
			i += fprintf(f, "%s %s\n", d->name, d->val->s); break;
		}
	}
	*info = estrdup(f->buf);
	fclose(f);
	jsonfree(j);

	return nil;
}

/*
 * Message pump for one client; never returns.
 * Reads RIDE messages from in, appends session output to the text
 * file behind out and prints one event line per handled message to
 * ev->in:
 *	o type length offset		session output was appended
 *	p type length -1 prompt		the prompt type changed
 * Messages of any other kind, or with missing fields, are dropped.
 *
 * NOTE(review): readcmd reports I/O errors and unknown commands the
 * same way, so a closed connection makes this loop spin — confirm
 * how the handler is expected to terminate.
 */
void
handlemsgs(int in, Evfifo *ev, Qfile *out){
	JSON *j, *v;
	char *s, *e;
	int cmd, t;

	while(1){
		j = nil;
		if(readcmd(in, &cmd, &j) < 0 || j == nil)
			continue;

		s = nil;
		e = nil;
		switch(cmd){
		case RAppendSessionOutput:
			if((v = jsonbyname(j, "type")) == nil)
				break;
			t = v->n;
			if((v = jsonbyname(j, "result")) == nil || v->t != JSONString)
				break;
			s = v->s;
			/* the offset is where the text is about to be written */
			e = smprint("o %d %ld %lld", t, strlen(s), seek(out->fd, 0, 1));
			break;
		case RSetPromptType:
			if((v = jsonbyname(j, "type")) == nil)
				break;
			switch(t = v->n){
			case 1: /* normal prompt */
			case 2: /* ⎕ input */
				s = "      ";
				break;
			case 0: /* no prompt */
			case 3: /* line editor */
			case 4: /* ⍞ input */
			case 5: /* unforeseen */
			default:
				s = ""; break;
			}
			e = smprint("p %d %ld %d %s", t, strlen(s), -1, s);
			/* the prompt travels in the event only, not in the text file */
			s = nil;
			break;
		default:
			break;
		}

		if(s){
			wlock(out);
			fprint(out->fd, "%s", s);
			wunlock(out);
		}

		if(e){
			fprint(ev->in, "%s\n", e);
			free(e);
		}

		/* s points into j, so release j only after the text is written */
		jsonfree(j);
	}
}

long
writeqrctl(char *b, long){
	char *s;
	char sep[7];

	sprintf(sep, " \t\n\f\r\v");
	for(s = strtok(b, sep); s != nil; s = strtok(nil, sep)){
		if(strcmp(s, "nclients") == 0)
			nclients = strtoul(strtok(nil, sep), nil, 0);
		if(strcmp(s, "debug") == 0)
			debug = atoi(strtok(nil, sep));
		if(strcmp(s, "defport") == 0)
			defport = estrdup(strtok(nil, sep));
	}

	cpool = erealloc(cpool, nclients*sizeof(*cpool));
	genqrctl();

	return strlen(qrctl);
}

/*
 * Handle a write of n bytes at b to a client's ctl file.
 * The only recognised setting is "connect addr", which records the
 * address that opening the text file will dial. Unknown words are
 * skipped. Returns n, or 0 when the client number is out of range.
 */
long
writeqctl(char *b, long n, int client){
	Client *c;
	char *buf, *s, *arg, *sep;

	if((c = clientref(client)) == nil)
		return 0;
	if(n < 0)
		n = 0;
	/* the 9P data is not NUL terminated; tokenise a private copy */
	buf = ecalloc(n + 1);
	memmove(buf, b, n);

	sep = " \t\n\f\r\v";
	for(s = strtok(buf, sep); s != nil; s = strtok(nil, sep)){
		if(strcmp(s, "connect") != 0)
			continue;
		if((arg = strtok(nil, sep)) == nil)
			break;
		free(c->addr);
		c->addr = estrdup(arg);
	}
	free(buf);

	genqctl(client);

	return n;
}

/*
 * Fill in the directory entry for file kind `kind` of `client`
 * (-1 for files that do not belong to a client).
 * The strings stored in d are freshly allocated; the caller owns them.
 */
void
mkdirent(Dir *d, int kind, int client){
	Client *c;
	Dir *dir;
	char *nm;

	memset(d, 0, sizeof(*d));
	mkqid(&d->qid, kind, client);
	d->mode = 0444;
	d->atime = d->mtime = time0;
	d->uid = estrdup(user);
	d->gid = estrdup(user);
	d->muid = estrdup(user);
	if(kind == Qclient){
		/* client directories are named by their number */
		nm = ecalloc(16);
		snprint(nm, 16, "%d", client);
		d->name = nm;
	}else if((nm = nametab[kind]) != nil)
		d->name = estrdup(nm);
	if(d->qid.type & QTDIR)
		d->mode |= DMDIR | 0111;

	switch(kind){
	case Qrctl:
	case Qctl:
	case Qtext:
		d->mode = 0666;
		break;
	}

	/* lengths; anything that is not available yet reports 0 */
	c = clientref(client);
	switch(kind){
	case Qrctl:
		if(qrctl != nil)
			d->length = strlen(qrctl);
		break;
	case Qctl:
		if(c != nil && c->qctl != nil)
			d->length = strlen(c->qctl);
		break;
	case Qinfo:
		if(c != nil && c->qinfo != nil)
			d->length = strlen(c->qinfo);
		break;
	case Qtext:
		if(c != nil && c->qtext.fd && (dir = dirfstat(c->qtext.fd)) != nil){
			d->length = dir->length;
			free(dir);
		}
		break;
	}
}

/*
 * dirread9p generator for the root directory.
 * Entries 0 and 1 are the ctl and clone files; entry 2+k is the
 * directory of the k-th client that is in use, in pool order.
 * Returns 0 after filling d, or -1 when i is past the last entry.
 */
int
genqroot(int i, Dir *d, void*){
	int j;

	i += Qroot + 1;
	if(i < Qclient){
		mkdirent(d, i, -1);
		return 0;
	}

	/* skip unused slots so each live client is listed exactly once */
	i -= Qclient;
	for(j = 0; j < nclients; j++){
		if(cpool[j].ref == 0)
			continue;
		if(i-- == 0){
			mkdirent(d, Qclient, j);
			return 0;
		}
	}

	return -1;
}

/*
 * dirread9p generator for a client directory; aux carries the client
 * number. ctl is always listed; text, info and event are listed only
 * once the client has an address, an info string and an event pipe
 * respectively.
 * Returns 0 after filling d, or -1 when i is past the last entry or
 * the client number is out of range.
 */
int
genqclient(int i, Dir *d, void *aux){
	int client;
	Client *c;

	client = (vlong)aux;
	if((c = clientref(client)) == nil)
		return -1;

	/* map the entry index to a kind, stepping over hidden files */
	i += Qclient + 1;
	if(i >= Qtext && c->addr == nil) i++;
	if(i >= Qinfo && c->qinfo == nil) i++;
	if(i >= Qevent && c->ev.in == 0) i++;
	if(i >= QCOUNT)
		return -1;

	mkdirent(d, i, client);

	return 0;
}

/*
 * Release the per-fid state when lib9p discards a fid: undo the
 * event-file open count if applicable and drop the fid's reference
 * to its client.
 */
static void
fsdestroyfid(Fid *fid){
	Rfid *rf;
	Client *cl;

	rf = fid->aux;
	if(rf == nil)
		return;

	cl = clientref(rf->client);
	if(cl != nil && rf->kind == Qevent && cl->ev.open != 0)
		cl->ev.open -= 1;

	freeclient(rf->client);
	free(rf);
}

/* Srv start hook: unmount whatever is on the mount point, if one is set. */
static void
fsstart(Srv*){
	if(mtpt == nil)
		return;
	unmount(nil, mtpt);
}

/*
 * Srv end hook: post "shutdown" to the note group of the pid recorded
 * in fsattach, then exit.
 */
static void
fsend(Srv*){
	postnote(PNGROUP, pid, "shutdown");
	exits(nil);
}

/*
 * Attach: record the user, start time and server pid, render the root
 * ctl text and hand out a fid for the root directory.
 */
static void
fsattach(Req *r){
	Rfid *rf;

	rf = ecalloc(sizeof(*rf));
	rf->client = -1; /* no client */
	rf->kind = Qroot;

	pid = getpid();
	time0 = time(0);
	user = getuser();
	genqrctl();

	r->fid->aux = rf;
	mkqid(&r->fid->qid, rf->kind, rf->client);
	r->ofcall.qid = r->fid->qid;

	respond(r, nil);
}

/*
 * Open.
 *	clone	claims a free client and turns the fid into that
 *		client's ctl file.
 *	text	forks a worker (sharing memory) that, on first use,
 *		connects to the interpreter, creates the event pipe and
 *		forks the message handler; the worker sends the reply,
 *		the parent returns without responding.
 *	event	counts the open under the client's lock.
 * Everything else opens without further work.
 */
static void
fsopen(Req *r){
	char *err;
	Rfid *f;
	Client *c;
	char *s;

	f = r->fid->aux;
	c = clientref(f->client);
	err = nil;
	switch(f->kind){
	case Qclone:
		if((f->client = mkclient()) < 0)
			err = "reached client limit";
		if(err)
			break;

		f->kind = Qctl;

		mkqid(&r->ofcall.qid, f->kind, f->client);
		r->fid->qid = r->ofcall.qid;
		break;
	case Qtext:
		switch(f->pid = rfork(RFPROC|RFNOWAIT|RFMEM)){
		case 0:
			/* record the lock so that fsflush can release it */
			f->l = &c->qtext;
			f->unlock = wunlock;
			wlock(f->l);
			/* a data fd of 0 means the client is not connected yet */
			if(c->tdata.fd == 0){
				if(err = rideinit(c->addr, &c->tdata.fd, &c->tctl.fd, &c->qinfo))
					goto done;
				/*
				 * fills ev.in and ev.out, which are adjacent in Evfifo.
				 * NOTE(review): the result of pipe is not checked.
				 */
				pipe(&c->ev.in);
				switch(c->ev.pid = rfork(RFPROC|RFNOWAIT|RFMEM)){
				case 0:
					/* message handler: create the session text file, then pump messages */
					s = smprint("/tmp/ride.%d.%d", pid, f->client);
					c->qtext.fd = create(s, ORDWR|ORCLOSE, DMREAD|DMAPPEND);
					free(s);
					handlemsgs(c->tdata.fd, &c->ev, &c->qtext);
					return;
				case -1: err = "unable to fork message handler"; break;
				default: break;
				}
			}
			genqctl(f->client);

			done:
			wunlock(f->l);
			respond(r, err);
			exits(nil);
		case -1: err = "failed to init ride"; break;
		default: return; /* the worker responds */
		}
		break;
	case Qevent: qlock(c); c->ev.open++; qunlock(c); break;
	default: break;
	}

	respond(r, err);
}

/*
 * Read.
 * Directories and the ctl/info strings are answered directly.
 * Reads of text and event may block, so each is served by a forked
 * worker (sharing memory) that sends the reply itself; the parent
 * returns without responding. A failed read in a worker is reported
 * to the client as the system error string.
 */
static void
fsread(Req *r){
	Rfid *f;
	Client *c;
	char *err;
	int fd;
	long n;

	err = nil;
	f = r->fid->aux;
	c = clientref(f->client);
	switch(f->kind){
	case Qroot: dirread9p(r, genqroot, nil); break;
	case Qrctl: readstr(r, qrctl); break;
	case Qclient: dirread9p(r, genqclient, (void*)f->client); break;
	case Qctl: readstr(r, c->qctl); break;
	case Qinfo: readstr(r, c->qinfo); break;
	case Qtext:
		switch(f->pid = rfork(RFPROC|RFNOWAIT|RFMEM)){
		case 0:
			/* record the lock so that fsflush can release it */
			f->l = &c->qtext;
			f->unlock = runlock;
			rlock(f->l);

			/* read through a private descriptor */
			n = -1;
			if((fd = dup(c->qtext.fd, -1)) >= 0){
				n = pread(fd, r->ofcall.data, r->ifcall.count, r->ifcall.offset);
				close(fd);
			}

			runlock(f->l);
			if(n < 0)
				responderror(r);
			else{
				r->ofcall.count = n;
				respond(r, nil);
			}
			exits(nil);
		case -1: err = "failed to fork read"; break;
		default: return; /* the worker responds */
		}
		break;
	case Qevent:
		switch(f->pid = rfork(RFPROC|RFNOWAIT|RFMEM)){
		case 0:
			n = read(c->ev.out, r->ofcall.data, r->ifcall.count);
			if(n < 0)
				responderror(r);
			else{
				r->ofcall.count = n;
				respond(r, nil);
			}
			exits(nil);
		case -1: err = "failed to fork read"; break;
		default: return; /* the worker responds */
		}
		break;
	default: err = "read prohibited"; break;
	}

	respond(r, err);
}

/*
 * Write.
 * The two ctl files are parsed directly. A write to text is sent to
 * the interpreter as an Execute message by a forked worker (sharing
 * memory) that sends the reply itself; the parent returns without
 * responding. All other files are read-only.
 */
static void
fswrite(Req *r){
	Rfid *f;
	Client *c;
	char *b, *d, *err;
	long n;

	d = r->ifcall.data;
	n = r->ifcall.count;
	f = r->fid->aux;
	c = clientref(f->client);
	err = nil;

	switch(f->kind){
	case Qrctl: r->ofcall.count = writeqrctl(d, n); break;
	case Qctl: r->ofcall.count = writeqctl(d, n, f->client); break;
	case Qtext:
		switch(f->pid = rfork(RFPROC|RFNOWAIT|RFMEM)){
		case 0:
			/* record the lock so that fsflush can release it */
			f->l = &c->qtext;
			f->unlock = wunlock;
			wlock(f->l);

			r->ofcall.count = r->ifcall.count;
			/* writecmd wants a NUL-terminated string */
			b = ecalloc(r->ifcall.count + 1);
			memmove(b, d, r->ifcall.count);
			/* NOTE(review): the result of writecmd is not checked */
			writecmd(c->tdata.fd, TExecute, b, 0);
			free(b);

			wunlock(f->l);
			respond(r, nil);
			exits(nil);
		case -1: err = "failed to fork write"; break;
		default: return; /* the worker responds */
		}
		break;
	default: err = "write prohibited"; break;
	}

	respond(r, err);
}

/*
 * Flush: interrupt the worker recorded for the old request's fid,
 * release the lock recorded for it and answer the old request with
 * "interrupted", then acknowledge the flush.
 *
 * NOTE(review): f->pid and f->l are not cleared afterwards, and the
 * lock is released on behalf of another process — confirm that a
 * worker which already finished cannot be unlocked or answered twice.
 */
static void
fsflush(Req *r){
	Req *o;
	Rfid *f;

	if(o = r->oldreq)
	if(f = o->fid->aux){
		if(f->pid) postnote(PNPROC, f->pid, "interrupt");
		if(f->l) f->unlock(f->l);
		respond(o, "interrupted");
	}

	respond(r, nil);
}

/* Stat: describe the file the fid refers to. */
static void
fsstat(Req *r){
	Rfid *rf;
	int kind, client;

	rf = r->fid->aux;
	kind = rf->kind;
	client = rf->client;
	mkdirent(&r->d, kind, client);

	respond(r, nil);
}

/*
 * Walk one path element from fid.
 * From the root the valid names are "ctl", "clone" and the decimal
 * number of a client that is in use; from a client directory they are
 * "ctl" and, once available, "text", "info" and "event". ".." leads
 * from a client directory back to the root.
 * Walking into a client directory takes a reference on the client;
 * walking back out drops it.
 * Returns nil on success or an error string.
 */
static char*
fswalk1(Fid *fid, char *name, Qid *qid){
	Rfid *f;
	Client *c;
	char *s, *err;
	int n, i, lo, hi, client;

	if(!(fid->qid.type&QTDIR))
		return "cannot walk from non-directory";

	err = "directory entry not found";
	f = fid->aux;
	if(strcmp(name, "..") == 0){
		switch(f->kind){
		case Qroot: break;
		case Qclient:
			f->kind = Qroot;
			freeclient(f->client);
			f->client = -1;
			break;
		}
	} else {
		/* only look at the names that live in the current directory */
		switch(f->kind){
		case Qroot: lo = Qrctl; hi = Qclient; break;
		case Qclient: lo = Qctl; hi = QCOUNT; break;
		default: return err;
		}
		for(i = lo; i < hi; i++)
			if(nametab[i] && strcmp(name, nametab[i]) == 0)
				break;
		if(i == hi)
			i = QCOUNT;

		/* in the root, a plain number names a client directory */
		client = f->client;
		if(f->kind == Qroot && i == QCOUNT){
			n = strtol(name, &s, 10);
			if(s != name && *s == 0){
				i = Qclient;
				client = n;
			}
		}

		c = clientref(client);
		switch(i){
		case Qclient:
			if(c == nil || c->ref == 0) return err;
			incref(c);
			break;
		case Qctl: if(c == nil) return err; break;
		case Qtext: if(c == nil || c->addr == nil) return err; break;
		case Qinfo: if(c == nil || c->qinfo == nil) return err; break;
		case Qevent: if(c == nil || c->ev.in == 0) return err; break;
		case QCOUNT: return err;
		}

		f->kind = i;
		f->client = client;
	}

	mkqid(qid, f->kind, f->client);
	fid->qid = *qid;

	return nil;
}

/*
 * Clone: give newfid its own copy of oldfid's state and, when the fid
 * belongs to a client, take another reference on that client.
 */
static char*
fsclone(Fid *oldfid, Fid *newfid){
	Rfid *src, *dst;

	if((src = oldfid->aux) == nil)
		return "bad fid";

	dst = ecalloc(sizeof(*dst));
	*dst = *src;
	newfid->aux = dst;

	if(dst->client >= 0)
		incref(clientref(dst->client));

	return nil;
}

/* lib9p dispatch table, handed to postmountsrv in main. */
Srv fs = {
	.destroyfid = fsdestroyfid,
	.start      = fsstart,
	.end        = fsend,
	.attach     = fsattach,
	.open       = fsopen,
	.read       = fsread,
	.write      = fswrite,
	.flush      = fsflush,
	.stat       = fsstat,
	.walk1      = fswalk1,
	.clone      = fsclone,
};

/*
 * Print the command synopsis and exit.
 * It must not return: it is used as the failure action of EARGF.
 */
void
usage(void){
	fprintf(stderr, "usage: %s [-Dd] [-m mtpt] [-s service] [-p defport] [-n nclients]\n", argv0);
	exits("usage");
}

/*
 * Parse the command line, allocate the client pool and serve the file
 * system at the mount point and/or under the service name.
 *	-D	trace 9P messages
 *	-d	trace RIDE messages
 *	-m	mount point (default /mnt/ride)
 *	-s	service name to post
 *	-p	default port (default 4502)
 *	-n	size of the client pool (default 256, must be positive)
 */
void
main(int argc, char **argv){
	int n;

	service = nil;
	mtpt = "/mnt/ride";
	defport = "4502";
	nclients = 256;

	ARGBEGIN{
	case 'D': chatty9p++; break;
	case 'd': debug++; break;
	case 'm': mtpt = EARGF(usage()); break;
	case 'n':
		/* nclients is unsigned: reject values that would wrap or give an empty pool */
		n = atoi(EARGF(usage()));
		if(n <= 0){
			usage();
			exits("usage");
		}
		nclients = n;
		break;
	case 's': service = EARGF(usage()); break;
	case 'p': defport = EARGF(usage()); break;
	default:  usage(); return;
	}ARGEND

	cpool = ecalloc(nclients*sizeof(*cpool));
	JSONfmtinstall();

	rfork(RFNOTEG);
	postmountsrv(&fs, service, mtpt, MREPL);
	exits(nil);
}