ref: 122eaf84ccbf1c6694943bfcce99b47c8c19440b
dir: /appl/cmd/ctlfs.b/
include "dial.m";
dial: Dial;
include "security.m";
auth: Auth;
include "keyring.m";
keyring: Keyring;
include "styx.m";
styx: Styx;
Tmsg, Rmsg: import Styx;
include "styxservers.m";
styxservers: Styxservers;
nametree: Nametree;
Tree: import nametree;
Styxserver, Fid, Navigator, Navop,
Eperm, Ecount, Eoffset: import styxservers;
# Feature strings advertised to clients: a read of the ctl file returns
# them joined one per line.  run_ctlfs prepends DBVER to this list.
dbfeatures: list of string;
# Qid paths of the files handled by ctlfs_loop; iota numbers them
# consecutively in the order listed.
Qroot, Qctl, Qname, Qstatus: con big iota;
# Create the ctlfs control file system and serve it.
#
# Loads and initialises the Styx modules, reads the server's authinfo,
# announces cfg.addr, builds the name tree shared by every client and
# then runs the listener loop.  Failures are reported through error().
#
#	cfg	node configuration; cfg.addr is the address announced
#	dbreg	registry handed on to the listener
#	keyfile	authinfo file; nil selects /usr/<user>/keyring/default
#	algs	algorithm list handed on to the authenticator
run_ctlfs(cfg: Config, dbreg: ref DbRegistry, keyfile: string, algs: list of string)
{
	sys->fprint(stderr, "setting up ctlfs\n");
	dbfeatures = DBVER :: dbfeatures;

	styx = load Styx Styx->PATH;
	styxservers = load Styxservers Styxservers->PATH;
	nametree = load Nametree Nametree->PATH;

	if(debug)
		sys->fprint(stderr, "ctlfs: checking if modules are loaded\n");
	if(styx == nil)
		error("ctlfs: styx module not found");
	if(styxservers == nil)
		error("ctlfs: styxservers module not found");
	if(nametree == nil)
		error("ctlfs: nametree module not found");

	if(debug)
		sys->fprint(stderr, "ctlfs: initializing modules\n");
	# NOTE(review): auth, dial and keyring are not loaded in this function;
	# presumably the caller has loaded them already -- confirm.
	auth->init();
	styx->init();
	styxservers->init(styx);
	nametree->init();

	# authinfo init
	authinfo: ref Keyring->Authinfo;
	if(keyfile == nil)
		keyfile = "/usr/" + user() + "/keyring/default";
	if(debug)
		sys->fprint(stderr, "ctlfs: reading authinfo %s\n", keyfile);
	authinfo = keyring->readauthinfo(keyfile);
	if(authinfo == nil)
		error(sys->sprint("ctlfs: cannot read %s: %r", keyfile));

	# announcing
	if(debug)
		sys->fprint(stderr, "ctlfs: announcing dddbctl\n");
	# addr := dial->netmkaddr(cfg.addr, "tcp", "dddbctl");
	c := dial->announce(cfg.addr);
	if(c == nil)
		# include the system error string, like the other failure paths
		error(sys->sprint("ctlfs: cannot listen on %s: %r", cfg.addr));

	# bootstrapping: drop /mnt/keys and /mnt from the name space before
	# any client is served
	if(debug)
		sys->fprint(stderr, "ctlfs: bootstrapping\n");
	sys->unmount(nil, "/mnt/keys");
	sys->unmount(nil, "/mnt");

	# nametree; this is shared across all attachees
	(tree, treeop) := nametree->start();
	tree.create(Qroot, dir(".", 8r555|Sys->DMDIR, Qroot));
	tree.create(Qroot, dir("ctl", 8r666, Qctl));
	# ctlfs_loop already answers Open and Read on Qname; without this
	# entry the file could never be walked to.
	tree.create(Qroot, dir("name", 8r444, Qname));
	tree.create(Qroot, dir("status", 8r444, Qstatus));

	sys->fprint(stderr, "ctlfs: finished setting up; starting\n");

	# listener entrypoint; the tree is shut down once it returns
	ctlfs_listener(cfg, dbreg, c, treeop, authinfo, algs);
	tree.quit();
}
# dddbctl listener loop.
#
# Waits for incoming calls on the announced connection c, accepts each
# one and spawns ctlfs_authenticator for it with a fresh registry
# channel pair from dbreg.changen().  A failed listen is reported
# through error(); a failed accept just moves on to the next call.
#
#	cfg	node configuration, passed through to the file server
#	dbreg	registry from which per-connection channels are obtained
#	c	announced connection to listen on
#	treeop	navigator channel of the shared name tree
#	authinfo, algs	credentials and algorithms for authentication
ctlfs_listener(cfg: Config, dbreg: ref DbRegistry, c: ref Dial->Connection, treeop: chan of ref Styxservers->Navop, authinfo: ref Keyring->Authinfo, algs: list of string)
{
	for(;;){
		nc := dial->listen(c);
		if(nc == nil)
			error(sys->sprint("listen failed: %r"));

		# Read the peer address once; it serves both the debug log
		# and the host name given to the worker.
		remote := readfile(nc.dir + "/remote");
		if(debug)
			sys->fprint(stderr, "ctlfs: got connection from %s", remote);

		dfd := dial->accept(nc);
		if(dfd == nil)
			continue;
		if(nc.cfd != nil)
			sys->fprint(nc.cfd, "keepalive");

		# Drop the trailing newline only when there actually is one,
		# rather than chopping whatever the last character happens to be.
		hostname := remote;
		if(len hostname > 0 && hostname[len hostname - 1] == '\n')
			hostname = hostname[0:len hostname - 1];

		regchan := dbreg.changen();
		spawn ctlfs_authenticator(cfg, regchan, dfd, treeop, authinfo, algs, hostname);
	}
}
# Authenticate one accepted connection and, if that succeeds, start a
# file server process for it.
#
# auth->server yields a nil fd on failure.  Its second result is logged
# as the failure reason in that case and as the authenticated identity
# otherwise.  The final argument 1 presumably asks it to set the user
# id, as the original comment states -- confirm against security.m.
ctlfs_authenticator(cfg: Config, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), dfd: ref Sys->FD, treeop: chan of ref Styxservers->Navop, authinfo: ref Keyring->Authinfo,
algs: list of string, hostname: string)
{
	(authfd, info) := auth->server(algs, authinfo, dfd, 1);

	if(authfd != nil){
		if(debug)
			sys->fprint(stderr, "ctlfs: client authenticated as %s\n", info);
		spawn ctlfs_loop(cfg, regchan, authfd, treeop, hostname);
		return;
	}

	# no fd: the handshake was rejected, nothing more to do
	if(debug)
		sys->fprint(stderr, "ctlfs: authentication failed: %s\n", info);
}
# Filesystem loop: serve Styx requests for one authenticated client
# until the request channel delivers nil.
#
#	cfg	node configuration reported by the name and status files
#	regchan	(request, reply) channel pair to the registry
#	fd	authenticated connection to the client
#	treeop	navigator channel of the shared name tree
#	(last)	peer host name; unused for now, will later be used for stats
#
# Open and Read are handled here for ctl, name and status; every other
# request, including Write, goes to the Styxserver default handler.
ctlfs_loop(cfg: Config, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), fd: ref Sys->FD, treeop: chan of ref Styxservers->Navop, nil: string)
{
	(tc, srv) := Styxserver.new(fd, Navigator.new(treeop), big Qroot);
	# registry rx/tx
	(tx, rx) := regchan;

	# Primary server loop
loop:
	while((tm := <-tc) != nil){
		pick t := tm {
		Open =>
			(f, mode, d, err) := srv.canopen(t);
			if(f == nil){
				srv.reply(ref Rmsg.Error(t.tag, err));
				continue loop;
			}
			# Validate the requested mode first.  The fid is marked
			# open only after every check has passed, so a rejected
			# open does not leave it in the open state.
			case f.path {
			Qroot or Qstatus =>
				# read-only files
				if(t.mode != Sys->OREAD){
					srv.reply(ref Rmsg.Error(t.tag, Eperm));
					continue loop;
				}
			Qctl or Qname =>
				# no truncate, remove-on-close or exclusive use
				if((t.mode & (Sys->OTRUNC | Sys->ORCLOSE | Sys->OEXCL)) != 0){
					srv.reply(ref Rmsg.Error(t.tag, Eperm));
					continue loop;
				}
			* =>
				# unknown file: let the library do the whole open
				# on a fid we have not touched
				srv.default(t);
				continue loop;
			}
			f.open(mode, d.qid);
			srv.reply(ref Rmsg.Open(t.tag, d.qid, srv.iounit()));

		Read =>
			(f, err) := srv.canread(t);
			if(f == nil){
				srv.reply(ref Rmsg.Error(t.tag, err));
				continue loop;
			}
			if(f.qtype & Sys->QTDIR){
				srv.read(t);
				continue loop;
			}
			# readstr converts to bytes and clamps offset/count against
			# the byte length, so reads past the end return no data
			# instead of faulting on an out-of-range slice.
			case f.path {
			Qctl =>
				srv.reply(styxservers->readstr(t, joinstr(dbfeatures, "\n") + "\n"));
			Qname =>
				srv.reply(styxservers->readstr(t, cfg.name + "\n"));
			Qstatus =>
				# info is built back to front and reversed at the end
				info: list of string;
				info = "name " + cfg.name :: info;
				info = "sysname " + cfg.sysn :: info;
				info = "addr " + cfg.addr :: info;
				info = "storage " + cfg.storage :: info;
				info = "fsworkers " + sys->sprint("%d", cfg.fswrks) :: info;
				info = "" :: info;
				info = "nodes" :: info;

				tx <-= ref RegTMsg.GetNodes();
				reply := <-rx;
				pick r := reply {
				Error =>
					# exactly one reply per request: stop here
					srv.reply(ref Rmsg.Error(t.tag, r.err));
					continue loop;
				NodeList =>
					# one line per node: its name, then either the
					# two counters from its Status reply or the error text
					for(names := lists->reverse(r.names); names != nil; names = tl names){
						node := hd names;
						sline := "";
						tx <-= ref RegTMsg.Check(node);
						crep := <-rx;
						pick cr := crep {
						Error =>
							sline = cr.err;
						Status =>
							sline = sys->sprint("%d %d", cr.count, cr.poolsize);
						* =>
							sline = "unsupported message";
						}
						info = node + " " + sline :: info;
					}
				* =>
					srv.reply(ref Rmsg.Error(t.tag, "unsupported version"));
					continue loop;
				}
				srv.reply(styxservers->readstr(t, joinstr(lists->reverse(info), "\n") + "\n"));
			* =>
				srv.default(t);
			}

		Write =>
			# no file accepts writes yet
			srv.default(t);

		* =>
			srv.default(t);
		}
	}
}