ref: 8e6a699a50caaa038d45ff8db473bb7059cb2947
dir: /ridefs.c/
#include <u.h>
#include <libc.h>
#include <fcall.h>
#include <thread.h>
#include <9p.h>
#include <stdio.h>
#include <json.h>
typedef struct Client Client;
typedef struct Rfid Rfid;
struct Client {
Ref;
char *addr;
/* tcp i/o */
QLock rl, wl;
Req *rreq, *wreq;
int rpid, wpid;
int fd, cfd;
/* buffers */
char *qinfo, *qctl;
char *r; /* response */
int rn, roff;
};
enum {
Qroot,
Qrctl,
Qclone,
Qclient,
Qctl,
Qdata,
Qinfo,
QCOUNT
};
static char *nametab[] = {
"/",
"ctl",
"clone",
nil,
"ctl",
"data",
"info",
};
struct Rfid {
Ref;
int kind;
int client;
};
#define VERSION 0
static Client *cpool;
static char *mtpt;
static char *service;
static char *defport;
static char *user;
static char *qrctl;
static long bufsz;
static long time0;
static uint nclients;
static int debug;
void*
ecalloc(ulong n){
void *p;
p = emalloc9p(n);
setmalloctag(p, getcallerpc(&n));
memset(p, 0, n);
return p;
}
void*
erealloc(void *p, ulong n){
p = erealloc9p(p, n);
setrealloctag(p, getcallerpc(&n));
return p;
}
char*
estrdup(char *s){
if(s == nil)
return nil;
s = estrdup9p(s);
setrealloctag(s, getcallerpc(&s));
return s;
}
Client*
clientref(int i){
if(i < 0 || i > nclients)
return nil;
return &cpool[i];
}
int
genqrctl(void){
snprintf(qrctl, bufsz,
"version %i\n"
"bufsz %u\n"
"nclients %u\n"
"debug %i\n"
"defport %s\n",
VERSION, bufsz, nclients, debug,defport);
return strlen(qrctl);
}
int
genqctl(int client){
Client *c;
c = clientref(client);
snprintf(c->qctl, bufsz,
"%i\n"
"connect %s\n",
client, c->addr);
return strlen(c->qctl);
}
int
mkclient(void){
Client *c;
int i;
for(i = 0; i < nclients; i++)
if(cpool[i].ref == 0)
break;
if(i == nclients)
return -1;
c = &cpool[i];
incref(c);
c->r = ecalloc(bufsz);
c->rn = -1;
c->qctl = ecalloc(bufsz);
c->qinfo = ecalloc(bufsz);
genqctl(i);
return i;
}
void
rmclient(int i){
Client *c;
c = clientref(i);
if(c == nil || decref(c))
return;
if(c->fd) close(c->fd);
if(c->cfd) close(c->cfd);
if(c->qctl) free(c->qctl);
if(c->qinfo) free(c->qinfo);
if(c->r) free(c->r);
memset(c, 0, sizeof(*c));
}
static void
mkqid(Qid *q, int k, int client){
q->vers = VERSION;
q->path = ((u64int)client<<32) | k&0xffffffff;
switch(k){
case Qroot:
case Qclient:
q->type = QTDIR; break;
default:
q->type = QTFILE;
}
}
long
writemsg(int fd, void *pld, long n){
char *r;
int len;
len = n+8;
r = ecalloc(len);
r[0] = 24>>len & 0xff;
r[1] = 16>>len & 0xff;
r[2] = 8>>len & 0xff;
r[3] = len & 0xff;
r[4] = 'R';
r[5] = 'I';
r[6] = 'D';
r[7] = 'E';
memcpy(&r[8], pld, n);
write(fd, r, len);
return n;
}
long
readmsg(int fd, void **pld){
int len, e;
char buf[9];
if(0 > (e = readn(fd, buf, 8)))
return e;
buf[9] = '\0';
if(0 != (e = strcmp(&buf[4], "RIDE")))
return e;
len = -8 + (buf[0]<<24 | buf[1]<<16 | buf[2]<<8 | buf[3]);
*pld = ecalloc(len+1); /* ensure trailing null byte */
if(0 > (e = readn(fd, *pld, len))){
free(*pld);
return e;
}
return len;
}
char *
rideinit(char *addr, int *fd, int *cfd, char **info){
int i;
char *b;
JSON *j, *v;
JSONEl *d;
b = netmkaddr(addr, "tcp", defport);
if(debug)
fprintf(stderr, "dialing %s\n", addr);
if((*fd = dial(b, nil, nil, cfd)) < 0)
return "failed to dial";
if(0 > readmsg(*fd, &b))
return "failed to read handshake";
if(0 != strcmp(b, "SupportedProtocols=2"))
return "unrecognized protocol";
free(b);
b = "UsingProtocol=2";
if(0 > writemsg(*fd, b, strlen(b)))
return "failed to write handshake";
b = "[\"Identify\",{\"apiVersion\":1,\"identity\":1}]";
if(0 > writemsg(*fd, b, strlen(b)))
return "failed to send identification message";
if(0 > readmsg(*fd, &b))
return "failed to receive identification message";
if(nil == (j = jsonparse(b)))
return "failed to parse identification message";
free(b);
if(j == nil || j->t != JSONArray || nil == j->first)
return "unrecognized reply";
v = j->first->val;
if(v->t != JSONString || 0 != strcmp(v->s, "ReplyIdentify"))
return "unexpected identification reply";
if(nil == (d = j->first->next) || d->val->t != JSONObject)
return "malformed identification reply";
for(i = 0, d = d->val->first; d != nil; d = d->next){
switch(d->val->t){
case JSONBool:
case JSONNumber:
i += snprintf(*info+i, bufsz-i, "%s %i\n", d->name, d->val->n); break;
case JSONString:
i += snprintf(*info+i, bufsz-i, "%s %s\n", d->name, d->val->s); break;
}
}
jsonfree(j);
return nil;
}
long
writeqrctl(char *b, long){
char *s;
char sep[7];
sprintf(sep, " \t\n\f\r\v");
for(s = strtok(b, sep); s != nil; s = strtok(nil, sep)){
if(strcmp(s, "bufsz") == 0)
bufsz = strtoul(strtok(nil, sep), nil, 0);
if(strcmp(s, "nclients") == 0)
nclients = strtoul(strtok(nil, sep), nil, 0);
if(strcmp(s, "debug") == 0)
debug = atoi(strtok(nil, sep));
if(strcmp(s, "defport") == 0)
defport = estrdup(strtok(nil, sep));
}
cpool = erealloc(cpool, nclients*sizeof(*cpool));
genqrctl();
return strlen(qrctl);
}
long
writeqctl(char *b, long n, int client){
Client *c;
char *s, sep[7];
c = clientref(client);
sprintf(sep, " \t\n\f\r\v");
for(s = strtok(b, sep); s != nil; s = strtok(nil, sep)){
if(strcmp(s, "connect") == 0)
c->addr = estrdup(strtok(nil, sep));
}
genqctl(client);
return n;
}
void
mkdirent(Dir *d, int kind, int client){
Client *c;
char *nm;
memset(d, 0, sizeof(*d));
mkqid(&d->qid, kind, client);
d->mode = 0444;
d->atime = d->mtime = time0;
d->uid = estrdup(user);
d->gid = estrdup(user);
d->muid = estrdup(user);
if(nil != (nm = nametab[kind]))
d->name = estrdup(nm);
if(kind == Qclient){
nm = ecalloc(client+2); /* client+1 >= log(client) */
sprintf(nm, "%i", client);
d->name = estrdup(nm);
}
if(d->qid.type & QTDIR)
d->mode |= DMDIR | 0111;
c = clientref(client);
switch(kind){
case Qrctl: d->mode = 0666; break;
case Qctl:
case Qdata: d->mode = 0666; break;
}
switch(kind){
case Qrctl: d->length = strlen(qrctl); break;
case Qctl: d->length = strlen(c->qctl); break;
case Qdata: d->length = 0; break;
case Qinfo: d->length = strlen(c->qinfo); break;
}
}
int
genqroot(int i, Dir *d, void*){
static int n;
int j;
i += Qroot + 1;
if(i < Qclient){
mkdirent(d, i, -1);
} else {
i -= Qclient;
if(i == 0)
n = 0;
for(j = n; j < nclients && cpool[j].ref == 0; j++);
if(j == nclients)
return -1;
n++;
mkdirent(d, Qclient, j);
}
return 0;
}
int
genqclient(int i, Dir *d, void *aux){
int client;
client = (vlong)aux;
i += Qclient + 1;
if(i >= QCOUNT)
return -1;
mkdirent(d, i, client);
return 0;
}
static void
fsdestroyfid(Fid *fid){
Rfid *f;
f = fid->aux;
if(f == nil)
return;
rmclient(f->client);
free(f);
}
static void
fsstart(Srv*){
if(mtpt != nil)
unmount(nil, mtpt);
}
static void
fsend(Srv*){
postnote(PNGROUP, getpid(), "shutdown");
exits(nil);
}
static void
fsattach(Req *r){
Rfid *f;
f = ecalloc(sizeof(*f));
f->kind = Qroot;
f->client = -1;
mkqid(&r->fid->qid, f->kind, f->client);
r->fid->aux = f;
r->ofcall.qid = r->fid->qid;
respond(r, nil);
}
static void
fsopen(Req *r){
char *err;
Rfid *f;
Client *c;
f = r->fid->aux;
c = clientref(f->client);
err = nil;
switch(f->kind){
case Qclone:
if((f->client = mkclient()) == -1){
respond(r, "reached client limit");
return;
}
f->kind = Qctl;
mkqid(&r->ofcall.qid, f->kind, f->client);
r->fid->qid = r->ofcall.qid;
respond(r, nil);
break;
case Qdata:
if(nil == c->addr){
respond(r, "no server set");
return;
}
qlock(&c->wl);
incref(&r->ref);
c->wreq = r;
switch(c->wpid = rfork(RFPROC|RFNOWAIT|RFMEM)){
case 0:
if(!c->fd)
err = rideinit(c->addr, &c->fd, &c->cfd, &c->qinfo);
c->wpid = 0;
c->wreq = nil;
decref(&r->ref);
respond(r, err);
qunlock(&c->wl);
exits(nil);
case -1:
respond(r, "failed to init ride");
c->wpid = 0;
c->wreq = nil;
decref(&r->ref);
qunlock(&c->wl);
break;
default:
return;
}
break;
default:
respond(r, nil);
}
}
static void
fsread(Req *r){
Rfid *f;
Client *c;
char *buf, *err;
int n, off;
buf = err = nil;
f = r->fid->aux;
c = clientref(f->client);
switch(f->kind){
case Qroot: dirread9p(r, genqroot, nil); break;
case Qrctl: buf = estrdup(qrctl); break;
case Qclone: err = "read prohibited"; break;
case Qclient: dirread9p(r, genqclient, (void*)f->client); break;
case Qctl: buf = estrdup(c->qctl); break;
case Qinfo: buf = estrdup(c->qinfo); break;
case Qdata:
qlock(&c->rl);
incref(&r->ref);
c->rreq = r;
switch(c->rpid = rfork(RFPROC|RFNOWAIT|RFMEM)){
case 0:
off = r->ifcall.offset - c->roff;
if(c->rn < 0){
free(c->r);
c->r = nil;
c->rn = readmsg(c->fd, &c->r);
c->roff = r->ifcall.offset;
off = 0;
}
n = r->ifcall.count + off < c->rn ?
r->ifcall.count : c->rn - off;
if(c->rn == off) {
r->ofcall.count = 0;
c->rn = -1;
} else {
memmove(r->ofcall.data, c->r + off, n);
r->ofcall.count = n;
}
c->rpid = 0;
c->rreq = nil;
decref(&r->ref);
respond(r, err);
qunlock(&c->rl);
exits(nil);
case -1:
err = "failed to fork read";
c->rpid = 0;
c->rreq = nil;
decref(&r->ref);
qunlock(&c->rl);
break;
default:
return;
}
break;
}
if(buf != nil){
readstr(r, buf);
free(buf);
}
respond(r, err);
}
static void
fswrite(Req *r){
Rfid *f;
Client *c;
char *d, *err;
long n;
d = r->ifcall.data;
n = r->ifcall.count;
f = r->fid->aux;
c = clientref(f->client);
err = nil;
switch(f->kind){
case Qrctl:
r->ofcall.count = writeqrctl(d, n);
break;
case Qctl:
r->ofcall.count = writeqctl(d, n, f->client);
break;
case Qdata:
qlock(&c->wl);
incref(&r->ref);
c->wreq = r;
switch(c->wpid = rfork(RFPROC|RFNOWAIT|RFMEM)){
case 0:
r->ofcall.count = writemsg(c->fd, d, n);
c->wpid = 0;
c->wreq = nil;
decref(&r->ref);
respond(r, nil);
qunlock(&c->wl);
exits(nil);
case -1:
err = "failed to fork write";
c->wpid = 0;
c->wreq = nil;
decref(&r->ref);
qunlock(&c->wl);
break;
default:
return;
}
break;
default:
err = "write prohibited";
}
respond(r, err);
}
static void
fsflush(Req *r){
Req *o;
Rfid *f;
Client *c;
if(o = r->oldreq)
if(f = o->fid->aux)
if(c = clientref(f->client))
if(o == c->rreq){
postnote(PNPROC, c->rpid, "interrupt");
respond(c->rreq, "interrupted");
c->rpid = 0;
decref(&c->rreq->ref);
qunlock(&c->rl);
} else if (o == c->wreq){
postnote(PNPROC, c->wpid, "interrupt");
respond(c->wreq, "interrupted");
c->wpid = 0;
c->wreq = nil;
decref(&c->wreq->ref);
qunlock(&c->wl);
}
respond(r, nil);
}
static void
fsstat(Req *r){
Rfid *f;
f = r->fid->aux;
mkdirent(&r->d, f->kind, f->client);
respond(r, nil);
}
static char*
fswalk1(Fid *fid, char *name, Qid *qid){
Rfid *f;
Client *c;
int i, n;
char *nend;
if(!(fid->qid.type&QTDIR))
return "cannot walk from non-directory";
f = fid->aux;
n = -1;
if(strcmp(name, "..") == 0){
switch(f->kind){
case Qroot:
break;
case Qclient:
rmclient(f->client);
f->client = -1;
break;
default:
if(f->kind > Qclient)
f->kind = Qclient;
}
} else {
for(i = f->kind+1; i<QCOUNT; i++){
if(nametab[i] && strcmp(name, nametab[i]) == 0)
break;
if(i == Qclient){
n = strtol(name, &nend, 10);
c = clientref(n);
if(*nend == 0 && c != nil && c->ref != 0){
incref(c);
f->client = n;
break;
}
}
}
if(i >= QCOUNT)
return "directory entry not found";
f->kind = i;
}
mkqid(qid, f->kind, n);
fid->qid = *qid;
return nil;
}
static char*
fsclone(Fid *oldfid, Fid *newfid){
Rfid *f, *o;
o = oldfid->aux;
if(o == nil)
return "bad fid";
f = ecalloc(sizeof(*f));
memmove(f, o, sizeof(*f));
if(f->client > -1)
incref(clientref(f->client));
newfid->aux = f;
return nil;
}
Srv fs = {
.destroyfid = fsdestroyfid,
.start = fsstart,
.end = fsend,
.attach = fsattach,
.open = fsopen,
.read = fsread,
.write = fswrite,
.flush = fsflush,
.stat = fsstat,
.walk1 = fswalk1,
.clone = fsclone,
};
void
usage(void){
fprintf(stderr, "usage: %s [-Dd] [-m mtpt] [-s service]\n", argv0);
}
void
main(int argc, char **argv){
user = getuser();
mtpt = "/mnt/ride";
defport = "4502";
bufsz = 4096;
time0 = time(0);
nclients = 256;
ARGBEGIN{
case 'D': chatty9p++; break;
case 'd': debug++; break;
case 'm': mtpt = EARGF(usage()); break;
case 's': service = EARGF(usage()); break;
default: usage(); return;
}ARGEND
cpool = ecalloc(nclients*sizeof(*cpool));
qrctl = ecalloc(bufsz);
genqrctl();
JSONfmtinstall();
if(debug)
fprintf(stderr,
"ridefs:\n"
"\tchatty9p = %i\n"
"\tdebug = %i\n"
"\tmtpt = %s\n"
"\tservice = %s\n"
"\tdefport = %s\n"
"\tfs = %p\n"
"\tcpool = %p\n",
chatty9p, debug, mtpt, service,
defport, &fs, cpool);
rfork(RFNOTEG);
postmountsrv(&fs, service, mtpt, MREPL);
exits(nil);
}