shithub: ridefs

Download patch

ref: ee71fd2728c3201348fdc0ffbd8720d3597f2ac5
parent: 6b897c1d48b7f6e8a1ef0f5d3d7eb963c8183218
author: B. Wilson <x@wilsonb.com>
date: Thu Jun 19 09:13:32 EDT 2025

Streamed io requires manual read offset handling

--- a/ridefs.c
+++ b/ridefs.c
@@ -17,13 +17,17 @@
 	ulong umask;
 
 	/* internal use */
+	Req *req;       /* i/o req */
+	int pid;        /*     pid */
+	int oio;        /* io opened */
+	char *r;        /* response buffer */
+	int rn;         /*          length */
+	int roff;       /*          offset */
 	char *user;
 	char *rinfo;
 	char *qctl;
 	long time0;
 	int id;
-	int oio;        /* io opened */
-	int iopid;      /* read/write fork */
 	int fd;         /* data */
 	int cfd;        /* ctl */
 };
@@ -150,6 +154,8 @@
 	c->user = estrdup(getuser());
 	c->time0 = time(0);
 	c->qctl = ecalloc(bufsz);
+	c->r = ecalloc(bufsz);
+	c->rn = -1;
 	genqctl(i);
 
 	return i;
@@ -162,12 +168,13 @@
 	c = clientref(i);
 	if(c == nil || decref(c))
 		return;
-
+		
+	if(c->fd) close(c->fd);
+	if(c->cfd) close(c->cfd);
 	if(c->user) free(c->user);
 	if(c->qctl) free(c->qctl);
 	if(c->rinfo) free(c->rinfo);
-	if(c->fd) close(c->fd);
-	if(c->cfd) close(c->cfd);
+	if(c->r) free(c->r);
 
 	memset(c, 0, sizeof(*c));
 }
@@ -492,14 +499,16 @@
 			err = rideinit(f->client);
 			if(err == nil)
 				c->oio = 1;
-			c->iopid = 0;
 			respond(r, err);
+			c->pid = 0;
+			c->req = nil;
 			exits(nil);
 		case -1:
 			respond(r, "failed to init ride");
 			break;
 		default:
-			c->iopid = pid;
+			c->pid = pid;
+			c->req = r;
 			return;
 		}
 		break;
@@ -513,7 +522,7 @@
 	Rfid *f;
 	Client *c;
 	char *buf, *err;
-	int pid, n;
+	int pid, n, off;
 
 	buf = err = nil;
 	f = r->fid->aux;
@@ -528,23 +537,34 @@
 	case Qio:
 		switch(pid = rfork(RFPROC|RFNOWAIT|RFMEM)){
 		case 0:
-			buf = ecalloc(bufsz);
-			n = readmsg(c->fd, buf, bufsz);
-			if(n < 0)
-				err = "failed to read response";
-			else
-				readbuf(r, buf, n);
+			off = r->ifcall.offset - c->roff;
 
-			c->iopid = 0;
-			respond(r, err);
+			if(c->rn < 0){
+				c->rn = readmsg(c->fd, c->r, bufsz);
+				c->roff = r->ifcall.offset;
+				off = 0;
+			}
 
-			free(buf);
+			n = r->ifcall.count + off < c->rn ?
+				r->ifcall.count : c->rn - off;
+			if(c->rn == off) {
+				r->ofcall.count = 0;
+				c->rn = -1;
+			} else {
+				memmove(r->ofcall.data, c->r + off, n);
+				r->ofcall.count = n;
+			}
+
+			respond(r, err);
+			c->pid = 0;
+			c->req = nil;
 			exits(nil);
 		case -1:
 			err = "failed to fork read";
 			break;
 		default:
-			c->iopid = pid;
+			c->pid = pid;
+			c->req = r;
 			return;
 		}
 		break;
@@ -583,14 +603,17 @@
 		switch(pid = rfork(RFPROC|RFNOWAIT|RFMEM)){
 		case 0:
 			r->ofcall.count = writemsg(c->fd, d, n);
-			c->iopid = 0;
+
 			respond(r, nil);
+			c->pid = 0;
+			c->req = nil;
 			exits(nil);
 		case -1:
 			err = "failed to fork write";
 			break;
 		default:
-			c->iopid = pid;
+			c->pid = pid;
+			c->req = r;
 			return;
 		}
 		break;
@@ -610,9 +633,11 @@
 	if(o = r->oldreq)
 	if(f = o->fid->aux)
 	if(c = clientref(f->client))
-	if(0 < c->iopid){
-		postnote(PNPROC, c->iopid, "interrupt");
-		respond(r, "interrupted");
+	if(c->req){
+		postnote(PNPROC, c->pid, "interrupt");
+		respond(c->req, "interrupted");
+		c->pid = 0;
+		c->req = nil;
 	}
 
 	respond(r, nil);
--- a/test
+++ b/test
@@ -60,9 +60,15 @@
 		~ `{wc -c rinfo} 0 &&
 			fail 'Did not receive RIDE info'
 
-		echo -n '["Execute",{"text":"      ⍳5\n","trace":0}]' ||
-			fail 'Could not write message'
-		cat >[1=2]
+		cat >/dev/null ||
+			fail 'Did not receive initial message'
+
+		echo -n '["Execute",{"text":"⍳5\n","trace":0}]' ||
+			fail 'Could not send message'
+
+		res='["AppendSessionOutput",{"result":"⍳5\n","type":14,"group":0}]'
+		~ `{cat} $res ||
+			fail 'Did not receive response'
 	}
 	cd ..
 }
--