ref: 2ab7efc23fd2137ce88d328441900e0a439a3508
dir: /appl/cmd/ctlfs.b/
include "dial.m";
dial: Dial;
include "security.m";
auth: Auth;
include "keyring.m";
keyring: Keyring;
include "styx.m";
styx: Styx;
Tmsg, Rmsg: import Styx;
include "styxservers.m";
styxservers: Styxservers;
nametree: Nametree;
Tree: import nametree;
Styxserver, Fid, Navigator, Navop,
Eperm, Ecount, Eoffset, Ebadarg: import styxservers;
# Database feature strings; run_ctlfs prepends DBVER and "export", and
# reads of the ctl file return the list one entry per line.
dbfeatures: list of string;
# Qid paths of the entries created in each connection's name tree:
# the root directory, the ctl, name and status files, and the storage
# and nodes directories.
Qroot, Qctl, Qname, Qstatus, Qstorage, Qnodes: con big iota;
# helper functions
# Predicate for lists->filter: yields 1 when s holds at least one
# character and 0 when it is empty.
is_nonempty(s: string): int
{
	return len s != 0;
}
# Create ctlfs and the appropriate listeners.
#
# cfg     - node configuration; cfg.addr is the address that is announced
# dbreg   - database registry, handed on to ctlfs_listener
# keyfile - path of the authinfo file; nil selects
#           /usr/<user>/keyring/default
# algs    - algorithm list, handed on to the authenticator
#
# Loads and initialises the styx, styxservers and nametree modules, reads
# the authinfo, announces cfg.addr, unmounts /mnt/keys and /mnt, and then
# enters ctlfs_listener.  Every failure is reported through error().
#
# NOTE(review): auth, dial and keyring are used here but not loaded in
# this function - presumably they are loaded elsewhere; confirm.
# NOTE(review): each call prepends DBVER and "export" to dbfeatures, so
# this looks intended to be called once.
run_ctlfs(cfg: Config, dbreg: ref DbRegistry, keyfile: string, algs: list of string)
{
	sys->fprint(stderr, "setting up ctlfs\n");
	dbfeatures = DBVER :: "export" :: dbfeatures;

	styx = load Styx Styx->PATH;
	styxservers = load Styxservers Styxservers->PATH;
	nametree = load Nametree Nametree->PATH;

	if(debug)
		sys->fprint(stderr, "ctlfs: checking if modules are loaded\n");
	if(styx == nil)
		error("ctlfs: styx module not found");
	if(styxservers == nil)
		error("ctlfs: styxservers module not found");
	if(nametree == nil)
		error("ctlfs: nametree module not found");

	if(debug)
		sys->fprint(stderr, "ctlfs: initializing modules\n");
	auth->init();
	styx->init();
	styxservers->init(styx);
	nametree->init();

	# authinfo init
	authinfo: ref Keyring->Authinfo;
	if(keyfile == nil)
		keyfile = "/usr/" + user() + "/keyring/default";
	if(debug)
		sys->fprint(stderr, "ctlfs: reading authinfo %s\n", keyfile);
	authinfo = keyring->readauthinfo(keyfile);
	if(authinfo == nil)
		error(sys->sprint("ctlfs: cannot read %s: %r", keyfile));

	# announcing
	if(debug)
		sys->fprint(stderr, "ctlfs: announcing dddbctl\n");
	# addr := dial->netmkaddr(cfg.addr, "tcp", "dddbctl");
	c := dial->announce(cfg.addr);
	if(c == nil)
		# include the system error and drop the embedded newline, in
		# line with the other error() calls in this function
		error(sys->sprint("ctlfs: cannot listen on %s: %r", cfg.addr));

	# bootstrapping: the unmounts are best effort, their results are ignored
	if(debug)
		sys->fprint(stderr, "ctlfs: bootstrapping\n");
	sys->unmount(nil, "/mnt/keys");
	sys->unmount(nil, "/mnt");

	sys->fprint(stderr, "ctlfs: finished setting up; starting\n");

	# listener entrypoint
	ctlfs_listener(cfg, dbreg, c, authinfo, algs);
}
# dddbctl listener loop.
#
# cfg      - node configuration, passed on to each authenticator
# dbreg    - registry; changen() is called once per accepted connection
#            and its result passed to the authenticator
# c        - announced connection returned by dial->announce
# authinfo - server authinfo used for authentication
# algs     - algorithm list passed to the authenticator
#
# Loops forever: waits for a call, accepts it, writes "keepalive" to the
# connection's control file when there is one, and spawns
# ctlfs_authenticator for it.  A failed listen is reported through
# error(); a failed accept just moves on to the next call.
ctlfs_listener(cfg: Config, dbreg: ref DbRegistry, c: ref Dial->Connection, authinfo: ref Keyring->Authinfo, algs: list of string)
{
	for(;;) {
		nc := dial->listen(c);
		if(nc == nil)
			error(sys->sprint("listen failed: %r"));

		# Read the peer address once.  Only a trailing newline is
		# removed, so an address that arrives without one is kept whole.
		hostname := readfile(nc.dir + "/remote");
		if(len hostname > 0 && hostname[len hostname - 1] == '\n')
			hostname = hostname[0:len hostname - 1];
		if(debug)
			sys->fprint(stderr, "ctlfs: got connection from %s\n", hostname);

		dfd := dial->accept(nc);
		if(dfd == nil)
			continue;
		if(nc.cfd != nil)
			sys->fprint(nc.cfd, "keepalive");

		regchan := dbreg.changen();
		spawn ctlfs_authenticator(cfg, nametree, regchan, dfd, authinfo, algs, hostname);
	}
}
# Authenticate one accepted connection and, on success, start serving it.
#
# Runs auth->server on dfd with algs and authinfo (last argument 1).
# When a descriptor comes back, ctlfs_loop is spawned on it with the
# given cfg, nametree, regchan and hostname; otherwise the function
# returns.  In both cases the string returned by auth->server is
# logged when debug is set.
ctlfs_authenticator(cfg: Config, nametree: Nametree, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), dfd: ref Sys->FD, authinfo: ref Keyring->Authinfo, algs: list of string, hostname: string)
{
	(afd, info) := auth->server(algs, authinfo, dfd, 1);
	if(afd != nil) {
		if(debug)
			sys->fprint(stderr, "ctlfs: client authenticated as %s\n", info);
		spawn ctlfs_loop(cfg, nametree, regchan, afd, hostname);
		return;
	}
	if(debug)
		sys->fprint(stderr, "ctlfs: authentication failed: %s\n", info);
}
# Filesystem loop; nb: hostname (the last, unnamed parameter) will be
# later used for stats.
#
# cfg      - node configuration shown through the name and status files
# nametree - name tree module used to build this connection's tree
# regchan  - (request, reply) channel pair to the registry
# fd       - authenticated connection to serve Styx on
#
# Builds a name tree with ctl, name and status files plus storage and
# nodes directories, starts a Styxserver on fd, and handles messages
# until the server channel yields nil, then shuts the tree down.
#   read  ctl    - dbfeatures, one per line
#   read  name   - cfg.name
#   read  status - configuration summary and per-node state
#   write ctl    - "refresh [pool ...]", "check pool ...", "close pool"
# Everything else is left to the server's default handling.
ctlfs_loop(cfg: Config, nametree: Nametree, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), fd: ref Sys->FD, nil: string)
{
	# nametree; this is per mount
	(tree, treeop) := nametree->start();
	tree.create(Qroot, dir(".", 8r555|Sys->DMDIR, Qroot));
	tree.create(Qroot, dir("ctl", 8r640, Qctl));
	tree.create(Qroot, dir("name", 8r444, Qname));
	tree.create(Qroot, dir("status", 8r440, Qstatus));
	tree.create(Qroot, dir("storage", 8r555|Sys->DMDIR, Qstorage));
	tree.create(Qroot, dir("nodes", 8r555|Sys->DMDIR, Qnodes));

	# styxserver start
	(tc, srv) := Styxserver.new(fd, Navigator.new(treeop), big Qroot);

	# registry rx/tx
	(tx, rx) := regchan;
	(btos, nil) := convcs->getbtos(DCS);

	# Primary server loop
	while((tm := <-tc) != nil) {
		# Switch on operations being performed on a given Fid
		pick t := tm {
		Open =>
			srv.default(t);

		Read =>
			(f, err) := srv.canread(t);
			if(f == nil) {
				srv.reply(ref Rmsg.Error(t.tag, err));
				break;
			}
			if(f.qtype & Sys->QTDIR) {
				srv.read(t);
				break;
			}
			# readstr converts to bytes and applies offset/count to the
			# byte array, returning an empty read at or past the end.
			case f.path {
			Qctl =>
				srv.reply(styxservers->readstr(t, joinstr(dbfeatures, "\n") + "\n"));
			Qname =>
				srv.reply(styxservers->readstr(t, cfg.name + "\n"));
			Qstatus =>
				# exactly one reply per request: an error or the data
				(ok, text) := ctlfs_status(cfg, tx, rx);
				if(ok)
					srv.reply(styxservers->readstr(t, text));
				else
					srv.reply(ref Rmsg.Error(t.tag, text));
			* =>
				srv.default(t);
			}

		Write =>
			(f, err) := srv.canwrite(t);
			if(f == nil) {
				srv.reply(ref Rmsg.Error(t.tag, err));
				break;
			}
			case f.path {
			Qctl =>
				(nil, csargs, nil) := btos->btos(nil, t.data, len t.data);
				(nil, crargs) := sys->tokenize(csargs, " \n");
				cargs := lists->filter(is_nonempty, crargs);
				cerr := ctlfs_ctlcmd(cargs, tx, rx);
				if(cerr != nil)
					srv.reply(ref Rmsg.Error(t.tag, cerr));
				else
					srv.reply(ref Rmsg.Write(t.tag, len t.data));
			* =>
				srv.default(t);
			}

		* =>
			srv.default(t);
		}
	}
	tree.quit();
}

# Build the text of the status file.
# Returns (1, text) on success, or (0, message) when the registry does
# not answer GetNodes with a node list.  Each listed node is queried with
# Check; its line holds the count and pool size from a Status reply, or
# the error text from an Error reply.
ctlfs_status(cfg: Config, tx: chan of ref RegTMsg, rx: chan of ref RegRMsg): (int, string)
{
	# info is built in reverse and flipped once at the end
	info: list of string;
	info = "name " + cfg.name :: info;
	info = "sysname " + cfg.sysn :: info;
	info = "addr " + cfg.addr :: info;
	info = "storage " + cfg.storage :: info;
	info = "fsworkers " + sys->sprint("%d", cfg.fswrks) :: info;
	info = "" :: info;
	info = "nodes" :: info;

	tx <-= ref RegTMsg.GetNodes();
	reply := <-rx;
	pick r := reply {
	Error =>
		return (0, r.err);
	NodeList =>
		for(names := lists->reverse(r.names); names != nil; names = tl names) {
			node := hd names;
			sline := "";
			tx <-= ref RegTMsg.Check(node);
			crep := <-rx;
			pick cr := crep {
			Error =>
				sline = cr.err;
			Status =>
				sline = sys->sprint("%d %d", cr.count, cr.poolsize);
			* =>
				sline = "unsupported message";
			}
			info = node + " " + sline :: info;
		}
	* =>
		return (0, "unsupported version");
	}
	return (1, joinstr(lists->reverse(info), "\n") + "\n");
}

# Execute one tokenised ctl command against the registry.
# Returns nil on success, or the error string for the client.  The
# registry's reply to each request is awaited but its content is not
# inspected.
#   refresh            - RefreshAll
#   refresh pool ...   - Refresh for every named pool
#   check pool ...     - Check for every named pool (at least one)
#   close pool         - Close for exactly one pool
ctlfs_ctlcmd(cargs: list of string, tx: chan of ref RegTMsg, rx: chan of ref RegRMsg): string
{
	# a write holding only separators yields no tokens at all
	if(cargs == nil)
		return Ebadarg;

	args := tl cargs;
	case hd cargs {
	"refresh" =>
		if(args == nil) {
			tx <-= ref RegTMsg.RefreshAll();
			<-rx;
			return nil;
		}
		# walk the list itself so that every named pool is visited
		for(; args != nil; args = tl args) {
			tx <-= ref RegTMsg.Refresh(hd args);
			<-rx;
		}
		return nil;
	"check" =>
		if(args == nil)
			return Ebadarg;
		for(; args != nil; args = tl args) {
			tx <-= ref RegTMsg.Check(hd args);
			<-rx;
		}
		return nil;
	"close" =>
		if(len args != 1)
			return Ebadarg;
		tx <-= ref RegTMsg.Close(hd args);
		<-rx;
		return nil;
	}
	return Ebadarg;
}