ref: f21e48971068263828017c62320859748d36842e
dir: /ridefs.c/
#include <u.h> #include <libc.h> #include <fcall.h> #include <thread.h> #include <9p.h> #include <stdio.h> #include <json.h> typedef struct Evfifo Evfifo; typedef struct Qfile Qfile; typedef struct Client Client; typedef struct Rmsg Rmsg; typedef struct Rfid Rfid; struct Evfifo { int in, out; int open; int pid; /* event handler */ }; struct Qfile { RWLock; int fd; }; struct Client { Ref; QLock; /* config */ char *addr; /* internal */ Evfifo ev; Qfile qtext, tdata, tctl; char *qctl, *qinfo; }; enum { RHandshake, THandshake, TIdentify, RIdentify, TExecute, RAppendSessionOutput, RSetPromptType, CMDCOUNT }; static char *cmdtab[] = { "SupportedProtocols", "UsingProtocol", "Identify", "ReplyIdentify", "Execute", "AppendSessionOutput", "SetPromptType", }; enum { Qroot, Qrctl, Qclone, Qclient, Qctl, Qtext, Qinfo, Qevent, QCOUNT }; static char *nametab[] = { ".", "ctl", "clone", nil, "ctl", "text", "info", "event", }; struct Rfid { Ref; RWLock *l; void (*unlock)(RWLock*); int kind; int client; int pid; }; #define VERSION 0 static Client *cpool; static char *mtpt; static char *service; static char *defport; static char *user; static char *qrctl; static long time0; static uint nclients; static int pid; static int debug; void* ecalloc(ulong n){ void *p; p = emalloc9p(n); setmalloctag(p, getcallerpc(&n)); memset(p, 0, n); return p; } void* erealloc(void *p, ulong n){ p = erealloc9p(p, n); setrealloctag(p, getcallerpc(&n)); return p; } char* estrdup(char *s){ if(s == nil) return nil; s = estrdup9p(s); setrealloctag(s, getcallerpc(&s)); return s; } Client* clientref(int i){ if(i < 0 || i > nclients) return nil; return &cpool[i]; } int genqrctl(void){ if(qrctl) free(qrctl); qrctl = smprint( "version %d\n" "pid %d\n" "nclients %d\n" "debug %d\n" "defport %s\n", VERSION, pid, nclients, debug, defport); return strlen(qrctl); } int genqctl(int client){ Client *c; c = clientref(client); if(c->qctl) free(c->qctl); c->qctl = smprint( "%d\n" "connect %s\n" "pid %d\n", client, c->addr, c->ev.pid); return strlen(c->qctl); } int mkclient(void){ Client *c; int i; for(i = 0; i < nclients; i++) if(cpool[i].ref == 0) break; if(i == nclients) return -1; c = &cpool[i]; incref(c); genqctl(i); return i; } void freeclient(int i){ Client *c; c = clientref(i); if(c == nil || c->ref == 0) return; if(decref(c)) return; if(c->ev.in) close(c->ev.in); if(c->ev.out) close(c->ev.out); if(c->ev.pid > 0) postnote(PNPROC, c->ev.pid, "kill"); if(c->qtext.fd) close(c->qtext.fd); if(c->tdata.fd) close(c->tdata.fd); if(c->tctl.fd) close(c->tctl.fd); if(c->qctl) free(c->qctl); if(c->qinfo) free(c->qinfo); memset(c, 0, sizeof(*c)); } static void mkqid(Qid *q, int k, int client){ q->vers = VERSION; q->path = ((u64int)client<<32) | k&0xffffffff; switch(k){ case Qroot: case Qclient: q->type = QTDIR; break; default: q->type = QTFILE; } } char* jsonesc(char *s){ char c, *b; int i, j, sz; sz = 32; b = ecalloc(32); for(i = j = 0, c = s[i]; c != '\0'; c = s[++i]){ if(i == sz-7){ sz *= 2; b = realloc(b, sz); } if(c >= 0 && c < 32 || c == '"' || c == '\\') j += sprintf(&b[j], "\\u%04x", c); else b[j] = c; j++; } return b; } long writecmd(int fd, int cmd, ...){ va_list v; char *j, *s; long n; char **strv; int i, strc; j = nil; strc = 0; strv = nil; va_start(v, cmd); switch(cmd){ case THandshake: j = vsmprint("%d", v); break; case TIdentify: j = vsmprint("{\"apiVersion\":%d,\"identity\":%d}", v); break; case TExecute: strc = 1; strv = ecalloc(strc * sizeof(*strv)); strv[0] = jsonesc(va_arg(v, char*)); j = smprint("{\"text\":\"%s\",\"trace\":%d}", strv[0], va_arg(v, int)); break; } va_end(v); switch(cmd){ case THandshake: s = smprint("%s=%s", cmdtab[cmd], j); break; default: s = smprint("[\"%s\",%s]", cmdtab[cmd], j); break; } n = 8 + strlen(s); fprint(fd, "%c%c%c%cRIDE%s", (char)(24>>n & 0xff), (char)(16>>n & 0xff), (char)(8>>n & 0xff), (char)(n & 0xff), s); if(debug) fprint(2, "ridefs: T: %s\n", s); for(i = 0; i < strc; i++) free(strv[i]); if(strv) free(strv); free(j); free(s); return n; } long readcmd(int fd, int *cmd, JSON **arg){ JSON *j, *v; char *pld, buf[9]; long n; int i, ret; if((n = readn(fd, buf, 8)) < 0) return n; buf[9] = '\0'; if((n = strcmp(&buf[4], "RIDE")) != 0) return n; n = buf[0]<<24 & 0xff000000; n |= buf[1]<<16 & 0x00ff0000; n |= buf[2]<<8 & 0x0000ff00; n |= buf[3] & 0x000000ff; n -= 8; /* len + magic */ pld = ecalloc(n+1 + 32); /* normalization overhead */ if((n = readn(fd, pld, n)) < 0) return n; /* Normalize bespoke handshake payload to JSON */ if((j = jsonparse(pld)) == nil) if(sscanf(pld, "SupportedProtocols=%i", &i) == 1){ sprintf(pld, "[\"SupportedProtocols\",%i]", i); j = jsonparse(pld); } free(pld); ret = -1; if(j->t != JSONArray || j->first == nil) goto end; v = j->first->val; if(v->t != JSONString) goto end; for(*cmd = 0; *cmd < CMDCOUNT; (*cmd)++) if(strcmp(v->s, cmdtab[*cmd]) == 0) break; if(*cmd == CMDCOUNT) goto end; if(debug) fprint(2, "ridefs: R: %J\n", j); *arg = j->first->next->val; j->first->next->val = nil; ret = n; end: jsonfree(j); return ret; } char* rideinit(char *addr, int *fd, int *cfd, char **info){ int i, cmd; char *b; JSON *j; JSONEl *d; FILE *f; b = netmkaddr(addr, "tcp", defport); if(debug) fprintf(stderr, "dialing %s\n", addr); if((*fd = dial(b, nil, nil, cfd)) < 0) return "failed to dial"; if(readcmd(*fd, &cmd, &j) < 0) return "failed to read handshake"; if(cmd != RHandshake || j->n != 2) return "unrecognized protocol"; if(writecmd(*fd, THandshake, 2) < 0) return "failed to write handshake"; jsonfree(j); if(writecmd(*fd, TIdentify, 1, 1) < 0) return "failed to send identification message"; if(readcmd(*fd, &cmd, &j) < 0 || cmd != RIdentify) return "failed to receive identification reply"; f = sopenw(); for(i = 0, d = j->first; d != nil; d = d->next){ switch(d->val->t){ case JSONBool: case JSONNumber: i += fprintf(f, "%s %lld\n", d->name, d->val->n); break; case JSONString: i += fprintf(f, "%s %s\n", d->name, d->val->s); break; } } *info = estrdup(f->buf); fclose(f); jsonfree(j); return nil; } void handlemsgs(int in, Evfifo *ev, Qfile *out){ JSON *j, *v; char *s, *e; int cmd, t; while(1){ if(readcmd(in, &cmd, &j) < 0) continue; s = e = nil; switch(cmd){ case RAppendSessionOutput: v = jsonbyname(j, "type"); switch(t = v->n){ case 14: /* "normal" input line */ break; default: v = jsonbyname(j, "result"); s = v->s; e = smprint("o %d %ld %lld", t, strlen(s), seek(out->fd, 0, 1)); break; } break; case RSetPromptType: v = jsonbyname(j, "type"); switch(t = v->n){ case 1: s = " "; break; /* normal prompt */ case 2: s = "⎕:\n "; break; /* ⎕ input */ case 0: /* no prompt */ case 3: /* line editor */ case 4: /* ⍞ input */ case 5: /* unforeseen */ default: s = ""; break; } e = smprint("p %d %ld %lld", t, strlen(s), seek(out->fd, 0 ,1)); break; default: continue; } if(s){ wlock(out); fprint(out->fd, "%s", s); wunlock(out); } if(e){ fprint(ev->in, "%s\n", e); free(e); } jsonfree(j); } } long writeqrctl(char *b, long){ char *s; char sep[7]; sprintf(sep, " \t\n\f\r\v"); for(s = strtok(b, sep); s != nil; s = strtok(nil, sep)){ if(strcmp(s, "nclients") == 0) nclients = strtoul(strtok(nil, sep), nil, 0); if(strcmp(s, "debug") == 0) debug = atoi(strtok(nil, sep)); if(strcmp(s, "defport") == 0) defport = estrdup(strtok(nil, sep)); } cpool = erealloc(cpool, nclients*sizeof(*cpool)); genqrctl(); return strlen(qrctl); } long writeqctl(char *b, long n, int client){ Client *c; char *s, sep[7]; c = clientref(client); sprintf(sep, " \t\n\f\r\v"); for(s = strtok(b, sep); s != nil; s = strtok(nil, sep)){ if(strcmp(s, "connect") == 0) c->addr = estrdup(strtok(nil, sep)); } genqctl(client); return n; } void mkdirent(Dir *d, int kind, int client){ Client *c; Dir *dir; char *nm; memset(d, 0, sizeof(*d)); mkqid(&d->qid, kind, client); d->mode = 0444; d->atime = d->mtime = time0; d->uid = estrdup(user); d->gid = estrdup(user); d->muid = estrdup(user); if(nil != (nm = nametab[kind])) d->name = estrdup(nm); if(kind == Qclient){ nm = ecalloc(client+2); /* client+1 >= log(client) */ sprintf(nm, "%i", client); d->name = estrdup(nm); } if(d->qid.type & QTDIR) d->mode |= DMDIR | 0111; c = clientref(client); switch(kind){ case Qrctl: d->mode = 0666; break; case Qctl: case Qtext: d->mode = 0666; break; } switch(kind){ case Qrctl: d->length = strlen(qrctl); break; case Qctl: d->length = strlen(c->qctl); break; case Qinfo: d->length = strlen(c->qinfo); break; case Qtext: if(c->qtext.fd){ dir = dirfstat(c->qtext.fd); d->length = dir->length; free(dir); } else d->length = 0; break; } } int genqroot(int i, Dir *d, void*){ int j; i += Qroot + 1; if(i < Qclient){ mkdirent(d, i, -1); } else { i -= Qclient; for(j = i; j < nclients && cpool[j].ref == 0; j++); if(j == nclients) return -1; mkdirent(d, Qclient, j); } return 0; } int genqclient(int i, Dir *d, void *aux){ int client; Client *c; client = (vlong)aux; c = clientref(client); i += Qclient + 1; if(i >= Qtext && c->addr == nil) i++; if(i >= Qinfo && c->qinfo == nil) i++; if(i >= Qevent && c->ev.in == 0) i++; if(i == QCOUNT) return -1; mkdirent(d, i, client); return 0; } static void fsdestroyfid(Fid *fid){ Rfid *f; Client *c; if((f = fid->aux) == nil) return; if(c = clientref(f->client)) switch(f->kind){ case Qevent: c->ev.open -= c->ev.open ? 1 : 0; break; } freeclient(f->client); free(f); } static void fsstart(Srv*){ if(mtpt != nil) unmount(nil, mtpt); } static void fsend(Srv*){ postnote(PNGROUP, pid, "shutdown"); exits(nil); } static void fsattach(Req *r){ Rfid *f; user = getuser(); time0 = time(0); pid = getpid(); genqrctl(); f = ecalloc(sizeof(*f)); f->kind = Qroot; f->client = -1; /* no client */ mkqid(&r->fid->qid, f->kind, f->client); r->fid->aux = f; r->ofcall.qid = r->fid->qid; respond(r, nil); } static void fsopen(Req *r){ char *err; Rfid *f; Client *c; char *s; f = r->fid->aux; c = clientref(f->client); err = nil; switch(f->kind){ case Qclone: if((f->client = mkclient()) < 0) err = "reached client limit"; if(err) break; f->kind = Qctl; mkqid(&r->ofcall.qid, f->kind, f->client); r->fid->qid = r->ofcall.qid; break; case Qtext: switch(f->pid = rfork(RFPROC|RFNOWAIT|RFMEM)){ case 0: f->l = &c->qtext; f->unlock = wunlock; wlock(f->l); if(c->tdata.fd == 0){ if(err = rideinit(c->addr, &c->tdata.fd, &c->tctl.fd, &c->qinfo)) goto done; pipe(&c->ev.in); switch(c->ev.pid = rfork(RFPROC|RFNOWAIT|RFMEM)){ case 0: s = smprint("/tmp/ride.%d.%d", pid, f->client); c->qtext.fd = create(s, ORDWR|ORCLOSE, DMREAD|DMAPPEND); free(s); handlemsgs(c->tdata.fd, &c->ev, &c->qtext); return; case -1: err = "unable to fork message handler"; break; default: break; } } genqctl(f->client); done: wunlock(f->l); respond(r, err); exits(nil); case -1: err = "failed to init ride"; break; default: return; } break; case Qevent: qlock(c); c->ev.open++; qunlock(c); break; default: break; } respond(r, err); } static void fsread(Req *r){ Rfid *f; Client *c; char *err; int fd; err = nil; f = r->fid->aux; c = clientref(f->client); switch(f->kind){ case Qroot: dirread9p(r, genqroot, nil); break; case Qrctl: readstr(r, qrctl); break; case Qclient: dirread9p(r, genqclient, (void*)f->client); break; case Qctl: readstr(r, c->qctl); break; case Qinfo: readstr(r, c->qinfo); break; case Qtext: switch(f->pid = rfork(RFPROC|RFNOWAIT|RFMEM)){ case 0: f->l = &c->qtext; f->unlock = runlock; rlock(f->l); fd = dup(c->qtext.fd, -1); seek(fd, r->ifcall.offset, 0); r->ofcall.count = read(fd, r->ofcall.data, r->ifcall.count); close(fd); runlock(f->l); respond(r, nil); exits(nil); case -1: err = "failed to fork read"; break; default: return; } break; case Qevent: switch(f->pid = rfork(RFPROC|RFNOWAIT|RFMEM)){ case 0: r->ofcall.count = read(c->ev.out, r->ofcall.data, r->ifcall.count); respond(r, nil); exits(nil); case -1: err = "failed to fork read"; break; default: return; } break; default: err = "read prohibited"; break; } respond(r, err); } static void fswrite(Req *r){ Rfid *f; Client *c; char *b, *d, *err; long n; d = r->ifcall.data; n = r->ifcall.count; f = r->fid->aux; c = clientref(f->client); err = nil; switch(f->kind){ case Qrctl: r->ofcall.count = writeqrctl(d, n); break; case Qctl: r->ofcall.count = writeqctl(d, n, f->client); break; case Qtext: switch(f->pid = rfork(RFPROC|RFNOWAIT|RFMEM)){ case 0: f->l = &c->qtext; f->unlock = wunlock; wlock(f->l); r->ofcall.count = write(c->qtext.fd, d, r->ifcall.count); b = ecalloc(r->ifcall.count + 1); memmove(b, d, r->ifcall.count); writecmd(c->tdata.fd, TExecute, b, 0); free(b); wunlock(f->l); respond(r, nil); exits(nil); case -1: err = "failed to fork write"; break; default: return; } break; default: err = "write prohibited"; break; } respond(r, err); } static void fsflush(Req *r){ Req *o; Rfid *f; if(o = r->oldreq) if(f = o->fid->aux){ if(f->pid) postnote(PNPROC, f->pid, "interrupt"); if(f->l) f->unlock(f->l); respond(o, "interrupted"); } respond(r, nil); } static void fsstat(Req *r){ Rfid *f; f = r->fid->aux; mkdirent(&r->d, f->kind, f->client); respond(r, nil); } static char* fswalk1(Fid *fid, char *name, Qid *qid){ Rfid *f; Client *c; char *s; int n, i; if(!(fid->qid.type&QTDIR)) return "cannot walk from non-directory"; f = fid->aux; if(strcmp(name, "..") == 0){ switch(f->kind){ case Qroot: break; case Qclient: f->kind = Qroot; freeclient(f->client); f->client = -1; break; } } else { for(i = f->kind+1; i<QCOUNT; i++) if(nametab[i] && strcmp(name, nametab[i]) == 0) break; n = strtol(name, &s, 10); if(f->kind == Qroot && i == QCOUNT && *s == 0){ i = Qclient; f->client = n; } c = clientref(f->client); switch(i){ case Qclient: if(c == nil || c->ref == 0) return "directory entry not found"; incref(c); break; case Qtext: if(c->addr == nil) return "directory entry not found"; break; case Qinfo: if(c->qinfo == nil) return "directory entry not found"; break; case Qevent: if(c->ev.in == 0) return "directory entry not found"; break; case QCOUNT: return "directory entry not found"; break; } f->kind = i; } mkqid(qid, f->kind, f->client); fid->qid = *qid; return nil; } static char* fsclone(Fid *oldfid, Fid *newfid){ Rfid *f, *o; o = oldfid->aux; if(o == nil) return "bad fid"; f = ecalloc(sizeof(*f)); memmove(f, o, sizeof(*f)); if(f->client > -1) incref(clientref(f->client)); newfid->aux = f; return nil; } Srv fs = { .destroyfid = fsdestroyfid, .start = fsstart, .end = fsend, .attach = fsattach, .open = fsopen, .read = fsread, .write = fswrite, .flush = fsflush, .stat = fsstat, .walk1 = fswalk1, .clone = fsclone, }; void usage(void){ fprintf(stderr, "usage: %s [-Dd] [-m mtpt] [-s service] [-p defport] -n [nclients]\n", argv0); } void main(int argc, char **argv){ service = nil; mtpt = "/mnt/ride"; defport = "4502"; nclients = 256; ARGBEGIN{ case 'D': chatty9p++; break; case 'd': debug++; break; case 'm': mtpt = EARGF(usage()); break; case 'n': nclients = atoi(EARGF(usage())); break; case 's': service = EARGF(usage()); break; case 'p': defport = EARGF(usage()); break; default: usage(); return; }ARGEND cpool = ecalloc(nclients*sizeof(*cpool)); JSONfmtinstall(); rfork(RFNOTEG); postmountsrv(&fs, service, mtpt, MREPL); exits(nil); }