ref: e259f9d582ce2cb4ee427efe0f366504794b5095
dir: /ridefs.c/
#include <u.h>
#include <libc.h>
#include <fcall.h>
#include <thread.h>
#include <9p.h>
#include <stdio.h>
#include <json.h>
/*
 * Bare-name typedefs for the structures used throughout this file.
 * NOTE(review): no definition of struct Rmsg is visible in this file;
 * confirm whether the typedef is still needed.
 */
typedef struct Evfifo Evfifo;
typedef struct Qfile Qfile;
typedef struct Client Client;
typedef struct Rmsg Rmsg;
typedef struct Rfid Rfid;
struct Evfifo {
int in, out;
int open;
int pid; /* event handler */
};
/*
 * A file descriptor paired with an anonymous reader/writer lock.
 * Throughout this file an fd of 0 is treated as "not open".
 */
struct Qfile {
RWLock;
int fd;
};
/*
 * One slot of the client pool (cpool).
 * The anonymous Ref counts users of the slot: a slot with ref == 0 is free
 * (see mkclient) and is wiped by rmclient when the count drops to zero.
 * The anonymous QLock is taken by fsopen around updates of ev.open.
 */
struct Client {
Ref;
QLock;
/* config */
/* address set by writing "connect <addr>" to the client's ctl file */
char *addr;
/* internal */
Evfifo ev;
/*
 * qtext: backing file served as "text";
 * tdata, tctl: the data and ctl descriptors obtained from dial in rideinit.
 */
Qfile qtext, tdata, tctl;
/* generated contents of the "ctl" (genqctl) and "info" (rideinit) files */
char *qctl, *qinfo;
};
/*
 * Protocol command ids; each value indexes cmdtab.
 * In this file the T* commands are the ones sent with writecmd and the
 * R* commands are the ones recognised after readcmd.
 * CMDCOUNT is the number of commands and doubles as "unknown command".
 */
enum {
RHandshake,
THandshake,
TIdentify,
RIdentify,
TExecute,
RAppendSessionOutput,
RSetPromptType,
CMDCOUNT
};
/*
 * Wire names of the commands, in the same order as the command enum above.
 * readcmd matches incoming names against this table; writecmd uses it to
 * label outgoing messages.
 */
static char *cmdtab[] = {
"SupportedProtocols",
"UsingProtocol",
"Identify",
"ReplyIdentify",
"Execute",
"AppendSessionOutput",
"SetPromptType",
};
/*
 * Kinds of file served; stored in the low 32 bits of a qid path by mkqid.
 * Qroot and Qclient are directories, everything else is a plain file.
 * Entries between Qroot and Qclient live in the root directory; entries
 * after Qclient live in a per-client directory.
 */
enum {
Qroot,
Qrctl,
Qclone,
Qclient,
Qctl,
Qtext,
Qinfo,
Qevent,
QCOUNT
};
/*
 * File names indexed by file kind.
 * The Qclient entry is nil because a client directory is named after its
 * decimal slot number (see mkdirent and fswalk1).
 */
static char *nametab[] = {
".",
"ctl",
"clone",
nil,
"ctl",
"text",
"info",
"event",
};
/*
 * Per-fid state hung off Fid.aux.
 * NOTE(review): the anonymous Ref is never incremented or decremented in
 * the code visible here; confirm whether it is still needed.
 */
struct Rfid {
Ref;
/* lock taken by the forked text open/read/write worker, and how to release it (used by fsflush) */
RWLock *l;
void (*unlock)(RWLock*);
/* file kind (Qroot ... Qevent) this fid currently refers to */
int kind;
/* index into cpool, or -1 when the fid is not inside a client directory */
int client;
/* result of the last rfork made on behalf of this fid */
int pid;
};
/* reported in the root ctl file and used as the version of every qid */
#define VERSION 0
/* pool of nclients client slots; a slot is free while its ref is 0 */
static Client *cpool;
/* mount point (-m) */
static char *mtpt;
/* service name passed to postmountsrv (-s) */
static char *service;
/* default port handed to netmkaddr in rideinit */
static char *defport;
/* owner reported for every file */
static char *user;
/* generated contents of the root ctl file (see genqrctl) */
static char *qrctl;
/* start time, reported as atime and mtime of every file */
static long time0;
/* number of slots in cpool */
static uint nclients;
/* debug level (-d); non-zero enables tracing on fd 2 */
static int debug;
/*
 * Allocate n zeroed bytes with emalloc9p and tag the block with the
 * caller's pc so that allocation traces point at the real call site.
 */
void*
ecalloc(ulong n)
{
	void *mem;

	mem = emalloc9p(n);
	memset(mem, 0, n);
	setmalloctag(mem, getcallerpc(&n));
	return mem;
}
/*
 * Resize p to n bytes with erealloc9p and tag the block with the caller's pc.
 * Returns the (possibly moved) block.
 */
void*
erealloc(void *p, ulong n)
{
	void *q;

	q = erealloc9p(p, n);
	/*
	 * getcallerpc must be handed the address of the FIRST argument;
	 * passing &n makes it look in the wrong stack slot on architectures
	 * that locate the return pc relative to the argument.
	 */
	setrealloctag(q, getcallerpc(&p));
	return q;
}
/*
 * Duplicate s with estrdup9p and tag the copy with the caller's pc.
 * A nil argument yields nil rather than an error.
 */
char*
estrdup(char *s)
{
	char *dup;

	if(s == nil)
		return nil;
	dup = estrdup9p(s);
	setrealloctag(dup, getcallerpc(&s));
	return dup;
}
Client*
clientref(int i){
if(i < 0 || i > nclients)
return nil;
return &cpool[i];
}
/*
 * Regenerate the text of the root ctl file from the current settings.
 * The previous text is released.  Returns the length of the new text.
 */
int
genqrctl(void)
{
	free(qrctl);	/* free(nil) is a no-op */
	qrctl = smprint(
		"version %d\n"
		"nclients %d\n"
		"debug %d\n"
		"defport %s\n",
		VERSION, nclients, debug, defport);
	return strlen(qrctl);
}
/*
 * Regenerate the text of a client's ctl file: the client number followed
 * by its connect address.  Returns the length of the new text.
 */
int
genqctl(int client)
{
	Client *cl;
	char *text;

	cl = clientref(client);
	free(cl->qctl);	/* free(nil) is a no-op */
	text = smprint(
		"%d\n"
		"connect %s\n",
		client, cl->addr);
	cl->qctl = text;
	return strlen(text);
}
/*
 * Claim the first free slot of the pool (ref == 0), take a reference on
 * it and generate its ctl text.
 * Returns the slot index, or -1 when every slot is in use.
 */
int
mkclient(void)
{
	int slot;

	for(slot = 0; slot < nclients; slot++){
		if(cpool[slot].ref != 0)
			continue;
		incref(&cpool[slot]);
		genqctl(slot);
		return slot;
	}
	return -1;
}
void
rmclient(int i){
Client *c;
c = clientref(i);
if(decref(c))
return;
if(c->ev.in) close(c->ev.in);
if(c->ev.out) close(c->ev.out);
if(c->ev.pid > 0) postnote(PNPROC, c->ev.pid, "kill");
if(c->qtext.fd) close(c->qtext.fd);
if(c->tdata.fd) close(c->tdata.fd);
if(c->tctl.fd) close(c->tctl.fd);
if(c->qctl) free(c->qctl);
if(c->qinfo) free(c->qinfo);
memset(c, 0, sizeof(*c));
}
/*
 * Fill in q for a file of kind k belonging to the given client.
 * The path carries the client number in the high 32 bits and the kind in
 * the low 32 bits; the root and client directories get QTDIR.
 */
static void
mkqid(Qid *q, int k, int client)
{
	int isdir;

	isdir = (k == Qroot || k == Qclient);
	q->type = isdir ? QTDIR : QTFILE;
	q->path = ((u64int)client << 32) | (k & 0xffffffff);
	q->vers = VERSION;
}
char*
jsonesc(char *s){
char c, *b;
int i, sz;
sz = 32;
b = ecalloc(32);
for(i = 0, c = s[i]; c != '\0'; c = s[++i]){
if(i == sz-7){ sz *= 2; b = realloc(b, sz); }
if(c >= 0 && c < 32 || c == '"' || c == '\\')
sprintf(&b[i], "\\u%04x", c);
else
b[i] = c;
}
return b;
}
long
writecmd(int fd, int cmd, ...){
va_list v;
char *j, *s;
long n;
char **strv;
int i, strc;
j = nil;
strc = 0;
strv = nil;
va_start(v, cmd);
switch(cmd){
case THandshake:
j = vsmprint("%d", v);
break;
case TIdentify:
j = vsmprint("{\"apiVersion\":%d,\"identity\":%d}", v);
break;
case TExecute:
strc = 1;
strv = ecalloc(strc * sizeof(*strv));
strv[0] = jsonesc(va_arg(v, char*));
j = smprint("{\"text\":\"%s\",\"trace\":%d}", strv[0], va_arg(v, int));
break;
}
va_end(v);
switch(cmd){
case THandshake: s = smprint("%s=%s", cmdtab[cmd], j); break;
default: s = smprint("[\"%s\",%s]", cmdtab[cmd], j); break;
}
n = 8 + strlen(s);
fprint(fd, "%c%c%c%cRIDE%s",
(char)(24>>n & 0xff),
(char)(16>>n & 0xff),
(char)(8>>n & 0xff),
(char)(n & 0xff),
s);
if(debug)
fprint(2, "ridefs: T: %s\n", s);
for(i = 0; i < strc; i++)
free(strv[i]);
if(strv) free(strv);
free(j);
free(s);
return n;
}
/*
 * Read one RIDE frame from fd and decode it.
 *
 * On success *cmd is the index of the command in cmdtab, *arg is the
 * command's argument (owned by the caller, release with jsonfree) and the
 * payload length is returned.
 *
 * On failure -1 is returned and *arg is nil.  *cmd is -1 when nothing
 * could be decoded (read error, short read, bad magic, unparseable
 * payload) and CMDCOUNT when a well-formed message named a command that
 * is not in cmdtab.
 */
long
readcmd(int fd, int *cmd, JSON **arg)
{
	JSON *j, *v;
	JSONEl *e;
	char *pld;
	uchar hdr[8];
	long n, ret;
	int i;

	*cmd = -1;
	*arg = nil;

	/* header: 4-byte big-endian total length followed by "RIDE" */
	if(readn(fd, hdr, 8) != 8)
		return -1;
	if(memcmp(hdr+4, "RIDE", 4) != 0)
		return -1;
	n = (long)((ulong)hdr[0]<<24 | (ulong)hdr[1]<<16 | (ulong)hdr[2]<<8 | (ulong)hdr[3]);
	n -= 8; /* len + magic */
	if(n <= 0)
		return -1;

	/* zeroed, so the payload is NUL-terminated; +32 leaves room for normalization */
	pld = ecalloc(n+32);
	if(readn(fd, pld, n) != n){
		free(pld);
		return -1;
	}
	/* Normalize bespoke handshake payload to JSON */
	j = jsonparse(pld);
	if(j == nil && sscanf(pld, "SupportedProtocols=%i", &i) == 1){
		snprintf(pld, n+32, "[\"SupportedProtocols\",%i]", i);
		j = jsonparse(pld);
	}
	free(pld);
	if(j == nil)
		return -1;

	/* every message is a two element array: [name, argument] */
	ret = -1;
	if(j->t != JSONArray || (e = j->first) == nil)
		goto end;
	v = e->val;
	if(v == nil || v->t != JSONString)
		goto end;
	for(*cmd = 0; *cmd < CMDCOUNT; (*cmd)++)
		if(strcmp(v->s, cmdtab[*cmd]) == 0)
			break;
	if(*cmd == CMDCOUNT)
		goto end;
	if(e->next == nil)
		goto end;
	if(debug)
		fprint(2, "ridefs: R: %J\n", j);
	/* detach the argument so that freeing the array leaves it alive */
	*arg = e->next->val;
	e->next->val = nil;
	ret = n;
end:
	jsonfree(j);
	return ret;
}
char*
rideinit(char *addr, int *fd, int *cfd, char **info){
int i, cmd;
char *b;
JSON *j;
JSONEl *d;
FILE *f;
b = netmkaddr(addr, "tcp", defport);
if(debug)
fprintf(stderr, "dialing %s\n", addr);
if((*fd = dial(b, nil, nil, cfd)) < 0)
return "failed to dial";
if(readcmd(*fd, &cmd, &j) < 0)
return "failed to read handshake";
if(cmd != RHandshake || j->n != 2)
return "unrecognized protocol";
if(writecmd(*fd, THandshake, 2) < 0)
return "failed to write handshake";
jsonfree(j);
if(writecmd(*fd, TIdentify, 1, 1) < 0)
return "failed to send identification message";
if(readcmd(*fd, &cmd, &j) < 0 || cmd != RIdentify)
return "failed to receive identification reply";
f = sopenw();
for(i = 0, d = j->first; d != nil; d = d->next){
switch(d->val->t){
case JSONBool:
case JSONNumber:
i += fprintf(f, "%s %lld\n", d->name, d->val->n); break;
case JSONString:
i += fprintf(f, "%s %s\n", d->name, d->val->s); break;
}
}
*info = estrdup(f->buf);
fclose(f);
jsonfree(j);
return nil;
}
/*
 * Message pump for one client: read commands from the interpreter on in,
 * append session output and prompts to the text file out, and, while the
 * event file is open, post a "t<length> <offset>" line on ev->in for
 * every piece of text appended.
 *
 * Messages naming a command this file does not know are skipped.  The
 * function returns when a message cannot be read or decoded at all, which
 * is how a closed connection shows up.
 */
void
handlemsgs(int in, Evfifo *ev, Qfile *out)
{
	JSON *j, *v;
	char *s;
	int cmd;
	vlong off;

	for(;;){
		j = nil;
		cmd = -1;
		if(readcmd(in, &cmd, &j) < 0){
			/* cmd still -1: nothing was decoded, give up on the stream */
			if(cmd < 0)
				return;
			/* otherwise an unknown command; readcmd kept nothing for us */
			continue;
		}
		s = nil;
		if(j != nil)
		switch(cmd){
		case RAppendSessionOutput:
			v = jsonbyname(j, "type");
			/* 11: echoed input, 14: input line; neither is appended */
			if(v != nil && ((int)v->n == 11 || (int)v->n == 14))
				break;
			v = jsonbyname(j, "result");
			if(v != nil && v->t == JSONString)
				s = v->s;
			break;
		case RSetPromptType:
			v = jsonbyname(j, "type");
			if(v == nil)
				break;
			switch((int)v->n){
			case 0: break; /* no prompt */
			case 1: s = " "; break; /* normal prompt */
			case 2: s = "⎕:\n "; break; /* ⎕ input */
			case 3: break; /* line editor */
			case 4: s = ""; break; /* ⍞ input */
			case 5: break; /* unforeseen */
			}
			break;
		}
		if(s != nil){
			/* sample the offset under the lock so it matches the write */
			wlock(out);
			off = seek(out->fd, 0, 1);
			/* s comes from the remote side: never use it as a format */
			fprint(out->fd, "%s", s);
			wunlock(out);
			if(ev->open > 0)
				fprint(ev->in, "t%ld %lld\n", strlen(s), off);
		}
		/* s may point into j, so release j only after s has been used */
		if(j != nil)
			jsonfree(j);
	}
}
long
writeqrctl(char *b, long){
char *s;
char sep[7];
sprintf(sep, " \t\n\f\r\v");
for(s = strtok(b, sep); s != nil; s = strtok(nil, sep)){
if(strcmp(s, "nclients") == 0)
nclients = strtoul(strtok(nil, sep), nil, 0);
if(strcmp(s, "debug") == 0)
debug = atoi(strtok(nil, sep));
if(strcmp(s, "defport") == 0)
defport = estrdup(strtok(nil, sep));
}
cpool = erealloc(cpool, nclients*sizeof(*cpool));
genqrctl();
return strlen(qrctl);
}
/*
 * Handle a write of n bytes at b to a client's ctl file.
 * The only recognised request is "connect <addr>", which replaces the
 * client's address; "connect" without an address clears it.
 * Returns n, or 0 when client does not name a valid slot.
 */
long
writeqctl(char *b, long n, int client)
{
	Client *c;
	char *buf, *s, *sep;

	if((c = clientref(client)) == nil)
		return 0;
	sep = " \t\n\f\r\v";
	/* 9P write data is not NUL-terminated; tokenise a terminated copy */
	buf = ecalloc(n + 1);
	memmove(buf, b, n);
	for(s = strtok(buf, sep); s != nil; s = strtok(nil, sep)){
		if(strcmp(s, "connect") != 0)
			continue;
		free(c->addr);	/* drop the previous address */
		c->addr = estrdup(strtok(nil, sep));	/* estrdup(nil) is nil */
	}
	free(buf);
	genqctl(client);
	return n;
}
/*
 * Fill in d to describe the file of the given kind inside the given
 * client directory (client is -1 for files in the root).
 * The name, uid, gid and muid strings are allocated here.
 *
 * Directories are mode DMDIR|0555; the ctl files and the text file are
 * 0666; everything else is 0444.  Lengths are reported for the ctl, info
 * and text files; missing data is reported as length 0.
 */
void
mkdirent(Dir *d, int kind, int client)
{
	Client *c;
	Dir *st;
	char num[16];

	memset(d, 0, sizeof(*d));
	mkqid(&d->qid, kind, client);
	d->atime = d->mtime = time0;
	d->uid = estrdup(user);
	d->gid = estrdup(user);
	d->muid = estrdup(user);

	if(kind == Qclient){
		/* a client directory is named after its slot number */
		snprint(num, sizeof num, "%d", client);
		d->name = estrdup(num);
	}else if(kind >= 0 && kind < QCOUNT && nametab[kind] != nil)
		d->name = estrdup(nametab[kind]);

	switch(kind){
	case Qroot:
	case Qclient:
		d->mode = DMDIR | 0555;
		break;
	case Qrctl:
	case Qctl:
	case Qtext:
		d->mode = 0666;
		break;
	default:
		d->mode = 0444;
		break;
	}

	c = clientref(client);
	switch(kind){
	case Qrctl:
		if(qrctl != nil)
			d->length = strlen(qrctl);
		break;
	case Qctl:
		if(c != nil && c->qctl != nil)
			d->length = strlen(c->qctl);
		break;
	case Qinfo:
		if(c != nil && c->qinfo != nil)
			d->length = strlen(c->qinfo);
		break;
	case Qtext:
		/* the length is that of the backing file, once it exists */
		if(c != nil && c->qtext.fd != 0 && (st = dirfstat(c->qtext.fd)) != nil){
			d->length = st->length;
			free(st);
		}
		break;
	}
}
/*
 * dirread9p generator for the root directory.
 * Entry 0 and 1 are the fixed files (ctl, clone); the entries after them
 * are the client directories currently in use, in slot order.
 * Returns 0 after filling d, or -1 when i is past the last entry.
 */
int
genqroot(int i, Dir *d, void*)
{
	int j;

	if(i < 0)
		return -1;
	i += Qroot + 1;
	if(i < Qclient){
		mkdirent(d, i, -1);
		return 0;
	}
	/*
	 * i is now the ordinal among the clients in use; count used slots
	 * rather than scanning from slot i, otherwise a used slot that
	 * follows free ones is reported once per preceding index.
	 */
	i -= Qclient;
	for(j = 0; j < nclients; j++){
		if(cpool[j].ref == 0)
			continue;
		if(i-- == 0){
			mkdirent(d, Qclient, j);
			return 0;
		}
	}
	return -1;
}
/*
 * dirread9p generator for a client directory; aux carries the client
 * number.  The i-th visible file is reported, where "text" is visible once
 * an address is set, "info" once the identification text exists and
 * "event" once the event pipe exists (the same rules fswalk1 applies).
 * Returns 0 after filling d, or -1 when i is past the last entry or the
 * client number is not valid.
 */
int
genqclient(int i, Dir *d, void *aux)
{
	int client, kind;
	Client *c;

	client = (vlong)aux;
	if(i < 0 || (c = clientref(client)) == nil)
		return -1;
	for(kind = Qclient + 1; kind < QCOUNT; kind++){
		if(kind == Qtext && c->addr == nil)
			continue;
		if(kind == Qinfo && c->qinfo == nil)
			continue;
		if(kind == Qevent && c->ev.in == 0)
			continue;
		if(i-- == 0){
			mkdirent(d, kind, client);
			return 0;
		}
	}
	return -1;
}
/*
 * Release the per-fid state when lib9p discards a fid.
 * A fid that had the event file open gives back its share of ev.open, and
 * a fid inside a client directory drops its reference on the client.
 */
static void
fsdestroyfid(Fid *fid)
{
	Rfid *f;
	Client *c;

	if((f = fid->aux) == nil)
		return;
	c = clientref(f->client);
	if(c != nil){
		/*
		 * Only fids that were actually opened bumped ev.open in
		 * fsopen (omode stays -1 until then); take the same lock
		 * fsopen uses for the counter.
		 */
		if(f->kind == Qevent && fid->omode != -1){
			qlock(c);
			if(c->ev.open > 0)
				c->ev.open--;
			qunlock(c);
		}
		rmclient(f->client);
	}
	fid->aux = nil;
	free(f);
}
/* Srv start hook: unmount whatever is at the mount point, if one is set. */
static void
fsstart(Srv*)
{
	if(mtpt == nil)
		return;
	unmount(nil, mtpt);
}
/* Srv end hook: post "shutdown" to our note group, then exit. */
static void
fsend(Srv*)
{
	int self;

	self = getpid();
	postnote(PNGROUP, self, "shutdown");
	exits(nil);
}
/*
 * Attach: give the new fid fresh per-fid state that points at the root
 * directory (no client) and answer with the root qid.
 */
static void
fsattach(Req *r)
{
	Rfid *f;
	Fid *fid;

	fid = r->fid;
	f = ecalloc(sizeof(*f));
	f->client = -1;
	f->kind = Qroot;
	mkqid(&fid->qid, f->kind, f->client);
	fid->aux = f;
	r->ofcall.qid = fid->qid;
	respond(r, nil);
}
/*
 * Open handler.
 *	clone	allocates a client slot and turns the fid into that client's ctl file.
 *	text	forks a worker that, on first use, connects to the interpreter
 *		(rideinit) and forks the message handler; the worker answers the
 *		request itself.
 *	event	counts the open in ev.open.
 * Every other file opens without further work.
 */
static void
fsopen(Req *r){
char *err;
Rfid *f;
Client *c;
char *s;
int ppid;
f = r->fid->aux;
/* c is nil for fids outside a client directory (client == -1) */
c = clientref(f->client);
err = nil;
switch(f->kind){
case Qclone:
if((f->client = mkclient()) < 0)
err = "reached client limit";
if(err)
break;
/* the fid now refers to the new client's ctl file */
f->kind = Qctl;
mkqid(&r->ofcall.qid, f->kind, f->client);
r->fid->qid = r->ofcall.qid;
break;
case Qtext:
/* remembered for the name of the backing file created below */
ppid = getpid();
/* RFMEM: the worker shares memory with the server, so its stores to *f and *c are visible here */
switch(f->pid = rfork(RFPROC|RFNOWAIT|RFMEM)){
case 0:
/* worker: record the lock and its release function for fsflush */
f->l = &c->qtext;
f->unlock = wunlock;
wlock(f->l);
/* tdata.fd == 0 means this client has not been connected yet */
if(c->tdata.fd == 0){
if(err = rideinit(c->addr, &c->tdata.fd, &c->tctl.fd, &c->qinfo))
goto done;
switch(c->ev.pid = rfork(RFPROC|RFNOWAIT|RFMEM)){
case 0:
/* message handler: create the backing file and the event pipe, then pump messages */
s = smprint("/tmp/ride.%d.%d", ppid, f->client);
c->qtext.fd = create(s, ORDWR|ORCLOSE, DMREAD|DMAPPEND);
/* fills ev.in and ev.out, which are adjacent in Evfifo */
pipe(&c->ev.in);
/*
 * NOTE(review): the worker answers the open without waiting for the two
 * calls above, so a client may briefly see qtext.fd and ev.in still 0.
 */
handlemsgs(c->tdata.fd, &c->ev, &c->qtext);
free(s);
/*
 * NOTE(review): this returns into the caller of fsopen inside the forked
 * process instead of calling exits; confirm that is intended.
 */
return;
case -1: err = "unable to fork message handler"; break;
default: break;
}
}
done:
wunlock(f->l);
respond(r, err);
exits(nil);
case -1: err = "failed to init ride"; break;
/* parent: the worker will respond */
default: return;
}
break;
case Qevent: qlock(c); c->ev.open++; qunlock(c); break;
default: break;
}
respond(r, err);
}
/*
 * Read handler.
 * Directories are generated with dirread9p, the ctl and info files are
 * served from their cached strings, the text file is read by a forked
 * worker under the read lock, and the event file is read straight from
 * the event pipe.  Failed reads are reported as errors instead of being
 * stored as a byte count.
 */
static void
fsread(Req *r)
{
	Rfid *f;
	Client *c;
	char *err;
	long n;

	err = nil;
	f = r->fid->aux;
	c = clientref(f->client);
	/* every kind from Qclient on lives inside a client directory */
	if(c == nil && f->kind >= Qclient){
		respond(r, "client not found");
		return;
	}
	switch(f->kind){
	case Qroot:
		dirread9p(r, genqroot, nil);
		break;
	case Qrctl:
		readstr(r, qrctl);
		break;
	case Qclient:
		dirread9p(r, genqclient, (void*)f->client);
		break;
	case Qctl:
		readstr(r, c->qctl);
		break;
	case Qinfo:
		readstr(r, c->qinfo);
		break;
	case Qtext:
		switch(f->pid = rfork(RFPROC|RFNOWAIT|RFMEM)){
		case 0:
			/* worker: record the lock and its release function for fsflush */
			f->l = &c->qtext;
			f->unlock = runlock;
			rlock(f->l);
			/*
			 * pread leaves the descriptor's offset alone; a dup'ed
			 * descriptor shares its offset with the original, so
			 * seeking it would disturb the writer's position.
			 */
			n = pread(c->qtext.fd, r->ofcall.data, r->ifcall.count, r->ifcall.offset);
			runlock(f->l);
			if(n < 0)
				respond(r, "failed to read text");
			else{
				r->ofcall.count = n;
				respond(r, nil);
			}
			exits(nil);
		case -1:
			err = "failed to fork read";
			break;
		default:
			/* parent: the worker will respond */
			return;
		}
		break;
	case Qevent:
		/* NOTE(review): this blocks the server until an event arrives */
		n = read(c->ev.out, r->ofcall.data, r->ifcall.count);
		if(n < 0)
			err = "failed to read event";
		else
			r->ofcall.count = n;
		break;
	default:
		err = "read prohibited";
		break;
	}
	respond(r, err);
}
/*
 * Write handler.
 * Writes to the ctl files are parsed by writeqrctl and writeqctl.
 * A write to the text file is handled by a forked worker that, under the
 * write lock, appends the data to the backing file and sends it to the
 * interpreter as an Execute command; a failure of either step is reported
 * to the client.  All other files refuse writes.
 */
static void
fswrite(Req *r)
{
	Rfid *f;
	Client *c;
	char *b, *d, *err;
	long n;

	d = r->ifcall.data;
	n = r->ifcall.count;
	f = r->fid->aux;
	c = clientref(f->client);
	err = nil;
	/* every kind from Qclient on lives inside a client directory */
	if(c == nil && f->kind >= Qclient){
		respond(r, "client not found");
		return;
	}
	switch(f->kind){
	case Qrctl:
		r->ofcall.count = writeqrctl(d, n);
		break;
	case Qctl:
		r->ofcall.count = writeqctl(d, n, f->client);
		break;
	case Qtext:
		switch(f->pid = rfork(RFPROC|RFNOWAIT|RFMEM)){
		case 0:
			/* worker: record the lock and its release function for fsflush */
			f->l = &c->qtext;
			f->unlock = wunlock;
			wlock(f->l);
			/* writecmd needs a NUL-terminated string; 9P data is not terminated */
			b = ecalloc(n + 1);
			memmove(b, d, n);
			if(write(c->qtext.fd, d, n) != n)
				err = "failed to record input";
			else if(writecmd(c->tdata.fd, TExecute, b, 0) < 0)
				err = "failed to send input to interpreter";
			else
				r->ofcall.count = n;
			free(b);
			wunlock(f->l);
			respond(r, err);
			exits(nil);
		case -1:
			err = "failed to fork write";
			break;
		default:
			/* parent: the worker will respond */
			return;
		}
		break;
	default:
		err = "write prohibited";
		break;
	}
	respond(r, err);
}
/*
 * Flush handler: interrupt the worker recorded on the old request's fid,
 * release the lock it recorded, fail the old request with "interrupted"
 * and acknowledge the flush.
 *
 * NOTE(review): f->pid, f->l and f->unlock are set by the workers in
 * fsopen/fsread/fswrite and are never cleared in the code visible here, so
 * a flush that arrives after the worker has finished would signal a stale
 * pid and release a lock that is no longer held; the worker may also have
 * responded to the old request already.  Confirm and rework.
 */
static void
fsflush(Req *r){
Req *o;
Rfid *f;
if(o = r->oldreq)
if(f = o->fid->aux){
if(f->pid) postnote(PNPROC, f->pid, "interrupt");
/* the lock is released from this process, not from the worker that took it */
if(f->l) f->unlock(f->l);
respond(o, "interrupted");
}
respond(r, nil);
}
/* Stat handler: describe the file the fid refers to and respond. */
static void
fsstat(Req *r)
{
	Rfid *f;
	Dir *d;

	d = &r->d;
	f = r->fid->aux;
	mkdirent(d, f->kind, f->client);
	respond(r, nil);
}
/*
 * Walk one path element from fid.
 *
 * From the root the names "ctl" and "clone" and the decimal number of a
 * client in use are accepted; from a client directory the names "ctl",
 * "text", "info" and "event" are accepted, the last three only once they
 * exist.  ".." leads from a client directory back to the root and gives
 * up the reference taken when the directory was entered.
 *
 * The fid's state is only changed when the walk succeeds.
 * Returns nil on success or an error string.
 */
static char*
fswalk1(Fid *fid, char *name, Qid *qid)
{
	Rfid *f;
	Client *c;
	char *end, *enotfound;
	int kind, client, lo, hi;
	long n;

	enotfound = "directory entry not found";
	if(!(fid->qid.type&QTDIR))
		return "cannot walk from non-directory";
	f = fid->aux;
	kind = f->kind;
	client = f->client;

	if(strcmp(name, "..") == 0){
		if(kind == Qclient){
			if(client > -1)
				rmclient(client);
			kind = Qroot;
			client = -1;
		}
	}else{
		/* only look at the names that belong to the directory we are in */
		switch(f->kind){
		case Qroot:
			lo = Qroot + 1;
			hi = Qclient;
			break;
		case Qclient:
			lo = Qclient + 1;
			hi = QCOUNT;
			break;
		default:
			return enotfound;
		}
		for(kind = lo; kind < hi; kind++)
			if(nametab[kind] != nil && strcmp(name, nametab[kind]) == 0)
				break;
		if(kind == hi){
			/* not a fixed name: in the root it may be a client number */
			if(f->kind != Qroot)
				return enotfound;
			n = strtol(name, &end, 10);
			if(end == name || *end != '\0')
				return enotfound;
			if(n < 0 || n >= nclients)
				return enotfound;
			c = clientref(n);
			if(c == nil || c->ref == 0)
				return enotfound;
			incref(c);
			kind = Qclient;
			client = n;
		}else if(f->kind == Qclient){
			c = clientref(client);
			if(c == nil)
				return enotfound;
			if(kind == Qtext && c->addr == nil)
				return enotfound;
			if(kind == Qinfo && c->qinfo == nil)
				return enotfound;
			if(kind == Qevent && c->ev.in == 0)
				return enotfound;
		}
	}
	f->kind = kind;
	f->client = client;
	mkqid(qid, f->kind, f->client);
	fid->qid = *qid;
	return nil;
}
/*
 * Clone handler: give newfid a copy of oldfid's per-fid state and, when
 * the fid is inside a client directory, take another reference on that
 * client.  Returns nil on success or "bad fid" when oldfid has no state.
 */
static char*
fsclone(Fid *oldfid, Fid *newfid)
{
	Rfid *src, *dst;

	if((src = oldfid->aux) == nil)
		return "bad fid";
	dst = ecalloc(sizeof(*dst));
	memmove(dst, src, sizeof(*dst));
	newfid->aux = dst;
	if(dst->client > -1)
		incref(clientref(dst->client));
	return nil;
}
/* 9P service description handed to postmountsrv in main: one handler per request type. */
Srv fs = {
.destroyfid = fsdestroyfid,
.start = fsstart,
.end = fsend,
.attach = fsattach,
.open = fsopen,
.read = fsread,
.write = fswrite,
.flush = fsflush,
.stat = fsstat,
.walk1 = fswalk1,
.clone = fsclone,
};
/*
 * Print the command synopsis on standard error and exit with status
 * "usage".  It must not return: EARGF(usage()) relies on that when an
 * option is missing its argument.
 */
void
usage(void)
{
	fprint(2, "usage: %s [-Dd] [-m mtpt] [-s service]\n", argv0);
	exits("usage");
}
/*
 * Entry point: set the defaults, parse the options, allocate the client
 * pool, generate the root ctl text, then post and mount the file server.
 *	-D	increase 9P chatter
 *	-d	increase the debug level
 *	-m	mount point (default /mnt/ride)
 *	-s	service name to post
 */
void
main(int argc, char **argv)
{
	/* defaults, possibly overridden by the options below */
	nclients = 256;
	defport = "4502";
	mtpt = "/mnt/ride";
	time0 = time(0);
	user = getuser();

	ARGBEGIN{
	case 'D':
		chatty9p++;
		break;
	case 'd':
		debug++;
		break;
	case 'm':
		mtpt = EARGF(usage());
		break;
	case 's':
		service = EARGF(usage());
		break;
	default:
		usage();
		return;
	}ARGEND

	cpool = ecalloc(nclients*sizeof(*cpool));
	genqrctl();
	JSONfmtinstall();
	if(debug)
		fprintf(stderr,
			"ridefs:\n"
			"\tchatty9p = %i\n"
			"\tdebug = %i\n"
			"\tmtpt = %s\n"
			"\tservice = %s\n"
			"\tdefport = %s\n"
			"\tfs = %p\n"
			"\tcpool = %p\n",
			chatty9p, debug, mtpt, service,
			defport, &fs, cpool);
	/* own note group, so the "shutdown" note posted by fsend stays local */
	rfork(RFNOTEG);
	postmountsrv(&fs, service, mtpt, MREPL);
	exits(nil);
}