shithub: wl9m

ref: bffc4bdc7f73139ae834619e3a018c17ceee9398
dir: /plan9/drawserv.c/

View raw version
/*
 * drawserv_udp.c - Plan 9 draw server with UDP (9front)
 *
 * Based on working drawserv.c (TCP version), converted to UDP.
 * Receives LZ4-compressed tiles (no XOR delta encoding).
 *
 * Tiles are accumulated until FLUSH; only the most recent frame is kept per tile.
 *
 * Build:
 *   6c drawserv_udp.c lz4.c
 *   6l -o drawserv_udp drawserv_udp.6 lz4.6
 */

#include <u.h>
#include <libc.h>
#include <draw.h>
#include <thread.h>
#include <mouse.h>
#include <keyboard.h>
#include "lz4.h"

/*
 * Wire protocol.  Every packet starts with a P9WL_HDR_SIZE-byte header
 * (field offsets below); multi-byte fields are little-endian.
 */
#define P9WL_MAGIC        0x50395749
#define P9WL_VERSION      1
#define P9WL_HDR_SIZE     20
#define P9WL_MTU          1400
/* Data bytes per TILE chunk: chunk i of a tile lives at offset i*P9WL_MAX_CHUNK. */
#define P9WL_MAX_CHUNK    (P9WL_MTU - 44)

/* Packet types (byte at HDR_TYPE) */
#define P9WL_SIZE_REQ     0x01
#define P9WL_SIZE_RESP    0x02
#define P9WL_TILE         0x10
#define P9WL_FLUSH        0x15
#define P9WL_NACK         0x16
#define P9WL_SCROLL       0x17
#define P9WL_MOUSE        0x20
#define P9WL_KEY          0x21
#define P9WL_RESIZE       0x22

/* Flag bits (byte at HDR_FLAGS) */
#define P9WL_FLAG_COMPRESSED  0x01	/* tile data is an LZ4 block */
#define P9WL_FLAG_HAS_COPIES  0x02	/* a copy-destination list follows the tile data */

/* Header field offsets */
#define HDR_MAGIC     0		/* 4 bytes */
#define HDR_VERSION   4		/* 1 byte */
#define HDR_TYPE      5		/* 1 byte */
#define HDR_FLAGS     6		/* 1 byte */
#define HDR_SCROLL_ID 7		/* 1 byte */
#define HDR_SEQ       8		/* 4 bytes */
#define HDR_FRAME     12	/* 4 bytes */
#define HDR_TIMESTAMP 16	/* 4 bytes */

/*
 * Little-endian byte accessors.
 *
 * The PUT macros evaluate their value argument exactly once, so a
 * caller may pass an expression with side effects (e.g. sendseq++ or
 * now_ms()) and every byte comes from the same value.  The GET macros,
 * and the pointer argument of the PUT macros, may be evaluated several
 * times: pass side-effect-free pointer expressions.
 */
#define GET16(p) ((uint)(p)[0] | ((uint)(p)[1]<<8))
#define GET32(p) ((ulong)(p)[0] | ((ulong)(p)[1]<<8) | ((ulong)(p)[2]<<16) | ((ulong)(p)[3]<<24))
#define PUT16(p, v) do { uint _put16v = (v); (p)[0]=(uchar)_put16v; (p)[1]=(uchar)(_put16v>>8); } while(0)
#define PUT32(p, v) do { ulong _put32v = (v); (p)[0]=(uchar)_put32v; (p)[1]=(uchar)(_put32v>>8); (p)[2]=(uchar)(_put32v>>16); (p)[3]=(uchar)(_put32v>>24); } while(0)

/* Input devices, opened in threadmain. */
Mousectl *mctl;
Keyboardctl *kctl;
/* Stack size for threadmain; presumably consumed by libthread (see thread(2)) -- confirm. */
int mainstacksize = 32768;

/*
 * UDP state.  udpdata is the conversation's data file, opened by
 * netproc after writing "headers" to the ctl file.  clienthdr holds the
 * 52-byte header saved from a client packet in handle_packet;
 * udpsend() prepends it to every outgoing packet.  udplock is held both
 * when clienthdr is written and around each send.
 */
int udpdata = -1;
uchar clienthdr[52];
int clientknown = 0;	/* nonzero once clienthdr holds a client header */
Lock udplock;
ulong sendseq = 0;	/* sequence number for the next outgoing packet */
int debug = 0;	/* nonzero enables diagnostics on fd 2; nothing else in this file sets it */

/* Display: staging image that flush_accum() loads tile pixels into before drawing. */
Image *tileimg;
int tileimgw, tileimgh;	/* current allocated size of tileimg */

/*
 * Packet channel.  netproc fills Packets from a static pool and sends
 * pointers to them over pktch; threadmain passes each to handle_packet.
 */
typedef struct Packet Packet;
struct Packet {
	uchar hdr[52];	/* UDP header that preceded the payload */
	uchar data[2048];	/* read buffer; holds only the payload once netproc strips hdr */
	int len;	/* payload length in data */
};
Channel *pktch;

/*
 * Tile reassembly for multi-chunk tiles.  A slot is free when data is
 * nil; get_pending() looks slots up by (x, y, w, h, frame).
 */
typedef struct Pending Pending;
struct Pending {
	int x, y, w, h;	/* tile rectangle as sent by the client */
	int flags;	/* P9WL_FLAG_* bits from the chunk that created the slot */
	int chunk_count;
	int total_size;	/* bytes of tile data (compressed or raw) */
	uchar *data;	/* total_size-byte reassembly buffer */
	uchar *got;	/* got[i] != 0 once chunk i has been stored */
	int ngot;	/* number of distinct chunks stored so far */
	ulong frame;
	vlong when;	/* nsec() when the slot was created; used to pick the eviction victim */
};

#define MAXPENDING 64
Pending pending[MAXPENDING];
Lock pendlock;	/* guards pending[] */

/* Maximum copy destinations honoured per tile; extra entries are ignored. */
#define MAX_COPIES 512

/*
 * Counters for diagnostics.  threadmain reports and resets drop_pktch,
 * tiles_superseded and tiles_drawn every 5 seconds.
 * NOTE(review): tiles_failed is incremented but never reported here.
 */
long drop_pktch = 0;	/* packets dropped because pktch was full */
long tiles_drawn = 0;
long tiles_failed = 0;	/* tiles that could not be decoded or buffered */
long tiles_superseded = 0;	/* tiles discarded because a newer frame was already buffered */
ulong last_stats = 0;	/* now_ms() of the last stats report */

/* Frame tile counting */
/* NOTE(review): current_frame is not referenced elsewhere in this file. */
ulong current_frame = 0;

/* Tile tracking: accum[] has one slot per TILE_SIZE x TILE_SIZE cell of this grid. */
#define TILE_SIZE 16
#define MAX_TILES_X 256
#define MAX_TILES_Y 256

/* Scratch buffer that tiles are decoded into; grown on demand in handle_tile. */
uchar *inline_decbuf = nil;
int inline_decbuf_size = 0;

/*
 * Tile accumulation buffer.
 *
 * Tiles are buffered here until a FLUSH packet, when flush_accum()
 * draws them and marks the slots empty.  Each slot holds the
 * decompressed pixels for the TILE_SIZE cell containing a tile's origin
 * (see tile_index).  If several packets land in the same slot before a
 * flush, the one with the highest frame number is kept; on a tie the
 * later packet replaces the earlier one.
 */
typedef struct AccumTile AccumTile;
struct AccumTile {
	int valid;	/* slot holds a tile to draw at the next flush */
	ulong frame;
	int x, y, w, h;
	uchar *pixels;      /* Decompressed XBGR, w*h*4 bytes */
	int pixelsize;      /* Allocated size of pixels buffer */
};

/* One slot per tile cell; pixel buffers are kept across flushes and reused. */
#define MAX_ACCUM (MAX_TILES_X * MAX_TILES_Y)
AccumTile accum[MAX_ACCUM];

/*
 * Map a pixel position to its slot in accum[]: one slot per
 * TILE_SIZE x TILE_SIZE cell of a MAX_TILES_X x MAX_TILES_Y grid,
 * laid out row by row.  Returns -1 if the cell lies outside the grid.
 */
int
tile_index(int x, int y)
{
	int col = x / TILE_SIZE;
	int row = y / TILE_SIZE;

	if(col >= 0 && col < MAX_TILES_X && row >= 0 && row < MAX_TILES_Y)
		return row * MAX_TILES_X + col;
	return -1;
}

/*
 * Store a decompressed tile in the accumulation buffer, in the slot for
 * the TILE_SIZE cell containing (x, y).  pixels must hold w*h*4 bytes;
 * they are copied, so the caller keeps ownership.
 *
 * Returns 1 if stored.  Returns 0 if the tile was rejected:
 * - non-positive dimensions, or w*h*4 overflows an int;
 * - the position is outside the tile grid;
 * - the slot already holds a newer frame (counted in tiles_superseded);
 * - memory is exhausted.
 */
int
accum_tile(int x, int y, int w, int h, ulong frame, uchar *pixels)
{
	int idx, nbytes;
	AccumTile *t;

	/* w*h*4 must be positive and fit in an int */
	if(w <= 0 || h <= 0 || w > 0x7fffffff / 4 / h)
		return 0;

	idx = tile_index(x, y);
	if(idx < 0)
		return 0;

	t = &accum[idx];

	/* Only update if this frame is newer or equal */
	if(t->valid && frame < t->frame){
		tiles_superseded++;
		return 0;
	}

	nbytes = w * h * 4;

	/* Allocate or grow pixel buffer if needed */
	if(t->pixels == nil || t->pixelsize < nbytes){
		free(t->pixels);
		t->pixels = malloc(nbytes);
		if(t->pixels == nil){
			/*
			 * The previous pixels are gone, so a tile already
			 * buffered in this slot can no longer be drawn:
			 * invalidate it rather than leave valid set with
			 * a nil buffer.
			 */
			t->pixelsize = 0;
			t->valid = 0;
			return 0;
		}
		t->pixelsize = nbytes;
	}

	memmove(t->pixels, pixels, nbytes);
	t->x = x;
	t->y = y;
	t->w = w;
	t->h = h;
	t->frame = frame;
	t->valid = 1;
	return 1;
}

/*
 * Draw all accumulated tiles to screen, then clear the buffer.
 */
void
flush_accum(void)
{
	int i, count;
	AccumTile *t;
	Rectangle rect, dst;
	
	count = 0;
	for(i = 0; i < MAX_ACCUM; i++){
		t = &accum[i];
		if(!t->valid)
			continue;
		
		/* Ensure tileimg is big enough */
		if(tileimg == nil || t->w > tileimgw || t->h > tileimgh){
			if(tileimg)
				freeimage(tileimg);
			tileimgw = (t->w + 63) & ~63;
			tileimgh = (t->h + 63) & ~63;
			tileimg = allocimage(display, Rect(0, 0, tileimgw, tileimgh), XBGR32, 0, DNofill);
			if(tileimg == nil){
				fprint(2, "allocimage %dx%d failed: %r\n", tileimgw, tileimgh);
				t->valid = 0;
				continue;
			}
		}
		
		rect = Rect(0, 0, t->w, t->h);
		if(loadimage(tileimg, rect, t->pixels, t->w * t->h * 4) < 0){
			fprint(2, "loadimage failed: %r\n");
			t->valid = 0;
			continue;
		}
		
		dst.min = addpt(screen->r.min, Pt(t->x, t->y));
		dst.max = addpt(dst.min, Pt(t->w, t->h));
		draw(screen, dst, tileimg, nil, rect.min);
		
		t->valid = 0;
		count++;
	}
	
	tiles_drawn += count;
	flushimage(display, 1);
}

ulong
now_ms(void)
{
	return nsec() / 1000000;
}

/*
 * Send one protocol packet to the saved client.
 *
 * The 52-byte client header from clienthdr is prepended, matching the
 * "headers" mode netproc sets on the UDP conversation.  Nothing is sent
 * before the data file is open and a client is known.  A packet that
 * would not fit in the 2048-byte staging buffer is silently dropped.
 * The write result is ignored: delivery is best effort.
 */
void
udpsend(uchar *pkt, int len)
{
	uchar out[2048];
	int total;

	if(udpdata < 0 || clientknown == 0)
		return;

	total = 52 + len;
	lock(&udplock);
	if(total <= sizeof out){
		memmove(out, clienthdr, 52);
		memmove(out + 52, pkt, len);
		write(udpdata, out, total);
	}
	unlock(&udplock);
}

void
send_size_resp(void)
{
	uchar pkt[24];
	
	PUT32(pkt + HDR_MAGIC, P9WL_MAGIC);
	pkt[HDR_VERSION] = P9WL_VERSION;
	pkt[HDR_TYPE] = P9WL_SIZE_RESP;
	pkt[HDR_FLAGS] = 0;
	pkt[7] = 0;
	PUT32(pkt + HDR_SEQ, sendseq++);
	PUT32(pkt + HDR_FRAME, 0);
	PUT32(pkt + HDR_TIMESTAMP, now_ms());
	PUT16(pkt + 20, Dx(screen->r));
	PUT16(pkt + 22, Dy(screen->r));
	
	//fprint(2, "SIZE_RESP: %dx%d\n", Dx(screen->r), Dy(screen->r));
	udpsend(pkt, 24);
}

void
send_mouse(int x, int y, int b)
{
	uchar pkt[28];
	
	if(!clientknown)
		return;
	
	PUT32(pkt + HDR_MAGIC, P9WL_MAGIC);
	pkt[HDR_VERSION] = P9WL_VERSION;
	pkt[HDR_TYPE] = P9WL_MOUSE;
	pkt[HDR_FLAGS] = 0;
	pkt[7] = 0;
	PUT32(pkt + HDR_SEQ, sendseq++);
	PUT32(pkt + HDR_FRAME, 0);
	PUT32(pkt + HDR_TIMESTAMP, now_ms());
	PUT16(pkt + 20, x);
	PUT16(pkt + 22, y);
	pkt[24] = b;
	pkt[25] = pkt[26] = pkt[27] = 0;
	
	udpsend(pkt, 28);
}

/*
 * Forward a key press to the client.
 *
 * Packet: 20-byte header (type P9WL_KEY, frame 0), followed by:
 * - the rune as a 32-bit little-endian value at offset 20;
 * - the byte 1 at offset 24 (presumably a "pressed" flag -- confirm
 *   against the client);
 * - three zero pad bytes.
 * Nothing is sent until a client is known.
 *
 * The sequence number and timestamp are computed once: PUT32 may
 * evaluate its value argument more than once.  With sendseq++ passed
 * directly, the counter would advance several times per packet.
 */
void
send_key(Rune k)
{
	uchar pkt[28];
	ulong seq, ts;

	if(!clientknown)
		return;

	seq = sendseq++;
	ts = now_ms();

	PUT32(pkt + HDR_MAGIC, P9WL_MAGIC);
	pkt[HDR_VERSION] = P9WL_VERSION;
	pkt[HDR_TYPE] = P9WL_KEY;
	pkt[HDR_FLAGS] = 0;
	pkt[HDR_SCROLL_ID] = 0;
	PUT32(pkt + HDR_SEQ, seq);
	PUT32(pkt + HDR_FRAME, 0);
	PUT32(pkt + HDR_TIMESTAMP, ts);
	PUT32(pkt + 20, k);
	pkt[24] = 1;
	pkt[25] = pkt[26] = pkt[27] = 0;

	udpsend(pkt, sizeof pkt);
}

void
send_resize(int w, int h)
{
	uchar pkt[24];
	int i;
	
	if(!clientknown)
		return;
	
	PUT32(pkt + HDR_MAGIC, P9WL_MAGIC);
	pkt[HDR_VERSION] = P9WL_VERSION;
	pkt[HDR_TYPE] = P9WL_RESIZE;
	pkt[HDR_FLAGS] = 0;
	pkt[7] = 0;
	PUT32(pkt + HDR_SEQ, sendseq++);
	PUT32(pkt + HDR_FRAME, 0);
	PUT32(pkt + HDR_TIMESTAMP, now_ms());
	PUT16(pkt + 20, w);
	PUT16(pkt + 22, h);
	
	for(i = 0; i < 5; i++){
		udpsend(pkt, 24);
		sleep(20);
	}
}

/*
 * Find or create the reassembly slot for a multi-chunk tile.
 *
 * A slot matches when it is in use and its x, y, w, h and frame equal
 * the arguments.  Otherwise a free slot (data == nil) is taken.  If all
 * MAXPENDING slots are busy, the one created longest ago is evicted and
 * its partial data discarded.
 *
 * Returns nil if chunk_count or total_size is not positive, or if the
 * buffers cannot be allocated.  On allocation failure the slot is left
 * free (data and got both nil), so a later chunk of the same tile
 * retries instead of matching a half-built slot.
 */
Pending*
get_pending(int x, int y, int w, int h, int chunk_count, int total_size, int flags, ulong frame)
{
	Pending *p, *victim;
	vlong now;
	int i;

	if(chunk_count <= 0 || total_size <= 0)
		return nil;

	now = nsec();
	lock(&pendlock);

	/* Find existing */
	for(i = 0; i < MAXPENDING; i++){
		p = &pending[i];
		if(p->data && p->x == x && p->y == y && p->w == w && p->h == h && p->frame == frame){
			unlock(&pendlock);
			return p;
		}
	}

	/* Find free slot (or oldest) */
	victim = &pending[0];
	for(i = 0; i < MAXPENDING; i++){
		p = &pending[i];
		if(p->data == nil){
			victim = p;
			break;
		}
		if(p->when < victim->when)
			victim = p;
	}

	p = victim;
	free(p->data);
	free(p->got);

	p->x = x;
	p->y = y;
	p->w = w;
	p->h = h;
	p->flags = flags;
	p->chunk_count = chunk_count;
	p->total_size = total_size;
	p->data = malloc(total_size);
	p->got = malloc(chunk_count);
	p->ngot = 0;
	p->frame = frame;
	p->when = now;

	if(p->data == nil || p->got == nil){
		/* Release both so the slot reads as free, not half-initialized. */
		free(p->data);
		free(p->got);
		p->data = nil;
		p->got = nil;
		unlock(&pendlock);
		return nil;
	}
	memset(p->got, 0, chunk_count);

	unlock(&pendlock);
	return p;
}

/*
 * Copy one chunk of a multi-chunk tile into its reassembly buffer.
 *
 * Chunk idx is stored at offset idx*P9WL_MAX_CHUNK; data past
 * total_size is clipped.  Duplicate chunks are ignored.  Returns 1 once
 * every chunk has been stored, otherwise 0.
 *
 * len and idx come from the network.  Chunks with a negative length,
 * or whose offset lies beyond the end of the buffer, are rejected.
 * Without this, the clipped length could go negative and reach
 * memmove.
 */
int
add_chunk(Pending *p, int idx, uchar *data, int len)
{
	int offset;

	if(p == nil || p->data == nil || p->got == nil)
		return 0;
	if(idx < 0 || idx >= p->chunk_count || len < 0)
		return 0;
	if(p->got[idx])
		return 0;

	offset = idx * P9WL_MAX_CHUNK;
	if(offset > p->total_size)
		return 0;
	/* written as a subtraction so offset+len cannot overflow */
	if(len > p->total_size - offset)
		len = p->total_size - offset;

	memmove(p->data + offset, data, len);
	p->got[idx] = 1;
	p->ngot++;

	return p->ngot >= p->chunk_count;
}

void
handle_tile(uchar *pkt, int len, int flags, ulong frame)
{
	int x, y, w, h;
	int chunk_idx, chunk_count;
	int total_size, chunk_size;
	uchar *data;
	Pending *p;
	int num_copies, i;
	int copy_offset;
	
	if(len < 40)
		return;
	
	x = GET16(pkt + 20);
	y = GET16(pkt + 22);
	w = GET16(pkt + 24);
	h = GET16(pkt + 26);
	chunk_idx = GET16(pkt + 28);
	chunk_count = GET16(pkt + 30);
	total_size = GET32(pkt + 32);
	chunk_size = GET32(pkt + 36);
	
	data = pkt + 40;
	if(len - 40 < (int)chunk_size)
		return;
	
	/* Single chunk - decompress and accumulate */
	if(chunk_count == 1){
		int nbytes = w * h * 4;
		int declen;
		int copy_dx[MAX_COPIES], copy_dy[MAX_COPIES];
		int nc = 0;
		
		/* Parse copy destinations first */
		if(flags & P9WL_FLAG_HAS_COPIES){
			copy_offset = 40 + chunk_size;
			if(len >= copy_offset + 2){
				num_copies = GET16(pkt + copy_offset);
				if(num_copies > MAX_COPIES) num_copies = MAX_COPIES;
				for(i = 0; i < num_copies && copy_offset + 2 + (i+1)*4 <= len; i++){
					copy_dx[i] = GET16(pkt + copy_offset + 2 + i*4);
					copy_dy[i] = GET16(pkt + copy_offset + 2 + i*4 + 2);
				}
				nc = i;
			}
		}
		
		/* Ensure decode buffer is large enough */
		if(nbytes > inline_decbuf_size){
			free(inline_decbuf);
			inline_decbuf = malloc(nbytes);
			inline_decbuf_size = inline_decbuf ? nbytes : 0;
		}
		if(inline_decbuf == nil){
			tiles_failed++;
			return;
		}
		
		/* Decompress */
		if(flags & P9WL_FLAG_COMPRESSED){
			declen = LZ4_decompress_safe((char*)data, (char*)inline_decbuf, chunk_size, nbytes);
			if(declen != nbytes){
				tiles_failed++;
				return;
			}
		} else {
			if((int)chunk_size != nbytes){
				tiles_failed++;
				return;
			}
			memmove(inline_decbuf, data, nbytes);
		}
		
		/* Accumulate primary tile */
		accum_tile(x, y, w, h, frame, inline_decbuf);
		
		/* Accumulate copies (same pixels, different positions) */
		for(i = 0; i < nc; i++){
			accum_tile(copy_dx[i], copy_dy[i], w, h, frame, inline_decbuf);
		}
		return;
	}
	
	/* Multi-chunk - reassemble first */
	p = get_pending(x, y, w, h, chunk_count, total_size, flags, frame);
	if(p == nil)
		return;
	
	if(!add_chunk(p, chunk_idx, data, chunk_size))
		return;
	
	/* Complete - decompress and accumulate */
	{
		int nbytes = w * h * 4;
		int declen;
		uchar *tiledata = p->data;
		int tilesize = p->total_size;
		int tileflags = p->flags;
		
		/* Ensure decode buffer is large enough */
		if(nbytes > inline_decbuf_size){
			free(inline_decbuf);
			inline_decbuf = malloc(nbytes);
			inline_decbuf_size = inline_decbuf ? nbytes : 0;
		}
		
		lock(&pendlock);
		free(p->got);
		p->got = nil;
		p->data = nil;
		unlock(&pendlock);
		
		if(inline_decbuf == nil){
			tiles_failed++;
			free(tiledata);
			return;
		}
		
		/* Decompress */
		if(tileflags & P9WL_FLAG_COMPRESSED){
			declen = LZ4_decompress_safe((char*)tiledata, (char*)inline_decbuf, tilesize, nbytes);
			free(tiledata);
			if(declen != nbytes){
				tiles_failed++;
				return;
			}
		} else {
			if(tilesize != nbytes){
				tiles_failed++;
				free(tiledata);
				return;
			}
			memmove(inline_decbuf, tiledata, nbytes);
			free(tiledata);
		}
		
		/* Accumulate tile */
		accum_tile(x, y, w, h, frame, inline_decbuf);
	}
}

/*
 * Validate and dispatch one received datagram.
 *
 * pkt/len is the payload, starting at the protocol header.  hdr is the
 * 52-byte UDP header that preceded it.  Packets that are too short, or
 * that carry the wrong magic or version, are dropped.
 *
 * The sender's header is saved in clienthdr so udpsend() can reply.
 * The first valid packet sets it, and every SIZE_REQ refreshes it.
 * This way the size reply goes to whoever asked, even a client that
 * reconnects from a different port.
 *
 * With debug set, forward jumps in the sender's sequence numbers are
 * reported as gaps.
 */
void
handle_packet(uchar *pkt, int len, uchar *hdr)
{
	ulong magic, frame, seq;
	int type, flags, first;
	static ulong last_seq = 0;
	static int seq_init = 0;
	static long total_gaps = 0;
	static long total_pkts = 0;

	if(len < P9WL_HDR_SIZE)
		return;

	magic = GET32(pkt + HDR_MAGIC);
	if(magic != P9WL_MAGIC)
		return;

	if(pkt[HDR_VERSION] != P9WL_VERSION)
		return;

	type = pkt[HDR_TYPE];
	flags = pkt[HDR_FLAGS];
	frame = GET32(pkt + HDR_FRAME);
	seq = GET32(pkt + HDR_SEQ);

	/* Save (or, on SIZE_REQ, refresh) the client address */
	if(!clientknown || type == P9WL_SIZE_REQ){
		lock(&udplock);
		first = !clientknown;
		memmove(clienthdr, hdr, 52);
		clientknown = 1;
		unlock(&udplock);
		if(debug && first)
			fprint(2, "client connected\n");
	}

	/* Track sequence gaps; a backwards jump (restart or wrap) is not counted */
	total_pkts++;
	if(seq_init && seq != last_seq + 1 && seq > last_seq){
		long gap = seq - last_seq - 1;
		total_gaps += gap;
		if(debug)
			fprint(2, "SEQ GAP: expected %uld got %uld (lost %ld, total lost %ld/%ld)\n", 
			       last_seq + 1, seq, gap, total_gaps, total_pkts);
	}
	last_seq = seq;
	seq_init = 1;

	switch(type){
	case P9WL_SIZE_REQ:
		send_size_resp();
		break;
	case P9WL_TILE:
		handle_tile(pkt, len, flags, frame);
		break;
	case P9WL_FLUSH:
		/* Draw all accumulated tiles */
		flush_accum();
		break;
	case P9WL_SCROLL:
		/* Scroll disabled - ignore */
		break;
	default:
		/* NACK and unknown types are ignored */
		break;
	}
}

/*
 * Network receive proc, started with proccreate() by threadmain.
 *
 * arg is the UDP port string.  Setup:
 * - open a UDP conversation via /net/udp/clone;
 * - ask for a 4194304-byte receive buffer (best effort);
 * - enable "headers" mode, so each datagram arrives prefixed by a
 *   52-byte header;
 * - announce on the port.
 * Any setup failure exits the whole program.
 *
 * It then loops forever.  Each datagram is split into header and
 * payload inside a Packet and offered to pktch without blocking.  When
 * pktch is full the datagram is dropped and counted in drop_pktch.
 */
void
netproc(void *arg)
{
	char *port = arg;
	char buf[64], ldir[40], ctlpath[40];
	int n, acfd, ctlfd;
	Packet *p;

	/*
	 * Packets live in a static ring, so nothing is allocated per
	 * datagram.  A slot is busy while its pointer sits in pktch and
	 * while threadmain is handling it.  So:
	 * - the ring needs two more slots than pktch can queue (8192,
	 *   from the chancreate() in threadmain -- keep in sync);
	 * - the index only advances past a slot that was actually queued.
	 * Otherwise, with pktch full, the next read overwrites a packet
	 * that is still queued or being handled.
	 */
	#define PACKET_POOL_SIZE (8192 + 2)
	static Packet packet_pool[PACKET_POOL_SIZE];
	static int pool_idx = 0;

	acfd = open("/net/udp/clone", ORDWR);
	if(acfd < 0){
		fprint(2, "open /net/udp/clone: %r\n");
		threadexitsall("udp");
	}

	n = read(acfd, buf, sizeof(buf)-1);
	if(n <= 0){
		fprint(2, "read clone: %r\n");
		threadexitsall("udp");
	}
	buf[n] = 0;

	/* Try to increase UDP receive buffer */
	snprint(ctlpath, sizeof(ctlpath), "/net/udp/%s/ctl", buf);
	ctlfd = open(ctlpath, OWRITE);
	if(ctlfd >= 0){
		fprint(ctlfd, "rcvbuf 4194304");
		close(ctlfd);
	}

	snprint(ldir, sizeof(ldir), "/net/udp/%s/data", buf);

	if(fprint(acfd, "headers") < 0){
		fprint(2, "headers: %r\n");
		threadexitsall("udp");
	}

	if(fprint(acfd, "announce %s", port) < 0){
		fprint(2, "announce %s: %r\n", port);
		threadexitsall("udp");
	}

	udpdata = open(ldir, ORDWR);
	if(udpdata < 0){
		fprint(2, "open %s: %r\n", ldir);
		threadexitsall("udp");
	}
	if(debug)
		fprint(2, "listening on UDP port %s\n", port);

	for(;;){
		p = &packet_pool[pool_idx];

		n = read(udpdata, p->data, sizeof(p->data));
		if(n <= 52)
			continue;

		memmove(p->hdr, p->data, 52);
		p->len = n - 52;
		memmove(p->data, p->data + 52, p->len);

		if(nbsend(pktch, &p) == 0){
			/* pktch full: drop, and reuse this slot for the next read */
			drop_pktch++;
			continue;
		}
		pool_idx = (pool_idx + 1) % PACKET_POOL_SIZE;
	}
}

void
threadmain(int argc, char *argv[])
{
	char *port;
	Mouse m;
	Rune k;
	Packet *pkt;
	Alt alts[5];
	int i;
	
	port = "5556";
	if(argc > 1)
		port = argv[1];
	
	if(initdraw(nil, nil, "drawserv") < 0)
		sysfatal("initdraw: %r");
	
	mctl = initmouse(nil, screen);
	if(mctl == nil)
		sysfatal("initmouse: %r");
	
	kctl = initkeyboard(nil);
	if(kctl == nil)
		sysfatal("initkeyboard: %r");
	
	pktch = chancreate(sizeof(Packet*), 8192);
	
	for(i = 0; i < MAXPENDING; i++){
		pending[i].data = nil;
		pending[i].got = nil;
	}
	
	/* Initialize accumulation buffer */
	for(i = 0; i < MAX_ACCUM; i++){
		accum[i].valid = 0;
		accum[i].pixels = nil;
		accum[i].pixelsize = 0;
	}
	
	draw(screen, screen->r, display->black, nil, ZP);
	flushimage(display, 1);
	if(debug)
	fprint(2, "drawserv_udp %dx%d\n", Dx(screen->r), Dy(screen->r));
	
	proccreate(netproc, port, 32768);
	
	alts[0].c = mctl->c;
	alts[0].v = &m;
	alts[0].op = CHANRCV;
	
	alts[1].c = mctl->resizec;
	alts[1].v = nil;
	alts[1].op = CHANRCV;
	
	alts[2].c = kctl->c;
	alts[2].v = &k;
	alts[2].op = CHANRCV;
	
	alts[3].c = pktch;
	alts[3].v = &pkt;
	alts[3].op = CHANRCV;
	
	alts[4].op = CHANEND;
	
	last_stats = now_ms();
	
	for(;;){
		/* Print stats every 5 seconds */
		ulong now = now_ms();
		if(now - last_stats > 5000){
			if(drop_pktch || tiles_superseded)
				if(debug)
				fprint(2, "stats: drops=%ld superseded=%ld drawn=%ld\n", 
				       drop_pktch, tiles_superseded, tiles_drawn);
			drop_pktch = 0;
			tiles_superseded = 0;
			tiles_drawn = 0;
			last_stats = now;
		}
		
		switch(alt(alts)){
		case 0:  /* mouse */
			send_mouse(m.xy.x - screen->r.min.x,
			          m.xy.y - screen->r.min.y,
			          m.buttons);
			break;
			
		case 1:  /* resize */
			if(getwindow(display, Refnone) < 0)
				sysfatal("getwindow: %r");
			
			if(tileimg){
				freeimage(tileimg);
				tileimg = nil;
				tileimgw = tileimgh = 0;
			}
			
			draw(screen, screen->r, display->black, nil, ZP);
			flushimage(display, 1);
			if(debug)
			fprint(2, "resize: %dx%d\n", Dx(screen->r), Dy(screen->r));
			send_resize(Dx(screen->r), Dy(screen->r));
			break;
			
		case 2:  /* keyboard */
			if(k == Kdel)
				threadexitsall(nil);
			send_key(k);
			break;
			
		case 3:  /* network packet */
			handle_packet(pkt->data, pkt->len, pkt->hdr);
			
			/* Drain ALL pending packets before checking other channels */
			while(nbrecv(pktch, &pkt) > 0){
				handle_packet(pkt->data, pkt->len, pkt->hdr);
			}
			break;
		}
	}
}