ref: 2ab7efc23fd2137ce88d328441900e0a439a3508
dir: /appl/cmd/nodereg.b/
# initialize a node pool
NodePool.init(r: self ref NodePool): int
{
if(debug)
sys->fprint(stderr, "np: init'ing pool %s\n", r.cfg.name);
if(len r.instances == r.cfg.psize) {
sys->fprint(stderr, "np: pool %s already initialized\n", r.cfg.name);
return 1;
}
c := r.refresh();
if(debug)
sys->fprint(stderr, "np: pool %s init'ed: %d/%d\n", r.cfg.name, c, r.cfg.psize);
return 0;
}
NodePool.check(r: self ref NodePool): int
{
sc := 0;
if(debug)
sys->fprint(stderr, "np: refreshing pool %s\n", r.cfg.name);
for(i := 0; i < r.cfg.psize; i++) {
mtpt := "/n/" + r.cfg.name + "." + sys->sprint("%d", i);
ctlpath := mtpt + "/ctl";
if(sys->stat(ctlpath).t0 >= 0)
sc++;
else
sys->unmount(nil, mtpt);
}
return sc;
}
NodePool.refresh(r: self ref NodePool): int
{
sc := 0;
if(debug)
sys->fprint(stderr, "refreshing pool %s, with %d nodes\n", r.cfg.name, r.cfg.psize);
for(i := 0; i < r.cfg.psize; i++) {
mtpt := "/n/" + r.cfg.name + "." + sys->sprint("%d", i);
ctlpath := mtpt + "/ctl";
if(sys->stat(ctlpath).t0 >= 0)
continue;
sys->unmount(nil, mtpt);
err := r.newinst(mtpt);
if(err != nil)
break;
sc++;
}
return sc;
}
NodePool.close(r: self ref NodePool)
{
if(debug)
sys->fprint(stderr, "np: closing pool %s\n", r.cfg.name);
instances := r.instances;
while(len instances != 0) {
instance := hd instances;
instances = tl instances;
if(debug)
sys->fprint(stderr, "np: closing %s\n", instance);
sys->unmount(nil, instance);
}
sys->fprint(stderr, "np: pool %s closed\n", r.cfg.name);
}
NodePool.newinst(r: self ref NodePool, mtpt: string): string
{
(ec, ae) := sys->tokenize(r.cfg.addr, "!");
defnet := "tcp";
defsvc := "dddbctl";
nodename := r.cfg.name;
case ec {
1 or 2 =>
defnet = hd ae;
* =>
defnet = hd ae;
defsvc = hd tl tl ae;
}
keyfile := r.cfg.keyfile;
if(keyfile == "" || keyfile == nil)
keyfile = sys->sprint("/usr/%s/keyring/%s!%s!%s", user(), defnet, r.cfg.sysn, defsvc);
if(debug)
sys->fprint(stderr, "np: %s: reading keyfile %s\n", nodename, keyfile);
authinfo := keyring->readauthinfo(keyfile);
if (authinfo == nil) {
sys->fprint(stderr, "np: %s error: %r\n", nodename);
return sys->sprint("cannot read %s", keyfile);
}
addr := dial->netmkaddr(r.cfg.sysn, defnet, defsvc);
if(debug)
sys->fprint(stderr, "np: %s: dialing %s\n", nodename, addr);
(ok, c) := sys->dial(addr, nil);
if(ok < 0)
return sys->sprint("unable to dial %s", addr);
(fd, err) := auth->client("", authinfo, c.dfd);
if(fd == nil) {
sys->fprint(stderr, "np: %s: error authenticating: %s\n", nodename, err);
return err;
}
ok = sys->mount(fd, nil, mtpt, Sys->MREPL, nil);
if(ok < 0) {
sys->fprint(stderr, "np: %s: unable to mount %s\n", nodename, mtpt);
return sys->sprint("unable to mount %s\n", mtpt);
}
return nil;
}
# create an uninitialized registry
DbRegistry.new(cfgs: list of NodeConfig): ref DbRegistry
{
sys->fprint(stderr, "dbreg: creating up database registry\n");
nodepools: list of ref NodePool;
while(len cfgs != 0) {
nodepools = ref NodePool(hd cfgs, nil) :: nodepools;
cfgs = tl cfgs;
}
# rchans: list of chan of ref RegRMsg;
# tchans: list of chan of ref RegTMsg;
return ref DbRegistry(nodepools);
}
# initialize the registry
DbRegistry.init(r: self ref DbRegistry)
{
nodepools := r.nodepools;
count := 0;
sys->fprint(stderr, "dbreg: initializing pools\n");
while(len nodepools != 0) {
pool := hd nodepools;
nodepools = tl nodepools;
err := pool.init();
if(err)
count++;
}
sys->fprint(stderr, "dbreg: initialized %d out of %d pools\n", count, len r.nodepools);
}
DbRegistry.close(r: self ref DbRegistry)
{
nodepools := r.nodepools;
sys->fprint(stderr, "dbreg: closing all pools\n");
while(len nodepools != 0) {
pool := hd nodepools;
nodepools = tl nodepools;
spawn pool.close();
}
}
get_pool(r: ref DbRegistry, name: string): ref NodePool
{
nodepools := r.nodepools;
while(len nodepools != 0) {
pool := hd nodepools;
nodepools = tl nodepools;
if(pool.cfg.name == name)
return pool;
}
return nil;
}
run_chans(r: ref DbRegistry, tx: chan of ref RegTMsg, rx: chan of ref RegRMsg)
{
active := 1;
while(active) {
tm := <-tx;
pick msg := tm {
ChanClose =>
active = 0;
GetNodes =>
nodes: list of string;
nodepools := r.nodepools;
while(len nodepools != 0) {
pool := hd nodepools;
nodepools = tl nodepools;
nodes = pool.cfg.name :: nodes;
}
rx <-= ref RegRMsg.NodeList(nodes);
Check =>
pool := get_pool(r, msg.nodename);
if(pool == nil)
rx <-= ref RegRMsg.Error(Epoolnotfound);
c := pool.check();
rx <-= ref RegRMsg.Status(c, pool.cfg.psize);
Refresh =>
pool := get_pool(r, msg.nodename);
if(pool == nil)
rx <-= ref RegRMsg.Error(Epoolnotfound);
c := pool.refresh();
rx <-= ref RegRMsg.Status(c, pool.cfg.psize);
RefreshAll =>
nodepools := r.nodepools;
status: list of ref RegRMsg.Status;
for(i := 0; i < len nodepools; i++) {
sys->fprint(stderr, "idx: %d\n", i);
pool := hd nodepools;
nodepools = tl nodepools;
c := pool.refresh();
status = ref RegRMsg.Status(c, pool.cfg.psize) :: status;
}
rx <-= ref RegRMsg.StatusAll(status);
Close =>
pool := get_pool(r, msg.nodename);
if(pool == nil)
rx <-= ref RegRMsg.Error(Epoolnotfound);
pool.close();
rx <-= ref RegRMsg.Status(0, pool.cfg.psize);
}
}
}
DbRegistry.changen(r: self ref DbRegistry): (chan of ref RegTMsg, chan of ref RegRMsg)
{
tx := chan of ref RegTMsg;
rx := chan of ref RegRMsg;
spawn run_chans(r, tx, rx);
return (tx, rx);
}